# Source: AI-Podcast / app-backup2.py (renamed from app.py, commit 897d75d, 24.8 kB)
import gradio as gr
import os
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import wave
import base64
import numpy as np
import soundfile as sf
import subprocess
import shutil
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv
# Edge TTS imports
import edge_tts
from pydub import AudioSegment
# OpenAI imports
from openai import OpenAI
# Transformers imports (for local mode)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
TextIteratorStreamer,
BitsAndBytesConfig,
)
# Spark TTS imports
# Spark-TTS weights are downloaded through huggingface_hub, so that package
# being importable is what SPARK_AVAILABLE records.
try:
    from huggingface_hub import snapshot_download
    SPARK_AVAILABLE = True
except ImportError:
    SPARK_AVAILABLE = False

# MeloTTS imports (for local mode)
# The unidic download is attempted before the import, as in the original order.
# Any ordinary failure marks MeloTTS as unavailable; unlike a bare ``except:``
# this no longer swallows KeyboardInterrupt or SystemExit, and the reason is
# printed instead of being discarded.
try:
    os.system("python -m unidic download")
    from melo.api import TTS as MeloTTS
    MELO_AVAILABLE = True
except Exception as exc:
    print(f"MeloTTS not available: {exc}")
    MELO_AVAILABLE = False
# Load settings from a .env file, if present, before anything reads os.environ
# (TOGETHER_API_KEY is looked up there later in this file).
load_dotenv()
@dataclass
class ConversationConfig:
    """Settings for fetching article text and generating the conversation."""

    # Articles longer than this many whitespace-separated words are truncated
    # before being sent to the language model.
    max_words: int = 6000
    # Prepended to the article URL when fetching its text.
    prefix_url: str = "https://r.jina.ai/"
    # Model requested from the chat-completions API in API mode.
    model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"
    # Hugging Face model and tokenizer loaded in local mode.
    local_model_name: str = "NousResearch/Hermes-2-Pro-Llama-3-8B"
class UnifiedAudioConverter:
    """Generate a two-speaker podcast script from text and render it to audio.

    The script is produced either through an OpenAI-compatible client pointed
    at the Together API or through a locally loaded Hugging Face model. Audio
    can be rendered with Edge TTS, the Spark-TTS command line, or MeloTTS.
    All heavy resources are created lazily by the ``initialize_*`` methods.
    """

    # Matches a JSON object containing at most one level of nested objects.
    _JSON_OBJECT_PATTERN = re.compile(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}")
    # Sample rate of the silent placeholder clip written when Spark TTS fails.
    _SILENCE_SAMPLE_RATE = 22050

    def __init__(self, config: "ConversationConfig"):
        """Store the configuration; no model or client is created here."""
        self.config = config
        self.llm_client = None
        self.local_model = None
        self.tokenizer = None
        self.melo_models = None
        self.spark_model_dir = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def initialize_api_mode(self, api_key: str):
        """Initialize API mode with Together API"""
        self.llm_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

    def initialize_local_mode(self):
        """Load the 4-bit quantized local model and its tokenizer once."""
        if self.local_model is None:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16
            )
            self.local_model = AutoModelForCausalLM.from_pretrained(
                self.config.local_model_name,
                quantization_config=quantization_config
            )
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.local_model_name,
                revision='8ab73a6800796d84448bc936db9bac5ad9f984ae'
            )

    def initialize_spark_tts(self):
        """Make sure the Spark-TTS weights exist locally, downloading if needed.

        Raises:
            RuntimeError: if huggingface_hub is unavailable or the download fails.
        """
        if not SPARK_AVAILABLE:
            raise RuntimeError("Spark TTS dependencies not available")
        model_dir = "pretrained_models/Spark-TTS-0.5B"
        # Check if model exists, if not download it
        if not os.path.exists(model_dir):
            print("Downloading Spark-TTS model...")
            try:
                os.makedirs("pretrained_models", exist_ok=True)
                snapshot_download(
                    "SparkAudio/Spark-TTS-0.5B",
                    local_dir=model_dir
                )
                print("Spark-TTS model downloaded successfully")
            except Exception as e:
                raise RuntimeError(f"Failed to download Spark-TTS model: {e}") from e
        self.spark_model_dir = model_dir
        # Check if we have the CLI inference script
        if not os.path.exists("cli/inference.py"):
            print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")

    def initialize_melo_tts(self):
        """Initialize MeloTTS models"""
        if MELO_AVAILABLE and self.melo_models is None:
            self.melo_models = {"EN": MeloTTS(language="EN", device=self.device)}

    def fetch_text(self, url: str) -> str:
        """Fetch the text of ``url`` through the configured prefix service.

        Raises:
            ValueError: if ``url`` is empty or is not an http(s) URL.
            RuntimeError: if the HTTP request fails.
        """
        if not url:
            raise ValueError("URL cannot be empty")
        if not url.startswith(("http://", "https://")):
            raise ValueError("URL must start with 'http://' or 'https://'")
        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}") from e

    def _build_prompt(self, text: str) -> str:
        """Build prompt for conversation generation"""
        # Written line by line so the prompt text does not depend on how this
        # method happens to be indented.
        template = (
            "\n"
            "{\n"
            '"conversation": [\n'
            '{"speaker": "", "text": ""},\n'
            '{"speaker": "", "text": ""}\n'
            "]\n"
            "}\n"
        )
        return (
            f"{text}\n\nConvert the provided text into a short, informative and crisp "
            f"podcast conversation between two experts. The tone should be "
            f"professional and engaging. Please adhere to the following "
            f"format and return ONLY the JSON:\n{template}"
        )

    @classmethod
    def _parse_conversation_json(cls, raw_text: Optional[str]) -> Dict:
        """Extract the first JSON object from model output and validate it.

        Raises:
            ValueError: if no JSON object is found, it cannot be decoded
                (json.JSONDecodeError is a ValueError), or it has no
                ``"conversation"`` list.
        """
        json_match = cls._JSON_OBJECT_PATTERN.search(raw_text or "")
        if not json_match:
            raise ValueError("No valid JSON found in response")
        data = json.loads(json_match.group())
        if not isinstance(data, dict) or not isinstance(data.get("conversation"), list):
            raise ValueError("JSON response does not contain a 'conversation' list")
        return data

    @staticmethod
    def _format_conversation(conversation_json: Dict) -> str:
        """Render the conversation as one ``Speaker: text`` line per turn."""
        return "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )

    def extract_conversation_api(self, text: str) -> Dict:
        """Generate the conversation through the chat-completions API.

        Raises:
            RuntimeError: if API mode is not initialized, the request fails,
                or the response holds no usable conversation JSON.
        """
        if not self.llm_client:
            raise RuntimeError("API mode not initialized")
        try:
            chat_completion = self.llm_client.chat.completions.create(
                messages=[{"role": "user", "content": self._build_prompt(text)}],
                model=self.config.model_name,
            )
            return self._parse_conversation_json(
                chat_completion.choices[0].message.content
            )
        except Exception as e:
            raise RuntimeError(f"Failed to extract conversation: {e}") from e

    def extract_conversation_local(self, text: str, progress=None) -> Dict:
        """Generate the conversation with the local model.

        Generation runs in a background thread and is consumed through a
        streamer. When the output holds no usable conversation JSON, a
        two-line default conversation is returned instead of raising.

        Raises:
            RuntimeError: if local mode is not initialized.
        """
        if not self.local_model or not self.tokenizer:
            raise RuntimeError("Local mode not initialized")
        chat = [{
            "role": "user",
            "content": self._build_prompt(text)
        }]
        terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        messages = self.tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )
        model_inputs = self.tokenizer([messages], return_tensors="pt").to(self.device)
        streamer = TextIteratorStreamer(
            self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )
        generate_kwargs = dict(
            model_inputs,
            streamer=streamer,
            max_new_tokens=4000,
            do_sample=True,
            temperature=0.9,
            eos_token_id=terminators,
        )
        generation_thread = Thread(target=self.local_model.generate, kwargs=generate_kwargs)
        generation_thread.start()
        generated_text = "".join(streamer)
        # The streamer is exhausted, so generation is finishing; joining only
        # here avoids blocking on the thread if the streamer raised above.
        generation_thread.join()
        try:
            return self._parse_conversation_json(generated_text)
        except ValueError:
            # Return a default template if no valid JSON found
            return {
                "conversation": [
                    {"speaker": "Host", "text": "Welcome to our podcast."},
                    {"speaker": "Guest", "text": "Thank you for having me."}
                ]
            }

    def parse_conversation_text(self, conversation_text: str) -> Dict:
        """Parse ``Speaker: text`` lines back into the conversation dict.

        Each line is split at its first colon. A non-empty line without a
        colon is treated as a continuation and appended to the previous turn,
        so wrapped text is kept; such a line before any turn is ignored.
        """
        turns = []
        for line in conversation_text.strip().split('\n'):
            if ':' in line:
                speaker, text = line.split(':', 1)
                turns.append({"speaker": speaker.strip(), "text": text.strip()})
            elif line.strip() and turns:
                turns[-1]["text"] = f"{turns[-1]['text']} {line.strip()}".strip()
        return {"conversation": turns}

    async def text_to_speech_edge(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[str, str]:
        """Render the conversation with Edge TTS, alternating the two voices.

        Even-indexed turns use ``voice_1`` and odd-indexed turns ``voice_2``.
        Turns whose text is blank are skipped.

        Returns:
            (path of the combined audio file, conversation transcript).

        Raises:
            RuntimeError: if synthesis or combining fails.
        """
        output_dir = Path(self._create_output_directory())
        filenames = []
        try:
            for i, turn in enumerate(conversation_json["conversation"]):
                text = turn["text"]
                if not text.strip():
                    continue
                voice = voice_1 if i % 2 == 0 else voice_2
                tmp_path = await self._generate_audio_edge(text, voice)
                filename = output_dir / f"output_{i}.wav"
                # The temp file may live on another filesystem than the output
                # directory, where os.rename fails; shutil.move copes with that.
                shutil.move(tmp_path, str(filename))
                filenames.append(str(filename))
            # Combine audio files
            final_output = os.path.join(output_dir, "combined_output.wav")
            self._combine_audio_files(filenames, final_output)
            return final_output, self._format_conversation(conversation_json)
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech: {e}") from e

    async def _generate_audio_edge(self, text: str, voice: str) -> str:
        """Synthesize one utterance with Edge TTS into a temporary file.

        ``voice`` may be a bare short name or ``"<short name> - <label>"``.

        Returns:
            Path of the temporary file; the caller owns and removes it.

        Raises:
            ValueError: if ``text`` is blank.
        """
        if not text.strip():
            raise ValueError("Text cannot be empty")
        voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
        communicate = edge_tts.Communicate(text, voice_short_name)
        # Only the unique name is needed; close the handle before saving.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_path = tmp_file.name
        try:
            await communicate.save(tmp_path)
        except Exception:
            # Do not leave an orphaned temp file behind on failure.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        return tmp_path

    def _write_silence(self, path: str, seconds: float = 1.0) -> None:
        """Write a silent clip to ``path`` as a placeholder for a failed turn."""
        sample_rate = self._SILENCE_SAMPLE_RATE
        sf.write(path, np.zeros(int(sample_rate * seconds)), sample_rate)

    def text_to_speech_spark(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Render the conversation by running the Spark-TTS CLI once per turn.

        Blank turns are skipped. A turn whose CLI run fails, times out, or
        leaves no output file is replaced by one second of silence.

        Returns:
            (path of the combined audio file, conversation transcript).

        Raises:
            RuntimeError: if Spark TTS is not initialized or rendering fails.
        """
        if not SPARK_AVAILABLE or not self.spark_model_dir:
            raise RuntimeError("Spark TTS not available")
        import sys  # local import: only needed to locate the running interpreter

        try:
            output_dir = self._create_output_directory()
            audio_files = []
            # Create different voice characteristics for different speakers
            # (only "prompt_text" is passed to the CLI; "gender" is unused).
            voice_configs = [
                {"prompt_text": "Hello, welcome to our podcast. I'm your host today.", "gender": "female"},
                {"prompt_text": "Thank you for having me. I'm excited to be here.", "gender": "male"}
            ]
            device_arg = "0" if torch.cuda.is_available() else "cpu"
            for i, turn in enumerate(conversation_json["conversation"]):
                text = turn["text"]
                if not text.strip():
                    continue
                # Use different voice config for each speaker
                voice_config = voice_configs[i % len(voice_configs)]
                output_name = f"spark_output_{i}.wav"
                output_file = os.path.join(output_dir, output_name)
                # Run with the current interpreter rather than whatever
                # "python" resolves to on PATH.
                # NOTE(review): confirm --device and --output_name against the
                # cloned Spark-TTS cli/inference.py.
                cmd = [
                    sys.executable or "python", "-m", "cli.inference",
                    "--text", text,
                    "--device", device_arg,
                    "--save_dir", output_dir,
                    "--model_dir", self.spark_model_dir,
                    "--prompt_text", voice_config["prompt_text"],
                    "--output_name", output_name
                ]
                try:
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=60,
                        cwd="."  # Make sure we're in the right directory
                    )
                except subprocess.TimeoutExpired:
                    print(f"Spark TTS timeout for turn {i}")
                    succeeded = False
                except Exception as e:
                    print(f"Error running Spark TTS for turn {i}: {e}")
                    succeeded = False
                else:
                    # Exit code 0 without the expected file is also a failure.
                    succeeded = result.returncode == 0 and os.path.exists(output_file)
                    if not succeeded:
                        print(f"Spark TTS error for turn {i}: {result.stderr}")
                if not succeeded:
                    # Create a short silence as fallback
                    self._write_silence(output_file)
                audio_files.append(output_file)
            # Combine all audio files
            if not audio_files:
                raise RuntimeError("No audio files generated")
            final_output = os.path.join(output_dir, "spark_combined.wav")
            self._combine_audio_files(audio_files, final_output)
            return final_output, self._format_conversation(conversation_json)
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}") from e

    def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Render the conversation with MeloTTS, alternating two English speakers.

        The MP3 is written into a fresh per-call directory so concurrent
        requests cannot overwrite each other's output. Blank turns are skipped.

        Returns:
            (path of the MP3 file, conversation transcript).

        Raises:
            RuntimeError: if MeloTTS is unavailable or not initialized.
        """
        if not MELO_AVAILABLE or not self.melo_models:
            raise RuntimeError("MeloTTS not available")
        speakers = ["EN-Default", "EN-US"]
        model = self.melo_models["EN"]
        combined_audio = AudioSegment.empty()
        for i, turn in enumerate(conversation_json["conversation"]):
            text = turn["text"]
            if not text.strip():
                continue
            speaker_id = model.hps.data.spk2id[speakers[i % 2]]
            bio = io.BytesIO()
            # Generate audio
            model.tts_to_file(
                text, speaker_id, bio, speed=1.0,
                pbar=progress.tqdm if progress else None,
                format="wav"
            )
            bio.seek(0)
            combined_audio += AudioSegment.from_file(bio, format="wav")
        # Save final audio
        final_audio_path = os.path.join(self._create_output_directory(), "melo_podcast.mp3")
        combined_audio.export(final_audio_path, format="mp3")
        return final_audio_path, self._format_conversation(conversation_json)

    def _create_output_directory(self) -> str:
        """Create a uniquely named directory in the working directory.

        The name is hexadecimal, so it never starts with ``-`` or contains
        ``=`` and is safe to pass as a command-line argument value.
        """
        folder_name = os.urandom(8).hex()
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Concatenate the audio files into ``output_file`` (WAV), then delete them.

        Input paths that do not exist are skipped. Inputs are deleted only
        after the combined file has been exported.

        Raises:
            ValueError: if ``filenames`` is empty.
            RuntimeError: if no input exists or loading/exporting fails.
        """
        if not filenames:
            raise ValueError("No input files provided")
        try:
            audio_segments = [
                AudioSegment.from_file(filename)
                for filename in filenames
                if os.path.exists(filename)
            ]
            if not audio_segments:
                # Otherwise callers would hand back a path that was never written.
                raise FileNotFoundError("none of the input audio files exist")
            combined = sum(audio_segments)
            combined.export(output_file, format="wav")
            # Clean up temporary files
            for filename in filenames:
                if os.path.exists(filename):
                    os.remove(filename)
        except Exception as e:
            raise RuntimeError(f"Failed to combine audio files: {e}") from e
# Global converter instance shared by the request handlers below. Constructing
# it loads no models; the API client, local model and TTS models are created
# later by its initialize_* methods.
converter = UnifiedAudioConverter(ConversationConfig())
async def synthesize(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS"):
    """Fetch an article and turn it into an editable conversation transcript.

    Args:
        article_url: URL of the article to convert.
        mode: "API" uses the Together API; any other value uses the local model.
        tts_engine: accepted for interface compatibility; not used here.

    Returns:
        A ``(text, None)`` tuple. ``text`` is the transcript on success, or a
        human-readable message when the URL is missing, the API key is not
        set, or any step raises.
    """
    if not article_url:
        return "Please provide a valid URL.", None

    try:
        article_text = converter.fetch_text(article_url)

        # Keep only the first max_words words of long articles.
        tokens = article_text.split()
        if len(tokens) > converter.config.max_words:
            article_text = " ".join(tokens[:converter.config.max_words])

        if mode != "API":
            # Local mode
            converter.initialize_local_mode()
            conversation_json = converter.extract_conversation_local(article_text)
        else:
            api_key = os.environ.get("TOGETHER_API_KEY")
            if not api_key:
                return "API key not found. Please set TOGETHER_API_KEY environment variable.", None
            converter.initialize_api_mode(api_key)
            conversation_json = converter.extract_conversation_api(article_text)

        # One "Speaker: text" line per turn; unnamed speakers are numbered.
        rendered_turns = []
        for index, turn in enumerate(conversation_json["conversation"]):
            speaker = turn.get('speaker', f'Speaker {index+1}')
            rendered_turns.append(f"{speaker}: {turn['text']}")
        return "\n".join(rendered_turns), None
    except Exception as e:
        return f"Error: {str(e)}", None
async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS"):
    """Generate podcast audio from the (possibly edited) conversation text.

    Args:
        conversation_text: transcript with one ``Speaker: text`` turn per line.
            ``None`` is treated like an empty transcript.
        tts_engine: "Edge-TTS", "Spark-TTS", or anything else for MeloTTS.

    Returns:
        A ``(status message, audio file path or None)`` tuple. Failures are
        reported in the message rather than raised.
    """
    # Guard None as well as blank text; .strip() on None would raise outside
    # the try block below.
    if not conversation_text or not conversation_text.strip():
        return "Please provide conversation text.", None
    try:
        # Parse the conversation text back to JSON format
        conversation_json = converter.parse_conversation_text(conversation_text)
        if not conversation_json["conversation"]:
            return "No valid conversation found in the text.", None
        # Generate audio based on TTS engine
        if tts_engine == "Edge-TTS":
            output_file, _ = await converter.text_to_speech_edge(
                conversation_json,
                "en-US-AvaMultilingualNeural",
                "en-US-AndrewMultilingualNeural"
            )
        elif tts_engine == "Spark-TTS":
            if not SPARK_AVAILABLE:
                return "Spark TTS not available. Please install required dependencies and clone the Spark-TTS repository.", None
            converter.initialize_spark_tts()
            output_file, _ = converter.text_to_speech_spark(conversation_json)
        else:  # MeloTTS
            if not MELO_AVAILABLE:
                return "MeloTTS not available. Please install required dependencies.", None
            converter.initialize_melo_tts()
            output_file, _ = converter.text_to_speech_melo(conversation_json)
        return "Audio generated successfully!", output_file
    except Exception as e:
        return f"Error generating audio: {str(e)}", None
def synthesize_sync(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS"):
    """Run the async ``synthesize`` coroutine to completion and return its result."""
    pending = synthesize(article_url, mode, tts_engine)
    return asyncio.run(pending)
def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS"):
    """Run the async ``regenerate_audio`` coroutine to completion and return its result."""
    pending = regenerate_audio(conversation_text, tts_engine)
    return asyncio.run(pending)
# Gradio Interface
# Two-step flow: "Generate Conversation" fills an editable transcript, then
# "Generate Audio from Text" renders that transcript with the chosen TTS engine.
# NOTE(review): the container nesting below was reconstructed from the order
# of the statements; confirm the intended layout.
with gr.Blocks(theme='soft', title="URL to Podcast Converter") as demo:
    gr.Markdown("# 🎙️ URL to Podcast Converter")
    gr.Markdown("Convert any article, blog, or news into an engaging podcast conversation!")

    with gr.Row():
        with gr.Column(scale=3):
            url_input = gr.Textbox(
                label="Article URL",
                placeholder="Enter the article URL here...",
                value=""
            )
        with gr.Column(scale=1):
            mode_selector = gr.Radio(
                choices=["API", "Local"],
                value="API",
                label="Processing Mode",
                info="API: Faster, requires API key | Local: Slower, runs on device"
            )
            # TTS engine selection: two recommended engines plus one additional option
            with gr.Group():
                gr.Markdown("### TTS Engine Selection")
                tts_selector = gr.Radio(
                    choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
                    value="Edge-TTS",
                    label="TTS Engine",
                    info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU"
                )
                gr.Markdown(
                    "\n"
                    "**Recommended:**\n"
                    "- 🌟 **Edge-TTS**: Best quality, cloud-based, instant setup\n"
                    "- 🤖 **Spark-TTS**: Local AI model (0.5B), zero-shot voice cloning\n"
                    "**Additional Option:**\n"
                    "- ⚡ **MeloTTS**: Local processing, GPU recommended\n"
                )

    convert_btn = gr.Button("🎯 Generate Conversation", variant="primary", size="lg")

    with gr.Row():
        with gr.Column():
            conversation_output = gr.Textbox(
                label="Generated Conversation (Editable)",
                lines=15,
                max_lines=30,
                interactive=True,
                placeholder="Generated conversation will appear here. You can edit it before generating audio.",
                info="Edit the conversation as needed. Format: 'Speaker Name: Text'"
            )
            # Audio generation button
            with gr.Row():
                generate_audio_btn = gr.Button("🎙️ Generate Audio from Text", variant="secondary", size="lg")
                gr.Markdown("*Edit the conversation above, then click to generate audio*")
        with gr.Column():
            audio_output = gr.Audio(
                label="Podcast Audio",
                type="filepath",
                interactive=False
            )
            # Status message
            status_output = gr.Textbox(
                label="Status",
                interactive=False,
                visible=True
            )

    # Per-engine descriptions and setup instructions
    with gr.Row():
        gr.Markdown(
            "\n"
            "### TTS Engine Details:\n"
            "- **Edge-TTS**: Microsoft's cloud TTS service with high-quality natural voices. Requires internet connection.\n"
            "- **Spark-TTS**: SparkAudio's local AI model (0.5B parameters) with zero-shot voice cloning capability.\n"
            "- **Setup required**: Clone [Spark-TTS repository](https://github.com/SparkAudio/Spark-TTS) in current directory\n"
            "- Features: Bilingual support (Chinese/English), controllable speech generation\n"
            "- License: CC BY-NC-SA (Non-commercial use only)\n"
            "- **MeloTTS**: Local TTS with multiple voice options. GPU recommended for better performance.\n"
            "### Spark-TTS Setup Instructions:\n"
            "```bash\n"
            "git clone https://github.com/SparkAudio/Spark-TTS.git\n"
            "cd Spark-TTS\n"
            "pip install -r requirements.txt\n"
            "```\n"
        )

    gr.Examples(
        examples=[
            ["https://huggingface.co/blog/openfree/cycle-navigator", "API", "Edge-TTS"],
            ["https://www.bbc.com/news/technology-67988517", "API", "Spark-TTS"],
            ["https://arxiv.org/abs/2301.00810", "API", "Edge-TTS"],
        ],
        inputs=[url_input, mode_selector, tts_selector],
        outputs=[conversation_output, status_output],
        fn=synthesize_sync,
        cache_examples=False,
    )

    # Event wiring
    convert_btn.click(
        fn=synthesize_sync,
        inputs=[url_input, mode_selector, tts_selector],
        outputs=[conversation_output, status_output]
    )
    generate_audio_btn.click(
        fn=regenerate_audio_sync,
        inputs=[conversation_output, tts_selector],
        outputs=[status_output, audio_output]
    )
# Launch the app
# Runs only when the file is executed as a script: enables the request queue,
# then serves on all interfaces at port 7860 without a public share link.
if __name__ == "__main__":
    queued_demo = demo.queue(api_open=True, default_concurrency_limit=10)
    queued_demo.launch(
        show_api=True,
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
    )