import gradio as gr
from pydub import AudioSegment
import json
import uuid
import edge_tts
import asyncio
import aiofiles
import os
import time
import torch
import re
from typing import List, Dict, Optional
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import PyPDF2
import traceback
import shutil
from pathlib import Path
model_subdir = Path.home() / ".cache" / "huggingface" / "hub" / "models--unsloth--Llama-3.2-3B"

# Enable persistent caching on Hugging Face Spaces (if persistent storage is enabled)
os.environ["TRANSFORMERS_CACHE"] = "/data/models"
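# Note (assumption): recent transformers releases deprecate TRANSFORMERS_CACHE in
# favor of HF_HOME; the explicit cache_dir passed to from_pretrained() below takes
# precedence either way, so this env var is mostly a belt-and-braces setting.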
#from git import Repo
#Repo.clone_from("https://huggingface.co/unsloth/Llama-3.2-3B-bnb-4bit", "./local_model_dir")

# Constants
MAX_FILE_SIZE_MB = 20
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
# Alternatives tried: "unsloth/Llama-3.2-3B", "unsloth/Llama-3.2-1B",
# "unsloth/Qwen2.5-1.5B", "unsloth/Llama-4-Scout-17B-16E-Instruct-GGUF"
MODEL_ID = "meta-llama/Meta-Llama-3-8B"
glotoken = os.environ.get("Tokentest")
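# MAX_FILE_SIZE_BYTES is defined above but never enforced in the pipeline below.
# A minimal guard could look like this (hypothetical helper, not wired in):
def check_file_size(path: str) -> None:
    """Raise if an uploaded file exceeds the configured size limit (sketch)."""
    if os.path.getsize(path) > MAX_FILE_SIZE_BYTES:
        raise Exception(f"File exceeds the {MAX_FILE_SIZE_MB}MB limit")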
# Global logging system
logs = []

def add_log(message):
    """Append a timestamped message to the global log and echo it to stdout."""
    logs.append(f"[{time.strftime('%H:%M:%S')}] {message}")
    print(message)

# Initialize model with comprehensive error handling
model = None
tokenizer = None
generation_config = None
def test_llm_generation():
    try:
        test_prompt = "Hello, how are you today?"
        inputs = tokenizer(test_prompt, return_tensors="pt").to(model.device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=10,
                do_sample=False,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id
            )
        result = tokenizer.decode(outputs[0], skip_special_tokens=True)
        add_log(f"🧪 Test LLM response: {result[:100]}")
    except Exception as e:
        add_log(f"❌ LLM quick test failed: {e}")
def initialize_model():
    global model, tokenizer, generation_config
    try:
        add_log("🚀 Initializing model...")
        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_ID,
            cache_dir="/data/models",
            token=glotoken,
            trust_remote_code=True,
            use_fast=False
        )
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
            add_log("✅ Set pad_token to eos_token")
        # Force GPU settings
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            torch_dtype=torch.float16,
            cache_dir="/data/models",
            trust_remote_code=True,
            token=glotoken,
            device_map={"": 0},  # <- force GPU:0
            low_cpu_mem_usage=True
        )
        # model = AutoModelForCausalLM.from_pretrained(
        #     MODEL_ID,
        #     cache_dir="/data/models",
        #     trust_remote_code=True
        # )
        model.eval()
        generation_config = GenerationConfig(
            max_new_tokens=4096,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            repetition_penalty=1.1,
            length_penalty=1.0
        )
        add_log(f"✅ Model loaded successfully on device: {model.device}")
        return True
    except Exception as e:
        error_msg = f"❌ Model initialization failed: {str(e)}"
        add_log(error_msg)
        add_log(f"Traceback: {traceback.format_exc()}")
        return False

# Initialize model at startup
model_loaded = initialize_model()
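# Assumption: /data only exists on Spaces with persistent storage enabled. A more
# defensive load might fall back to the default cache, e.g.:
#   cache_dir = "/data/models" if os.access("/data", os.W_OK) else None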
class PodcastGenerator:
    def __init__(self):
        self.model = model
        self.tokenizer = tokenizer
        self.generation_config = generation_config

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Extract text from PDF file - CRITICAL FIX #3"""
        try:
            add_log(f"📄 Extracting text from PDF: {file_path}")
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                text = ""
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        page_text = page.extract_text()
                        text += page_text + "\n"
                        add_log(f"✅ Extracted page {page_num + 1}")
                    except Exception as e:
                        add_log(f"⚠️ Failed to extract page {page_num + 1}: {e}")
                        continue
            if not text.strip():
                raise Exception("No text could be extracted from PDF")
            add_log(f"✅ PDF extraction complete. Text length: {len(text)} characters")
            return text.strip()
        except Exception as e:
            error_msg = f"❌ PDF extraction failed: {str(e)}"
            add_log(error_msg)
            raise Exception(error_msg)
    async def postprocess_conversation(self, raw_text: str) -> str:
        """Run the LLM again to enforce a strict Speaker 1/2 format"""
        prompt = f"""
You are a podcast formatter.
You just reformat text as if two persons have a conversation
- Every line begins strictly with `Speaker 1:` or `Speaker 2:` (with colon)
- No timestamps, no names, no parentheses, no extra formatting, no chapter names, no special characters besides ":"
- No blank lines allowed
- Do not invent or change the content; do not add or use any person or speaker names, chapter names, timestamps etc.
- You are not allowed to use the characters +#-*<>"()[] anywhere in the text
Example output - you have to follow this structure:
Speaker 1: Hello and welcome.
Speaker 2: Thanks! Glad to be here.
Speaker 1: ...
Speaker 2: ...
Speaker 1: ...
Speaker 2: ...
Now format the following according to the above instructions
{raw_text}
"""
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=2048
        )
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=1024,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
        formatted = self.tokenizer.decode(
            outputs[0][inputs['input_ids'].shape[1]:],
            skip_special_tokens=True
        )
        return formatted.strip()
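    # Note: postprocess_conversation is currently unused - its call sites in
    # generate_script are commented out. It is kept here for experimentation.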
    def clean_and_validate_json(self, text: str) -> Dict:
        """Improved JSON extraction and validation - CRITICAL FIX #4"""
        add_log("🔍 Attempting to extract JSON from generated text")
        # Multiple strategies for JSON extraction, from strictest to loosest
        strategies = [
            # Strategy 1: Look for complete JSON objects
            r'\{[^{}]*"topic"[^{}]*"podcast"[^{}]*\[[^\]]*\][^{}]*\}',
            # Strategy 2: More flexible pattern
            r'\{.*?"topic".*?"podcast".*?\[.*?\].*?\}',
            # Strategy 3: Extract content between first { and last }
            r'\{.*\}'
        ]
        for i, pattern in enumerate(strategies):
            add_log(f"🎯 Trying extraction strategy {i+1}")
            matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE)
            for match in matches:
                try:
                    # Clean the match
                    cleaned = match.strip()
                    # Fix common JSON issues
                    cleaned = re.sub(r',\s*}', '}', cleaned)  # Remove trailing commas
                    cleaned = re.sub(r',\s*]', ']', cleaned)  # Remove trailing commas in arrays
                    parsed = json.loads(cleaned)
                    # Validate structure
                    if self.validate_podcast_structure(parsed):
                        add_log("✅ Valid JSON structure found")
                        return parsed
                except json.JSONDecodeError as e:
                    add_log(f"⚠️ JSON parse error in strategy {i+1}: {e}")
                    continue
        add_log("⚠️ No valid JSON found, creating fallback")
        return self.create_fallback_podcast(text)
    def normalize_speaker_lines(self, text: str) -> str:
        """Normalize lines to 'Speaker 1: text' format based on the presence of a 1 or 2 followed by ':' or '-'."""
        # Convert markdown and bracketed formats to 'Speaker X: ...'
        text = re.sub(
            r'(?i)^.*?([12])[^a-zA-Z0-9]*[:\-]\s*',
            lambda m: f"Speaker {m.group(1)}: ",
            text,
            flags=re.MULTILINE
        )
        return text
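    # Examples of the normalization above (derived from the regex, for clarity):
    #   "**Speaker 1** - Hello there"  ->  "Speaker 1: Hello there"
    #   "[2]: Thanks!"                 ->  "Speaker 2: Thanks!"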
    def conversation_to_json(self, text: str) -> Dict:
        """Convert speaker-formatted text to a podcast JSON structure"""
        text = self.normalize_speaker_lines(text)
        # Match strict "Speaker X: ..." lines only
        lines = re.findall(r'^Speaker\s+([12]):\s*(.+)', text, flags=re.MULTILINE)
        podcast = [{"speaker": int(s), "line": l.strip()} for s, l in lines]
        return {
            "topic": "Generated from Input",
            "podcast": podcast
        }
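    # For input "Speaker 1: Hi\nSpeaker 2: Hello" the method above returns:
    #   {"topic": "Generated from Input",
    #    "podcast": [{"speaker": 1, "line": "Hi"}, {"speaker": 2, "line": "Hello"}]}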
    def validate_podcast_structure(self, data: Dict) -> bool:
        """Validate podcast JSON structure"""
        try:
            if not isinstance(data, dict):
                return False
            if 'topic' not in data or 'podcast' not in data:
                return False
            if not isinstance(data['podcast'], list):
                return False
            for item in data['podcast']:
                if not isinstance(item, dict):
                    return False
                if 'speaker' not in item or 'line' not in item:
                    return False
                if not isinstance(item['speaker'], int) or item['speaker'] not in [1, 2]:
                    return False
                if not isinstance(item['line'], str) or len(item['line'].strip()) == 0:
                    return False
            return len(data['podcast']) > 0
        except Exception:
            return False
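    # Minimal structure accepted by the validator above, for reference:
    #   {"topic": "t", "podcast": [{"speaker": 1, "line": "hi"}]}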
    def create_fallback_podcast(self, text: str) -> Dict:
        """Create fallback podcast structure - IMPROVED"""
        add_log("🔧 Creating fallback podcast structure")
        # Extract meaningful content from the original text
        sentences = [s.strip() for s in text.split('.') if len(s.strip()) > 20]
        if not sentences:
            add_log("🔧 Sentence extraction failed, falling back to standard text")
            sentences = [
                "Welcome to our podcast discussion",
                "Today we're exploring an interesting topic",
                "Let's dive into the key points",
                "That's a fascinating perspective",
                "What are your thoughts on this matter",
                "I think there are multiple angles to consider",
                "This is definitely worth exploring further",
                "Thank you for this engaging conversation"
            ]
        # Create a balanced conversation, alternating speakers
        podcast_lines = []
        for i, sentence in enumerate(sentences[:12]):  # Limit to 12 exchanges
            speaker = (i % 2) + 1
            line = sentence + "." if not sentence.endswith('.') else sentence
            podcast_lines.append({
                "speaker": speaker,
                "line": line
            })
        result = {
            "topic": "Generated Discussion",
            "podcast": podcast_lines
        }
        add_log(f"✅ Fallback podcast created with {len(podcast_lines)} lines")
        return result
    async def generate_script(self, prompt: str, language: str, file_obj=None, progress=None) -> Dict:
        """Improved script generation with better error handling"""
        if not model_loaded or not self.model or not self.tokenizer:
            raise Exception("❌ Model not properly initialized. Please restart the application.")
        add_log("🎬 Starting script generation")
        # Process file if provided - CRITICAL FIX #5
        if file_obj is not None:
            try:
                add_log(f"📄 Processing uploaded file: {file_obj}")
                if file_obj.endswith('.pdf'):
                    extracted_text = self.extract_text_from_pdf(file_obj)
                    # Truncate if too long
                    if len(extracted_text) > 2000:
                        extracted_text = extracted_text[:2000] + "..."
                        add_log("✂️ Text truncated to 2000 characters")
                    prompt = extracted_text
                elif file_obj.endswith('.txt'):
                    with open(file_obj, 'r', encoding='utf-8') as f:
                        file_content = f.read()
                    if len(file_content) > 2000:
                        file_content = file_content[:2000] + "..."
                    prompt = file_content
            except Exception as e:
                add_log(f"⚠️ File processing error: {e}")
                # Continue with the original prompt
        # Create focused prompt - CRITICAL FIX #6
        example_json = {
            "topic": "AI Technology",
            "podcast": [
                {"speaker": 1, "line": "Welcome to our discussion about AI technology."},
                {"speaker": 2, "line": "Thanks for having me. This is such an exciting field."},
                {"speaker": 1, "line": "What aspects of AI do you find most interesting?"},
                {"speaker": 2, "line": "I'm particularly fascinated by machine learning applications."}
            ]
        }
        # Simplified and more reliable prompt
        system_prompt = f"""Create a podcast script
Requirements:
- Exactly two speakers: Speaker 1 and Speaker 2
- The podcast should fill 4-5 minutes, focusing on the core context of the input text
- DO NOT copy the example below; only use it as a conversation reference
- The podcast should be professional, lively, witty and engaging, and hook the listener from the start
- The input text might be disorganized or unformatted. Ignore any formatting inconsistencies or irrelevant details; your task is to distill the essential points
{{
    "topic": "Short and engaging title",
    "podcast": [
        {{"speaker": 1, "line": "Welcome to the podcast."}},
        {{"speaker": 2, "line": "Thank you, great to be here."}},
        {{"speaker": 1, "line": "..."}},
        {{"speaker": 2, "line": "..."}}
    ]
}}
Return only valid JSON. Do not include explanation, markdown, or comments.
"""
        # Example JSON structure:
        # {json.dumps(example_json, indent=2)}
        # user_prompt = f"\nInput Text:\n{prompt}\n\nPodcast Script:"
        user_prompt = f"\nInput Text:\n{prompt}\n\nJSON:"
        full_prompt = system_prompt + user_prompt
        add_log("📝 Prompt Preview:\n" + full_prompt[:2000])
        try:
            if progress:
                progress(0.3, "🤖 Generating script...")
            add_log("🤖 Tokenizing input...")
            # Tokenize with proper handling
            inputs = self.tokenizer(
                full_prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=1200,  # Reduced for stability
                return_attention_mask=True
            )
            # Move to correct device
            inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
            add_log(f"✅ Inputs moved to device: {self.model.device}")
            add_log("🧠 Generating with model...")
            # Generate with better parameters
            with torch.no_grad():
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                outputs = self.model.generate(
                    **inputs,
                    generation_config=self.generation_config,
                    pad_token_id=self.tokenizer.pad_token_id,
                    use_cache=True
                )
            add_log("✅ Model generation complete")
            # Decode only the newly generated tokens
            generated_text = self.tokenizer.decode(
                outputs[0][inputs['input_ids'].shape[1]:],
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True
            )
            add_log(f"📝 Generated text length: {len(generated_text)} characters")
            add_log(f"📝 Generated text preview: {generated_text[:2000]}...")
            # formatted_text = await self.postprocess_conversation(generated_text)
            # add_log(f"🧼 Post-processed text:\n{formatted_text[:2000]}")
            if progress:
                progress(0.4, "📖 Processing generated script...")
            # Extract and validate JSON
            result = self.clean_and_validate_json(generated_text)
            # result = self.conversation_to_json(formatted_text)
            if progress:
                progress(0.5, "✅ Script generated successfully!")
            add_log(f"📝 Full generated text:\n{generated_text}")
            add_log(f"✅ Final script has {len(result.get('podcast', []))} lines")
            return result
        except Exception as e:
            error_msg = f"❌ Script generation error: {str(e)}"
            add_log(error_msg)
            add_log("📉 Failed script creation, returning fallback")
            add_log(f"🔍 Traceback: {traceback.format_exc()}")
            # Return robust fallback
            return self.create_fallback_podcast("Welcome to our podcast")
    async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str:
        """Improved TTS generation with better error handling - CRITICAL FIX #7"""
        voice = speaker1 if speaker == 1 else speaker2
        add_log(f"🎙️ Generating TTS for speaker {speaker} with voice {voice}")
        # Clean text for TTS
        text = text.strip()
        if not text:
            raise Exception("Empty text for TTS")
        # Remove problematic characters, keeping word characters and basic punctuation
        text = re.sub(r'[^\w\s.,!?;:\-\'"()]', '', text)
        temp_filename = f"temp_audio_{uuid.uuid4().hex[:8]}.wav"
        max_retries = 3
        for attempt in range(max_retries):
            try:
                add_log(f"🎵 TTS attempt {attempt + 1} for: {text[:50]}...")
                communicate = edge_tts.Communicate(text, voice)
                # Use asyncio.wait_for with a timeout
                await asyncio.wait_for(
                    communicate.save(temp_filename),
                    timeout=30.0
                )
                # Verify the file was created and has content
                if os.path.exists(temp_filename) and os.path.getsize(temp_filename) > 1000:
                    add_log(f"✅ TTS successful: {os.path.getsize(temp_filename)} bytes")
                    return temp_filename
                else:
                    raise Exception("Generated audio file is too small or empty")
            except asyncio.TimeoutError:
                add_log(f"⏰ TTS timeout on attempt {attempt + 1}")
                if os.path.exists(temp_filename):
                    os.remove(temp_filename)
                if attempt == max_retries - 1:
                    raise Exception("TTS generation timed out after multiple attempts")
                await asyncio.sleep(2)
            except Exception as e:
                add_log(f"❌ TTS error on attempt {attempt + 1}: {str(e)}")
                if os.path.exists(temp_filename):
                    os.remove(temp_filename)
                if attempt == max_retries - 1:
                    raise Exception(f"TTS generation failed after {max_retries} attempts: {str(e)}")
                await asyncio.sleep(2)
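    # Note (assumption): edge-tts emits MP3-encoded audio by default, so the ".wav"
    # extension above is cosmetic; AudioSegment.from_file() below sniffs the real
    # container via ffmpeg, which pydub needs on PATH for this decode to work.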
    async def combine_audio_files(self, audio_files: List[str], progress=None) -> str:
        """Improved audio combination - CRITICAL FIX #8"""
        if progress:
            progress(0.9, "🎵 Combining audio files...")
        add_log(f"🔗 Combining {len(audio_files)} audio files")
        try:
            combined_audio = AudioSegment.empty()
            silence_padding = AudioSegment.silent(duration=800)  # 800ms silence
            for i, audio_file in enumerate(audio_files):
                try:
                    add_log(f"🔗 Processing audio file {i+1}: {audio_file}")
                    if not os.path.exists(audio_file):
                        add_log(f"⚠️ Audio file not found: {audio_file}")
                        continue
                    file_size = os.path.getsize(audio_file)
                    add_log(f"📊 File size: {file_size} bytes")
                    if file_size < 2000:
                        add_log(f"⚠️ Audio file too small, skipping: {audio_file}")
                        continue
                    audio_segment = AudioSegment.from_file(audio_file)
                    if len(audio_segment) < 500:  # Less than 500ms
                        add_log("⚠️ Audio segment too short, skipping")
                        continue
                    combined_audio += audio_segment
                    # Add silence between speakers (except after the last file)
                    if i < len(audio_files) - 1:
                        combined_audio += silence_padding
                    add_log(f"✅ Added audio segment {i+1}, total duration: {len(combined_audio)}ms")
                except Exception as e:
                    add_log(f"⚠️ Could not process audio file {audio_file}: {e}")
                    continue
                finally:
                    # Clean up temporary file
                    try:
                        if os.path.exists(audio_file):
                            os.remove(audio_file)
                            add_log(f"🗑️ Cleaned up temp file: {audio_file}")
                    except Exception:
                        pass
            if len(combined_audio) == 0:
                raise Exception("No valid audio content was generated")
            if len(combined_audio) < 5000:  # Less than 5 seconds
                raise Exception("Combined audio is too short")
            output_filename = f"podcast_output_{uuid.uuid4().hex[:8]}.wav"
            combined_audio.export(output_filename, format="wav")
            file_size = os.path.getsize(output_filename)
            duration = len(combined_audio) / 1000  # Duration in seconds
            add_log(f"✅ Final podcast: {output_filename} ({file_size} bytes, {duration:.1f}s)")
            if progress:
                progress(1.0, "🎉 Podcast generated successfully!")
            return output_filename
        except Exception as e:
            error_msg = f"❌ Audio combination failed: {str(e)}"
            add_log(error_msg)
            # Clean up any remaining temp files
            for audio_file in audio_files:
                try:
                    if os.path.exists(audio_file):
                        os.remove(audio_file)
                except Exception:
                    pass
            raise Exception(error_msg)
    async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, file_obj=None, progress=None) -> str:
        """Main podcast generation pipeline - CRITICAL FIX #9"""
        start_time = time.time()
        add_log("🎬 Starting podcast generation pipeline")
        try:
            if progress:
                progress(0.1, "🚀 Starting podcast generation...")
            # Generate script
            add_log("📝 Generating podcast script...")
            podcast_json = await self.generate_script(input_text, language, file_obj, progress)
            if not podcast_json.get('podcast') or len(podcast_json['podcast']) == 0:
                raise Exception("No podcast content was generated")
            add_log(f"✅ Script generated with {len(podcast_json['podcast'])} dialogue lines")
            if progress:
                progress(0.5, "🎙️ Converting text to speech...")
            # Generate TTS with proper error handling
            audio_files = []
            total_lines = len(podcast_json['podcast'])
            successful_lines = 0
            for i, item in enumerate(podcast_json['podcast']):
                try:
                    add_log(f"🎵 Processing line {i+1}/{total_lines}: Speaker {item['speaker']}")
                    clean_line = item['line']
                    if not isinstance(clean_line, str):
                        clean_line = str(clean_line)
                    # 🔧 Sanitize malformed lines
                    if len(clean_line.strip()) == 0 or clean_line.strip().startswith('"') or "{" in clean_line:
                        add_log(f"⚠️ Malformed line detected for speaker {item['speaker']}: {repr(clean_line[:80])}")
                        # Try to recover from JSON-like noise
                        candidates = re.findall(r'\"line\"\s*:\s*\"([^\"]+)\"', clean_line)
                        if candidates:
                            clean_line = candidates[0]
                            add_log(f"✅ Recovered line: {clean_line}")
                        else:
                            # Fallback: strip bad characters
                            clean_line = re.sub(r'[^A-Za-z0-9\s.,!?;:\-\'"]+', '', clean_line)
                            add_log(f"🛠️ Cleaned fallback line: {clean_line}")
                    audio_file = await self.tts_generate(
                        clean_line,
                        item['speaker'],
                        speaker1,
                        speaker2
                    )
                    audio_files.append(audio_file)
                    successful_lines += 1
                    # Update progress
                    if progress:
                        current_progress = 0.5 + (0.4 * (i + 1) / total_lines)
                        progress(current_progress, f"🎙️ Generated speech {successful_lines}/{total_lines}")
                except Exception as e:
                    add_log(f"❌ TTS failed for line {i+1}: {e}")
                    # Continue with remaining lines rather than failing completely
                    continue
            if not audio_files:
                raise Exception("No audio files were generated successfully")
            if successful_lines < len(podcast_json['podcast']) / 2:
                add_log(f"⚠️ Warning: Only {successful_lines}/{total_lines} lines processed successfully")
            add_log(f"✅ TTS generation complete: {len(audio_files)} audio files")
            # Combine audio files
            combined_audio = await self.combine_audio_files(audio_files, progress)
            elapsed_time = time.time() - start_time
            add_log(f"🎉 Podcast generation completed in {elapsed_time:.1f} seconds")
            return combined_audio
        except Exception as e:
            elapsed_time = time.time() - start_time
            error_msg = f"❌ Podcast generation failed after {elapsed_time:.1f}s: {str(e)}"
            add_log(error_msg)
            add_log(f"🔍 Full traceback: {traceback.format_exc()}")
            raise Exception(error_msg)
# Voice mapping
VOICE_MAPPING = {
    "Andrew - English (United States)": "en-US-AndrewMultilingualNeural",
    "Ava - English (United States)": "en-US-AvaMultilingualNeural",
    "Brian - English (United States)": "en-US-BrianMultilingualNeural",
    "Emma - English (United States)": "en-US-EmmaMultilingualNeural",
    "Florian - German (Germany)": "de-DE-FlorianMultilingualNeural",
    "Seraphina - German (Germany)": "de-DE-SeraphinaMultilingualNeural",
    "Remy - French (France)": "fr-FR-RemyMultilingualNeural",
    "Vivienne - French (France)": "fr-FR-VivienneMultilingualNeural"
}
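# If more voices are needed, the available ones can be listed with the edge-tts
# CLI ("edge-tts --list-voices") or programmatically via edge_tts.list_voices().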
async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, progress=None) -> str:
    """Process input and generate podcast - MAIN ENTRY POINT"""
    add_log("=" * 50)
    add_log("🎬 NEW PODCAST GENERATION REQUEST")
    add_log("=" * 50)
    try:
        if progress:
            progress(0.05, "🚀 Processing input...")
        # Map speaker names to voice IDs
        speaker1_voice = VOICE_MAPPING.get(speaker1, "en-US-AndrewMultilingualNeural")
        speaker2_voice = VOICE_MAPPING.get(speaker2, "en-US-AvaMultilingualNeural")
        add_log(f"🎙️ Speaker 1: {speaker1} -> {speaker1_voice}")
        add_log(f"🎙️ Speaker 2: {speaker2} -> {speaker2_voice}")
        # Validate input
        if not input_text or input_text.strip() == "":
            if input_file is None:
                raise Exception("❌ Please provide either text input or upload a file")
            add_log("📄 No text input provided, will process uploaded file")
        else:
            add_log(f"📝 Text input provided: {len(input_text)} characters")
        if input_file:
            add_log(f"📄 File uploaded: {input_file}")
        # Check model status
        if not model_loaded:
            raise Exception("❌ Model not loaded. Please restart the application.")
        podcast_generator = PodcastGenerator()
        result = await podcast_generator.generate_podcast(
            input_text, language, speaker1_voice, speaker2_voice, input_file, progress
        )
        add_log("🎉 PODCAST GENERATION COMPLETED SUCCESSFULLY")
        return result
    except Exception as e:
        error_msg = f"❌ CRITICAL ERROR: {str(e)}"
        add_log(error_msg)
        add_log(f"🔍 Traceback: {traceback.format_exc()}")
        raise Exception(error_msg)
def generate_podcast_gradio(input_text, input_file, language, speaker1, speaker2):
    """Gradio interface function - CRITICAL FIX #10"""
    global logs
    logs = []  # Reset logs for each generation
    try:
        add_log("🎬 Gradio function called")
        add_log(f"📊 Parameters: text={bool(input_text)}, file={bool(input_file)}, lang={language}")
        # Validate inputs
        if not input_text and input_file is None:
            add_log("❌ No input provided")
            return None, "\n".join(logs)
        if input_text and len(input_text.strip()) == 0:
            input_text = None
        # Progress tracking
        def progress_callback(value, text):
            add_log(f"📊 Progress: {value:.1%} - {text}")
        # Create a new event loop for this request - CRITICAL FIX
        try:
            # Try to get the existing loop
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            if loop.is_running():
                # If the loop is already running, run the coroutine in a thread
                import concurrent.futures
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(
                        lambda: asyncio.run(
                            process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
                        )
                    )
                    result = future.result(timeout=300)  # 5 minute timeout
            else:
                result = loop.run_until_complete(
                    process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
                )
        except RuntimeError:
            # No event loop exists, create a new one
            result = asyncio.run(
                process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
            )
        add_log("✅ Gradio function completed successfully")
        return result, "\n".join(logs)
    except Exception as e:
        error_msg = f"❌ Gradio function error: {str(e)}"
        add_log(error_msg)
        add_log(f"🔍 Traceback: {traceback.format_exc()}")
        return None, "\n".join(logs)
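# Note (assumption): Gradio can also accept an async function directly as a .click()
# handler, which would avoid the manual event-loop juggling above; the explicit
# handling is kept as the author wrote it.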
def create_interface():
    """Create the Gradio interface"""
    # model_loaded = initialize_model()
    if model_loaded:
        test_llm_generation()
    language_options = [
        "Auto Detect", "English", "German", "French", "Spanish", "Italian",
        "Portuguese", "Dutch", "Russian", "Chinese", "Japanese", "Korean"
    ]
    voice_options = list(VOICE_MAPPING.keys())
    with gr.Blocks(
        title="Pasching Podcast 2 🎙️",
        theme=gr.themes.Soft(),
        css=".gradio-container {max-width: 1200px; margin: auto;}"
    ) as demo:
        gr.Markdown("# 🎙️ Pasching Podcast 2")
        gr.Markdown("Generate professional 2-speaker podcasts from text input!")
        # Model status indicator
        if model_loaded:
            gr.Markdown("✅ **Model Status: Ready**")
        else:
            gr.Markdown("❌ **Model Status: Failed to Load**")
        with gr.Row():
            with gr.Column(scale=2):
                input_text = gr.Textbox(
                    label="Input Text",
                    lines=8,
                    placeholder="Enter your topic or text for podcast generation...",
                    info="Describe what you want the podcast to discuss"
                )
            with gr.Column(scale=1):
                input_file = gr.File(
                    label="Upload File (Optional)",
                    file_types=[".pdf", ".txt"],
                    type="filepath",
                    # info=f"Max size: {MAX_FILE_SIZE_MB}MB"
                )
        with gr.Row():
            language = gr.Dropdown(
                label="Language",
                choices=language_options,
                value="Auto Detect",
                info="Select output language"
            )
            speaker1 = gr.Dropdown(
                label="Speaker 1 Voice",
                choices=voice_options,
                value="Andrew - English (United States)"
            )
            speaker2 = gr.Dropdown(
                label="Speaker 2 Voice",
                choices=voice_options,
                value="Ava - English (United States)"
            )
        generate_btn = gr.Button(
            "🎙️ Generate Podcast",
            variant="primary",
            size="lg",
            interactive=model_loaded
        )
        log_output = gr.Textbox(
            label="🪵 Debug & Transcript Log",
            lines=15,
            interactive=False,
            info="Real-time generation logs and debugging information"
        )
        output_audio = gr.Audio(
            label="Generated Podcast",
            type="filepath",
            format="wav",
            show_download_button=True
        )
        # Connect the interface
        generate_btn.click(
            fn=generate_podcast_gradio,
            inputs=[input_text, input_file, language, speaker1, speaker2],
            outputs=[output_audio, log_output],
            show_progress=True
        )
    return demo
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False
    )