Spaces:
Runtime error
Runtime error
File size: 5,946 Bytes
78b07ad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import os
import time
import requests
from pyrogram.types import Message
from yt_dlp import YoutubeDL
from Hellbot.functions.driver import YoutubeDriver
from Hellbot.functions.formatter import secs_to_mins
from Hellbot.functions.tools import progress
from . import HelpMenu, Symbols, hellbot, on_message
@on_message("yta", allow_stan=True)
async def youtube_audio(_, message: Message):
if len(message.command) < 2:
return await hellbot.delete(
message, "Give a valid youtube link to download audio."
)
query = await hellbot.input(message)
hell = await hellbot.edit(message, "Checking ...")
status, url = YoutubeDriver.check_url(query)
if not status:
return await hellbot.delete(hell, url)
await hell.edit("πΌ __Downloading audio ...__")
try:
with YoutubeDL(YoutubeDriver.song_options()) as ytdl:
yt_data = ytdl.extract_info(url, False)
yt_file = ytdl.prepare_filename(yt_data)
ytdl.process_info(yt_data)
upload_text = f"**β¬οΈ π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**{Symbols.anchor} π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**{Symbols.anchor} π’ππΊπππΎπ
:** `{yt_data['channel']}`"
await hell.edit(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_audio(
f"{yt_file}.mp3",
caption=f"**π§ π³πππ
πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
performer="[ΡΠ½Ρ Π½ΡββΠ²ΟΡ]",
title=yt_data["title"],
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
hell,
time.time(),
upload_text,
),
)
await hell.delete()
except Exception as e:
return await hellbot.delete(hell, f"**π Audio not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp3")
except:
pass
@on_message("ytv", allow_stan=True)
async def ytvideo(_, message: Message):
if len(message.command) < 2:
return await hellbot.delete(
message, "Give a valid youtube link to download video."
)
query = await hellbot.input(message)
hell = await hellbot.edit(message, "Checking ...")
status, url = YoutubeDriver.check_url(query)
if not status:
return await hellbot.delete(hell, url)
await hell.edit("πΌ __Downloading video ...__")
try:
with YoutubeDL(YoutubeDriver.video_options()) as ytdl:
yt_data = ytdl.extract_info(url, True)
yt_file = yt_data["id"]
upload_text = f"**β¬οΈ π΄ππ
ππΊπ½πππ π²πππ ...** \n\n**{Symbols.anchor} π³πππ
πΎ:** `{yt_data['title'][:50]}`\n**{Symbols.anchor} π’ππΊπππΎπ
:** `{yt_data['channel']}`"
await hell.edit(upload_text)
response = requests.get(f"https://i.ytimg.com/vi/{yt_data['id']}/hqdefault.jpg")
with open(f"{yt_file}.jpg", "wb") as f:
f.write(response.content)
await message.reply_video(
f"{yt_file}.mp4",
caption=f"**π§ π³πππ
πΎ:** {yt_data['title']} \n\n**π π΅ππΎππ:** `{yt_data['view_count']}` \n**β π£πππΊππππ:** `{secs_to_mins(int(yt_data['duration']))}`",
duration=int(yt_data["duration"]),
thumb=f"{yt_file}.jpg",
progress=progress,
progress_args=(
hell,
time.time(),
upload_text,
),
)
await hell.delete()
except Exception as e:
return await hellbot.delete(hell, f"**π Video not Downloaded:** `{e}`")
try:
os.remove(f"{yt_file}.jpg")
os.remove(f"{yt_file}.mp4")
except:
pass
@on_message("ytlink", allow_stan=True)
async def ytlink(_, message: Message):
if len(message.command) < 2:
return await hellbot.delete(message, "Give something to search on youtube.")
query = await hellbot.input(message)
hell = await hellbot.edit(message, "Searching ...")
try:
results = YoutubeDriver(query, 7).to_dict()
except Exception as e:
return await hellbot.delete(hell, f"**π Error:** `{e}`")
if not results:
return await hellbot.delete(hell, "No results found.")
text = f"**π π³πππΊπ
π±πΎπππ
ππ π₯ππππ½:** `{len(results)}`\n\n"
for result in results:
text += f" **{Symbols.anchor} π³πππ
πΎ:** `{result['title'][:50]}`\n**{Symbols.anchor} π’ππΊπππΎπ
:** `{result['channel']}`\n**{Symbols.anchor} π΅ππΎππ:** `{result['views']}`\n**{Symbols.anchor} π£πππΊππππ:** `{result['duration']}`\n**{Symbols.anchor} π«πππ:** `https://youtube.com{result['url_suffix']}`\n\n"
await hell.edit(text, disable_web_page_preview=True)
HelpMenu("youtube").add(
"yta",
"<youtube link>",
"Download the youtube video in .mp3 format!",
"yta https://youtu.be/xxxxxxxxxxx",
).add(
"ytv",
"<youtube link>",
"Download the youtube video in .mp4 format!",
"ytv https://youtu.be/xxxxxxxxxxx",
).add(
"ytlink",
"<query>",
"Search for a video on youtube.",
"ytlink the hellbot",
).info(
"Youtube Downloader",
).done()
|