Files
AoyoBot/cogs/voiceBot.py
Дмитрий Шиманский ae43a31e39 Initialize project
2024-02-22 19:04:48 +03:00

208 lines
9.8 KiB
Python

import disnake
import yt_dlp # type: ignore
from disnake.ext import commands, tasks
import asyncio
from typing import Any, Dict, Optional
# Suppress noise about console usage from errors
yt_dlp.utils.bug_reports_message = lambda: ""
from yandex_music import ClientAsync, Client
import ytmusicapi
from handlers import logger
from pathlib import Path
import json
import os
type_to_name = {
'track': 'трек',
'artist': 'исполнитель',
'album': 'альбом',
'playlist': 'плейлист',
'video': 'видео',
'user': 'пользователь',
'podcast': 'подкаст',
'podcast_episode': 'эпизод подкаста',
}
yt = ytmusicapi.YTMusic()
ffmpeg_options = {'options': '-vn', 'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5'}
ytdl = yt_dlp.YoutubeDL({'outtmpl': '%(id)s.%(ext)s'})
class YTDLSource(disnake.PCMVolumeTransformer):
def __init__(self, source: disnake.AudioSource, *, data: Dict[str, Any], volume: float = 0.125):
super().__init__(source, volume)
self.title = data.get("title")
self.author = data.get("author")
self.source = data.get("source")
self.id = data.get("unicalID")
self.thumbnails: str = data.get("icon")
self.data = data
@classmethod
async def from_url(
cls, url, *, loop: Optional[asyncio.AbstractEventLoop] = None, stream: bool = True
):
player = None
if not "http" in url:
try:
url = f"https://youtube.com/watch?v={yt.search(url)[0]['videoId']}"
except Exception:
env = os.environ
client = await ClientAsync(env.get("YANDEX_TOKEN")).init()
func = (await client.search(url, nocorrect=False)).best
artists = str(func.result.artists_name()).replace("'", '').replace("[", '').replace("]", '')
logger.info(f"[yandex] Downloading {func.result.title}")
# func.result.download(f"music.mp3")
logger.info(f"[yandex] Downloaded...")
logger.info(f"[yandex] Openning ....")
try:
player = cls(disnake.FFmpegPCMAudio(await (await func.result.get_download_info_async())[0].get_direct_link_async(), **ffmpeg_options), data={'title': func.result.title, 'author': artists, 'source': "YandexMusic", 'fileName': f"{func.result.title}", 'type': "mp3", 'unicalID': f"https://music.yandex.ru/track/{func.result.id}", "icon": func.result.get_og_image_url()})
except Exception:
player = cls(disnake.FFmpegPCMAudio(f"music.mp3", **ffmpeg_options), data={'title': func.result.title, 'author': artists, 'source': "YandexMusic", 'fileName': f"{func.result.title}", 'type': "mp3", 'unicalID': f"https://music.yandex.ru/track/{func.result.id}", "icon": func.result.get_og_image_url()})
logger.info(f"[yandex] Playing {func.result.title}")
if not player:
loop = loop or asyncio.get_event_loop()
data: Any = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=False)
)
filename = data["formats"][5]["url"] if stream else ytdl.prepare_filename(data)
data["author"] = data["channel"]
data["source"] = "YouTube"
data["type"] = data["formats"][5]["audio_ext"]
data["unicalID"] = url
data["icon"] = data["thumbnails"][38]["url"]
data["fileName"] = filename
title = data.get("title")
audio_type = data["type"]
logger.info(f"[YouTube] Playing {title}")
logger.info(f"[Youtube] Audio type {audio_type}")
player = cls(disnake.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)
return player
class Music(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self.queue = {"a": "a"}
def __channel(self, inter) -> disnake.VoiceProtocol | None:
for voiceProtocol in self.bot.voice_clients:
if voiceProtocol.guild == inter.guild:
return voiceProtocol
# @tasks.loop()
# async def on_Voice_state_loop(self):
# voice_state = self.bot.voice_clients
@commands.slash_command()
async def join(self, inter: disnake.ApplicationCommandInteraction, channel: Optional[disnake.VoiceChannel] = None):
"""Joins a voice channel"""
channel = inter.author.voice.channel if not channel else channel
if self.__channel(inter): await self.__channel(inter).disconnect()
await channel.connect()
await inter.response.send_message(f"Подключился к каналу {channel.mention}", ephemeral=True)
# @commands.slash_command()
# async def queue(self, inter, *args): pass
# @commands.slash_command(name="queue")
# async def add_to_queue(self, inter: disnake.ApplicationCommandInteraction, prompt_to_queue: str):
# try:
# await inter.response.defer()
# player = await YTDLSource.from_url(prompt_to_queue, loop=self.bot.loop, stream=True)
# try:
# if len(self.queue[inter.guild.id]) == 0:
# if self.__channel(inter): await self.__channel(inter).disconnect()
# self.start_playing(await self.ensure_voice(inter), player, inter.guild)
# await inter.send(f':mag_right: **Searching for** ``' + prompt_to_queue + '``\n<:youtube:763374159567781890> **Now Playing:** ``{}'.format(player.title) + "``")
# else:
# self.queue[len(self.queue[inter.guild.id])] = player
# await inter.send(f':mag_right: **Searching for** ``' + prompt_to_queue + '``\n<:youtube:763374159567781890> **Added to queue:** ``{}'.format(player.title) + "``")
# except KeyError:
# self.queue[inter.guild.id] = []
# if self.__channel(inter): await self.__channel(inter).disconnect()
# self.start_playing(await self.ensure_voice(inter), player, inter.guild)
# await inter.send(f':mag_right: **Searching for** ``' + prompt_to_queue + '``\n<:youtube:763374159567781890> **Now Playing:** ``{}'.format(player.title) + "``")
# except Exception as e:
# await inter.send(f"Somenthing went wrong - please try again later! ||{e}||")
# def start_playing(self, voice_client, player, guild:disnake.Guild):
# self.queue[guild.id].append(player)
# for i in self.queue[guild.id]:
# try:
# voice_client.play(i, after=lambda e: print('Player error: %s' % e) if e else None)
# except:
# pass
@commands.slash_command()
async def play(self, inter: disnake.ApplicationCommandInteraction, prompt: str):
"""Plays from a url (almost anything youtube_dl supports)"""
if self.__channel(inter): await self.__channel(inter).disconnect()
voiceState: disnake.VoiceClient = await self.ensure_voice(inter)
await inter.response.defer(with_message=True)
player = await YTDLSource.from_url(prompt, loop=self.bot.loop, stream=True)
embed = disnake.Embed(title="Вопроизведение музыки.", description=f"**Трек**: {player.title}\n **Автор**: {player.author}\n **Ссылка**: [Тык]({player.id})").set_footer(text=f"Источник: {player.source}").set_thumbnail(url=player.thumbnails)
await inter.send(embed=embed)
voiceState.play(
player, after=lambda e: logger.error(f"Error: {e}") if e else None
)
@commands.slash_command()
async def leave(self, inter: disnake.ApplicationCommandInteraction):
"""Stops and disconnects the bot from voice"""
await self.__channel(inter).disconnect()
await inter.response.send_message("Бот вышел из канала", ephemeral=True)
@commands.slash_command()
async def stop(self, inter: disnake.ApplicationCommandInteraction):
"""Paused music"""
await self.ensure_voice(inter)
await inter.response.send_message("Бот остановил воспроизведение", ephemeral=True)
@commands.slash_command()
async def pause(self, inter: disnake.ApplicationCommandInteraction):
"""Paused music"""
await self.ensure_voice(inter, 'pause')
await inter.response.send_message("Бот остановил воспроизведение", ephemeral=True)
@commands.slash_command()
async def resume(self, inter: disnake.ApplicationCommandInteraction):
"""Paused music"""
await self.ensure_voice(inter, 'resume')
await inter.response.send_message("Бот продолжил воспроизведение воспроизведение", ephemeral=True)
async def ensure_voice(self, inter, types:str = None):
if self.__channel(inter) == None:
if inter.author.voice:
return await inter.author.voice.channel.connect()
else:
await inter.send("You are not connected to a voice channel.", ephemeral=True)
raise commands.CommandError("Author not connected to a voice channel.")
elif types == "pause" and self.__channel(inter).is_playing():
self.__channel(inter).pause()
elif types == "resume" and not self.__channel(inter).is_playing():
self.__channel(inter).resume()
elif self.__channel(inter).is_playing():
self.__channel(inter).stop()
def setup(bot):
bot.add_cog(Music(bot))