# podcastgen / app.py
import gradio as gr
from pydub import AudioSegment
import json
import uuid
import edge_tts
import asyncio
import aiofiles
import os
import time
import torch
import re
from typing import List, Dict, Optional
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import PyPDF2
import traceback
import shutil
from pathlib import Path
model_subdir = Path.home() / ".cache" / "huggingface" / "hub" / "models--unsloth--Llama-3.2-3B"
# Enable persistent caching on Hugging Face Spaces (if persistent storage is enabled)
os.environ["TRANSFORMERS_CACHE"] = "/data/models"
#from git import Repo
#Repo.clone_from("https://huggingface.co/unsloth/Llama-3.2-3B-bnb-4bit", "./local_model_dir")
# Constants
MAX_FILE_SIZE_MB = 20
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
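# Minimal helper sketch (currently unused; the name check_file_size is illustrative) that the
# upload handlers could call to enforce the limit defined above before any parsing.
def check_file_size(file_path: str) -> None:
    size_bytes = os.path.getsize(file_path)
    if size_bytes > MAX_FILE_SIZE_BYTES:
        raise ValueError(f"File is {size_bytes / (1024 * 1024):.1f} MB; the limit is {MAX_FILE_SIZE_MB} MB")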
MODEL_ID = "meta-llama/Meta-Llama-3-8B"  # alternatives previously tried: unsloth/Llama-3.2-3B, unsloth/Llama-3.2-1B, unsloth/Qwen2.5-1.5B, unsloth/Llama-4-Scout-17B-16E-Instruct-GGUF
glotoken = os.environ.get("Tokentest")
# Global logging system
logs = []
def add_log(message):
"""Thread-safe logging function"""
logs.append(f"[{time.strftime('%H:%M:%S')}] {message}")
print(message)
# Initialize model with comprehensive error handling
model = None
tokenizer = None
generation_config = None
def test_llm_generation():
try:
test_prompt = "Hello, how are you today?"
inputs = tokenizer(test_prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=10,
do_sample=False,
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id
)
result = tokenizer.decode(outputs[0], skip_special_tokens=True)
add_log(f"πŸ§ͺ Test LLM response: {result[:100]}")
except Exception as e:
add_log(f"❌ LLM quick test failed: {e}")
def initialize_model():
global model, tokenizer, generation_config
try:
add_log("πŸ”„ Initializing model...")
tokenizer = AutoTokenizer.from_pretrained(
MODEL_ID,
cache_dir="/data/models",
token=glotoken,
trust_remote_code=True,
use_fast=False
)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
add_log("βœ… Set pad_token to eos_token")
# Force GPU settings
model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
torch_dtype=torch.float16,
cache_dir="/data/models",
trust_remote_code=True,
token=glotoken,
device_map={"": 0}, # <- force GPU:0
low_cpu_mem_usage=True
)
# model = AutoModelForCausalLM.from_pretrained(
# MODEL_ID,
# cache_dir="/data/models",
# trust_remote_code=True
# )
model.eval()
generation_config = GenerationConfig(
max_new_tokens=4096,
temperature=0.7,
top_p=0.9,
do_sample=True,
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id,
repetition_penalty=1.1,
length_penalty=1.0
)
add_log(f"βœ… Model loaded successfully on device: {model.device}")
return True
except Exception as e:
error_msg = f"❌ Model initialization failed: {str(e)}"
add_log(error_msg)
add_log(f"Traceback: {traceback.format_exc()}")
return False
# Initialize model at startup
model_loaded = initialize_model()
class PodcastGenerator:
def __init__(self):
self.model = model
self.tokenizer = tokenizer
self.generation_config = generation_config
def extract_text_from_pdf(self, file_path: str) -> str:
"""Extract text from PDF file - CRITICAL FIX #3"""
try:
add_log(f"πŸ“– Extracting text from PDF: {file_path}")
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
text = ""
for page_num, page in enumerate(pdf_reader.pages):
try:
page_text = page.extract_text()
text += page_text + "\n"
add_log(f"βœ… Extracted page {page_num + 1}")
except Exception as e:
add_log(f"⚠️ Failed to extract page {page_num + 1}: {e}")
continue
if not text.strip():
raise Exception("No text could be extracted from PDF")
add_log(f"βœ… PDF extraction complete. Text length: {len(text)} characters")
return text.strip()
except Exception as e:
error_msg = f"❌ PDF extraction failed: {str(e)}"
add_log(error_msg)
raise Exception(error_msg)
async def postprocess_conversation(self, raw_text: str) -> str:
"""Run LLM again to enforce strict Speaker 1/2 format"""
prompt = f"""
You are a podcast formatter.
You reformat text so that it reads as a conversation between two people.
- Every line must begin with exactly `Speaker 1:` or `Speaker 2:` (with the colon)
- No timestamps, no names, no parentheses, no extra formatting, no chapter names, no special characters besides ":"
- No blank lines allowed
- Do not invent or change the content; do not add any person or speaker names, chapter names, timestamps, etc.
- Do not use any of the characters +#-*<>"()[] anywhere in the text
Example output - you have to follow this structure:
Speaker 1: Hello and welcome.
Speaker 2: Thanks! Glad to be here.
Speaker 1: ...
Speaker 2: ...
Speaker 1: ...
Speaker 2: ...
Now format the following text according to the instructions above:
{raw_text}
"""
inputs = self.tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=2048
)
inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
#inputs = {k: v for k, v in inputs.items()}
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=1024,
pad_token_id=self.tokenizer.pad_token_id,
eos_token_id=self.tokenizer.eos_token_id
)
formatted = self.tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
return formatted.strip()
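    # Note: postprocess_conversation is an optional second LLM pass over the raw script; it is
    # currently only referenced by the commented-out call in generate_script below.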
def clean_and_validate_json(self, text: str) -> Dict:
"""Improved JSON extraction and validation - CRITICAL FIX #4"""
add_log("πŸ” Attempting to extract JSON from generated text")
# Multiple strategies for JSON extraction
strategies = [
# Strategy 1: Look for complete JSON objects
r'\{[^{}]*"topic"[^{}]*"podcast"[^{}]*\[[^\]]*\][^{}]*\}',
# Strategy 2: More flexible pattern
r'\{.*?"topic".*?"podcast".*?\[.*?\].*?\}',
# Strategy 3: Extract content between first { and last }
r'\{.*\}'
]
for i, pattern in enumerate(strategies):
add_log(f"🎯 Trying extraction strategy {i+1}")
matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE)
for match in matches:
try:
# Clean the match
cleaned = match.strip()
# Fix common JSON issues
cleaned = re.sub(r',\s*}', '}', cleaned) # Remove trailing commas
cleaned = re.sub(r',\s*]', ']', cleaned) # Remove trailing commas in arrays
parsed = json.loads(cleaned)
# Validate structure
if self.validate_podcast_structure(parsed):
add_log("βœ… Valid JSON structure found")
return parsed
except json.JSONDecodeError as e:
add_log(f"⚠️ JSON parse error in strategy {i+1}: {e}")
continue
add_log("⚠️ No valid JSON found, creating fallback")
return self.create_fallback_podcast(text)
    def normalize_speaker_lines(self, text: str) -> str:
"""Normalize lines to 'Speaker 1: text' format based on presence of 1 or 2 and a ':' or '-'."""
# Convert markdown and bracketed formats to 'Speaker X: ...'
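        # Examples of what the pattern below normalizes (illustrative, not exhaustive):
        #   "[Speaker 2]: Thanks"   -> "Speaker 2: Thanks"
        #   "1 - Hello there"       -> "Speaker 1: Hello there"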
text = re.sub(
r'(?i)^.*?([12])[^a-zA-Z0-9]*[:\-]\s*',
lambda m: f"Speaker {m.group(1)}: ",
text,
flags=re.MULTILINE
)
return text
def conversation_to_json(self, text: str) -> Dict:
"""Convert speaker-formatted text to podcast JSON structure"""
# Allow leading whitespace and enforce full line match
"""Convert speaker-formatted text to podcast JSON structure"""
text = self.normalize_speaker_lines(text)
# Match strict "Speaker X: ..." lines only
lines = re.findall(r'^Speaker\s+([12]):\s*(.+)', text, flags=re.MULTILINE)
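        # e.g. "Speaker 1: Hi\nSpeaker 2: Hello" yields [('1', 'Hi'), ('2', 'Hello')]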
podcast = [{"speaker": int(s), "line": l.strip()} for s, l in lines]
return {
"topic": "Generated from Input",
"podcast": podcast
}
def validate_podcast_structure(self, data: Dict) -> bool:
"""Validate podcast JSON structure"""
try:
if not isinstance(data, dict):
return False
if 'topic' not in data or 'podcast' not in data:
return False
if not isinstance(data['podcast'], list):
return False
for item in data['podcast']:
if not isinstance(item, dict):
return False
if 'speaker' not in item or 'line' not in item:
return False
if not isinstance(item['speaker'], int) or item['speaker'] not in [1, 2]:
return False
if not isinstance(item['line'], str) or len(item['line'].strip()) == 0:
return False
return len(data['podcast']) > 0
except Exception:
return False
def create_fallback_podcast(self, text: str) -> Dict:
"""Create fallback podcast structure - IMPROVED"""
add_log("πŸ”§ Creating fallback podcast structure")
# Extract meaningful content from the original text
sentences = [s.strip() for s in text.split('.') if len(s.strip()) > 20]
if not sentences:
add_log("πŸ”§ failed sentences creating, fallback standard text")
sentences = [
"Welcome to our podcast discussion",
"Today we're exploring an interesting topic",
"Let's dive into the key points",
"That's a fascinating perspective",
"What are your thoughts on this matter",
"I think there are multiple angles to consider",
"This is definitely worth exploring further",
"Thank you for this engaging conversation"
]
# Create balanced conversation
podcast_lines = []
for i, sentence in enumerate(sentences[:12]): # Limit to 12 exchanges
speaker = (i % 2) + 1
            line = sentence if sentence.endswith('.') else sentence + "."
podcast_lines.append({
"speaker": speaker,
"line": line
})
result = {
"topic": "Generated Discussion",
"podcast": podcast_lines
}
add_log(f"βœ… Fallback podcast created with {len(podcast_lines)} lines")
return result
async def generate_script(self, prompt: str, language: str, file_obj=None, progress=None) -> Dict:
"""Improved script generation with better error handling"""
if not model_loaded or not self.model or not self.tokenizer:
raise Exception("❌ Model not properly initialized. Please restart the application.")
add_log("🎬 Starting script generation")
# Process file if provided - CRITICAL FIX #5
if file_obj is not None:
try:
add_log(f"πŸ“ Processing uploaded file: {file_obj}")
if file_obj.endswith('.pdf'):
extracted_text = self.extract_text_from_pdf(file_obj)
# Truncate if too long
if len(extracted_text) > 2000:
extracted_text = extracted_text[:2000] + "..."
add_log("βœ‚οΈ Text truncated to 2000 characters")
prompt = extracted_text
elif file_obj.endswith('.txt'):
with open(file_obj, 'r', encoding='utf-8') as f:
file_content = f.read()
if len(file_content) > 2000:
file_content = file_content[:2000] + "..."
prompt = file_content
except Exception as e:
add_log(f"⚠️ File processing error: {e}")
# Continue with original prompt
# Create focused prompt - CRITICAL FIX #6
example_json = {
"topic": "AI Technology",
"podcast": [
{"speaker": 1, "line": "Welcome to our discussion about AI technology."},
{"speaker": 2, "line": "Thanks for having me. This is such an exciting field."},
{"speaker": 1, "line": "What aspects of AI do you find most interesting?"},
{"speaker": 2, "line": "I'm particularly fascinated by machine learning applications."}
]
}
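        # example_json is kept only as a reference; it is injected nowhere except the
        # commented-out "Example JSON structure" lines further down.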
# Simplified and more reliable prompt
system_prompt = f"""Create a podcast script
Requirements:
- Exactly two speakers: Speaker 1 and Speaker 2
- The podcast should fill 4-5 minutes, focusing on the core context of the input text
- DO NOT copy the example below; use it only as a reference for the conversation format
- The podcast should be professional, lively, witty and engaging, and hook the listener from the start.
- The input text might be disorganized or unformatted. Ignore any formatting inconsistencies or irrelevant details; your task is to distill the essential points.
Use exactly this JSON structure:
{{
"topic": "Short and engaging title",
"podcast": [
{{"speaker": 1, "line": "Welcome to the podcast."}},
{{"speaker": 2, "line": "Thank you, great to be here."}},
{{"speaker": 1, "line": "..."}},
{{"speaker": 2, "line": "..."}}
]
}}
Return only valid JSON. Do not include explanation, markdown, or comments.
"""
#Example JSON structure:
#{json.dumps(example_json, indent=2)}
        #user_prompt = f"\nInput Text:\n{prompt}\n\nPodcast Script:"
user_prompt = f"\nInput Text:\n{prompt}\n\nJSON:"
full_prompt = system_prompt + user_prompt
add_log("πŸ” Prompt Preview:\n" + full_prompt[:2000])
try:
if progress:
progress(0.3, "πŸ€– Generating script...")
add_log("πŸ”€ Tokenizing input...")
# Tokenize with proper handling
inputs = self.tokenizer(
full_prompt,
return_tensors="pt",
padding=True,
truncation=True,
max_length=1200, # Reduced for stability
return_attention_mask=True
)
# Move to correct device
inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
add_log(f"βœ… Inputs moved to device: ")
add_log("self🧠 Generating with model...")
# Generate with timeout and better parameters
with torch.no_grad():
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
outputs = self.model.generate(
**inputs,
generation_config=self.generation_config,
pad_token_id=self.tokenizer.pad_token_id,
# attention_mask=inputs.get('attention_mask'),
use_cache=True
)
add_log("βœ… Model generation complete")
# Decode only new tokens
generated_text = self.tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True,
clean_up_tokenization_spaces=True
)
add_log(f"πŸ“ Generated text length: {len(generated_text)} characters")
add_log(f"πŸ” Generated text preview: {generated_text[:2000]}...")
#formatted_text = await self.postprocess_conversation(generated_text)
#add_log(f"🧼 Post-processed text:\n{formatted_text[:2000]}")
if progress:
progress(0.4, "πŸ” Processing generated script...")
# Extract and validate JSON
result = self.clean_and_validate_json(generated_text)
#result = self.conversation_to_json(formatted_text)
if progress:
progress(0.5, "βœ… Script generated successfully!")
add_log(f"πŸ“„ Full generated text:\n{generated_text}")
add_log(f"βœ… Final script has {len(result.get('podcast', []))} lines")
return result
except Exception as e:
error_msg = f"❌ Script generation error: {str(e)}"
add_log(error_msg)
add_log(f"πŸ” failed script creation")
add_log(f"πŸ” Traceback: {traceback.format_exc()}")
# Return robust fallback
return self.create_fallback_podcast("Welcome to our podcast")
async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str:
"""Improved TTS generation with better error handling - CRITICAL FIX #7"""
voice = speaker1 if speaker == 1 else speaker2
add_log(f"πŸŽ™οΈ Generating TTS for speaker {speaker} with voice {voice}")
# Clean text for TTS
text = text.strip()
if not text:
raise Exception("Empty text for TTS")
# Remove problematic characters
text = re.sub(r'[^\w\s.,!?;:\-\'"()]', '', text)
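        # e.g. "Hello *world* #1!" -> "Hello world 1!" (keeps word characters, whitespace and basic punctuation)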
temp_filename = f"temp_audio_{uuid.uuid4().hex[:8]}.wav"
max_retries = 3
for attempt in range(max_retries):
try:
add_log(f"🎡 TTS attempt {attempt + 1} for: {text[:50]}...")
communicate = edge_tts.Communicate(text, voice)
# Use asyncio.wait_for with timeout
await asyncio.wait_for(
communicate.save(temp_filename),
timeout=30.0
)
# Verify file was created and has content
if os.path.exists(temp_filename) and os.path.getsize(temp_filename) > 1000:
add_log(f"βœ… TTS successful: {os.path.getsize(temp_filename)} bytes")
return temp_filename
else:
raise Exception("Generated audio file is too small or empty")
except asyncio.TimeoutError:
add_log(f"⏰ TTS timeout on attempt {attempt + 1}")
if os.path.exists(temp_filename):
os.remove(temp_filename)
if attempt == max_retries - 1:
raise Exception("TTS generation timed out after multiple attempts")
await asyncio.sleep(2)
except Exception as e:
add_log(f"❌ TTS error on attempt {attempt + 1}: {str(e)}")
if os.path.exists(temp_filename):
os.remove(temp_filename)
if attempt == max_retries - 1:
raise Exception(f"TTS generation failed after {max_retries} attempts: {str(e)}")
await asyncio.sleep(2)
async def combine_audio_files(self, audio_files: List[str], progress=None) -> str:
"""Improved audio combination - CRITICAL FIX #8"""
if progress:
progress(0.9, "🎡 Combining audio files...")
add_log(f"πŸ”— Combining {len(audio_files)} audio files")
try:
combined_audio = AudioSegment.empty()
silence_padding = AudioSegment.silent(duration=800) # 800ms silence
for i, audio_file in enumerate(audio_files):
try:
add_log(f"πŸ“ Processing audio file {i+1}: {audio_file}")
if not os.path.exists(audio_file):
add_log(f"⚠️ Audio file not found: {audio_file}")
continue
file_size = os.path.getsize(audio_file)
add_log(f"πŸ“Š File size: {file_size} bytes")
if file_size < 2000:
add_log(f"⚠️ 1 Audio file too small, skipping: {audio_file}")
continue
audio_segment = AudioSegment.from_file(audio_file)
                    if len(audio_segment) < 500:  # Less than 500ms
                        add_log("⚠️ Audio segment too short, skipping")
continue
combined_audio += audio_segment
# Add silence between speakers (except for the last file)
if i < len(audio_files) - 1:
combined_audio += silence_padding
add_log(f"βœ… Added audio segment {i+1}, total duration: {len(combined_audio)}ms")
except Exception as e:
add_log(f"⚠️ Could not process audio file {audio_file}: {e}")
continue
finally:
# Clean up temporary file
try:
if os.path.exists(audio_file):
os.remove(audio_file)
add_log(f"πŸ—‘οΈ Cleaned up temp file: {audio_file}")
except:
pass
if len(combined_audio) == 0:
raise Exception("No valid audio content was generated")
if len(combined_audio) < 5000: # Less than 5 seconds
raise Exception("3 Combined audio is too short")
output_filename = f"podcast_output_{uuid.uuid4().hex[:8]}.wav"
combined_audio.export(output_filename, format="wav")
file_size = os.path.getsize(output_filename)
duration = len(combined_audio) / 1000 # Duration in seconds
add_log(f"βœ… Final podcast: {output_filename} ({file_size} bytes, {duration:.1f}s)")
if progress:
progress(1.0, "πŸŽ‰ Podcast generated successfully!")
return output_filename
except Exception as e:
error_msg = f"❌ Audio combination failed: {str(e)}"
add_log(error_msg)
# Clean up any remaining temp files
for audio_file in audio_files:
try:
if os.path.exists(audio_file):
os.remove(audio_file)
except:
pass
raise Exception(error_msg)
async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, file_obj=None, progress=None) -> str:
"""Main podcast generation pipeline - CRITICAL FIX #9"""
start_time = time.time()
add_log("🎬 Starting podcast generation pipeline")
try:
if progress:
progress(0.1, "πŸš€ Starting podcast generation...")
# Generate script
add_log("πŸ“ Generating podcast script...")
podcast_json = await self.generate_script(input_text, language, file_obj, progress)
if not podcast_json.get('podcast') or len(podcast_json['podcast']) == 0:
raise Exception("No podcast content was generated")
add_log(f"βœ… Script generated with {len(podcast_json['podcast'])} dialogue lines")
if progress:
progress(0.5, "πŸŽ™οΈ Converting text to speech...")
# Generate TTS with proper error handling
audio_files = []
total_lines = len(podcast_json['podcast'])
successful_lines = 0
for i, item in enumerate(podcast_json['podcast']):
try:
add_log(f"🎡 Processing line {i+1}/{total_lines}: Speaker {item['speaker']}")
clean_line = item['line']
# πŸ”§ Sanitize malformed lines
if not isinstance(clean_line, str) or len(clean_line.strip()) == 0 or clean_line.strip().startswith('"') or "{" in clean_line:
add_log(f"⚠️ Malformed line detected for speaker {item['speaker']}: {repr(clean_line[:80])}")
# Try to recover from JSON-like noise
candidates = re.findall(r'\"line\"\s*:\s*\"([^\"]+)\"', clean_line)
if candidates:
clean_line = candidates[0]
add_log(f"βœ… Recovered line: {clean_line}")
else:
# Fallback: strip bad characters
clean_line = re.sub(r'[^A-Za-z0-9\s.,!?;:\-\'"]+', '', clean_line)
add_log(f"πŸ› οΈ Cleaned fallback line: {clean_line}")
audio_file = await self.tts_generate(
clean_line,
#item['line'],
item['speaker'],
speaker1,
speaker2
)
audio_files.append(audio_file)
successful_lines += 1
# Update progress
if progress:
current_progress = 0.5 + (0.4 * (i + 1) / total_lines)
progress(current_progress, f"πŸŽ™οΈ Generated speech {successful_lines}/{total_lines}")
except Exception as e:
add_log(f"❌ TTS failed for line {i+1}: {e}")
# Continue with remaining lines rather than failing completely
continue
if not audio_files:
raise Exception("No audio files were generated successfully")
if successful_lines < len(podcast_json['podcast']) / 2:
add_log(f"⚠️ Warning: Only {successful_lines}/{total_lines} lines processed successfully")
add_log(f"βœ… TTS generation complete: {len(audio_files)} audio files")
# Combine audio files
combined_audio = await self.combine_audio_files(audio_files, progress)
elapsed_time = time.time() - start_time
add_log(f"πŸŽ‰ Podcast generation completed in {elapsed_time:.1f} seconds")
return combined_audio
except Exception as e:
elapsed_time = time.time() - start_time
error_msg = f"❌ Podcast generation failed after {elapsed_time:.1f}s: {str(e)}"
add_log(error_msg)
add_log(f"πŸ” Full traceback: {traceback.format_exc()}")
raise Exception(error_msg)
# Voice mapping
VOICE_MAPPING = {
"Andrew - English (United States)": "en-US-AndrewMultilingualNeural",
"Ava - English (United States)": "en-US-AvaMultilingualNeural",
"Brian - English (United States)": "en-US-BrianMultilingualNeural",
"Emma - English (United States)": "en-US-EmmaMultilingualNeural",
"Florian - German (Germany)": "de-DE-FlorianMultilingualNeural",
"Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural",
"Remy - French (France)": "fr-FR-RemyMultilingualNeural",
"Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural"
}
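# Any other edge-tts voice ShortName can be added to this mapping; the available names can be
# listed with the edge-tts CLI (e.g. `edge-tts --list-voices`).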
async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, progress=None) -> str:
"""Process input and generate podcast - MAIN ENTRY POINT"""
add_log("=" * 50)
add_log("🎬 NEW PODCAST GENERATION REQUEST")
add_log("=" * 50)
try:
if progress:
progress(0.05, "πŸ” Processing input...")
# Map speaker names to voice IDs
speaker1_voice = VOICE_MAPPING.get(speaker1, "en-US-AndrewMultilingualNeural")
speaker2_voice = VOICE_MAPPING.get(speaker2, "en-US-AvaMultilingualNeural")
add_log(f"🎭 Speaker 1: {speaker1} -> {speaker1_voice}")
add_log(f"🎭 Speaker 2: {speaker2} -> {speaker2_voice}")
# Validate input
if not input_text or input_text.strip() == "":
if input_file is None:
raise Exception("❌ Please provide either text input or upload a file")
add_log("πŸ“ No text input provided, will process uploaded file")
else:
add_log(f"πŸ“ Text input provided: {len(input_text)} characters")
if input_file:
add_log(f"πŸ“Ž File uploaded: {input_file}")
# Check model status
if not model_loaded:
raise Exception("❌ Model not loaded. Please restart the application.")
podcast_generator = PodcastGenerator()
result = await podcast_generator.generate_podcast(
input_text, language, speaker1_voice, speaker2_voice, input_file, progress
)
add_log("πŸŽ‰ PODCAST GENERATION COMPLETED SUCCESSFULLY")
return result
except Exception as e:
error_msg = f"❌ CRITICAL ERROR: {str(e)}"
add_log(error_msg)
add_log(f"πŸ” Traceback: {traceback.format_exc()}")
raise Exception(error_msg)
def generate_podcast_gradio(input_text, input_file, language, speaker1, speaker2):
"""Gradio interface function - CRITICAL FIX #10"""
global logs
logs = [] # Reset logs for each generation
try:
add_log("🎬 Gradio function called")
add_log(f"πŸ“‹ Parameters: text={bool(input_text)}, file={bool(input_file)}, lang={language}")
# Validate inputs
if not input_text and input_file is None:
add_log("❌ No input provided")
return None, "\n".join(logs)
if input_text and len(input_text.strip()) == 0:
input_text = None
# Progress tracking
def progress_callback(value, text):
add_log(f"πŸ“Š Progress: {value:.1%} - {text}")
# Create new event loop for this request - CRITICAL FIX
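        # Gradio may already be running inside an asyncio event loop; if so, asyncio.run() cannot
        # be called directly on this thread, so the coroutine is handed to a worker thread instead.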
try:
# Try to get existing loop
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if loop.is_running():
# If loop is running, we need to run in thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
lambda: asyncio.run(
process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
)
)
result = future.result(timeout=300) # 5 minute timeout
else:
result = loop.run_until_complete(
process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
)
except RuntimeError:
# No event loop exists, create new one
result = asyncio.run(
process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
)
add_log("βœ… Gradio function completed successfully")
return result, "\n".join(logs)
except Exception as e:
error_msg = f"❌ Gradio function error: {str(e)}"
add_log(error_msg)
add_log(f"πŸ” Traceback: {traceback.format_exc()}")
return None, "\n".join(logs)
def create_interface():
    """Create the Gradio interface"""
    #model_loaded = initialize_model()
    if model_loaded:
        test_llm_generation()
language_options = [
"Auto Detect", "English", "German", "French", "Spanish", "Italian",
"Portuguese", "Dutch", "Russian", "Chinese", "Japanese", "Korean"
]
voice_options = list(VOICE_MAPPING.keys())
with gr.Blocks(
title="Pasching Podcast 2πŸŽ™οΈ",
theme=gr.themes.Soft(),
css=".gradio-container {max-width: 1200px; margin: auto;}"
) as demo:
gr.Markdown("# πŸŽ™οΈ Pasching Podcast 2")
gr.Markdown("Generate professional 2-speaker podcasts from text input!")
# Model status indicator
if model_loaded:
gr.Markdown("βœ… **Model Status: Ready**")
else:
gr.Markdown("❌ **Model Status: Failed to Load**")
with gr.Row():
with gr.Column(scale=2):
input_text = gr.Textbox(
label="Input Text",
lines=8,
placeholder="Enter your topic or text for podcast generation...",
info="Describe what you want the podcast to discuss"
)
with gr.Column(scale=1):
input_file = gr.File(
label="Upload File (Optional)",
file_types=[".pdf", ".txt"],
type="filepath",
#info=f"Max size: {MAX_FILE_SIZE_MB}MB"
)
with gr.Row():
language = gr.Dropdown(
label="Language",
choices=language_options,
value="Auto Detect",
info="Select output language"
)
speaker1 = gr.Dropdown(
label="Speaker 1 Voice",
choices=voice_options,
value="Andrew - English (United States)"
)
speaker2 = gr.Dropdown(
label="Speaker 2 Voice",
choices=voice_options,
value="Ava - English (United States)"
)
generate_btn = gr.Button(
"πŸŽ™οΈ Generate Podcast",
variant="primary",
size="lg",
interactive=model_loaded
)
log_output = gr.Textbox(
label="πŸͺ΅ Debug & Transcript Log",
lines=15,
interactive=False,
info="Real-time generation logs and debugging information"
)
output_audio = gr.Audio(
label="Generated Podcast",
type="filepath",
format="wav",
show_download_button=True
)
# Connect the interface
generate_btn.click(
fn=generate_podcast_gradio,
inputs=[input_text, input_file, language, speaker1, speaker2],
outputs=[output_audio, log_output],
show_progress=True
)
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
show_error=True,
share=False
)