import os import re import random import time import logging from typing import Optional, List from datetime import datetime from pathlib import Path # Configuración inicial para HF Spaces os.environ["TOKENIZERS_PARALLELISM"] = "false" os.environ["GRADIO_ANALYTICS_ENABLED"] = "false" os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1" # Configuración de logging logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) try: import requests from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip from moviepy.audio.fx.all import audio_loop import edge_tts import gradio as gr import numpy as np from transformers import pipeline import backoff except ImportError as e: logger.error(f"Error importing dependencies: {e}") raise # Constantes configurables MAX_VIDEOS = 3 # Reducir para evitar rate limiting VIDEO_SEGMENT_DURATION = 5 # Duración de cada segmento en segundos MAX_RETRIES = 3 # Máximo de reintentos para descargas REQUEST_TIMEOUT = 15 # Timeout para requests # Configuración de modelos MODEL_NAME = "facebook/mbart-large-50" PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "") @backoff.on_exception(backoff.expo, (requests.exceptions.RequestException, requests.exceptions.HTTPError), max_tries=MAX_RETRIES, max_time=30) def safe_download(url: str, timeout: int = REQUEST_TIMEOUT) -> Optional[str]: """Descarga segura con reintentos y manejo de rate limiting""" try: response = requests.get(url, stream=True, timeout=timeout) response.raise_for_status() filename = f"temp_{random.randint(1000,9999)}.mp4" with open(filename, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return filename except requests.exceptions.HTTPError as e: if e.response.status_code == 429: retry_after = int(e.response.headers.get('Retry-After', 5)) logger.warning(f"Rate limited. Waiting {retry_after} seconds...") time.sleep(retry_after) logger.error(f"Download failed: {str(e)}") return None except Exception as e: logger.error(f"Unexpected download error: {str(e)}") return None def download_video_segment(url: str, duration: float, output_path: str) -> bool: """Descarga y procesa un segmento de video""" temp_path = None try: temp_path = safe_download(url) if not temp_path: return False with VideoFileClip(temp_path) as clip: if clip.duration < 1: logger.error("Video demasiado corto") return False end_time = min(duration, clip.duration - 0.1) subclip = clip.subclip(0, end_time) # Configuración optimizada para HF Spaces subclip.write_videofile( output_path, codec="libx264", audio_codec="aac", threads=2, preset='ultrafast', verbose=False, ffmpeg_params=[ '-max_muxing_queue_size', '1024', '-movflags', '+faststart' ] ) return True except Exception as e: logger.error(f"Video processing error: {str(e)}") return False finally: if temp_path and os.path.exists(temp_path): os.remove(temp_path) def fetch_pexels_videos(query: str) -> List[str]: """Busca videos en Pexels con manejo de errores""" if not PEXELS_API_KEY: logger.error("PEXELS_API_KEY no configurada") return [] headers = {"Authorization": PEXELS_API_KEY} url = f"https://api.pexels.com/videos/search?query={query}&per_page={MAX_VIDEOS}" try: response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT) response.raise_for_status() videos = [] for video in response.json().get("videos", [])[:MAX_VIDEOS]: video_files = [vf for vf in video.get("video_files", []) if vf.get("width", 0) >= 720] # Calidad mínima if video_files: best_file = max(video_files, key=lambda x: x.get("width", 0)) videos.append(best_file["link"]) return videos except Exception as e: logger.error(f"Error fetching Pexels videos: {str(e)}") return [] def generate_script(prompt: str) -> str: """Genera un script usando IA local con fallback""" try: generator = pipeline("text-generation", model=MODEL_NAME) result = generator( f"Genera un guion breve sobre {prompt} en español con {MAX_VIDEOS} puntos:", max_length=200, num_return_sequences=1 )[0]['generated_text'] return result except Exception as e: logger.error(f"Error generating script: {str(e)}") return f"1. Punto uno sobre {prompt}\n2. Punto dos\n3. Punto tres" async def generate_voice(text: str, output_file: str = "voice.mp3") -> bool: """Genera narración de voz con manejo de errores""" try: communicate = edge_tts.Communicate(text, voice="es-MX-DaliaNeural") await communicate.save(output_file) return True except Exception as e: logger.error(f"Voice generation failed: {str(e)}") return False def run_async(coro): """Ejecuta corrutinas asíncronas desde código síncrono""" import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() def create_video(prompt: str) -> Optional[str]: """Función principal para crear el video""" try: # 1. Generar contenido script = generate_script(prompt) logger.info(f"Script generado: {script[:100]}...") # 2. Buscar videos video_urls = fetch_pexels_videos(prompt) if not video_urls: logger.error("No se encontraron videos") return None # 3. Generar voz voice_file = "voice.mp3" if not run_async(generate_voice(script, voice_file)): logger.error("No se pudo generar voz") return None # 4. Procesar videos output_dir = "output" os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4") clips = [] segment_duration = VIDEO_SEGMENT_DURATION for i, url in enumerate(video_urls): clip_path = f"segment_{i}.mp4" if download_video_segment(url, segment_duration, clip_path): clips.append(VideoFileClip(clip_path)) if not clips: logger.error("No se pudieron procesar los videos") return None # 5. Ensamblar video final final_video = concatenate_videoclips(clips, method="compose") audio_clip = AudioFileClip(voice_file) final_video = final_video.set_audio(audio_clip) final_video.write_videofile( output_path, codec="libx264", audio_codec="aac", threads=2, preset='ultrafast', verbose=False ) return output_path except Exception as e: logger.error(f"Error creating video: {str(e)}") return None finally: # Limpieza for clip in clips: clip.close() if os.path.exists(voice_file): os.remove(voice_file) for i in range(len(video_urls)): if os.path.exists(f"segment_{i}.mp4"): os.remove(f"segment_{i}.mp4") # Interfaz Gradio optimizada with gr.Blocks(title="Generador de Videos HF", theme=gr.themes.Soft()) as app: gr.Markdown("# 🎥 Generador Automático de Videos") with gr.Row(): with gr.Column(): prompt_input = gr.Textbox( label="Tema del video", placeholder="Ej: Paisajes naturales de Chile", max_lines=2 ) generate_btn = gr.Button("Generar Video", variant="primary") with gr.Column(): output_video = gr.Video(label="Resultado", interactive=False) generate_btn.click( fn=create_video, inputs=prompt_input, outputs=output_video ) # Para Hugging Face Spaces if __name__ == "__main__": app.launch(server_name="0.0.0.0", server_port=7860)