import os import time import requests from pyrogram.types import Message from yt_dlp import YoutubeDL from Hellbot.functions.driver import YoutubeDriver from Hellbot.functions.formatter import secs_to_mins from Hellbot.functions.tools import progress from . import HelpMenu, Symbols, hellbot, on_message @on_message("yta", allow_stan=True) async def youtube_audio(_, message: Message): if len(message.command) < 2: return await hellbot.delete( message, "Give a valid youtube link to download audio." ) query = await hellbot.input(message) hell = await hellbot.edit(message, "Checking ...") status, url = YoutubeDriver.check_url(query) if not status: return await hellbot.delete(hell, url) await hell.edit("🎼 __Downloading audio ...__") try: with YoutubeDL(YoutubeDriver.song_options()) as ytdl: yt_data = ytdl.extract_info(url, False) yt_file = ytdl.prepare_filename(yt_data) ytdl.process_info(yt_data) upload_text = f"**⬆️ π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**{Symbols.anchor} 𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**{Symbols.anchor} 𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`" await hell.edit(upload_text) response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg") with open(f"{yt_file}.jpg", "wb") as f: f.write(response.content) await message.reply_audio( f"{yt_file}.mp3", caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`", duration=int(yt_data["duration"]), performer="[Ρ‚Π½Ρ” Π½Ρ”β„“β„“Π²ΟƒΡ‚]", title=yt_data["title"], thumb=f"{yt_file}.jpg", progress=progress, progress_args=( hell, time.time(), upload_text, ), ) await hell.delete() except Exception as e: return await hellbot.delete(hell, f"**πŸ€ Audio not Downloaded:** `{e}`") try: os.remove(f"{yt_file}.jpg") os.remove(f"{yt_file}.mp3") except: pass @on_message("ytv", allow_stan=True) async def ytvideo(_, message: Message): if len(message.command) < 2: return await hellbot.delete( message, "Give a valid youtube link to download video." ) query = await hellbot.input(message) hell = await hellbot.edit(message, "Checking ...") status, url = YoutubeDriver.check_url(query) if not status: return await hellbot.delete(hell, url) await hell.edit("🎼 __Downloading video ...__") try: with YoutubeDL(YoutubeDriver.video_options()) as ytdl: yt_data = ytdl.extract_info(url, True) yt_file = yt_data["id"] upload_text = f"**⬆️ π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ π–²π—ˆπ—‡π—€ ...** \n\n**{Symbols.anchor} 𝖳𝗂𝗍𝗅𝖾:** `{yt_data['title'][:50]}`\n**{Symbols.anchor} 𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{yt_data['channel']}`" await hell.edit(upload_text) response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg") with open(f"{yt_file}.jpg", "wb") as f: f.write(response.content) await message.reply_video( f"{yt_file}.mp4", caption=f"**🎧 𝖳𝗂𝗍𝗅𝖾:** {yt_data['title']} \n\n**πŸ‘€ π–΅π—‚π–Ύπ—π—Œ:** `{yt_data['view_count']}` \n**βŒ› π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{secs_to_mins(int(yt_data['duration']))}`", duration=int(yt_data["duration"]), thumb=f"{yt_file}.jpg", progress=progress, progress_args=( hell, time.time(), upload_text, ), ) await hell.delete() except Exception as e: return await hellbot.delete(hell, f"**πŸ€ Video not Downloaded:** `{e}`") try: os.remove(f"{yt_file}.jpg") os.remove(f"{yt_file}.mp4") except: pass @on_message("ytlink", allow_stan=True) async def ytlink(_, message: Message): if len(message.command) < 2: return await hellbot.delete(message, "Give something to search on youtube.") query = await hellbot.input(message) hell = await hellbot.edit(message, "Searching ...") try: results = YoutubeDriver(query, 7).to_dict() except Exception as e: return await hellbot.delete(hell, f"**πŸ€ Error:** `{e}`") if not results: return await hellbot.delete(hell, "No results found.") text = f"**πŸ”Ž π–³π—ˆπ—π–Ίπ—… π–±π–Ύπ—Œπ—Žπ—…π—π—Œ π–₯π—ˆπ—Žπ—‡π–½:** `{len(results)}`\n\n" for result in results: text += f" **{Symbols.anchor} 𝖳𝗂𝗍𝗅𝖾:** `{result['title'][:50]}`\n**{Symbols.anchor} 𝖒𝗁𝖺𝗇𝗇𝖾𝗅:** `{result['channel']}`\n**{Symbols.anchor} π–΅π—‚π–Ύπ—π—Œ:** `{result['views']}`\n**{Symbols.anchor} π–£π—Žπ—‹π–Ίπ—π—‚π—ˆπ—‡:** `{result['duration']}`\n**{Symbols.anchor} 𝖫𝗂𝗇𝗄:** `https://youtube.com{result['url_suffix']}`\n\n" await hell.edit(text, disable_web_page_preview=True) HelpMenu("youtube").add( "yta", "", "Download the youtube video in .mp3 format!", "yta https://youtu.be/xxxxxxxxxxx", ).add( "ytv", "", "Download the youtube video in .mp4 format!", "ytv https://youtu.be/xxxxxxxxxxx", ).add( "ytlink", "", "Search for a video on youtube.", "ytlink the hellbot", ).info( "Youtube Downloader", ).done()