# AI-Podcast / app.py
# Page-header residue from the hosting site, kept for reference:
# "openfree's picture", "Update app.py", commit 24c8c4e (verified),
# "raw", "history blame", 22.5 kB.
import gradio as gr
import os
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import wave
import base64
import numpy as np
import soundfile as sf
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv
# Edge TTS imports
import edge_tts
from pydub import AudioSegment
# OpenAI imports
from openai import OpenAI
# Transformers imports (for local mode)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
TextIteratorStreamer,
BitsAndBytesConfig,
)
# Spark TTS imports
# Optional dependency: record whether the import worked instead of failing at
# start-up. `except Exception` (not a bare `except`) keeps Ctrl-C and
# SystemExit from being swallowed while still tolerating any import failure.
try:
    from transformers import AutoModel
    SPARK_AVAILABLE = True
except Exception:
    SPARK_AVAILABLE = False
# MeloTTS imports (for local mode)
# Optional dependency: the unidic download is attempted first (its exit status
# is ignored), then the import result is recorded in MELO_AVAILABLE.
# `except Exception` (not a bare `except`) keeps Ctrl-C and SystemExit from
# being swallowed while still tolerating any import failure.
try:
    os.system("python -m unidic download")
    from melo.api import TTS as MeloTTS
    MELO_AVAILABLE = True
except Exception:
    MELO_AVAILABLE = False
# Load settings from a .env file (if any) before the handlers below read
# os.environ (e.g. TOGETHER_API_KEY in synthesize).
load_dotenv()
@dataclass
class ConversationConfig:
    """Tunable settings for turning an article into a podcast script."""

    # Articles longer than this many whitespace-separated words are truncated
    # before being sent to the language model.
    max_words: int = 6000
    # Prepended to the article URL when fetching its text.
    prefix_url: str = "https://r.jina.ai/"
    # Model name passed to the chat-completions endpoint in API mode.
    model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"
    # Hugging Face model id loaded in local mode.
    local_model_name: str = "NousResearch/Hermes-2-Pro-Llama-3-8B"
class UnifiedAudioConverter:
    """Build a two-voice podcast from text: an LLM-written script plus TTS audio.

    The script comes either from a hosted chat-completions endpoint (API mode)
    or from a locally loaded causal language model (local mode). Audio can be
    rendered with Edge TTS, Spark TTS or MeloTTS. The constructor loads
    nothing heavy; each backend is created on demand by an ``initialize_*``
    method.
    """

    # First brace-delimited block with at most one level of nested braces,
    # which covers the {"conversation": [{...}, ...]} shape the prompt asks for.
    _JSON_BLOCK = re.compile(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}")

    def __init__(self, config: "ConversationConfig"):
        """Store *config* and mark every backend as not yet loaded."""
        self.config = config
        self.llm_client = None
        self.local_model = None
        self.tokenizer = None
        self.melo_models = None
        self.spark_model = None
        self.spark_tokenizer = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

    def initialize_api_mode(self, api_key: str):
        """Initialize API mode with Together API.

        Args:
            api_key: Key used to authenticate against the Together endpoint.
        """
        self.llm_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

    def initialize_local_mode(self):
        """Load the 4-bit quantized local model and its tokenizer (first call only)."""
        if self.local_model is None:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16
            )
            self.local_model = AutoModelForCausalLM.from_pretrained(
                self.config.local_model_name,
                quantization_config=quantization_config
            )
            # The tokenizer is pinned to a fixed revision of the repository.
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.local_model_name,
                revision='8ab73a6800796d84448bc936db9bac5ad9f984ae'
            )

    def initialize_spark_tts(self):
        """Load the Spark TTS model once; a load failure is printed, not raised."""
        if SPARK_AVAILABLE and self.spark_model is None:
            try:
                self.spark_model = AutoModel.from_pretrained(
                    "SparkAudio/Spark-TTS-0.5B",
                    trust_remote_code=True,
                    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
                ).to(self.device)
                # Spark TTS may not need a separate tokenizer.
                print("Spark TTS model loaded successfully")
            except Exception as e:
                # Best effort: spark_model stays None and callers report
                # "Spark TTS not available".
                print(f"Failed to load Spark TTS: {e}")

    def initialize_melo_tts(self):
        """Create the English MeloTTS model once, if MeloTTS was importable."""
        if MELO_AVAILABLE and self.melo_models is None:
            self.melo_models = {"EN": MeloTTS(language="EN", device=self.device)}

    def fetch_text(self, url: str) -> str:
        """Fetch the text content of *url* through the configured prefix service.

        Args:
            url: Article address; surrounding whitespace is ignored.

        Returns:
            The response body as text.

        Raises:
            ValueError: If the URL is empty or is not http(s).
            RuntimeError: If the HTTP request fails.
        """
        # Pasted URLs often carry stray whitespace; don't reject them for it.
        url = (url or "").strip()
        if not url:
            raise ValueError("URL cannot be empty")
        if not url.startswith(("http://", "https://")):
            raise ValueError("URL must start with 'http://' or 'https://'")
        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}") from e

    def _build_prompt(self, text: str) -> str:
        """Build the conversation-generation prompt for *text*."""
        # Spelled out line by line so the exact bytes sent to the model do not
        # depend on how this method is indented.
        template = (
            "\n"
            "{\n"
            '"conversation": [\n'
            '{"speaker": "", "text": ""},\n'
            '{"speaker": "", "text": ""}\n'
            "]\n"
            "}\n"
        )
        return (
            f"{text}\n\nConvert the provided text into a short, informative and crisp "
            f"podcast conversation between two experts. The tone should be "
            f"professional and engaging. Please adhere to the following "
            f"format and return ONLY the JSON:\n{template}"
        )

    def extract_conversation_api(self, text: str) -> Dict:
        """Generate the conversation with the hosted model.

        Args:
            text: Source article text.

        Returns:
            The JSON object parsed from the first JSON block in the reply.

        Raises:
            RuntimeError: If API mode is not initialized, the request fails,
                or the reply contains no parseable JSON block.
        """
        if not self.llm_client:
            raise RuntimeError("API mode not initialized")
        try:
            chat_completion = self.llm_client.chat.completions.create(
                messages=[{"role": "user", "content": self._build_prompt(text)}],
                model=self.config.model_name,
            )
            # A missing message body is treated as an empty reply.
            reply = chat_completion.choices[0].message.content or ""
            json_match = self._JSON_BLOCK.search(reply)
            if not json_match:
                raise ValueError("No valid JSON found in response")
            return json.loads(json_match.group())
        except Exception as e:
            raise RuntimeError(f"Failed to extract conversation: {e}") from e

    def extract_conversation_local(self, text: str, progress=None) -> Dict:
        """Generate the conversation with the local model.

        Args:
            text: Source article text.
            progress: Unused; accepted for signature compatibility.

        Returns:
            The JSON object parsed from the first JSON block in the output,
            or a two-line placeholder conversation when no block is found.

        Raises:
            RuntimeError: If local mode is not initialized.
        """
        if not self.local_model or not self.tokenizer:
            raise RuntimeError("Local mode not initialized")
        chat = [{
            "role": "user",
            "content": self._build_prompt(text)
        }]
        terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        messages = self.tokenizer.apply_chat_template(
            chat, tokenize=False, add_generation_prompt=True
        )
        model_inputs = self.tokenizer([messages], return_tensors="pt").to(self.device)
        streamer = TextIteratorStreamer(
            self.tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )
        generate_kwargs = dict(
            model_inputs,
            streamer=streamer,
            max_new_tokens=4000,
            do_sample=True,
            temperature=0.9,
            eos_token_id=terminators,
        )
        # generate() runs in a worker thread and feeds the streamer, which is
        # drained here.
        t = Thread(target=self.local_model.generate, kwargs=generate_kwargs)
        t.start()
        partial_text = "".join(streamer)
        # The stream is exhausted, so wait for the worker instead of leaving
        # it dangling.
        t.join()
        json_match = self._JSON_BLOCK.search(partial_text)
        if json_match:
            return json.loads(json_match.group())
        # Return a default template if no valid JSON found
        return {
            "conversation": [
                {"speaker": "Host", "text": "Welcome to our podcast."},
                {"speaker": "Guest", "text": "Thank you for having me."}
            ]
        }

    def parse_conversation_text(self, conversation_text: str) -> Dict:
        """Parse ``Speaker: text`` lines back into the conversation JSON shape.

        A line containing ``:`` starts a new turn, split at the first colon.
        A line without a colon is appended to the previous turn so hand-edited
        multi-line text is not lost; it is ignored only when no turn exists
        yet. Turns that end up with empty text are dropped. Note that any line
        containing a colon is still read as a new speaker line.

        Returns:
            ``{"conversation": [{"speaker": ..., "text": ...}, ...]}``.
        """
        turns: List[Dict[str, str]] = []
        for raw_line in conversation_text.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if ':' in line:
                speaker, text = line.split(':', 1)
                turns.append({"speaker": speaker.strip(), "text": text.strip()})
            elif turns:
                last_turn = turns[-1]
                last_turn["text"] = f"{last_turn['text']} {line}".strip()
        # Empty turns would make the TTS step fail, so drop them here.
        return {"conversation": [turn for turn in turns if turn["text"]]}

    @staticmethod
    def _format_conversation(conversation_json: Dict) -> str:
        """Render the turns as ``Speaker: text`` lines, one per turn."""
        return "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )

    async def text_to_speech_edge(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[str, str]:
        """Convert the conversation to speech using Edge TTS.

        Turns alternate between *voice_1* (even index) and *voice_2*.

        Returns:
            ``(path_to_combined_audio, conversation_text)``.

        Raises:
            RuntimeError: If synthesis or combining the clips fails.
        """
        output_dir = Path(self._create_output_directory())
        filenames = []
        try:
            for i, turn in enumerate(conversation_json["conversation"]):
                filename = output_dir / f"output_{i}.wav"
                voice = voice_1 if i % 2 == 0 else voice_2
                # Save straight into the output directory: renaming from the
                # system temp dir fails when it is on another filesystem.
                await self._generate_audio_edge(turn["text"], voice, str(filename))
                filenames.append(str(filename))
            # Combine audio files
            final_output = os.path.join(output_dir, "combined_output.wav")
            self._combine_audio_files(filenames, final_output)
            return final_output, self._format_conversation(conversation_json)
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech: {e}") from e

    async def _generate_audio_edge(self, text: str, voice: str, output_path: Optional[str] = None) -> str:
        """Generate one audio clip using Edge TTS.

        Args:
            text: Text to speak; must not be blank.
            voice: Voice name, optionally followed by ``" - <description>"``.
            output_path: Where to save the clip. When omitted, a temporary
                ``.wav``-suffixed file is created and kept.

        Returns:
            The path the clip was saved to.

        Raises:
            ValueError: If *text* is blank.
        """
        if not text.strip():
            raise ValueError("Text cannot be empty")
        # Keep only the part before " - "; a plain name passes through as is.
        voice_short_name = voice.split(" - ", 1)[0]
        communicate = edge_tts.Communicate(text, voice_short_name)
        if output_path is None:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
                output_path = tmp_file.name
        # NOTE(review): edge-tts is understood to emit MP3 data regardless of
        # the file suffix; the .wav name is kept for compatibility - confirm.
        await communicate.save(output_path)
        return output_path

    def text_to_speech_spark(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Convert the conversation to speech using Spark TTS.

        Blank turns are skipped; a turn whose synthesis fails is replaced by
        half a second of silence.

        Args:
            conversation_json: Conversation in the ``{"conversation": [...]}`` shape.
            progress: Unused; accepted for signature compatibility.

        Returns:
            ``(path_to_audio, conversation_text)``. The audio is written into
            a fresh output directory so concurrent requests do not overwrite
            each other.

        Raises:
            RuntimeError: If Spark TTS is unavailable or nothing was generated.
        """
        if not SPARK_AVAILABLE or self.spark_model is None:
            raise RuntimeError("Spark TTS not available")
        try:
            combined_audio = []
            sample_rate = 22050  # assumed default sample rate for Spark TTS - TODO confirm
            # Two different speaker configurations for variety
            speakers = ["female_1", "male_1"]  # Adjust based on actual Spark TTS speaker options
            for i, turn in enumerate(conversation_json["conversation"]):
                text = turn["text"]
                if not text.strip():
                    continue
                # NOTE(review): synthesize(text=, voice=, speed=) is an assumed
                # API; adjust to the actual Spark TTS documentation.
                try:
                    with torch.no_grad():
                        audio_output = self.spark_model.synthesize(
                            text=text,
                            voice=speakers[i % len(speakers)],
                            speed=1.0,
                        )
                    # Convert to numpy array if needed
                    if isinstance(audio_output, torch.Tensor):
                        audio_output = audio_output.cpu().numpy()
                    combined_audio.append(audio_output)
                except Exception as e:
                    print(f"Error generating audio for turn {i}: {e}")
                    # Generate silence as fallback (0.5 second)
                    combined_audio.append(np.zeros(int(sample_rate * 0.5)))
            if not combined_audio:
                raise RuntimeError("No audio generated")
            # Combine all audio segments and save them under a unique directory
            final_audio = np.concatenate(combined_audio)
            output_path = str(Path(self._create_output_directory()) / "spark_podcast_output.wav")
            sf.write(output_path, final_audio, sample_rate)
            return output_path, self._format_conversation(conversation_json)
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}") from e

    def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Convert the conversation to speech using MeloTTS.

        Turns alternate between the ``EN-Default`` and ``EN-US`` speakers.

        Args:
            conversation_json: Conversation in the ``{"conversation": [...]}`` shape.
            progress: Optional object whose ``tqdm`` attribute is passed to
                MeloTTS as its progress bar.

        Returns:
            ``(path_to_mp3, conversation_text)``. The audio is written into a
            fresh output directory so concurrent requests do not overwrite
            each other.

        Raises:
            RuntimeError: If MeloTTS is unavailable.
        """
        if not MELO_AVAILABLE or not self.melo_models:
            raise RuntimeError("MeloTTS not available")
        speakers = ["EN-Default", "EN-US"]
        model = self.melo_models["EN"]
        combined_audio = AudioSegment.empty()
        for i, turn in enumerate(conversation_json["conversation"]):
            bio = io.BytesIO()
            speaker_id = model.hps.data.spk2id[speakers[i % 2]]
            # Generate audio into the in-memory buffer
            model.tts_to_file(
                turn["text"], speaker_id, bio, speed=1.0,
                pbar=progress.tqdm if progress else None,
                format="wav"
            )
            bio.seek(0)
            combined_audio += AudioSegment.from_file(bio, format="wav")
        # Save final audio
        final_audio_path = str(Path(self._create_output_directory()) / "melo_podcast.mp3")
        combined_audio.export(final_audio_path, format="mp3")
        return final_audio_path, self._format_conversation(conversation_json)

    def _create_output_directory(self) -> str:
        """Create a randomly named directory under the working directory.

        Returns:
            The directory name (relative path).
        """
        random_bytes = os.urandom(8)
        folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Concatenate *filenames* into *output_file* (WAV) and delete the inputs.

        Raises:
            ValueError: If *filenames* is empty.
            RuntimeError: If reading, exporting or cleanup fails.
        """
        if not filenames:
            raise ValueError("No input files provided")
        try:
            audio_segments = [AudioSegment.from_file(name) for name in filenames]
            combined = sum(audio_segments)
            combined.export(output_file, format="wav")
            # Clean up the per-turn files
            for name in filenames:
                os.remove(name)
        except Exception as e:
            raise RuntimeError(f"Failed to combine audio files: {e}") from e
# Global converter instance, shared by the request handlers below. Only the
# configuration is stored here; LLM and TTS backends are loaded on demand by
# the converter's initialize_* methods.
converter = UnifiedAudioConverter(ConversationConfig())
async def synthesize(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS"):
    """Fetch an article and turn it into an editable conversation script.

    Args:
        article_url: Address of the article to convert.
        mode: "API" for the hosted model; anything else uses the local model.
        tts_engine: Not used at this stage; audio is produced separately.

    Returns:
        A ``(text, None)`` pair where *text* is the conversation, one
        ``Speaker: text`` line per turn, or a message describing the problem.
    """
    if not article_url:
        return "Please provide a valid URL.", None

    try:
        article = converter.fetch_text(article_url)

        # Keep at most the configured number of words.
        tokens = article.split()
        if len(tokens) > converter.config.max_words:
            article = " ".join(tokens[:converter.config.max_words])

        if mode != "API":
            # Local mode
            converter.initialize_local_mode()
            script = converter.extract_conversation_local(article)
        else:
            together_key = os.environ.get("TOGETHER_API_KEY")
            if not together_key:
                return "API key not found. Please set TOGETHER_API_KEY environment variable.", None
            converter.initialize_api_mode(together_key)
            script = converter.extract_conversation_api(article)

        # One "Speaker: text" line per turn.
        rendered = []
        for idx, turn in enumerate(script["conversation"]):
            label = turn.get('speaker', f'Speaker {idx+1}')
            rendered.append(f"{label}: {turn['text']}")
        return "\n".join(rendered), None
    except Exception as exc:
        return f"Error: {exc}", None
async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS"):
    """Render (possibly hand-edited) conversation text to audio.

    Args:
        conversation_text: One ``Speaker: text`` line per turn.
        tts_engine: "Edge-TTS", "Spark-TTS", or anything else for MeloTTS.

    Returns:
        A ``(status_message, audio_path)`` pair; *audio_path* is ``None``
        whenever no audio was produced.
    """
    if not conversation_text.strip():
        return "Please provide conversation text.", None

    try:
        # Back from plain text to the {"conversation": [...]} structure.
        script = converter.parse_conversation_text(conversation_text)
        if not script["conversation"]:
            return "No valid conversation found in the text.", None

        if tts_engine == "Edge-TTS":
            audio_path, _ = await converter.text_to_speech_edge(
                script,
                "en-US-AvaMultilingualNeural",
                "en-US-AndrewMultilingualNeural"
            )
            return "Audio generated successfully!", audio_path

        if tts_engine == "Spark-TTS":
            if not SPARK_AVAILABLE:
                return "Spark TTS not available. Please install required dependencies.", None
            converter.initialize_spark_tts()
            audio_path, _ = converter.text_to_speech_spark(script)
            return "Audio generated successfully!", audio_path

        # MeloTTS
        if not MELO_AVAILABLE:
            return "MeloTTS not available. Please install required dependencies.", None
        converter.initialize_melo_tts()
        audio_path, _ = converter.text_to_speech_melo(script)
        return "Audio generated successfully!", audio_path
    except Exception as exc:
        return f"Error generating audio: {exc}", None
def synthesize_sync(article_url: str, mode: str = "API", tts_engine: str = "Edge-TTS"):
    """Run the async ``synthesize`` to completion and return its result."""
    pending = synthesize(article_url, mode, tts_engine)
    return asyncio.run(pending)
def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS"):
    """Run the async ``regenerate_audio`` to completion and return its result."""
    pending = regenerate_audio(conversation_text, tts_engine)
    return asyncio.run(pending)
# Gradio Interface
# Two-step flow: "Generate Conversation" fills an editable script box, then
# "Generate Audio from Text" renders whatever is in that box with the
# selected TTS engine.
# Multi-line Markdown is written as adjacent string literals so its content
# does not depend on how this block is indented.
with gr.Blocks(theme='soft', title="URL to Podcast Converter") as demo:
    gr.Markdown("# 🎙️ URL to Podcast Converter")
    gr.Markdown("Convert any article, blog, or news into an engaging podcast conversation!")

    with gr.Row():
        with gr.Column(scale=3):
            url_input = gr.Textbox(
                label="Article URL",
                placeholder="Enter the article URL here...",
                value=""
            )
        with gr.Column(scale=1):
            mode_selector = gr.Radio(
                choices=["API", "Local"],
                value="API",
                label="Processing Mode",
                info="API: Faster, requires API key | Local: Slower, runs on device"
            )

    # TTS engine selection - two recommended engines plus one additional option
    with gr.Group():
        gr.Markdown("### TTS Engine Selection")
        tts_selector = gr.Radio(
            choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
            value="Edge-TTS",
            label="TTS Engine",
            info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU"
        )
        gr.Markdown(
            "\n"
            "**Recommended:**\n"
            "- 🌟 **Edge-TTS**: Best quality, cloud-based\n"
            "- 🤖 **Spark-TTS**: Local AI model, good quality\n"
            "**Additional Option:**\n"
            "- ⚡ **MeloTTS**: Local processing, GPU recommended\n"
        )

    convert_btn = gr.Button("🎯 Generate Conversation", variant="primary", size="lg")

    with gr.Row():
        with gr.Column():
            conversation_output = gr.Textbox(
                label="Generated Conversation (Editable)",
                lines=15,
                max_lines=30,
                interactive=True,
                placeholder="Generated conversation will appear here. You can edit it before generating audio.",
                info="Edit the conversation as needed. Format: 'Speaker Name: Text'"
            )
            # Audio generation button
            with gr.Row():
                generate_audio_btn = gr.Button("🎙️ Generate Audio from Text", variant="secondary", size="lg")
                gr.Markdown("*Edit the conversation above, then click to generate audio*")
        with gr.Column():
            audio_output = gr.Audio(
                label="Podcast Audio",
                type="filepath",
                interactive=False
            )
            # Status message
            status_output = gr.Textbox(
                label="Status",
                interactive=False,
                visible=True
            )

    # Per-engine descriptions
    with gr.Row():
        gr.Markdown(
            "\n"
            "### TTS Engine Details:\n"
            "- **Edge-TTS**: Microsoft's cloud TTS service with high-quality natural voices. Requires internet connection.\n"
            "- **Spark-TTS**: SparkAudio's local AI model (0.5B parameters). Runs on your device, good for privacy.\n"
            "- **MeloTTS**: Local TTS with multiple voice options. GPU recommended for better performance.\n"
        )

    gr.Examples(
        examples=[
            ["https://huggingface.co/blog/openfree/cycle-navigator", "API", "Edge-TTS"],
            ["https://www.bbc.com/news/technology-67988517", "API", "Spark-TTS"],
            ["https://arxiv.org/abs/2301.00810", "API", "Edge-TTS"],
        ],
        inputs=[url_input, mode_selector, tts_selector],
        outputs=[conversation_output, status_output],
        fn=synthesize_sync,
        cache_examples=False,
    )

    # Event wiring
    convert_btn.click(
        fn=synthesize_sync,
        inputs=[url_input, mode_selector, tts_selector],
        outputs=[conversation_output, status_output]
    )
    generate_audio_btn.click(
        fn=regenerate_audio_sync,
        inputs=[conversation_output, tts_selector],
        outputs=[status_output, audio_output]
    )
# Launch the app
# Runs only when the file is executed as a script: enable the request queue,
# then start the server on all interfaces at port 7860.
if __name__ == "__main__":
    queued_app = demo.queue(api_open=True, default_concurrency_limit=10)
    queued_app.launch(
        show_api=True,
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
    )