"""Gradio app that turns a topic or user-supplied script into a narrated stock-footage video
using Spanish GPT-2 (script), Coqui TTS (voice-over), KeyBERT (keywords) and Pexels (clips)."""

import os, re, math, uuid, time, shutil, logging, tempfile, threading, requests, numpy as np
from datetime import datetime, timedelta
from collections import Counter

import gradio as gr
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from keybert import KeyBERT
from TTS.api import TTS
from moviepy.editor import (
    VideoFileClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips,
    CompositeAudioClip, AudioClip, TextClip, CompositeVideoClip, VideoClip, vfx,
)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    raise RuntimeError("Debes definir PEXELS_API_KEY en Variables & secrets")

tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
gpt2 = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish").eval()
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

kw_model = KeyBERT("distilbert-base-multilingual-cased")
tts_engine = TTS(model_name="tts_models/es/css10/vits", progress_bar=False, gpu=False)

RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)
TASKS = {}  # task id -> {"status", "result"/"error", "ts"}

# ───────── helpers ──────────────────────────────────────────────────────────────

def gpt2_script(prompt: str, mx: int = 160) -> str:
    """Generate a short Spanish script for the given topic with GPT-2."""
    ins = f"Escribe un guion corto, interesante y coherente sobre: {prompt}"
    inp = tokenizer(ins, return_tensors="pt", truncation=True, max_length=512)
    out = gpt2.generate(
        **inp,
        max_length=mx + inp["input_ids"].shape[1],
        do_sample=True,
        top_p=0.9,
        top_k=40,
        temperature=0.7,
        no_repeat_ngram_size=3,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    txt = tokenizer.decode(out[0], skip_special_tokens=True)
    return txt.split("sobre:")[-1].strip()[:mx]


def coqui_tts(text: str, path: str):
    """Synthesize the sanitized, truncated script to an audio file."""
    text = re.sub(r"[^\w\s.,!?áéíóúñÁÉÍÓÚÑ]", "", text)[:500]
    tts_engine.tts_to_file(text=text, file_path=path)


def keywords(text: str) -> list[str]:
    """Extract search keywords; fall back to simple word frequency if KeyBERT fails.
    Note: scikit-learn only ships an English stop-word list, so stop_words="spanish"
    may raise inside KeyBERT and push execution into the Counter fallback."""
    clean = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower())
    try:
        kws = kw_model.extract_keywords(clean, stop_words="spanish", top_n=5)
        return [k.replace(" ", "+") for k, _ in kws if k]
    except Exception:
        words = [w for w in clean.split() if len(w) > 4]
        return [w for w, _ in Counter(words).most_common(5)] or ["nature"]


def pexels_search(q: str, n: int) -> list[dict]:
    r = requests.get(
        "https://api.pexels.com/videos/search",
        headers={"Authorization": PEXELS_API_KEY},
        params={"query": q, "per_page": n, "orientation": "landscape"},
        timeout=20,
    )
    r.raise_for_status()
    return r.json().get("videos", [])


def download(url: str, folder: str) -> str | None:
    """Stream a video to disk; discard files that are suspiciously small."""
    name = uuid.uuid4().hex + ".mp4"
    path = os.path.join(folder, name)
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(1024 * 1024):
                f.write(chunk)
    return path if os.path.getsize(path) > 1000 else None


def loop_audio(aclip: AudioFileClip, dur: float) -> AudioFileClip:
    """Trim or loop an audio clip to exactly `dur` seconds."""
    if aclip.duration >= dur:
        return aclip.subclip(0, dur)
    loops = math.ceil(dur / aclip.duration)
    return concatenate_audioclips([aclip] * loops).subclip(0, dur)


def make_subs_clips(script: str, video_w: int, video_h: int, duration: float):
    """Build subtitle TextClips, giving each sentence screen time proportional to its word count."""
    sentences = [s.strip() for s in re.split(r"[.!?¿¡]", script) if s.strip()]
    total_words = sum(len(s.split()) for s in sentences) or 1
    word_time = duration / total_words
    clips, cursor = [], 0.0
    for sent in sentences:
        n_words = len(sent.split())
        dur = n_words * word_time
        txt_clip = (
            TextClip(
                sent,
                fontsize=int(video_h * 0.05),
                color="white",
                stroke_color="black",
                stroke_width=2,
                method="caption",
                size=(int(video_w * 0.9), None),
            )
            .set_start(cursor)
            .set_duration(dur)
            .set_position(("center", video_h * 0.85))
        )
        clips.append(txt_clip)
        cursor += dur
    return clips


def make_grain_clip(size: tuple[int, int], duration: float):
    """Film-grain overlay: per-frame random noise rendered at 15% opacity."""
    w, h = size

    def frame(_t):
        noise = np.random.randint(0, 256, (h, w, 1), dtype=np.uint8)
        return np.repeat(noise, 3, axis=2)

    return VideoClip(frame, duration=duration).set_opacity(0.15)
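
# The builder below strings the helpers together: it renders the voice-over first so the
# narration length can drive everything else (how much footage to fetch, how long the
# subtitle and grain overlays run, where the final cut ends). It assumes the Pexels
# /videos/search payload looks roughly like
#   {"videos": [{"video_files": [{"width": ..., "height": ..., "link": ...}, ...]}, ...]}
# which is why it picks the largest entry of v["video_files"] and downloads best["link"].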
# ───────── video builder ──────────────────────────────────────────────────────────

def build_video(text: str, gen_script: bool, music_fp: str | None) -> str:
    tmp = tempfile.mkdtemp()
    script = gpt2_script(text) if gen_script else text.strip()

    # Voice-over first: its duration drives footage length, subtitles and grain.
    voice_path = os.path.join(tmp, "voice.wav")  # Coqui writes WAV data, so use a .wav extension
    coqui_tts(script, voice_path)
    voice_clip = AudioFileClip(voice_path)
    adur = voice_clip.duration

    # Download up to 8 landscape clips from Pexels, best resolution per result.
    vids = []
    for kw in keywords(script):
        if len(vids) >= 8:
            break
        for v in pexels_search(kw, 2):
            best = max(v["video_files"], key=lambda x: x["width"] * x["height"])
            p = download(best["link"], tmp)
            if p:
                vids.append(p)
            if len(vids) >= 8:
                break
    if not vids:
        raise RuntimeError("Sin vídeos disponibles")

    # Take up to 8 s per clip until the narration is covered (plus a small margin).
    segs, acc = [], 0
    for path in vids:
        if acc >= adur + 2:
            break
        clip = VideoFileClip(path)
        seg = clip.subclip(0, min(8, clip.duration))
        segs.append(seg)
        acc += seg.duration

    # "compose" handles clips of different resolutions (Pexels footage varies in size).
    base = concatenate_videoclips(segs, method="compose")
    if base.duration < adur:
        loops = math.ceil(adur / base.duration)
        base = concatenate_videoclips([base] * loops, method="compose")
    base = base.subclip(0, adur)

    if music_fp:
        mclip = loop_audio(AudioFileClip(music_fp), adur).volumex(0.2)
        audio = CompositeAudioClip([mclip, voice_clip])
    else:
        audio = voice_clip

    subs = make_subs_clips(script, base.w, base.h, adur)
    grain = make_grain_clip((base.w, base.h), adur)
    final_vid = CompositeVideoClip([base, grain, *subs]).set_audio(audio)

    out_path = os.path.join(tmp, "final.mp4")
    final_vid.write_videofile(out_path, fps=24, codec="libx264", audio_codec="aac", logger=None)
    return out_path

# ───────── async tasks ──────────────────────────────────────────────────────────

def worker(tid: str, mode: str, topic: str, user_script: str, music: str | None):
    try:
        txt = topic if mode == "Generar Guion con IA" else user_script
        res_tmp = build_video(txt, mode == "Generar Guion con IA", music)
        final_path = os.path.join(RESULTS_DIR, f"{tid}.mp4")
        shutil.copy2(res_tmp, final_path)
        TASKS[tid] = {"status": "done", "result": final_path, "ts": datetime.utcnow()}
    except Exception as e:
        TASKS[tid] = {"status": "error", "error": str(e), "ts": datetime.utcnow()}


def submit(mode, topic, user_script, music):
    content = topic if mode == "Generar Guion con IA" else user_script
    if not content.strip():
        return "", "Ingresa texto"
    tid = uuid.uuid4().hex[:8]
    TASKS[tid] = {"status": "processing", "ts": datetime.utcnow()}
    threading.Thread(target=worker, args=(tid, mode, topic, user_script, music), daemon=True).start()
    return tid, f"Tarea {tid} creada"


def check(tid):
    if tid not in TASKS:
        return None, None, "ID inválido"
    info = TASKS[tid]
    stat = info["status"]
    if stat == "processing":
        return None, None, "Procesando..."
    if stat == "error":
        return None, None, f"Error: {info['error']}"
    return info["result"], info["result"], "Vídeo listo 🎉"
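
# Lifecycle of a TASKS entry as the functions above leave it (values illustrative):
#   submit() -> {"status": "processing", "ts": ...}
#   worker() -> {"status": "done", "result": "video_results/ab12cd34.mp4", "ts": ...}
#           or {"status": "error", "error": "<exception text>", "ts": ...}
# check() maps those states onto the (video, file, message) outputs of the UI.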
# ───────── janitor thread ─────────────────────────────────────────────────────────

def janitor():
    while True:
        now = datetime.utcnow()
        for fname in os.listdir(RESULTS_DIR):
            fpath = os.path.join(RESULTS_DIR, fname)
            try:
                mtime = datetime.utcfromtimestamp(os.path.getmtime(fpath))
                if now - mtime > timedelta(hours=24):
                    os.remove(fpath)
                    for k, v in list(TASKS.items()):
                        if v.get("result") == fpath:
                            del TASKS[k]
            except Exception:
                pass
        time.sleep(3600)


threading.Thread(target=janitor, daemon=True).start()

# ───────── gradio ui ─────────────────────────────────────────────────────────────

with gr.Blocks(title="Generador de Vídeos IA") as demo:
    with gr.Tabs():
        with gr.TabItem("Crear Vídeo"):
            mode = gr.Radio(["Generar Guion con IA", "Usar Mi Guion"], value="Generar Guion con IA")
            topic = gr.Textbox(label="Tema")
            user_script = gr.Textbox(label="Guion Completo", visible=False)
            music = gr.Audio(type="filepath", label="Música (opcional)")
            btn = gr.Button("Generar")
            tid_out = gr.Textbox(label="ID de tarea")
            msg = gr.Textbox(label="Estado")
        with gr.TabItem("Revisar Estado"):
            tid_in = gr.Textbox(label="ID de tarea")
            chk = gr.Button("Verificar")
            vid = gr.Video()
            dlf = gr.File()

    mode.change(
        lambda m: (
            gr.update(visible=m == "Generar Guion con IA"),
            gr.update(visible=m != "Generar Guion con IA"),
        ),
        mode,
        [topic, user_script],
    )
    btn.click(submit, [mode, topic, user_script, music], [tid_out, msg])
    chk.click(check, tid_in, [vid, dlf, msg])

if __name__ == "__main__":
    demo.launch()
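
# A minimal smoke test, assuming PEXELS_API_KEY is set, the models above downloaded
# successfully, and ImageMagick is available for moviepy's TextClip (run it instead of
# the Gradio UI; the topic here is only illustrative):
#
#     out = build_video("los volcanes de Islandia", gen_script=True, music_fp=None)
#     print("vídeo generado en:", out)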