import os, re, math, uuid, time, shutil, logging, tempfile, threading, requests, numpy as np
from datetime import datetime, timedelta
from collections import Counter
import gradio as gr
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from keybert import KeyBERT
from TTS.api import TTS
from moviepy.editor import (
    VideoFileClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips,
    CompositeAudioClip, AudioClip, TextClip, CompositeVideoClip, VideoClip, vfx
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    raise RuntimeError("Debes definir PEXELS_API_KEY en Variables & secrets")
tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
gpt2 = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish").eval()
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
kw_model = KeyBERT("distilbert-base-multilingual-cased")
tts_engine = TTS(model_name="tts_models/es/css10/vits", progress_bar=False, gpu=False)
RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)
TASKS = {}
# ───────── helpers ────────────────────────────────────────────────────────────────
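# Draft a short Spanish script with GPT-2: the instruction prefix is stripped back out at
# "sobre:" and the result is capped at `mx` characters.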
def gpt2_script(prompt: str, mx: int = 160) -> str:
    ins = f"Escribe un guion corto, interesante y coherente sobre: {prompt}"
    inp = tokenizer(ins, return_tensors="pt", truncation=True, max_length=512)
    out = gpt2.generate(
        **inp, max_length=mx + inp["input_ids"].shape[1], do_sample=True,
        top_p=0.9, top_k=40, temperature=0.7, no_repeat_ngram_size=3,
        pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id,
    )
    txt = tokenizer.decode(out[0], skip_special_tokens=True)
    return txt.split("sobre:")[-1].strip()[:mx]
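# Narrate the script with the Coqui TTS Spanish VITS voice; characters outside a basic
# Spanish-friendly set are removed and the text is truncated to 500 characters first.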
def coqui_tts(text: str, path: str):
    text = re.sub(r"[^\w\s.,!?áéíóúñÁÉÍÓÚÑ]", "", text)[:500]
    tts_engine.tts_to_file(text=text, file_path=path)
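# Pick up to five Pexels search terms via KeyBERT. Note: scikit-learn only bundles an English
# stop-word list, so stop_words="spanish" is expected to raise inside KeyBERT and the
# frequency-based fallback below is what normally runs.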
def keywords(text: str) -> list[str]:
    clean = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower())
    try:
        kws = kw_model.extract_keywords(clean, stop_words="spanish", top_n=5)
        return [k.replace(" ", "+") for k, _ in kws if k]
    except Exception:
        words = [w for w in clean.split() if len(w) > 4]
        return [w for w, _ in Counter(words).most_common(5)] or ["nature"]
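# Query the Pexels video search API for landscape clips matching one keyword.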
def pexels_search(q: str, n: int) -> list[dict]:
    r = requests.get(
        "https://api.pexels.com/videos/search",
        headers={"Authorization": PEXELS_API_KEY},
        params={"query": q, "per_page": n, "orientation": "landscape"},
        timeout=20,
    )
    r.raise_for_status()
    return r.json().get("videos", [])
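# Stream one video file into `folder`; downloads under ~1 KB are treated as failures.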
def download(url: str, folder: str) -> str | None:
    name = uuid.uuid4().hex + ".mp4"
    path = os.path.join(folder, name)
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(1024 * 1024):
                f.write(chunk)
    return path if os.path.getsize(path) > 1000 else None
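# Trim or loop a music clip so it covers exactly `dur` seconds.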
def loop_audio(aclip: AudioFileClip, dur: float) -> AudioFileClip:
    if aclip.duration >= dur:
        return aclip.subclip(0, dur)
    loops = math.ceil(dur / aclip.duration)
    return concatenate_audioclips([aclip] * loops).subclip(0, dur)
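# Build one subtitle TextClip per sentence, with screen time proportional to word count so the
# captions roughly track the narration. moviepy's TextClip needs ImageMagick installed.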
def make_subs_clips(script: str, video_w: int, video_h: int, duration: float):
    sentences = [s.strip() for s in re.split(r"[.!?¿¡]", script) if s.strip()]
    total_words = sum(len(s.split()) for s in sentences) or 1
    word_time = duration / total_words
    clips, cursor = [], 0.0
    for sent in sentences:
        n_words = len(sent.split())
        dur = n_words * word_time
        txt_clip = (
            TextClip(sent, fontsize=int(video_h * 0.05), color="white",
                     stroke_color="black", stroke_width=2, method="caption",
                     size=(int(video_w * 0.9), None))
            .set_start(cursor)
            .set_duration(dur)
            .set_position(("center", video_h * 0.85))
        )
        clips.append(txt_clip)
        cursor += dur
    return clips
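# Semi-transparent random-noise overlay used as a film-grain effect on top of the footage.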
def make_grain_clip(size: tuple[int, int], duration: float):
    w, h = size
    def frame(_t):
        noise = np.random.randint(0, 256, (h, w, 1), dtype=np.uint8)
        return np.repeat(noise, 3, axis=2)
    return VideoClip(frame, duration=duration).set_opacity(0.15)
# ───────── video builder ──────────────────────────────────────────────────────────
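# Full pipeline: script -> TTS narration -> Pexels stock footage -> grain + subtitles, all cut
# to the length of the voice track. Returns the rendered MP4 path inside a fresh temp dir.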
def build_video(text: str, gen_script: bool, music_fp: str | None) -> str:
    tmp = tempfile.mkdtemp()
    script = gpt2_script(text) if gen_script else text.strip()
    voice_path = os.path.join(tmp, "voice.wav")  # Coqui TTS writes WAV data
    coqui_tts(script, voice_path)
    voice_clip = AudioFileClip(voice_path)
    adur = voice_clip.duration
    vids = []
    for kw in keywords(script):
        if len(vids) >= 8:
            break
        for v in pexels_search(kw, 2):
            best = max(v["video_files"], key=lambda x: x["width"] * x["height"])
            p = download(best["link"], tmp)
            if p:
                vids.append(p)
            if len(vids) >= 8:
                break
    if not vids:
        raise RuntimeError("Sin vídeos disponibles")
    segs, acc = [], 0
    for path in vids:
        if acc >= adur + 2:
            break
        clip = VideoFileClip(path)
        seg = clip.subclip(0, min(8, clip.duration))
        segs.append(seg)
        acc += seg.duration
    # Pexels clips come in mixed resolutions; "compose" centres them on a common canvas
    # instead of emitting frames of different sizes as "chain" would.
    base = concatenate_videoclips(segs, method="compose")
    if base.duration < adur:
        loops = math.ceil(adur / base.duration)
        base = concatenate_videoclips([base] * loops, method="chain")
    base = base.subclip(0, adur)
    if music_fp:
        mclip = loop_audio(AudioFileClip(music_fp), adur).volumex(0.2)
        audio = CompositeAudioClip([mclip, voice_clip])
    else:
        audio = voice_clip
    subs = make_subs_clips(script, base.w, base.h, adur)
    grain = make_grain_clip((base.w, base.h), adur)
    final_vid = CompositeVideoClip([base, grain, *subs]).set_audio(audio)
    out_path = os.path.join(tmp, "final.mp4")
    final_vid.write_videofile(out_path, fps=24, codec="libx264", audio_codec="aac", logger=None)
    return out_path
# ───────── async tasks ────────────────────────────────────────────────────────────
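# Jobs run in daemon threads and report through the in-memory TASKS dict keyed by a short
# task id; `check` polls that dict. Nothing is persisted, so state is lost on restart.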
def worker(tid: str, mode: str, topic: str, user_script: str, music: str | None):
    try:
        txt = topic if mode == "Generar Guion con IA" else user_script
        res_tmp = build_video(txt, mode == "Generar Guion con IA", music)
        final_path = os.path.join(RESULTS_DIR, f"{tid}.mp4")
        shutil.copy2(res_tmp, final_path)
        TASKS[tid] = {"status": "done", "result": final_path, "ts": datetime.utcnow()}
    except Exception as e:
        TASKS[tid] = {"status": "error", "error": str(e), "ts": datetime.utcnow()}
def submit(mode, topic, user_script, music):
    content = topic if mode == "Generar Guion con IA" else user_script
    if not content.strip():
        return "", "Ingresa texto"
    tid = uuid.uuid4().hex[:8]
    TASKS[tid] = {"status": "processing", "ts": datetime.utcnow()}
    threading.Thread(target=worker, args=(tid, mode, topic, user_script, music), daemon=True).start()
    return tid, f"Tarea {tid} creada"
def check(tid):
    if tid not in TASKS:
        return None, None, "ID inválido"
    info = TASKS[tid]
    stat = info["status"]
    if stat == "processing":
        return None, None, "Procesando..."
    if stat == "error":
        return None, None, f"Error: {info['error']}"
    return info["result"], info["result"], "Vídeo listo"
# ───────── janitor thread ─────────────────────────────────────────────────────────
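# Hourly cleanup: delete rendered videos older than 24 hours and drop their TASKS entries.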
def janitor():
    while True:
        now = datetime.utcnow()
        for fname in os.listdir(RESULTS_DIR):
            fpath = os.path.join(RESULTS_DIR, fname)
            try:
                mtime = datetime.utcfromtimestamp(os.path.getmtime(fpath))
                if now - mtime > timedelta(hours=24):
                    os.remove(fpath)
                    for k, v in list(TASKS.items()):
                        if v.get("result") == fpath:
                            del TASKS[k]
            except Exception:
                pass
        time.sleep(3600)
threading.Thread(target=janitor, daemon=True).start()
# ───────── gradio ui ──────────────────────────────────────────────────────────────
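# Gradio UI: one tab submits a generation task, the other polls a task id for the finished video.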
with gr.Blocks(title="Generador de Vídeos IA") as demo:
    with gr.Tabs():
        with gr.TabItem("Crear Vídeo"):
            mode = gr.Radio(["Generar Guion con IA", "Usar Mi Guion"], value="Generar Guion con IA")
            topic = gr.Textbox(label="Tema")
            user_script = gr.Textbox(label="Guion Completo", visible=False)
            music = gr.Audio(type="filepath", label="Música (opcional)")
            btn = gr.Button("Generar")
            tid_out = gr.Textbox(label="ID de tarea")
            msg = gr.Textbox(label="Estado")
        with gr.TabItem("Revisar Estado"):
            tid_in = gr.Textbox(label="ID de tarea")
            chk = gr.Button("Verificar")
            vid = gr.Video()
            dlf = gr.File()
    mode.change(
        lambda m: (gr.update(visible=m == "Generar Guion con IA"), gr.update(visible=m != "Generar Guion con IA")),
        mode, [topic, user_script]
    )
    btn.click(submit, [mode, topic, user_script, music], [tid_out, msg])
    chk.click(check, tid_in, [vid, dlf, msg])
if __name__ == "__main__":
    demo.launch()