# app.py import asyncio import os import re import shutil import uuid from pathlib import Path import html from typing import Optional, Tuple import gradio as gr # ------------------------- # Config # ------------------------- BASE_DIR = Path(".") UPLOAD_FOLDER = BASE_DIR / "uploads" CONVERTED_FOLDER = BASE_DIR / "converted" UPLOAD_FOLDER.mkdir(exist_ok=True) CONVERTED_FOLDER.mkdir(exist_ok=True) ALLOWED_EXTENSIONS = { ".3g2", ".3gp", ".3gpp", ".avi", ".cavs", ".dv", ".dvr", ".flv", ".m2ts", ".m4v", ".mkv", ".mod", ".mov", ".mp4", ".mpeg", ".mpg", ".mts", ".mxf", ".ogg", ".rm", ".rmvb", ".swf", ".ts", ".vob", ".webm", ".wmv", ".wtv", ".ogv", ".opus", ".aac", ".ac3", ".aif", ".aifc", ".aiff", ".amr", ".au", ".caf", ".dss", ".flac", ".m4a", ".m4b", ".mp3", ".oga", ".voc", ".wav", ".weba", ".wma" } VIDEO_BASE_OPTS = ["-crf", "63", "-c:v", "libx264", "-tune", "zerolatency"] ACCEL = "auto" FFMPEG_TIME_RE = re.compile(r"time=(\d+):(\d+):(\d+\.\d+)") FDKAAC_PATH = Path("./fdkaac") if FDKAAC_PATH.exists(): FDKAAC_PATH.chmod(FDKAAC_PATH.stat().st_mode | 0o111) # ------------------------- # Helpers # ------------------------- def is_audio_file(path: str) -> bool: return Path(path).suffix.lower() in {".mp3", ".m4a", ".wav", ".aac", ".oga", ".ogg"} async def run_command_capture(cmd, cwd=None, env=None) -> Tuple[str, str, int]: """Run command, log stdout/stderr live.""" print(f"[CMD] {' '.join(cmd)}") proc = await asyncio.create_subprocess_exec( *cmd, cwd=cwd, env=env or os.environ.copy(), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout_lines = [] stderr_lines = [] async def read_stream(stream, buffer, name): while True: line = await stream.readline() if not line: break line_txt = line.decode(errors="ignore").rstrip() print(f"[{name}] {line_txt}") buffer.append(line_txt) await asyncio.gather( read_stream(proc.stdout, stdout_lines, "STDOUT"), read_stream(proc.stderr, stderr_lines, "STDERR"), ) await proc.wait() return "\n".join(stdout_lines), "\n".join(stderr_lines), proc.returncode async def get_duration_seconds(path: Path) -> Optional[float]: cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(path) ] stdout, stderr, rc = await run_command_capture(cmd) if rc != 0 or not stdout: return None try: return float(stdout.strip()) except Exception: return None # ------------------------- # Converter generator # ------------------------- async def convert_stream( use_youtube: bool, youtube_url: str, video_file, # gr.File downscale: bool, faster: bool, use_mp3: bool, audio_only: bool, custom_bitrate: bool, video_bitrate: float ): print("Starting conversion...") temp_files = [] input_path: Optional[Path] = None try: # SOURCE if use_youtube: yield None, None, "Starting YouTube download..." out_uuid = uuid.uuid4().hex out_template = str(UPLOAD_FOLDER / f"{out_uuid}.%(ext)s") ytdlp_cmd = ["yt-dlp", "--extractor-args", "youtube:player_client=tv_simply", "-f", "b", "-o", out_template, youtube_url] stdout, stderr, rc = await run_command_capture(ytdlp_cmd) files = list(UPLOAD_FOLDER.glob(f"{out_uuid}.*")) if not files: yield None, None, "Failed to download YouTube video." return input_path = files[0] temp_files.append(input_path) else: input_path = UPLOAD_FOLDER / f"{uuid.uuid4().hex}{Path(video_file.name).suffix}" shutil.copy2(video_file.name, input_path) temp_files.append(input_path) total_seconds = await get_duration_seconds(input_path) print(f"Duration: {total_seconds}s") # AUDIO ONLY if audio_only: out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a" wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav" # Step1: generate WAV yield None, None, "Generating WAV for AAC..." ffmpeg_wav_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", str(wav_tmp)] await run_command_capture(ffmpeg_wav_cmd) # Step2: fdkaac yield None, None, "Encoding AAC via fdkaac..." fdkaac_cmd = [ "./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000", "-o", str(out_audio), str(wav_tmp) ] await run_command_capture(fdkaac_cmd) try: wav_tmp.unlink() except: pass yield str(out_audio), str(out_audio), "AAC conversion complete!" return # FULL VIDEO out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a" # Step1: encode audio yield None, None, "Encoding audio track..." if use_mp3: out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp3" ffmpeg_audio_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "24k", "-b:a", "8k", "-vn", str(out_audio)] await run_command_capture(ffmpeg_audio_cmd) else: # fdkaac branch wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav" ffmpeg_wav_cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", str(wav_tmp)] await run_command_capture(ffmpeg_wav_cmd) fdkaac_cmd = ["./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000", "-o", str(out_audio), str(wav_tmp)] await run_command_capture(fdkaac_cmd) try: wav_tmp.unlink() except: pass # Step2: encode video out_video = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4" yield None, None, "Encoding video track..." ffmpeg_video_cmd = ["ffmpeg", "-y", "-hwaccel", ACCEL, "-i", str(input_path)] if downscale: ffmpeg_video_cmd += ["-vf", "scale=-2:144"] if custom_bitrate and video_bitrate: ffmpeg_video_cmd += ["-b:v", f"{int(video_bitrate)}k"] else: ffmpeg_video_cmd += VIDEO_BASE_OPTS if faster: ffmpeg_video_cmd += ["-preset", "ultrafast"] ffmpeg_video_cmd += ["-an", str(out_video)] await run_command_capture(ffmpeg_video_cmd) # Step3: merge merged_out = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4" yield None, None, "Merging audio & video..." merge_cmd = ["ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio), "-c", "copy", str(merged_out)] stdout, stderr, rc = await run_command_capture(merge_cmd) if rc != 0: merge_cmd = ["ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio), "-c:v", "copy", "-c:a", "aac", str(merged_out)] await run_command_capture(merge_cmd) for f in (out_audio, out_video): try: f.unlink() except: pass yield str(merged_out), str(merged_out), "Conversion complete!" return finally: for f in temp_files: try: f.unlink() except: pass # ------------------------- # Gradio UI # ------------------------- with gr.Blocks(title="Low Quality Video Inator (fdkaac)") as demo: gr.Markdown("## Low Quality Video Inator\nUpload a file or paste a YouTube URL.") with gr.Row(): use_youtube = gr.Checkbox(label="Use YouTube URL", value=False) youtube_url = gr.Textbox(label="YouTube URL", placeholder="https://youtube.com/...") video_file = gr.File(label="Upload Video", file_types=list(ALLOWED_EXTENSIONS)) with gr.Row(): downscale = gr.Checkbox(label="Downscale to 144p", value=False) faster = gr.Checkbox(label="Faster and lower quality encoding", value=False) with gr.Row(): use_mp3 = gr.Checkbox(label="Use MP3 audio (AAC is however lower quality)", value=False) audio_only = gr.Checkbox(label="Audio only", value=False) with gr.Row(): custom_bitrate = gr.Checkbox(label="Custom video bitrate", value=False) video_bitrate = gr.Number(label="Video bitrate (kbps)", value=64, visible=False) def toggle_bitrate(v): return gr.update(visible=v) custom_bitrate.change(toggle_bitrate, inputs=[custom_bitrate], outputs=[video_bitrate]) convert_btn = gr.Button("Convert Now", variant="primary") video_preview = gr.Video(label="Video Preview") download_file = gr.File(label="Download Result") step_text = gr.Textbox(label="Status / Progress", interactive=False) convert_btn.click( fn=convert_stream, inputs=[use_youtube, youtube_url, video_file, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate], outputs=[video_preview, download_file, step_text] ) demo.launch(share=False)