import gradio as gr import os import asyncio import torch import io import json import re import httpx import tempfile import wave import base64 import numpy as np import soundfile as sf import subprocess import shutil from dataclasses import dataclass from typing import List, Tuple, Dict, Optional from pathlib import Path from threading import Thread from dotenv import load_dotenv # Edge TTS imports import edge_tts from pydub import AudioSegment # OpenAI imports from openai import OpenAI # Transformers imports (for local mode) from transformers import ( AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, BitsAndBytesConfig, ) # Spark TTS imports try: from huggingface_hub import snapshot_download SPARK_AVAILABLE = True except: SPARK_AVAILABLE = False # MeloTTS imports (for local mode) try: os.system("python -m unidic download") from melo.api import TTS as MeloTTS MELO_AVAILABLE = True except: MELO_AVAILABLE = False load_dotenv() @dataclass class ConversationConfig: max_words: int = 6000 prefix_url: str = "https://r.jina.ai/" model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo" local_model_name: str = "NousResearch/Hermes-2-Pro-Llama-3-8B" class UnifiedAudioConverter: def __init__(self, config: ConversationConfig): self.config = config self.llm_client = None self.local_model = None self.tokenizer = None self.melo_models = None self.spark_model_dir = None self.device = "cuda" if torch.cuda.is_available() else "cpu" def initialize_api_mode(self, api_key: str): """Initialize API mode with Together API""" self.llm_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1") def initialize_local_mode(self): """Initialize local mode with Hugging Face model""" if self.local_model is None: quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) self.local_model = AutoModelForCausalLM.from_pretrained( self.config.local_model_name, quantization_config=quantization_config ) self.tokenizer = AutoTokenizer.from_pretrained( self.config.local_model_name, revision='8ab73a6800796d84448bc936db9bac5ad9f984ae' ) def initialize_spark_tts(self): """Initialize Spark TTS model by downloading if needed""" if not SPARK_AVAILABLE: raise RuntimeError("Spark TTS dependencies not available") model_dir = "pretrained_models/Spark-TTS-0.5B" # Check if model exists, if not download it if not os.path.exists(model_dir): print("Downloading Spark-TTS model...") try: os.makedirs("pretrained_models", exist_ok=True) snapshot_download( "SparkAudio/Spark-TTS-0.5B", local_dir=model_dir ) print("Spark-TTS model downloaded successfully") except Exception as e: raise RuntimeError(f"Failed to download Spark-TTS model: {e}") self.spark_model_dir = model_dir # Check if we have the CLI inference script if not os.path.exists("cli/inference.py"): print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.") def initialize_melo_tts(self): """Initialize MeloTTS models""" if MELO_AVAILABLE and self.melo_models is None: self.melo_models = {"EN": MeloTTS(language="EN", device=self.device)} def fetch_text(self, url: str) -> str: """Fetch text content from URL""" if not url: raise ValueError("URL cannot be empty") if not url.startswith("http://") and not url.startswith("https://"): raise ValueError("URL must start with 'http://' or 'https://'") full_url = f"{self.config.prefix_url}{url}" try: response = httpx.get(full_url, timeout=60.0) response.raise_for_status() return response.text except httpx.HTTPError as e: raise RuntimeError(f"Failed to fetch URL: {e}") def _build_prompt(self, text: str) -> str: """Build prompt for conversation generation""" template = """ { "conversation": [ {"speaker": "", "text": ""}, {"speaker": "", "text": ""} ] } """ return ( f"{text}\n\nConvert the provided text into a short, informative and crisp " f"podcast conversation between two experts. The tone should be " f"professional and engaging. Please adhere to the following " f"format and return ONLY the JSON:\n{template}" ) def extract_conversation_api(self, text: str) -> Dict: """Extract conversation using API""" if not self.llm_client: raise RuntimeError("API mode not initialized") try: chat_completion = self.llm_client.chat.completions.create( messages=[{"role": "user", "content": self._build_prompt(text)}], model=self.config.model_name, ) pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}" json_match = re.search(pattern, chat_completion.choices[0].message.content) if not json_match: raise ValueError("No valid JSON found in response") return json.loads(json_match.group()) except Exception as e: raise RuntimeError(f"Failed to extract conversation: {e}") def extract_conversation_local(self, text: str, progress=None) -> Dict: """Extract conversation using local model""" if not self.local_model or not self.tokenizer: raise RuntimeError("Local mode not initialized") chat = [{ "role": "user", "content": self._build_prompt(text) }] terminators = [ self.tokenizer.eos_token_id, self.tokenizer.convert_tokens_to_ids("<|eot_id|>") ] messages = self.tokenizer.apply_chat_template( chat, tokenize=False, add_generation_prompt=True ) model_inputs = self.tokenizer([messages], return_tensors="pt").to(self.device) streamer = TextIteratorStreamer( self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True ) generate_kwargs = dict( model_inputs, streamer=streamer, max_new_tokens=4000, do_sample=True, temperature=0.9, eos_token_id=terminators, ) t = Thread(target=self.local_model.generate, kwargs=generate_kwargs) t.start() partial_text = "" for new_text in streamer: partial_text += new_text pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}" json_match = re.search(pattern, partial_text) if json_match: return json.loads(json_match.group()) else: # Return a default template if no valid JSON found return { "conversation": [ {"speaker": "Host", "text": "Welcome to our podcast."}, {"speaker": "Guest", "text": "Thank you for having me."} ] } def parse_conversation_text(self, conversation_text: str) -> Dict: """Parse conversation text back to JSON format""" lines = conversation_text.strip().split('\n') conversation_data = {"conversation": []} for line in lines: if ':' in line: speaker, text = line.split(':', 1) conversation_data["conversation"].append({ "speaker": speaker.strip(), "text": text.strip() }) return conversation_data async def text_to_speech_edge(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[str, str]: """Convert text to speech using Edge TTS""" output_dir = Path(self._create_output_directory()) filenames = [] try: for i, turn in enumerate(conversation_json["conversation"]): filename = output_dir / f"output_{i}.wav" voice = voice_1 if i % 2 == 0 else voice_2 tmp_path = await self._generate_audio_edge(turn["text"], voice) os.rename(tmp_path, filename) filenames.append(str(filename)) # Combine audio files final_output = os.path.join(output_dir, "combined_output.wav") self._combine_audio_files(filenames, final_output) # Generate conversation text conversation_text = "\n".join( f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}" for i, turn in enumerate(conversation_json["conversation"]) ) return final_output, conversation_text except Exception as e: raise RuntimeError(f"Failed to convert text to speech: {e}") async def _generate_audio_edge(self, text: str, voice: str) -> str: """Generate audio using Edge TTS""" if not text.strip(): raise ValueError("Text cannot be empty") voice_short_name = voice.split(" - ")[0] if " - " in voice else voice communicate = edge_tts.Communicate(text, voice_short_name) with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file: tmp_path = tmp_file.name await communicate.save(tmp_path) return tmp_path def text_to_speech_spark(self, conversation_json: Dict, progress=None) -> Tuple[str, str]: """Convert text to speech using Spark TTS CLI""" if not SPARK_AVAILABLE or not self.spark_model_dir: raise RuntimeError("Spark TTS not available") try: output_dir = self._create_output_directory() audio_files = [] # Create different voice characteristics for different speakers voice_configs = [ {"prompt_text": "Hello, welcome to our podcast. I'm your host today.", "gender": "female"}, {"prompt_text": "Thank you for having me. I'm excited to be here.", "gender": "male"} ] for i, turn in enumerate(conversation_json["conversation"]): text = turn["text"] if not text.strip(): continue # Use different voice config for each speaker voice_config = voice_configs[i % len(voice_configs)] output_file = os.path.join(output_dir, f"spark_output_{i}.wav") # Run Spark TTS CLI inference cmd = [ "python", "-m", "cli.inference", "--text", text, "--device", "0" if torch.cuda.is_available() else "cpu", "--save_dir", output_dir, "--model_dir", self.spark_model_dir, "--prompt_text", voice_config["prompt_text"], "--output_name", f"spark_output_{i}.wav" ] try: # Run the command result = subprocess.run( cmd, capture_output=True, text=True, timeout=60, cwd="." # Make sure we're in the right directory ) if result.returncode == 0: audio_files.append(output_file) else: print(f"Spark TTS error for turn {i}: {result.stderr}") # Create a short silence as fallback silence = np.zeros(int(22050 * 1.0)) # 1 second of silence sf.write(output_file, silence, 22050) audio_files.append(output_file) except subprocess.TimeoutExpired: print(f"Spark TTS timeout for turn {i}") # Create silence as fallback silence = np.zeros(int(22050 * 1.0)) sf.write(output_file, silence, 22050) audio_files.append(output_file) except Exception as e: print(f"Error running Spark TTS for turn {i}: {e}") # Create silence as fallback silence = np.zeros(int(22050 * 1.0)) sf.write(output_file, silence, 22050) audio_files.append(output_file) # Combine all audio files if audio_files: final_output = os.path.join(output_dir, "spark_combined.wav") self._combine_audio_files(audio_files, final_output) else: raise RuntimeError("No audio files generated") # Generate conversation text conversation_text = "\n".join( f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}" for i, turn in enumerate(conversation_json["conversation"]) ) return final_output, conversation_text except Exception as e: raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}") def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]: """Convert text to speech using MeloTTS""" if not MELO_AVAILABLE or not self.melo_models: raise RuntimeError("MeloTTS not available") speakers = ["EN-Default", "EN-US"] combined_audio = AudioSegment.empty() for i, turn in enumerate(conversation_json["conversation"]): bio = io.BytesIO() text = turn["text"] speaker = speakers[i % 2] speaker_id = self.melo_models["EN"].hps.data.spk2id[speaker] # Generate audio self.melo_models["EN"].tts_to_file( text, speaker_id, bio, speed=1.0, pbar=progress.tqdm if progress else None, format="wav" ) bio.seek(0) audio_segment = AudioSegment.from_file(bio, format="wav") combined_audio += audio_segment # Save final audio final_audio_path = "melo_podcast.mp3" combined_audio.export(final_audio_path, format="mp3") # Generate conversation text conversation_text = "\n".join( f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}" for i, turn in enumerate(conversation_json["conversation"]) ) return final_audio_path, conversation_text def _create_output_directory(self) -> str: """Create a unique output directory""" random_bytes = os.urandom(8) folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8") os.makedirs(folder_name, exist_ok=True) return folder_name def _combine_audio_files(self, filenames: List[str], output_file: str) -> None: """Combine multiple audio files into one""" if not filenames: raise ValueError("No input files provided") try: audio_segments = [] for filename in filenames: if os.path.exists(filename): audio_segment = AudioSegment.from_file(filename) audio_segments.append(audio_segment) if audio_segments: combined = sum(audio_segments) combined.export(output_file, format="wav") # Clean up temporary files for filename in filenames: if os.path.exists(filename): os.remove(filename) except Exception as e: raise RuntimeError(f"Failed to combine audio files: {e}") # Global converter instance converter = UnifiedAudioConverter(ConversationConfig()) async def synthesize(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS"): """Main synthesis function""" if not article_url: return "Please provide a valid URL.", None try: # Fetch text from URL text = converter.fetch_text(article_url) # Limit text to max words words = text.split() if len(words) > converter.config.max_words: text = " ".join(words[:converter.config.max_words]) # Extract conversation based on mode if mode == "API": api_key = os.environ.get("TOGETHER_API_KEY") if not api_key: return "API key not found. Please set TOGETHER_API_KEY environment variable.", None converter.initialize_api_mode(api_key) conversation_json = converter.extract_conversation_api(text) else: # Local mode converter.initialize_local_mode() conversation_json = converter.extract_conversation_local(text) # Generate conversation text conversation_text = "\n".join( f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}" for i, turn in enumerate(conversation_json["conversation"]) ) return conversation_text, None except Exception as e: return f"Error: {str(e)}", None async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS"): """Regenerate audio from edited conversation text""" if not conversation_text.strip(): return "Please provide conversation text.", None try: # Parse the conversation text back to JSON format conversation_json = converter.parse_conversation_text(conversation_text) if not conversation_json["conversation"]: return "No valid conversation found in the text.", None # Generate audio based on TTS engine if tts_engine == "Edge-TTS": output_file, _ = await converter.text_to_speech_edge( conversation_json, "en-US-AvaMultilingualNeural", "en-US-AndrewMultilingualNeural" ) elif tts_engine == "Spark-TTS": if not SPARK_AVAILABLE: return "Spark TTS not available. Please install required dependencies and clone the Spark-TTS repository.", None converter.initialize_spark_tts() output_file, _ = converter.text_to_speech_spark(conversation_json) else: # MeloTTS if not MELO_AVAILABLE: return "MeloTTS not available. Please install required dependencies.", None converter.initialize_melo_tts() output_file, _ = converter.text_to_speech_melo(conversation_json) return "Audio generated successfully!", output_file except Exception as e: return f"Error generating audio: {str(e)}", None def synthesize_sync(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS"): """Synchronous wrapper for async synthesis""" return asyncio.run(synthesize(article_url, mode, tts_engine)) def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS"): """Synchronous wrapper for async audio regeneration""" return asyncio.run(regenerate_audio(conversation_text, tts_engine)) # Gradio Interface with gr.Blocks(theme='soft', title="URL to Podcast Converter") as demo: gr.Markdown("# πŸŽ™οΈ URL to Podcast Converter") gr.Markdown("Convert any article, blog, or news into an engaging podcast conversation!") with gr.Row(): with gr.Column(scale=3): url_input = gr.Textbox( label="Article URL", placeholder="Enter the article URL here...", value="" ) with gr.Column(scale=1): mode_selector = gr.Radio( choices=["API", "Local"], value="API", label="Processing Mode", info="API: Faster, requires API key | Local: Slower, runs on device" ) # TTS μ—”μ§„ 선택 - κΈ°λ³Έ 2κ°œμ™€ μΆ”κ°€ μ˜΅μ…˜μœΌλ‘œ ꡬ뢄 with gr.Group(): gr.Markdown("### TTS Engine Selection") tts_selector = gr.Radio( choices=["Edge-TTS", "Spark-TTS", "MeloTTS"], value="Edge-TTS", label="TTS Engine", info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU" ) gr.Markdown(""" **Recommended:** - 🌟 **Edge-TTS**: Best quality, cloud-based, instant setup - πŸ€– **Spark-TTS**: Local AI model (0.5B), zero-shot voice cloning **Additional Option:** - ⚑ **MeloTTS**: Local processing, GPU recommended """) convert_btn = gr.Button("🎯 Generate Conversation", variant="primary", size="lg") with gr.Row(): with gr.Column(): conversation_output = gr.Textbox( label="Generated Conversation (Editable)", lines=15, max_lines=30, interactive=True, placeholder="Generated conversation will appear here. You can edit it before generating audio.", info="Edit the conversation as needed. Format: 'Speaker Name: Text'" ) # μ˜€λ””μ˜€ 생성 λ²„νŠΌ μΆ”κ°€ with gr.Row(): generate_audio_btn = gr.Button("πŸŽ™οΈ Generate Audio from Text", variant="secondary", size="lg") gr.Markdown("*Edit the conversation above, then click to generate audio*") with gr.Column(): audio_output = gr.Audio( label="Podcast Audio", type="filepath", interactive=False ) # μƒνƒœ λ©”μ‹œμ§€ μΆ”κ°€ status_output = gr.Textbox( label="Status", interactive=False, visible=True ) # TTS 엔진별 μ„€λͺ… 및 μ„€μΉ˜ μ•ˆλ‚΄ μΆ”κ°€ with gr.Row(): gr.Markdown(""" ### TTS Engine Details: - **Edge-TTS**: Microsoft's cloud TTS service with high-quality natural voices. Requires internet connection. - **Spark-TTS**: SparkAudio's local AI model (0.5B parameters) with zero-shot voice cloning capability. - **Setup required**: Clone [Spark-TTS repository](https://github.com/SparkAudio/Spark-TTS) in current directory - Features: Bilingual support (Chinese/English), controllable speech generation - License: CC BY-NC-SA (Non-commercial use only) - **MeloTTS**: Local TTS with multiple voice options. GPU recommended for better performance. ### Spark-TTS Setup Instructions: ```bash git clone https://github.com/SparkAudio/Spark-TTS.git cd Spark-TTS pip install -r requirements.txt ``` """) gr.Examples( examples=[ ["https://huggingface.co/blog/openfree/cycle-navigator", "API", "Edge-TTS"], ["https://www.bbc.com/news/technology-67988517", "API", "Spark-TTS"], ["https://arxiv.org/abs/2301.00810", "API", "Edge-TTS"], ], inputs=[url_input, mode_selector, tts_selector], outputs=[conversation_output, status_output], fn=synthesize_sync, cache_examples=False, ) # 이벀트 μ—°κ²° convert_btn.click( fn=synthesize_sync, inputs=[url_input, mode_selector, tts_selector], outputs=[conversation_output, status_output] ) generate_audio_btn.click( fn=regenerate_audio_sync, inputs=[conversation_output, tts_selector], outputs=[status_output, audio_output] ) # Launch the app if __name__ == "__main__": demo.queue(api_open=True, default_concurrency_limit=10).launch( show_api=True, share=False, server_name="0.0.0.0", server_port=7860 )