Spaces:
Sleeping
Sleeping
import os | |
import asyncio | |
import logging | |
import tempfile | |
import requests | |
from datetime import datetime | |
import edge_tts | |
import gradio as gr | |
import torch | |
from transformers import GPT2Tokenizer, GPT2LMHeadModel | |
from keybert import KeyBERT | |
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip | |
import re | |
import json | |
import uuid | |
import threading | |
from queue import Queue | |
import time | |
# Configuración de logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Directorio persistente para archivos | |
PERSIST_DIR = "persistent_data" | |
os.makedirs(PERSIST_DIR, exist_ok=True) | |
# Cola de procesamiento | |
processing_queue = Queue() | |
task_status = {} | |
# Clase para manejar tareas | |
class VideoTask: | |
def __init__(self, task_id, prompt_type, input_text, musica_file=None): | |
self.task_id = task_id | |
self.prompt_type = prompt_type | |
self.input_text = input_text | |
self.musica_file = musica_file | |
self.status = "pending" | |
self.progress = 0 | |
self.result = None | |
self.error = None | |
self.steps_completed = [] | |
def to_dict(self): | |
return { | |
"task_id": self.task_id, | |
"status": self.status, | |
"progress": self.progress, | |
"result": self.result, | |
"error": self.error, | |
"steps_completed": self.steps_completed | |
} | |
# Worker thread para procesar videos | |
def video_processor_worker(): | |
while True: | |
try: | |
task = processing_queue.get(timeout=1) | |
if task is None: | |
break | |
logger.info(f"Procesando tarea: {task.task_id}") | |
process_video_task(task) | |
except: | |
time.sleep(0.1) | |
continue | |
# Iniciar worker thread | |
worker_thread = threading.Thread(target=video_processor_worker, daemon=True) | |
worker_thread.start() | |
def save_task_state(task): | |
"""Guarda el estado de la tarea en un archivo JSON""" | |
task_file = os.path.join(PERSIST_DIR, f"{task.task_id}.json") | |
with open(task_file, 'w') as f: | |
json.dump(task.to_dict(), f) | |
def load_task_state(task_id): | |
"""Carga el estado de una tarea desde archivo""" | |
task_file = os.path.join(PERSIST_DIR, f"{task_id}.json") | |
if os.path.exists(task_file): | |
with open(task_file, 'r') as f: | |
return json.load(f) | |
return None | |
def process_video_task(task): | |
"""Procesa una tarea de video paso a paso""" | |
try: | |
task.status = "processing" | |
task.progress = 0 | |
save_task_state(task) | |
# Paso 1: Generar guión | |
task.progress = 10 | |
save_task_state(task) | |
if task.prompt_type == "Generar Guion con IA": | |
guion = generate_script_simple(task.input_text) | |
else: | |
guion = task.input_text.strip() | |
task.steps_completed.append("guion_generado") | |
task.progress = 20 | |
save_task_state(task) | |
# Paso 2: Generar audio TTS | |
audio_path = os.path.join(PERSIST_DIR, f"{task.task_id}_audio.mp3") | |
success = asyncio.run(text_to_speech_simple(guion, audio_path)) | |
if not success: | |
raise Exception("Error generando audio") | |
task.steps_completed.append("audio_generado") | |
task.progress = 40 | |
save_task_state(task) | |
# Paso 3: Buscar videos (simplificado) | |
keywords = extract_keywords_simple(guion) | |
video_urls = search_videos_simple(keywords) | |
task.steps_completed.append("videos_encontrados") | |
task.progress = 60 | |
save_task_state(task) | |
# Paso 4: Crear video final (simplificado) | |
output_path = os.path.join(PERSIST_DIR, f"{task.task_id}_final.mp4") | |
# Simulación de creación de video | |
# En producción, aquí iría tu lógica de moviepy | |
create_simple_video(video_urls, audio_path, output_path, task.musica_file) | |
task.steps_completed.append("video_creado") | |
task.progress = 100 | |
task.status = "completed" | |
task.result = output_path | |
save_task_state(task) | |
except Exception as e: | |
logger.error(f"Error procesando tarea {task.task_id}: {str(e)}") | |
task.status = "error" | |
task.error = str(e) | |
save_task_state(task) | |
# Funciones simplificadas | |
def generate_script_simple(prompt): | |
"""Versión simplificada de generación de guión""" | |
# Aquí puedes usar tu lógica GPT-2 existente | |
return f"Este es un video sobre {prompt}. Es fascinante y educativo." | |
async def text_to_speech_simple(text, output_path): | |
"""TTS simplificado""" | |
try: | |
communicate = edge_tts.Communicate(text, "es-ES-JuanNeural") | |
await communicate.save(output_path) | |
return os.path.exists(output_path) | |
except: | |
return False | |
def extract_keywords_simple(text): | |
"""Extracción simple de keywords""" | |
words = text.lower().split() | |
# Filtrar palabras comunes | |
keywords = [w for w in words if len(w) > 4][:3] | |
return keywords if keywords else ["nature", "video", "background"] | |
def search_videos_simple(keywords): | |
"""Búsqueda simplificada de videos""" | |
# Aquí iría tu lógica de Pexels | |
# Por ahora retornamos URLs de ejemplo | |
return ["video1.mp4", "video2.mp4"] | |
def create_simple_video(video_urls, audio_path, output_path, music_path=None): | |
"""Creación simplificada de video""" | |
# Aquí iría tu lógica de MoviePy | |
# Por ahora creamos un archivo dummy | |
with open(output_path, 'w') as f: | |
f.write("dummy video content") | |
time.sleep(2) # Simular procesamiento | |
# Interfaz Gradio mejorada | |
def submit_video_request(prompt_type, prompt_ia, prompt_manual, musica_file): | |
"""Envía una solicitud de video a la cola""" | |
input_text = prompt_ia if prompt_type == "Generar Guion con IA" else prompt_manual | |
if not input_text or not input_text.strip(): | |
return None, "Por favor ingresa un texto" | |
task_id = str(uuid.uuid4()) | |
task = VideoTask(task_id, prompt_type, input_text, musica_file) | |
# Guardar estado inicial | |
task_status[task_id] = task | |
save_task_state(task) | |
# Añadir a la cola | |
processing_queue.put(task) | |
return task_id, f"Tarea creada: {task_id}" | |
def check_video_status(task_id): | |
"""Verifica el estado de una tarea""" | |
if not task_id: | |
return "No hay ID de tarea", None, None | |
# Intentar cargar desde archivo | |
task_data = load_task_state(task_id) | |
if not task_data: | |
return "Tarea no encontrada", None, None | |
status = task_data['status'] | |
progress = task_data['progress'] | |
if status == "pending": | |
return f"⏳ En cola... ({progress}%)", None, None | |
elif status == "processing": | |
steps = ", ".join(task_data['steps_completed']) | |
return f"🔄 Procesando... ({progress}%) - Completado: {steps}", None, None | |
elif status == "completed": | |
video_path = task_data['result'] | |
if os.path.exists(video_path): | |
return "✅ Video completado!", video_path, video_path | |
else: | |
return "❌ Video completado pero archivo no encontrado", None, None | |
elif status == "error": | |
return f"❌ Error: {task_data['error']}", None, None | |
return "Estado desconocido", None, None | |
# Interfaz Gradio | |
with gr.Blocks(title="Generador de Videos con IA") as app: | |
gr.Markdown("# 🎬 Generador de Videos con IA (Sistema de Cola)") | |
with gr.Tabs(): | |
with gr.TabItem("Crear Video"): | |
with gr.Row(): | |
with gr.Column(): | |
prompt_type = gr.Radio( | |
["Generar Guion con IA", "Usar Mi Guion"], | |
label="Método de Entrada", | |
value="Generar Guion con IA" | |
) | |
prompt_ia = gr.Textbox( | |
label="Tema para IA", | |
placeholder="Describe el tema del video..." | |
) | |
prompt_manual = gr.Textbox( | |
label="Tu Guion Completo", | |
placeholder="Escribe tu guion aquí...", | |
visible=False | |
) | |
musica_input = gr.Audio( | |
label="Música de fondo (opcional)", | |
type="filepath" | |
) | |
submit_btn = gr.Button("📤 Enviar a Cola", variant="primary") | |
with gr.Column(): | |
task_id_output = gr.Textbox( | |
label="ID de Tarea", | |
interactive=False | |
) | |
submit_status = gr.Textbox( | |
label="Estado de Envío", | |
interactive=False | |
) | |
with gr.TabItem("Verificar Estado"): | |
with gr.Row(): | |
with gr.Column(): | |
task_id_input = gr.Textbox( | |
label="ID de Tarea", | |
placeholder="Pega aquí el ID de tu tarea..." | |
) | |
check_btn = gr.Button("🔍 Verificar Estado") | |
auto_check = gr.Checkbox( | |
label="Verificar automáticamente cada 5 segundos" | |
) | |
with gr.Column(): | |
status_output = gr.Textbox( | |
label="Estado Actual", | |
interactive=False | |
) | |
video_output = gr.Video( | |
label="Video Generado", | |
interactive=False | |
) | |
download_output = gr.File( | |
label="Descargar Video", | |
interactive=False | |
) | |
# Eventos | |
prompt_type.change( | |
lambda x: ( | |
gr.update(visible=x == "Generar Guion con IA"), | |
gr.update(visible=x == "Usar Mi Guion") | |
), | |
inputs=prompt_type, | |
outputs=[prompt_ia, prompt_manual] | |
) | |
submit_btn.click( | |
submit_video_request, | |
inputs=[prompt_type, prompt_ia, prompt_manual, musica_input], | |
outputs=[task_id_output, submit_status] | |
) | |
check_btn.click( | |
check_video_status, | |
inputs=[task_id_input], | |
outputs=[status_output, video_output, download_output] | |
) | |
# Auto-check cada 5 segundos si está activado | |
def auto_check_status(task_id, should_check): | |
if should_check and task_id: | |
return check_video_status(task_id) | |
return gr.update(), gr.update(), gr.update() | |
auto_check.change( | |
lambda x: gr.update(visible=x), | |
inputs=[auto_check], | |
outputs=[check_btn] | |
) | |
if __name__ == "__main__": | |
app.launch(server_name="0.0.0.0", server_port=7860) |