File size: 10,311 Bytes
eba6bc8
 
 
 
bf48cd0
 
 
 
eba6bc8
 
 
 
 
6692a78
eba6bc8
8336be3
f1f8e2a
eba6bc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e90d4c
eba6bc8
 
 
f1f8e2a
eba6bc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e90d4c
eba6bc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6e67aa
eba6bc8
 
 
 
 
c6e67aa
eba6bc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e90d4c
eba6bc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e90d4c
eba6bc8
 
96a2f23
 
eba6bc8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import os, re, math, uuid, time, shutil, logging, tempfile, threading, requests, numpy as np
from datetime import datetime, timedelta
from collections import Counter

import gradio as gr
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from keybert import KeyBERT
from TTS.api import TTS
from moviepy.editor import (
    VideoFileClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips,
    CompositeAudioClip, AudioClip, TextClip, CompositeVideoClip, VideoClip, vfx
)

# Process-wide logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# The Pexels key is sent as the Authorization header by pexels_search();
# fail at import time when it is missing so the app never starts half-configured.
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    raise RuntimeError("Debes definir PEXELS_API_KEY en Variables & secrets")

# Models are loaded once at import and shared by every task.
tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
gpt2 = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish").eval()
# generate() is given pad_token_id explicitly, so make sure one exists.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
kw_model = KeyBERT("distilbert-base-multilingual-cased")
tts_engine = TTS(model_name="tts_models/es/css10/vits", progress_bar=False, gpu=False)

# Finished videos are copied here as "<task id>.mp4"; janitor() removes files older than 24 h.
RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)
# In-memory task registry: task id -> {"status", "ts", and "result" or "error"}.
TASKS = {}

# ───────── helpers ────────────────────────────────────────────────────────────────
def gpt2_script(prompt: str, mx: int = 160) -> str:
    """Generate a short Spanish script about *prompt* with the GPT-2 model.

    The model is asked to continue a fixed instruction that ends in
    ``"sobre: <prompt>"``. The returned script is everything after that
    instruction marker, i.e. the topic followed by the sampled continuation.

    Args:
        prompt: Topic typed by the user.
        mx: Budget used twice: as the number of tokens allowed on top of the
            prompt during generation, and as the maximum number of characters
            of the returned script.

    Returns:
        The stripped script, truncated to ``mx`` characters.
    """
    marker = "sobre:"
    ins = f"Escribe un guion corto, interesante y coherente {marker} {prompt}"
    inp = tokenizer(ins, return_tensors="pt", truncation=True, max_length=512)
    prompt_len = inp["input_ids"].shape[1]
    out = gpt2.generate(
        **inp, max_length=mx + prompt_len, do_sample=True,
        top_p=0.9, top_k=40, temperature=0.7, no_repeat_ngram_size=3,
        pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id,
    )
    txt = tokenizer.decode(out[0], skip_special_tokens=True)
    # Cut at the FIRST marker (the one in our own instruction). Splitting on
    # the last occurrence would drop most of the script whenever the topic or
    # the generated text happens to contain "sobre:" again.
    _head, found, tail = txt.partition(marker)
    body = tail if found else txt
    return body.strip()[:mx]

def coqui_tts(text: str, path: str):
    """Synthesize *text* to an audio file at *path* with the Coqui TTS engine.

    The text is first reduced to letters, digits, underscores, whitespace and
    the punctuation ``. , ! ?``, then capped at 500 characters.

    Args:
        text: Script to speak.
        path: Destination file handed to ``tts_engine.tts_to_file``.
    """
    # For ``str`` patterns ``\w`` already matches accented Spanish letters, so
    # no explicit accent list is needed. The previous list was mis-encoded and
    # let stray symbols (copyright sign, per-mille sign, ...) reach the engine.
    text = re.sub(r"[^\w\s.,!?]", "", text)[:500]
    tts_engine.tts_to_file(text=text, file_path=path)

# Compact Spanish stop-word list. scikit-learn's vectorizer (used by KeyBERT)
# only accepts the string "english" or an explicit list for ``stop_words``.
_SPANISH_STOP_WORDS = (
    "a", "al", "algo", "ante", "antes", "como", "con", "contra", "cual",
    "cuando", "de", "del", "desde", "donde", "durante", "e", "el", "ella",
    "ellas", "ellos", "en", "entre", "era", "es", "esa", "ese", "eso", "esta",
    "este", "esto", "fue", "ha", "hasta", "hay", "la", "las", "le", "les",
    "lo", "los", "más", "me", "mi", "muy", "ni", "no", "nos", "o", "otro",
    "para", "pero", "por", "porque", "que", "qué", "se", "ser", "si", "sí",
    "sin", "sobre", "son", "su", "sus", "también", "te", "tiene", "todo",
    "tu", "un", "una", "uno", "unos", "unas", "y", "ya",
)


def keywords(text: str) -> list[str]:
    """Return up to five search keywords for *text*.

    KeyBERT is tried first. If it raises or finds nothing, the five most
    frequent words longer than four characters are used; if there are none
    either, ``["nature"]`` is returned so callers always get a query.

    Args:
        text: Script to extract keywords from.

    Returns:
        A non-empty list of keywords; spaces inside a keyword are replaced
        with ``+``.
    """
    # ``\w`` covers accented letters; the old mis-encoded accent list kept
    # stray symbols glued to words (e.g. "océano©").
    clean = re.sub(r"[^\w\s]", "", text.lower())
    try:
        kws = kw_model.extract_keywords(
            clean, stop_words=list(_SPANISH_STOP_WORDS), top_n=5
        )
        found = [k.replace(" ", "+") for k, _ in kws if k]
    except Exception:
        # Best effort: any extractor failure falls back to word counts, but
        # leave a trace instead of hiding it.
        logging.getLogger(__name__).warning(
            "KeyBERT keyword extraction failed; falling back to word counts",
            exc_info=True,
        )
        found = []
    if found:
        return found
    words = [w for w in clean.split() if len(w) > 4]
    return [w for w, _ in Counter(words).most_common(5)] or ["nature"]

def pexels_search(q: str, n: int) -> list[dict]:
    """Search Pexels for landscape videos matching *q*.

    Args:
        q: Search query.
        n: Value sent as ``per_page``.

    Returns:
        The ``videos`` list of the JSON response, or ``[]`` when that key is
        absent.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    endpoint = "https://api.pexels.com/videos/search"
    auth_headers = {"Authorization": PEXELS_API_KEY}
    query = {"query": q, "per_page": n, "orientation": "landscape"}
    response = requests.get(endpoint, headers=auth_headers, params=query, timeout=20)
    response.raise_for_status()
    payload = response.json()
    return payload.get("videos", [])

def download(url: str, folder: str) -> str | None:
    """Stream *url* into a randomly named ``.mp4`` file inside *folder*.

    Args:
        url: Direct link to the video file.
        folder: Existing directory that receives the file.

    Returns:
        The path of the downloaded file, or ``None`` when the payload is
        1000 bytes or smaller (treated as not a usable video).

    Raises:
        requests.RequestException: On connection errors or an error status.
        OSError: If the file cannot be written.
    """
    path = os.path.join(folder, uuid.uuid4().hex + ".mp4")
    try:
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(path, "wb") as f:
                for chunk in r.iter_content(1024 * 1024):
                    f.write(chunk)
        usable = os.path.getsize(path) > 1000
    except Exception:
        # Do not leave a partially written file behind.
        if os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                pass
        raise
    if usable:
        return path
    # Rejected payloads are removed instead of accumulating in *folder*.
    try:
        os.remove(path)
    except OSError:
        pass
    return None

def loop_audio(aclip: AudioFileClip, dur: float) -> AudioFileClip:
    """Return *aclip* fitted to exactly *dur* seconds.

    A clip that is already long enough is simply trimmed; a shorter one is
    repeated back to back as many times as needed and then trimmed.
    """
    source = aclip
    if source.duration < dur:
        repeats = math.ceil(dur / source.duration)
        source = concatenate_audioclips([aclip] * repeats)
    return source.subclip(0, dur)

def make_subs_clips(script: str, video_w: int, video_h: int, duration: float):
    """Build one subtitle ``TextClip`` per sentence of *script*.

    Sentences are shown one after another; each gets a share of *duration*
    proportional to its word count.

    Args:
        script: Narration text.
        video_w: Video width in pixels; captions wrap at 90 % of it.
        video_h: Video height in pixels; font size is 5 % of it and captions
            are placed at 85 % of it.
        duration: Total time in seconds to distribute over the sentences.

    Returns:
        A list of positioned, timed text clips (empty when *script* contains
        no sentence text).
    """
    # Split on sentence punctuation, including the Spanish opening marks
    # (U+00BF, U+00A1). They are written as escapes so the pattern survives
    # encoding mishaps; the previous literal was mis-encoded and split on
    # unrelated characters instead.
    sentences = [s.strip() for s in re.split(r"[.!?\u00bf\u00a1]", script) if s.strip()]
    total_words = sum(len(s.split()) for s in sentences) or 1
    word_time = duration / total_words
    font_size = int(video_h * 0.05)
    caption_size = (int(video_w * 0.9), None)
    clips, cursor = [], 0.0
    for sent in sentences:
        dur = len(sent.split()) * word_time
        txt_clip = (
            TextClip(sent, fontsize=font_size, color="white",
                     stroke_color="black", stroke_width=2, method="caption",
                     size=caption_size)
            .set_start(cursor)
            .set_duration(dur)
            .set_position(("center", video_h * 0.85))
        )
        clips.append(txt_clip)
        cursor += dur
    return clips

def make_grain_clip(size: tuple[int, int], duration: float):
    """Create a film-grain overlay clip.

    Args:
        size: ``(width, height)`` of the overlay in pixels.
        duration: Length of the clip in seconds.

    Returns:
        A ``VideoClip`` at 15 % opacity whose frames are fresh random gray
        noise on every call.
    """
    width, height = size

    def _noise_frame(_t):
        # One random gray channel, copied into R, G and B.
        gray = np.random.randint(0, 256, (height, width, 1), dtype=np.uint8)
        return np.concatenate([gray, gray, gray], axis=2)

    grain = VideoClip(_noise_frame, duration=duration)
    return grain.set_opacity(0.15)

# ───────── video builder ──────────────────────────────────────────────────────────
def _best_video_file(video: dict) -> dict | None:
    """Return the largest-area entry of a Pexels video's ``video_files``, or None."""
    files = video.get("video_files") or []
    if not files:
        return None
    # Defensive: treat missing or null dimensions as zero instead of crashing.
    return max(files, key=lambda f: (f.get("width") or 0) * (f.get("height") or 0))


def _collect_stock_videos(script: str, folder: str, limit: int = 8) -> list[str]:
    """Download up to *limit* stock clips for the keywords of *script* into *folder*."""
    log = logging.getLogger(__name__)
    paths: list[str] = []
    for kw in keywords(script):
        if len(paths) >= limit:
            break
        try:
            results = pexels_search(kw, 2)
        except requests.RequestException:
            # One failing query must not abort the whole video.
            log.warning("Pexels search failed for %r", kw, exc_info=True)
            continue
        for video in results:
            best = _best_video_file(video)
            if not best or not best.get("link"):
                continue
            try:
                p = download(best["link"], folder)
            except (requests.RequestException, OSError):
                log.warning("Download failed for %s", best["link"], exc_info=True)
                continue
            if p:
                paths.append(p)
            if len(paths) >= limit:
                break
    return paths


def build_video(text: str, gen_script: bool, music_fp: str | None) -> str:
    """Render a narrated stock-footage video and return the path of the MP4.

    Steps: obtain the script (generated or given), synthesize the voice,
    download stock clips for the script's keywords, cut and loop them to the
    voice duration, mix optional background music at 20 % volume, overlay
    grain and subtitles, and encode.

    Args:
        text: Topic (when *gen_script* is true) or the full script.
        gen_script: Whether to generate the script with GPT-2.
        music_fp: Optional path of a background music file.

    Returns:
        Path to ``final.mp4`` inside a fresh temporary directory. The caller
        owns that directory on success; on failure it is removed here.

    Raises:
        RuntimeError: If no stock video could be downloaded.
    """
    tmp = tempfile.mkdtemp()
    opened = []  # clips that hold file handles / reader processes
    ok = False
    try:
        script = gpt2_script(text) if gen_script else text.strip()
        voice_path = os.path.join(tmp, "voice.mp3")
        coqui_tts(script, voice_path)
        voice_clip = AudioFileClip(voice_path)
        opened.append(voice_clip)
        adur = voice_clip.duration

        vids = _collect_stock_videos(script, tmp)
        if not vids:
            raise RuntimeError("Sin vídeos disponibles")

        # Take at most 8 s from each clip until the narration (plus a 2 s
        # margin) is covered.
        segs, acc = [], 0
        for path in vids:
            if acc >= adur + 2:
                break
            clip = VideoFileClip(path)
            opened.append(clip)
            seg = clip.subclip(0, min(8, clip.duration))
            segs.append(seg)
            acc += seg.duration
        base = concatenate_videoclips(segs, method="chain")
        if base.duration < adur:
            # Not enough footage: repeat the montage until it is long enough.
            loops = math.ceil(adur / base.duration)
            base = concatenate_videoclips([base] * loops, method="chain")
        base = base.subclip(0, adur)

        if music_fp:
            music_clip = AudioFileClip(music_fp)
            opened.append(music_clip)
            mclip = loop_audio(music_clip, adur).volumex(0.2)
            audio = CompositeAudioClip([mclip, voice_clip])
        else:
            audio = voice_clip

        subs = make_subs_clips(script, base.w, base.h, adur)
        grain = make_grain_clip((base.w, base.h), adur)
        final_vid = CompositeVideoClip([base, grain, *subs]).set_audio(audio)

        out_path = os.path.join(tmp, "final.mp4")
        final_vid.write_videofile(out_path, fps=24, codec="libx264", audio_codec="aac", logger=None)
        ok = True
        return out_path
    finally:
        # Release readers before touching the directory.
        for clip in opened:
            try:
                clip.close()
            except Exception:
                logging.getLogger(__name__).debug("Could not close clip", exc_info=True)
        if not ok:
            shutil.rmtree(tmp, ignore_errors=True)

# ───────── async tasks ────────────────────────────────────────────────────────────
def worker(tid: str, mode: str, topic: str, user_script: str, music: str | None):
    """Background job: build the video for task *tid* and record the outcome.

    On success the result is copied to ``RESULTS_DIR/<tid>.mp4`` and the task
    is marked ``"done"``; on any failure it is marked ``"error"`` with the
    exception text. Nothing is raised to the calling thread.

    Args:
        tid: Task id, already registered in ``TASKS``.
        mode: Selected UI mode; ``"Generar Guion con IA"`` means *topic* is
            expanded into a script, anything else uses *user_script* as is.
        topic: Topic text.
        user_script: Full script text.
        music: Optional path of a background music file.
    """
    res_tmp = None
    try:
        use_ai = mode == "Generar Guion con IA"
        txt = topic if use_ai else user_script
        res_tmp = build_video(txt, use_ai, music)
        final_path = os.path.join(RESULTS_DIR, f"{tid}.mp4")
        shutil.copy2(res_tmp, final_path)
        TASKS[tid] = {"status": "done", "result": final_path, "ts": datetime.utcnow()}
    except Exception as e:
        logging.getLogger(__name__).exception("Task %s failed", tid)
        TASKS[tid] = {"status": "error", "error": str(e), "ts": datetime.utcnow()}
    finally:
        if res_tmp:
            # build_video() writes into its own mkdtemp() directory, which
            # still holds the downloaded clips and the voice track. Remove it
            # now that the result is copied, but only if it really is a
            # direct child of the system temp directory.
            build_dir = os.path.dirname(os.path.abspath(res_tmp))
            if os.path.dirname(build_dir) == os.path.abspath(tempfile.gettempdir()):
                shutil.rmtree(build_dir, ignore_errors=True)

def submit(mode, topic, user_script, music):
    """Validate the form and start a background video task.

    Args:
        mode: Selected mode; ``"Generar Guion con IA"`` uses *topic*,
            anything else uses *user_script*.
        topic: Topic text (may be empty or ``None``).
        user_script: Full script text (may be empty or ``None``).
        music: Optional path of a background music file.

    Returns:
        ``(task_id, message)``. ``task_id`` is ``""`` when the relevant text
        field is blank and no task was started.
    """
    content = topic if mode == "Generar Guion con IA" else user_script
    # A cleared textbox may arrive as None; treat it like empty text instead
    # of crashing on .strip().
    if not (content or "").strip():
        return "", "Ingresa texto"
    tid = uuid.uuid4().hex[:8]
    TASKS[tid] = {"status": "processing", "ts": datetime.utcnow()}
    threading.Thread(target=worker, args=(tid, mode, topic, user_script, music), daemon=True).start()
    return tid, f"Tarea {tid} creada"

def check(tid):
    """Report the state of task *tid* for the "check status" tab.

    Args:
        tid: Task id as typed by the user; surrounding whitespace is ignored.

    Returns:
        ``(video_path, file_path, message)``. Both paths are ``None`` unless
        the task finished successfully, in which case both are the result
        path.
    """
    # Single lookup: the janitor thread may delete entries concurrently, so
    # avoid a separate membership test followed by indexing.
    info = TASKS.get((tid or "").strip())
    if info is None:
        return None, None, "ID inválido"
    stat = info["status"]
    if stat == "processing":
        return None, None, "Procesando..."
    if stat == "error":
        return None, None, f"Error: {info['error']}"
    return info["result"], info["result"], "Vídeo listo 🎉"

# ───────── janitor thread ─────────────────────────────────────────────────────────
def janitor():
    """Hourly cleanup loop; never returns.

    Each pass deletes result files older than 24 hours together with the
    task entries pointing at them, then drops unfinished or failed task
    entries older than 24 hours (they have no file to trigger removal).
    Failures are logged and the loop carries on.
    """
    log = logging.getLogger(__name__)
    max_age = timedelta(hours=24)
    while True:
        now = datetime.utcnow()
        try:
            names = os.listdir(RESULTS_DIR)
        except OSError:
            # Keep the thread alive even if the directory is unavailable.
            log.warning("Cannot list %s", RESULTS_DIR, exc_info=True)
            names = []
        for fname in names:
            fpath = os.path.join(RESULTS_DIR, fname)
            try:
                mtime = datetime.utcfromtimestamp(os.path.getmtime(fpath))
                if now - mtime > max_age:
                    os.remove(fpath)
                    for k, v in list(TASKS.items()):
                        if v.get("result") == fpath:
                            TASKS.pop(k, None)
            except Exception:
                # Best effort per file, but leave a trace.
                log.warning("Cleanup failed for %s", fpath, exc_info=True)
        # Entries without a result file would otherwise live forever.
        for k, v in list(TASKS.items()):
            ts = v.get("ts")
            if v.get("status") != "done" and ts is not None and now - ts > max_age:
                TASKS.pop(k, None)
        time.sleep(3600)

# Start the cleanup loop at import time; daemon=True so it never blocks interpreter exit.
threading.Thread(target=janitor, daemon=True).start()

# ───────── gradio ui ─────────────────────────────────────────────────────────────
# Two tabs: one submits a task and shows its id, the other polls a task id
# and shows the finished video plus a download link.
with gr.Blocks(title="Generador de Vídeos IA") as demo:
    with gr.Tabs():
        with gr.TabItem("Crear Vídeo"):
            mode = gr.Radio(["Generar Guion con IA", "Usar Mi Guion"], value="Generar Guion con IA")
            topic = gr.Textbox(label="Tema")
            user_script = gr.Textbox(label="Guion Completo", visible=False)
            music = gr.Audio(type="filepath", label="Música (opcional)")
            btn = gr.Button("Generar")
            tid_out = gr.Textbox(label="ID de tarea")
            msg = gr.Textbox(label="Estado")
        with gr.TabItem("Revisar Estado"):
            tid_in = gr.Textbox(label="ID de tarea")
            chk = gr.Button("Verificar")
            # Status box for this tab, so the check result is visible where
            # the user clicked instead of in the other tab.
            chk_msg = gr.Textbox(label="Estado")
            vid = gr.Video()
            dlf = gr.File()

    # Show only the input that matches the selected mode.
    mode.change(
        lambda m: (gr.update(visible=m == "Generar Guion con IA"), gr.update(visible=m != "Generar Guion con IA")),
        mode, [topic, user_script]
    )
    btn.click(submit, [mode, topic, user_script, music], [tid_out, msg])
    chk.click(check, tid_in, [vid, dlf, chk_msg])

# Launch the Gradio server only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()