"""URL to podcast converter.

Fetches an article through a reader proxy, asks an LLM (Together API or a
local Hugging Face model) to turn it into a two-speaker conversation, and
renders the conversation to audio with Edge TTS, Spark TTS or MeloTTS.
A Gradio UI ties the steps together.
"""

import asyncio
import base64
import io
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import wave
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
from typing import Dict, List, Optional, Tuple

import gradio as gr
import httpx
import numpy as np
import soundfile as sf
import torch
from dotenv import load_dotenv

# Edge TTS imports
import edge_tts
from pydub import AudioSegment

# OpenAI imports
from openai import OpenAI

# Transformers imports (for local mode)
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TextIteratorStreamer,
    BitsAndBytesConfig,
)

# Spark TTS imports
try:
    from huggingface_hub import snapshot_download
    SPARK_AVAILABLE = True
except ImportError:
    SPARK_AVAILABLE = False

# MeloTTS imports (for local mode)
try:
    # Run the dictionary download with the interpreter that is running this
    # module, so it lands in the same environment as the melo package.
    subprocess.run([sys.executable, "-m", "unidic", "download"], check=False)
    from melo.api import TTS as MeloTTS
    MELO_AVAILABLE = True
except Exception:
    # Any failure here only disables the optional MeloTTS engine.
    MELO_AVAILABLE = False

load_dotenv()

# Matches a JSON object with at most one level of nested braces, which is the
# shape of the requested {"conversation": [{...}, ...]} payload.
_JSON_OBJECT_PATTERN = re.compile(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}")

# Output format the LLM is asked to follow.
_CONVERSATION_TEMPLATE = """
{
    "conversation": [
        {"speaker": "", "text": ""},
        {"speaker": "", "text": ""}
    ]
}
"""

# Sample rate used for the silent placeholder written when Spark TTS fails.
_SILENCE_SAMPLE_RATE = 22050


def _system_message(language: str) -> str:
    """Return the system prompt for the requested output language."""
    if language == "Korean":
        return "당신은 한국어로 팟캐스트 대화를 생성하는 전문가입니다. 자연스럽고 유익한 한국어 대화를 만들어주세요."
    return "You are an expert at creating podcast conversations in English. Create natural and informative English conversations."


def _default_conversation(language: str) -> Dict:
    """Return a canned two-turn conversation used when the model emits no JSON."""
    if language == "Korean":
        return {
            "conversation": [
                {"speaker": "진행자", "text": "안녕하세요, 팟캐스트에 오신 것을 환영합니다."},
                {"speaker": "게스트", "text": "안녕하세요, 초대해 주셔서 감사합니다."},
            ]
        }
    return {
        "conversation": [
            {"speaker": "Host", "text": "Welcome to our podcast."},
            {"speaker": "Guest", "text": "Thank you for having me."},
        ]
    }


def _format_conversation(conversation_json: Dict) -> str:
    """Render a conversation dict as 'Speaker: text' lines.

    Turns without a "speaker" key are labelled "Speaker N" (1-based).
    """
    lines = []
    for index, turn in enumerate(conversation_json["conversation"]):
        speaker = turn.get("speaker", f"Speaker {index + 1}")
        lines.append(f"{speaker}: {turn['text']}")
    return "\n".join(lines)


@dataclass
class ConversationConfig:
    """Static settings for text fetching and conversation generation."""

    max_words: int = 6000  # article text is truncated to this many words
    prefix_url: str = "https://r.jina.ai/"  # prepended to the article URL
    model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"  # API mode
    local_model_name: str = "NousResearch/Hermes-2-Pro-Llama-3-8B"  # local mode


class UnifiedAudioConverter:
    """Turns article text into a conversation and the conversation into audio."""

    def __init__(self, config: ConversationConfig):
        self.config = config
        self.llm_client = None
        self.local_model = None
        self.tokenizer = None
        self.melo_models = None
        self.spark_model_dir = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def initialize_api_mode(self, api_key: str):
        """Initialize API mode with Together API"""
        self.llm_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

    def initialize_local_mode(self):
        """Initialize local mode with Hugging Face model (loaded once, 4-bit)."""
        if self.local_model is None:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16
            )
            self.local_model = AutoModelForCausalLM.from_pretrained(
                self.config.local_model_name,
                quantization_config=quantization_config
            )
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.local_model_name,
                revision='8ab73a6800796d84448bc936db9bac5ad9f984ae'
            )

    def initialize_spark_tts(self):
        """Initialize Spark TTS model by downloading if needed

        Raises:
            RuntimeError: if huggingface_hub is unavailable or the download fails.
        """
        if not SPARK_AVAILABLE:
            raise RuntimeError("Spark TTS dependencies not available")

        model_dir = "pretrained_models/Spark-TTS-0.5B"

        # Check if model exists, if not download it
        if not os.path.exists(model_dir):
            print("Downloading Spark-TTS model...")
            try:
                os.makedirs("pretrained_models", exist_ok=True)
                snapshot_download(
                    "SparkAudio/Spark-TTS-0.5B",
                    local_dir=model_dir
                )
                print("Spark-TTS model downloaded successfully")
            except Exception as e:
                raise RuntimeError(f"Failed to download Spark-TTS model: {e}") from e

        self.spark_model_dir = model_dir

        # Check if we have the CLI inference script
        if not os.path.exists("cli/inference.py"):
            print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")

    def initialize_melo_tts(self):
        """Initialize MeloTTS models"""
        if MELO_AVAILABLE and self.melo_models is None:
            self.melo_models = {"EN": MeloTTS(language="EN", device=self.device)}

    def fetch_text(self, url: str) -> str:
        """Fetch text content from URL

        The request goes to ``config.prefix_url + url``.

        Raises:
            ValueError: if the URL is empty or not http(s).
            RuntimeError: if the HTTP request fails.
        """
        if not url:
            raise ValueError("URL cannot be empty")

        if not url.startswith(("http://", "https://")):
            raise ValueError("URL must start with 'http://' or 'https://'")

        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}") from e

    def _build_prompt(self, text: str, language: str = "English") -> str:
        """Build prompt for conversation generation"""
        template = _CONVERSATION_TEMPLATE
        if language == "Korean":
            return (
                f"{text}\n\n제공된 텍스트를 두 명의 전문가 간의 짧고 유익하며 명확한 "
                f"팟캐스트 대화로 변환해주세요. 톤은 전문적이고 매력적이어야 합니다. "
                f"다음 형식을 준수하고 JSON만 반환해주세요:\n{template}"
            )
        return (
            f"{text}\n\nConvert the provided text into a short, informative and crisp "
            f"podcast conversation between two experts. The tone should be "
            f"professional and engaging. Please adhere to the following "
            f"format and return ONLY the JSON:\n{template}"
        )

    def extract_conversation_api(self, text: str, language: str = "English") -> Dict:
        """Extract conversation using API

        Raises:
            RuntimeError: if API mode is not initialized, the request fails, or
                the response contains no parseable JSON object.
        """
        if not self.llm_client:
            raise RuntimeError("API mode not initialized")

        try:
            chat_completion = self.llm_client.chat.completions.create(
                messages=[
                    {"role": "system", "content": _system_message(language)},
                    {"role": "user", "content": self._build_prompt(text, language)}
                ],
                model=self.config.model_name,
            )

            # Guard against a None content so the regex search does not raise
            # a TypeError instead of the intended "no JSON" error.
            content = chat_completion.choices[0].message.content or ""
            json_match = _JSON_OBJECT_PATTERN.search(content)

            if not json_match:
                raise ValueError("No valid JSON found in response")

            return json.loads(json_match.group())
        except Exception as e:
            raise RuntimeError(f"Failed to extract conversation: {e}") from e

    def extract_conversation_local(self, text: str, language: str = "English", progress=None) -> Dict:
        """Extract conversation using local model

        Streams the generation on a worker thread, then parses the first JSON
        object found in the output. If no JSON object is found, a canned
        conversation in the requested language is returned instead.
        ``progress`` is accepted but not used.

        Raises:
            RuntimeError: if local mode is not initialized.
        """
        if not self.local_model or not self.tokenizer:
            raise RuntimeError("Local mode not initialized")

        chat = [
            {"role": "system", "content": _system_message(language)},
            {"role": "user", "content": self._build_prompt(text, language)}
        ]

        terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]

        messages = self.tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )
        model_inputs = self.tokenizer([messages], return_tensors="pt").to(self.device)

        streamer = TextIteratorStreamer(
            self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )

        generate_kwargs = dict(
            model_inputs,
            streamer=streamer,
            max_new_tokens=4000,
            do_sample=True,
            temperature=0.9,
            eos_token_id=terminators,
        )

        worker = Thread(target=self.local_model.generate, kwargs=generate_kwargs)
        worker.start()

        partial_text = "".join(streamer)
        # The stream is exhausted, so generation is finishing; wait for the
        # worker so it does not outlive this call.
        worker.join()

        json_match = _JSON_OBJECT_PATTERN.search(partial_text)
        if json_match:
            return json.loads(json_match.group())

        # Return a default template based on language
        return _default_conversation(language)

    def parse_conversation_text(self, conversation_text: str) -> Dict:
        """Parse conversation text back to JSON format

        Each 'Speaker: text' line starts a new turn. A line that has no
        speaker prefix is treated as a continuation of the previous turn, so
        multi-line edits are not lost. Turns that end up with empty text are
        dropped because the TTS engines cannot synthesize them.
        """
        turns = []
        for raw_line in conversation_text.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            speaker, separator, text = line.partition(':')
            if separator and speaker.strip():
                turns.append({"speaker": speaker.strip(), "text": text.strip()})
            elif turns:
                turns[-1]["text"] = f"{turns[-1]['text']} {line}".strip()

        return {"conversation": [turn for turn in turns if turn["text"]]}

    async def text_to_speech_edge(self, conversation_json: Dict, language: str = "English") -> Tuple[str, str]:
        """Convert text to speech using Edge TTS

        Returns:
            (path to the combined WAV file, conversation transcript).

        Raises:
            RuntimeError: if synthesis or combining fails.
        """
        output_dir = Path(self._create_output_directory())
        filenames = []

        try:
            # Voices per language; turns alternate between the two.
            if language == "Korean":
                voices = [
                    "ko-KR-SunHiNeural",   # female voice (natural Korean)
                    "ko-KR-InJoonNeural"   # male voice (natural Korean)
                ]
            else:
                voices = [
                    "en-US-AvaMultilingualNeural",     # female voice
                    "en-US-AndrewMultilingualNeural"   # male voice
                ]

            for i, turn in enumerate(conversation_json["conversation"]):
                if not turn["text"].strip():
                    # Nothing to synthesize; same policy as the Spark path.
                    continue
                filename = output_dir / f"output_{i}.mp3"
                voice = voices[i % len(voices)]

                tmp_path = await self._generate_audio_edge(turn["text"], voice)
                # shutil.move works across filesystems; os.rename fails when
                # the temp dir and the working directory are on different mounts.
                shutil.move(tmp_path, str(filename))
                filenames.append(str(filename))

            # Combine audio files
            final_output = os.path.join(output_dir, "combined_output.wav")
            self._combine_audio_files(filenames, final_output)

            # Generate conversation text
            conversation_text = _format_conversation(conversation_json)

            return final_output, conversation_text
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech: {e}") from e

    async def _generate_audio_edge(self, text: str, voice: str) -> str:
        """Generate audio using Edge TTS

        Returns the path of a temporary MP3 file; the caller owns it.

        Raises:
            ValueError: if ``text`` is blank.
        """
        if not text.strip():
            raise ValueError("Text cannot be empty")

        voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
        communicate = edge_tts.Communicate(text, voice_short_name)

        # Edge TTS produces MP3 data, so name the file accordingly.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
            tmp_path = tmp_file.name

        await communicate.save(tmp_path)
        return tmp_path

    @staticmethod
    def _write_silence(path: str, seconds: float = 1.0) -> None:
        """Write a silent WAV placeholder so a failed turn keeps its slot."""
        silence = np.zeros(int(_SILENCE_SAMPLE_RATE * seconds))
        sf.write(path, silence, _SILENCE_SAMPLE_RATE)

    def text_to_speech_spark(self, conversation_json: Dict, language: str = "English", progress=None) -> Tuple[str, str]:
        """Convert text to speech using Spark TTS CLI

        Each non-empty turn is synthesized by running ``cli.inference`` as a
        subprocess in the current directory. A turn whose synthesis fails,
        times out, or yields no file is replaced by one second of silence.
        ``progress`` is accepted but not used.

        Returns:
            (path to the combined WAV file, conversation transcript).

        Raises:
            RuntimeError: if Spark TTS is unavailable or nothing was generated.
        """
        if not SPARK_AVAILABLE or not self.spark_model_dir:
            raise RuntimeError("Spark TTS not available")

        try:
            output_dir = self._create_output_directory()
            audio_files = []

            # Create different voice characteristics for different speakers
            # NOTE(review): "gender" is defined but never passed to the CLI, and
            # the flags below (--prompt_text without a prompt audio file,
            # --output_name) should be confirmed against the cloned
            # Spark-TTS cli/inference.py.
            if language == "Korean":
                voice_configs = [
                    {"prompt_text": "안녕하세요, 오늘 팟캐스트 진행을 맡은 진행자입니다.", "gender": "female"},
                    {"prompt_text": "안녕하세요, 오늘 게스트로 참여하게 되어 기쁩니다.", "gender": "male"}
                ]
            else:
                voice_configs = [
                    {"prompt_text": "Hello, welcome to our podcast. I'm your host today.", "gender": "female"},
                    {"prompt_text": "Thank you for having me. I'm excited to be here.", "gender": "male"}
                ]

            for i, turn in enumerate(conversation_json["conversation"]):
                text = turn["text"]
                if not text.strip():
                    continue

                # Use different voice config for each speaker
                voice_config = voice_configs[i % len(voice_configs)]
                output_name = f"spark_output_{i}.wav"
                output_file = os.path.join(output_dir, output_name)

                # Run Spark TTS CLI inference with the current interpreter so
                # the subprocess sees the same installed packages.
                cmd = [
                    sys.executable, "-m", "cli.inference",
                    "--text", text,
                    "--device", "0" if torch.cuda.is_available() else "cpu",
                    "--save_dir", output_dir,
                    "--model_dir", self.spark_model_dir,
                    "--prompt_text", voice_config["prompt_text"],
                    "--output_name", output_name
                ]

                succeeded = False
                try:
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=60,
                        cwd="."  # Make sure we're in the right directory
                    )
                except subprocess.TimeoutExpired:
                    print(f"Spark TTS timeout for turn {i}")
                except Exception as e:
                    print(f"Error running Spark TTS for turn {i}: {e}")
                else:
                    if result.returncode != 0:
                        print(f"Spark TTS error for turn {i}: {result.stderr}")
                    elif not os.path.exists(output_file):
                        print(f"Spark TTS produced no output file for turn {i}")
                    else:
                        succeeded = True

                if not succeeded:
                    # Create a short silence as fallback
                    self._write_silence(output_file)
                audio_files.append(output_file)

            # Combine all audio files
            if audio_files:
                final_output = os.path.join(output_dir, "spark_combined.wav")
                self._combine_audio_files(audio_files, final_output)
            else:
                raise RuntimeError("No audio files generated")

            # Generate conversation text
            conversation_text = _format_conversation(conversation_json)

            return final_output, conversation_text

        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}") from e

    def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Convert text to speech using MeloTTS

        Returns:
            (path to the combined MP3 file, conversation transcript).

        Raises:
            RuntimeError: if MeloTTS is unavailable or not initialized.
        """
        if not MELO_AVAILABLE or not self.melo_models:
            raise RuntimeError("MeloTTS not available")

        speakers = ["EN-Default", "EN-US"]
        combined_audio = AudioSegment.empty()

        for i, turn in enumerate(conversation_json["conversation"]):
            text = turn["text"]
            if not text.strip():
                continue
            bio = io.BytesIO()
            speaker = speakers[i % 2]
            speaker_id = self.melo_models["EN"].hps.data.spk2id[speaker]

            # Generate audio
            self.melo_models["EN"].tts_to_file(
                text, speaker_id, bio, speed=1.0,
                pbar=progress.tqdm if progress else None,
                format="wav"
            )

            bio.seek(0)
            audio_segment = AudioSegment.from_file(bio, format="wav")
            combined_audio += audio_segment

        # Save final audio in a per-request directory so concurrent requests
        # do not overwrite each other's output.
        final_audio_path = os.path.join(self._create_output_directory(), "melo_podcast.mp3")
        combined_audio.export(final_audio_path, format="mp3")

        # Generate conversation text
        conversation_text = _format_conversation(conversation_json)

        return final_audio_path, conversation_text

    def _create_output_directory(self) -> str:
        """Create a unique output directory in the working directory.

        The name is ``podcast_<16 hex chars>``. A fixed prefix and hex digits
        keep the name free of a leading '-' or '=' characters, which matters
        because the name is passed as a command-line argument to the Spark CLI.
        """
        folder_name = f"podcast_{os.urandom(8).hex()}"
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Combine multiple audio files into one

        Input files that do not exist are skipped. The input files are removed
        afterwards, whether or not combining succeeded.

        Raises:
            ValueError: if ``filenames`` is empty.
            RuntimeError: if no input file exists or combining/export fails.
        """
        if not filenames:
            raise ValueError("No input files provided")

        try:
            audio_segments = [
                AudioSegment.from_file(filename)
                for filename in filenames
                if os.path.exists(filename)
            ]

            if not audio_segments:
                # Without this the caller would return a path to a file that
                # was never written.
                raise ValueError("none of the input files exist")

            combined = sum(audio_segments)
            combined.export(output_file, format="wav")
        except Exception as e:
            raise RuntimeError(f"Failed to combine audio files: {e}") from e
        finally:
            # Clean up temporary files
            for filename in filenames:
                if os.path.exists(filename):
                    os.remove(filename)


# Global converter instance
converter = UnifiedAudioConverter(ConversationConfig())


async def synthesize(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Main synthesis function

    Fetches the article, truncates it to ``max_words`` words and generates the
    conversation text. No audio is produced here; ``tts_engine`` is accepted
    to match the UI inputs but is not used.

    Returns:
        (conversation text or error message, None).
    """
    if not article_url:
        return "Please provide a valid URL.", None

    try:
        # Fetch text from URL
        text = converter.fetch_text(article_url)

        # Limit text to max words
        words = text.split()
        if len(words) > converter.config.max_words:
            text = " ".join(words[:converter.config.max_words])

        # Extract conversation based on mode
        if mode == "API":
            api_key = os.environ.get("TOGETHER_API_KEY")
            if not api_key:
                return "API key not found. Please set TOGETHER_API_KEY environment variable.", None
            converter.initialize_api_mode(api_key)
            conversation_json = converter.extract_conversation_api(text, language)
        else:  # Local mode
            converter.initialize_local_mode()
            conversation_json = converter.extract_conversation_local(text, language)

        # Generate conversation text
        conversation_text = _format_conversation(conversation_json)

        return conversation_text, None

    except Exception as e:
        return f"Error: {str(e)}", None


async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Regenerate audio from edited conversation text

    Returns:
        (status message, path to the audio file or None on failure).
    """
    if not conversation_text.strip():
        return "Please provide conversation text.", None

    try:
        # Parse the conversation text back to JSON format
        conversation_json = converter.parse_conversation_text(conversation_text)

        if not conversation_json["conversation"]:
            return "No valid conversation found in the text.", None

        # Korean is only handled by Edge-TTS (other engines have limited
        # Korean support). This check also covers MeloTTS, so no separate
        # Korean check is needed in that branch below.
        if language == "Korean" and tts_engine != "Edge-TTS":
            return "한국어는 Edge-TTS만 지원됩니다. TTS 엔진이 자동으로 Edge-TTS로 변경됩니다.", None

        # Generate audio based on TTS engine
        if tts_engine == "Edge-TTS":
            output_file, _ = await converter.text_to_speech_edge(conversation_json, language)
        elif tts_engine == "Spark-TTS":
            if not SPARK_AVAILABLE:
                return "Spark TTS not available. Please install required dependencies and clone the Spark-TTS repository.", None
            converter.initialize_spark_tts()
            output_file, _ = converter.text_to_speech_spark(conversation_json, language)
        else:  # MeloTTS
            if not MELO_AVAILABLE:
                return "MeloTTS not available. Please install required dependencies.", None
            converter.initialize_melo_tts()
            output_file, _ = converter.text_to_speech_melo(conversation_json)

        return "Audio generated successfully!", output_file

    except Exception as e:
        return f"Error generating audio: {str(e)}", None


def synthesize_sync(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Synchronous wrapper for async synthesis"""
    return asyncio.run(synthesize(article_url, mode, tts_engine, language))


def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Synchronous wrapper for async audio regeneration"""
    return asyncio.run(regenerate_audio(conversation_text, tts_engine, language))


def update_tts_engine_for_korean(language):
    """Update the TTS engine options when the language selection changes.

    Korean restricts the choice to Edge-TTS and locks the control; any other
    language restores all three engines.
    """
    if language == "Korean":
        return gr.Radio(
            choices=["Edge-TTS"],
            value="Edge-TTS",
            label="TTS Engine",
            info="한국어는 Edge-TTS만 지원됩니다",
            interactive=False
        )
    else:
        return gr.Radio(
            choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
            value="Edge-TTS",
            label="TTS Engine",
            info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU",
            interactive=True
        )


# Gradio Interface
with gr.Blocks(theme='soft', title="URL to Podcast Converter") as demo:
    gr.Markdown("# 🎙️ URL to Podcast Converter")
    gr.Markdown("Convert any article, blog, or news into an engaging podcast conversation!")

    with gr.Row():
        with gr.Column(scale=3):
            url_input = gr.Textbox(
                label="Article URL",
                placeholder="Enter the article URL here...",
                value=""
            )
        with gr.Column(scale=1):
            # Language selection
            language_selector = gr.Radio(
                choices=["English", "Korean"],
                value="English",
                label="Language / 언어",
                info="Select output language / 출력 언어를 선택하세요"
            )

            mode_selector = gr.Radio(
                choices=["API", "Local"],
                value="API",
                label="Processing Mode",
                info="API: Faster, requires API key | Local: Slower, runs on device"
            )

            # TTS engine selection
            with gr.Group():
                gr.Markdown("### TTS Engine Selection")
                tts_selector = gr.Radio(
                    choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
                    value="Edge-TTS",
                    label="TTS Engine",
                    info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU"
                )

                gr.Markdown("""
                **Recommended:**
                - 🌟 **Edge-TTS**: Best quality, cloud-based, instant setup
                - 🤖 **Spark-TTS**: Local AI model (0.5B), zero-shot voice cloning

                **Additional Option:**
                - ⚡ **MeloTTS**: Local processing, GPU recommended

                **한국어 지원:**
                - 🇰🇷 한국어 선택 시 Edge-TTS만 사용 가능합니다
                """)

    convert_btn = gr.Button("🎯 Generate Conversation / 대화 생성", variant="primary", size="lg")

    with gr.Row():
        with gr.Column():
            conversation_output = gr.Textbox(
                label="Generated Conversation (Editable) / 생성된 대화 (편집 가능)",
                lines=15,
                max_lines=30,
                interactive=True,
                placeholder="Generated conversation will appear here. You can edit it before generating audio.\n생성된 대화가 여기에 표시됩니다. 오디오 생성 전에 편집할 수 있습니다.",
                info="Edit the conversation as needed. Format: 'Speaker Name: Text' / 필요에 따라 대화를 편집하세요. 형식: '화자 이름: 텍스트'"
            )

            # Audio generation button
            with gr.Row():
                generate_audio_btn = gr.Button("🎙️ Generate Audio from Text / 텍스트에서 오디오 생성", variant="secondary", size="lg")
                gr.Markdown("*Edit the conversation above, then click to generate audio / 위의 대화를 편집한 후 클릭하여 오디오를 생성하세요*")

        with gr.Column():
            audio_output = gr.Audio(
                label="Podcast Audio / 팟캐스트 오디오",
                type="filepath",
                interactive=False
            )

            # Status message
            status_output = gr.Textbox(
                label="Status / 상태",
                interactive=False,
                visible=True
            )

    # Per-engine description and setup instructions
    with gr.Row():
        gr.Markdown("""
        ### TTS Engine Details / TTS 엔진 상세정보:

        - **Edge-TTS**: Microsoft's cloud TTS service with high-quality natural voices. Requires internet connection.
          - 🇰🇷 **한국어 지원**: 자연스러운 한국어 음성 (여성: SunHi, 남성: InJoon)
        - **Spark-TTS**: SparkAudio's local AI model (0.5B parameters) with zero-shot voice cloning capability.
          - **Setup required**: Clone [Spark-TTS repository](https://github.com/SparkAudio/Spark-TTS) in current directory
          - Features: Bilingual support (Chinese/English), controllable speech generation
          - License: CC BY-NC-SA (Non-commercial use only)
          - ⚠️ **한국어 미지원**
        - **MeloTTS**: Local TTS with multiple voice options. GPU recommended for better performance.
          - ⚠️ **한국어 미지원**

        ### Spark-TTS Setup Instructions:
        ```bash
        git clone https://github.com/SparkAudio/Spark-TTS.git
        cd Spark-TTS
        pip install -r requirements.txt
        ```
        """)

    gr.Examples(
        examples=[
            ["https://huggingface.co/blog/openfree/cycle-navigator", "API", "Edge-TTS", "English"],
            ["https://www.bbc.com/news/technology-67988517", "API", "Spark-TTS", "English"],
            ["https://arxiv.org/abs/2301.00810", "API", "Edge-TTS", "Korean"],
        ],
        inputs=[url_input, mode_selector, tts_selector, language_selector],
        outputs=[conversation_output, status_output],
        fn=synthesize_sync,
        cache_examples=False,
    )

    # Update the TTS engine options when the language changes
    language_selector.change(
        fn=update_tts_engine_for_korean,
        inputs=[language_selector],
        outputs=[tts_selector]
    )

    # Event wiring
    convert_btn.click(
        fn=synthesize_sync,
        inputs=[url_input, mode_selector, tts_selector, language_selector],
        outputs=[conversation_output, status_output]
    )

    generate_audio_btn.click(
        fn=regenerate_audio_sync,
        inputs=[conversation_output, tts_selector, language_selector],
        outputs=[status_output, audio_output]
    )


# Launch the app
if __name__ == "__main__":
    demo.queue(api_open=True, default_concurrency_limit=10).launch(
        show_api=True,
        share=False,
        server_name="0.0.0.0",
        server_port=7860
    )