# app.py
"""Gradio app that re-encodes uploaded or YouTube media at very low quality.

The UI streams ``(preview_html, download_path)`` tuples from
``convert_stream`` so the browser shows a step label and a percentage while
``ffmpeg`` / ``fdkaac`` run as subprocesses.
"""
import asyncio
import html
import os
import re
import shutil
import uuid
from pathlib import Path
from typing import AsyncIterator, List, Optional, Tuple

import gradio as gr

# -------------------------
# Config
# -------------------------
BASE_DIR = Path(".")
UPLOAD_FOLDER = BASE_DIR / "uploads"
CONVERTED_FOLDER = BASE_DIR / "converted"
UPLOAD_FOLDER.mkdir(exist_ok=True)
CONVERTED_FOLDER.mkdir(exist_ok=True)

ALLOWED_EXTENSIONS = {
    ".3g2", ".3gp", ".3gpp", ".avi", ".cavs", ".dv", ".dvr", ".flv", ".m2ts",
    ".m4v", ".mkv", ".mod", ".mov", ".mp4", ".mpeg", ".mpg", ".mts", ".mxf",
    ".ogg", ".rm", ".rmvb", ".swf", ".ts", ".vob", ".webm", ".wmv", ".wtv",
    ".ogv", ".opus", ".aac", ".ac3", ".aif", ".aifc", ".aiff", ".amr", ".au",
    ".caf", ".dss", ".flac", ".m4a", ".m4b", ".mp3", ".oga", ".voc", ".wav",
    ".weba", ".wma",
}

VIDEO_BASE_OPTS = ["-crf", "63", "-c:v", "libx264", "-tune", "zerolatency"]
ACCEL = "auto"
FFMPEG_TIME_RE = re.compile(r"time=(\d+):(\d+):(\d+\.\d+)")

# ffmpeg terminates its periodic stats lines with "\r" rather than "\n", so
# progress output has to be split on either character.
_LINE_SPLIT_RE = re.compile(r"[\r\n]+")
# Upper bound on buffered, not-yet-terminated stderr text.
_MAX_PENDING_CHARS = 4096

# Ensure fdkaac executable bit if it exists in cwd
FDKAAC_PATH = Path("./fdkaac")
try:
    if FDKAAC_PATH.exists():
        FDKAAC_PATH.chmod(FDKAAC_PATH.stat().st_mode | 0o111)
except OSError:
    # Best effort only: a missing or non-executable binary is reported to the
    # user later, when a conversion actually needs it.
    pass


# -------------------------
# Helpers
# -------------------------
class _CommandFailed(RuntimeError):
    """Raised when an external encoder exits with a non-zero status."""


def _remove_quietly(path: Path) -> None:
    """Delete *path*, ignoring the case where it is missing or undeletable."""
    try:
        path.unlink()
    except OSError:
        pass


def is_audio_file(path: str) -> bool:
    """Return True when *path* has one of the audio suffixes previewed as audio."""
    ext = Path(path).suffix.lower()
    return ext in {".mp3", ".m4a", ".wav", ".aac", ".oga", ".ogg"}


def preview_html(percent: float, step: str, media_src: Optional[str] = None) -> str:
    """Return HTML containing media preview (video or audio) with overlayed progress/step.

    Args:
        percent: Progress value; clamped to the range 0..100.
        step: Human-readable step label; HTML-escaped before insertion.
        media_src: Optional path of the media to preview. When omitted, a
            placeholder message is rendered instead of a player.
    """
    pct = max(0.0, min(100.0, percent))
    esc_step = html.escape(str(step))
    if media_src:
        esc_src = html.escape(str(media_src))
        # NOTE(review): the path is used verbatim as the element's src.
        # Depending on the Gradio version a file-serving prefix may be
        # required for the browser to load it - confirm against the deployment.
        if is_audio_file(media_src):
            media_tag = f'<audio controls src="{esc_src}"></audio>'
        else:
            media_tag = f'<video controls src="{esc_src}"></video>'
    else:
        media_tag = '<div>Preview will appear here once available</div>'
    return f'''<div>
{media_tag}
<div>{esc_step}</div>
<div>{pct:.1f}%</div>
</div>'''


async def run_command_capture(cmd, cwd=None, env=None) -> Tuple[str, str, int]:
    """Run command to completion, capture stdout/stderr.

    Returns:
        ``(stdout, stderr, returncode)`` with both streams decoded leniently.
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=cwd,
        env=env or os.environ.copy(),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return stdout.decode(errors="ignore"), stderr.decode(errors="ignore"), proc.returncode


async def get_duration_seconds(path: Path) -> Optional[float]:
    """Get duration using ffprobe (seconds) or None if unknown."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(path),
    ]
    stdout, _stderr, rc = await run_command_capture(cmd)
    if rc != 0 or not stdout:
        return None
    try:
        return float(stdout.strip())
    except ValueError:
        # Output was not a number.
        return None


def which_cmd(name: str) -> Optional[str]:
    """Return the full path of executable *name* on PATH, or None."""
    return shutil.which(name)


async def _ffmpeg_elapsed_seconds(cmd) -> AsyncIterator[float]:
    """Run *cmd* and yield each ``time=HH:MM:SS.ss`` stamp it prints, in seconds.

    stderr is read in chunks and split on both ``\\r`` and ``\\n``; relying on
    ``readline()`` would stall on ffmpeg's ``\\r``-terminated stats and can
    overflow the stream reader's line limit on long encodes.

    Raises:
        _CommandFailed: if the process exits with a non-zero status.
    """
    proc = await asyncio.create_subprocess_exec(*cmd, stderr=asyncio.subprocess.PIPE)
    pending = ""
    try:
        while True:
            chunk = await proc.stderr.read(4096)
            if not chunk:
                break
            pending += chunk.decode(errors="ignore")
            # Everything before the last separator is complete; the remainder
            # may be a partially received line and is kept for the next chunk.
            *complete, pending = _LINE_SPLIT_RE.split(pending)
            pending = pending[-_MAX_PENDING_CHARS:]
            for line in complete:
                m = FFMPEG_TIME_RE.search(line)
                if m:
                    hh, mm, ss = m.groups()
                    yield int(hh) * 3600 + int(mm) * 60 + float(ss)
        returncode = await proc.wait()
    finally:
        # The consumer stopped early or an error occurred: do not leave the
        # encoder running in the background.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
    if returncode != 0:
        raise _CommandFailed(f"{cmd[0]} exited with status {returncode}")


async def _encode_with_progress(
    cmd, total_seconds: Optional[float], start: float, span: float
) -> AsyncIterator[float]:
    """Run an ffmpeg command, yielding overall progress percentages.

    Progress is mapped onto ``start .. start + span`` and only emitted when it
    advanced by at least 0.5 since the last emitted value. When
    *total_seconds* is unknown the command simply runs to completion and
    nothing is yielded.

    Raises:
        _CommandFailed: if the command exits with a non-zero status.
    """
    if not total_seconds:
        _out, _err, rc = await run_command_capture(cmd)
        if rc != 0:
            raise _CommandFailed(f"{cmd[0]} exited with status {rc}")
        return
    last = start
    async for elapsed in _ffmpeg_elapsed_seconds(cmd):
        pct = start + (elapsed / total_seconds) * span
        if pct - last >= 0.5:
            last = pct
            yield pct


async def _extract_wav(input_path: Path) -> Optional[Path]:
    """Decode *input_path* to a mono 8 kHz WAV in CONVERTED_FOLDER.

    Returns the WAV path, or None when ffmpeg fails (any partial file is
    removed).
    """
    wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav"
    cmd = ["ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "8000", str(wav_tmp)]
    _out, _err, rc = await run_command_capture(cmd)
    if rc != 0:
        _remove_quietly(wav_tmp)
        return None
    return wav_tmp


async def _fdkaac_encode(wav_path: Path) -> Tuple[Optional[Path], str]:
    """Encode *wav_path* to .m4a with ./fdkaac and delete the WAV afterwards.

    Returns:
        ``(m4a_path, stderr)``; ``m4a_path`` is None when fdkaac failed, in
        which case any partial output is removed.
    """
    aac_out = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a"
    # The literal "./fdkaac" is intentional: str(FDKAAC_PATH) would drop the
    # "./" and make the lookup go through PATH instead of the app folder.
    cmd = [
        "./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000",
        "-o", str(aac_out), str(wav_path),
    ]
    try:
        _out, stderr, rc = await run_command_capture(cmd)
    finally:
        _remove_quietly(wav_path)
    if rc != 0:
        _remove_quietly(aac_out)
        return None, stderr
    return aac_out, stderr


# -------------------------
# Converter generator
# streams tuples: (preview_html_str, download_path_or_None)
# -------------------------
async def convert_stream(
    use_youtube: bool,
    youtube_url: str,
    video_file,  # gr.File
    downscale: bool,
    faster: bool,
    use_mp3: bool,
    audio_only: bool,
    custom_bitrate: bool,
    video_bitrate: float
):
    """Convert the selected source and stream progress to the UI.

    Yields ``(preview_html_str, download_path_or_None)`` tuples. The download
    path is only set on the final, successful yield. Failures are reported as
    a preview message with a ``None`` path rather than raised.

    Args:
        use_youtube: Download the source with yt-dlp instead of using the upload.
        youtube_url: URL handed to yt-dlp when *use_youtube* is set.
        video_file: Uploaded file; either an object with a ``.name`` path or a
            plain path string.
        downscale: Scale the video to 144 px height.
        faster: Use the ``ultrafast`` encoder preset.
        use_mp3: Encode audio as MP3 instead of AAC via fdkaac.
        audio_only: Produce only an audio file.
        custom_bitrate: Use *video_bitrate* instead of the default video options.
        video_bitrate: Video bitrate in kbps, used when *custom_bitrate* is set.
    """
    # initial
    yield preview_html(0.0, "Starting..."), None

    # Downloaded/copied inputs and intermediate outputs; removed in `finally`
    # on every exit path.
    temp_files: List[Path] = []
    input_path: Optional[Path] = None

    try:
        # SOURCE
        if use_youtube:
            youtube_url = (youtube_url or "").strip()
            if not youtube_url:
                yield preview_html(0.0, "Error: YouTube URL required."), None
                return
            if not which_cmd("yt-dlp"):
                yield preview_html(0.0, "yt-dlp not found on server. Please upload the file manually."), None
                return
            yield preview_html(1.0, "Attempting YouTube download..."), None
            out_uuid = uuid.uuid4().hex
            out_template = str(UPLOAD_FOLDER / f"{out_uuid}.%(ext)s")
            # "--" ends option parsing so a user-supplied value that starts
            # with "-" cannot be interpreted as a yt-dlp option.
            ytdlp_cmd = ["yt-dlp", "-f", "b", "-o", out_template, "--", youtube_url]
            stdout, stderr, rc = await run_command_capture(ytdlp_cmd)
            combined = (stdout or "") + "\n" + (stderr or "")
            files = list(UPLOAD_FOLDER.glob(f"{out_uuid}.*"))
            temp_files.extend(files)
            if rc != 0:
                if "Video unavailable" in combined or "This video is unavailable" in combined:
                    yield preview_html(0.0, "Video unavailable (removed/private/age-restricted)."), None
                    return
                yield preview_html(0.0, "Could not download from YouTube from this server (cloud host blocked). Please upload manually."), None
                return
            if not files:
                yield preview_html(0.0, "Download completed but file not found."), None
                return
            input_path = files[0]
        else:
            if not video_file:
                yield preview_html(0.0, "No video provided. Upload or use YouTube URL."), None
                return
            # Accept both an upload object exposing `.name` and a bare path.
            src_name = str(getattr(video_file, "name", video_file))
            ext = Path(src_name).suffix.lower()
            if ext not in ALLOWED_EXTENSIONS:
                yield preview_html(0.0, f"Unsupported file type: {ext or 'unknown'}"), None
                return
            input_path = UPLOAD_FOLDER / f"{uuid.uuid4().hex}{ext}"
            temp_files.append(input_path)
            shutil.copy2(src_name, input_path)

        yield preview_html(1.0, "Probing duration..."), None
        total_seconds = await get_duration_seconds(input_path)
        if not total_seconds:
            yield preview_html(0.0, "Warning: duration unknown; progress will be step-based."), None

        # AUDIO-ONLY PATH
        if audio_only:
            # MP3 branch
            if use_mp3:
                step = "Converting to MP3..."
                out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp3"
                ffmpeg_cmd = [
                    "ffmpeg", "-y", "-i", str(input_path),
                    "-ac", "1", "-ar", "24000", "-b:a", "8k", str(out_audio),
                ]
                yield preview_html(2.0, step), None
                try:
                    async for pct in _encode_with_progress(ffmpeg_cmd, total_seconds, 0.0, 100.0):
                        yield preview_html(pct, step), None
                except _CommandFailed:
                    _remove_quietly(out_audio)
                    yield preview_html(0.0, "ffmpeg failed while encoding MP3."), None
                    return
                yield preview_html(100.0, "MP3 conversion finished.", media_src=str(out_audio)), str(out_audio)
                return

            # AAC via fdkaac branch (audio-only)
            if not FDKAAC_PATH.exists():
                yield preview_html(0.0, "fdkaac not found at ./fdkaac — please add it to the app folder and make executable."), None
                return
            yield preview_html(2.0, "Preparing WAV for fdkaac..."), None
            wav_tmp = await _extract_wav(input_path)
            if wav_tmp is None:
                yield preview_html(0.0, "ffmpeg failed to produce WAV for fdkaac."), None
                return
            yield preview_html(5.0, "Encoding AAC with fdkaac..."), None
            aac_out, fdk_err = await _fdkaac_encode(wav_tmp)
            if aac_out is None:
                yield preview_html(0.0, f"fdkaac failed: {fdk_err[:300] if fdk_err else 'no output'}"), None
                return
            yield preview_html(100.0, "AAC (fdkaac) audio ready.", media_src=str(aac_out)), str(aac_out)
            return

        # ----------------------
        # FULL VIDEO FLOW
        # ----------------------
        # 1) Audio encode (via fdkaac or mp3)
        if use_mp3:
            # Written as .mp3 with -vn so the intermediate holds only the MP3
            # audio stream; an .m4a target would also pull in the video
            # stream and is not a container meant for MP3.
            out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp3"
            temp_files.append(out_audio)
            ffmpeg_audio_cmd = [
                "ffmpeg", "-y", "-i", str(input_path), "-vn",
                "-ac", "1", "-ar", "8000",
                "-c:a", "libmp3lame", "-b:a", "8k", str(out_audio),
            ]
            step_audio = "Encoding audio (MP3)..."
            yield preview_html(2.0, step_audio), None
            try:
                # Audio encoding accounts for the first 20% of overall progress.
                async for pct in _encode_with_progress(ffmpeg_audio_cmd, total_seconds, 0.0, 20.0):
                    yield preview_html(pct, step_audio), None
            except _CommandFailed:
                yield preview_html(0.0, "ffmpeg audio encoding failed."), None
                return
        else:
            # Use fdkaac path for AAC
            if not FDKAAC_PATH.exists():
                yield preview_html(0.0, "fdkaac not found at ./fdkaac — please add it to the app folder and make executable."), None
                return
            # 1a: Create WAV
            yield preview_html(2.0, "Generating WAV for fdkaac..."), None
            wav_tmp = await _extract_wav(input_path)
            if wav_tmp is None:
                yield preview_html(0.0, "ffmpeg failed generating WAV for fdkaac."), None
                return
            # 1b: Run fdkaac
            yield preview_html(6.0, "Encoding AAC with fdkaac..."), None
            out_audio, fdk_err = await _fdkaac_encode(wav_tmp)
            if out_audio is None:
                yield preview_html(0.0, f"fdkaac failed: {fdk_err[:300] if fdk_err else 'no output'}"), None
                return
            temp_files.append(out_audio)

        # 2) Video encode
        step_video = "Encoding video track..."
        yield preview_html(20.0, step_video), None
        out_video = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4"
        temp_files.append(out_video)
        ffmpeg_video_cmd = ["ffmpeg", "-y", "-hwaccel", ACCEL, "-i", str(input_path)]
        if downscale:
            ffmpeg_video_cmd += ["-vf", "scale=-2:144"]
        if custom_bitrate and video_bitrate:
            ffmpeg_video_cmd += ["-b:v", f"{int(video_bitrate)}k"]
        else:
            ffmpeg_video_cmd += VIDEO_BASE_OPTS
        if faster:
            ffmpeg_video_cmd += ["-preset", "ultrafast"]
        ffmpeg_video_cmd += ["-an", str(out_video)]

        try:
            # Video encoding covers 20%..90% of overall progress.
            async for pct in _encode_with_progress(ffmpeg_video_cmd, total_seconds, 20.0, 70.0):
                yield preview_html(pct, step_video), None
        except _CommandFailed:
            yield preview_html(0.0, "ffmpeg video encoding failed."), None
            return

        # 3) Merge audio + video
        yield preview_html(90.0, "Merging audio & video..."), None
        merged_out = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4"
        merge_cmd = [
            "ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio),
            "-c", "copy", str(merged_out),
        ]
        _out, _err, rc = await run_command_capture(merge_cmd)
        if rc != 0:
            # fallback re-encode audio into AAC during merge
            merge_cmd = [
                "ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio),
                "-c:v", "copy", "-c:a", "aac", str(merged_out),
            ]
            _out, _err, rc = await run_command_capture(merge_cmd)
            if rc != 0:
                _remove_quietly(merged_out)
                yield preview_html(0.0, "Merging audio and video failed."), None
                return

        # final (intermediate audio/video are removed by the `finally` below)
        yield preview_html(100.0, "Conversion complete!", media_src=str(merged_out)), str(merged_out)
        return

    except Exception as e:
        # Top-level boundary: surface unexpected failures in the UI instead of
        # breaking the stream.
        yield preview_html(0.0, f"Error: {e}"), None
        return
    finally:
        # remove downloaded/copied inputs and intermediate files
        for p in temp_files:
            _remove_quietly(p)


# -------------------------
# Gradio UI
# -------------------------
with gr.Blocks(title="Low Quality Video Inator (fdkaac)") as demo:
    gr.Markdown("# Low Quality Video Inator\nUpload a file or paste a YouTube URL. The app streams a step-aware overlayed progress bar while encoding.")

    with gr.Row():
        use_youtube = gr.Checkbox(label="Use YouTube URL (server will try to download first)", value=False)
        youtube_url = gr.Textbox(label="YouTube URL", placeholder="https://youtube.com/...", lines=1)

    video_file = gr.File(label="Upload Video File", file_types=sorted(ALLOWED_EXTENSIONS), file_count="single")

    gr.Markdown("### Conversion Settings")
    with gr.Row():
        downscale = gr.Checkbox(label="Downscale to 144p", value=False)
        faster = gr.Checkbox(label="Faster encoding (lower quality)", value=False)
    with gr.Row():
        use_mp3 = gr.Checkbox(label="Use MP3 audio", value=False)
        audio_only = gr.Checkbox(label="Audio Only", value=False)
    with gr.Row():
        custom_bitrate = gr.Checkbox(label="Use custom video bitrate (kbps)", value=False)
        video_bitrate = gr.Number(label="Video bitrate (kbps)", value=64, visible=False)

    def toggle_bitrate(v):
        """Show the bitrate field only while the custom-bitrate box is ticked."""
        return gr.update(visible=v)

    custom_bitrate.change(toggle_bitrate, inputs=[custom_bitrate], outputs=[video_bitrate])

    convert_btn = gr.Button("Convert Now", variant="primary")
    preview_html_el = gr.HTML("<div>Ready. Preview will appear here.</div>", label="Preview")
    download_file = gr.File(label="Download Result")

    convert_btn.click(
        fn=convert_stream,
        inputs=[use_youtube, youtube_url, video_file, downscale, faster, use_mp3, audio_only, custom_bitrate, video_bitrate],
        outputs=[preview_html_el, download_file],
    )

demo.launch(share=False)