import json
import os
import shutil
from datetime import datetime
from pathlib import Path

import gradio as gr
import nltk

# Download NLTK data
# NOTE(review): recent NLTK releases appear to look up the 'punkt_tab'
# resource for sentence tokenization - confirm against the installed version.
nltk.download('punkt')


class TTSDatasetCollector:
    """Manages TTS dataset collection and organization.

    Recordings are stored below a ``dataset`` directory located next to this
    source file, split into ``audio``, ``transcriptions`` and ``metadata``
    subdirectories. Every operation is appended to ``dataset_log.txt`` in the
    dataset root.
    """

    def __init__(self):
        # Get the directory where this module is located
        self.root_path = Path(os.path.dirname(os.path.abspath(__file__))) / "dataset"
        self.sentences = []      # sentences of the currently loaded text
        self.current_index = 0   # index into self.sentences shown to the user
        self.setup_directories()

    def setup_directories(self):
        """Create the dataset directory tree and the log file if missing."""
        # Create main dataset directory
        self.root_path.mkdir(exist_ok=True)

        # Create subdirectories
        for subdir in ['audio', 'transcriptions', 'metadata']:
            (self.root_path / subdir).mkdir(exist_ok=True)

        # Create a log file to track operations
        log_file = self.root_path / 'dataset_log.txt'
        if not log_file.exists():
            with open(log_file, 'w', encoding='utf-8') as f:
                f.write(f"Dataset collection started on {datetime.now().isoformat()}\n")

    def log_operation(self, message: str):
        """Append a timestamped message to the dataset log file."""
        log_file = self.root_path / 'dataset_log.txt'
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(f"[{datetime.now().isoformat()}] {message}\n")

    def load_text_file(self, file):
        """Read a UTF-8 text file and split it into sentences.

        Args:
            file: Either a filesystem path (``str`` / ``os.PathLike``) or an
                upload object exposing the path through a ``name`` attribute.

        Returns:
            A ``(success, message)`` tuple. On success ``self.sentences`` is
            replaced and ``self.current_index`` is reset to 0; on failure the
            previous state is left untouched.
        """
        try:
            # Paths are used directly: Path.name is only the final component,
            # so it must not be mistaken for an upload object's full path.
            if isinstance(file, (str, os.PathLike)):
                file_path = file
            else:
                file_path = file.name

            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()

            self.sentences = nltk.sent_tokenize(text)
            self.current_index = 0

            # Log the file loading
            self.log_operation(f"Loaded text file with {len(self.sentences)} sentences")
            return True, f"Loaded {len(self.sentences)} sentences"
        except Exception as e:
            self.log_operation(f"Error loading file: {str(e)}")
            return False, f"Error loading file: {str(e)}"

    def generate_filenames(self, dataset_name: str, speaker_id: str) -> tuple:
        """Generate audio and text filenames for the current sentence.

        The shared base name combines the dataset name, the speaker id, the
        1-based sentence number (zero-padded to four digits) and a
        second-resolution timestamp.

        Returns:
            A ``(audio_filename, text_filename)`` tuple.
        """
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        sentence_id = f"{self.current_index+1:04d}"
        base_name = f"{dataset_name}_{speaker_id}_{sentence_id}_{timestamp}"
        return f"{base_name}.wav", f"{base_name}.txt"

    @staticmethod
    def _is_safe_path_component(name: str) -> bool:
        """Return True if *name* can be used as a single path component."""
        if name in ('.', '..'):
            return False
        forbidden = {os.sep, os.altsep, '\0'} - {None}
        return not any(token in name for token in forbidden)

    def save_recording(self, audio_file, speaker_id: str, dataset_name: str):
        """Save the recording and the transcription of the current sentence.

        Args:
            audio_file: Path of the recorded audio file to copy into the dataset.
            speaker_id: Identifier of the speaker; used as a directory name.
            dataset_name: Name of the dataset; used in the generated filenames.

        Returns:
            A ``(success, message)`` tuple.
        """
        if not audio_file or not speaker_id or not dataset_name:
            return False, "Missing required information"

        # Both values come from free-text UI fields and end up in file paths,
        # so refuse anything that could escape the dataset directory.
        if not (self._is_safe_path_component(speaker_id)
                and self._is_safe_path_component(dataset_name)):
            return False, "Speaker ID and dataset name must not contain path separators"

        # Resolve the sentence before touching the disk so that a missing text
        # cannot leave an audio file behind without its transcription.
        if not self.sentences or not 0 <= self.current_index < len(self.sentences):
            return False, "No text loaded"
        sentence = self.sentences[self.current_index]

        try:
            # Generate filenames
            audio_name, text_name = self.generate_filenames(dataset_name, speaker_id)

            # Create speaker directories
            audio_dir = self.root_path / 'audio' / speaker_id
            text_dir = self.root_path / 'transcriptions' / speaker_id
            audio_dir.mkdir(parents=True, exist_ok=True)
            text_dir.mkdir(parents=True, exist_ok=True)

            # Save audio file
            audio_path = audio_dir / audio_name
            shutil.copy2(audio_file, audio_path)

            # Save transcription
            text_path = text_dir / text_name
            self.save_transcription(
                text_path,
                sentence,
                {
                    'speaker_id': speaker_id,
                    'dataset_name': dataset_name,
                    'timestamp': datetime.now().isoformat(),
                    'audio_file': audio_name,
                },
            )

            # Update metadata
            self.update_metadata(speaker_id, dataset_name)

            # Log the save operation
            self.log_operation(
                f"Saved recording: Speaker={speaker_id}, Dataset={dataset_name}, "
                f"Audio={audio_name}, Text={text_name}"
            )
            return True, f"Recording saved successfully as {audio_name}"
        except Exception as e:
            error_msg = f"Error saving recording: {str(e)}"
            self.log_operation(error_msg)
            return False, error_msg

    def save_transcription(self, file_path: Path, text: str, metadata: dict):
        """Write a transcription file made of a metadata header and the text.

        Args:
            file_path: Destination of the transcription file.
            text: Sentence that was recorded.
            metadata: Mapping providing the ``audio_file``, ``speaker_id``,
                ``dataset_name`` and ``timestamp`` keys.
        """
        content = "\n".join([
            "[METADATA]",
            f"Recording_ID: {metadata['audio_file']}",
            f"Speaker_ID: {metadata['speaker_id']}",
            f"Dataset_Name: {metadata['dataset_name']}",
            f"Timestamp: {metadata['timestamp']}",
            "",
            "[TEXT]",
            f"{text}",
            "",
        ])
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)

    def update_metadata(self, speaker_id: str, dataset_name: str):
        """Record one more recording in ``metadata/dataset_info.json``.

        Creates the speaker and dataset entries on first use, increments the
        recording counters and refreshes the timestamps. Failures are logged
        and printed rather than raised.
        """
        metadata_file = self.root_path / 'metadata' / 'dataset_info.json'

        try:
            if metadata_file.exists():
                with open(metadata_file, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
            else:
                metadata = {'speakers': {}, 'last_updated': None}

            now = datetime.now().isoformat()

            # Update speaker data
            speakers = metadata.setdefault('speakers', {})
            speaker = speakers.setdefault(
                speaker_id, {'total_recordings': 0, 'datasets': {}}
            )
            dataset = speaker['datasets'].setdefault(
                dataset_name,
                {
                    'recordings': 0,
                    'sentences': len(self.sentences),
                    'first_recording': now,
                    'last_recording': None,
                },
            )

            # Update counts and timestamps
            speaker['total_recordings'] += 1
            dataset['recordings'] += 1
            dataset['last_recording'] = now
            metadata['last_updated'] = now

            # Save updated metadata
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)

            self.log_operation(f"Updated metadata for {speaker_id} in {dataset_name}")
        except Exception as e:
            error_msg = f"Error updating metadata: {str(e)}"
            self.log_operation(error_msg)
            print(error_msg)


def create_interface():
    """Create the Gradio interface for TTS data collection.

    Builds a ``gr.Blocks`` layout, wires its events to a fresh
    ``TTSDatasetCollector`` and returns the (not yet launched) interface.
    """
    collector = TTSDatasetCollector()

    with gr.Blocks(title="TTS Dataset Collection Tool") as interface:
        gr.Markdown("# TTS Dataset Collection Tool")

        with gr.Row():
            # Left column - Configuration
            with gr.Column():
                file_input = gr.File(
                    label="Upload Text File (.txt)",
                    file_types=[".txt"]
                )
                speaker_id = gr.Textbox(
                    label="Speaker ID",
                    placeholder="Enter unique speaker identifier"
                )
                dataset_name = gr.Textbox(
                    label="Dataset Name",
                    placeholder="Enter dataset name"
                )

            # Right column - Recording
            with gr.Column():
                current_text = gr.Textbox(
                    label="Current Sentence",
                    interactive=False
                )
                audio_recorder = gr.Audio(
                    label="Record Audio",
                    type="filepath"
                )
                next_text = gr.Textbox(
                    label="Next Sentence",
                    interactive=False
                )

        # Controls
        with gr.Row():
            prev_btn = gr.Button("Previous")
            next_btn = gr.Button("Next")
            save_btn = gr.Button("Save Recording", variant="primary")

        # Status
        with gr.Row():
            progress = gr.Textbox(
                label="Progress",
                interactive=False
            )
            status = gr.Textbox(
                label="Status",
                interactive=False
            )

        # Dataset Info
        with gr.Row():
            dataset_info = gr.JSON(
                label="Dataset Statistics",
                value={}
            )

        def update_dataset_info():
            """Return the stored dataset statistics, or {} if unavailable."""
            metadata_file = collector.root_path / 'metadata' / 'dataset_info.json'
            try:
                if metadata_file.exists():
                    with open(metadata_file, 'r', encoding='utf-8') as f:
                        return json.load(f)
            except (OSError, ValueError):
                # Best effort: an unreadable or malformed file shows as empty.
                pass
            return {}

        def blank_display(message):
            """Return an update that clears the sentence fields and sets the status."""
            return {
                current_text: "",
                next_text: "",
                progress: "",
                status: message,
                dataset_info: update_dataset_info(),
            }

        def update_display():
            """Return an update showing the current and next sentence."""
            if not collector.sentences:
                return blank_display("No text loaded")

            total = len(collector.sentences)
            next_idx = collector.current_index + 1
            return {
                current_text: collector.sentences[collector.current_index],
                next_text: collector.sentences[next_idx] if next_idx < total else "",
                progress: f"Sentence {collector.current_index + 1} of {total}",
                status: "Ready for recording",
                dataset_info: update_dataset_info(),
            }

        # Event handlers
        def load_file(file):
            """Load the uploaded text file and show its first sentence."""
            if not file:
                return blank_display("No file selected")

            success, msg = collector.load_text_file(file)
            if not success:
                return blank_display(msg)

            # A file without any sentence loads successfully but has nothing
            # to display; indexing the empty list would raise IndexError.
            if not collector.sentences:
                return blank_display("No sentences found in file")

            display = update_display()
            display[status] = msg
            return display

        def next_sentence():
            """Move to the next sentence, if there is one."""
            if collector.sentences and collector.current_index < len(collector.sentences) - 1:
                collector.current_index += 1
            return update_display()

        def prev_sentence():
            """Move to the previous sentence, if there is one."""
            if collector.sentences and collector.current_index > 0:
                collector.current_index -= 1
            return update_display()

        def save_recording(audio, spk_id, ds_name):
            """Validate the form values and store the recording."""
            if not audio:
                return {status: "No audio recorded", dataset_info: update_dataset_info()}
            if not spk_id:
                return {status: "Speaker ID required", dataset_info: update_dataset_info()}
            if not ds_name:
                return {status: "Dataset name required", dataset_info: update_dataset_info()}

            # The success flag is not needed here: the message already
            # describes the outcome shown in the status box.
            _, msg = collector.save_recording(audio, spk_id, ds_name)
            return {
                status: msg,
                dataset_info: update_dataset_info(),
            }

        # Connect event handlers
        file_input.change(
            load_file,
            inputs=[file_input],
            outputs=[current_text, next_text, progress, status, dataset_info]
        )

        next_btn.click(
            next_sentence,
            outputs=[current_text, next_text, progress, status, dataset_info]
        )

        prev_btn.click(
            prev_sentence,
            outputs=[current_text, next_text, progress, status, dataset_info]
        )

        save_btn.click(
            save_recording,
            inputs=[audio_recorder, speaker_id, dataset_name],
            outputs=[status, dataset_info]
        )

    return interface


if __name__ == "__main__":
    interface = create_interface()
    interface.launch()