# RandomPersonRR's picture
# Update app.py
# 9341904 verified
# app.py
import asyncio
import os
import re
import shutil
import uuid
from pathlib import Path
import html
from typing import Optional, Tuple
import gradio as gr
# -------------------------
# Config
# -------------------------
# Working directories, resolved relative to the process's current directory.
BASE_DIR = Path(".")
UPLOAD_FOLDER = BASE_DIR / "uploads"
CONVERTED_FOLDER = BASE_DIR / "converted"
for _work_dir in (UPLOAD_FOLDER, CONVERTED_FOLDER):
    _work_dir.mkdir(exist_ok=True)

# File extensions offered to the upload widget.
ALLOWED_EXTENSIONS = set(
    (
        ".3g2 .3gp .3gpp .avi .cavs .dv .dvr .flv "
        ".m2ts .m4v .mkv .mod .mov .mp4 .mpeg .mpg "
        ".mts .mxf .ogg .rm .rmvb .swf .ts .vob "
        ".webm .wmv .wtv .ogv .opus .aac .ac3 .aif "
        ".aifc .aiff .amr .au .caf .dss .flac .m4a "
        ".m4b .mp3 .oga .voc .wav .weba .wma"
    ).split()
)

# Video encoder options used when no custom bitrate is requested.
VIDEO_BASE_OPTS = ["-crf", "63", "-c:v", "libx264", "-tune", "zerolatency"]

# Value handed to ffmpeg's -hwaccel option.
ACCEL = "auto"

# Matches the "time=HH:MM:SS.ss" field of an ffmpeg progress line.
FFMPEG_TIME_RE = re.compile(r"time=(\d+):(\d+):(\d+\.\d+)")

# fdkaac binary expected in the working directory; make sure it is
# executable (adds the execute bit for user, group and other).
FDKAAC_PATH = Path("./fdkaac")
if FDKAAC_PATH.exists():
    FDKAAC_PATH.chmod(FDKAAC_PATH.stat().st_mode | 0o111)
# -------------------------
# Helpers
# -------------------------
def is_audio_file(path: str) -> bool:
    """Return True when *path* ends in one of the recognised audio suffixes.

    Only the final suffix is considered and the comparison ignores case.
    """
    audio_suffixes = (".mp3", ".m4a", ".wav", ".aac", ".oga", ".ogg")
    extension = Path(path).suffix.lower()
    return extension in audio_suffixes
# Child processes do not always end lines with "\n": ffmpeg redraws its
# progress line with a bare "\r". Treat all three conventions as separators.
_LINE_BREAK_RE = re.compile(rb"\r\n|\r|\n")
_READ_CHUNK_SIZE = 8192


async def run_command_capture(cmd, cwd=None, env=None) -> Tuple[str, str, int]:
    """Run *cmd*, echo its output as it arrives, and return what it printed.

    Args:
        cmd: Program and arguments as a sequence of strings (no shell).
        cwd: Working directory for the child, or None to inherit.
        env: Environment mapping for the child. None means a copy of the
            current environment; an explicit mapping (even an empty one)
            is passed through unchanged.

    Returns:
        ``(stdout, stderr, returncode)``. Each stream is returned as its
        lines joined with ``"\\n"``, with trailing whitespace stripped from
        every line and undecodable bytes dropped.
    """
    print(f"[CMD] {' '.join(cmd)}")
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=cwd,
        env=os.environ.copy() if env is None else env,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout_lines: list = []
    stderr_lines: list = []

    async def read_stream(stream, buffer, name):
        def emit(raw: bytes) -> None:
            line_txt = raw.decode(errors="ignore").rstrip()
            print(f"[{name}] {line_txt}")
            buffer.append(line_txt)

        # Read fixed-size chunks instead of readline(): readline() raises
        # ValueError once 64 KiB arrive without a "\n", which a long run of
        # "\r"-terminated progress updates will eventually do.
        pending = b""
        previous_ended_with_cr = False
        while True:
            chunk = await stream.read(_READ_CHUNK_SIZE)
            if not chunk:
                break
            # A "\r\n" pair split across two reads must not yield a blank line.
            if previous_ended_with_cr and chunk.startswith(b"\n"):
                chunk = chunk[1:]
            previous_ended_with_cr = chunk.endswith(b"\r")
            *complete, pending = _LINE_BREAK_RE.split(pending + chunk)
            for raw in complete:
                emit(raw)
        if pending:
            # Final line without a terminator.
            emit(pending)

    try:
        await asyncio.gather(
            read_stream(proc.stdout, stdout_lines, "STDOUT"),
            read_stream(proc.stderr, stderr_lines, "STDERR"),
        )
        await proc.wait()
    finally:
        if proc.returncode is None:
            # We are leaving early (error or cancellation): do not leave
            # the child process running in the background.
            try:
                proc.kill()
            except ProcessLookupError:
                pass
    return "\n".join(stdout_lines), "\n".join(stderr_lines), proc.returncode
async def get_duration_seconds(path: Path) -> Optional[float]:
    """Ask ffprobe for the container duration of *path*, in seconds.

    Returns None when ffprobe exits with a non-zero status, prints nothing
    on stdout, or prints something that cannot be converted to a float.
    """
    probe_cmd = ["ffprobe", "-v", "error"]
    probe_cmd += ["-show_entries", "format=duration"]
    probe_cmd += ["-of", "default=noprint_wrappers=1:nokey=1"]
    probe_cmd.append(str(path))

    probe_out, _probe_err, exit_code = await run_command_capture(probe_cmd)
    if exit_code == 0 and probe_out:
        try:
            return float(probe_out.strip())
        except Exception:
            # Unparseable output is reported the same way as a failed probe.
            pass
    return None
# -------------------------
# Converter generator
# -------------------------
def _remove_quietly(path: Path) -> None:
    """Best-effort delete of *path*; a file that is already gone is fine."""
    try:
        path.unlink(missing_ok=True)
    except OSError as exc:
        print(f"[WARN] Could not remove {path}: {exc}")


def _wav_command(source: Path, wav_path: Path) -> list:
    """Build the ffmpeg command that decodes *source* to 8 kHz mono WAV."""
    return ["ffmpeg", "-y", "-i", str(source), "-ac", "1", "-ar", "8000", str(wav_path)]


def _fdkaac_command(wav_path: Path, target: Path) -> list:
    """Build the fdkaac command that encodes *wav_path* into *target*."""
    return [
        "./fdkaac", "-b", "1k", "-C", "-f", "2", "-G", "1", "-w", "8000",
        "-o", str(target), str(wav_path),
    ]


async def _command_succeeded(cmd) -> bool:
    """Run *cmd* via run_command_capture and report whether it exited 0."""
    _stdout, _stderr, returncode = await run_command_capture(cmd)
    return returncode == 0


async def convert_stream(
    use_youtube: bool,
    youtube_url: str,
    video_file,  # gr.File
    downscale: bool,
    faster: bool,
    use_mp3: bool,
    audio_only: bool,
    custom_bitrate: bool,
    video_bitrate: float
):
    """Convert an uploaded file or a YouTube download to a low-quality file.

    This is an async generator. Every item it yields is a 3-tuple
    ``(preview_path, download_path, status_message)``: progress updates
    carry ``None`` for both paths, the final item on success carries the
    path of the finished file twice. On any failure a single explanatory
    status is yielded and the generator ends without a result path.

    Args:
        use_youtube: Fetch the source with yt-dlp from *youtube_url* instead
            of using *video_file*.
        youtube_url: URL handed to yt-dlp when *use_youtube* is set.
        video_file: Uploaded file; either an object with a ``name``
            attribute holding its path, or the path itself.
        downscale: Scale the video to a height of 144 pixels.
        faster: Add ``-preset ultrafast`` to the video encode.
        use_mp3: Encode the audio track as MP3 with ffmpeg instead of AAC
            with fdkaac.
        audio_only: Produce only an ``.m4a`` via fdkaac and skip video.
        custom_bitrate: Use *video_bitrate* instead of ``VIDEO_BASE_OPTS``.
        video_bitrate: Video bitrate in kbit/s (truncated to an integer).

    All intermediate files, and any partial output of a failed run, are
    removed when the generator finishes; only the final result is kept.
    """
    print("Starting conversion...")
    temp_files = []
    keep: Optional[Path] = None  # final result, exempt from cleanup
    input_path: Optional[Path] = None
    try:
        # SOURCE
        if use_youtube:
            url = (youtube_url or "").strip()
            if not url:
                yield None, None, "Please enter a YouTube URL."
                return
            yield None, None, "Starting YouTube download..."
            out_uuid = uuid.uuid4().hex
            out_template = str(UPLOAD_FOLDER / f"{out_uuid}.%(ext)s")
            # "--" ends option parsing so a URL beginning with "-" cannot be
            # interpreted by yt-dlp as a command-line option.
            ytdlp_cmd = [
                "yt-dlp", "--extractor-args", "youtube:player_client=tv_simply",
                "-f", "b", "-o", out_template, "--", url,
            ]
            await run_command_capture(ytdlp_cmd)
            downloads = sorted(UPLOAD_FOLDER.glob(f"{out_uuid}.*"))
            # Everything yt-dlp left behind is ours to clean up.
            temp_files.extend(downloads)
            finished = [p for p in downloads if p.suffix not in (".part", ".ytdl")]
            if not finished:
                yield None, None, "Failed to download YouTube video."
                return
            input_path = finished[0]
        else:
            if video_file is None:
                yield None, None, "Please upload a file first."
                return
            source_name = getattr(video_file, "name", video_file)
            input_path = UPLOAD_FOLDER / f"{uuid.uuid4().hex}{Path(source_name).suffix}"
            # Registered before copying so a partial copy is cleaned up too.
            temp_files.append(input_path)
            shutil.copy2(source_name, input_path)

        total_seconds = await get_duration_seconds(input_path)
        print(f"Duration: {total_seconds}s")

        # AUDIO ONLY
        if audio_only:
            out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a"
            wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav"
            temp_files += [out_audio, wav_tmp]
            # Step1: generate WAV
            yield None, None, "Generating WAV for AAC..."
            if not await _command_succeeded(_wav_command(input_path, wav_tmp)):
                yield None, None, "Could not decode audio from the input."
                return
            # Step2: fdkaac
            yield None, None, "Encoding AAC via fdkaac..."
            if not await _command_succeeded(_fdkaac_command(wav_tmp, out_audio)):
                yield None, None, "AAC encoding failed."
                return
            _remove_quietly(wav_tmp)
            keep = out_audio
            yield str(out_audio), str(out_audio), "AAC conversion complete!"
            return

        # FULL VIDEO
        # Step1: encode audio
        yield None, None, "Encoding audio track..."
        if use_mp3:
            out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp3"
            temp_files.append(out_audio)
            ffmpeg_audio_cmd = [
                "ffmpeg", "-y", "-i", str(input_path), "-ac", "1", "-ar", "24k",
                "-b:a", "8k", "-vn", str(out_audio),
            ]
            audio_ok = await _command_succeeded(ffmpeg_audio_cmd)
        else:
            # fdkaac branch
            out_audio = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.m4a"
            wav_tmp = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.wav"
            temp_files += [out_audio, wav_tmp]
            audio_ok = await _command_succeeded(_wav_command(input_path, wav_tmp))
            if audio_ok:
                audio_ok = await _command_succeeded(_fdkaac_command(wav_tmp, out_audio))
            _remove_quietly(wav_tmp)
        if not audio_ok:
            yield None, None, "Audio encoding failed."
            return

        # Step2: encode video
        out_video = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4"
        temp_files.append(out_video)
        yield None, None, "Encoding video track..."
        ffmpeg_video_cmd = ["ffmpeg", "-y", "-hwaccel", ACCEL, "-i", str(input_path)]
        if downscale:
            ffmpeg_video_cmd += ["-vf", "scale=-2:144"]
        if custom_bitrate and video_bitrate:
            ffmpeg_video_cmd += ["-b:v", f"{int(video_bitrate)}k"]
        else:
            ffmpeg_video_cmd += VIDEO_BASE_OPTS
        if faster:
            ffmpeg_video_cmd += ["-preset", "ultrafast"]
        ffmpeg_video_cmd += ["-an", str(out_video)]
        if not await _command_succeeded(ffmpeg_video_cmd):
            yield None, None, "Video encoding failed."
            return

        # Step3: merge
        merged_out = CONVERTED_FOLDER / f"{uuid.uuid4().hex}.mp4"
        temp_files.append(merged_out)
        yield None, None, "Merging audio & video..."
        merge_cmd = [
            "ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio),
            "-c", "copy", str(merged_out),
        ]
        merged_ok = await _command_succeeded(merge_cmd)
        if not merged_ok:
            # Stream copy was rejected; retry with the audio re-encoded.
            merge_cmd = [
                "ffmpeg", "-y", "-i", str(out_video), "-i", str(out_audio),
                "-c:v", "copy", "-c:a", "aac", str(merged_out),
            ]
            merged_ok = await _command_succeeded(merge_cmd)
        if not merged_ok:
            yield None, None, "Merging audio and video failed."
            return
        for f in (out_audio, out_video):
            _remove_quietly(f)
        keep = merged_out
        yield str(merged_out), str(merged_out), "Conversion complete!"
        return
    finally:
        for f in temp_files:
            if f != keep:
                _remove_quietly(f)
# -------------------------
# Gradio UI
# -------------------------
# NOTE(review): the indentation of this section was lost in the copy this
# was rebuilt from. The nesting below is reconstructed; in particular,
# confirm whether the upload widget belongs inside the first row.
with gr.Blocks(title="Low Quality Video Inator (fdkaac)") as demo:
    gr.Markdown("## Low Quality Video Inator\nUpload a file or paste a YouTube URL.")

    # Source selection
    with gr.Row():
        use_youtube = gr.Checkbox(label="Use YouTube URL", value=False)
        youtube_url = gr.Textbox(
            label="YouTube URL",
            placeholder="https://youtube.com/...",
        )
    video_file = gr.File(
        label="Upload Video",
        file_types=list(ALLOWED_EXTENSIONS),
    )

    # Encoding options
    with gr.Row():
        downscale = gr.Checkbox(label="Downscale to 144p", value=False)
        faster = gr.Checkbox(label="Faster and lower quality encoding", value=False)
    with gr.Row():
        use_mp3 = gr.Checkbox(
            label="Use MP3 audio (AAC is however lower quality)",
            value=False,
        )
        audio_only = gr.Checkbox(label="Audio only", value=False)
    with gr.Row():
        custom_bitrate = gr.Checkbox(label="Custom video bitrate", value=False)
        video_bitrate = gr.Number(
            label="Video bitrate (kbps)",
            value=64,
            visible=False,
        )

    def toggle_bitrate(show):
        """Show the bitrate field only while the custom-bitrate box is ticked."""
        return gr.update(visible=show)

    custom_bitrate.change(
        toggle_bitrate,
        inputs=[custom_bitrate],
        outputs=[video_bitrate],
    )

    # Action button and result widgets
    convert_btn = gr.Button("Convert Now", variant="primary")
    video_preview = gr.Video(label="Video Preview")
    download_file = gr.File(label="Download Result")
    step_text = gr.Textbox(label="Status / Progress", interactive=False)

    # The input order must match convert_stream's parameter order; the three
    # outputs receive the tuples that generator yields.
    convert_btn.click(
        fn=convert_stream,
        inputs=[
            use_youtube,
            youtube_url,
            video_file,
            downscale,
            faster,
            use_mp3,
            audio_only,
            custom_bitrate,
            video_bitrate,
        ],
        outputs=[video_preview, download_file, step_text],
    )

demo.launch(share=False)