simple-discord-music-bot/src/oden/main.py

602 lines
23 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
import discord
import subprocess
import glob
import os
import os.path
import time
import datetime
import yt_dlp
import logging
from time import strftime, gmtime
from asyncio import sleep
from discord.ext import commands
from StringProgressBar import progressBar
from pprint import pprint
## LINTING / TYPING ##
#####
from typing import Optional, TypedDict
## LOCAL IMPORTS ##
#####
import oden.utils as utils
from oden.utils import QueueItem
logging.basicConfig()


# Playback state kept for one server.
class ServerInfo(TypedDict):
    # When true, `play` does not advance the queue position after a track ends.
    loop: bool
    # Initialised to False by `play`.
    paused: bool
    # Wall-clock seconds since the current track started; reset to 0 between tracks.
    elapsed: int
    # Index into `queue` of the item being played.
    queue_position: int
    # Items waiting to be played, in playback order.
    queue: list[QueueItem]


# Dictionary to store queues for individual servers
# NOTE(review): keys are whatever utils.getIds returns as the server id; the
# rest of the file wraps it in str() before using it as a path — confirm the
# key type really is str.
server_info: dict[str, ServerInfo] = {}
# Presence shown next to the bot's name ("Listening to ...").
_listening_activity = discord.Activity(
    type=discord.ActivityType.listening,
    name="ななひら",
)

# Create a new Discord client: "!"-prefixed commands, every gateway intent
# requested, do-not-disturb status and a single shard.
bot = commands.Bot(
    command_prefix="!",
    intents=discord.Intents.all(),
    status=discord.Status.dnd,
    activity=_listening_activity,
    shard_count=1,
)
# Print a readiness line on stdout whenever discord.py dispatches `on_ready`.
@bot.event
async def on_ready():
    print("Bot is ready!")
# Accent colour shared by every "now playing" embed.
_PLAYING_COLOR = 0x42f5a7


def _is_link(query: str) -> bool:
    """Return True when *query* starts like a URL rather than search terms."""
    return query.startswith(("http", "www"))


def _is_idle(voice_channel) -> bool:
    """Return True when the voice client is neither playing nor paused."""
    return not (voice_channel.is_playing() or voice_channel.is_paused())


def _fetch_metadata(target: str):
    """Return yt-dlp metadata for *target* without downloading, or None on failure."""
    try:
        with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
            return ydl.extract_info(target, download=False)
    except yt_dlp.utils.DownloadError as error:
        # Unavailable/private videos and network errors end up here; report
        # them to the caller as "no metadata" instead of killing the command.
        print("yt-dlp could not resolve " + target + ": " + str(error))
        return None


def _item_from_metadata(info, url: str, file_id) -> QueueItem:
    """Build a streamed queue item from a yt-dlp metadata dictionary."""
    return {
        "name": info["title"],
        "artist": None,
        "album": None,
        "url": url,
        "id": file_id,
        "thumbnail": None,
        "thumbnail_url": info["thumbnail"],
        "duration": int(float(info["duration"])),
    }


def _track_embed(heading: str, item, description: str, attachment_name: str):
    """Build the status embed for *item*, titled *heading* plus the track name."""
    embed = discord.Embed(
        title=heading + item["name"],
        url=item["url"],
        description=description,
        color=_PLAYING_COLOR,
    )
    if item["thumbnail"] is not None:
        # Local thumbnails are uploaded with the message and referenced by name.
        embed.set_thumbnail(url="attachment://" + attachment_name)
    elif item["thumbnail_url"] is not None:
        embed.set_thumbnail(url=item["thumbnail_url"])
    return embed


def _stop_downloader(process) -> None:
    """Terminate a yt-dlp child process that is still running and reap it."""
    if process.poll() is None:
        process.terminate()
    try:
        process.wait(timeout=1)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()


def _remove_downloads(server_id) -> None:
    """Delete the files directly inside the server's download folder, then the folder."""
    folder = str(server_id)
    for file_path in glob.glob(os.path.join(folder, "*")):
        try:
            os.remove(file_path)
        except OSError:
            print("Error while deleting file ", file_path)
    try:
        os.rmdir(folder)
    except OSError:
        # Folder already gone, or not empty because a deletion above failed.
        print(str(server_id) + " | " + "Could not remove download folder.")


async def _queue_attachments(ctx, server_id) -> bool:
    """Save the message's audio/video attachments and queue them.

    Returns True when at least one attachment was queued.
    """
    queued = []
    notice = await ctx.send(":arrow_double_up: Uploading...", suppress_embeds=True)
    status = "File Status: \n"

    for song in ctx.message.attachments:
        filename, thumbname = utils.getFileNames(server_id)

        # Make sure the file is either audio or video
        filetype = song.content_type
        if filetype is None or filetype.split("/")[0] not in ("audio", "video"):
            status += ":no_entry_sign: — `" + song.filename + "`, invalid file!\n"
            continue

        # Save the song to the temp folder
        await song.save(filename)

        # Get all the info about the file and create an "item" for it
        item = utils.parseMediaFile(song, filename, thumbname)
        queued.append(item["name"])
        status += ":white_check_mark: — `" + item["name"] + "`\n"
        await notice.edit(content=status)
        server_info[server_id]["queue"].append(item)
        print(str(server_id) + " | " + f"Uploaded {song.filename}")

    if queued:
        await notice.edit(content=status, delete_after=3)
        return True
    await notice.edit(content=":no_entry_sign: No files successfully uploaded.", delete_after=3)
    return False


async def _queue_search(ctx, server_id, query: str) -> bool:
    """Queue the first YouTube search hit for *query*; return True on success."""
    filename, _ = utils.getFileNames(server_id)

    # Let the user know the bot is searching for a video
    notice = await ctx.send(f":mag_right: Searching for \"{query}\" ...", suppress_embeds=True)

    search_list = _fetch_metadata(f"ytsearch:{query}")
    if search_list is None or len(search_list["entries"]) == 0:
        await notice.edit(content=":question: No songs found for query, try something else!", delete_after=3)
        return False

    info = search_list["entries"][0]
    audio_url = info["webpage_url"]
    print(str(server_id) + " | " + audio_url)

    item = _item_from_metadata(info, audio_url, filename)
    await notice.edit(
        content=":white_check_mark: Found " + item["name"] + ": " + audio_url,
        suppress=True,
        delete_after=3,
    )
    server_info[server_id]["queue"].append(item)
    return True


async def _queue_link(ctx, server_id, query: str) -> bool:
    """Queue the YouTube video behind the link *query*; return True on success."""
    filename, _ = utils.getFileNames(server_id)
    notice = await ctx.send(":mag_right: Adding video \"" + query + "\" ...", suppress_embeds=True)

    if "youtube" not in query and "youtu.be" not in query:
        await notice.edit(content=":no_entry_sign: Must be a valid youtube link.", delete_after=3)
        return False

    info = _fetch_metadata(query)
    pprint(info)
    if info is None:
        await notice.edit(content=":no_entry_sign: Could not get metadata.", delete_after=3)
        return False

    print(str(server_id) + " | " + query)

    item = _item_from_metadata(info, query, filename)
    await notice.edit(
        content=":white_check_mark: Found \"" + item["name"] + "\": " + query,
        suppress=True,
        delete_after=3,
    )
    server_info[server_id]["queue"].append(item)
    return True


async def _play_queue(ctx, server_id, voice_channel) -> None:
    """Play queue items from the current position until the queue runs out, then clean up."""
    state = server_info[server_id]

    # Loop that repeats as long as the queue position has not reached the length of the queue
    while state["queue_position"] < len(state["queue"]):
        # Get the current queue item before the first await, so a concurrent
        # command cannot empty the queue between the check above and the lookup.
        queue_position = state["queue_position"]
        current_item = state["queue"][queue_position]

        placeholder = discord.Embed(
            title="▶️ Playing: ",
            description="Name: " + "\nURL: ",
            color=_PLAYING_COLOR,
        )
        playing = await ctx.send(embed=placeholder)

        # Set song variables
        song_url = current_item["url"]
        song_thumb = current_item["thumbnail"]
        total = current_item["duration"]
        attachment_name = str(int(time.time())) + ".png"

        if current_item["artist"] and current_item["album"]:
            song_desc = "Artist: " + current_item["artist"] + "\nAlbum: " + current_item["album"]
        else:
            song_desc = ""

        # Replace the placeholder with the real "now playing" embed
        embed = _track_embed(":arrow_forward: Playing: ", current_item, song_desc, attachment_name)
        if song_thumb is not None:
            await playing.add_files(discord.File(song_thumb, filename=attachment_name))
        await playing.edit(embed=embed)

        downloader = None
        if song_url:
            # Stream the audio from yt-dlp's stdout straight into FFmpeg.
            downloader = subprocess.Popen(
                ["yt-dlp", "-q", "-o", "-", "-x", song_url],
                stdout=subprocess.PIPE,
            )
            song_source = io.BufferedReader(downloader.stdout)  # type: ignore
            pipe = True
            print(str(server_id) + " | " + "Playing song through yt-dlp")
        else:
            print(str(server_id) + " | " + "Playing song from file")
            song_source = current_item["id"]
            pipe = False

        try:
            # Play the audio in the voice channel from the temporary file
            # or the yt-dlp stream
            voice_channel.play(discord.FFmpegPCMAudio(source=song_source, pipe=pipe))
            started = int(time.time())

            # Wait for audio to finish playing, refreshing the progress bar each second
            while voice_channel.is_playing() or voice_channel.is_paused():
                await sleep(1)
                current = int(time.time()) - started
                state["elapsed"] = current
                bardata = progressBar.filledBar(total, current, size=20)  # type: ignore

                embed = _track_embed("▶️ Playing: ", current_item, song_desc, attachment_name)
                embed.add_field(
                    name=str(datetime.timedelta(seconds=current)) + "/" + str(datetime.timedelta(seconds=total)),
                    value=bardata[0],
                    inline=False,
                )
                await playing.edit(embed=embed)
        finally:
            # A track that was skipped or stopped early may leave yt-dlp
            # running; make sure the child never outlives its track.
            if downloader is not None:
                _stop_downloader(downloader)

        embed = _track_embed(":stop_button: Finished Playing: ", current_item, song_desc, attachment_name)
        await playing.edit(embed=embed)

        state["elapsed"] = 0
        if not state["loop"]:
            # Increment the queue position by 1
            state["queue_position"] += 1

        # If it is not possible to get the queue position wanted, break!
        try:
            state["queue"][state["queue_position"]]
        except IndexError:
            print(str(server_id) + " | " + "Queue position out of range.")
            break

        print(str(server_id) + " | " + "Play position: " + str(queue_position))

    print(str(server_id) + " | " + "Queue finished.")

    # Disconnect from the voice channel if the loop finishes
    await voice_channel.disconnect()
    state["queue"].clear()
    state["queue_position"] = 0

    # Remove all queued files and the download folder
    _remove_downloads(server_id)


# `!play [query]` — queue a track and start playback if nothing is playing.
#
# query may be a YouTube link, free-text search terms, or omitted. When it is
# omitted the message's attachments are queued; with no attachments either,
# a paused player is resumed.
@bot.command()
async def play(ctx, *, query: Optional[str] = None):
    # Get all the id and channel info
    server_id, voice_channel, user_voice_channel = await utils.getIds(ctx)

    # Create a queue and info dictionary for the current server
    if server_id not in server_info:
        server_info[server_id] = {
            "loop": False,
            "paused": False,
            "elapsed": 0,
            "queue_position": 0,
            "queue": [],
        }

    # Create the server ID folder for storing downloads
    os.makedirs(str(server_id), exist_ok=True)

    # If the user is not in a voice channel, return an error message
    if user_voice_channel is None:
        await ctx.send(":no_entry_sign: You must be in a voice channel to use this command.")
        return

    # Connect to the voice channel if not already connected
    if not voice_channel:
        voice_channel = await user_voice_channel.channel.connect()

    # If the voice channel is not the same one as the bot, return an error message
    if await utils.notSameChannel(ctx):
        await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.")
        return

    if query is None and not ctx.message.attachments:
        # If the bot is already playing, return an error message
        if voice_channel.is_playing():
            await ctx.send(":no_entry_sign: The bot is already playing")
            return

        # If the bot is paused, resume playback
        if voice_channel.is_paused():
            voice_channel.resume()
            server_info[server_id]["paused"] = False
            return

        await ctx.send(":no_entry_sign: Must input a valid query or attachment.", delete_after=3)
        await voice_channel.disconnect()
        return

    if query is None:
        queued = await _queue_attachments(ctx, server_id)
    elif _is_link(query):
        queued = await _queue_link(ctx, server_id, query)
    else:
        queued = await _queue_search(ctx, server_id, query)

    if not queued:
        # Leave only when nothing is playing, so a bad request cannot cut off
        # a queue that is already running.
        if _is_idle(voice_channel):
            await voice_channel.disconnect()
        return

    # A playback loop is already running (or paused); it will pick up the new
    # item, so don't start another playback stream!
    if not _is_idle(voice_channel):
        return

    await _play_queue(ctx, server_id, voice_channel)
# `!skip` — jump to another queue item by stopping the current track.
#
#   !skip                 next track
#   !skip N               N tracks forward (same as `!skip forward N`)
#   !skip back [N]        N tracks back (default 1)
#   !skip to N            the N-th queue item (1-based)
#
# A forward skip that lands exactly one past the last item is allowed and ends
# the queue. Every invalid request is answered with an error message and
# leaves playback untouched.
@bot.command()
async def skip(ctx, command: Optional[str] = None, number: Optional[int] = None):
    # Get all the id and channel info
    server_id, voice_channel, _ = await utils.getIds(ctx)

    if voice_channel is None:
        await ctx.send(":no_entry_sign: Bot must be playing to skip!", delete_after=3)
        return
    if await utils.notSameChannel(ctx):
        # If the user is not in the bot's voice channel, return an error message
        await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3)
        return

    if number is not None:
        try:
            number = int(number)
        except (TypeError, ValueError):
            await ctx.send(":no_entry_sign: Invalid argument.")
            return

    state = server_info[server_id]
    queue_length = len(state["queue"])
    queue_position = state["queue_position"]

    # If command is just a number, eg. `!skip 2`, treat it as `forward 2`.
    # isdecimal() only accepts characters that int() can parse.
    if command is not None and command.isdecimal():
        command, number = "forward", int(command)
    elif command is None:
        command = "forward"

    if command == "forward":
        if number is None:
            target = queue_position + 1
        elif number > 0 and queue_position + number <= queue_length:
            target = queue_position + number
        else:
            await ctx.send(f":no_entry_sign: Invalid skip number! Max is {queue_length - queue_position}.")
            return
    elif command == "back":
        if queue_position == 0:
            await ctx.send(":no_entry_sign: Already at first song in queue.")
            return
        step = 1 if number is None else number
        # Going back exactly `queue_position` tracks lands on the first item.
        if step <= 0 or step > queue_position:
            await ctx.send(f":no_entry_sign: Invalid skip number! Max is {queue_position}.")
            return
        target = queue_position - step
    elif command == "to" and number is not None:
        if number <= 0:
            await ctx.send(":no_entry_sign: Cannot skip to negative number or 0!")
            return
        if number > queue_length:
            await ctx.send(f":no_entry_sign: Invalid skip-to number! Max is {queue_length}.")
            return
        target = number - 1
    else:
        await ctx.send(":no_entry_sign: Invalid argument.")
        return

    # The playback loop in `play` adds 1 to the position once the stopped
    # track ends, except while looping, when it leaves the position alone.
    # Store the position so that it ends up on `target` in both cases.
    state["queue_position"] = target if state["loop"] else target - 1

    # Stop the audio playback of the current track
    voice_channel.stop()
# `!stop` — empty the queue, halt playback and leave the voice channel.
@bot.command()
async def stop(ctx):
    # Get all the id and channel info
    server_id, voice_channel, _ = await utils.getIds(ctx)

    if voice_channel is None:
        await ctx.send(":no_entry_sign: Bot must be playing to stop!", delete_after=3)
        return

    # Only members sharing the bot's voice channel may stop it
    not_with_bot = await utils.notSameChannel(ctx)
    if not_with_bot:
        await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3)
        return

    # Reset the queue first: with nothing left in it, the playback loop in
    # `play` finds no next item once the current track is stopped.
    state = server_info[server_id]
    state["queue"].clear()
    state["queue_position"] = 0

    await sleep(0.5)
    voice_channel.stop()
    await voice_channel.disconnect()
#
# @bot.command()
# async def pause(ctx):
# # Get all the id and channel info
# server_id, voice_channel, user_voice_channel = await utils.getIds(ctx)
#
# if voice_channel is None or not voice_channel.is_playing():
# await ctx.send(":no_entry_sign: Bot must be playing to pause!", delete_after=3)
# return
# if user_voice_channel is None or user_voice_channel.channel != voice_channel.channel:
# # If the user is not in a voice channel, return an error message
# await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3)
# return
#
# voice_channel.pause()
# `!queue` — long-form spelling of `!q`; hands both arguments over unchanged.
@bot.command()
async def queue(ctx, action = None, selection = None):
    await q(ctx, action=action, selection=selection)
# Thumbnail path that `q remove` must never delete.
# NOTE(review): presumably the shared "unknown cover" asset assigned by
# utils.parseMediaFile — confirm.
_PLACEHOLDER_THUMBNAIL = "/assets/unknown.png"


def _format_duration(seconds: int) -> str:
    """Format *seconds* as MM:SS, or HH:MM:SS from one hour upwards."""
    pattern = "%M:%S" if seconds < 3600 else "%H:%M:%S"
    return strftime(pattern, gmtime(seconds))


def _queue_embed(state) -> discord.Embed:
    """Build the embed listing every queue item, its length and the play marker."""
    entries = state["queue"]
    position = state["queue_position"]

    markers = []
    names = []
    lengths = []
    remaining = 0
    for index, entry in enumerate(entries):
        if index == position:
            markers.append(":repeat_one:" if state["loop"] else ":arrow_right:")
            # Only the unplayed part of the current track counts; never let it
            # go negative (elapsed is wall-clock time and can exceed the length).
            remaining += max(0, entry["duration"] - state["elapsed"])
        else:
            markers.append("")
            if index > position:
                remaining += entry["duration"]
        names.append("**" + str(index + 1) + ":** " + entry["name"][0:30])
        lengths.append(_format_duration(entry["duration"]))

    # Calculate the time remaining in the queue
    time_left = strftime("%H:%M:%S", gmtime(remaining))
    embed = discord.Embed(title=f"Queue ({time_left} left):", description="", color=0xa032a8)

    if not entries:
        embed.add_field(name="List", value="Queue is **empty**", inline=False)
        return embed

    embed.add_field(name="", value="\n".join(markers) + "\n", inline=True)
    embed.add_field(name="List", value="\n".join(names) + "\n", inline=True)
    embed.add_field(name="Length", value="\n".join(lengths) + "\n", inline=True)
    return embed


async def _send_queue(ctx, server_id) -> None:
    """Log the current position and post the queue embed to the channel."""
    state = server_info[server_id]
    print(str(server_id) + " | " + "Updating queue, position: " + str(state["queue_position"]))
    await ctx.send(embed=_queue_embed(state))


# `!q [action] [selection]` — show the queue or remove an item from it.
#
#   !q / !q show / !q list   post the queue
#   !q remove N              remove the N-th item (1-based), then post the queue
#
# The item that is currently playing cannot be removed.
@bot.command()
async def q(ctx, action = None, selection = None):
    # Get all the id and channel info
    server_id, voice_channel, _ = await utils.getIds(ctx)

    if voice_channel is None:
        await ctx.send(":no_entry_sign: Bot must be in a channel to view the queue!", delete_after=3)
        return
    if await utils.notSameChannel(ctx):
        # If the user is not in the bot's voice channel, return an error message
        await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3)
        return

    if action in (None, "show", "list"):
        await _send_queue(ctx, server_id)
        return

    if action != "remove":
        await ctx.send(":no_entry_sign: Invalid argument.", delete_after=3)
        return

    if selection is None:
        await ctx.send(":no_entry_sign: Error, please select a queue item to remove", delete_after=3)
        return

    state = server_info[server_id]
    entries = state["queue"]
    invalid_item = ":no_entry_sign: Error, item #" + str(selection) + " not a valid queue item"

    try:
        index = int(selection) - 1
    except (TypeError, ValueError):
        await ctx.send(invalid_item, delete_after=3)
        return

    # Validate before touching the list: a negative index would otherwise
    # silently address the queue from its end.
    if not 0 <= index < len(entries):
        await ctx.send(invalid_item, delete_after=3)
        return

    current_position = state["queue_position"]
    if index == current_position:
        await ctx.send(":no_entry_sign: Error, cannot remove currently playing item", delete_after=3)
        return

    print(str(server_id) + " | " + "Removing item #" + str(selection) + " from queue")

    # Update the queue and the position together, before any await, so the
    # playback loop never observes one without the other.
    removed = entries.pop(index)
    if index < current_position:
        state["queue_position"] -= 1

    # Delete the item's file and its own thumbnail, but never the placeholder.
    for path in (removed["id"], removed["thumbnail"]):
        if path is None or path == _PLACEHOLDER_THUMBNAIL:
            continue
        try:
            os.remove(path)
        except OSError:
            print(str(server_id) + " | " + "Error while deleting song or thumbnail")

    await ctx.send(":white_check_mark: Removed item #" + str(index + 1) + " from queue.", delete_after=3)
    await _send_queue(ctx, server_id)
# `!loop` — toggle repeating of the current song. `number` is accepted but unused.
@bot.command()
async def loop(ctx, number = None):
    server_id, voice_channel, user_voice_channel = await utils.getIds(ctx)

    # The bot has to be connected and either playing or paused
    has_audio = voice_channel is not None and (voice_channel.is_playing() or voice_channel.is_paused())
    if not has_audio:
        await ctx.send(":no_entry_sign: Bot must be playing to loop!", delete_after=3)
        return

    # The caller has to sit in the same voice channel as the bot
    shares_channel = user_voice_channel is not None and user_voice_channel.channel == voice_channel.channel
    if not shares_channel:
        await ctx.send(":no_entry_sign: You must be in the same voice channel as the bot to use this command.", delete_after=3)
        return

    state = server_info[server_id]
    turn_on = state["loop"] is False
    if turn_on:
        log_line = "Looping current song."
        reply = ":repeat_one: Looping the current song."
    else:
        log_line = "Not looping current song."
        reply = ":arrow_forward: Turning off loop on current song."

    print(str(server_id) + " | " + log_line)
    state["loop"] = turn_on
    await ctx.send(reply)
# Global command error hook: tell the user about unknown commands and print
# every other error to stdout.
@bot.event
async def on_command_error(ctx, error):
    # NOTE(review): the ids are fetched but never used here — confirm whether
    # this call is needed at all.
    _server_id, _voice_channel, _user_voice_channel = await utils.getIds(ctx)

    if not isinstance(error, commands.CommandNotFound):
        print(error)
        return

    await ctx.send("Unknown command", delete_after=3)
# Run the bot using the Discord bot token taken from the environment;
# a missing ODEN_DISCORD_SECRET raises KeyError before the bot starts.
discord_token = os.environ["ODEN_DISCORD_SECRET"]
bot.run(discord_token)