import spaces
import gradio as gr
import os
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import wave
import base64
import numpy as np
import soundfile as sf
import subprocess
import shutil
import requests
import logging
from datetime import datetime, timedelta
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv

# PDF processing imports
from langchain_community.document_loaders import PyPDFLoader

# Edge TTS imports
import edge_tts
from pydub import AudioSegment

# OpenAI imports
from openai import OpenAI

# Transformers imports (for legacy local mode)
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TextIteratorStreamer,
    BitsAndBytesConfig,
)

# Llama CPP imports (for new local mode)
try:
    from llama_cpp import Llama
    from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType
    from llama_cpp_agent.providers import LlamaCppPythonProvider
    from llama_cpp_agent.chat_history import BasicChatHistory
    from llama_cpp_agent.chat_history.messages import Roles
    from huggingface_hub import hf_hub_download
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False

# Spark TTS imports
try:
    from huggingface_hub import snapshot_download
    SPARK_AVAILABLE = True
except ImportError:
    SPARK_AVAILABLE = False

# MeloTTS imports (for local mode)
try:
    # Download the unidic dictionary only when it is not already installed
    if not os.path.exists("/usr/local/lib/python3.10/site-packages/unidic"):
        try:
            os.system("python -m unidic download")
        except Exception:
            pass
    from melo.api import TTS as MeloTTS
    MELO_AVAILABLE = True
except Exception:
    MELO_AVAILABLE = False

# Import config and prompts
from config_prompts import (
    ConversationConfig,
    PromptBuilder,
    DefaultConversations,
    EDGE_TTS_ONLY_LANGUAGES,
    EDGE_TTS_VOICES
)
load_dotenv()

# Brave Search API configuration
BRAVE_KEY = os.getenv("BSEARCH_API")
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"


def brave_search(query: str, count: int = 8, freshness_days: int | None = None):
    """Search for up-to-date information using the Brave Search API."""
    if not BRAVE_KEY:
        return []
    params = {"q": query, "count": str(count)}
    if freshness_days:
        dt_from = (datetime.utcnow() - timedelta(days=freshness_days)).strftime("%Y-%m-%d")
        params["freshness"] = dt_from
    try:
        # Named resp (not r) so the loop variable in the comprehension below
        # does not shadow the HTTP response
        resp = requests.get(
            BRAVE_ENDPOINT,
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_KEY},
            params=params,
            timeout=15
        )
        raw = resp.json().get("web", {}).get("results") or []
        return [{
            "title": r.get("title", ""),
            "url": r.get("url", r.get("link", "")),
            "snippet": r.get("description", r.get("text", "")),
            "host": re.sub(r"https?://(www\.)?", "", r.get("url", "")).split("/")[0]
        } for r in raw[:count]]
    except Exception as e:
        logging.error(f"Brave search error: {e}")
        return []
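
# Overview comment (added for orientation): the three helpers below form the
# research layer — format_search_results() summarizes hits for prompt context,
# extract_keywords_for_search() picks a query term from raw input, and
# search_and_compile_content() builds a full source document from a bare keyword.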
def format_search_results(query: str, for_keyword: bool = False) -> str:
    """Format search results and return them as a text block."""
    # Keyword searches pull more results
    count = 5 if for_keyword else 3
    rows = brave_search(query, count, freshness_days=7 if not for_keyword else None)
    if not rows:
        return ""
    results = []
    # Keyword searches include more detail per result
    max_results = 4 if for_keyword else 2
    for r in rows[:max_results]:
        if for_keyword:
            # Longer snippets for keyword searches
            snippet = r['snippet'][:200] + "..." if len(r['snippet']) > 200 else r['snippet']
            results.append(f"**{r['title']}**\n{snippet}\nSource: {r['host']}")
        else:
            # Short snippets for regular searches
            snippet = r['snippet'][:100] + "..." if len(r['snippet']) > 100 else r['snippet']
            results.append(f"- {r['title']}: {snippet}")
    return "\n\n".join(results) + "\n"
def extract_keywords_for_search(text: str, language: str = "English") -> List[str]:
    """Extract search keywords from text (simplified)."""
    # Use only the beginning of the text to avoid processing too much
    text_sample = text[:500]
    if language == "Korean":
        # Extract Korean nouns (two or more Hangul syllables)
        keywords = re.findall(r'[가-힣]{2,}', text_sample)
        # Deduplicate while preserving order
        unique_keywords = list(dict.fromkeys(keywords))
        # Sort by length and keep the word most likely to be meaningful
        unique_keywords.sort(key=len, reverse=True)
        return unique_keywords[:1]  # Return a single keyword
    else:
        # For English, pick the longest word that starts with a capital letter
        words = text_sample.split()
        keywords = [word.strip('.,!?;:') for word in words
                    if len(word) > 4 and word[0].isupper()]
        if keywords:
            return [max(keywords, key=len)]  # Single longest word
        return []
def search_and_compile_content(keyword: str, language: str = "English") -> str:
    """Search by keyword and compile enough content for a script."""
    if not BRAVE_KEY:
        # No API key: generate generic fallback content
        if language == "Korean":
            return f"""
'{keyword}'에 대한 종합적인 정보:

{keyword}는 현대 사회에서 매우 중요한 주제입니다.
이 주제는 다양한 측면에서 우리의 삶에 영향을 미치고 있으며,
최근 들어 더욱 주목받고 있습니다.

주요 특징:
1. 기술적 발전과 혁신
2. 사회적 영향과 변화
3. 미래 전망과 가능성
4. 실용적 활용 방안
5. 글로벌 트렌드와 동향

전문가들은 {keyword}가 앞으로 더욱 중요해질 것으로 예상하고 있으며,
이에 대한 깊이 있는 이해가 필요한 시점입니다.
"""
        else:
            return f"""
Comprehensive information about '{keyword}':

{keyword} is a significant topic in modern society.
This subject impacts our lives in various ways and has been
gaining increasing attention recently.

Key aspects:
1. Technological advancement and innovation
2. Social impact and changes
3. Future prospects and possibilities
4. Practical applications
5. Global trends and developments

Experts predict that {keyword} will become even more important,
and it's crucial to develop a deep understanding of this topic.
"""

    # Varied search queries per language
    if language == "Korean":
        queries = [
            f"{keyword} 최신 뉴스 2024",
            f"{keyword} 정보 설명",
            f"{keyword} 트렌드 전망",
            f"{keyword} 장점 단점",
            f"{keyword} 활용 방법",
            f"{keyword} 전문가 의견"
        ]
    else:
        queries = [
            f"{keyword} latest news 2024",
            f"{keyword} explained comprehensive",
            f"{keyword} trends forecast",
            f"{keyword} advantages disadvantages",
            f"{keyword} how to use",
            f"{keyword} expert opinions"
        ]

    all_content = []
    total_content_length = 0
    for query in queries:
        results = brave_search(query, count=5)  # Fetch more results per query
        for r in results[:3]:  # Keep the top 3 per query
            content = f"**{r['title']}**\n{r['snippet']}\nSource: {r['host']}\n"
            all_content.append(content)
            total_content_length += len(r['snippet'])

    # If the compiled content is too thin, pad it with generic material
    if total_content_length < 1000:  # Ensure at least 1000 characters
        if language == "Korean":
            additional_content = f"""
추가 정보:

{keyword}와 관련된 최근 동향을 살펴보면, 이 분야는 빠르게 발전하고 있습니다.
많은 전문가들이 이 주제에 대해 활발히 연구하고 있으며,
실생활에서의 응용 가능성도 계속 확대되고 있습니다.

특히 주목할 점은:
- 기술 혁신의 가속화
- 사용자 경험의 개선
- 접근성의 향상
- 비용 효율성 증대
- 글로벌 시장의 성장

이러한 요소들이 {keyword}의 미래를 더욱 밝게 만들고 있습니다.
"""
        else:
            additional_content = f"""
Additional insights:

Recent developments in {keyword} show rapid advancement in this field.
Many experts are actively researching this topic, and its practical
applications continue to expand.

Key points to note:
- Accelerating technological innovation
- Improving user experience
- Enhanced accessibility
- Increased cost efficiency
- Growing global market

These factors are making the future of {keyword} increasingly promising.
"""
        all_content.append(additional_content)

    # Return the compiled content with a keyword-based intro
    compiled = "\n\n".join(all_content)
    if language == "Korean":
        intro = f"### '{keyword}'에 대한 종합적인 정보와 최신 동향:\n\n"
    else:
        intro = f"### Comprehensive information and latest trends about '{keyword}':\n\n"
    return intro + compiled
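
# Overview comment (added for orientation): UnifiedAudioConverter bundles the
# whole pipeline behind one object — script generation (the llama.cpp local
# model, the legacy transformers model, or the Together API) and speech
# synthesis (Edge TTS, Spark TTS, or MeloTTS).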
class UnifiedAudioConverter:
    def __init__(self, config: ConversationConfig):
        self.config = config
        self.llm_client = None
        self.legacy_local_model = None
        self.legacy_tokenizer = None
        # State for the new local LLM
        self.local_llm = None
        self.local_llm_model = None
        self.melo_models = None
        self.spark_model_dir = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Prompt builder
        self.prompt_builder = PromptBuilder()
    def initialize_api_mode(self, api_key: str):
        """Initialize API mode with the Together API"""
        self.llm_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

    def initialize_local_mode(self):
        """Initialize new local mode with Llama CPP"""
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama CPP dependencies not available. Please install llama-cpp-python and llama-cpp-agent.")
        if self.local_llm is None or self.local_llm_model != self.config.local_model_name:
            try:
                # Download the model
                model_path = hf_hub_download(
                    repo_id=self.config.local_model_repo,
                    filename=self.config.local_model_name,
                    local_dir="./models"
                )
                model_path_local = os.path.join("./models", self.config.local_model_name)
                if not os.path.exists(model_path_local):
                    raise RuntimeError(f"Model file not found at {model_path_local}")
                # Initialize the Llama model
                self.local_llm = Llama(
                    model_path=model_path_local,
                    flash_attn=True,
                    n_gpu_layers=81 if torch.cuda.is_available() else 0,
                    n_batch=1024,
                    n_ctx=16384,
                )
                self.local_llm_model = self.config.local_model_name
                print(f"Local LLM initialized: {model_path_local}")
            except Exception as e:
                print(f"Failed to initialize local LLM: {e}")
                raise RuntimeError(f"Failed to initialize local LLM: {e}")

    def initialize_legacy_local_mode(self):
        """Initialize legacy local mode with a Hugging Face model (fallback)"""
        if self.legacy_local_model is None:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16
            )
            self.legacy_local_model = AutoModelForCausalLM.from_pretrained(
                self.config.legacy_local_model_name,
                quantization_config=quantization_config
            )
            self.legacy_tokenizer = AutoTokenizer.from_pretrained(
                self.config.legacy_local_model_name,
                revision='8ab73a6800796d84448bc936db9bac5ad9f984ae'
            )

    def initialize_spark_tts(self):
        """Initialize the Spark TTS model, downloading it if needed"""
        if not SPARK_AVAILABLE:
            raise RuntimeError("Spark TTS dependencies not available")
        model_dir = "pretrained_models/Spark-TTS-0.5B"
        # Check if the model exists; if not, download it
        if not os.path.exists(model_dir):
            print("Downloading Spark-TTS model...")
            try:
                os.makedirs("pretrained_models", exist_ok=True)
                snapshot_download(
                    "SparkAudio/Spark-TTS-0.5B",
                    local_dir=model_dir
                )
                print("Spark-TTS model downloaded successfully")
            except Exception as e:
                raise RuntimeError(f"Failed to download Spark-TTS model: {e}")
        self.spark_model_dir = model_dir
        # Check that the CLI inference script is present
        if not os.path.exists("cli/inference.py"):
            print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")

    def initialize_melo_tts(self):
        """Initialize MeloTTS models"""
        if MELO_AVAILABLE and self.melo_models is None:
            self.melo_models = {"EN": MeloTTS(language="EN", device=self.device)}

    def fetch_text(self, url: str) -> str:
        """Fetch text content from a URL"""
        if not url:
            raise ValueError("URL cannot be empty")
        if not url.startswith("http://") and not url.startswith("https://"):
            raise ValueError("URL must start with 'http://' or 'https://'")
        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}")

    def extract_text_from_pdf(self, pdf_file) -> str:
        """Extract text content from a PDF file"""
        try:
            # Gradio passes a file path, not a file object
            if isinstance(pdf_file, str):
                pdf_path = pdf_file
            else:
                # If it's a file object (shouldn't happen with Gradio)
                with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                    tmp_file.write(pdf_file.read())
                    pdf_path = tmp_file.name
            # Load the PDF and extract its text
            loader = PyPDFLoader(pdf_path)
            pages = loader.load()
            # Join the text of all pages
            text = "\n".join([page.page_content for page in pages])
            # Delete the temporary file if one was created
            if not isinstance(pdf_file, str) and os.path.exists(pdf_path):
                os.unlink(pdf_path)
            return text
        except Exception as e:
            raise RuntimeError(f"Failed to extract text from PDF: {e}")

    def _get_messages_formatter_type(self, model_name):
        """Get the appropriate message formatter for the model"""
        if "Mistral" in model_name or "BitSix" in model_name:
            return MessagesFormatterType.CHATML
        else:
            return MessagesFormatterType.LLAMA_3
    def extract_conversation_local(self, text: str, language: str = "English", progress=None) -> Dict:
        """Extract a conversation using the new local LLM, with an enhanced professional style"""
        search_context = ""
        try:
            # Build search context (unless the input is already keyword-based)
            if BRAVE_KEY and not text.startswith("Keyword-based content:"):
                try:
                    keywords = extract_keywords_for_search(text, language)
                    if keywords:
                        search_query = keywords[0] if language == "Korean" else f"{keywords[0]} latest news"
                        search_context = format_search_results(search_query)
                        print(f"Search context added for: {search_query}")
                except Exception as e:
                    print(f"Search failed, continuing without context: {e}")

            # Try the new local LLM first
            self.initialize_local_mode()
            chat_template = self._get_messages_formatter_type(self.config.local_model_name)
            provider = LlamaCppPythonProvider(self.local_llm)

            # Language-specific system messages
            system_messages = {
                "Korean": (
                    "당신은 한국의 유명 팟캐스트 전문 작가입니다. "
                    "청취자들이 깊이 있는 전문 지식을 얻을 수 있는 고품질 대담을 한국어로 만듭니다. "
                    "반드시 서로 존댓말을 사용하며, 12회의 대화 교환으로 구성하세요. "
                    "모든 대화는 반드시 한국어로 작성하고 JSON 형식으로만 응답하세요."
                ),
                "Japanese": (
                    "あなたは日本の有名なポッドキャスト専門作家です。"
                    "聴衆が深い専門知識を得られる高品質な対談を日本語で作成します。"
                    "必ずお互いに丁寧語を使用し、12回の対話交換で構成してください。"
                    "すべての対話は必ず日本語で作成し、JSON形式でのみ回答してください。"
                ),
                "French": (
                    "Vous êtes un célèbre scénariste de podcast professionnel français. "
                    "Créez des discussions de haute qualité en français qui donnent au public "
                    "des connaissances professionnelles approfondies. "
                    "Créez exactement 12 échanges de conversation et répondez uniquement en format JSON."
                ),
                "German": (
                    "Sie sind ein berühmter professioneller Podcast-Drehbuchautor aus Deutschland. "
                    "Erstellen Sie hochwertige Diskussionen auf Deutsch, die dem Publikum "
                    "tiefgreifendes Fachwissen vermitteln. "
                    "Erstellen Sie genau 12 Gesprächsaustausche und antworten Sie nur im JSON-Format."
                ),
                "Spanish": (
                    "Eres un famoso guionista de podcast profesional español. "
                    "Crea discusiones de alta calidad en español que brinden al público "
                    "conocimientos profesionales profundos. "
                    "Crea exactamente 12 intercambios de conversación y responde solo en formato JSON."
                ),
                "Chinese": (
                    "您是中国著名的专业播客编剧。"
                    "创建高质量的中文讨论，为观众提供深入的专业知识。"
                    "创建恰好12次对话交换，仅以JSON格式回答。"
                ),
                "Russian": (
                    "Вы известный профессиональный сценарист подкастов из России. "
                    "Создавайте высококачественные дискуссии на русском языке, которые дают аудитории "
                    "глубокие профессиональные знания. "
                    "Создайте ровно 12 обменов разговором и отвечайте только в формате JSON."
                )
            }
            system_message = system_messages.get(language,
                f"You are a professional podcast scriptwriter creating high-quality, "
                f"insightful discussions in {language}. Create exactly 12 conversation exchanges "
                f"with professional expertise. All dialogue must be in {language}. "
                f"Respond only in JSON format."
            )

            agent = LlamaCppAgent(
                provider,
                system_prompt=system_message,
                predefined_messages_formatter_type=chat_template,
                debug_output=False
            )

            settings = provider.get_provider_default_settings()
            settings.temperature = 0.75
            settings.top_k = 40
            settings.top_p = 0.95
            settings.max_tokens = self.config.max_tokens
            settings.repeat_penalty = 1.1
            settings.stream = False

            messages = BasicChatHistory()
            prompt = self.prompt_builder.build_prompt(text, language, search_context)
            response = agent.get_chat_response(
                prompt,
                llm_sampling_settings=settings,
                chat_history=messages,
                returns_streaming_generator=False,
                print_output=False
            )

            # Parse the JSON from the response
            pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}"
            json_match = re.search(pattern, response)
            if json_match:
                conversation_data = json.loads(json_match.group())
                return conversation_data
            else:
                raise ValueError("No valid JSON found in local LLM response")
        except Exception as e:
            print(f"Local LLM failed: {e}, falling back to legacy local method")
            return self.extract_conversation_legacy_local(text, language, progress, search_context)
    def extract_conversation_legacy_local(self, text: str, language: str = "English", progress=None, search_context: str = "") -> Dict:
        """Extract a conversation using the legacy local model"""
        try:
            self.initialize_legacy_local_mode()
            # Language-specific system messages come from config_prompts
            messages = self.prompt_builder.build_messages_for_local(text, language, search_context)
            terminators = [
                self.legacy_tokenizer.eos_token_id,
                self.legacy_tokenizer.convert_tokens_to_ids("<|eot_id|>")
            ]
            chat_messages = self.legacy_tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            model_inputs = self.legacy_tokenizer([chat_messages], return_tensors="pt").to(self.device)
            streamer = TextIteratorStreamer(
                self.legacy_tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
            )
            generate_kwargs = dict(
                model_inputs,
                streamer=streamer,
                max_new_tokens=self.config.max_new_tokens,
                do_sample=True,
                temperature=0.75,
                eos_token_id=terminators,
            )
            # Run generation in a background thread and drain the streamer
            t = Thread(target=self.legacy_local_model.generate, kwargs=generate_kwargs)
            t.start()
            partial_text = ""
            for new_text in streamer:
                partial_text += new_text
            pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}"
            json_match = re.search(pattern, partial_text)
            if json_match:
                return json.loads(json_match.group())
            else:
                raise ValueError("No valid JSON found in legacy local response")
        except Exception as e:
            print(f"Legacy local model also failed: {e}")
            return DefaultConversations.get_conversation(language)
    def extract_conversation_api(self, text: str, language: str = "English") -> Dict:
        """Extract a conversation using the API"""
        if not self.llm_client:
            raise RuntimeError("API mode not initialized")
        try:
            # Build search context
            search_context = ""
            if BRAVE_KEY and not text.startswith("Keyword-based content:"):
                try:
                    keywords = extract_keywords_for_search(text, language)
                    if keywords:
                        search_query = keywords[0] if language == "Korean" else f"{keywords[0]} latest news"
                        search_context = format_search_results(search_query)
                        print(f"Search context added for: {search_query}")
                except Exception as e:
                    print(f"Search failed, continuing without context: {e}")
            # Build the messages
            messages = self.prompt_builder.build_messages_for_local(text, language, search_context)
            chat_completion = self.llm_client.chat.completions.create(
                messages=messages,
                model=self.config.api_model_name,
                temperature=0.75,
            )
            pattern = r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}"
            json_match = re.search(pattern, chat_completion.choices[0].message.content)
            if not json_match:
                raise ValueError("No valid JSON found in response")
            return json.loads(json_match.group())
        except Exception as e:
            raise RuntimeError(f"Failed to extract conversation: {e}")

    def parse_conversation_text(self, conversation_text: str) -> Dict:
        """Parse conversation text back into JSON format"""
        lines = conversation_text.strip().split('\n')
        conversation_data = {"conversation": []}
        for line in lines:
            if ':' in line:
                speaker, text = line.split(':', 1)
                conversation_data["conversation"].append({
                    "speaker": speaker.strip(),
                    "text": text.strip()
                })
        return conversation_data
    async def text_to_speech_edge(self, conversation_json: Dict, language: str = "English") -> Tuple[str, str]:
        """Convert text to speech using Edge TTS"""
        output_dir = Path(self._create_output_directory())
        filenames = []
        try:
            # Language-specific voice settings
            voices = EDGE_TTS_VOICES.get(language, EDGE_TTS_VOICES["English"])
            for i, turn in enumerate(conversation_json["conversation"]):
                filename = output_dir / f"output_{i}.wav"
                voice = voices[i % len(voices)]
                tmp_path = await self._generate_audio_edge(turn["text"], voice)
                # shutil.move falls back to copy when /tmp is on a different
                # filesystem, where os.rename would fail
                shutil.move(tmp_path, str(filename))
                filenames.append(str(filename))
            # Combine audio files
            final_output = os.path.join(output_dir, "combined_output.wav")
            self._combine_audio_files(filenames, final_output)
            # Generate conversation text
            conversation_text = "\n".join(
                f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
                for i, turn in enumerate(conversation_json["conversation"])
            )
            return final_output, conversation_text
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech: {e}")

    async def _generate_audio_edge(self, text: str, voice: str) -> str:
        """Generate audio using Edge TTS"""
        if not text.strip():
            raise ValueError("Text cannot be empty")
        voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
        communicate = edge_tts.Communicate(text, voice_short_name)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_path = tmp_file.name
        await communicate.save(tmp_path)
        return tmp_path
    def text_to_speech_spark(self, conversation_json: Dict, language: str = "English", progress=None) -> Tuple[str, str]:
        """Convert text to speech using the Spark TTS CLI"""
        if not SPARK_AVAILABLE or not self.spark_model_dir:
            raise RuntimeError("Spark TTS not available")
        try:
            output_dir = self._create_output_directory()
            audio_files = []
            # Create different voice characteristics for different speakers
            speaker1, speaker2 = self.prompt_builder.get_speaker_names(language)
            if language == "Korean":
                voice_configs = [
                    {"prompt_text": f"안녕하세요, 오늘 팟캐스트 진행을 맡은 {speaker1}입니다.", "gender": "male"},
                    {"prompt_text": f"안녕하세요, 저는 오늘 이 주제에 대해 설명드릴 {speaker2}입니다.", "gender": "male"}
                ]
            else:
                voice_configs = [
                    {"prompt_text": f"Hello everyone, I'm {speaker1}, your host for today's podcast.", "gender": "male"},
                    {"prompt_text": f"Hi, I'm {speaker2}. I'm excited to share my insights with you.", "gender": "male"}
                ]
            for i, turn in enumerate(conversation_json["conversation"]):
                text = turn["text"]
                if not text.strip():
                    continue
                voice_config = voice_configs[i % len(voice_configs)]
                output_file = os.path.join(output_dir, f"spark_output_{i}.wav")
                cmd = [
                    "python", "-m", "cli.inference",
                    "--text", text,
                    "--device", "0" if torch.cuda.is_available() else "cpu",
                    "--save_dir", output_dir,
                    "--model_dir", self.spark_model_dir,
                    "--prompt_text", voice_config["prompt_text"],
                    "--output_name", f"spark_output_{i}.wav"
                ]
                try:
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=60,
                        cwd="."
                    )
                    if result.returncode == 0:
                        audio_files.append(output_file)
                    else:
                        # On failure, substitute one second of silence
                        print(f"Spark TTS error for turn {i}: {result.stderr}")
                        silence = np.zeros(int(22050 * 1.0))
                        sf.write(output_file, silence, 22050)
                        audio_files.append(output_file)
                except subprocess.TimeoutExpired:
                    print(f"Spark TTS timeout for turn {i}")
                    silence = np.zeros(int(22050 * 1.0))
                    sf.write(output_file, silence, 22050)
                    audio_files.append(output_file)
                except Exception as e:
                    print(f"Error running Spark TTS for turn {i}: {e}")
                    silence = np.zeros(int(22050 * 1.0))
                    sf.write(output_file, silence, 22050)
                    audio_files.append(output_file)
            # Combine all audio files
            if audio_files:
                final_output = os.path.join(output_dir, "spark_combined.wav")
                self._combine_audio_files(audio_files, final_output)
            else:
                raise RuntimeError("No audio files generated")
            conversation_text = "\n".join(
                f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
                for i, turn in enumerate(conversation_json["conversation"])
            )
            return final_output, conversation_text
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}")
    def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Convert text to speech using MeloTTS"""
        if not MELO_AVAILABLE or not self.melo_models:
            raise RuntimeError("MeloTTS not available")
        speakers = ["EN-Default", "EN-US"]
        combined_audio = AudioSegment.empty()
        for i, turn in enumerate(conversation_json["conversation"]):
            bio = io.BytesIO()
            text = turn["text"]
            speaker = speakers[i % 2]
            speaker_id = self.melo_models["EN"].hps.data.spk2id[speaker]
            self.melo_models["EN"].tts_to_file(
                text, speaker_id, bio, speed=1.0,
                pbar=progress.tqdm if progress else None,
                format="wav"
            )
            bio.seek(0)
            audio_segment = AudioSegment.from_file(bio, format="wav")
            combined_audio += audio_segment
        final_audio_path = "melo_podcast.mp3"
        combined_audio.export(final_audio_path, format="mp3")
        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )
        return final_audio_path, conversation_text
    def _create_output_directory(self) -> str:
        """Create a unique output directory"""
        random_bytes = os.urandom(8)
        folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Combine multiple audio files into one"""
        if not filenames:
            raise ValueError("No input files provided")
        try:
            audio_segments = []
            for filename in filenames:
                if os.path.exists(filename):
                    audio_segment = AudioSegment.from_file(filename)
                    audio_segments.append(audio_segment)
            if audio_segments:
                # pydub supports the sum() builtin for AudioSegments, which
                # concatenates them in order
                combined = sum(audio_segments)
                combined.export(output_file, format="wav")
            # Clean up the per-turn temporary files
            for filename in filenames:
                if os.path.exists(filename):
                    os.remove(filename)
        except Exception as e:
            raise RuntimeError(f"Failed to combine audio files: {e}")
| # Global converter instance | |
| converter = UnifiedAudioConverter(ConversationConfig()) | |
| async def synthesize(article_input, input_type: str = "URL", mode: str = "Local", tts_engine: str = "Edge-TTS", language: str = "English"): | |
| """Main synthesis function - handles URL, PDF, and Keyword inputs""" | |
| try: | |
| # Extract text based on input type | |
| if input_type == "URL": | |
| if not article_input or not isinstance(article_input, str): | |
| return "Please provide a valid URL.", None | |
| text = converter.fetch_text(article_input) | |
| elif input_type == "PDF": | |
| if not article_input: | |
| return "Please upload a PDF file.", None | |
| text = converter.extract_text_from_pdf(article_input) | |
| else: # Keyword | |
| if not article_input or not isinstance(article_input, str): | |
| return "Please provide a keyword or topic.", None | |
| text = search_and_compile_content(article_input, language) | |
| text = f"Keyword-based content:\n{text}" | |
| # Limit text to max words | |
| words = text.split() | |
| if len(words) > converter.config.max_words: | |
| text = " ".join(words[:converter.config.max_words]) | |
| # Extract conversation based on mode | |
| if mode == "Local": | |
| try: | |
| conversation_json = converter.extract_conversation_local(text, language) | |
| except Exception as e: | |
| print(f"Local mode failed: {e}, trying API fallback") | |
| api_key = os.environ.get("TOGETHER_API_KEY") | |
| if api_key: | |
| converter.initialize_api_mode(api_key) | |
| conversation_json = converter.extract_conversation_api(text, language) | |
| else: | |
| raise RuntimeError("Local mode failed and no API key available for fallback") | |
| else: # API mode | |
| api_key = os.environ.get("TOGETHER_API_KEY") | |
| if not api_key: | |
| print("API key not found, falling back to local mode") | |
| conversation_json = converter.extract_conversation_local(text, language) | |
| else: | |
| try: | |
| converter.initialize_api_mode(api_key) | |
| conversation_json = converter.extract_conversation_api(text, language) | |
| except Exception as e: | |
| print(f"API mode failed: {e}, falling back to local mode") | |
| conversation_json = converter.extract_conversation_local(text, language) | |
| # Generate conversation text | |
| conversation_text = "\n".join( | |
| f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}" | |
| for i, turn in enumerate(conversation_json["conversation"]) | |
| ) | |
| return conversation_text, None | |
| except Exception as e: | |
| return f"Error: {str(e)}", None | |
| async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"): | |
| """Regenerate audio from edited conversation text""" | |
| if not conversation_text.strip(): | |
| return "Please provide conversation text.", None | |
| try: | |
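| # parse_conversation_text (defined earlier on the converter) is expected to map | |
| # "Speaker Name: text" lines back into {"conversation": [{"speaker": ..., "text": ...}]}. | |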
| conversation_json = converter.parse_conversation_text(conversation_text) | |
| if not conversation_json["conversation"]: | |
| return "No valid conversation found in the text.", None | |
| # Languages that are Edge-TTS-only automatically fall back to Edge-TTS | |
| if language in EDGE_TTS_ONLY_LANGUAGES and tts_engine != "Edge-TTS": | |
| tts_engine = "Edge-TTS" | |
| # Generate audio based on TTS engine | |
| if tts_engine == "Edge-TTS": | |
| output_file, _ = await converter.text_to_speech_edge(conversation_json, language) | |
| elif tts_engine == "Spark-TTS": | |
| if not SPARK_AVAILABLE: | |
| return "Spark TTS not available. Please install required dependencies and clone the Spark-TTS repository.", None | |
| converter.initialize_spark_tts() | |
| output_file, _ = converter.text_to_speech_spark(conversation_json, language) | |
| else: # MeloTTS | |
| if not MELO_AVAILABLE: | |
| return "MeloTTS not available. Please install required dependencies.", None | |
| if language in EDGE_TTS_ONLY_LANGUAGES: | |
| return f"MeloTTS does not support {language}. Please use Edge-TTS for this language.", None | |
| converter.initialize_melo_tts() | |
| output_file, _ = converter.text_to_speech_melo(conversation_json) | |
| return "Audio generated successfully!", output_file | |
| except Exception as e: | |
| return f"Error generating audio: {str(e)}", None | |
| def synthesize_sync(article_input, input_type: str = "URL", mode: str = "Local", tts_engine: str = "Edge-TTS", language: str = "English"): | |
| """Synchronous wrapper for async synthesis""" | |
| return asyncio.run(synthesize(article_input, input_type, mode, tts_engine, language)) | |
| def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"): | |
| """Synchronous wrapper for async audio regeneration""" | |
| return asyncio.run(regenerate_audio(conversation_text, tts_engine, language)) | |
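| # asyncio.run spins up a fresh event loop per call, letting the async synthesis | |
| # functions be used wherever a plain synchronous callable is expected (e.g. the | |
| # Gradio click handlers and gr.Examples below). | |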
| def update_tts_engine_for_language(language): | |
| """์ธ์ด๋ณ TTS ์์ง ์ต์ ์ ๋ฐ์ดํธ""" | |
| if language in EDGE_TTS_ONLY_LANGUAGES: | |
| language_info = { | |
| "Korean": "ํ๊ตญ์ด๋ Edge-TTS๋ง ์ง์๋ฉ๋๋ค", | |
| "Japanese": "ๆฅๆฌ่ชใฏEdge-TTSใฎใฟใตใใผใใใใฆใใพใ", | |
| "French": "Le franรงais n'est pris en charge que par Edge-TTS", | |
| "German": "Deutsch wird nur von Edge-TTS unterstรผtzt", | |
| "Spanish": "El espaรฑol solo es compatible con Edge-TTS", | |
| "Italian": "L'italiano รจ supportato solo da Edge-TTS", | |
| "Portuguese": "O portuguรชs รฉ suportado apenas pelo Edge-TTS", | |
| "Dutch": "Nederlands wordt alleen ondersteund door Edge-TTS", | |
| "Thai": "เธ เธฒเธฉเธฒเนเธเธขเธฃเธญเธเธฃเธฑเธเนเธเธเธฒเธฐ Edge-TTS เนเธเนเธฒเธเธฑเนเธ", | |
| "Vietnamese": "Tiแบฟng Viแปt chแป ฤฦฐแปฃc hแป trแปฃ bแปi Edge-TTS", | |
| "Arabic": "ุงูุนุฑุจูุฉ ู ุฏุนูู ุฉ ููุท ู ู Edge-TTS", | |
| "Hebrew": "ืขืืจืืช ื ืชืืืช ืจืง ืขื ืืื Edge-TTS", | |
| "Indonesian": "Bahasa Indonesia hanya didukung oleh Edge-TTS", | |
| "Hindi": "เคนเคฟเคเคฆเฅ เคเฅเคตเคฒ Edge-TTS เคฆเฅเคตเคพเคฐเคพ เคธเคฎเคฐเฅเคฅเคฟเคค เคนเฅ", | |
| "Russian": "ะ ัััะบะธะน ะฟะพะดะดะตัะถะธะฒะฐะตััั ัะพะปัะบะพ Edge-TTS", | |
| "Chinese": "ไธญๆไป ๆฏๆEdge-TTS" | |
| } | |
| info_text = language_info.get(language, f"{language} is only supported by Edge-TTS") | |
| return gr.Radio( | |
| choices=["Edge-TTS"], | |
| value="Edge-TTS", | |
| label="TTS Engine", | |
| info=info_text, | |
| interactive=False | |
| ) | |
| else: | |
| return gr.Radio( | |
| choices=["Edge-TTS", "Spark-TTS", "MeloTTS"], | |
| value="Edge-TTS", | |
| label="TTS Engine", | |
| info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU", | |
| interactive=True | |
| ) | |
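| # Returning a gr.Radio from the change handler updates the live component's | |
| # choices/value/info in place; e.g. update_tts_engine_for_language("Korean") | |
| # locks the selector to Edge-TTS only. | |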
| def toggle_input_visibility(input_type): | |
| """Toggle visibility of URL input, file upload, and keyword input based on input type""" | |
| if input_type == "URL": | |
| return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) | |
| elif input_type == "PDF": | |
| return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) | |
| else: # Keyword | |
| return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True) | |
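| # The tuple order matches the outputs wiring below: (url_input, pdf_input, keyword_input). | |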
| # Model initialization (at app startup) | |
| if LLAMA_CPP_AVAILABLE: | |
| try: | |
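| # hf_hub_download stores the GGUF file under ./models and returns its local | |
| # path, so later local-mode loads can reuse the cached copy. | |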
| model_path = hf_hub_download( | |
| repo_id=converter.config.local_model_repo, | |
| filename=converter.config.local_model_name, | |
| local_dir="./models" | |
| ) | |
| print(f"Model downloaded to: {model_path}") | |
| except Exception as e: | |
| print(f"Failed to download model at startup: {e}") | |
| # Gradio Interface - improved multi-language layout | |
| with gr.Blocks(theme='soft', title="AI Podcast Generator", css=""" | |
| .container {max-width: 1200px; margin: auto; padding: 20px;} | |
| .header-text {text-align: center; margin-bottom: 30px;} | |
| .input-group {background: #f7f7f7; padding: 20px; border-radius: 10px; margin-bottom: 20px;} | |
| .output-group {background: #f0f0f0; padding: 20px; border-radius: 10px;} | |
| .status-box {background: #e8f4f8; padding: 15px; border-radius: 8px; margin-top: 10px;} | |
| """) as demo: | |
| with gr.Column(elem_classes="container"): | |
| # Header | |
| with gr.Row(elem_classes="header-text"): | |
| gr.Markdown(""" | |
| # 🎙️ AI Podcast Generator - Professional Multi-Language Edition | |
| ### Convert any article, blog, PDF document, or topic into an engaging professional podcast conversation in 24+ languages! | |
| """) | |
| with gr.Row(elem_classes="discord-badge"): | |
| gr.HTML(""" | |
| <p style="text-align: center;"> | |
| <a href="https://discord.gg/openfreeai" target="_blank"> | |
| <img src="https://img.shields.io/static/v1?label=Discord&message=Openfree%20AI&color=%230000ff&labelColor=%23800080&logo=discord&logoColor=white&style=for-the-badge" alt="badge"> | |
| </a> | |
| </p> | |
| """) | |
| # Status display section | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown(f""" | |
| #### 🤖 System Status | |
| - **LLM**: {converter.config.local_model_name.split('.')[0]} | |
| - **Fallback**: {converter.config.api_model_name.split('/')[-1]} | |
| - **Llama CPP**: {"✅ Ready" if LLAMA_CPP_AVAILABLE else "❌ Not Available"} | |
| - **Search**: {"✅ Brave API" if BRAVE_KEY else "❌ No API"} | |
| """) | |
| with gr.Column(scale=1): | |
| gr.Markdown(""" | |
| #### 🌍 Multi-Language Support | |
| - **24+ Languages**: Korean, Japanese, French, German, Spanish, Italian, etc. | |
| - **Native Voices**: Optimized for each language | |
| - **Professional Style**: Expert discussions with data & insights | |
| - **Auto-TTS Selection**: Best engine per language | |
| """) | |
| # Main input section | |
| with gr.Group(elem_classes="input-group"): | |
| with gr.Row(): | |
| # Left: input options | |
| with gr.Column(scale=2): | |
| # Input type selection | |
| input_type_selector = gr.Radio( | |
| choices=["URL", "PDF", "Keyword"], | |
| value="URL", | |
| label="๐ฅ Input Type", | |
| info="Choose your content source" | |
| ) | |
| # URL input | |
| url_input = gr.Textbox( | |
| label="๐ Article URL", | |
| placeholder="Enter the article URL here...", | |
| value="", | |
| visible=True, | |
| lines=2 | |
| ) | |
| # PDF upload | |
| pdf_input = gr.File( | |
| label="๐ Upload PDF", | |
| file_types=[".pdf"], | |
| visible=False | |
| ) | |
| # Keyword input | |
| keyword_input = gr.Textbox( | |
| label="๐ Topic/Keyword", | |
| placeholder="Enter a topic (e.g., 'AI trends 2024', '์ธ๊ณต์ง๋ฅ', 'IA tendances', 'KI Trends')", | |
| value="", | |
| visible=False, | |
| info="System will search and compile latest information", | |
| lines=2 | |
| ) | |
| # Right: settings options | |
| with gr.Column(scale=1): | |
| # Language selection | |
| language_selector = gr.Radio( | |
| choices=[ | |
| "English", "Korean", "Japanese", "French", "German", | |
| "Spanish", "Italian", "Portuguese", "Dutch", "Thai", | |
| "Vietnamese", "Arabic", "Hebrew", "Indonesian", "Hindi", | |
| "Russian", "Chinese", "Norwegian", "Swedish", "Finnish", | |
| "Danish", "Polish", "Turkish", "Greek", "Czech" | |
| ], | |
| value="English", | |
| label="๐ Language / ์ธ์ด / ่ฏญ่จ", | |
| info="Select podcast language" | |
| ) | |
| # Processing mode | |
| mode_selector = gr.Radio( | |
| choices=["Local", "API"], | |
| value="Local", | |
| label="โ๏ธ Processing Mode", | |
| info="Local: On-device | API: Cloud" | |
| ) | |
| # TTS engine | |
| tts_selector = gr.Radio( | |
| choices=["Edge-TTS", "Spark-TTS", "MeloTTS"], | |
| value="Edge-TTS", | |
| label="๐ TTS Engine", | |
| info="Voice synthesis engine" | |
| ) | |
| # Generate button | |
| with gr.Row(): | |
| convert_btn = gr.Button( | |
| "๐ฏ Generate Professional Conversation", | |
| variant="primary", | |
| size="lg", | |
| scale=1 | |
| ) | |
| # Output section | |
| with gr.Group(elem_classes="output-group"): | |
| with gr.Row(): | |
| # Left: conversation text | |
| with gr.Column(scale=3): | |
| conversation_output = gr.Textbox( | |
| label="๐ฌ Generated Professional Conversation (Editable)", | |
| lines=25, | |
| max_lines=50, | |
| interactive=True, | |
| placeholder="Professional podcast conversation will appear here...\n์ ๋ฌธ ํ์บ์คํธ ๋ํ๊ฐ ์ฌ๊ธฐ์ ํ์๋ฉ๋๋ค...\nLa conversation professionnelle du podcast apparaรฎtra ici...", | |
| info="Edit the conversation as needed. Format: 'Speaker Name: Text'" | |
| ) | |
| # Audio generation button | |
| with gr.Row(): | |
| generate_audio_btn = gr.Button( | |
| "๐๏ธ Generate Audio from Text", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| # Right: audio output and status | |
| with gr.Column(scale=2): | |
| audio_output = gr.Audio( | |
| label="๐ง Professional Podcast Audio", | |
| type="filepath", | |
| interactive=False | |
| ) | |
| status_output = gr.Textbox( | |
| label="๐ Status", | |
| interactive=False, | |
| lines=3, | |
| elem_classes="status-box" | |
| ) | |
| # Help | |
| gr.Markdown(""" | |
| #### 💡 Quick Tips: | |
| - **URL**: Paste any article link | |
| - **PDF**: Upload documents directly | |
| - **Keyword**: Enter topics for AI research | |
| - **24+ Languages** fully supported | |
| - Edit conversation before audio generation | |
| - Auto TTS engine selection per language | |
| """) | |
| # Examples section | |
| with gr.Accordion("🌍 Multi-Language Examples", open=False): | |
| gr.Examples( | |
| examples=[ | |
| ["https://huggingface.co/blog/openfreeai/cycle-navigator", "URL", "Local", "Edge-TTS", "English"], | |
| ["quantum computing breakthroughs", "Keyword", "Local", "Edge-TTS", "English"], | |
| ["์ธ๊ณต์ง๋ฅ ์ค๋ฆฌ์ ๊ท์ ", "Keyword", "Local", "Edge-TTS", "Korean"], | |
| ["https://huggingface.co/papers/2505.14810", "URL", "Local", "Edge-TTS", "Japanese"], | |
| ["intelligence artificielle tendances", "Keyword", "Local", "Edge-TTS", "French"], | |
| ["kรผnstliche intelligenz entwicklung", "Keyword", "Local", "Edge-TTS", "German"], | |
| ["inteligencia artificial avances", "Keyword", "Local", "Edge-TTS", "Spanish"], | |
| ], | |
| inputs=[url_input, input_type_selector, mode_selector, tts_selector, language_selector], | |
| outputs=[conversation_output, status_output], | |
| fn=synthesize_sync, | |
| cache_examples=False, | |
| ) | |
| # Input type change handler | |
| input_type_selector.change( | |
| fn=toggle_input_visibility, | |
| inputs=[input_type_selector], | |
| outputs=[url_input, pdf_input, keyword_input] | |
| ) | |
| # Update TTS engine options when the language changes | |
| language_selector.change( | |
| fn=update_tts_engine_for_language, | |
| inputs=[language_selector], | |
| outputs=[tts_selector] | |
| ) | |
| # Event wiring | |
| def get_article_input(input_type, url_input, pdf_input, keyword_input): | |
| """Get the appropriate input based on input type""" | |
| if input_type == "URL": | |
| return url_input | |
| elif input_type == "PDF": | |
| return pdf_input | |
| else: # Keyword | |
| return keyword_input | |
| convert_btn.click( | |
| fn=lambda input_type, url_input, pdf_input, keyword_input, mode, tts, lang: synthesize_sync( | |
| get_article_input(input_type, url_input, pdf_input, keyword_input), input_type, mode, tts, lang | |
| ), | |
| inputs=[input_type_selector, url_input, pdf_input, keyword_input, mode_selector, tts_selector, language_selector], | |
| outputs=[conversation_output, status_output] | |
| ) | |
| generate_audio_btn.click( | |
| fn=regenerate_audio_sync, | |
| inputs=[conversation_output, tts_selector, language_selector], | |
| outputs=[status_output, audio_output] | |
| ) | |
| # Launch the app | |
| if __name__ == "__main__": | |
| demo.queue(api_open=True, default_concurrency_limit=10).launch( | |
| show_api=True, | |
| share=False, | |
| server_name="0.0.0.0", | |
| server_port=7860 | |
| ) |