INVIDEO_BASIC / app.py
import os, re, math, uuid, time, shutil, logging, tempfile, threading
import requests
import numpy as np
from datetime import datetime, timedelta
from collections import Counter
import gradio as gr
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from keybert import KeyBERT
from TTS.api import TTS
from moviepy.editor import (
    VideoFileClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips,
    CompositeAudioClip, AudioClip, TextClip, CompositeVideoClip, VideoClip, vfx
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    raise RuntimeError("Debes definir PEXELS_API_KEY en Variables & secrets")
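# Models are loaded once at import time: a Spanish GPT-2 for script generation,
# a multilingual KeyBERT model for keyword extraction, and a Spanish Coqui VITS voice for TTS.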
tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
gpt2 = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish").eval()
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
kw_model = KeyBERT("distilbert-base-multilingual-cased")
tts_engine = TTS(model_name="tts_models/es/css10/vits", progress_bar=False, gpu=False)
RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)
TASKS = {}  # task_id -> {"status": "processing"|"done"|"error", "result"/"error", "ts"}; in-memory only
# ───────── helpers ────────────────────────────────────────────────────────────────
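# Generate a short Spanish script with GPT-2 from an instruction prompt; only the text
# after "sobre:" is kept and truncated to `mx` characters.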
def gpt2_script(prompt: str, mx: int = 160) -> str:
    ins = f"Escribe un guion corto, interesante y coherente sobre: {prompt}"
    inp = tokenizer(ins, return_tensors="pt", truncation=True, max_length=512)
    out = gpt2.generate(
        **inp, max_length=mx + inp["input_ids"].shape[1], do_sample=True,
        top_p=0.9, top_k=40, temperature=0.7, no_repeat_ngram_size=3,
        pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id,
    )
    txt = tokenizer.decode(out[0], skip_special_tokens=True)
    return txt.split("sobre:")[-1].strip()[:mx]
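# Synthesize the script to speech with Coqui TTS, stripping characters the voice model
# may mishandle and capping the input at 500 characters.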
def coqui_tts(text: str, path: str):
    text = re.sub(r"[^\w\s.,!?áéíóúñÁÉÍÓÚÑ]", "", text)[:500]
    tts_engine.tts_to_file(text=text, file_path=path)
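# Extract up to five search keywords with KeyBERT; on failure, fall back to the most
# frequent long words. Note: KeyBERT forwards stop_words to scikit-learn, which only
# bundles an English stop-word list, so stop_words="spanish" is likely to raise and
# land in the fallback branch.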
def keywords(text: str) -> list[str]:
    clean = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower())
    try:
        kws = kw_model.extract_keywords(clean, stop_words="spanish", top_n=5)
        return [k.replace(" ", "+") for k, _ in kws if k]
    except Exception:
        words = [w for w in clean.split() if len(w) > 4]
        return [w for w, _ in Counter(words).most_common(5)] or ["nature"]
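# Query the Pexels video search API for landscape clips matching a keyword.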
def pexels_search(q: str, n: int) -> list[dict]:
    r = requests.get(
        "https://api.pexels.com/videos/search",
        headers={"Authorization": PEXELS_API_KEY},
        params={"query": q, "per_page": n, "orientation": "landscape"},
        timeout=20,
    )
    r.raise_for_status()
    return r.json().get("videos", [])
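# Stream a video file into the given folder; discard suspiciously small downloads.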
def download(url: str, folder: str) -> str | None:
    name = uuid.uuid4().hex + ".mp4"
    path = os.path.join(folder, name)
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(1024 * 1024):
                f.write(chunk)
    return path if os.path.getsize(path) > 1000 else None
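# Trim or loop an audio clip so it lasts exactly `dur` seconds.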
def loop_audio(aclip: AudioFileClip, dur: float) -> AudioFileClip:
    if aclip.duration >= dur:
        return aclip.subclip(0, dur)
    loops = math.ceil(dur / aclip.duration)
    return concatenate_audioclips([aclip] * loops).subclip(0, dur)
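# Build subtitle TextClips: the script is split into sentences and each sentence is shown
# for a duration proportional to its word count, so subtitles roughly track the voice-over.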
def make_subs_clips(script: str, video_w: int, video_h: int, duration: float):
    sentences = [s.strip() for s in re.split(r"[.!?¿¡]", script) if s.strip()]
    total_words = sum(len(s.split()) for s in sentences) or 1
    word_time = duration / total_words
    clips, cursor = [], 0.0
    for sent in sentences:
        n_words = len(sent.split())
        dur = n_words * word_time
        txt_clip = (
            TextClip(sent, fontsize=int(video_h * 0.05), color="white",
                     stroke_color="black", stroke_width=2, method="caption",
                     size=(int(video_w * 0.9), None))
            .set_start(cursor)
            .set_duration(dur)
            .set_position(("center", video_h * 0.85))
        )
        clips.append(txt_clip)
        cursor += dur
    return clips
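# Full-frame film-grain overlay: random greyscale noise regenerated every frame,
# composited over the base video at low opacity.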
def make_grain_clip(size: tuple[int, int], duration: float):
    w, h = size
    def frame(_t):
        noise = np.random.randint(0, 256, (h, w, 1), dtype=np.uint8)
        return np.repeat(noise, 3, axis=2)
    return VideoClip(frame, duration=duration).set_opacity(0.15)
# ───────── video builder ──────────────────────────────────────────────────────────
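# End-to-end build: generate (or reuse) the script, synthesize the voice-over, pull stock
# clips from Pexels for the extracted keywords, cut them to cover the voice duration, then
# composite grain and subtitles and mix optional background music under the narration.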
def build_video(text: str, gen_script: bool, music_fp: str | None) -> str:
    tmp = tempfile.mkdtemp()
    script = gpt2_script(text) if gen_script else text.strip()
    voice_path = os.path.join(tmp, "voice.mp3")
    coqui_tts(script, voice_path)
    voice_clip = AudioFileClip(voice_path)
    adur = voice_clip.duration
    vids = []
    for kw in keywords(script):
        if len(vids) >= 8:
            break
        for v in pexels_search(kw, 2):
            best = max(v["video_files"], key=lambda x: x["width"] * x["height"])
            p = download(best["link"], tmp)
            if p:
                vids.append(p)
            if len(vids) >= 8:
                break
    if not vids:
        raise RuntimeError("Sin vídeos disponibles")
    segs, acc = [], 0
    for path in vids:
        if acc >= adur + 2:
            break
        clip = VideoFileClip(path)
        seg = clip.subclip(0, min(8, clip.duration))
        segs.append(seg)
        acc += seg.duration
    base = concatenate_videoclips(segs, method="chain")
    if base.duration < adur:
        loops = math.ceil(adur / base.duration)
        base = concatenate_videoclips([base] * loops, method="chain")
    base = base.subclip(0, adur)
    if music_fp:
        mclip = loop_audio(AudioFileClip(music_fp), adur).volumex(0.2)
        audio = CompositeAudioClip([mclip, voice_clip])
    else:
        audio = voice_clip
    subs = make_subs_clips(script, base.w, base.h, adur)
    grain = make_grain_clip((base.w, base.h), adur)
    final_vid = CompositeVideoClip([base, grain, *subs]).set_audio(audio)
    out_path = os.path.join(tmp, "final.mp4")
    final_vid.write_videofile(out_path, fps=24, codec="libx264", audio_codec="aac", logger=None)
    return out_path
# ───────── async tasks ────────────────────────────────────────────────────────────
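# Background worker: builds the video, copies the result into RESULTS_DIR under the
# task id, and records success or failure in TASKS.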
def worker(tid: str, mode: str, topic: str, user_script: str, music: str | None):
    try:
        txt = topic if mode == "Generar Guion con IA" else user_script
        res_tmp = build_video(txt, mode == "Generar Guion con IA", music)
        final_path = os.path.join(RESULTS_DIR, f"{tid}.mp4")
        shutil.copy2(res_tmp, final_path)
        TASKS[tid] = {"status": "done", "result": final_path, "ts": datetime.utcnow()}
    except Exception as e:
        TASKS[tid] = {"status": "error", "error": str(e), "ts": datetime.utcnow()}
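# Gradio callback: validate the input, register a task, and launch the worker in a
# daemon thread so the UI returns immediately with the task id.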
def submit(mode, topic, user_script, music):
    content = topic if mode == "Generar Guion con IA" else user_script
    if not content.strip():
        return "", "Ingresa texto"
    tid = uuid.uuid4().hex[:8]
    TASKS[tid] = {"status": "processing", "ts": datetime.utcnow()}
    threading.Thread(target=worker, args=(tid, mode, topic, user_script, music), daemon=True).start()
    return tid, f"Tarea {tid} creada"
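# Gradio callback: look up a task id and return (video, downloadable file, status message).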
def check(tid):
    if tid not in TASKS:
        return None, None, "ID inválido"
    info = TASKS[tid]
    stat = info["status"]
    if stat == "processing":
        return None, None, "Procesando..."
    if stat == "error":
        return None, None, f"Error: {info['error']}"
    return info["result"], info["result"], "Vídeo listo 🎉"
# ───────── janitor thread ─────────────────────────────────────────────────────────
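# Hourly cleanup: delete rendered files older than 24 hours and drop their TASKS entries.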
def janitor():
    while True:
        now = datetime.utcnow()
        for fname in os.listdir(RESULTS_DIR):
            fpath = os.path.join(RESULTS_DIR, fname)
            try:
                mtime = datetime.utcfromtimestamp(os.path.getmtime(fpath))
                if now - mtime > timedelta(hours=24):
                    os.remove(fpath)
                    for k, v in list(TASKS.items()):
                        if v.get("result") == fpath:
                            del TASKS[k]
            except Exception:
                pass
        time.sleep(3600)
threading.Thread(target=janitor, daemon=True).start()
# ───────── gradio ui ─────────────────────────────────────────────────────────────
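# Two-tab UI: "Crear Vídeo" submits a job and shows its task id; "Revisar Estado" polls a
# task id and, when ready, exposes the rendered file for playback and download.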
with gr.Blocks(title="Generador de Vídeos IA") as demo:
    with gr.Tabs():
        with gr.TabItem("Crear Vídeo"):
            mode = gr.Radio(["Generar Guion con IA", "Usar Mi Guion"], value="Generar Guion con IA")
            topic = gr.Textbox(label="Tema")
            user_script = gr.Textbox(label="Guion Completo", visible=False)
            music = gr.Audio(type="filepath", label="Música (opcional)")
            btn = gr.Button("Generar")
            tid_out = gr.Textbox(label="ID de tarea")
            msg = gr.Textbox(label="Estado")
        with gr.TabItem("Revisar Estado"):
            tid_in = gr.Textbox(label="ID de tarea")
            chk = gr.Button("Verificar")
            vid = gr.Video()
            dlf = gr.File()
    mode.change(
        lambda m: (gr.update(visible=m == "Generar Guion con IA"), gr.update(visible=m != "Generar Guion con IA")),
        mode, [topic, user_script]
    )
    btn.click(submit, [mode, topic, user_script, music], [tid_out, msg])
    chk.click(check, tid_in, [vid, dlf, msg])
if __name__ == "__main__":
    demo.launch()