INVIDEO_BASIC / app.py
gnosticdev's picture
Update app.py
9e5ee0a verified
raw
history blame
11.7 kB
import os
import re
import random
import time
import logging
from typing import Optional, List, Dict
from datetime import datetime
from pathlib import Path
# Configuraci贸n inicial para HF Spaces
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["GRADIO_ANALYTICS_ENABLED"] = "false"
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
# Configuraci贸n de logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
try:
import requests
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip
from moviepy.audio.fx.all import audio_loop
import edge_tts
import gradio as gr
import numpy as np
from transformers import pipeline
import backoff
from pydub import AudioSegment
except ImportError as e:
logger.error(f"Error importing dependencies: {e}")
raise
# Constantes configurables
MAX_VIDEOS = 3
VIDEO_SEGMENT_DURATION = 5
MAX_RETRIES = 3
REQUEST_TIMEOUT = 15
# Voces disponibles en Edge TTS (espa帽ol)
VOICES = {
"Femenino MX": "es-MX-DaliaNeural",
"Masculino MX": "es-MX-JorgeNeural",
"Femenino ES": "es-ES-ElviraNeural",
"Masculino ES": "es-ES-AlvaroNeural",
"Femenino CO": "es-CO-SalomeNeural",
"Masculino CO": "es-CO-GonzaloNeural",
"Femenino AR": "es-AR-ElenaNeural",
"Masculino AR": "es-AR-TomasNeural"
}
# Configuraci贸n de modelos
MODEL_NAME = "facebook/mbart-large-50"
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "")
@backoff.on_exception(backoff.expo,
(requests.exceptions.RequestException,
requests.exceptions.HTTPError),
max_tries=MAX_RETRIES,
max_time=30)
def safe_download(url: str, timeout: int = REQUEST_TIMEOUT) -> Optional[str]:
"""Descarga segura con reintentos"""
try:
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()
filename = f"temp_{random.randint(1000,9999)}.mp4"
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return filename
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get('Retry-After', 5))
logger.warning(f"Rate limited. Waiting {retry_after} seconds...")
time.sleep(retry_after)
logger.error(f"Download failed: {str(e)}")
return None
except Exception as e:
logger.error(f"Unexpected download error: {str(e)}")
return None
def process_music(music_path: str, target_duration: float) -> str:
"""Procesa m煤sica para loop y duraci贸n correcta"""
processed_path = "processed_music.mp3"
try:
audio = AudioSegment.from_file(music_path)
# Crear loop si es m谩s corto que el video
if len(audio) < target_duration * 1000:
loops_needed = int(target_duration * 1000 / len(audio)) + 1
audio = audio * loops_needed
# Recortar a la duraci贸n exacta
audio = audio[:int(target_duration * 1000)]
audio.export(processed_path, format="mp3")
return processed_path
except Exception as e:
logger.error(f"Error processing music: {str(e)}")
return music_path # Fallback al original
def download_video_segment(url: str, duration: float, output_path: str) -> bool:
"""Descarga y procesa un segmento de video"""
temp_path = None
try:
temp_path = safe_download(url)
if not temp_path:
return False
with VideoFileClip(temp_path) as clip:
if clip.duration < 1:
logger.error("Video demasiado corto")
return False
end_time = min(duration, clip.duration - 0.1)
subclip = clip.subclip(0, end_time)
subclip.write_videofile(
output_path,
codec="libx264",
audio_codec="aac",
threads=2,
preset='ultrafast',
verbose=False,
ffmpeg_params=['-max_muxing_queue_size', '1024']
)
return True
except Exception as e:
logger.error(f"Video processing error: {str(e)}")
return False
finally:
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)
def fetch_pexels_videos(query: str) -> List[str]:
"""Busca videos en Pexels"""
if not PEXELS_API_KEY:
logger.error("PEXELS_API_KEY no configurada")
return []
headers = {"Authorization": PEXELS_API_KEY}
url = f"https://api.pexels.com/videos/search?query={query}&per_page={MAX_VIDEOS}"
try:
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
videos = []
for video in response.json().get("videos", [])[:MAX_VIDEOS]:
video_files = [vf for vf in video.get("video_files", [])
if vf.get("width", 0) >= 720]
if video_files:
best_file = max(video_files, key=lambda x: x.get("width", 0))
videos.append(best_file["link"])
return videos
except Exception as e:
logger.error(f"Error fetching Pexels videos: {str(e)}")
return []
def generate_script(prompt: str, custom_script: Optional[str] = None) -> str:
"""Genera un script usando IA o custom text"""
if custom_script and custom_script.strip():
return custom_script.strip()
try:
generator = pipeline("text-generation", model=MODEL_NAME)
result = generator(
f"Genera un guion breve sobre {prompt} en espa帽ol con {MAX_VIDEOS} puntos:",
max_length=200,
num_return_sequences=1
)[0]['generated_text']
return result
except Exception as e:
logger.error(f"Error generating script: {str(e)}")
return f"1. Punto uno sobre {prompt}\n2. Punto dos\n3. Punto tres"
async def generate_voice(text: str, voice_id: str, output_file: str = "voice.mp3") -> bool:
"""Genera narraci贸n de voz"""
try:
communicate = edge_tts.Communicate(text, voice=voice_id)
await communicate.save(output_file)
return True
except Exception as e:
logger.error(f"Voice generation failed: {str(e)}")
return False
def run_async(coro):
"""Ejecuta corrutinas as铆ncronas"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def create_video(
prompt: str,
custom_script: Optional[str] = None,
voice_choice: str = "es-MX-DaliaNeural",
music_file: Optional[str] = None
) -> Optional[str]:
"""Funci贸n principal para crear el video"""
try:
# 1. Generar contenido
script = generate_script(prompt, custom_script)
logger.info(f"Script generado: {script[:100]}...")
# 2. Buscar videos
video_urls = fetch_pexels_videos(prompt)
if not video_urls:
logger.error("No se encontraron videos")
return None
# 3. Generar voz
voice_file = "voice.mp3"
if not run_async(generate_voice(script, voice_choice, voice_file)):
logger.error("No se pudo generar voz")
return None
# 4. Procesar m煤sica si existe
music_path = None
if music_file:
audio_clip = AudioFileClip(voice_file)
target_duration = audio_clip.duration
audio_clip.close()
music_path = process_music(music_file.name, target_duration)
# 5. Procesar videos
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4")
clips = []
segment_duration = VIDEO_SEGMENT_DURATION
for i, url in enumerate(video_urls):
clip_path = f"segment_{i}.mp4"
if download_video_segment(url, segment_duration, clip_path):
clips.append(VideoFileClip(clip_path))
if not clips:
logger.error("No se pudieron procesar los videos")
return None
# 6. Ensamblar video final
final_video = concatenate_videoclips(clips, method="compose")
voice_audio = AudioFileClip(voice_file)
if music_path:
music_audio = AudioFileClip(music_path)
final_audio = CompositeAudioClip([voice_audio, music_audio.volumex(0.3)])
else:
final_audio = voice_audio
final_video = final_video.set_audio(final_audio)
final_video.write_videofile(
output_path,
codec="libx264",
audio_codec="aac",
threads=2,
preset='ultrafast',
verbose=False
)
return output_path
except Exception as e:
logger.error(f"Error creating video: {str(e)}")
return None
finally:
# Limpieza
for clip in clips:
clip.close()
if os.path.exists(voice_file):
os.remove(voice_file)
if music_path and os.path.exists(music_path):
os.remove(music_path)
for i in range(len(video_urls)):
if os.path.exists(f"segment_{i}.mp4"):
os.remove(f"segment_{i}.mp4")
# Interfaz Gradio completa
with gr.Blocks(title="Generador de Videos Avanzado", theme=gr.themes.Soft()) as app:
gr.Markdown("# 馃幀 Generador de Videos con IA")
with gr.Row():
with gr.Column():
prompt_input = gr.Textbox(
label="Tema del video",
placeholder="Ej: Lugares tur铆sticos de Argentina",
max_lines=2
)
custom_script_input = gr.TextArea(
label="Guion personalizado (opcional)",
placeholder="Pega aqu铆 tu propio guion si lo tienes...",
lines=5
)
voice_dropdown = gr.Dropdown(
label="Selecciona una voz",
choices=list(VOICES.keys()),
value="Femenino MX"
)
music_input = gr.File(
label="M煤sica de fondo (opcional)",
type="file",
file_types=["audio"]
)
generate_btn = gr.Button("Generar Video", variant="primary")
with gr.Column():
output_video = gr.Video(
label="Video Resultante",
interactive=False,
format="mp4"
)
generate_btn.click(
fn=create_video,
inputs=[
prompt_input,
custom_script_input,
gr.Dropdown(value="es-MX-DaliaNeural", visible=False), # Valor real de voz
music_input
],
outputs=output_video
)
# Actualizar el valor de voz real cuando cambia el dropdown
voice_dropdown.change(
lambda x: VOICES[x],
inputs=voice_dropdown,
outputs=gr.Dropdown(visible=False)
)
# Para Hugging Face Spaces
if __name__ == "__main__":
app.launch(server_name="0.0.0.0", server_port=7860)