AI-Podcast / app.py
openfree's picture
Update app.py
a7f45e8 verified
raw
history blame
67.1 kB
import spaces
import gradio as gr
import os
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import wave
import base64
import numpy as np
import soundfile as sf
import subprocess
import shutil
import requests
import logging
from datetime import datetime, timedelta
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv
# PDF processing imports
from langchain_community.document_loaders import PyPDFLoader
# Edge TTS imports
import edge_tts
from pydub import AudioSegment
# OpenAI imports
from openai import OpenAI
# Transformers imports (for legacy local mode)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
TextIteratorStreamer,
BitsAndBytesConfig,
)
# Llama CPP imports (for new local mode)
try:
from llama_cpp import Llama
from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType
from llama_cpp_agent.providers import LlamaCppPythonProvider
from llama_cpp_agent.chat_history import BasicChatHistory
from llama_cpp_agent.chat_history.messages import Roles
from huggingface_hub import hf_hub_download
LLAMA_CPP_AVAILABLE = True
except ImportError:
LLAMA_CPP_AVAILABLE = False
# Spark TTS imports
try:
from huggingface_hub import snapshot_download
SPARK_AVAILABLE = True
except:
SPARK_AVAILABLE = False
# MeloTTS imports (for local mode)
try:
# Download the unidic dictionary only when it is not already installed
if not os.path.exists("/usr/local/lib/python3.10/site-packages/unidic"):
try:
os.system("python -m unidic download")
except:
pass
from melo.api import TTS as MeloTTS
MELO_AVAILABLE = True
except:
MELO_AVAILABLE = False
# Import config and prompts
from config_prompts import (
ConversationConfig,
PromptBuilder,
DefaultConversations,
EDGE_TTS_ONLY_LANGUAGES,
EDGE_TTS_VOICES
)
load_dotenv()
# Brave Search API settings
BRAVE_KEY = os.getenv("BSEARCH_API")
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"
def brave_search(query: str, count: int = 8, freshness_days: int | None = None):
    """Query the Brave Web Search API and return normalized result rows.

    Args:
        query: Search string sent as the ``q`` parameter.
        count: Maximum number of results to request and to return.
        freshness_days: When truthy, restrict results to the last
            ``freshness_days`` days.

    Returns:
        A list of dicts with the keys ``title``, ``url``, ``snippet`` and
        ``host``. The list is empty when ``BRAVE_KEY`` is not set, or when
        the request or the response parsing fails (the error is logged).
    """
    if not BRAVE_KEY:
        return []

    params = {"q": query, "count": str(count)}
    if freshness_days:
        # Brave accepts pd/pw/pm/py or an explicit "YYYY-MM-DDtoYYYY-MM-DD"
        # range; a lone start date is not a valid freshness value.
        date_to = datetime.utcnow()
        date_from = date_to - timedelta(days=freshness_days)
        params["freshness"] = f"{date_from:%Y-%m-%d}to{date_to:%Y-%m-%d}"

    try:
        response = requests.get(
            BRAVE_ENDPOINT,
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_KEY},
            params=params,
            timeout=15,
        )
        # Surface 4xx/5xx responses in the log instead of silently treating
        # an error payload as "no results".
        response.raise_for_status()
        raw = response.json().get("web", {}).get("results") or []
        return [
            {
                "title": item.get("title", ""),
                "url": item.get("url", item.get("link", "")),
                "snippet": item.get("description", item.get("text", "")),
                # Host = URL without scheme / leading "www." and without path.
                "host": re.sub(r"https?://(www\.)?", "", item.get("url", "")).split("/")[0],
            }
            for item in raw[:count]
        ]
    except Exception as e:
        # Best-effort helper: callers treat an empty list as "no context".
        logging.error(f"Brave search error: {e}")
        return []
def format_search_results(query: str, for_keyword: bool = False) -> str:
    """Run a Brave search and render the top hits as a text block.

    Keyword mode requests 5 results without a freshness limit and renders up
    to 4 of them with a title / snippet (max 200 chars) / source layout.
    Default mode requests 3 results from the last 7 days and renders up to 2
    as one-line bullets with a snippet of at most 100 chars.

    Returns:
        The entries joined by blank lines with a trailing newline, or an
        empty string when the search returned nothing.
    """
    if for_keyword:
        fetch_count, keep, snippet_limit, freshness = 5, 4, 200, None
    else:
        fetch_count, keep, snippet_limit, freshness = 3, 2, 100, 7

    hits = brave_search(query, fetch_count, freshness_days=freshness)
    if not hits:
        return ""

    formatted = []
    for hit in hits[:keep]:
        body = hit['snippet']
        if len(body) > snippet_limit:
            # Truncate long snippets and mark the cut.
            body = body[:snippet_limit] + "..."
        if for_keyword:
            formatted.append(f"**{hit['title']}**\n{body}\nSource: {hit['host']}")
        else:
            formatted.append(f"- {hit['title']}: {body}")

    return "\n\n".join(formatted) + "\n"
def extract_keywords_for_search(text: str, language: str = "English") -> List[str]:
    """Pick at most one search keyword from the beginning of ``text``.

    Only the first 500 characters are inspected. For Korean the longest run
    of two or more Hangul syllables is chosen; for every other language the
    longest word that starts with an uppercase letter and is longer than four
    characters (after trimming surrounding punctuation) is chosen. Ties go to
    the earliest occurrence.

    Returns:
        A list holding the single chosen keyword, or an empty list when no
        candidate exists.
    """
    text_sample = text[:500]

    if language == "Korean":
        # U+AC00..U+D7A3 is the Hangul syllables block. Written with escapes
        # so the pattern survives any re-encoding of this file (a garbled
        # character class here raises "bad character range").
        candidates = re.findall(r"[\uAC00-\uD7A3]{2,}", text_sample)
    else:
        # Trim punctuation before measuring so "Data," is not counted as a
        # five-character word.
        trimmed = (word.strip('.,!?;:') for word in text_sample.split())
        candidates = [word for word in trimmed if len(word) > 4 and word[0].isupper()]

    if not candidates:
        return []
    # max() returns the first of equally long candidates.
    return [max(candidates, key=len)]
def search_and_compile_content(keyword: str, language: str = "English") -> str:
    """Build source material about ``keyword`` from several Brave searches.

    Without a configured ``BRAVE_KEY`` a generic boilerplate text about the
    keyword is returned. Otherwise six query variations are searched, the top
    three hits of each are collected, and generic filler text is appended
    when the collected snippets total fewer than 1000 characters.

    Args:
        keyword: Topic to search for.
        language: ``"Korean"`` selects Korean queries and texts; any other
            value selects English.

    Returns:
        A heading line followed by the compiled content, or the boilerplate
        text when no API key is configured.
    """
    is_korean = language == "Korean"

    if not BRAVE_KEY:
        # No search backend: return generic boilerplate about the keyword.
        if is_korean:
            return (
                "\n"
                f"'{keyword}'에 대한 종합적인 정보:\n"
                f"{keyword}는 현대 사회에서 매우 중요한 주제입니다.\n"
                "이 주제는 다양한 측면에서 우리의 삶에 영향을 미치고 있으며,\n"
                "최근 들어 더욱 주목받고 있습니다.\n"
                "주요 특징:\n"
                "1. 기술적 발전과 혁신\n"
                "2. 사회적 영향과 변화\n"
                "3. 미래 전망과 가능성\n"
                "4. 실용적 활용 방안\n"
                "5. 글로벌 트렌드와 동향\n"
                f"전문가들은 {keyword}가 앞으로 더욱 중요해질 것으로 예상하고 있으며,\n"
                "이에 대한 깊이 있는 이해가 필요한 시점입니다.\n"
            )
        return (
            "\n"
            f"Comprehensive information about '{keyword}':\n"
            f"{keyword} is a significant topic in modern society.\n"
            "This subject impacts our lives in various ways and has been\n"
            "gaining increasing attention recently.\n"
            "Key aspects:\n"
            "1. Technological advancement and innovation\n"
            "2. Social impact and changes\n"
            "3. Future prospects and possibilities\n"
            "4. Practical applications\n"
            "5. Global trends and developments\n"
            f"Experts predict that {keyword} will become even more important,\n"
            "and it's crucial to develop a deep understanding of this topic.\n"
        )

    # Use the current year so the "latest news" query does not go stale.
    year = datetime.now().year
    if is_korean:
        queries = [
            f"{keyword} 최신 뉴스 {year}",
            f"{keyword} 정보 설명",
            f"{keyword} 트렌드 전망",
            f"{keyword} 장점 단점",
            f"{keyword} 활용 방법",
            f"{keyword} 전문가 의견",
        ]
    else:
        queries = [
            f"{keyword} latest news {year}",
            f"{keyword} explained comprehensive",
            f"{keyword} trends forecast",
            f"{keyword} advantages disadvantages",
            f"{keyword} how to use",
            f"{keyword} expert opinions",
        ]

    all_content = []
    total_content_length = 0
    for query in queries:
        # Request five results but keep only the top three per query.
        for hit in brave_search(query, count=5)[:3]:
            all_content.append(f"**{hit['title']}**\n{hit['snippet']}\nSource: {hit['host']}\n")
            total_content_length += len(hit['snippet'])

    # Pad with generic text when the searches produced under 1000 characters.
    if total_content_length < 1000:
        if is_korean:
            additional_content = (
                "\n"
                "추가 정보:\n"
                f"{keyword}와 관련된 최근 동향을 살펴보면, 이 분야는 빠르게 발전하고 있습니다.\n"
                "많은 전문가들이 이 주제에 대해 활발히 연구하고 있으며,\n"
                "실생활에서의 응용 가능성도 계속 확대되고 있습니다.\n"
                "특히 주목할 점은:\n"
                "- 기술 혁신의 가속화\n"
                "- 사용자 경험의 개선\n"
                "- 접근성의 향상\n"
                "- 비용 효율성 증대\n"
                "- 글로벌 시장의 성장\n"
                f"이러한 요소들이 {keyword}의 미래를 더욱 밝게 만들고 있습니다.\n"
            )
        else:
            additional_content = (
                "\n"
                "Additional insights:\n"
                f"Recent developments in {keyword} show rapid advancement in this field.\n"
                "Many experts are actively researching this topic, and its practical\n"
                "applications continue to expand.\n"
                "Key points to note:\n"
                "- Accelerating technological innovation\n"
                "- Improving user experience\n"
                "- Enhanced accessibility\n"
                "- Increased cost efficiency\n"
                "- Growing global market\n"
                f"These factors are making the future of {keyword} increasingly promising.\n"
            )
        all_content.append(additional_content)

    compiled = "\n\n".join(all_content)

    if is_korean:
        intro = f"### '{keyword}'에 대한 종합적인 정보와 최신 동향:\n\n"
    else:
        intro = f"### Comprehensive information and latest trends about '{keyword}':\n\n"
    return intro + compiled
class UnifiedAudioConverter:
def __init__(self, config: ConversationConfig):
    """Store the configuration and start with every backend unloaded."""
    self.config = config

    # Remote LLM client; assigned by initialize_api_mode().
    self.llm_client = None

    # Transformers model/tokenizer; assigned by initialize_legacy_local_mode().
    self.legacy_local_model = None
    self.legacy_tokenizer = None

    # Llama CPP model and the file name it was loaded from; assigned by
    # initialize_local_mode().
    self.local_llm = None
    self.local_llm_model = None

    # TTS backends; assigned by initialize_melo_tts() / initialize_spark_tts().
    self.melo_models = None
    self.spark_model_dir = None

    if torch.cuda.is_available():
        self.device = "cuda"
    else:
        self.device = "cpu"

    self.prompt_builder = PromptBuilder()
def initialize_api_mode(self, api_key: str):
    """Create the OpenAI-compatible client that targets the Together API."""
    together_client = OpenAI(
        api_key=api_key,
        base_url="https://api.together.xyz/v1",
    )
    self.llm_client = together_client
@spaces.GPU(duration=120)
def initialize_local_mode(self):
    """Download (if needed) and load the configured GGUF model with Llama CPP.

    Does nothing when the model named by ``config.local_model_name`` is
    already loaded.

    Raises:
        RuntimeError: If the Llama CPP packages are missing, or if the
            download or model construction fails (original error chained).
    """
    if not LLAMA_CPP_AVAILABLE:
        raise RuntimeError("Llama CPP dependencies not available. Please install llama-cpp-python and llama-cpp-agent.")

    model_name = self.config.local_model_name
    if self.local_llm is not None and self.local_llm_model == model_name:
        return  # The requested model is already loaded.

    try:
        # Use the path reported by the hub client instead of re-deriving it,
        # so the file that is loaded is the file that was downloaded.
        model_path = hf_hub_download(
            repo_id=self.config.local_model_repo,
            filename=model_name,
            local_dir="./models",
        )
        if not os.path.exists(model_path):
            raise RuntimeError(f"Model file not found at {model_path}")

        self.local_llm = Llama(
            model_path=model_path,
            flash_attn=True,
            n_gpu_layers=81 if torch.cuda.is_available() else 0,
            n_batch=1024,
            n_ctx=16384,
        )
        self.local_llm_model = model_name
        print(f"Local LLM initialized: {model_path}")
    except Exception as e:
        print(f"Failed to initialize local LLM: {e}")
        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(f"Failed to initialize local LLM: {e}") from e
@spaces.GPU(duration=60)
def initialize_legacy_local_mode(self):
    """Load the fallback Hugging Face model in 4-bit and its tokenizer.

    Does nothing when the legacy model is already loaded.
    """
    if self.legacy_local_model is not None:
        return

    four_bit_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
    )
    self.legacy_local_model = AutoModelForCausalLM.from_pretrained(
        self.config.legacy_local_model_name,
        quantization_config=four_bit_config,
    )
    # The tokenizer is pinned to a fixed revision; the model is not.
    self.legacy_tokenizer = AutoTokenizer.from_pretrained(
        self.config.legacy_local_model_name,
        revision='8ab73a6800796d84448bc936db9bac5ad9f984ae',
    )
def initialize_spark_tts(self):
    """Make sure the Spark-TTS weights exist locally and record their path.

    Downloads the ``SparkAudio/Spark-TTS-0.5B`` snapshot when the model
    directory is missing, and prints a warning when ``cli/inference.py`` is
    not present.

    Raises:
        RuntimeError: If the Spark dependencies are missing or the download
            fails.
    """
    if not SPARK_AVAILABLE:
        raise RuntimeError("Spark TTS dependencies not available")

    model_dir = "pretrained_models/Spark-TTS-0.5B"
    already_downloaded = os.path.exists(model_dir)
    if not already_downloaded:
        print("Downloading Spark-TTS model...")
        try:
            os.makedirs("pretrained_models", exist_ok=True)
            snapshot_download("SparkAudio/Spark-TTS-0.5B", local_dir=model_dir)
            print("Spark-TTS model downloaded successfully")
        except Exception as e:
            raise RuntimeError(f"Failed to download Spark-TTS model: {e}")

    self.spark_model_dir = model_dir

    # Synthesis shells out to this script, so warn early if it is missing.
    cli_script_found = os.path.exists("cli/inference.py")
    if not cli_script_found:
        print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")
@spaces.GPU(duration=60)
def initialize_melo_tts(self):
    """Create the English MeloTTS model once, if MeloTTS is importable."""
    if not MELO_AVAILABLE or self.melo_models is not None:
        return
    english_model = MeloTTS(language="EN", device=self.device)
    self.melo_models = {"EN": english_model}
def fetch_text(self, url: str) -> str:
    """Download ``url`` through the configured prefix service and return the body.

    Args:
        url: Absolute ``http://`` or ``https://`` address.

    Returns:
        The response body as text.

    Raises:
        ValueError: If ``url`` is empty or has another scheme.
        RuntimeError: If the HTTP request fails or returns an error status.
    """
    if not url:
        raise ValueError("URL cannot be empty")
    if not url.startswith(("http://", "https://")):
        raise ValueError("URL must start with 'http://' or 'https://'")

    # The request goes to config.prefix_url with the target URL appended.
    target = f"{self.config.prefix_url}{url}"
    try:
        reply = httpx.get(target, timeout=60.0)
        reply.raise_for_status()
        return reply.text
    except httpx.HTTPError as e:
        raise RuntimeError(f"Failed to fetch URL: {e}")
def extract_text_from_pdf(self, pdf_file) -> str:
    """Return the text of every page of a PDF, joined by newlines.

    Args:
        pdf_file: Either a filesystem path (``str``) or an object with a
            ``read()`` method that yields the PDF bytes.

    Returns:
        The concatenated ``page_content`` of all pages.

    Raises:
        RuntimeError: If reading or parsing fails (original error chained).
    """
    tmp_path = None
    try:
        if isinstance(pdf_file, str):
            pdf_path = pdf_file
        else:
            # File-like input: spool it to disk because the loader takes a path.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                # Record the name first so cleanup runs even if the write fails.
                tmp_path = tmp_file.name
                tmp_file.write(pdf_file.read())
            pdf_path = tmp_path

        pages = PyPDFLoader(pdf_path).load()
        return "\n".join(page.page_content for page in pages)
    except Exception as e:
        raise RuntimeError(f"Failed to extract text from PDF: {e}") from e
    finally:
        # Remove only the temp file created here, on success and on failure;
        # a caller-supplied path is never deleted.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.unlink(tmp_path)
def _get_messages_formatter_type(self, model_name):
    """Choose the chat template for ``model_name``.

    Names containing ``"Mistral"`` or ``"BitSix"`` get the ChatML formatter;
    every other name gets the Llama 3 formatter.
    """
    uses_chatml = any(tag in model_name for tag in ("Mistral", "BitSix"))
    if uses_chatml:
        return MessagesFormatterType.CHATML
    return MessagesFormatterType.LLAMA_3
@spaces.GPU(duration=120)
def extract_conversation_local(self, text: str, language: str = "English", progress=None) -> Dict:
"""Extract conversation using new local LLM with enhanced professional style"""
try:
# ๊ฒ€์ƒ‰ ์ปจํ…์ŠคํŠธ ์ƒ์„ฑ (ํ‚ค์›Œ๋“œ ๊ธฐ๋ฐ˜์ด ์•„๋‹Œ ๊ฒฝ์šฐ)
search_context = ""
if BRAVE_KEY and not text.startswith("Keyword-based content:"):
try:
keywords = extract_keywords_for_search(text, language)
if keywords:
search_query = keywords[0] if language == "Korean" else f"{keywords[0]} latest news"
search_context = format_search_results(search_query)
print(f"Search context added for: {search_query}")
except Exception as e:
print(f"Search failed, continuing without context: {e}")
# ๋จผ์ € ์ƒˆ๋กœ์šด ๋กœ์ปฌ LLM ์‹œ๋„
self.initialize_local_mode()
chat_template = self._get_messages_formatter_type(self.config.local_model_name)
provider = LlamaCppPythonProvider(self.local_llm)
# ์–ธ์–ด๋ณ„ ์‹œ์Šคํ…œ ๋ฉ”์‹œ์ง€
system_messages = {
"Korean": (
"๋‹น์‹ ์€ ํ•œ๊ตญ์˜ ์œ ๋ช… ํŒŸ์บ์ŠคํŠธ ์ „๋ฌธ ์ž‘๊ฐ€์ž…๋‹ˆ๋‹ค. "
"์ฒญ์ทจ์ž๋“ค์ด ๊นŠ์ด ์žˆ๋Š” ์ „๋ฌธ ์ง€์‹์„ ์–ป์„ ์ˆ˜ ์žˆ๋Š” ๊ณ ํ’ˆ์งˆ ๋Œ€๋‹ด์„ ํ•œ๊ตญ์–ด๋กœ ๋งŒ๋“ญ๋‹ˆ๋‹ค. "
"๋ฐ˜๋“œ์‹œ ์„œ๋กœ ์กด๋Œ“๋ง์„ ์‚ฌ์šฉํ•˜๋ฉฐ, 12ํšŒ์˜ ๋Œ€ํ™” ๊ตํ™˜์œผ๋กœ ๊ตฌ์„ฑํ•˜์„ธ์š”. "
"๋ชจ๋“  ๋Œ€ํ™”๋Š” ๋ฐ˜๋“œ์‹œ ํ•œ๊ตญ์–ด๋กœ ์ž‘์„ฑํ•˜๊ณ  JSON ํ˜•์‹์œผ๋กœ๋งŒ ์‘๋‹ตํ•˜์„ธ์š”."
),
"Japanese": (
"ใ‚ใชใŸใฏๆ—ฅๆœฌใฎๆœ‰ๅใชใƒใƒƒใƒ‰ใ‚ญใƒฃใ‚นใƒˆๅฐ‚้–€ไฝœๅฎถใงใ™ใ€‚"
"่ด่ก†ใŒๆทฑใ„ๅฐ‚้–€็Ÿฅ่ญ˜ใ‚’ๅพ—ใ‚‰ใ‚Œใ‚‹้ซ˜ๅ“่ณชใชๅฏพ่ซ‡ใ‚’ๆ—ฅๆœฌ่ชžใงไฝœๆˆใ—ใพใ™ใ€‚"
"ๅฟ…ใšใŠไบ’ใ„ใซไธๅฏง่ชžใ‚’ไฝฟ็”จใ—ใ€12ๅ›žใฎๅฏพ่ฉฑไบคๆ›ใงๆง‹ๆˆใ—ใฆใใ ใ•ใ„ใ€‚"
"ใ™ในใฆใฎๅฏพ่ฉฑใฏๅฟ…ใšๆ—ฅๆœฌ่ชžใงไฝœๆˆใ—ใ€JSONๅฝขๅผใงใฎใฟๅ›ž็ญ”ใ—ใฆใใ ใ•ใ„ใ€‚"
),
"French": (
"Vous รชtes un cรฉlรจbre scรฉnariste de podcast professionnel franรงais. "
"Crรฉez des discussions de haute qualitรฉ en franรงais qui donnent au public "
"des connaissances professionnelles approfondies. "
"Crรฉez exactement 12 รฉchanges de conversation et rรฉpondez uniquement en format JSON."
),
"German": (
"Sie sind ein berรผhmter professioneller Podcast-Drehbuchautor aus Deutschland. "
"Erstellen Sie hochwertige Diskussionen auf Deutsch, die dem Publikum "
"tiefgreifendes Fachwissen vermitteln. "
"Erstellen Sie genau 12 Gesprรคchsaustausche und antworten Sie nur im JSON-Format."
),
"Spanish": (
"Eres un famoso guionista de podcast profesional espaรฑol. "
"Crea discusiones de alta calidad en espaรฑol que brinden al pรบblico "
"conocimientos profesionales profundos. "
"Crea exactamente 12 intercambios de conversaciรณn y responde solo en formato JSON."
),
"Chinese": (
"ๆ‚จๆ˜ฏไธญๅ›ฝ่‘—ๅ็š„ไธ“ไธšๆ’ญๅฎข็ผ–ๅ‰งใ€‚"
"ๅˆ›ๅปบ้ซ˜่ดจ้‡็š„ไธญๆ–‡่ฎจ่ฎบ๏ผŒไธบ่ง‚ไผ—ๆไพ›ๆทฑๅ…ฅ็š„ไธ“ไธš็Ÿฅ่ฏ†ใ€‚"
"ๅˆ›ๅปบๆฐๅฅฝ12ๆฌกๅฏน่ฏไบคๆข๏ผŒไป…ไปฅJSONๆ ผๅผๅ›ž็ญ”ใ€‚"
),
"Russian": (
"ะ’ั‹ ะธะทะฒะตัั‚ะฝั‹ะน ะฟั€ะพั„ะตััะธะพะฝะฐะปัŒะฝั‹ะน ัั†ะตะฝะฐั€ะธัั‚ ะฟะพะดะบะฐัั‚ะพะฒ ะธะท ะ ะพััะธะธ. "
"ะกะพะทะดะฐะฒะฐะนั‚ะต ะฒั‹ัะพะบะพะบะฐั‡ะตัั‚ะฒะตะฝะฝั‹ะต ะดะธัะบัƒััะธะธ ะฝะฐ ั€ัƒััะบะพะผ ัะทั‹ะบะต, ะบะพั‚ะพั€ั‹ะต ะดะฐัŽั‚ ะฐัƒะดะธั‚ะพั€ะธะธ "
"ะณะปัƒะฑะพะบะธะต ะฟั€ะพั„ะตััะธะพะฝะฐะปัŒะฝั‹ะต ะทะฝะฐะฝะธั. "
"ะกะพะทะดะฐะนั‚ะต ั€ะพะฒะฝะพ 12 ะพะฑะผะตะฝะพะฒ ั€ะฐะทะณะพะฒะพั€ะพะผ ะธ ะพั‚ะฒะตั‡ะฐะนั‚ะต ั‚ะพะปัŒะบะพ ะฒ ั„ะพั€ะผะฐั‚ะต JSON."
)
}
system_message = system_messages.get(language,
f"You are a professional podcast scriptwriter creating high-quality, "
f"insightful discussions in {language}. Create exactly 12 conversation exchanges "
f"with professional expertise. All dialogue must be in {language}. "
f"Respond only in JSON format."
)
agent = LlamaCppAgent(
provider,
system_prompt=system_message,
predefined_messages_formatter_type=chat_template,
debug_output=False
)
settings = provider.get_provider_default_settings()
settings.temperature = 0.75
settings.top_k = 40
settings.top_p = 0.95
settings.max_tokens = self.config.max_tokens
settings.repeat_penalty = 1.1
settings.stream = False
messages = BasicChatHistory()
prompt = self.prompt_builder.build_prompt(text, language, search_context)
response = agent.get_chat_response(
prompt,
llm_sampling_settings=settings,
chat_history=messages,
returns_streaming_generator=False,
print_output=False
)
# JSON ํŒŒ์‹ฑ
pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}"
json_match = re.search(pattern, response)
if json_match:
conversation_data = json.loads(json_match.group())
return conversation_data
else:
raise ValueError("No valid JSON found in local LLM response")
except Exception as e:
print(f"Local LLM failed: {e}, falling back to legacy local method")
return self.extract_conversation_legacy_local(text, language, progress, search_context)
@spaces.GPU(duration=120)
def extract_conversation_legacy_local(self, text: str, language: str = "English", progress=None, search_context: str = "") -> Dict:
    """Generate the conversation with the legacy Transformers model.

    Generation runs on a worker thread while this thread collects the
    streamed text. Any failure yields the built-in default conversation for
    ``language``.

    Returns:
        The dict parsed from the first JSON object in the generated text, or
        ``DefaultConversations.get_conversation(language)`` on error.
    """
    try:
        self.initialize_legacy_local_mode()

        # Chat messages (system prompt included) come from config_prompts.
        messages = self.prompt_builder.build_messages_for_local(text, language, search_context)

        tokenizer = self.legacy_tokenizer
        terminators = [
            tokenizer.eos_token_id,
            tokenizer.convert_tokens_to_ids("<|eot_id|>"),
        ]
        chat_messages = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        model_inputs = tokenizer([chat_messages], return_tensors="pt").to(self.device)

        streamer = TextIteratorStreamer(
            tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )
        generate_kwargs = dict(
            model_inputs,
            streamer=streamer,
            max_new_tokens=self.config.max_new_tokens,
            do_sample=True,
            temperature=0.75,
            eos_token_id=terminators,
        )

        worker = Thread(target=self.legacy_local_model.generate, kwargs=generate_kwargs)
        worker.start()

        # Drain the streamer until generation ends.
        generated = "".join(streamer)

        # Matches the first {...} object with at most one level of nested braces.
        json_match = re.search(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", generated)
        if not json_match:
            raise ValueError("No valid JSON found in legacy local response")
        return json.loads(json_match.group())
    except Exception as e:
        print(f"Legacy local model also failed: {e}")
        return DefaultConversations.get_conversation(language)
def extract_conversation_api(self, text: str, language: str = "English") -> Dict:
    """Generate the conversation through the remote chat-completions API.

    When a Brave key is configured and ``text`` is not keyword-based
    content, recent search results for one extracted keyword are added to
    the prompt.

    Returns:
        The dict parsed from the first JSON object in the model's reply.

    Raises:
        RuntimeError: If API mode was not initialized, or if the request or
            the JSON extraction fails.
    """
    if not self.llm_client:
        raise RuntimeError("API mode not initialized")

    try:
        search_context = ""
        wants_search = BRAVE_KEY and not text.startswith("Keyword-based content:")
        if wants_search:
            try:
                found = extract_keywords_for_search(text, language)
                if found:
                    if language == "Korean":
                        search_query = found[0]
                    else:
                        search_query = f"{found[0]} latest news"
                    search_context = format_search_results(search_query)
                    print(f"Search context added for: {search_query}")
            except Exception as e:
                # Search is optional; continue without extra context.
                print(f"Search failed, continuing without context: {e}")

        messages = self.prompt_builder.build_messages_for_local(text, language, search_context)
        completion = self.llm_client.chat.completions.create(
            messages=messages,
            model=self.config.api_model_name,
            temperature=0.75,
        )
        reply_text = completion.choices[0].message.content

        # Matches the first {...} object with at most one level of nested braces.
        json_match = re.search(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", reply_text)
        if json_match:
            return json.loads(json_match.group())
        raise ValueError("No valid JSON found in response")
    except Exception as e:
        raise RuntimeError(f"Failed to extract conversation: {e}")
def parse_conversation_text(self, conversation_text: str) -> Dict:
    """Turn ``"Speaker: text"`` lines back into the conversation dict.

    Each line is split at its first colon; lines without a colon are
    ignored. Speaker and text are stripped of surrounding whitespace.

    Returns:
        ``{"conversation": [{"speaker": ..., "text": ...}, ...]}``.
    """
    turns = []
    for raw_line in conversation_text.strip().split('\n'):
        speaker, colon, spoken = raw_line.partition(':')
        if not colon:
            continue
        turns.append({"speaker": speaker.strip(), "text": spoken.strip()})
    return {"conversation": turns}
async def text_to_speech_edge(self, conversation_json: Dict, language: str = "English") -> Tuple[str, str]:
    """Synthesize every turn with Edge TTS and combine them into one file.

    Voices are taken from ``EDGE_TTS_VOICES`` for ``language`` (English when
    the language has no entry) and alternate by turn index.

    Args:
        conversation_json: Dict with a ``"conversation"`` list of turns; each
            turn needs a ``"text"`` key and may have a ``"speaker"`` key.
        language: Language used to pick the voices.

    Returns:
        ``(combined_audio_path, conversation_text)`` where the text has one
        ``"speaker: text"`` line per turn.

    Raises:
        RuntimeError: If synthesis or combining fails (original error chained).
    """
    output_dir = Path(self._create_output_directory())
    filenames = []
    try:
        voices = EDGE_TTS_VOICES.get(language, EDGE_TTS_VOICES["English"])
        for i, turn in enumerate(conversation_json["conversation"]):
            filename = output_dir / f"output_{i}.wav"
            voice = voices[i % len(voices)]
            tmp_path = await self._generate_audio_edge(turn["text"], voice)
            # shutil.move also works when the temp dir and the output dir are
            # on different filesystems, where os.rename raises OSError.
            shutil.move(tmp_path, str(filename))
            filenames.append(str(filename))

        final_output = os.path.join(output_dir, "combined_output.wav")
        self._combine_audio_files(filenames, final_output)

        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )
        return final_output, conversation_text
    except Exception as e:
        raise RuntimeError(f"Failed to convert text to speech: {e}") from e
async def _generate_audio_edge(self, text: str, voice: str) -> str:
"""Generate audio using Edge TTS"""
if not text.strip():
raise ValueError("Text cannot be empty")
voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
communicate = edge_tts.Communicate(text, voice_short_name)
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
tmp_path = tmp_file.name
await communicate.save(tmp_path)
return tmp_path
@spaces.GPU(duration=60)
def text_to_speech_spark(self, conversation_json: Dict, language: str = "English", progress=None) -> Tuple[str, str]:
    """Synthesize every turn with the Spark-TTS CLI and combine the results.

    Each non-empty turn is rendered by a ``cli.inference`` subprocess. When
    a turn fails or times out, one second of silence is written in its
    place so the combined file keeps its structure.

    Args:
        conversation_json: Dict with a ``"conversation"`` list of turns; each
            turn needs a ``"text"`` key and may have a ``"speaker"`` key.
        language: ``"Korean"`` selects Korean voice prompts; any other value
            selects English prompts.
        progress: Unused by this method.

    Returns:
        ``(combined_audio_path, conversation_text)``.

    Raises:
        RuntimeError: If Spark TTS is unavailable or not initialized, if no
            audio was produced, or if any other step fails.
    """
    import sys  # local import: the block cannot rely on a top-level `sys`

    if not SPARK_AVAILABLE or not self.spark_model_dir:
        raise RuntimeError("Spark TTS not available")

    def _write_silence(path: str) -> None:
        """Write one second of silence (22050 Hz) as a placeholder clip."""
        sf.write(path, np.zeros(int(22050 * 1.0)), 22050)

    try:
        output_dir = self._create_output_directory()
        audio_files = []

        speaker1, speaker2 = self.prompt_builder.get_speaker_names(language)
        # Only "prompt_text" is passed to the CLI below; "gender" is not used.
        if language == "Korean":
            voice_configs = [
                {"prompt_text": f"안녕하세요, 오늘 팟캐스트 진행을 맡은 {speaker1}입니다.", "gender": "male"},
                {"prompt_text": f"안녕하세요, 저는 오늘 이 주제에 대해 설명드릴 {speaker2}입니다.", "gender": "male"},
            ]
        else:
            voice_configs = [
                {"prompt_text": f"Hello everyone, I'm {speaker1}, your host for today's podcast.", "gender": "male"},
                {"prompt_text": f"Hi, I'm {speaker2}. I'm excited to share my insights with you.", "gender": "male"},
            ]

        for i, turn in enumerate(conversation_json["conversation"]):
            text = turn["text"]
            if not text.strip():
                continue

            voice_config = voice_configs[i % len(voice_configs)]
            output_name = f"spark_output_{i}.wav"
            output_file = os.path.join(output_dir, output_name)
            cmd = [
                # Run the CLI with the interpreter running this app, so it
                # sees the same installed packages.
                sys.executable, "-m", "cli.inference",
                "--text", text,
                "--device", "0" if torch.cuda.is_available() else "cpu",
                "--save_dir", output_dir,
                "--model_dir", self.spark_model_dir,
                "--prompt_text", voice_config["prompt_text"],
                "--output_name", output_name,
            ]

            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=60,
                    cwd=".",
                )
                succeeded = result.returncode == 0
                if not succeeded:
                    print(f"Spark TTS error for turn {i}: {result.stderr}")
            except subprocess.TimeoutExpired:
                print(f"Spark TTS timeout for turn {i}")
                succeeded = False
            except Exception as e:
                print(f"Error running Spark TTS for turn {i}: {e}")
                succeeded = False

            if not succeeded:
                _write_silence(output_file)
            audio_files.append(output_file)

        if not audio_files:
            raise RuntimeError("No audio files generated")
        final_output = os.path.join(output_dir, "spark_combined.wav")
        self._combine_audio_files(audio_files, final_output)

        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )
        return final_output, conversation_text
    except Exception as e:
        raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}") from e
@spaces.GPU(duration=60)
def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
"""Convert text to speech using MeloTTS"""
if not MELO_AVAILABLE or not self.melo_models:
raise RuntimeError("MeloTTS not available")
speakers = ["EN-Default", "EN-US"]
combined_audio = AudioSegment.empty()
for i, turn in enumerate(conversation_json["conversation"]):
bio = io.BytesIO()
text = turn["text"]
speaker = speakers[i % 2]
speaker_id = self.melo_models["EN"].hps.data.spk2id[speaker]
self.melo_models["EN"].tts_to_file(
text, speaker_id, bio, speed=1.0,
pbar=progress.tqdm if progress else None,
format="wav"
)
bio.seek(0)
audio_segment = AudioSegment.from_file(bio, format="wav")
combined_audio += audio_segment
final_audio_path = "melo_podcast.mp3"
combined_audio.export(final_audio_path, format="mp3")
conversation_text = "\n".join(
f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
for i, turn in enumerate(import spaces
import gradio as gr
import os
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import wave
import base64
import numpy as np
import soundfile as sf
import subprocess
import shutil
import requests
import logging
from datetime import datetime, timedelta
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv
# PDF processing imports
from langchain_community.document_loaders import PyPDFLoader
# Edge TTS imports
import edge_tts
from pydub import AudioSegment
# OpenAI imports
from openai import OpenAI
# Transformers imports (for legacy local mode)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
TextIteratorStreamer,
BitsAndBytesConfig,
)
# Llama CPP imports (for new local mode)
try:
from llama_cpp import Llama
from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType
from llama_cpp_agent.providers import LlamaCppPythonProvider
from llama_cpp_agent.chat_history import BasicChatHistory
from llama_cpp_agent.chat_history.messages import Roles
from huggingface_hub import hf_hub_download
LLAMA_CPP_AVAILABLE = True
except ImportError:
LLAMA_CPP_AVAILABLE = False
# Spark TTS imports
try:
from huggingface_hub import snapshot_download
SPARK_AVAILABLE = True
except:
SPARK_AVAILABLE = False
# MeloTTS imports (for local mode)
try:
# Download the unidic dictionary only when it is not already installed
if not os.path.exists("/usr/local/lib/python3.10/site-packages/unidic"):
try:
os.system("python -m unidic download")
except:
pass
from melo.api import TTS as MeloTTS
MELO_AVAILABLE = True
except:
MELO_AVAILABLE = False
# Import config and prompts
from config_prompts import (
ConversationConfig,
PromptBuilder,
DefaultConversations,
EDGE_TTS_ONLY_LANGUAGES,
EDGE_TTS_VOICES
)
load_dotenv()
# Brave Search API settings
BRAVE_KEY = os.getenv("BSEARCH_API")
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"
def brave_search(query: str, count: int = 8, freshness_days: int | None = None):
    """Query the Brave Web Search API and return normalized result rows.

    Args:
        query: Search string sent as the ``q`` parameter.
        count: Maximum number of results to request and to return.
        freshness_days: When truthy, restrict results to the last
            ``freshness_days`` days.

    Returns:
        A list of dicts with the keys ``title``, ``url``, ``snippet`` and
        ``host``. The list is empty when ``BRAVE_KEY`` is not set, or when
        the request or the response parsing fails (the error is logged).
    """
    if not BRAVE_KEY:
        return []

    params = {"q": query, "count": str(count)}
    if freshness_days:
        # Brave accepts pd/pw/pm/py or an explicit "YYYY-MM-DDtoYYYY-MM-DD"
        # range; a lone start date is not a valid freshness value.
        date_to = datetime.utcnow()
        date_from = date_to - timedelta(days=freshness_days)
        params["freshness"] = f"{date_from:%Y-%m-%d}to{date_to:%Y-%m-%d}"

    try:
        response = requests.get(
            BRAVE_ENDPOINT,
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_KEY},
            params=params,
            timeout=15,
        )
        # Surface 4xx/5xx responses in the log instead of silently treating
        # an error payload as "no results".
        response.raise_for_status()
        raw = response.json().get("web", {}).get("results") or []
        return [
            {
                "title": item.get("title", ""),
                "url": item.get("url", item.get("link", "")),
                "snippet": item.get("description", item.get("text", "")),
                # Host = URL without scheme / leading "www." and without path.
                "host": re.sub(r"https?://(www\.)?", "", item.get("url", "")).split("/")[0],
            }
            for item in raw[:count]
        ]
    except Exception as e:
        # Best-effort helper: callers treat an empty list as "no context".
        logging.error(f"Brave search error: {e}")
        return []
def format_search_results(query: str, for_keyword: bool = False) -> str:
    """Run a Brave search and render the top hits as a text block.

    Keyword mode requests 5 results without a freshness limit and renders up
    to 4 of them with a title / snippet (max 200 chars) / source layout.
    Default mode requests 3 results from the last 7 days and renders up to 2
    as one-line bullets with a snippet of at most 100 chars.

    Returns:
        The entries joined by blank lines with a trailing newline, or an
        empty string when the search returned nothing.
    """
    if for_keyword:
        fetch_count, keep, snippet_limit, freshness = 5, 4, 200, None
    else:
        fetch_count, keep, snippet_limit, freshness = 3, 2, 100, 7

    hits = brave_search(query, fetch_count, freshness_days=freshness)
    if not hits:
        return ""

    formatted = []
    for hit in hits[:keep]:
        body = hit['snippet']
        if len(body) > snippet_limit:
            # Truncate long snippets and mark the cut.
            body = body[:snippet_limit] + "..."
        if for_keyword:
            formatted.append(f"**{hit['title']}**\n{body}\nSource: {hit['host']}")
        else:
            formatted.append(f"- {hit['title']}: {body}")

    return "\n\n".join(formatted) + "\n"
def extract_keywords_for_search(text: str, language: str = "English") -> List[str]:
    """Pick at most one search keyword from the beginning of ``text``.

    Only the first 500 characters are inspected. For Korean the longest run
    of two or more Hangul syllables is chosen; for every other language the
    longest word that starts with an uppercase letter and is longer than four
    characters (after trimming surrounding punctuation) is chosen. Ties go to
    the earliest occurrence.

    Returns:
        A list holding the single chosen keyword, or an empty list when no
        candidate exists.
    """
    text_sample = text[:500]

    if language == "Korean":
        # U+AC00..U+D7A3 is the Hangul syllables block. Written with escapes
        # so the pattern survives any re-encoding of this file (a garbled
        # character class here raises "bad character range").
        candidates = re.findall(r"[\uAC00-\uD7A3]{2,}", text_sample)
    else:
        # Trim punctuation before measuring so "Data," is not counted as a
        # five-character word.
        trimmed = (word.strip('.,!?;:') for word in text_sample.split())
        candidates = [word for word in trimmed if len(word) > 4 and word[0].isupper()]

    if not candidates:
        return []
    # max() returns the first of equally long candidates.
    return [max(candidates, key=len)]
def search_and_compile_content(keyword: str, language: str = "English") -> str:
    """Build a body of source text about ``keyword`` from web search results.

    Without a Brave API key a canned, templated description of the keyword is
    returned (Korean or English).  Otherwise six query variations are sent to
    ``brave_search``; the top three hits of each are rendered as title /
    snippet / source entries.  When the collected snippets total fewer than
    1000 characters a templated filler paragraph is appended.  The result is
    prefixed with a markdown heading naming the keyword.
    """
    korean = language == "Korean"

    # No API key: fall back to generic placeholder content.
    if not BRAVE_KEY:
        if korean:
            return f"""
'{keyword}'์— ๋Œ€ํ•œ ์ข…ํ•ฉ์ ์ธ ์ •๋ณด:
{keyword}๋Š” ํ˜„๋Œ€ ์‚ฌํšŒ์—์„œ ๋งค์šฐ ์ค‘์š”ํ•œ ์ฃผ์ œ์ž…๋‹ˆ๋‹ค.
์ด ์ฃผ์ œ๋Š” ๋‹ค์–‘ํ•œ ์ธก๋ฉด์—์„œ ์šฐ๋ฆฌ์˜ ์‚ถ์— ์˜ํ–ฅ์„ ๋ฏธ์น˜๊ณ  ์žˆ์œผ๋ฉฐ,
์ตœ๊ทผ ๋“ค์–ด ๋”์šฑ ์ฃผ๋ชฉ๋ฐ›๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
์ฃผ์š” ํŠน์ง•:
1. ๊ธฐ์ˆ ์  ๋ฐœ์ „๊ณผ ํ˜์‹ 
2. ์‚ฌํšŒ์  ์˜ํ–ฅ๊ณผ ๋ณ€ํ™”
3. ๋ฏธ๋ž˜ ์ „๋ง๊ณผ ๊ฐ€๋Šฅ์„ฑ
4. ์‹ค์šฉ์  ํ™œ์šฉ ๋ฐฉ์•ˆ
5. ๊ธ€๋กœ๋ฒŒ ํŠธ๋ Œ๋“œ์™€ ๋™ํ–ฅ
์ „๋ฌธ๊ฐ€๋“ค์€ {keyword}๊ฐ€ ์•ž์œผ๋กœ ๋”์šฑ ์ค‘์š”ํ•ด์งˆ ๊ฒƒ์œผ๋กœ ์˜ˆ์ƒํ•˜๊ณ  ์žˆ์œผ๋ฉฐ,
์ด์— ๋Œ€ํ•œ ๊นŠ์ด ์žˆ๋Š” ์ดํ•ด๊ฐ€ ํ•„์š”ํ•œ ์‹œ์ ์ž…๋‹ˆ๋‹ค.
"""
        return f"""
Comprehensive information about '{keyword}':
{keyword} is a significant topic in modern society.
This subject impacts our lives in various ways and has been
gaining increasing attention recently.
Key aspects:
1. Technological advancement and innovation
2. Social impact and changes
3. Future prospects and possibilities
4. Practical applications
5. Global trends and developments
Experts predict that {keyword} will become even more important,
and it's crucial to develop a deep understanding of this topic.
"""

    # Several query angles per language to broaden coverage.
    if korean:
        queries = [
            f"{keyword} ์ตœ์‹  ๋‰ด์Šค 2024",
            f"{keyword} ์ •๋ณด ์„ค๋ช…",
            f"{keyword} ํŠธ๋ Œ๋“œ ์ „๋ง",
            f"{keyword} ์žฅ์  ๋‹จ์ ",
            f"{keyword} ํ™œ์šฉ ๋ฐฉ๋ฒ•",
            f"{keyword} ์ „๋ฌธ๊ฐ€ ์˜๊ฒฌ"
        ]
    else:
        queries = [
            f"{keyword} latest news 2024",
            f"{keyword} explained comprehensive",
            f"{keyword} trends forecast",
            f"{keyword} advantages disadvantages",
            f"{keyword} how to use",
            f"{keyword} expert opinions"
        ]

    sections = []
    snippet_chars = 0
    for query in queries:
        # Ask for five hits but keep only the top three of each query.
        for hit in brave_search(query, count=5)[:3]:
            sections.append(f"**{hit['title']}**\n{hit['snippet']}\nSource: {hit['host']}\n")
            snippet_chars += len(hit['snippet'])

    # Pad with templated text when the search produced under 1000 characters.
    if snippet_chars < 1000:
        if korean:
            filler = f"""
์ถ”๊ฐ€ ์ •๋ณด:
{keyword}์™€ ๊ด€๋ จ๋œ ์ตœ๊ทผ ๋™ํ–ฅ์„ ์‚ดํŽด๋ณด๋ฉด, ์ด ๋ถ„์•ผ๋Š” ๋น ๋ฅด๊ฒŒ ๋ฐœ์ „ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
๋งŽ์€ ์ „๋ฌธ๊ฐ€๋“ค์ด ์ด ์ฃผ์ œ์— ๋Œ€ํ•ด ํ™œ๋ฐœํžˆ ์—ฐ๊ตฌํ•˜๊ณ  ์žˆ์œผ๋ฉฐ,
์‹ค์ƒํ™œ์—์„œ์˜ ์‘์šฉ ๊ฐ€๋Šฅ์„ฑ๋„ ๊ณ„์† ํ™•๋Œ€๋˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
ํŠนํžˆ ์ฃผ๋ชฉํ•  ์ ์€:
- ๊ธฐ์ˆ  ํ˜์‹ ์˜ ๊ฐ€์†ํ™”
- ์‚ฌ์šฉ์ž ๊ฒฝํ—˜์˜ ๊ฐœ์„ 
- ์ ‘๊ทผ์„ฑ์˜ ํ–ฅ์ƒ
- ๋น„์šฉ ํšจ์œจ์„ฑ ์ฆ๋Œ€
- ๊ธ€๋กœ๋ฒŒ ์‹œ์žฅ์˜ ์„ฑ์žฅ
์ด๋Ÿฌํ•œ ์š”์†Œ๋“ค์ด {keyword}์˜ ๋ฏธ๋ž˜๋ฅผ ๋”์šฑ ๋ฐ๊ฒŒ ๋งŒ๋“ค๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
"""
        else:
            filler = f"""
Additional insights:
Recent developments in {keyword} show rapid advancement in this field.
Many experts are actively researching this topic, and its practical
applications continue to expand.
Key points to note:
- Accelerating technological innovation
- Improving user experience
- Enhanced accessibility
- Increased cost efficiency
- Growing global market
These factors are making the future of {keyword} increasingly promising.
"""
        sections.append(filler)

    # Keyword-specific heading placed in front of the compiled material.
    heading = (
        f"### '{keyword}'์— ๋Œ€ํ•œ ์ข…ํ•ฉ์ ์ธ ์ •๋ณด์™€ ์ตœ์‹  ๋™ํ–ฅ:\n\n"
        if korean
        else f"### Comprehensive information and latest trends about '{keyword}':\n\n"
    )
    return heading + "\n\n".join(sections)
class UnifiedAudioConverter:
def __init__(self, config: ConversationConfig):
    """Store the configuration and set every lazily-created backend to ``None``.

    Args:
        config: Conversation settings consulted by the other methods.
    """
    self.config = config

    # API client and legacy Transformers model/tokenizer: created on demand.
    self.llm_client = self.legacy_local_model = self.legacy_tokenizer = None

    # llama.cpp model instance and the model name it was loaded for.
    self.local_llm = self.local_llm_model = None

    # TTS backends: created on demand.
    self.melo_models = self.spark_model_dir = None

    if torch.cuda.is_available():
        self.device = "cuda"
    else:
        self.device = "cpu"

    # Builds the prompts and chat messages sent to the LLM backends.
    self.prompt_builder = PromptBuilder()
def initialize_api_mode(self, api_key: str):
    """Create the OpenAI-compatible client that talks to the Together API.

    Args:
        api_key: Key passed to the client as ``api_key``.
    """
    together_endpoint = "https://api.together.xyz/v1"
    self.llm_client = OpenAI(base_url=together_endpoint, api_key=api_key)
@spaces.GPU(duration=120)
def initialize_local_mode(self):
    """Load the llama.cpp model named in the config, unless already loaded.

    The model file is downloaded into ``./models`` and then opened with
    ``Llama`` (flash attention, 81 GPU layers when CUDA is available and 0
    otherwise, batch size 1024, context size 16384).

    Raises:
        RuntimeError: If the llama.cpp dependencies are missing, or if the
            download or model construction fails.
    """
    if not LLAMA_CPP_AVAILABLE:
        raise RuntimeError("Llama CPP dependencies not available. Please install llama-cpp-python and llama-cpp-agent.")

    # Nothing to do when the requested model is the one already in memory.
    if self.local_llm is not None and self.local_llm_model == self.config.local_model_name:
        return

    try:
        # Download the model file into the local models directory.
        hf_hub_download(
            repo_id=self.config.local_model_repo,
            filename=self.config.local_model_name,
            local_dir="./models",
        )
        model_path_local = os.path.join("./models", self.config.local_model_name)
        if not os.path.exists(model_path_local):
            raise RuntimeError(f"Model file not found at {model_path_local}")

        gpu_layers = 81 if torch.cuda.is_available() else 0
        self.local_llm = Llama(
            model_path=model_path_local,
            flash_attn=True,
            n_gpu_layers=gpu_layers,
            n_batch=1024,
            n_ctx=16384,
        )
        self.local_llm_model = self.config.local_model_name
        print(f"Local LLM initialized: {model_path_local}")
    except Exception as e:
        print(f"Failed to initialize local LLM: {e}")
        raise RuntimeError(f"Failed to initialize local LLM: {e}")
@spaces.GPU(duration=60)
def initialize_legacy_local_mode(self):
    """Load the fallback Hugging Face model and tokenizer on first use.

    The model is loaded 4-bit quantized with float16 compute; the tokenizer
    is loaded at a pinned revision.  Does nothing when the model is already
    loaded.
    """
    if self.legacy_local_model is not None:
        return

    model_id = self.config.legacy_local_model_name
    four_bit = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
    )
    self.legacy_local_model = AutoModelForCausalLM.from_pretrained(
        model_id,
        quantization_config=four_bit,
    )
    self.legacy_tokenizer = AutoTokenizer.from_pretrained(
        model_id,
        revision='8ab73a6800796d84448bc936db9bac5ad9f984ae',
    )
def initialize_spark_tts(self):
    """Make sure the Spark-TTS model directory exists, downloading it if absent.

    Sets ``self.spark_model_dir`` and prints a warning when the
    ``cli/inference.py`` script is not present in the working directory.

    Raises:
        RuntimeError: If the Spark TTS dependencies are unavailable or the
            model download fails.
    """
    if not SPARK_AVAILABLE:
        raise RuntimeError("Spark TTS dependencies not available")

    spark_dir = "pretrained_models/Spark-TTS-0.5B"
    model_missing = not os.path.exists(spark_dir)
    if model_missing:
        print("Downloading Spark-TTS model...")
        try:
            os.makedirs("pretrained_models", exist_ok=True)
            snapshot_download("SparkAudio/Spark-TTS-0.5B", local_dir=spark_dir)
            print("Spark-TTS model downloaded successfully")
        except Exception as e:
            raise RuntimeError(f"Failed to download Spark-TTS model: {e}")

    self.spark_model_dir = spark_dir

    # Synthesis shells out to this script, so warn early when it is missing.
    cli_present = os.path.exists("cli/inference.py")
    if not cli_present:
        print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")
@spaces.GPU(duration=60)
def initialize_melo_tts(self):
    """Create the English MeloTTS model once, when MeloTTS is installed."""
    if not MELO_AVAILABLE or self.melo_models is not None:
        return
    english_model = MeloTTS(language="EN", device=self.device)
    self.melo_models = {"EN": english_model}
def fetch_text(self, url: str) -> str:
    """Download the text behind ``url`` through the configured prefix URL.

    Args:
        url: Absolute ``http://`` or ``https://`` address.

    Returns:
        The response body as text.

    Raises:
        ValueError: If ``url`` is empty or has another scheme.
        RuntimeError: If the HTTP request fails.
    """
    if not url:
        raise ValueError("URL cannot be empty")
    if not url.startswith(("http://", "https://")):
        raise ValueError("URL must start with 'http://' or 'https://'")

    # The request goes to the config's prefix URL with the target appended.
    target = f"{self.config.prefix_url}{url}"
    try:
        reply = httpx.get(target, timeout=60.0)
        reply.raise_for_status()
        return reply.text
    except httpx.HTTPError as e:
        raise RuntimeError(f"Failed to fetch URL: {e}")
def extract_text_from_pdf(self, pdf_file) -> str:
    """Return the text of every page of a PDF, joined by newlines.

    Args:
        pdf_file: A filesystem path (``str`` or ``os.PathLike``) or a
            readable binary file object.  File objects are first copied to
            a temporary ``.pdf`` file because the loader is given a path.

    Returns:
        The concatenated ``page_content`` of all loaded pages.

    Raises:
        RuntimeError: If reading or parsing the PDF fails for any reason.
    """
    temp_path = None
    try:
        if isinstance(pdf_file, (str, os.PathLike)):
            pdf_path = os.fspath(pdf_file)
        else:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                # Record the name first so cleanup happens even if the copy fails.
                temp_path = tmp_file.name
                tmp_file.write(pdf_file.read())
            pdf_path = temp_path

        pages = PyPDFLoader(pdf_path).load()
        return "\n".join(page.page_content for page in pages)
    except Exception as e:
        raise RuntimeError(f"Failed to extract text from PDF: {e}") from e
    finally:
        # Remove the temporary copy on success *and* on failure.
        if temp_path is not None and os.path.exists(temp_path):
            os.unlink(temp_path)
def _get_messages_formatter_type(self, model_name):
    """Choose the chat template for ``model_name``.

    Names containing ``"Mistral"`` or ``"BitSix"`` use ChatML; every other
    model uses the Llama 3 format.
    """
    uses_chatml = "Mistral" in model_name or "BitSix" in model_name
    return MessagesFormatterType.CHATML if uses_chatml else MessagesFormatterType.LLAMA_3
@spaces.GPU(duration=120)
def extract_conversation_local(self, text: str, language: str = "English", progress=None) -> Dict:
    """Generate the podcast conversation with the llama.cpp model.

    When a Brave key is configured and the text is not keyword-generated, a
    short web-search context is fetched first (failures are only printed).
    The model is asked for JSON and the first JSON object found in its reply
    is parsed and returned.  Any failure falls back to
    ``extract_conversation_legacy_local``.

    Args:
        text: Source material for the conversation.
        language: Language name used to select the system prompt.
        progress: Passed through to the legacy fallback.

    Returns:
        The parsed conversation dictionary.
    """
    try:
        # Optional search context (skipped for keyword-based content).
        search_context = ""
        if BRAVE_KEY and not text.startswith("Keyword-based content:"):
            try:
                keywords = extract_keywords_for_search(text, language)
                if keywords:
                    if language == "Korean":
                        search_query = keywords[0]
                    else:
                        search_query = f"{keywords[0]} latest news"
                    search_context = format_search_results(search_query)
                    print(f"Search context added for: {search_query}")
            except Exception as e:
                print(f"Search failed, continuing without context: {e}")

        # Try the llama.cpp model first.
        self.initialize_local_mode()
        chat_template = self._get_messages_formatter_type(self.config.local_model_name)
        provider = LlamaCppPythonProvider(self.local_llm)

        # System prompts for the languages that have a dedicated one.
        localized_prompts = {
            "Korean": (
                "๋‹น์‹ ์€ ํ•œ๊ตญ์˜ ์œ ๋ช… ํŒŸ์บ์ŠคํŠธ ์ „๋ฌธ ์ž‘๊ฐ€์ž…๋‹ˆ๋‹ค. "
                "์ฒญ์ทจ์ž๋“ค์ด ๊นŠ์ด ์žˆ๋Š” ์ „๋ฌธ ์ง€์‹์„ ์–ป์„ ์ˆ˜ ์žˆ๋Š” ๊ณ ํ’ˆ์งˆ ๋Œ€๋‹ด์„ ํ•œ๊ตญ์–ด๋กœ ๋งŒ๋“ญ๋‹ˆ๋‹ค. "
                "๋ฐ˜๋“œ์‹œ ์„œ๋กœ ์กด๋Œ“๋ง์„ ์‚ฌ์šฉํ•˜๋ฉฐ, 12ํšŒ์˜ ๋Œ€ํ™” ๊ตํ™˜์œผ๋กœ ๊ตฌ์„ฑํ•˜์„ธ์š”. "
                "๋ชจ๋“  ๋Œ€ํ™”๋Š” ๋ฐ˜๋“œ์‹œ ํ•œ๊ตญ์–ด๋กœ ์ž‘์„ฑํ•˜๊ณ  JSON ํ˜•์‹์œผ๋กœ๋งŒ ์‘๋‹ตํ•˜์„ธ์š”."
            ),
            "Japanese": (
                "ใ‚ใชใŸใฏๆ—ฅๆœฌใฎๆœ‰ๅใชใƒใƒƒใƒ‰ใ‚ญใƒฃใ‚นใƒˆๅฐ‚้–€ไฝœๅฎถใงใ™ใ€‚"
                "่ด่ก†ใŒๆทฑใ„ๅฐ‚้–€็Ÿฅ่ญ˜ใ‚’ๅพ—ใ‚‰ใ‚Œใ‚‹้ซ˜ๅ“่ณชใชๅฏพ่ซ‡ใ‚’ๆ—ฅๆœฌ่ชžใงไฝœๆˆใ—ใพใ™ใ€‚"
                "ๅฟ…ใšใŠไบ’ใ„ใซไธๅฏง่ชžใ‚’ไฝฟ็”จใ—ใ€12ๅ›žใฎๅฏพ่ฉฑไบคๆ›ใงๆง‹ๆˆใ—ใฆใใ ใ•ใ„ใ€‚"
                "ใ™ในใฆใฎๅฏพ่ฉฑใฏๅฟ…ใšๆ—ฅๆœฌ่ชžใงไฝœๆˆใ—ใ€JSONๅฝขๅผใงใฎใฟๅ›ž็ญ”ใ—ใฆใใ ใ•ใ„ใ€‚"
            ),
            "French": (
                "Vous รชtes un cรฉlรจbre scรฉnariste de podcast professionnel franรงais. "
                "Crรฉez des discussions de haute qualitรฉ en franรงais qui donnent au public "
                "des connaissances professionnelles approfondies. "
                "Crรฉez exactement 12 รฉchanges de conversation et rรฉpondez uniquement en format JSON."
            ),
            "German": (
                "Sie sind ein berรผhmter professioneller Podcast-Drehbuchautor aus Deutschland. "
                "Erstellen Sie hochwertige Diskussionen auf Deutsch, die dem Publikum "
                "tiefgreifendes Fachwissen vermitteln. "
                "Erstellen Sie genau 12 Gesprรคchsaustausche und antworten Sie nur im JSON-Format."
            ),
            "Spanish": (
                "Eres un famoso guionista de podcast profesional espaรฑol. "
                "Crea discusiones de alta calidad en espaรฑol que brinden al pรบblico "
                "conocimientos profesionales profundos. "
                "Crea exactamente 12 intercambios de conversaciรณn y responde solo en formato JSON."
            ),
            "Chinese": (
                "ๆ‚จๆ˜ฏไธญๅ›ฝ่‘—ๅ็š„ไธ“ไธšๆ’ญๅฎข็ผ–ๅ‰งใ€‚"
                "ๅˆ›ๅปบ้ซ˜่ดจ้‡็š„ไธญๆ–‡่ฎจ่ฎบ๏ผŒไธบ่ง‚ไผ—ๆไพ›ๆทฑๅ…ฅ็š„ไธ“ไธš็Ÿฅ่ฏ†ใ€‚"
                "ๅˆ›ๅปบๆฐๅฅฝ12ๆฌกๅฏน่ฏไบคๆข๏ผŒไป…ไปฅJSONๆ ผๅผๅ›ž็ญ”ใ€‚"
            ),
            "Russian": (
                "ะ’ั‹ ะธะทะฒะตัั‚ะฝั‹ะน ะฟั€ะพั„ะตััะธะพะฝะฐะปัŒะฝั‹ะน ัั†ะตะฝะฐั€ะธัั‚ ะฟะพะดะบะฐัั‚ะพะฒ ะธะท ะ ะพััะธะธ. "
                "ะกะพะทะดะฐะฒะฐะนั‚ะต ะฒั‹ัะพะบะพะบะฐั‡ะตัั‚ะฒะตะฝะฝั‹ะต ะดะธัะบัƒััะธะธ ะฝะฐ ั€ัƒััะบะพะผ ัะทั‹ะบะต, ะบะพั‚ะพั€ั‹ะต ะดะฐัŽั‚ ะฐัƒะดะธั‚ะพั€ะธะธ "
                "ะณะปัƒะฑะพะบะธะต ะฟั€ะพั„ะตััะธะพะฝะฐะปัŒะฝั‹ะต ะทะฝะฐะฝะธั. "
                "ะกะพะทะดะฐะนั‚ะต ั€ะพะฒะฝะพ 12 ะพะฑะผะตะฝะพะฒ ั€ะฐะทะณะพะฒะพั€ะพะผ ะธ ะพั‚ะฒะตั‡ะฐะนั‚ะต ั‚ะพะปัŒะบะพ ะฒ ั„ะพั€ะผะฐั‚ะต JSON."
            )
        }
        # Generic English instruction for every other language.
        fallback_prompt = (
            f"You are a professional podcast scriptwriter creating high-quality, "
            f"insightful discussions in {language}. Create exactly 12 conversation exchanges "
            f"with professional expertise. All dialogue must be in {language}. "
            f"Respond only in JSON format."
        )
        system_message = localized_prompts.get(language, fallback_prompt)

        agent = LlamaCppAgent(
            provider,
            system_prompt=system_message,
            predefined_messages_formatter_type=chat_template,
            debug_output=False
        )

        # Sampling configuration for a single, non-streamed completion.
        settings = provider.get_provider_default_settings()
        sampling_options = (
            ("temperature", 0.75),
            ("top_k", 40),
            ("top_p", 0.95),
            ("max_tokens", self.config.max_tokens),
            ("repeat_penalty", 1.1),
            ("stream", False),
        )
        for option, value in sampling_options:
            setattr(settings, option, value)

        history = BasicChatHistory()
        prompt = self.prompt_builder.build_prompt(text, language, search_context)
        response = agent.get_chat_response(
            prompt,
            llm_sampling_settings=settings,
            chat_history=history,
            returns_streaming_generator=False,
            print_output=False
        )

        # Pull the first JSON object (one level of nested braces) out of the reply.
        found = re.search(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", response)
        if not found:
            raise ValueError("No valid JSON found in local LLM response")
        return json.loads(found.group())
    except Exception as e:
        print(f"Local LLM failed: {e}, falling back to legacy local method")
        return self.extract_conversation_legacy_local(text, language, progress, search_context)
@spaces.GPU(duration=120)
def extract_conversation_legacy_local(self, text: str, language: str = "English", progress=None, search_context: str = "") -> Dict:
    """Generate the conversation with the legacy Transformers model.

    Generation runs in a background thread and the streamed output is
    collected into one string, from which the first JSON object is parsed.
    On any failure the default conversation for ``language`` is returned.

    Args:
        text: Source material for the conversation.
        language: Language name passed to the prompt builder.
        progress: Unused by this method.
        search_context: Optional search text passed to the prompt builder.

    Returns:
        The parsed conversation dictionary, or the default conversation.
    """
    try:
        self.initialize_legacy_local_mode()

        # Chat messages (including the system message) come from the prompt builder.
        chat = self.prompt_builder.build_messages_for_local(text, language, search_context)

        tokenizer = self.legacy_tokenizer
        stop_ids = [
            tokenizer.eos_token_id,
            tokenizer.convert_tokens_to_ids("<|eot_id|>"),
        ]
        rendered = tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )
        encoded = tokenizer([rendered], return_tensors="pt").to(self.device)

        streamer = TextIteratorStreamer(
            tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )
        generate_kwargs = dict(
            encoded,
            streamer=streamer,
            max_new_tokens=self.config.max_new_tokens,
            do_sample=True,
            temperature=0.75,
            eos_token_id=stop_ids,
        )

        # generate() feeds the streamer from a worker thread while we consume it.
        worker = Thread(target=self.legacy_local_model.generate, kwargs=generate_kwargs)
        worker.start()
        generated = "".join(streamer)

        found = re.search(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", generated)
        if not found:
            raise ValueError("No valid JSON found in legacy local response")
        return json.loads(found.group())
    except Exception as e:
        print(f"Legacy local model also failed: {e}")
        return DefaultConversations.get_conversation(language)
def extract_conversation_api(self, text: str, language: str = "English") -> Dict:
"""Extract conversation using API"""
if not self.llm_client:
raise RuntimeError("API mode not initialized")
try:
# ๊ฒ€์ƒ‰ ์ปจํ…์ŠคํŠธ ์ƒ์„ฑ
search_context = ""
if BRAVE_KEY and not text.startswith("Keyword-based content:"):
try:
keywords = extract_keywords_for_search(text, language)
if keywords:
search_query = keywords[0] if language == "Korean" else f"{keywords[0]} latest news"
search_context = format_search_results(search_query)
print(f"Search context added for: {search_query}")
except Exception as e:
print(f"Search failed, continuing without context: {e}")
# ๋ฉ”์‹œ์ง€ ๋นŒ๋“œ
messages = self.prompt_builder.build_messages_for_local(text, language, search_context)
chat_completion = self.llm_client.chat.completions.create(
messages=messages,
model=self.config.api_model_name,
temperature=0.75,
)
pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}"
json_match = re.search(pattern, chat_completion.choices[0].message.content)
if not json_match:
raise ValueError("No valid JSON found in response")
return json.loads(json_match.group())
except Exception as e:
raise RuntimeError(f"Failed to extract conversation: {e}")
def parse_conversation_text(self, conversation_text: str) -> Dict:
    """Turn ``speaker: text`` lines back into the conversation dictionary.

    Each line is split at its first colon; lines without a colon are
    ignored.  Speaker and text are stripped of surrounding whitespace.

    Returns:
        ``{"conversation": [{"speaker": ..., "text": ...}, ...]}``
    """
    turns = []
    for raw_line in conversation_text.strip().split('\n'):
        who, colon, spoken = raw_line.partition(':')
        if not colon:
            continue
        turns.append({"speaker": who.strip(), "text": spoken.strip()})
    return {"conversation": turns}
async def text_to_speech_edge(self, conversation_json: Dict, language: str = "English") -> Tuple[str, str]:
    """Synthesise every conversation turn with Edge TTS and merge the audio.

    Voices for ``language`` (English when the language has no entry) are
    assigned to the turns in rotation.  Each turn is written to
    ``output_<i>.wav`` in a fresh output directory and the files are then
    combined into ``combined_output.wav``.

    Args:
        conversation_json: Dictionary with a ``"conversation"`` list whose
            items carry ``"text"`` and optionally ``"speaker"``.
        language: Language name used to look up the voices.

    Returns:
        ``(path_to_combined_audio, transcript_text)``.

    Raises:
        RuntimeError: If synthesis or combining fails.
    """
    output_dir = Path(self._create_output_directory())
    filenames = []
    try:
        voices = EDGE_TTS_VOICES.get(language, EDGE_TTS_VOICES["English"])
        turns = conversation_json["conversation"]

        for i, turn in enumerate(turns):
            filename = output_dir / f"output_{i}.wav"
            voice = voices[i % len(voices)]
            tmp_path = await self._generate_audio_edge(turn["text"], voice)
            # The clip is created in the system temp directory, which may be
            # on another filesystem than output_dir; os.rename can fail
            # there, whereas shutil.move falls back to copy-and-delete.
            shutil.move(tmp_path, str(filename))
            filenames.append(str(filename))

        # Combine audio files
        final_output = os.path.join(output_dir, "combined_output.wav")
        self._combine_audio_files(filenames, final_output)

        # Generate conversation text
        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(turns)
        )
        return final_output, conversation_text
    except Exception as e:
        raise RuntimeError(f"Failed to convert text to speech: {e}") from e
async def _generate_audio_edge(self, text: str, voice: str) -> str:
"""Generate audio using Edge TTS"""
if not text.strip():
raise ValueError("Text cannot be empty")
voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
communicate = edge_tts.Communicate(text, voice_short_name)
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
tmp_path = tmp_file.name
await communicate.save(tmp_path)
return tmp_path
@spaces.GPU(duration=60)
def text_to_speech_spark(self, conversation_json: Dict, language: str = "English", progress=None) -> Tuple[str, str]:
    """Synthesise the conversation by invoking the Spark TTS CLI per turn.

    Each non-empty turn runs ``python -m cli.inference`` with a 60 second
    timeout.  A turn whose command fails, times out or errors is replaced by
    one second of silence.  All clips are then combined into
    ``spark_combined.wav``.

    Args:
        conversation_json: Dictionary with a ``"conversation"`` list whose
            items carry ``"text"`` and optionally ``"speaker"``.
        language: Selects the Korean or English voice prompt sentences.
        progress: Unused by this method.

    Returns:
        ``(path_to_combined_audio, transcript_text)``.

    Raises:
        RuntimeError: If Spark TTS is unavailable or conversion fails.
    """
    if not SPARK_AVAILABLE or not self.spark_model_dir:
        raise RuntimeError("Spark TTS not available")

    try:
        output_dir = self._create_output_directory()
        audio_files = []
        sample_rate = 22050

        def _use_silence(message, target):
            # Report the problem and substitute one second of silence.
            print(message)
            silence = np.zeros(int(sample_rate * 1.0))
            sf.write(target, silence, sample_rate)
            audio_files.append(target)

        # Prompt sentences introducing each of the two speakers.
        speaker1, speaker2 = self.prompt_builder.get_speaker_names(language)
        if language == "Korean":
            voice_configs = [
                {"prompt_text": f"์•ˆ๋…•ํ•˜์„ธ์š”, ์˜ค๋Š˜ ํŒŸ์บ์ŠคํŠธ ์ง„ํ–‰์„ ๋งก์€ {speaker1}์ž…๋‹ˆ๋‹ค.", "gender": "male"},
                {"prompt_text": f"์•ˆ๋…•ํ•˜์„ธ์š”, ์ €๋Š” ์˜ค๋Š˜ ์ด ์ฃผ์ œ์— ๋Œ€ํ•ด ์„ค๋ช…๋“œ๋ฆด {speaker2}์ž…๋‹ˆ๋‹ค.", "gender": "male"}
            ]
        else:
            voice_configs = [
                {"prompt_text": f"Hello everyone, I'm {speaker1}, your host for today's podcast.", "gender": "male"},
                {"prompt_text": f"Hi, I'm {speaker2}. I'm excited to share my insights with you.", "gender": "male"}
            ]

        device_arg = "0" if torch.cuda.is_available() else "cpu"

        for i, turn in enumerate(conversation_json["conversation"]):
            text = turn["text"]
            if not text.strip():
                continue

            voice_config = voice_configs[i % len(voice_configs)]
            clip_name = f"spark_output_{i}.wav"
            output_file = os.path.join(output_dir, clip_name)
            cmd = [
                "python", "-m", "cli.inference",
                "--text", text,
                "--device", device_arg,
                "--save_dir", output_dir,
                "--model_dir", self.spark_model_dir,
                "--prompt_text", voice_config["prompt_text"],
                "--output_name", clip_name
            ]

            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=60,
                    cwd="."
                )
                if result.returncode == 0:
                    audio_files.append(output_file)
                else:
                    _use_silence(f"Spark TTS error for turn {i}: {result.stderr}", output_file)
            except subprocess.TimeoutExpired:
                _use_silence(f"Spark TTS timeout for turn {i}", output_file)
            except Exception as e:
                _use_silence(f"Error running Spark TTS for turn {i}: {e}", output_file)

        # Combine all audio files
        if not audio_files:
            raise RuntimeError("No audio files generated")
        final_output = os.path.join(output_dir, "spark_combined.wav")
        self._combine_audio_files(audio_files, final_output)

        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )
        return final_output, conversation_text
    except Exception as e:
        raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}")
@spaces.GPU(duration=60)
def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
"""Convert text to speech using MeloTTS"""
if not MELO_AVAILABLE or not self.melo_models:
raise RuntimeError("MeloTTS not available")
speakers = ["EN-Default", "EN-US"]
combined_audio = AudioSegment.empty()
for i, turn in enumerate(conversation_json["conversation"]):
bio = io.BytesIO()
text = turn["text"]
speaker = speakers[i % 2]
speaker_id = self.melo_models["EN"].hps.data.spk2id[speaker]
self.melo_models["EN"].tts_to_file(
text, speaker_id, bio, speed=1.0,
pbar=progress.tqdm if progress else None,
format="wav"
)
bio.seek(0)
audio_segment = AudioSegment.from_file(bio, format="wav")
combined_audio += audio_segment
final_audio_path = "melo_podcast.mp3"
combined_audio.export(final_audio_path, format="mp3")
conversation_text = "\n".join(
f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
for i, turn in enumerate(