Spaces:
Sleeping
Sleeping
import spaces | |
from snac import SNAC | |
import torch | |
import gradio as gr | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from huggingface_hub import snapshot_download | |
import google.generativeai as genai | |
import re | |
import logging | |
# Logging: configure the root logger once and expose a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Device selection: use CUDA when torch reports it as available, else CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# SNAC model, later handed to the audio-conversion step of generate_speech.
print("Loading SNAC model...")
snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz").to(device)

# Orpheus causal language model (loaded in bfloat16) and its tokenizer.
model_name = "canopylabs/orpheus-3b-0.1-ft"
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
)
model.to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name)
print(f"Orpheus model loaded to {device}")
def _read_uploaded_text(uploaded_file):
    """Return the UTF-8 text of an upload given as a file object or a path."""
    if hasattr(uploaded_file, "read"):
        raw = uploaded_file.read()
    else:
        # Path-style upload: a plain path string, or an object exposing `.name`.
        path = getattr(uploaded_file, "name", uploaded_file)
        with open(path, "rb") as handle:
            raw = handle.read()
    return raw.decode("utf-8") if isinstance(raw, bytes) else raw


def generate_podcast_script(api_key, content, uploaded_file, duration, num_hosts):
    """Ask Gemini for a podcast script about the given content.

    Args:
        api_key: Gemini API key passed to ``genai.configure``.
        content: Free text to discuss; may be empty or None.
        uploaded_file: Optional upload whose UTF-8 text is appended to
            ``content``. Accepts a file-like object or a file path.
        duration: Target length. A bare number is read as minutes (the UI
            slider is labelled "Duration (minutes)"); any other value is
            inserted into the prompt as-is.
        num_hosts: Number of hosts, as an int or a numeric string such as
            "1" / "2". Anything that is not 1 produces a two-person script.

    Returns:
        The model's response text with every character outside
        ``[a-zA-Z0-9\\s.,?!<>]`` removed.

    Raises:
        Exception: Any error from file reading or the Gemini call is logged
            and re-raised unchanged.
    """
    try:
        genai.configure(api_key=api_key)
        # Named distinctly so it does not shadow the module-level TTS `model`.
        gemini_model = genai.GenerativeModel('gemini-2.5-pro-preview-03-25')

        combined_content = content or ""
        if uploaded_file:
            file_content = _read_uploaded_text(uploaded_file)
            if combined_content:
                combined_content = f"{combined_content}\n{file_content}"
            else:
                combined_content = file_content

        # The UI radio supplies strings, so compare numerically rather than
        # against the int literal 1 directly.
        try:
            single_host = int(num_hosts) == 1
        except (TypeError, ValueError):
            single_host = False

        # A bare number carries no unit; spell it out for the model.
        if isinstance(duration, (int, float)):
            unit = "minute" if duration == 1 else "minutes"
            duration_text = f"{duration:g} {unit}"
        else:
            duration_text = str(duration)

        host_phrase = "one person" if single_host else "two people"
        format_name = "Monologue" if single_host else "Alternating dialogue"
        separator_unit = "paragraphs" if single_host else "lines"

        prompt_lines = [
            "",
            f"Create a podcast script for {host_phrase} discussing:",
            f"{combined_content}",
            f"Duration: {duration_text}. Include natural speech, humor, and occasional off-topic thoughts.",
            "Use speech fillers like um, ah. Vary emotional tone.",
            f"Format: {format_name} without speaker labels.",
            f"Separate {separator_unit} with blank lines.",
            "Use emotion tags in angle brackets: <laugh>, <sigh>, <chuckle>, <cough>, <sniffle>, <groan>, <yawn>, <gasp>.",
            "Example: \"I can't believe I stayed up all night <yawn> only to find out the meeting was canceled <groan>.\"",
            f"Ensure content flows naturally and stays on topic. Match the script length to {duration_text}.",
            "",
        ]
        prompt = "\n".join(prompt_lines)

        response = gemini_model.generate_content(prompt)
        # Keep only letters, digits, whitespace, basic punctuation and the
        # angle brackets used by the emotion tags.
        return re.sub(r'[^a-zA-Z0-9\s.,?!<>]', '', response.text)
    except Exception as e:
        logger.exception("Error generating podcast script: %s", e)
        raise
def process_prompt(prompt, voice, tokenizer, device):
    """Tokenize a voice-prefixed prompt and frame it with fixed marker ids.

    The text "<voice>: <prompt>" is tokenized, then wrapped with token id
    128259 in front and token ids 128009, 128260 behind.

    Args:
        prompt: Text to be spoken.
        voice: Voice name placed before the text.
        tokenizer: Callable returning an object with an ``input_ids`` tensor
            when called with ``return_tensors="pt"``.
        device: Target device for the returned tensors.

    Returns:
        A ``(input_ids, attention_mask)`` pair on ``device``; the mask is all
        ones and has the same shape as the ids.
    """
    text_ids = tokenizer(f"{voice}: {prompt}", return_tensors="pt").input_ids

    prefix = torch.tensor([[128259]], dtype=torch.int64)
    suffix = torch.tensor([[128009, 128260]], dtype=torch.int64)
    framed_ids = torch.cat((prefix, text_ids, suffix), dim=1)

    # Nothing is padded, so every position is attended to.
    full_mask = torch.ones_like(framed_ids)
    return framed_ids.to(device), full_mask.to(device)
def parse_output(generated_ids):
    """Extract the code values of the first sequence from generated token ids.

    Steps:
      1. If token 128257 occurs, keep only the columns after the column of
         the final match; otherwise keep everything.
      2. Drop every occurrence of token 128258 from each row.
      3. Truncate each row to a multiple of 7 entries.
      4. Subtract 128266 from each remaining entry.

    Args:
        generated_ids: 2-D tensor of token ids (batch, sequence).

    Returns:
        A list of 0-dim tensors for the first row of the batch.
    """
    start_marker = 128257
    drop_marker = 128258
    code_offset = 128266
    group_size = 7

    marker_columns = (generated_ids == start_marker).nonzero(as_tuple=True)[1]
    if len(marker_columns) > 0:
        cut_at = marker_columns[-1].item() + 1
        tail = generated_ids[:, cut_at:]
    else:
        tail = generated_ids

    decoded_rows = []
    for row in tail:
        kept = row[row != drop_marker]
        # Discard a trailing partial group so the length is a multiple of 7.
        usable = kept.size(0) - kept.size(0) % group_size
        decoded_rows.append([tok - code_offset for tok in kept[:usable]])

    return decoded_rows[0]
def generate_speech(text, voice, temperature, top_p, repetition_penalty, max_new_tokens, progress=gr.Progress()):
    """Generate audio for ``text`` with the module-level model.

    Args:
        text: Text to speak. None, empty or whitespace-only text yields None.
        voice: Voice name forwarded to ``process_prompt``.
        temperature: Sampling temperature for ``model.generate``.
        top_p: Nucleus-sampling threshold for ``model.generate``.
        repetition_penalty: Repetition penalty for ``model.generate``.
        max_new_tokens: Upper bound on generated tokens; coerced to ``int``
            because UI sliders may deliver a float.
        progress: Gradio progress callback.

    Returns:
        ``(24000, audio_samples)`` on success, where ``audio_samples`` is
        whatever ``redistribute_codes`` returns; None when there is nothing
        to speak or when any step fails (the failure is logged).
    """
    # The text box can deliver None (for example after being cleared), so
    # guard before calling .strip().
    if not text or not text.strip():
        return None
    try:
        progress(0.1, "Processing text...")
        input_ids, attention_mask = process_prompt(text, voice, tokenizer, device)

        progress(0.3, "Generating speech tokens...")
        with torch.no_grad():
            generated_ids = model.generate(
                input_ids,
                attention_mask=attention_mask,
                do_sample=True,
                temperature=temperature,
                top_p=top_p,
                repetition_penalty=repetition_penalty,
                max_new_tokens=int(max_new_tokens),
                num_return_sequences=1,
                eos_token_id=128258,
            )

        progress(0.6, "Processing speech tokens...")
        code_list = parse_output(generated_ids)

        progress(0.8, "Converting to audio...")
        # NOTE(review): redistribute_codes is not defined in the visible part
        # of this file. Confirm it is provided elsewhere; otherwise this call
        # raises NameError and the handler below reduces it to a logged None.
        audio_samples = redistribute_codes(code_list, snac_model)
        return (24000, audio_samples)  # Return sample rate and audio
    except Exception as e:
        # Best-effort by design: report through the module logger (matching
        # generate_podcast_script) and hand the UI an empty result.
        logger.exception("Error generating speech: %s", e)
        return None
# Create Gradio interface
# NOTE(review): the nesting below is reconstructed from the order of the
# components; confirm the intended layout (in particular whether the two
# buttons belong inside the last slider row).
with gr.Blocks(title="AI Podcaster") as demo:
    with gr.Row():
        # Left column: inputs for the script generator.
        with gr.Column(scale=1):
            gemini_api_key = gr.Textbox(label="Gemini API Key", type="password")
            content = gr.Textbox(label="Content", lines=5)
            uploaded_file = gr.File(label="Upload File")
            duration = gr.Slider(minimum=1, maximum=60, value=5, step=1, label="Duration (minutes)")
            num_hosts = gr.Radio(["1", "2"], label="Number of Hosts", value="1")
            generate_script_btn = gr.Button("Generate Podcast Script")

        # Middle column: generated script plus the text-to-speech controls.
        with gr.Column(scale=2):
            script_output = gr.Textbox(label="Generated Script", lines=10)
            text_input = gr.Textbox(label="Text to speak", lines=5)
            voice = gr.Dropdown(choices=["Narrator", "Male", "Female"], value="Narrator", label="Voice")
            with gr.Row():
                temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="Temperature")
                top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, step=0.1, label="Top P")
            with gr.Row():
                repetition_penalty = gr.Slider(minimum=1.0, maximum=2.0, value=1.2, step=0.1, label="Repetition Penalty")
                max_new_tokens = gr.Slider(minimum=100, maximum=1000, value=500, step=50, label="Max New Tokens")
            submit_btn = gr.Button("Generate Speech")
            clear_btn = gr.Button("Clear")

        # Right column: audio player for the generated speech.
        with gr.Column(scale=2):
            audio_output = gr.Audio(label="Generated Speech", type="numpy")

    # Set up event handlers
    generate_script_btn.click(
        fn=generate_podcast_script,
        inputs=[gemini_api_key, content, uploaded_file, duration, num_hosts],
        outputs=script_output,
    )
    submit_btn.click(
        fn=generate_speech,
        inputs=[text_input, voice, temperature, top_p, repetition_penalty, max_new_tokens],
        outputs=audio_output,
    )
    clear_btn.click(
        # Reset the text box to an empty string rather than None so a later
        # "Generate Speech" click always receives a str.
        fn=lambda: ("", None),
        inputs=[],
        outputs=[text_input, audio_output],
    )

# Launch the app
if __name__ == "__main__":
    demo.queue().launch(share=False, ssr_mode=False)