import discord, subprocess, glob, os, os.path, urllib.request, ffmpeg import re, time, datetime, yt_dlp, typing, functools from time import strftime, gmtime from yt_dlp import YoutubeDL from asyncio import sleep from discord.ext import commands from StringProgressBar import progressBar import uuid import logging logging.basicConfig(level=logging.DEBUG) intents = discord.Intents.default() intents.message_content = True # Dictionary to store queues between servers queue_list = {} server_info = {} downloading = 0 paused = False # Create a new Discord client bot = commands.Bot(command_prefix="!",intents=discord.Intents.all(), shard_count=1) @bot.event async def on_ready(): print("Bot is ready!") await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name="Nanahira")) async def delete_after_delay(ctx, delay): await sleep(delay) await ctx.message.delete() async def get_ids(ctx): await ctx.message.delete(delay=3) server_id = ctx.message.guild.id voice_channel = ctx.message.guild.voice_client user_voice_channel = ctx.author.voice return server_id, voice_channel, user_voice_channel async def getFileNames(server_id): downloading = 1 # Get the current unix timestamp to the nearest millisecond for the filename uuid_stamp = uuid.uuid1() audioname = str(uuid_stamp) + "audio" thumbname = str(uuid_stamp) + "image" # Create the id and thumbnail of the attachment as "tmp.flac" and "tmp.png" respectively # And add the server ID as the path, making it unique audioname = os.path.join(str(server_id), audioname) thumbname = os.path.join(str(server_id), thumbname) return audioname, thumbname @bot.command() async def play(ctx, *, query: str = None): # Get all the id and channel info server_id, voice_channel, user_voice_channel = await get_ids(ctx) # Create a queue and info dictionary for the current server if server_id not in queue_list: queue_list[server_id] = [1] server_info[server_id] = { "loop": False } # Create the server ID folder for storing downloads if not os.path.exists(str(server_id)): os.makedirs(str(server_id)) global paused # If the user is not in a voice channel, return an error message if user_voice_channel is None: await ctx.send("You must be in a voice channel to use this command.", delete_after=3) return # Connect to the voice channel if not already connected if not voice_channel: voice_channel = await user_voice_channel.channel.connect() # If the user is not in a voice channel or the voice channel is not the same one as the bot, return an error message if user_voice_channel.channel != voice_channel.channel: await ctx.send("You must be in the same voice channel as the bot to use this command.", delete_after=3) return if query == None and not ctx.message.attachments: # If the bot is already playing, return an error message if voice_channel.is_playing(): await ctx.send(":no_entry_sign: The bot is already playing", delete_after=3) return # If the bot is paused, resume playback if voice_channel.is_paused(): voice_channel.resume() paused = False return await ctx.send(":no_entry_sign: Must input a valid query or attachment.", delete_after=3) await voice_channel.disconnect() return if voice_channel.is_playing(): await ctx.send("The bot is already playing, adding song to queue", delete_after=3) print("1"); if ctx.message.attachments: downloading = 1 notice = await ctx.send(":arrow_double_up: Uploading...", suppress_embeds=True) for song in ctx.message.attachments: filename, thumbname = await getFileNames(server_id) # Make sure the file is either audio or video filetype = song.content_type if filetype.split('/')[0] != "audio" and filetype.split('/')[0] != "video": await notice.edit(content=":no_entry_sign: Not a valid video or audio file...") continue await song.save(filename) # Grab thumbnail from file f = open("tmp", "w") subprocess.run(["ffmpeg", "-y", "-i", filename, "-map", "0:v", "-map", "-0:V", "-c", "copy", thumbname], stderr=subprocess.STDOUT, stdout=f) # Grab metadata from file f = open("tmp", "w") subprocess.run(["ffmpeg", "-y", "-i", filename, "-f", "ffmetadata", str(server_id) + "/meta.txt"], stderr=subprocess.STDOUT, stdout=f) file_title = song.filename file_artist = None file_album = None with open(str(server_id) + '/meta.txt', 'r') as m: lines = m.readlines() file_artist = "Unknown" file_album = "Unknown" for line in lines: if re.search(r'TITLE=', line, re.IGNORECASE): file_title = line.split('=')[1] break for line in lines: if re.search(r'ARTIST=', line, re.IGNORECASE): file_artist = line.split('=')[1] break for line in lines: if re.search(r'ALBUM=', line, re.IGNORECASE): file_album = line.split('=')[1] break if os.path.exists(thumbname): thumbnail = thumbname else: thumbnail = "assets/unknown.png" try: duration = ffmpeg.probe(filename)['format']['duration'] except: duration = None # Create the item dictionary item = { "name": file_title.rstrip(), "artist": file_artist.rstrip(), "album": file_album.rstrip(), "url": song.url, "id": filename, "thumbnail": thumbnail, "duration": duration } await notice.edit(content=":white_check_mark: Successfully uploaded \"" + song.filename + "\"") queue_list[server_id].append(item) downloading = 0 await notice.edit(content=":white_check_mark: Successfully uploaded \"" + song.filename + "\"", delete_after=3) elif query[0:4] != "http" and query[0:3] != "www": filename, thumbname = await getFileNames(server_id) # Let the user know the bot is searching for a video notice = await ctx.send(":mag_right: Searching for \"" + query + "\" ...", suppress_embeds=True) # Search metadata for youtube video with yt_dlp.YoutubeDL({'quiet': True}) as ydl: info = ydl.extract_info(f"ytsearch:{query}", download = False)["entries"][0] title = info["title"] audio_url = info["webpage_url"] thumb_url = info["thumbnail"] duration = info["duration"] print(str(server_id) + " | " + audio_url) # Create the item dictionary item = { "name": title, "artist": None, "album": None, "url": audio_url, "id": filename, "thumbnail": None, "thumbnail_url": thumb_url, "duration": duration } await notice.edit(content=":white_check_mark: Downloaded " + title + ": " + audio_url, suppress=True, delete_after=3) queue_list[server_id].append(item) downloading = 0 elif query[0:4] == "http" or query[0:3] == "www": filename, thumbname = await getFileNames(server_id) # Let the user know the bot is searching for a video notice = await ctx.send(":mag_right: Searching for \"" + query + "\" ...", suppress_embeds=True) if query[0:17] != "https://www.youtu" and query[0:13] != "https://youtu": await notice.edit(content=":no_entry_sign: Must input a valid query or attachment.", delete_after=3) await voice_channel.disconnect() return # Search metadata for youtube video with yt_dlp.YoutubeDL({'quiet': True}) as ydl: info = ydl.extract_info(query, download = False) title = info["title"] thumb_url = info["thumbnail"] duration = info["duration"] print(str(server_id) + " | " + query) # Create the item dictionary item = { "name": title, "artist": None, "album": None, "url": query, "id": filename, "thumbnail": None, "thumbnail_url": thumb_url, "duration": duration } await notice.edit(content=":white_check_mark: Added \"" + title + "\": " + query, suppress=True, delete_after=3) queue_list[server_id].append(item) downloading = 0 else: print("Error") await ctx.send("Something went wrong, please try a different query.", delete_after=3) return print(str(server_id) + " | " + str(item)) if voice_channel.is_playing() or downloading == 1: return embed = discord.Embed(title="▶️ Playing: ", description="Name: " + "\nURL: ", color=0x42f5a7) playing = await ctx.send(embed=embed) # Loop that repeats as long as the queue position has not reached the length of the queue while len(queue_list[server_id]) - 1 >= queue_list[server_id][0]: # Get the current queue position queue_position = queue_list[server_id][0] # Set song variables song_id = queue_list[server_id][queue_position]['id'] song_url = queue_list[server_id][queue_position]['url'] song_name = queue_list[server_id][queue_position]['name'] song_thumb = queue_list[server_id][queue_position]['thumbnail'] song_duration = queue_list[server_id][queue_position]['duration'] song_thumb_url = queue_list[server_id][queue_position]['thumbnail_url'] song_thumbname = str(int(time.time())) + ".png" # Create the embed if queue_list[server_id][queue_position]['artist'] and queue_list[server_id][queue_position]['album']: song_desc = "Name: " + song_name + "\nArtist: " + queue_list[server_id][queue_position]['artist'] + "\nAlbum: " + queue_list[server_id][queue_position]['album'] else: song_desc = "Name: " + song_name + "\nURL: " + song_url embed=discord.Embed(title="▶️ Playing: " + song_name, url=song_url, description=song_desc, color=0x42f5a7) if song_thumb is not None: await playing.add_files(discord.File(song_thumb, filename=song_thumbname)) embed.set_thumbnail(url="attachment://" + song_thumbname) elif song_thumb_url is not None: embed.set_thumbnail(url=song_thumb_url) await playing.edit(embed=embed) song_source = None pipe = False if song_url is not None: song_source = subprocess.Popen( ['yt-dlp', '-q', '-o', '-', '-x', song_url], stdout=subprocess.PIPE, ).stdout pipe = True print("Playing song through yt-dlp") else: print("Playing song from file") song_source = song_id # Play the converted audio in the voice channel from the temporary file # or the FFMPEG stream player = voice_channel.play(discord.FFmpegOpusAudio(source=song_source, pipe=pipe)) time1 = int(time.time()) total = int(float(song_duration)) # Wait for audio to finish playing while voice_channel.is_playing() or voice_channel.is_paused(): await sleep(1) time2 = int(time.time()) if paused: time1 = time2 - current else: current = time2 - time1 bardata = progressBar.splitBar(total, current, size=20) # Create embed if not paused: embed=discord.Embed(title="▶️ Playing: " + song_name, url=song_url, description=song_desc, color=0x42f5a7) else: embed=discord.Embed(title="⏸ Paused: " + song_name, url=song_url, description=song_desc, color=0x42f5a7) if song_thumb is not None: embed.set_thumbnail(url="attachment://" + song_thumbname) elif song_thumb_url is not None: embed.set_thumbnail(url=song_thumb_url) embed.add_field( name=str(datetime.timedelta(seconds=current)) + "/" + str(datetime.timedelta(seconds=total)), value=bardata[0], inline=False ) await playing.edit(embed=embed) if server_info[server_id]["loop"]: continue # Increment the queue position by 1 try: queue_list[server_id][0] += 1 except: print(str(server_id) + " | " + "Queue position out of range.") break print(str(server_id) + " | " + "Play position: " + str(queue_position)) try: await q(ctx) except: pass # Display the stop embed try: await q(ctx, "hide") except: pass embed=discord.Embed(title="⏹️ Finished Queue: " + song_name, url=song_url, description="Name: " + song_name + "\nURL: " + song_url, color=0x42f5a7) embed.set_thumbnail(url="attachment://" + song_thumbname) await playing.edit(embed=embed) await ctx.send("Finished queue, disconnecting.", suppress_embeds=True, delete_after=3) print(str(server_id) + " | " + "Queue finished.") # Disconnect from the voice channel if the loop finishes await voice_channel.disconnect() queue_list[server_id].clear() queue_list[server_id].insert(0, 1) # Remove all queued files and folders fileList = glob.glob(os.path.join(str(server_id),'*')) for filePath in fileList: try: os.remove(filePath) except OSError: print("Error while deleting file") os.rmdir(str(server_id)) @bot.command() async def skip(ctx, direction = None, number = None): # Get all the id and channel info server_id, voice_channel, user_voice_channel = await get_ids(ctx) if voice_channel is None: await ctx.send(":no_entry_sign: Bot must be playing to skip!", delete_after=3) return if user_voice_channel is None or user_voice_channel.channel != voice_channel.channel: # If the user is not in a voice channel, return an error message await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3) return try: number = int(number) except: pass # Check which direction the user asked to skip if direction is None or direction == "forward" or direction.isnumeric(): try: if direction.isnumeric(): if int(direction) > 0: queue_list[server_id][0] += int(direction) - 1 except: pass if number != None and number > 0: queue_list[server_id][0] += number - 1 # Stop the audio playback voice_channel.stop() elif direction == "back" and not queue_list[server_id][0] == 1: # Decrement the queue position back = 2 # Decrement the queue position if the number is set if number != None and number > 0: back = number + 1 queue_list[server_id][0] -= back # Stop the audio playback of the current track voice_channel.stop() elif direction == "to" and number is not None: queue_list[server_id][0] = number - 1 voice_channel.stop() elif queue_list[server_id][0] == 1: await ctx.send(":no_entry_sign: Already at first song in queue.", delete_after=3) else: await ctx.send(":no_entry_sign: Invalid argument.", delete_after=3) await sleep(2) try: await q(ctx) except: pass @bot.command() async def stop(ctx): # Get all the id and channel info server_id, voice_channel, user_voice_channel = await get_ids(ctx) if voice_channel is None: await ctx.send(":no_entry_sign: Bot must be playing to stop!", delete_after=3) return if user_voice_channel is None or user_voice_channel.channel != voice_channel.channel: # If the user is not in a voice channel, return an error message await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3) return queue_list[server_id].clear() queue_list[server_id].insert(0, 1) await sleep(0.5) voice_channel.stop() await voice_channel.disconnect() @bot.command() async def pause(ctx): # Get all the id and channel info server_id, voice_channel, user_voice_channel = await get_ids(ctx) if voice_channel is None or not voice_channel.is_playing(): await ctx.send(":no_entry_sign: Bot must be playing to pause!", delete_after=3) return if user_voice_channel is None or user_voice_channel.channel != voice_channel.channel: # If the user is not in a voice channel, return an error message await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3) return voice_channel.pause() global paused paused = True @bot.command() async def queue(ctx, action = None, selection = None): await q(ctx, action, selection) @bot.command() async def q(ctx, action = None, selection = None): # Get all the id and channel info server_id, voice_channel, user_voice_channel = await get_ids(ctx) global queue_embed if action == "hide": try: queue_embed except: pass else: if queue_embed is None: pass else: await queue_embed.delete() queue_embed = None del queue_embed return if voice_channel is None: await ctx.send(":no_entry_sign: Bot must be in a channel to view the queue!", delete_after=3) return if user_voice_channel is None or user_voice_channel.channel != voice_channel.channel: # If the user is not in a voice channel, return an error message await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3) return if action == "show" or action == "list" or action == None: if action == "show": try: await q(ctx, "hide") except: pass print(str(server_id) + " | " + "Updating queue, position: " + str(queue_list[server_id][0])) x = 0 qu = "" d = "" p = "" now_playing = "⠀" for entry in queue_list[server_id]: if x != 0: if x == queue_list[server_id][0]: now_playing = ":arrow_right:" else: now_playing = "⠀" if len(entry['name']) >= 30: entry_cut = entry['name'][0:30] else: entry_cut = entry['name'] p += now_playing + "\n" qu += "**" + str(x) + ":** " + entry_cut + "\n" if str(strftime("%H", gmtime(int(float(entry['duration'])))))[0:1] == "00": d += str(strftime("%M:%S", gmtime(int(float(entry['duration']))))) + "\n" else: d += str(strftime("%H:%M:%S", gmtime(int(float(entry['duration']))))) + "\n" x += 1 embed=discord.Embed(title="Queue:", description="", color=0xa032a8) try: embed.add_field(name="⠀", value=p, inline=True) embed.add_field(name="List", value=qu, inline=True) embed.add_field(name="Length", value=d, inline=True) except: embed.add_field(name="List", value="Queue is **empty**", inline=False) try: queue_embed except: queue_embed = await ctx.send(embed=embed) else: if queue_embed is None: queue_embed = await ctx.send(embed=embed) else: await queue_embed.edit(embed=embed) return if action == "remove": print(str(server_id) + " | " + "Removing item #" + str(selection) + " from queue") selection = int(selection) position = queue_list[server_id][0] id = queue_list[server_id][selection]['id'] if queue_list[server_id][selection]['thumbnail'] != "/assets/unknown.png": thumbnail = None else: thumbnail = queue_list[server_id][selection]['thumbnail'] if selection is position: await ctx.send(":no_entry_sign: Error, cannot remove currently playing item", delete_after=3) return if selection != 0 and not int(selection) > len(queue_list[server_id]): try: os.remove(id) if not thumbnail is None: os.remove(thumbnail) else: pass except OSError: print(str(server_id) + " | " + "Error while deleting song or thumbnail") pass if selection < position and position > 1: try: queue_list[server_id][0] -= 1 except: print(str(server_id) + " | " + "Queue position out of range.") pass await ctx.send(":white_check_mark: Removed item #" + str(selection) + " from queue.", delete_after=3) queue_list[server_id].pop(selection) await q(ctx) else: await ctx.send(":no_entry_sign: Error, item #" + str(selection) + "not a valid queue item", delete_after=3) return @bot.command() async def loop(ctx, number = None): server_id, voice_channel, user_voice_channel = await get_ids(ctx) if voice_channel is None or not voice_channel.is_playing() and not voice_channel.is_paused(): await ctx.send(":no_entry_sign: Bot must be playing to loop!", delete_after=3) return if user_voice_channel is None or user_voice_channel.channel != voice_channel.channel: # If the user is not in a voice channel, return an error message await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3) return if server_info[server_id]["loop"] is False: print(str(server_id) + " | " + "Looping current song.") server_info[server_id]["loop"] = True print(server_info[server_id]["loop"]) return else: print(str(server_id) + " | " + "Not looping current song.") server_info[server_id]["loop"] = False print(server_info[server_id]["loop"]) return @bot.event async def on_command_error(ctx, error): server_id, voice_channel, user_voice_channel = await get_ids(ctx) if isinstance(error, commands.CommandNotFound): await ctx.send("Unknown command", delete_after=3) # Run the bot using the Discord bot token bot.run("<>")