import gradio as gr
import os
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import wave
import base64
import numpy as np
import soundfile as sf
import subprocess
import shutil
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv

# Edge TTS imports
import edge_tts
from pydub import AudioSegment

# OpenAI imports
from openai import OpenAI

# Transformers imports (for local mode)
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TextIteratorStreamer,
    BitsAndBytesConfig,
)
# Spark TTS imports
try:
    from huggingface_hub import snapshot_download
    SPARK_AVAILABLE = True
except ImportError:
    SPARK_AVAILABLE = False
# MeloTTS imports (for local mode)
try:
    os.system("python -m unidic download")  # MeloTTS depends on the unidic dictionary
    from melo.api import TTS as MeloTTS
    MELO_AVAILABLE = True
except Exception:
    MELO_AVAILABLE = False
load_dotenv()
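
# API mode expects TOGETHER_API_KEY in the environment (or in a local .env
# file picked up by load_dotenv above); see synthesize() below.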

@dataclass
class ConversationConfig:
    max_words: int = 6000
    prefix_url: str = "https://r.jina.ai/"
    model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"
    local_model_name: str = "NousResearch/Hermes-2-Pro-Llama-3-8B"


class UnifiedAudioConverter:
    def __init__(self, config: ConversationConfig):
        self.config = config
        self.llm_client = None
        self.local_model = None
        self.tokenizer = None
        self.melo_models = None
        self.spark_model_dir = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def initialize_api_mode(self, api_key: str):
        """Initialize API mode with the Together API"""
        self.llm_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

    def initialize_local_mode(self):
        """Initialize local mode with a Hugging Face model"""
        if self.local_model is None:
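            # Loading the 8B model in 4-bit roughly quarters its weight memory
            # (on the order of ~4 GB vs ~16 GB in fp16), at some quality cost.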
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16
            )
            self.local_model = AutoModelForCausalLM.from_pretrained(
                self.config.local_model_name,
                quantization_config=quantization_config
            )
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.local_model_name,
                revision='8ab73a6800796d84448bc936db9bac5ad9f984ae'
            )

    def initialize_spark_tts(self):
        """Initialize Spark TTS by downloading the model if needed"""
        if not SPARK_AVAILABLE:
            raise RuntimeError("Spark TTS dependencies not available")

        model_dir = "pretrained_models/Spark-TTS-0.5B"

        # Download the model if it is not present locally
        if not os.path.exists(model_dir):
            print("Downloading Spark-TTS model...")
            try:
                os.makedirs("pretrained_models", exist_ok=True)
                snapshot_download(
                    "SparkAudio/Spark-TTS-0.5B",
                    local_dir=model_dir
                )
                print("Spark-TTS model downloaded successfully")
            except Exception as e:
                raise RuntimeError(f"Failed to download Spark-TTS model: {e}")

        self.spark_model_dir = model_dir

        # Check that the CLI inference script is available
        if not os.path.exists("cli/inference.py"):
            print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")

    def initialize_melo_tts(self):
        """Initialize MeloTTS models"""
        if MELO_AVAILABLE and self.melo_models is None:
            self.melo_models = {"EN": MeloTTS(language="EN", device=self.device)}

    def fetch_text(self, url: str) -> str:
        """Fetch text content from a URL"""
        if not url:
            raise ValueError("URL cannot be empty")
        if not url.startswith("http://") and not url.startswith("https://"):
            raise ValueError("URL must start with 'http://' or 'https://'")

        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}")

    def _build_prompt(self, text: str, language: str = "English") -> str:
        """Build the conversation-generation prompt for the given language"""
        template = """
        {
            "conversation": [
                {"speaker": "", "text": ""},
                {"speaker": "", "text": ""}
            ]
        }
        """
        if language == "Korean":
            # Korean instruction: "Convert the provided text into a short,
            # informative and crisp podcast conversation between two experts.
            # The tone should be professional and engaging. Adhere to the
            # following format and return ONLY the JSON."
            return (
                f"{text}\n\n์ œ๊ณต๋œ ํ…์ŠคํŠธ๋ฅผ ๋‘ ๋ช…์˜ ์ „๋ฌธ๊ฐ€ ๊ฐ„์˜ ์งง๊ณ  ์œ ์ตํ•˜๋ฉฐ ๋ช…ํ™•ํ•œ "
                f"ํŒŸ์บ์ŠคํŠธ ๋Œ€ํ™”๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ์„ธ์š”. ํ†ค์€ ์ „๋ฌธ์ ์ด๊ณ  ๋งค๋ ฅ์ ์ด์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. "
                f"๋‹ค์Œ ํ˜•์‹์„ ์ค€์ˆ˜ํ•˜๊ณ  JSON๋งŒ ๋ฐ˜ํ™˜ํ•ด์ฃผ์„ธ์š”:\n{template}"
            )
        else:
            return (
                f"{text}\n\nConvert the provided text into a short, informative and crisp "
                f"podcast conversation between two experts. The tone should be "
                f"professional and engaging. Please adhere to the following "
                f"format and return ONLY the JSON:\n{template}"
            )

    def extract_conversation_api(self, text: str, language: str = "English") -> Dict:
        """Extract a conversation using the API"""
        if not self.llm_client:
            raise RuntimeError("API mode not initialized")

        try:
            # Language-specific system message (Korean: "You are an expert at
            # creating podcast conversations in Korean. Create natural and
            # informative Korean conversations.")
            if language == "Korean":
                system_message = "๋‹น์‹ ์€ ํ•œ๊ตญ์–ด๋กœ ํŒŸ์บ์ŠคํŠธ ๋Œ€ํ™”๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์ „๋ฌธ๊ฐ€์ž…๋‹ˆ๋‹ค. ์ž์—ฐ์Šค๋Ÿฝ๊ณ  ์œ ์ตํ•œ ํ•œ๊ตญ์–ด ๋Œ€ํ™”๋ฅผ ๋งŒ๋“ค์–ด์ฃผ์„ธ์š”."
            else:
                system_message = "You are an expert at creating podcast conversations in English. Create natural and informative English conversations."

            chat_completion = self.llm_client.chat.completions.create(
                messages=[
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": self._build_prompt(text, language)}
                ],
                model=self.config.model_name,
            )

            # Grab the first JSON object (allowing one level of nesting) from the reply
            pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}"
            json_match = re.search(pattern, chat_completion.choices[0].message.content)
            if not json_match:
                raise ValueError("No valid JSON found in response")
            return json.loads(json_match.group())
        except Exception as e:
            raise RuntimeError(f"Failed to extract conversation: {e}")

    def extract_conversation_local(self, text: str, language: str = "English", progress=None) -> Dict:
        """Extract a conversation using the local model"""
        if not self.local_model or not self.tokenizer:
            raise RuntimeError("Local mode not initialized")

        # Language-specific system message (same instructions as the API path)
        if language == "Korean":
            system_message = "๋‹น์‹ ์€ ํ•œ๊ตญ์–ด๋กœ ํŒŸ์บ์ŠคํŠธ ๋Œ€ํ™”๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์ „๋ฌธ๊ฐ€์ž…๋‹ˆ๋‹ค. ์ž์—ฐ์Šค๋Ÿฝ๊ณ  ์œ ์ตํ•œ ํ•œ๊ตญ์–ด ๋Œ€ํ™”๋ฅผ ๋งŒ๋“ค์–ด์ฃผ์„ธ์š”."
        else:
            system_message = "You are an expert at creating podcast conversations in English. Create natural and informative English conversations."

        chat = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": self._build_prompt(text, language)}
        ]

        terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]

        messages = self.tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )
        model_inputs = self.tokenizer([messages], return_tensors="pt").to(self.device)

        streamer = TextIteratorStreamer(
            self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )

        generate_kwargs = dict(
            model_inputs,
            streamer=streamer,
            max_new_tokens=4000,
            do_sample=True,
            temperature=0.9,
            eos_token_id=terminators,
        )

        # Generate on a background thread and accumulate the streamed text
        t = Thread(target=self.local_model.generate, kwargs=generate_kwargs)
        t.start()

        partial_text = ""
        for new_text in streamer:
            partial_text += new_text

        pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}"
        json_match = re.search(pattern, partial_text)

        if json_match:
            return json.loads(json_match.group())
        else:
            # Return a default template based on language
            if language == "Korean":
                return {
                    "conversation": [
                        {"speaker": "์ง„ํ–‰์ž", "text": "์•ˆ๋…•ํ•˜์„ธ์š”, ํŒŸ์บ์ŠคํŠธ์— ์˜ค์‹  ๊ฒƒ์„ ํ™˜์˜ํ•ฉ๋‹ˆ๋‹ค."},
                        {"speaker": "๊ฒŒ์ŠคํŠธ", "text": "์•ˆ๋…•ํ•˜์„ธ์š”, ์ดˆ๋Œ€ํ•ด ์ฃผ์…”์„œ ๊ฐ์‚ฌํ•ฉ๋‹ˆ๋‹ค."}
                    ]
                }
            else:
                return {
                    "conversation": [
                        {"speaker": "Host", "text": "Welcome to our podcast."},
                        {"speaker": "Guest", "text": "Thank you for having me."}
                    ]
                }
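
    # The streamer yields text incrementally, but the JSON is only extracted
    # after generation completes; the language-matched fallback above gives the
    # UI something valid to show when the model fails to emit JSON.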

    def parse_conversation_text(self, conversation_text: str) -> Dict:
        """Parse conversation text back into the JSON format"""
        lines = conversation_text.strip().split('\n')
        conversation_data = {"conversation": []}

        for line in lines:
            if ':' in line:
                speaker, text = line.split(':', 1)
                conversation_data["conversation"].append({
                    "speaker": speaker.strip(),
                    "text": text.strip()
                })
        return conversation_data
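
    # Example: parse_conversation_text("Host: Hello\nGuest: Hi") returns
    # {"conversation": [{"speaker": "Host", "text": "Hello"},
    #                   {"speaker": "Guest", "text": "Hi"}]}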

    async def text_to_speech_edge(self, conversation_json: Dict, language: str = "English") -> Tuple[str, str]:
        """Convert text to speech using Edge TTS"""
        output_dir = Path(self._create_output_directory())
        filenames = []

        try:
            # Voice pair per language (alternating female/male speakers)
            if language == "Korean":
                voices = [
                    "ko-KR-SunHiNeural",    # female voice (natural Korean)
                    "ko-KR-InJoonNeural"    # male voice (natural Korean)
                ]
            else:
                voices = [
                    "en-US-AvaMultilingualNeural",     # female voice
                    "en-US-AndrewMultilingualNeural"   # male voice
                ]

            for i, turn in enumerate(conversation_json["conversation"]):
                filename = output_dir / f"output_{i}.wav"
                voice = voices[i % len(voices)]

                tmp_path = await self._generate_audio_edge(turn["text"], voice)
                shutil.move(tmp_path, filename)  # unlike os.rename, works across filesystems
                filenames.append(str(filename))

            # Combine audio files
            final_output = os.path.join(output_dir, "combined_output.wav")
            self._combine_audio_files(filenames, final_output)

            # Generate conversation text
            conversation_text = "\n".join(
                f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
                for i, turn in enumerate(conversation_json["conversation"])
            )

            return final_output, conversation_text
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech: {e}")

    async def _generate_audio_edge(self, text: str, voice: str) -> str:
        """Generate audio for one turn using Edge TTS"""
        if not text.strip():
            raise ValueError("Text cannot be empty")

        voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
        communicate = edge_tts.Communicate(text, voice_short_name)

        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_path = tmp_file.name
            await communicate.save(tmp_path)

        return tmp_path
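
    # NOTE: edge-tts emits MP3-encoded audio by default, so the .wav suffix
    # here is cosmetic; pydub should still decode these files correctly
    # (falling back to ffmpeg) when they are combined in _combine_audio_files().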

    def text_to_speech_spark(self, conversation_json: Dict, language: str = "English", progress=None) -> Tuple[str, str]:
        """Convert text to speech using the Spark TTS CLI"""
        if not SPARK_AVAILABLE or not self.spark_model_dir:
            raise RuntimeError("Spark TTS not available")

        try:
            output_dir = self._create_output_directory()
            audio_files = []

            # Different voice prompts give the two speakers distinct characteristics
            if language == "Korean":
                voice_configs = [
                    {"prompt_text": "์•ˆ๋…•ํ•˜์„ธ์š”, ์˜ค๋Š˜ ํŒŸ์บ์ŠคํŠธ ์ง„ํ–‰์„ ๋งก์€ ์ง„ํ–‰์ž์ž…๋‹ˆ๋‹ค.", "gender": "female"},
                    {"prompt_text": "์•ˆ๋…•ํ•˜์„ธ์š”, ์˜ค๋Š˜ ๊ฒŒ์ŠคํŠธ๋กœ ์ฐธ์—ฌํ•˜๊ฒŒ ๋˜์–ด ๊ธฐ์ฉ๋‹ˆ๋‹ค.", "gender": "male"}
                ]
            else:
                voice_configs = [
                    {"prompt_text": "Hello, welcome to our podcast. I'm your host today.", "gender": "female"},
                    {"prompt_text": "Thank you for having me. I'm excited to be here.", "gender": "male"}
                ]

            for i, turn in enumerate(conversation_json["conversation"]):
                text = turn["text"]
                if not text.strip():
                    continue

                # Alternate voice configs between speakers
                voice_config = voice_configs[i % len(voice_configs)]
                output_file = os.path.join(output_dir, f"spark_output_{i}.wav")

                # Run Spark TTS CLI inference
                cmd = [
                    "python", "-m", "cli.inference",
                    "--text", text,
                    "--device", "0" if torch.cuda.is_available() else "cpu",
                    "--save_dir", output_dir,
                    "--model_dir", self.spark_model_dir,
                    "--prompt_text", voice_config["prompt_text"],
                    "--output_name", f"spark_output_{i}.wav"
                ]

                try:
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=60,
                        cwd="."  # run from the directory containing the cloned CLI
                    )

                    if result.returncode == 0:
                        audio_files.append(output_file)
                    else:
                        print(f"Spark TTS error for turn {i}: {result.stderr}")
                        # Fall back to one second of silence so the combine step still works
                        silence = np.zeros(int(22050 * 1.0))
                        sf.write(output_file, silence, 22050)
                        audio_files.append(output_file)
                except subprocess.TimeoutExpired:
                    print(f"Spark TTS timeout for turn {i}")
                    silence = np.zeros(int(22050 * 1.0))  # 1 second of silence
                    sf.write(output_file, silence, 22050)
                    audio_files.append(output_file)
                except Exception as e:
                    print(f"Error running Spark TTS for turn {i}: {e}")
                    silence = np.zeros(int(22050 * 1.0))  # 1 second of silence
                    sf.write(output_file, silence, 22050)
                    audio_files.append(output_file)

            # Combine all audio files
            if audio_files:
                final_output = os.path.join(output_dir, "spark_combined.wav")
                self._combine_audio_files(audio_files, final_output)
            else:
                raise RuntimeError("No audio files generated")

            # Generate conversation text
            conversation_text = "\n".join(
                f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
                for i, turn in enumerate(conversation_json["conversation"])
            )

            return final_output, conversation_text
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}")

    def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Convert text to speech using MeloTTS"""
        if not MELO_AVAILABLE or not self.melo_models:
            raise RuntimeError("MeloTTS not available")

        speakers = ["EN-Default", "EN-US"]
        combined_audio = AudioSegment.empty()

        for i, turn in enumerate(conversation_json["conversation"]):
            bio = io.BytesIO()
            text = turn["text"]
            speaker = speakers[i % 2]
            speaker_id = self.melo_models["EN"].hps.data.spk2id[speaker]

            # Generate audio into the in-memory buffer
            self.melo_models["EN"].tts_to_file(
                text, speaker_id, bio, speed=1.0,
                pbar=progress.tqdm if progress else None,
                format="wav"
            )

            bio.seek(0)
            audio_segment = AudioSegment.from_file(bio, format="wav")
            combined_audio += audio_segment

        # Save final audio
        final_audio_path = "melo_podcast.mp3"
        combined_audio.export(final_audio_path, format="mp3")

        # Generate conversation text
        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )

        return final_audio_path, conversation_text

    def _create_output_directory(self) -> str:
        """Create a uniquely named output directory"""
        random_bytes = os.urandom(8)
        folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Concatenate multiple audio files into one"""
        if not filenames:
            raise ValueError("No input files provided")

        try:
            audio_segments = []
            for filename in filenames:
                if os.path.exists(filename):
                    audio_segment = AudioSegment.from_file(filename)
                    audio_segments.append(audio_segment)

            if audio_segments:
                # Start from an empty segment so sum() never tries int + AudioSegment
                combined = sum(audio_segments, AudioSegment.empty())
                combined.export(output_file, format="wav")

            # Clean up the per-turn input files
            for filename in filenames:
                if os.path.exists(filename):
                    os.remove(filename)
        except Exception as e:
            raise RuntimeError(f"Failed to combine audio files: {e}")


# Global converter instance
converter = UnifiedAudioConverter(ConversationConfig())


async def synthesize(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Main synthesis function: fetch the article and generate the conversation text.

    tts_engine is accepted for interface parity with the UI; audio itself is
    produced later by regenerate_audio().
    """
    if not article_url:
        return "Please provide a valid URL.", None

    try:
        # Fetch text from the URL
        text = converter.fetch_text(article_url)

        # Limit the input to max_words
        words = text.split()
        if len(words) > converter.config.max_words:
            text = " ".join(words[:converter.config.max_words])

        # Extract conversation based on mode
        if mode == "API":
            api_key = os.environ.get("TOGETHER_API_KEY")
            if not api_key:
                return "API key not found. Please set TOGETHER_API_KEY environment variable.", None
            converter.initialize_api_mode(api_key)
            conversation_json = converter.extract_conversation_api(text, language)
        else:  # Local mode
            converter.initialize_local_mode()
            conversation_json = converter.extract_conversation_local(text, language)

        # Generate conversation text
        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )

        return conversation_text, None
    except Exception as e:
        return f"Error: {str(e)}", None


async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Regenerate audio from the (possibly edited) conversation text"""
    if not conversation_text.strip():
        return "Please provide conversation text.", None

    try:
        # Parse the conversation text back into JSON format
        conversation_json = converter.parse_conversation_text(conversation_text)

        if not conversation_json["conversation"]:
            return "No valid conversation found in the text.", None

        # For Korean, only Edge-TTS is supported (the other engines have little
        # or no Korean support); the message below says so in Korean.
        if language == "Korean" and tts_engine != "Edge-TTS":
            return "ํ•œ๊ตญ์–ด๋Š” Edge-TTS๋งŒ ์ง€์›๋ฉ๋‹ˆ๋‹ค. TTS ์—”์ง„์ด ์ž๋™์œผ๋กœ Edge-TTS๋กœ ๋ณ€๊ฒฝ๋ฉ๋‹ˆ๋‹ค.", None

        # Generate audio with the selected TTS engine
        if tts_engine == "Edge-TTS":
            output_file, _ = await converter.text_to_speech_edge(conversation_json, language)
        elif tts_engine == "Spark-TTS":
            if not SPARK_AVAILABLE:
                return "Spark TTS not available. Please install required dependencies and clone the Spark-TTS repository.", None
            converter.initialize_spark_tts()
            output_file, _ = converter.text_to_speech_spark(conversation_json, language)
        else:  # MeloTTS
            if not MELO_AVAILABLE:
                return "MeloTTS not available. Please install required dependencies.", None
            if language == "Korean":
                return "MeloTTS does not support Korean. Please use Edge-TTS for Korean.", None
            converter.initialize_melo_tts()
            output_file, _ = converter.text_to_speech_melo(conversation_json)

        return "Audio generated successfully!", output_file
    except Exception as e:
        return f"Error generating audio: {str(e)}", None


def synthesize_sync(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Synchronous wrapper for async synthesis"""
    return asyncio.run(synthesize(article_url, mode, tts_engine, language))


def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Synchronous wrapper for async audio regeneration"""
    return asyncio.run(regenerate_audio(conversation_text, tts_engine, language))
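
# Example (outside the Gradio UI; the URL is a placeholder):
#   text, _ = synthesize_sync("https://example.com/article", mode="API")
#   status, audio_path = regenerate_audio_sync(text, tts_engine="Edge-TTS")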


def update_tts_engine_for_korean(language):
    """Update the TTS engine options when Korean is selected"""
    if language == "Korean":
        # Korean is only supported by Edge-TTS, so lock the selector
        return gr.Radio(
            choices=["Edge-TTS"],
            value="Edge-TTS",
            label="TTS Engine",
            info="ํ•œ๊ตญ์–ด๋Š” Edge-TTS๋งŒ ์ง€์›๋ฉ๋‹ˆ๋‹ค",
            interactive=False
        )
    else:
        return gr.Radio(
            choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
            value="Edge-TTS",
            label="TTS Engine",
            info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU",
            interactive=True
        )


# Gradio Interface
with gr.Blocks(theme='soft', title="URL to Podcast Converter") as demo:
    gr.Markdown("# 🎙️ URL to Podcast Converter")
    gr.Markdown("Convert any article, blog post, or news story into an engaging podcast conversation!")

    with gr.Row():
        with gr.Column(scale=3):
            url_input = gr.Textbox(
                label="Article URL",
                placeholder="Enter the article URL here...",
                value=""
            )
        with gr.Column(scale=1):
            # Language selection
            language_selector = gr.Radio(
                choices=["English", "Korean"],
                value="English",
                label="Language / ์–ธ์–ด",
                info="Select output language / ์ถœ๋ ฅ ์–ธ์–ด๋ฅผ ์„ ํƒํ•˜์„ธ์š”"
            )
            mode_selector = gr.Radio(
                choices=["API", "Local"],
                value="API",
                label="Processing Mode",
                info="API: Faster, requires API key | Local: Slower, runs on device"
            )

    # TTS engine selection
    with gr.Group():
        gr.Markdown("### TTS Engine Selection")
        tts_selector = gr.Radio(
            choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
            value="Edge-TTS",
            label="TTS Engine",
            info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU"
        )
        gr.Markdown("""
        **Recommended:**
        - 🌟 **Edge-TTS**: Best quality, cloud-based, instant setup
        - 🤖 **Spark-TTS**: Local AI model (0.5B), zero-shot voice cloning

        **Additional Option:**
        - ⚡ **MeloTTS**: Local processing, GPU recommended

        **ํ•œ๊ตญ์–ด ์ง€์›:**
        - 🇰🇷 ํ•œ๊ตญ์–ด ์„ ํƒ ์‹œ Edge-TTS๋งŒ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค
        """)

    convert_btn = gr.Button("🎯 Generate Conversation / ๋Œ€ํ™” ์ƒ์„ฑ", variant="primary", size="lg")

    with gr.Row():
        with gr.Column():
            conversation_output = gr.Textbox(
                label="Generated Conversation (Editable) / ์ƒ์„ฑ๋œ ๋Œ€ํ™” (ํŽธ์ง‘ ๊ฐ€๋Šฅ)",
                lines=15,
                max_lines=30,
                interactive=True,
                placeholder="Generated conversation will appear here. You can edit it before generating audio.\n์ƒ์„ฑ๋œ ๋Œ€ํ™”๊ฐ€ ์—ฌ๊ธฐ์— ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค. ์˜ค๋””์˜ค ์ƒ์„ฑ ์ „์— ํŽธ์ง‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.",
                info="Edit the conversation as needed. Format: 'Speaker Name: Text' / ํ•„์š”์— ๋”ฐ๋ผ ๋Œ€ํ™”๋ฅผ ํŽธ์ง‘ํ•˜์„ธ์š”. ํ˜•์‹: 'ํ™”์ž ์ด๋ฆ„: ํ…์ŠคํŠธ'"
            )

            # Audio generation button
            with gr.Row():
                generate_audio_btn = gr.Button("🎙️ Generate Audio from Text / ํ…์ŠคํŠธ์—์„œ ์˜ค๋””์˜ค ์ƒ์„ฑ", variant="secondary", size="lg")
                gr.Markdown("*Edit the conversation above, then click to generate audio / ์œ„์˜ ๋Œ€ํ™”๋ฅผ ํŽธ์ง‘ํ•œ ํ›„ ํด๋ฆญํ•˜์—ฌ ์˜ค๋””์˜ค๋ฅผ ์ƒ์„ฑํ•˜์„ธ์š”*")

        with gr.Column():
            audio_output = gr.Audio(
                label="Podcast Audio / ํŒŸ์บ์ŠคํŠธ ์˜ค๋””์˜ค",
                type="filepath",
                interactive=False
            )

            # Status message
            status_output = gr.Textbox(
                label="Status / ์ƒํƒœ",
                interactive=False,
                visible=True
            )

    # TTS engine details and setup instructions
    with gr.Row():
        gr.Markdown("""
        ### TTS Engine Details / TTS ์—”์ง„ ์ƒ์„ธ ์ •๋ณด:

        - **Edge-TTS**: Microsoft's cloud TTS service with high-quality natural voices. Requires an internet connection.
          - 🇰🇷 **ํ•œ๊ตญ์–ด ์ง€์›**: ์ž์—ฐ์Šค๋Ÿฌ์šด ํ•œ๊ตญ์–ด ์Œ์„ฑ (์—ฌ์„ฑ: SunHi, ๋‚จ์„ฑ: InJoon)
        - **Spark-TTS**: SparkAudio's local AI model (0.5B parameters) with zero-shot voice cloning capability.
          - **Setup required**: Clone the [Spark-TTS repository](https://github.com/SparkAudio/Spark-TTS) into the current directory
          - Features: Bilingual support (Chinese/English), controllable speech generation
          - License: CC BY-NC-SA (non-commercial use only)
          - ⚠️ **ํ•œ๊ตญ์–ด ๋ฏธ์ง€์›**
        - **MeloTTS**: Local TTS with multiple voice options. GPU recommended for better performance.
          - ⚠️ **ํ•œ๊ตญ์–ด ๋ฏธ์ง€์›**

        ### Spark-TTS Setup Instructions:
        ```bash
        git clone https://github.com/SparkAudio/Spark-TTS.git
        cd Spark-TTS
        pip install -r requirements.txt
        ```
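        The Spark-TTS-0.5B checkpoint itself is downloaded automatically into
        `pretrained_models/Spark-TTS-0.5B` on first use.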
""") | |

    gr.Examples(
        examples=[
            ["https://huggingface.co/blog/openfree/cycle-navigator", "API", "Edge-TTS", "English"],
            ["https://www.bbc.com/news/technology-67988517", "API", "Spark-TTS", "English"],
            ["https://arxiv.org/abs/2301.00810", "API", "Edge-TTS", "Korean"],
        ],
        inputs=[url_input, mode_selector, tts_selector, language_selector],
        outputs=[conversation_output, status_output],
        fn=synthesize_sync,
        cache_examples=False,
    )

    # Update TTS engine options when the language changes
    language_selector.change(
        fn=update_tts_engine_for_korean,
        inputs=[language_selector],
        outputs=[tts_selector]
    )

    # Event wiring
    convert_btn.click(
        fn=synthesize_sync,
        inputs=[url_input, mode_selector, tts_selector, language_selector],
        outputs=[conversation_output, status_output]
    )

    generate_audio_btn.click(
        fn=regenerate_audio_sync,
        inputs=[conversation_output, tts_selector, language_selector],
        outputs=[status_output, audio_output]
    )

# Launch the app
if __name__ == "__main__":
    demo.queue(api_open=True, default_concurrency_limit=10).launch(
        show_api=True,
        share=False,
        server_name="0.0.0.0",
        server_port=7860
    )