import os, re, math, uuid, time, shutil, logging, tempfile, threading, asyncio
import requests
import numpy as np
from datetime import datetime, timedelta
from collections import Counter
import gradio as gr
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from keybert import KeyBERT
import edge_tts
from moviepy.editor import (
    VideoFileClip, AudioFileClip, concatenate_videoclips, concatenate_audioclips,
    CompositeAudioClip, AudioClip, TextClip, CompositeVideoClip, VideoClip
)
# ------------------- Configuration & Globals -------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    raise RuntimeError("Debes definir PEXELS_API_KEY en 'Settings' -> 'Variables & secrets'")

# Model loading (done only once, when the Space starts)
tokenizer = GPT2Tokenizer.from_pretrained("datificate/gpt2-small-spanish")
gpt2_model = GPT2LMHeadModel.from_pretrained("datificate/gpt2-small-spanish").eval()
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

kw_model = KeyBERT("distilbert-base-multilingual-cased")

RESULTS_DIR = "video_results"
os.makedirs(RESULTS_DIR, exist_ok=True)
TASKS = {}  # Dictionary holding the state of every task
# ------------------- Video Pipeline Functions -------------------
def get_edge_voices_es():
    """Fetches the list of Spanish edge-tts voices; the result is stored once in SPANISH_VOICES."""
    try:
        voices = asyncio.run(edge_tts.list_voices())
        es_voices = [v['ShortName'] for v in voices if v['Locale'].startswith('es-')]
        return sorted(es_voices)
    except Exception as e:
        logger.error(f"No se pudieron cargar las voces de Edge TTS: {e}")
        return ["es-ES-ElviraNeural"]  # Fallback

SPANISH_VOICES = get_edge_voices_es()
def gpt2_script(prompt: str, max_len: int = 160) -> str:
    """Generates a short Spanish script about `prompt` with the local GPT-2 model."""
    instruction = f"Escribe un guion corto, interesante y coherente sobre: {prompt}"
    inputs = tokenizer(instruction, return_tensors="pt", truncation=True, max_length=512)
    outputs = gpt2_model.generate(
        **inputs, max_length=max_len + inputs["input_ids"].shape[1], do_sample=True,
        top_p=0.9, top_k=40, temperature=0.7, no_repeat_ngram_size=3,
        pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id,
    )
    text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return text.split("sobre:")[-1].strip()[:max_len]
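# Usage sketch (illustrative, not executed at import time):
#   script = gpt2_script("la historia de la Vía Láctea")
# Returns up to `max_len` characters of generated Spanish text, with everything
# up to the last "sobre:" (the instruction prefix) stripped off.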
async def edge_tts_synth(text: str, voice: str, path: str):
    """Synthesizes speech asynchronously with edge-tts and saves it to `path`."""
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(path)
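# Usage sketch: the coroutine must be driven by an event loop, e.g.
#   asyncio.run(edge_tts_synth("Hola mundo", "es-ES-ElviraNeural", "voz.mp3"))
# (the output path here is purely illustrative).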
def keywords(text: str) -> list[str]:
    """Extracts up to 5 search keywords from the script, formatted as Pexels query strings."""
    clean_text = re.sub(r"[^\w\sáéíóúñÁÉÍÓÚÑ]", "", text.lower())
    try:
        # Note: scikit-learn only ships an English stop-word list, so KeyBERT may
        # reject stop_words="spanish"; the frequency-based fallback below covers that case.
        kws = kw_model.extract_keywords(clean_text, stop_words="spanish", top_n=5)
        return [k.replace(" ", "+") for k, _ in kws if k]
    except Exception:
        words = [w for w in clean_text.split() if len(w) > 4]
        return [w for w, _ in Counter(words).most_common(5)] or ["naturaleza"]
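# Example (illustrative): keywords("El sol es una estrella enorme y brillante")
# returns at most 5 keyword strings with spaces replaced by "+", ready to be
# passed to pexels_search() as query terms.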
def pexels_search(query: str, count: int) -> list[dict]:
    """Searches Pexels for landscape stock videos matching `query`."""
    res = requests.get(
        "https://api.pexels.com/videos/search",
        headers={"Authorization": PEXELS_API_KEY},
        params={"query": query, "per_page": count, "orientation": "landscape"},
        timeout=20,
    )
    res.raise_for_status()
    return res.json().get("videos", [])
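# Each item returned by the Pexels API includes a "video_files" list with several
# resolutions; build_video() below picks the largest file by pixel area.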
def download_file(url: str, folder: str) -> str | None:
    """Streams a video file to disk; returns its path, or None if the download looks empty."""
    name = uuid.uuid4().hex + ".mp4"
    path = os.path.join(folder, name)
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(1024 * 1024):
                f.write(chunk)
    return path if os.path.exists(path) and os.path.getsize(path) > 1000 else None
def loop_audio(audio_clip: AudioFileClip, duration: float) -> AudioFileClip:
    """Trims or repeats an audio clip so that it lasts exactly `duration` seconds."""
    if audio_clip.duration >= duration:
        return audio_clip.subclip(0, duration)
    loops = math.ceil(duration / audio_clip.duration)
    return concatenate_audioclips([audio_clip] * loops).subclip(0, duration)
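# Note: concatenate_audioclips is audio-only; looping the base footage in
# build_video() is handled separately with concatenate_videoclips.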
def make_subtitle_clips(script: str, video_w: int, video_h: int, duration: float):
    """Builds one TextClip per sentence, timed proportionally to its word count."""
    sentences = [s.strip() for s in re.split(r"[.!?¿¡]", script) if s.strip()]
    if not sentences:
        return []
    total_words = sum(len(s.split()) for s in sentences)
    if total_words == 0:
        return []
    time_per_word = duration / total_words
    clips, current_time = [], 0.0
    for sentence in sentences:
        num_words = len(sentence.split())
        sentence_duration = num_words * time_per_word
        txt_clip = (
            TextClip(sentence, fontsize=int(video_h * 0.05), color="white",
                     stroke_color="black", stroke_width=1.5, method="caption",
                     size=(int(video_w * 0.9), None), font="Arial-Bold")
            .set_start(current_time)
            .set_duration(sentence_duration)
            .set_position(("center", "bottom"))
        )
        clips.append(txt_clip)
        current_time += sentence_duration
    return clips
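# Timing model: each sentence gets screen time proportional to its word count,
# so the subtitles stay roughly in sync with the narration without needing
# word-level timestamps from the TTS engine.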
def make_grain_clip(size: tuple[int, int], duration: float):
    """Creates a semi-transparent random-noise overlay (film grain effect)."""
    w, h = size
    def make_frame(t):
        noise = np.random.randint(0, 40, (h, w, 1), dtype=np.uint8)
        return np.repeat(noise, 3, axis=2)
    return VideoClip(make_frame, duration=duration).set_opacity(0.15)
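# The grain overlay is regenerated per frame and composited at 15% opacity on top
# of the footage in build_video(), giving a subtle film-like texture.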
# ------------------- Main Video Creation Function -------------------
def build_video(script_text: str, generate_script_flag: bool, voice: str, music_path: str | None) -> str:
    tmp_dir = tempfile.mkdtemp()

    # 1. Script
    script = gpt2_script(script_text) if generate_script_flag else script_text.strip()

    # 2. Voice-over (TTS)
    voice_path = os.path.join(tmp_dir, "voice.mp3")
    asyncio.run(edge_tts_synth(script, voice, voice_path))
    voice_clip = AudioFileClip(voice_path)
    video_duration = voice_clip.duration

    # 3. Pexels clips
    video_paths = []
    for kw in keywords(script):
        if len(video_paths) >= 8:
            break
        for video_data in pexels_search(kw, 2):
            best_file = max(video_data["video_files"], key=lambda f: f.get("width", 0) * f.get("height", 0))
            path = download_file(best_file['link'], tmp_dir)
            if path:
                video_paths.append(path)
            if len(video_paths) >= 8:
                break
    if not video_paths:
        raise RuntimeError("No se encontraron vídeos en Pexels para este guion.")

    # 4. Base video assembly
    segments, total_duration = [], 0
    for path in video_paths:
        if total_duration >= video_duration + 5:
            break
        clip = VideoFileClip(path)
        segment = clip.subclip(0, min(8, clip.duration))
        segments.append(segment)
        total_duration += segment.duration
    base_video = concatenate_videoclips(segments, method="chain")
    if base_video.duration < video_duration:
        # Repeat the footage until it covers the narration (loop_audio is audio-only)
        loops = math.ceil(video_duration / base_video.duration)
        base_video = concatenate_videoclips([base_video] * loops)
    base_video = base_video.subclip(0, video_duration)

    # 5. Background audio
    if music_path:
        music_clip = loop_audio(AudioFileClip(music_path), video_duration).volumex(0.20)
        final_audio = CompositeAudioClip([music_clip, voice_clip])
    else:
        final_audio = voice_clip

    # 6. Effects and subtitles
    subtitles = make_subtitle_clips(script, base_video.w, base_video.h, video_duration)
    grain_effect = make_grain_clip(base_video.size, video_duration)

    # 7. Final composition and rendering
    final_video = CompositeVideoClip([base_video, grain_effect, *subtitles]).set_audio(final_audio)
    output_path = os.path.join(tmp_dir, "final_video.mp4")
    final_video.write_videofile(output_path, fps=24, codec="libx264", audio_codec="aac", logger=None)
    return output_path
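# Usage sketch (assumes a valid PEXELS_API_KEY; arguments are illustrative):
#   out = build_video("Los volcanes de Islandia", generate_script_flag=True,
#                     voice="es-ES-ElviraNeural", music_path=None)
# `out` points to an MP4 inside a temporary directory; worker() below copies it
# into RESULTS_DIR and then removes that directory.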
# ------------------- Asynchronous Task System & Cleanup -------------------
def worker(task_id: str, mode: str, topic: str, user_script: str, voice: str, music: str | None):
    try:
        text = topic if mode == "Generar Guion con IA" else user_script
        result_tmp_path = build_video(text, mode == "Generar Guion con IA", voice, music)
        final_path = os.path.join(RESULTS_DIR, f"{task_id}.mp4")
        shutil.copy2(result_tmp_path, final_path)
        TASKS[task_id] = {"status": "done", "result": final_path, "timestamp": datetime.utcnow()}
        shutil.rmtree(os.path.dirname(result_tmp_path))  # Clean up the temporary directory
    except Exception as e:
        logger.error(f"Error en la tarea {task_id}: {e}", exc_info=True)
        TASKS[task_id] = {"status": "error", "error": str(e), "timestamp": datetime.utcnow()}

def submit_task(mode, topic, user_script, voice, music):
    content = topic if mode == "Generar Guion con IA" else user_script
    if not content.strip():
        return "", "Por favor, ingresa un tema o guion."
    task_id = uuid.uuid4().hex[:8]
    TASKS[task_id] = {"status": "processing", "timestamp": datetime.utcnow()}
    threading.Thread(target=worker, args=(task_id, mode, topic, user_script, voice, music), daemon=True).start()
    return task_id, f"✅ Tarea creada con ID: {task_id}. Comprueba el estado en unos minutos."

def check_task_status(task_id):
    if not task_id or task_id not in TASKS:
        return None, None, "ID de tarea no válido o no encontrado."
    task_info = TASKS[task_id]
    status = task_info["status"]
    if status == "processing":
        return None, None, "⏳ La tarea se está procesando..."
    if status == "error":
        return None, None, f"❌ Error en la tarea: {task_info['error']}"
    if status == "done":
        return task_info["result"], task_info["result"], "✅ ¡Vídeo listo para descargar!"
    return None, None, "Estado desconocido."

def janitor_thread():
    """Background thread that periodically deletes videos older than 24 hours."""
    while True:
        time.sleep(3600)  # Every hour
        now = datetime.utcnow()
        for task_id, info in list(TASKS.items()):
            if now - info["timestamp"] > timedelta(hours=24):
                if info.get("result") and os.path.exists(info["result"]):
                    try:
                        os.remove(info["result"])
                        logger.info(f"Limpiado vídeo antiguo: {info['result']}")
                    except Exception as e:
                        logger.error(f"Error al limpiar {info['result']}: {e}")
                del TASKS[task_id]

threading.Thread(target=janitor_thread, daemon=True).start()
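# Task lifecycle sketch: submit_task() registers an entry in TASKS and spawns a
# worker thread; check_task_status() polls that entry from the UI; janitor_thread()
# removes results older than 24 hours, checking once per hour.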
# ------------------- Gradio Interface -------------------
with gr.Blocks(title="Generador de Vídeos IA", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🎬 Generador de Vídeos con IA")
    gr.Markdown("Crea vídeos a partir de texto, con voz, música, subtítulos y efectos visuales.")

    with gr.Tabs():
        with gr.TabItem("1. Crear Vídeo"):
            with gr.Row():
                with gr.Column(scale=2):
                    mode_radio = gr.Radio(["Generar Guion con IA", "Usar Mi Guion"], value="Generar Guion con IA", label="Elige el método")
                    topic_textbox = gr.Textbox(label="Tema para la IA", placeholder="Ej: La historia de la Vía Láctea")
                    script_textbox = gr.Textbox(label="Tu Guion Completo", lines=5, visible=False, placeholder="Pega aquí tu guion...")
                    voice_dropdown = gr.Dropdown(SPANISH_VOICES, value=SPANISH_VOICES[0] if SPANISH_VOICES else None, label="Elige una voz")
                    music_upload = gr.Audio(type="filepath", label="Música de fondo (opcional)")
                    submit_button = gr.Button("✨ Generar Vídeo", variant="primary")
                with gr.Column(scale=1):
                    task_id_output = gr.Textbox(label="ID de tu Tarea (Guárdalo)", interactive=False)
                    status_output = gr.Textbox(label="Estado", interactive=False)
                    gr.Markdown("---")
                    gr.Markdown("### ¿Cómo funciona?\n1. Elige un método y rellena el texto.\n2. (Opcional) Sube música de fondo.\n3. Pulsa **Generar Vídeo**.\n4. **Copia el ID** que aparecerá.\n5. Ve a la pestaña **'2. Revisar Estado'** para ver tu vídeo.")

        with gr.TabItem("2. Revisar Estado"):
            gr.Markdown("### Consulta el estado de tu vídeo")
            with gr.Row():
                task_id_input = gr.Textbox(label="Pega aquí el ID de tu tarea", scale=3)
                check_button = gr.Button("🔍 Verificar", scale=1)
            status_check_output = gr.Textbox(label="Estado Actual", interactive=False)
            video_output = gr.Video(label="Resultado del Vídeo")
            download_file_output = gr.File(label="Descargar Fichero")

    # Interface logic: show either the topic box or the full-script box depending on the mode
    def toggle_textboxes(mode):
        is_ai_mode = mode == "Generar Guion con IA"
        return gr.update(visible=is_ai_mode), gr.update(visible=not is_ai_mode)

    mode_radio.change(toggle_textboxes, inputs=mode_radio, outputs=[topic_textbox, script_textbox])
    submit_button.click(submit_task, inputs=[mode_radio, topic_textbox, script_textbox, voice_dropdown, music_upload], outputs=[task_id_output, status_output])
    check_button.click(check_task_status, inputs=task_id_input, outputs=[video_output, download_file_output, status_check_output])

if __name__ == "__main__":
    demo.launch()