import gradio as gr import base64 import mimetypes import os import re import struct import time import zipfile from google import genai from google.genai import types # تلاش برای ایمپورت pydub و تنظیم فلگ در دسترس بودن try: from pydub import AudioSegment PYDUB_AVAILABLE = True except ImportError: PYDUB_AVAILABLE = False # --- ثابتها --- SPEAKER_VOICES = [ "Achird", "Zubenelgenubi", "Vindemiatrix", "Sadachbia", "Sadaltager", "Sulafat", "Laomedeia", "Achernar", "Alnilam", "Schedar", "Gacrux", "Pulcherrima", "Umbriel", "Algieba", "Despina", "Erinome", "Algenib", "Rasalthgeti", "Orus", "Aoede", "Callirrhoe", "Autonoe", "Enceladus", "Iapetus", "Zephyr", "Puck", "Charon", "Kore", "Fenrir", "Leda" ] MODEL_NAMES = ["gemini-2.5-flash-preview-tts", "gemini-2.5-pro-preview-tts"] # --- توابع کمکی (سازگار شده برای لاگنویسی در Gradio) --- def save_binary_file(file_name, data, log_messages_list): try: with open(file_name, "wb") as f: f.write(data) log_messages_list.append(f"✅ فایل در مسیر زیر ذخیره شد: {file_name}") return file_name except Exception as e: log_messages_list.append(f"❌ خطا در ذخیره فایل {file_name}: {e}") return None def convert_to_wav(audio_data: bytes, mime_type: str) -> bytes: parameters = parse_audio_mime_type(mime_type) bits_per_sample = parameters["bits_per_sample"] sample_rate = parameters["rate"] num_channels = 1 data_size = len(audio_data) bytes_per_sample = bits_per_sample // 8 block_align = num_channels * bytes_per_sample byte_rate = sample_rate * block_align chunk_size = 36 + data_size header = struct.pack( "<4sI4s4sIHHIIHH4sI", b"RIFF", chunk_size, b"WAVE", b"fmt ", 16, 1, num_channels, sample_rate, byte_rate, block_align, bits_per_sample, b"data", data_size ) return header + audio_data def parse_audio_mime_type(mime_type: str) -> dict[str, int | None]: bits_per_sample = 16 rate = 24000 # Default rate for Gemini TTS parts = mime_type.split(";") for param in parts: param = param.strip() if param.lower().startswith("rate="): try: rate_str = param.split("=", 1)[1] rate = int(rate_str) except (ValueError, IndexError): pass elif param.startswith("audio/L"): # e.g., audio/L16 try: bits_per_sample = int(param.split("L", 1)[1]) except (ValueError, IndexError): pass return {"bits_per_sample": bits_per_sample, "rate": rate} def smart_text_split(text, max_size=3800): if len(text) <= max_size: return [text] chunks = [] current_chunk = "" # Split by sentences, keeping delimiters. Prioritize common Persian sentence enders. sentences = re.split(r'(?<=[.!?؟])\s+', text) for sentence in sentences: sentence_with_space = sentence + " " # Add potential space for length calculation if len(current_chunk) + len(sentence_with_space) > max_size: if current_chunk: # Add the current chunk if it's not empty chunks.append(current_chunk.strip()) # Now, current_chunk becomes the new sentence. # If this new sentence itself is too long, it needs to be split further. current_chunk = sentence while len(current_chunk) > max_size: # Find a good split point (e.g., comma, space) near max_size # Fallback to hard split if no good point found split_idx = -1 # Try splitting at Persian/English punctuation within the oversized chunk possible_split_chars = ['،', ',', ';', ':', ' '] for char_idx in range(max_size - 1, max_size // 2, -1): # Search backwards from max_size if current_chunk[char_idx] in possible_split_chars: split_idx = char_idx + 1 break if split_idx != -1: chunks.append(current_chunk[:split_idx].strip()) current_chunk = current_chunk[split_idx:].strip() else: # Hard split chunks.append(current_chunk[:max_size].strip()) current_chunk = current_chunk[max_size:].strip() else: current_chunk += (" " if current_chunk else "") + sentence if current_chunk: # Add any remaining part chunks.append(current_chunk.strip()) return [c for c in chunks if c] # Ensure no empty chunks def merge_audio_files_func(file_paths, output_path, log_messages_list): if not PYDUB_AVAILABLE: log_messages_list.append("❌ pydub در دسترس نیست. نمیتوان فایلها را ادغام کرد.") return False try: log_messages_list.append(f"🔗 در حال ادغام {len(file_paths)} فایل صوتی...") combined = AudioSegment.empty() for i, file_path in enumerate(file_paths): if os.path.exists(file_path): log_messages_list.append(f"📎 اضافه کردن فایل {i+1}: {os.path.basename(file_path)}") audio = AudioSegment.from_file(file_path) # pydub usually infers format combined += audio if i < len(file_paths) - 1: # Add short silence between segments combined += AudioSegment.silent(duration=200) # 200ms silence else: log_messages_list.append(f"⚠️ فایل پیدا نشد: {file_path}") combined.export(output_path, format="wav") log_messages_list.append(f"✅ فایل ادغام شده ذخیره شد: {output_path}") return True except Exception as e: log_messages_list.append(f"❌ خطا در ادغام فایلها: {e}") return False def create_zip_file(file_paths, zip_name, log_messages_list): try: with zipfile.ZipFile(zip_name, 'w') as zipf: for file_path in file_paths: if os.path.exists(file_path): zipf.write(file_path, os.path.basename(file_path)) log_messages_list.append(f"📦 فایل ZIP ایجاد شد: {zip_name}") return True except Exception as e: log_messages_list.append(f"❌ خطا در ایجاد فایل ZIP: {e}") return False # --- تابع اصلی تولید (سازگار شده برای Gradio) --- def core_generate_audio( text_input, prompt_input, selected_voice, output_base_name, model, temperature_val, max_chunk, sleep_time, merge_files, delete_partials, log_messages_list # Pass the list to append logs ): log_messages_list.append("🚀 شروع فرآیند تبدیل متن به گفتار...") # دریافت کلید API api_key = os.environ.get("GEMINI_API_KEY") if not api_key: log_messages_list.append("❌ خطا: کلید API جمینای (GEMINI_API_KEY) در Secrets این Space تنظیم نشده است.") log_messages_list.append("لطفاً به تنظیمات Space رفته و یک Secret با نام GEMINI_API_KEY و مقدار کلید خود ایجاد کنید.") return None, None # No audio path, no download path # مقداردهی اولیه کلاینت GenAI try: log_messages_list.append("🛠️ در حال ایجاد کلاینت جمینای...") client = genai.Client(api_key=api_key) log_messages_list.append("✅ کلاینت جمینای با موفقیت ایجاد شد.") except Exception as e: log_messages_list.append(f"❌ خطا در ایجاد کلاینت جمینای: {e}") log_messages_list.append("لطفاً از صحت کلید API خود اطمینان حاصل کنید.") return None, None if not text_input or text_input.strip() == "": log_messages_list.append("❌ خطا: متن ورودی برای تبدیل به گفتار خالی است.") return None, None text_chunks = smart_text_split(text_input, max_chunk) log_messages_list.append(f"📊 متن به {len(text_chunks)} قطعه تقسیم شد.") for i, chunk in enumerate(text_chunks): log_messages_list.append(f"📝 قطعه {i+1}: {len(chunk)} کاراکتر") text_chunks = [c for c in text_chunks if c] # فیلتر کردن قطعات خالی احتمالی if not text_chunks: log_messages_list.append("❌ خطا: پس از تقسیمبندی، هیچ قطعه متنی برای پردازش وجود ندارد.") return None, None generated_files = [] # نامگذاری فایلها بدون مسیر اضافی برای سادگی در محیط Space # فایلها در ریشه فضای کاری Space ذخیره میشوند for i, chunk in enumerate(text_chunks): log_messages_list.append(f"\n🔊 تولید صدا برای قطعه {i+1}/{len(text_chunks)}...") final_text = f'"{prompt_input}"\n{chunk}' if prompt_input and prompt_input.strip() else chunk contents = [types.Content(role="user", parts=[types.Part.from_text(text=final_text)])] generate_content_config = types.GenerateContentConfig( temperature=temperature_val, response_modalities=["audio"], speech_config=types.SpeechConfig( voice_config=types.VoiceConfig( prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=selected_voice) ) ), ) current_chunk_filename_base = f"{output_base_name}_part{i+1:03d}" try: response = client.models.generate_content( # استفاده از generate_content برای سادگی model=model, contents=contents, config=generate_content_config, ) if (response.candidates and response.candidates[0].content and response.candidates[0].content.parts and response.candidates[0].content.parts[0].inline_data): inline_data = response.candidates[0].content.parts[0].inline_data data_buffer = inline_data.data # حدس پسوند فایل بر اساس MIME type file_extension = mimetypes.guess_extension(inline_data.mime_type) # اگر پسوند قابل تشخیص نبود یا باینری عمومی بود، WAV را در نظر میگیریم # و در صورت نیاز (مثلاً برای audio/L16) هدر WAV اضافه میکنیم if file_extension is None or "binary" in inline_data.mime_type or file_extension == ".bin": file_extension = ".wav" if "audio/L" in inline_data.mime_type: # نیاز به هدر WAV data_buffer = convert_to_wav(data_buffer, inline_data.mime_type) elif inline_data.mime_type == "audio/mpeg": file_extension = ".mp3" # اگر API مستقیما MP3 داد elif inline_data.mime_type == "audio/wav": file_extension = ".wav" # اگر API مستقیما WAV داد generated_file_path = save_binary_file(f"{current_chunk_filename_base}{file_extension}", data_buffer, log_messages_list) if generated_file_path: generated_files.append(generated_file_path) log_messages_list.append(f"✅ قطعه {i+1} تولید شد: {os.path.basename(generated_file_path)}") elif response.text: log_messages_list.append(f"ℹ️ پیام متنی از API برای قطعه {i+1}: {response.text}") if "rate limit" in response.text.lower() or "quota" in response.text.lower(): log_messages_list.append(f"⏳ به نظر میرسد به محدودیت تعداد درخواست API (Quota) رسیدهاید. لطفاً چند دقیقه صبر کنید و دوباره امتحان کنید، یا فاصله زمانی بین درخواستها را افزایش دهید.") else: log_messages_list.append(f"⚠️ پاسخ API برای قطعه {i+1} حاوی داده صوتی یا پیام متنی نبود. جزئیات پاسخ: {response.prompt_feedback if response else 'No response'}") except types.generation_types.BlockedPromptException as bpe: log_messages_list.append(f"❌ محتوای پرامپت برای قطعه {i+1} مسدود شد: {bpe}") log_messages_list.append(f"علت مسدود شدن: {bpe.response.prompt_feedback if bpe.response else 'نامشخص'}") log_messages_list.append("لطفاً متن ورودی یا پرامپت سبک گفتار را بررسی و اصلاح کنید.") continue except types.generation_types.StopCandidateException as sce: log_messages_list.append(f"❌ تولید محتوا برای قطعه {i+1} به دلیل نامشخصی متوقف شد: {sce}") continue except Exception as e: log_messages_list.append(f"❌ خطا در تولید قطعه {i+1}: {e}") if "API key not valid" in str(e): log_messages_list.append("خطای کلید API. لطفاً از معتبر بودن کلید و تنظیم صحیح آن در Secrets مطمئن شوید.") elif "resource has been exhausted" in str(e).lower() or "quota" in str(e).lower(): log_messages_list.append("به نظر میرسد محدودیت استفاده از API (Quota) شما تمام شده است.") continue if i < len(text_chunks) - 1 and len(text_chunks) > 1 : log_messages_list.append(f"⏱️ انتظار {sleep_time} ثانیه...") time.sleep(sleep_time) if not generated_files: log_messages_list.append("❌ هیچ فایل صوتی تولید نشد!") return None, None log_messages_list.append(f"\n🎉 {len(generated_files)} فایل صوتی با موفقیت تولید شد!") playback_file = None download_file = None if merge_files and len(generated_files) > 1: if not PYDUB_AVAILABLE: log_messages_list.append("⚠️ pydub برای ادغام در دسترس نیست. فایلها به صورت جداگانه در یک فایل ZIP ارائه میشوند.") zip_filename = f"{output_base_name}_all_parts.zip" if create_zip_file(generated_files, zip_filename, log_messages_list): download_file = zip_filename if generated_files: playback_file = generated_files[0] else: merged_filename = f"{output_base_name}_merged.wav" if merge_audio_files_func(generated_files, merged_filename, log_messages_list): playback_file = merged_filename download_file = merged_filename log_messages_list.append(f"🎵 فایل نهایی ادغام شده: {os.path.basename(merged_filename)}") if delete_partials: for file_path in generated_files: try: if os.path.abspath(file_path) != os.path.abspath(merged_filename): os.remove(file_path) log_messages_list.append(f"🗑️ فایل جزئی حذف شد: {os.path.basename(file_path)}") except Exception as e: log_messages_list.append(f"⚠️ خطا در حذف فایل جزئی {os.path.basename(file_path)}: {e}") else: log_messages_list.append("⚠️ ادغام ممکن نبود. فایلها به صورت جداگانه در یک فایل ZIP ارائه میشوند.") zip_filename = f"{output_base_name}_all_parts.zip" if create_zip_file(generated_files, zip_filename, log_messages_list): download_file = zip_filename if generated_files: playback_file = generated_files[0] elif len(generated_files) == 1: playback_file = generated_files[0] download_file = generated_files[0] else: # Multiple files, no merge requested (or PYDUB_AVAILABLE is False and merge_files was True) zip_filename = f"{output_base_name}_all_parts.zip" if create_zip_file(generated_files, zip_filename, log_messages_list): download_file = zip_filename if generated_files: playback_file = generated_files[0] if playback_file and not os.path.exists(playback_file): log_messages_list.append(f"⚠️ فایل پخش {os.path.basename(playback_file)} وجود ندارد!") playback_file = None if download_file and not os.path.exists(download_file): log_messages_list.append(f"⚠️ فایل دانلود {os.path.basename(download_file)} وجود ندارد!") download_file = None return playback_file, download_file # --- تابع رابط کاربری Gradio --- def gradio_tts_interface( use_file_input, uploaded_file, text_to_speak, speech_prompt, speaker_voice, output_filename_base_in, model_name, temperature, max_chunk_size, sleep_between_requests, merge_audio_files_flag, delete_partial_files_flag, progress=gr.Progress(track_tqdm=True) # track_tqdm for visual progress if using loops with tqdm ): log_messages = [] actual_text_input = "" if use_file_input: if uploaded_file is not None: try: # Gradio file objects have a .name attribute which is the temp path with open(uploaded_file.name, 'r', encoding='utf-8') as f: actual_text_input = f.read().strip() log_messages.append(f"✅ متن از فایل '{os.path.basename(uploaded_file.name)}' بارگذاری شد: {len(actual_text_input)} کاراکتر.") log_messages.append(f"📝 نمونه متن فایل: '{actual_text_input[:100]}{'...' if len(actual_text_input) > 100 else ''}'") if not actual_text_input: log_messages.append("❌ خطا: فایل آپلود شده خالی است یا قابل خواندن نیست.") return None, None, "\n".join(log_messages) except Exception as e: log_messages.append(f"❌ خطا در خواندن فایل آپلود شده: {e}") return None, None, "\n".join(log_messages) else: log_messages.append("❌ خطا: گزینه 'استفاده از فایل ورودی' انتخاب شده اما هیچ فایلی آپلود نشده است.") return None, None, "\n".join(log_messages) else: actual_text_input = text_to_speak if not actual_text_input or not actual_text_input.strip(): log_messages.append("❌ خطا: متن ورودی برای تبدیل به گفتار خالی است. لطفاً متنی را وارد کنید یا گزینه فایل را فعال کنید.") return None, None, "\n".join(log_messages) log_messages.append(f"📖 متن ورودی دستی: {len(actual_text_input)} کاراکتر") log_messages.append(f"📝 نمونه متن ورودی: '{actual_text_input[:100]}{'...' if len(actual_text_input) > 100 else ''}'") # Sanitize output_filename_base to prevent path traversal or invalid characters output_filename_base = re.sub(r'[^\w\-_]', '', output_filename_base_in if output_filename_base_in else "gemini_tts_output") if not output_filename_base: # If sanitization results in empty string output_filename_base = "gemini_tts_output" log_messages.append(f"🏷️ نام پایه فایل خروجی: {output_filename_base}") if not PYDUB_AVAILABLE: log_messages.append("⚠️ کتابخانه pydub در دسترس نیست. امکان ادغام فایلهای صوتی وجود نخواهد داشت و فایلهای صوتی به صورت جداگانه (در صورت وجود چند بخش) در یک فایل ZIP ارائه میشوند.") current_merge_audio_files = False # Force disable merge if pydub is not available else: current_merge_audio_files = merge_audio_files_flag playback_path, download_path = core_generate_audio( actual_text_input, speech_prompt, speaker_voice, output_filename_base, model_name, temperature, max_chunk_size, sleep_between_requests, current_merge_audio_files, delete_partial_files_flag, log_messages ) log_output_str = "\n".join(log_messages) valid_playback_path = playback_path if playback_path and os.path.exists(playback_path) else None valid_download_path = download_path if download_path and os.path.exists(download_path) else None if not valid_playback_path and not valid_download_path and not actual_text_input.strip(): pass # Avoid error message if it was just an empty input from the start elif not valid_playback_path and not valid_download_path and actual_text_input.strip(): # Add this only if there was text input but no output files log_output_str += "\n🛑 هیچ فایل صوتی برای پخش یا دانلود در دسترس نیست." return valid_playback_path, valid_download_path, log_output_str # --- تعریف رابط کاربری Gradio --- css = """ body { font-family: 'Tahoma', 'Arial', sans-serif; direction: rtl; } .gradio-container { max-width: 95% !important; margin: auto !important; padding: 10px !important; } @media (min-width: 768px) { .gradio-container { max-width: 800px !important; } } footer { display: none !important; } .gr-button { background-color: #1d67a3 !important; color: white !important; border-radius: 8px !important; } .gr-button:hover { background-color: #164f7e !important; } .gr-input, .gr-dropdown, .gr-slider, .gr-checkbox, .gr-textbox, .gr-file { border-radius: 6px !important; } .gr-panel { padding: 15px !important; border-radius: 8px !important; box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important; } h2, h3 { color: #1d67a3; text-align: center; } label { font-weight: bold; color: #333; } #output_audio .gallery, #download_file_output .gallery { display: none !important; } /* Ensure text inputs and textareas are also LTR for code/API keys if needed, but general UI is RTL */ textarea, input[type="text"] { direction: rtl; text-align: right; } /* Override for specific LTR elements if any, e.g. API key input if it were visible */ """ with gr.Blocks(css=css, theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue, secondary_hue=gr.themes.colors.sky)) as demo: gr.Markdown("## 🔊 تبدیل متن به گفتار با Gemini API (فارسی)") gr.Markdown("
ساخته شده بر اساس کد کولب توسط: aigolden
") gr.HTML("