"""Gradio app that builds narrated Spanish videos from Pexels stock footage.

Pipeline: produce a script (user-supplied, model-generated, or a canned
fallback), synthesize narration with Edge TTS, download and trim stock clips
from Pexels, concatenate them, mix in optional background music and write
the result to ``output_videos/``.
"""

import asyncio
import logging
import os
import random
import shutil
import tempfile
from datetime import datetime
from pathlib import Path

import edge_tts
import gradio as gr
import requests
from moviepy.audio.fx.all import audio_loop
from moviepy.editor import (
    AudioFileClip,
    CompositeAudioClip,
    CompositeVideoClip,
    VideoFileClip,
    concatenate_videoclips,
)
from transformers import pipeline

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Pexels API key from environment variable
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if PEXELS_API_KEY:
    logger.info("Loaded PEXELS_API_KEY from environment")
else:
    logger.error("PEXELS_API_KEY no encontrada en variables de entorno")

# Endpoint used to search stock videos.
_PEXELS_SEARCH_URL = "https://api.pexels.com/videos/search"

# Upper bound (seconds) for each HTTP connect/read so a stalled server cannot
# hang a request forever.
_REQUEST_TIMEOUT_SECONDS = 30

# The only error value generate_script() can return; compared by equality so
# that a legitimate script containing the word "Error" is not rejected.
_MISSING_INPUT_ERROR = "Error: Debes proporcionar un prompt o un guion personalizado."

# Canned script used when no model output is available for recipe prompts.
_FALLBACK_RECIPES_SCRIPT = """
1. Tacos al pastor: Jugosa carne marinada con piña.
2. Lasagna: Capas de pasta, carne y queso fundido.
3. Sushi: Arroz y pescado fresco en rollos delicados.
4. Pizza casera: Masa crujiente con tus ingredientes favoritos.
5. Paella: Arroz con mariscos y azafrán.
6. Ceviche: Pescado fresco marinado en limón.
7. Ramen: Caldo rico con fideos y cerdo.
8. Tiramisú: Postre cremoso con café y mascarpone.
9. Enchiladas: Tortillas rellenas con salsa picante.
10. Curry: Especias intensas con carne o vegetales.
"""


# Ensure asyncio works with Gradio
def run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    Args:
        coro: The coroutine object to execute.

    Returns:
        The coroutine's result, or ``None`` if anything raised.
    """
    logger.info("Running async coroutine")
    loop = None
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(coro)
        logger.info("Async coroutine completed")
        return result
    except Exception as e:
        logger.error(f"Error en run_async: {e}")
        return None
    finally:
        # Close the loop on the failure path too, so it is never leaked.
        if loop is not None:
            loop.close()


# Load lightweight text generation model for Spanish
# NOTE(review): mBART-50 is published as a sequence-to-sequence model; confirm
# the "text-generation" pipeline accepts it. If loading fails, `generator`
# stays None and generate_script() uses its canned fallback.
logger.info("Loading text generation model: facebook/mbart-large-50")
try:
    generator = pipeline("text-generation", model="facebook/mbart-large-50", device="cpu")
    logger.info("Model loaded successfully")
except Exception as e:
    logger.error(f"Error loading model: {e}")
    generator = None

# List available Spanish voices for Edge TTS
SPANISH_VOICES = [
    "es-MX-DaliaNeural",
    "es-MX-JorgeNeural",
    "es-MX-CecilioNeural",
    "es-MX-BeatrizNeural",
    "es-MX-CandelaNeural",
    "es-MX-CarlosNeural",
    "es-MX-LarissaNeural",
    "es-MX-ManuelNeural",
    "es-MX-MarinaNeural",
    "es-MX-NuriaNeural",
]


# Fetch videos from Pexels
def fetch_pexels_videos(query, num_videos=5):
    """Search Pexels and return direct download links for matching videos.

    Args:
        query: Free-text search term.
        num_videos: Maximum number of results to request.

    Returns:
        A list with the first file link of each video found; an empty list
        on a non-200 response or any error.
    """
    logger.info(f"Fetching {num_videos} videos from Pexels with query: {query}")
    headers = {"Authorization": PEXELS_API_KEY}
    # Passing the query through `params` URL-encodes spaces, accents and "&".
    params = {"query": query, "per_page": num_videos}
    try:
        response = requests.get(
            _PEXELS_SEARCH_URL,
            headers=headers,
            params=params,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            # Skip entries without files instead of discarding every result.
            videos = [
                video["video_files"][0]["link"]
                for video in response.json().get("videos", [])
                if video.get("video_files")
            ]
            logger.info(f"Fetched {len(videos)} videos from Pexels")
            return videos
        logger.error(f"Failed to fetch videos from Pexels. Status code: {response.status_code}")
    except Exception as e:
        logger.error(f"Error fetching videos from Pexels: {e}")
    return []


# Generate script using local model or custom text
def generate_script(prompt, custom_text=None):
    """Return the narration script for the video.

    Precedence: non-blank *custom_text*, then the local model (when loaded),
    then a canned fallback.

    Args:
        prompt: Topic of the video.
        custom_text: Optional user-written script used verbatim (stripped).

    Returns:
        The script text, or an ``"Error: ..."`` message when both inputs are
        blank.
    """
    logger.info("Generating script")
    if custom_text and custom_text.strip():
        logger.info("Using custom text provided by user")
        return custom_text.strip()

    if not prompt or not prompt.strip():
        logger.error("No prompt or custom text provided")
        return _MISSING_INPUT_ERROR

    # Generate script with local model
    if generator:
        input_text = f"Genera un guion para un video sobre '{prompt}'. Crea una lista numerada con descripciones breves (máximo 20 palabras por ítem) para un top 10 relacionado con el tema en español."
        logger.info(f"Generating script with prompt: {prompt}")
        try:
            outputs = generator(
                input_text,
                max_length=300,
                num_return_sequences=1,
                do_sample=True,
                truncation=True,
                # Without this the pipeline echoes the instruction text and
                # the narrator would read it aloud.
                return_full_text=False,
            )
            result = outputs[0]["generated_text"].strip()
            if result:
                logger.info("Script generated successfully")
                return result
            logger.error("Error generating script: empty model output")
        except Exception as e:
            logger.error(f"Error generating script: {e}")

    # Fallback mock response
    logger.info("Using fallback mock response")
    if "recetas" in prompt.lower():
        return _FALLBACK_RECIPES_SCRIPT
    return f"Top 10 sobre {prompt}: No se pudo generar un guion específico."


# Generate voice using Edge TTS
async def generate_voice(text, output_file="output.mp3"):
    """Synthesize *text* to an MP3 file with a Spanish Edge TTS voice.

    A random voice from ``SPANISH_VOICES`` is tried first; if it fails, the
    remaining voices are tried in list order.

    Args:
        text: Text to narrate; must be at least 10 characters once stripped.
        output_file: Destination path of the MP3.

    Returns:
        *output_file* on success, ``None`` if the text is too short or every
        voice failed.
    """
    if not text or len(text.strip()) < 10:
        logger.error("Texto demasiado corto para generar voz")
        return None

    logger.info(f"Generating voice with Edge TTS")
    # Pick a random voice
    voice = random.choice(SPANISH_VOICES)
    try:
        logger.info(f"Using voice: {voice}")
        communicate = edge_tts.Communicate(text, voice=voice)
        await communicate.save(output_file)
        logger.info(f"Voice generated and saved to {output_file}")
        return output_file
    except Exception as e:
        logger.error(f"Error generating voice: {e}")

    # Retry with the other voices; the one that just failed is skipped.
    for backup_voice in SPANISH_VOICES:
        if backup_voice == voice:
            continue
        try:
            logger.info(f"Trying with voice: {backup_voice}")
            communicate = edge_tts.Communicate(text, voice=backup_voice)
            await communicate.save(output_file)
            logger.info(f"Voice generated with backup voice {backup_voice}")
            return output_file
        except Exception as e:
            logger.error(f"Error generating voice: {e}")

    logger.error("All voice attempts failed")
    return None


# Download and trim video
def download_and_trim_video(url, duration, output_path):
    """Download a video and save its first *duration* seconds.

    Args:
        url: Direct link to the video file.
        duration: Desired length in seconds; capped at the source length.
        output_path: Where the trimmed MP4 is written.

    Returns:
        *output_path* on success, ``None`` on any failure.
    """
    logger.info(f"Downloading video from {url}")
    target = Path(output_path)
    # Per-target scratch name so parallel downloads do not overwrite each other.
    raw_path = target.with_name(f"{target.stem}_raw.mp4")
    clip = None
    try:
        with requests.get(url, stream=True, timeout=_REQUEST_TIMEOUT_SECONDS) as response:
            response.raise_for_status()
            with open(raw_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)

        logger.info("Trimming video")
        clip = VideoFileClip(str(raw_path))
        clip_duration = min(duration, clip.duration)
        trimmed = clip.subclip(0, clip_duration)
        trimmed.write_videofile(str(output_path), codec="libx264", audio_codec="aac", threads=4)
        logger.info(f"Video trimmed and saved to {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"Error downloading/trimming video: {e}")
        return None
    finally:
        # Release the reader and the raw download on every path.
        if clip is not None:
            clip.close()
        try:
            if raw_path.exists():
                raw_path.unlink()
        except OSError as e:
            logger.error(f"Error downloading/trimming video: {e}")


# Main video creation function
def create_video(prompt, custom_text, music_file):
    """Create the narrated video and return its path.

    Args:
        prompt: Topic of the video; its first word is the Pexels search term.
        custom_text: Optional script that replaces the generated one.
        music_file: Optional background music, either a path string or an
            object exposing the path as ``.name``.

    Returns:
        Path of the generated MP4, or an ``"Error: ..."`` message describing
        the step that failed.
    """
    logger.info("Starting video creation process")

    # Validate inputs
    if not prompt and not custom_text:
        logger.error("No prompt or custom text provided")
        return _MISSING_INPUT_ERROR

    output_dir = "output_videos"
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_video = f"{output_dir}/video_{timestamp}.mp4"
    logger.info(f"Output video will be saved to {output_video}")

    # Generate or use provided script
    script = generate_script(prompt, custom_text)
    if script == _MISSING_INPUT_ERROR:
        logger.error(script)
        return script

    # Each job gets its own scratch directory so simultaneous requests never
    # share temporary files, and one rmtree removes them all.
    work_dir = tempfile.mkdtemp(prefix="video_job_")
    resources = []  # every opened clip, closed in the finally block
    try:
        # Generate voice
        voice_file = os.path.join(work_dir, "temp_audio.mp3")
        voice_result = run_async(generate_voice(script, voice_file))
        if not voice_result or not os.path.exists(voice_file):
            logger.error("Voice generation failed")
            return "Error: No se pudo generar la voz. Intenta con un texto diferente."

        try:
            audio = AudioFileClip(voice_file)
            resources.append(audio)
            video_duration = audio.duration
            logger.info(f"Audio duration: {video_duration} seconds")
        except Exception as e:
            logger.error(f"Error loading audio file: {e}")
            return "Error: El archivo de audio generado es inválido."

        # Fetch Pexels videos
        # A whitespace-only prompt yields no words; use the generic query.
        prompt_words = prompt.split() if prompt else []
        query = prompt_words[0] if prompt_words else "generic"
        video_urls = fetch_pexels_videos(query, num_videos=5)
        if not video_urls:
            logger.error("No videos found on Pexels")
            return "Error: No se encontraron videos en Pexels."

        # Download and trim videos
        clips = []
        segment_duration = video_duration / len(video_urls)
        for i, url in enumerate(video_urls):
            clip_path = os.path.join(work_dir, f"temp_clip_{i}.mp4")
            if not download_and_trim_video(url, segment_duration, clip_path):
                continue
            try:
                clip = VideoFileClip(clip_path)
            except Exception as e:
                logger.error(f"Error loading clip {i+1}: {e}")
                continue
            clips.append(clip)
            resources.append(clip)
            logger.info(f"Processed video clip {i+1}/{len(video_urls)}")

        if not clips:
            logger.error("No valid video clips available")
            return "Error: No se pudieron procesar los videos descargados."

        # Concatenate video clips
        logger.info("Concatenating video clips")
        try:
            final_clip = concatenate_videoclips(clips, method="compose")
            final_clip = final_clip.set_duration(video_duration)
            resources.append(final_clip)
        except Exception as e:
            logger.error(f"Error concatenating clips: {e}")
            return "Error: No se pudieron unir los segmentos de video."

        # Add looped music or voice
        try:
            if music_file:
                logger.info("Adding user-uploaded music")
                # Gradio passes either a path string or a file wrapper with
                # the path in `.name`, depending on version/configuration.
                music_path = getattr(music_file, "name", music_file)
                music = AudioFileClip(music_path)
                resources.append(music)
                music = audio_loop(music, duration=video_duration)
                final_audio = CompositeAudioClip([audio, music.volumex(0.3)])
            else:
                logger.info("Using generated voice as audio")
                final_audio = audio
            final_clip = final_clip.set_audio(final_audio)
        except Exception as e:
            logger.error(f"Error processing audio: {e}")
            return "Error: Problema al procesar el audio."

        # Write final video
        logger.info(f"Writing final video to {output_video}")
        try:
            final_clip.write_videofile(
                output_video,
                codec="libx264",
                audio_codec="aac",
                fps=24,
                threads=4,
                preset='ultrafast',
                bitrate="3000k"
            )
        except Exception as e:
            logger.error(f"Error writing video file: {e}")
            # Do not leave a truncated output file behind.
            if os.path.exists(output_video):
                try:
                    os.remove(output_video)
                except OSError as cleanup_error:
                    logger.error(f"Error during cleanup: {cleanup_error}")
            return "Error: No se pudo generar el video final."

        logger.info("Video creation completed successfully")
        return output_video
    finally:
        # Clean up on every exit path, not only after a successful render.
        logger.info("Cleaning up temporary files")
        for resource in reversed(resources):
            try:
                resource.close()
            except Exception as e:
                logger.error(f"Error during cleanup: {e}")
        shutil.rmtree(work_dir, ignore_errors=True)


def _create_video_for_ui(prompt, custom_text, music_file):
    """Gradio adapter for create_video: raise ``gr.Error`` on failure.

    The output component is a video player, so an error message returned as
    a string would be treated as a file path. Raising ``gr.Error`` shows the
    message to the user instead.
    """
    result = create_video(prompt, custom_text, music_file)
    if result.startswith("Error:"):
        raise gr.Error(result)
    return result


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Generador de Videos Automáticos")
    gr.Markdown("""
    Crea videos automáticos con voz en español.
    Ingresa un tema (ej. 'Top 10 recetas mexicanas') o escribe tu propio guion.
    """)

    with gr.Row():
        with gr.Column():
            prompt = gr.Textbox(
                label="Tema del Video",
                placeholder="Ejemplo: Top 10 lugares para visitar en México",
                max_lines=2
            )
            custom_text = gr.Textbox(
                label="Guion Personalizado (opcional)",
                placeholder="Pega aquí tu propio guion si prefieres...",
                max_lines=10
            )
            music_file = gr.File(
                label="Música de Fondo (opcional, MP3)",
                file_types=[".mp3"]
            )
            submit = gr.Button("Generar Video", variant="primary")

        with gr.Column():
            output = gr.Video(label="Video Generado", format="mp4")

    gr.Examples(
        examples=[
            ["Top 10 ciudades de México", "", None],
            ["", "1. Ciudad de México - La capital vibrante\n2. Guadalajara - Cuna del mariachi\n3. Monterrey - Modernidad industrial", None]
        ],
        inputs=[prompt, custom_text, music_file],
        label="Ejemplos para probar"
    )

    submit.click(
        fn=_create_video_for_ui,
        inputs=[prompt, custom_text, music_file],
        outputs=output,
        api_name="generate_video"
    )

demo.launch(server_name="0.0.0.0", server_port=7860, share=True)