Spaces:
Sleeping
Sleeping
# app.py | |
import asyncio | |
import os | |
import re | |
import shutil | |
import uuid | |
from pathlib import Path | |
import html | |
from typing import Optional, Tuple | |
import gradio as gr | |
# ------------------------- | |
# Config | |
# ------------------------- | |
BASE_DIR = Path(".") | |
UPLOAD_FOLDER = BASE_DIR / "uploads" | |
CONVERTED_FOLDER = BASE_DIR / "converted" | |
UPLOAD_FOLDER.mkdir(exist_ok=True) | |
CONVERTED_FOLDER.mkdir(exist_ok=True) | |
ALLOWED_EXTENSIONS = { | |
".3g2", ".3gp", ".3gpp", ".avi", ".cavs", ".dv", ".dvr", ".flv", | |
".m2ts", ".m4v", ".mkv", ".mod", ".mov", ".mp4", ".mpeg", ".mpg", | |
".mts", ".mxf", ".ogg", ".rm", ".rmvb", ".swf", ".ts", ".vob", | |
".webm", ".wmv", ".wtv", ".ogv", ".opus", ".aac", ".ac3", ".aif", | |
".aifc", ".aiff", ".amr", ".au", ".caf", ".dss", ".flac", ".m4a", | |
".m4b", ".mp3", ".oga", ".voc", ".wav", ".weba", ".wma" | |
} | |
VIDEO_BASE_OPTS = ["-crf", "63", "-c:v", "libx264", "-tune", "zerolatency"] | |
ACCEL = "auto" | |
FFMPEG_TIME_RE = re.compile(r"time=(\d+):(\d+):(\d+\.\d+)") | |
FDKAAC_PATH = Path("./fdkaac") | |
if FDKAAC_PATH.exists(): | |
FDKAAC_PATH.chmod(FDKAAC_PATH.stat().st_mode | 0o111) | |
# ------------------------- | |
# Helpers | |
# ------------------------- | |
def is_audio_file(path: str) -> bool: | |
return Path(path).suffix.lower() in {".mp3", ".m4a", ".wav", ".aac", ".oga", ".ogg"} | |
async def run_command_capture(cmd, cwd=None, env=None) -> Tuple[str, str, int]: | |
"""Run command, log stdout/stderr live.""" | |
print(f"[CMD] {' '.join(cmd)}") | |
proc = await asyncio.create_subprocess_exec( | |
*cmd, | |
cwd=cwd, | |
env=env or os.environ.copy(), | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
stdout_lines = [] | |
stderr_lines = [] | |
async def read_stream(stream, buffer, name): | |
while True: | |
line = await stream.readline() | |
if not line: | |
break | |
line_txt = line.decode(errors="ignore").rstrip() | |
print(f"[{name}] {line_txt}") | |
buffer.append(line_txt) | |
await asyncio.gather( | |
read_stream(proc.stdout, stdout_lines, "STDOUT"), | |
read_stream(proc.stderr, stderr_lines, "STDERR"), | |
) | |
await proc.wait() | |
return "\n".join(stdout_lines), "\n".join(stderr_lines), proc.returncode | |
async def get_duration_seconds(path: Path) -> Optional[float]: | |
cmd = [ | |
"ffprobe", "-v", "error", | |
"-show_entries", "format=duration", | |
"-of", "default=noprint_wrappers=1:nokey=1", | |
str(path) | |
] | |
stdout, stderr, rc = await run_command_capture(cmd) | |
if rc != 0 or not stdout: | |
return None | |
try: | |
return float(stdout.strip()) | |
except Exception: | |
return None | |
# ------------------------- | |
# Converter generator | |
# ------------------------- | |
async def convert_stream( | |
use_youtube: bool, | |
youtube_url: str, | |
video_file, # gr.File | |
downscale: bool, | |
faster: bool, | |
use_mp3: bool, | |
audio_only: bool, | |
custom_bitrate: bool, | |
video_bitrate: float | |
): | |
print("Starting conversion...") | |
temp_files = [] | |
input_path: Optional[Path] = None | |
try: | |
# SOURCE | |
if use_youtube: | |
yield None, None, "Starting YouTube download..." | |
out_uuid = uuid.uuid4().hex | |
out_template = str(UPLOAD_FOLDER / f"{out_uuid}.%(ext)s") | |
ytdlp_cmd = ["yt-dlp", "--extractor-args", "youtube:player_client=tv_simply", "-f", "b", "-o", out_template, youtube_url] | |
stdout, stderr, rc = await run_command_capture(ytdlp_cmd) | |
files = list(UPLOAD_FOLDER.glob(f"{out_uuid}.*")) | |
if not files: | |
yield None, None, "Failed to download YouTube video." | |
return | |
input_path = files[0] | |
temp_files.append(input_path) | |
else: | |
input_path = UPLOAD_FOLDER / f"{uuid.uuid4().hex}{Path(video_file.name).suffix}" | |
shutil.copy2(video_file.name, input_path) | |
temp_files.append(input_path) | |
total_seconds = await get_duration_seconds(input_path) | |
print(f"Duration: {total_seconds}s") | |
# AUDIO ONLY | |
if audio_only: | |
out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a" | |
wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav" | |
# Step1: generate WAV | |
yield None, None, "Generating WAV for AAC..." | |
ffmpeg_wav_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", str(wav_tmp)] | |
await run_command_capture(ffmpeg_wav_cmd) | |
# Step2: fdkaac | |
yield None, None, "Encoding AAC via fdkaac..." | |
fdkaac_cmd = [ | |
"./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000", | |
"-o", str(out_audio), str(wav_tmp) | |
] | |
await run_command_capture(fdkaac_cmd) | |
try: wav_tmp.unlink() | |
except: pass | |
yield str(out_audio), str(out_audio), "AAC conversion complete!" | |
return | |
# FULL VIDEO | |
out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a" | |
# Step1: encode audio | |
yield None, None, "Encoding audio track..." | |
if use_mp3: | |
out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp3" | |
ffmpeg_audio_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "24k", | |
"-b:a", "8k", "-vn", str(out_audio)] | |
await run_command_capture(ffmpeg_audio_cmd) | |
else: | |
# fdkaac branch | |
wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav" | |
ffmpeg_wav_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", str(wav_tmp)] | |
await run_command_capture(ffmpeg_wav_cmd) | |
fdkaac_cmd = ["./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000", | |
"-o", str(out_audio), str(wav_tmp)] | |
await run_command_capture(fdkaac_cmd) | |
try: wav_tmp.unlink() | |
except: pass | |
# Step2: encode video | |
out_video = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4" | |
yield None, None, "Encoding video track..." | |
ffmpeg_video_cmd = ["ffmpeg", "-y", "-hwaccel", ACCEL, "-i", str(input_path)] | |
if downscale: ffmpeg_video_cmd += ["-vf", "scale=-2:144"] | |
if custom_bitrate and video_bitrate: ffmpeg_video_cmd += ["-b:v", f"{int(video_bitrate)}k"] | |
else: ffmpeg_video_cmd += VIDEO_BASE_OPTS | |
if faster: ffmpeg_video_cmd += ["-preset", "ultrafast"] | |
ffmpeg_video_cmd += ["-an", str(out_video)] | |
await run_command_capture(ffmpeg_video_cmd) | |
# Step3: merge | |
merged_out = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4" | |
yield None, None, "Merging audio & video..." | |
merge_cmd = ["ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio), "-c", "copy", str(merged_out)] | |
stdout, stderr, rc = await run_command_capture(merge_cmd) | |
if rc != 0: | |
merge_cmd = ["ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio), | |
"-c:v", "copy", "-c:a", "aac", str(merged_out)] | |
await run_command_capture(merge_cmd) | |
for f in (out_audio, out_video): | |
try: f.unlink() | |
except: pass | |
yield str(merged_out), str(merged_out), "Conversion complete!" | |
return | |
finally: | |
for f in temp_files: | |
try: f.unlink() | |
except: pass | |
# ------------------------- | |
# Gradio UI | |
# ------------------------- | |
with gr.Blocks(title="Low Quality Video Inator (fdkaac)") as demo: | |
gr.Markdown("## Low Quality Video Inator\nUpload a file or paste a YouTube URL.") | |
with gr.Row(): | |
use_youtube = gr.Checkbox(label="Use YouTube URL", value=False) | |
youtube_url = gr.Textbox(label="YouTube URL", placeholder="https://youtube.com/...") | |
video_file = gr.File(label="Upload Video", file_types=list(ALLOWED_EXTENSIONS)) | |
with gr.Row(): | |
downscale = gr.Checkbox(label="Downscale to 144p", value=False) | |
faster = gr.Checkbox(label="Faster and lower quality encoding", value=False) | |
with gr.Row(): | |
use_mp3 = gr.Checkbox(label="Use MP3 audio (AAC is however lower quality)", value=False) | |
audio_only = gr.Checkbox(label="Audio only", value=False) | |
with gr.Row(): | |
custom_bitrate = gr.Checkbox(label="Custom video bitrate", value=False) | |
video_bitrate = gr.Number(label="Video bitrate (kbps)", value=64, visible=False) | |
def toggle_bitrate(v): | |
return gr.update(visible=v) | |
custom_bitrate.change(toggle_bitrate, inputs=[custom_bitrate], outputs=[video_bitrate]) | |
convert_btn = gr.Button("Convert Now", variant="primary") | |
video_preview = gr.Video(label="Video Preview") | |
download_file = gr.File(label="Download Result") | |
step_text = gr.Textbox(label="Status / Progress", interactive=False) | |
convert_btn.click( | |
fn=convert_stream, | |
inputs=[use_youtube, youtube_url, video_file, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate], | |
outputs=[video_preview, download_file, step_text] | |
) | |
demo.launch(share=False) |