Spaces:
Runtime error
Runtime error
import os | |
import time | |
import requests | |
from pyrogram.types import Message | |
from yt_dlp import YoutubeDL | |
from Hellbot.functions.driver import YoutubeDriver | |
from Hellbot.functions.formatter import secs_to_mins | |
from Hellbot.functions.tools import progress | |
from . import HelpMenu, Symbols, hellbot, on_message | |
async def youtube_audio(_, message: Message):
    """Download the audio of a YouTube link and send it as an audio reply.

    The link is read from the command text, checked with
    ``YoutubeDriver.check_url`` and downloaded through ``YoutubeDL`` using
    ``YoutubeDriver.song_options()``. The video's ``hqdefault.jpg`` thumbnail
    is fetched and attached to the upload.

    The thumbnail and audio files are removed when the handler finishes,
    whether the upload succeeded or failed.

    Args:
        _: First handler argument; unused here.
        message: The command message. It must carry at least one argument
            after the command itself.

    Returns:
        The result of ``hellbot.delete`` when the command is malformed, the
        link is rejected or the download/upload fails; otherwise ``None``.
    """
    if len(message.command) < 2:
        return await hellbot.delete(
            message, "Give a valid youtube link to download audio."
        )

    query = await hellbot.input(message)
    hell = await hellbot.edit(message, "Checking ...")
    status, url = YoutubeDriver.check_url(query)
    if not status:
        # On failure the second element of the tuple is passed on as the
        # text shown to the user.
        return await hellbot.delete(hell, url)

    await hell.edit("πΌ __Downloading audio ...__")

    # Files this handler may leave on disk. The list is filled as soon as the
    # names are known so the ``finally`` clause can clean up after a failure.
    leftovers = []
    try:
        # NOTE(review): the yt-dlp and requests calls below are synchronous
        # and run inside this coroutine.
        with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
            yt_data = ytdl.extract_info(url, False)
            yt_file = ytdl.prepare_filename(yt_data)
            thumb_path = f"{yt_file}.jpg"
            audio_path = f"{yt_file}.mp3"
            leftovers.extend((thumb_path, audio_path))
            ytdl.process_info(yt_data)

        upload_text = f"**β¬οΈ π΄ππ ππΊπ½πππ π²πππ ...** \n\n**{Symbols.anchor} π³πππ πΎ:** `{yt_data['title'][:50]}`\n**{Symbols.anchor} π’ππΊπππΎπ :** `{yt_data['channel']}`"
        await hell.edit(upload_text)

        # A timeout keeps a stalled thumbnail request from hanging the
        # handler forever.
        response = requests.get(
            f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg",
            timeout=30,
        )
        with open(thumb_path, "wb") as f:
            f.write(response.content)

        await message.reply_audio(
            audio_path,
            caption=f"**π§ π³πππ πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`",
            duration=int(yt_data["duration"]),
            performer="[ΡΠ½Ρ Π½ΡββΠ²ΟΡ]",
            title=yt_data["title"],
            thumb=thumb_path,
            progress=progress,
            progress_args=(
                hell,
                time.time(),
                upload_text,
            ),
        )
        await hell.delete()
    except Exception as e:
        return await hellbot.delete(hell, f"**π Audio not Downloaded:** `{e}`")
    finally:
        # Best-effort cleanup. Each file is removed independently so a missing
        # thumbnail does not leave the audio file behind.
        for leftover in leftovers:
            try:
                os.remove(leftover)
            except OSError:
                pass
async def ytvideo(_, message: Message):
    """Download a YouTube video and send it as a video reply.

    The link is read from the command text, checked with
    ``YoutubeDriver.check_url`` and downloaded through ``YoutubeDL`` using
    ``YoutubeDriver.video_options()``. The video's ``hqdefault.jpg`` thumbnail
    is fetched and attached to the upload. Local file names are derived from
    the video id (``<id>.mp4`` and ``<id>.jpg``).

    The thumbnail and video files are removed when the handler finishes,
    whether the upload succeeded or failed.

    Args:
        _: First handler argument; unused here.
        message: The command message. It must carry at least one argument
            after the command itself.

    Returns:
        The result of ``hellbot.delete`` when the command is malformed, the
        link is rejected or the download/upload fails; otherwise ``None``.
    """
    if len(message.command) < 2:
        return await hellbot.delete(
            message, "Give a valid youtube link to download video."
        )

    query = await hellbot.input(message)
    hell = await hellbot.edit(message, "Checking ...")
    status, url = YoutubeDriver.check_url(query)
    if not status:
        # On failure the second element of the tuple is passed on as the
        # text shown to the user.
        return await hellbot.delete(hell, url)

    await hell.edit("πΌ __Downloading video ...__")

    # Files this handler may leave on disk; consumed by ``finally`` below.
    leftovers = []
    try:
        # NOTE(review): the yt-dlp and requests calls below are synchronous
        # and run inside this coroutine.
        with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
            yt_data = ytdl.extract_info(url, True)
            yt_file = yt_data["id"]

        thumb_path = f"{yt_file}.jpg"
        video_path = f"{yt_file}.mp4"
        leftovers.extend((thumb_path, video_path))

        upload_text = f"**β¬οΈ π΄ππ ππΊπ½πππ π²πππ ...** \n\n**{Symbols.anchor} π³πππ πΎ:** `{yt_data['title'][:50]}`\n**{Symbols.anchor} π’ππΊπππΎπ :** `{yt_data['channel']}`"
        await hell.edit(upload_text)

        # A timeout keeps a stalled thumbnail request from hanging the
        # handler forever.
        response = requests.get(
            f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg",
            timeout=30,
        )
        with open(thumb_path, "wb") as f:
            f.write(response.content)

        await message.reply_video(
            video_path,
            caption=f"**π§ π³πππ πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`",
            duration=int(yt_data["duration"]),
            thumb=thumb_path,
            progress=progress,
            progress_args=(
                hell,
                time.time(),
                upload_text,
            ),
        )
        await hell.delete()
    except Exception as e:
        return await hellbot.delete(hell, f"**π Video not Downloaded:** `{e}`")
    finally:
        # Best-effort cleanup. Each file is removed independently so a missing
        # thumbnail does not leave the video file behind.
        for leftover in leftovers:
            try:
                os.remove(leftover)
            except OSError:
                pass
async def ytlink(_, message: Message):
    """Search YouTube for the command text and list the matches.

    The search is run through ``YoutubeDriver(query, 7).to_dict()``. The
    status message is edited to show, for every match, its title (first 50
    characters), channel, views, duration and link.

    Args:
        _: First handler argument; unused here.
        message: The command message. It must carry at least one argument
            after the command itself.

    Returns:
        The result of ``hellbot.delete`` when no query was given, the search
        raised, or nothing was found; otherwise ``None``.
    """
    if len(message.command) < 2:
        return await hellbot.delete(message, "Give something to search on youtube.")

    query = await hellbot.input(message)
    hell = await hellbot.edit(message, "Searching ...")

    try:
        results = YoutubeDriver(query, 7).to_dict()
    except Exception as e:
        return await hellbot.delete(hell, f"**π Error:** `{e}`")

    if not results:
        return await hellbot.delete(hell, "No results found.")

    # One header followed by one formatted section per match.
    sections = [f"**π π³πππΊπ π±πΎπππ ππ π₯ππππ½:** `{len(results)}`\n\n"]
    sections.extend(
        f" **{Symbols.anchor} π³πππ πΎ:** `{video['title'][:50]}`\n**{Symbols.anchor} π’ππΊπππΎπ :** `{video['channel']}`\n**{Symbols.anchor} π΅ππΎππ:** `{video['views']}`\n**{Symbols.anchor} π£πππΊππππ:** `{video['duration']}`\n**{Symbols.anchor} π«πππ:** `https://youtube.com{video['url_suffix']}`\n\n"
        for video in results
    )
    await hell.edit("".join(sections), disable_web_page_preview=True)
# Help-menu registration for this plugin. Each row is passed positionally to
# ``HelpMenu.add`` in the order: command, parameters, description, example.
_youtube_help_rows = (
    (
        "yta",
        "<youtube link>",
        "Download the youtube video in .mp3 format!",
        "yta https://youtu.be/xxxxxxxxxxx",
    ),
    (
        "ytv",
        "<youtube link>",
        "Download the youtube video in .mp4 format!",
        "ytv https://youtu.be/xxxxxxxxxxx",
    ),
    (
        "ytlink",
        "<query>",
        "Search for a video on youtube.",
        "ytlink the hellbot",
    ),
)

_youtube_help = HelpMenu("youtube")
for _row in _youtube_help_rows:
    # Re-bind to whatever ``add`` returns so the calls stay chained.
    _youtube_help = _youtube_help.add(*_row)
_youtube_help.info(
    "Youtube Downloader",
).done()