"""Flask ASR server for the ASR-FairBench project.

Loads the ``satyamr196/asr_fairness_audio`` dataset with audio decoding
deferred, and exposes endpoints that trigger transcript generation with a
HuggingFace ASR pipeline and report available model names.
"""
import os

# Must be set BEFORE importing `datasets`/huggingface libraries: the HF cache
# location is resolved at import time.
os.environ["HF_HOME"] = "/tmp/huggingface"

import pandas as pd
from datasets import load_dataset, Audio
from flask import Flask, jsonify

# Load dataset without decoding audio (required!) — decoding is deferred until
# a file is actually transcribed.
dataset = load_dataset("satyamr196/asr_fairness_audio", split="train")
dataset = dataset.cast_column("audio", Audio(decode=False))
print(" ___ ")

csv_path = "test.csv"
df = pd.read_csv(csv_path)
print(f"CSV Loaded with {len(df)} rows")


def generateTranscript(ASR_model, dataset, csv_path, output_dir="./"):
    """Transcribe every audio file listed in ``csv_path`` with ``ASR_model``.

    The first CSV column is assumed to hold audio filenames (lower-cased,
    without the ``.wav`` extension). Each matching dataset sample is decoded
    on demand and run through a HuggingFace ASR pipeline; the transcript and
    the RTFX score (processing time / audio duration) are appended as new
    columns and written to ``<output_dir>/test_with_<model>.csv``. Slashes in
    the model id are replaced by underscores so the output name stays a
    single path component. Rows whose file is missing or fails to transcribe
    get an empty transcript and an RTFX of 0 (best-effort, never aborts).

    Args:
        ASR_model: HuggingFace model id, e.g. ``"openai/whisper-base"``.
        dataset: ``datasets.Dataset`` whose ``audio`` column is NOT decoded.
        csv_path: path to the metadata CSV.
        output_dir: directory the output CSV is written into.
    """
    import time
    import tqdm
    import soundfile as sf
    from transformers import pipeline

    os.makedirs(output_dir, exist_ok=True)
    # Model ids contain "/" (e.g. "openai/whisper-base"); without the replace
    # the filename would point into a non-existent sub-directory.
    output_csv_path = os.path.join(
        output_dir, f"test_with_{ASR_model.replace('/', '_')}.csv"
    )

    # Skip the (expensive) transcription if it was already done for this model.
    if os.path.exists(output_csv_path):
        print(f"Transcript already exists for model {ASR_model}. Skipping transcription.")
        return

    df = pd.read_csv(csv_path)
    print(f"CSV Loaded with {len(df)} rows")

    # CPU-only inference (device=-1).
    pipe = pipeline("automatic-speech-recognition", model=ASR_model, device=-1)
    print("Device set to use CPU")

    # First CSV column holds the audio filenames; normalize for matching.
    filename_column = df.columns[0]
    df[filename_column] = df[filename_column].str.strip().str.lower()

    # Map basename -> sample without decoding any audio (decode=False above).
    print("Creating dataset map from filenames...")
    dataset_map = {
        os.path.basename(sample["audio"]["path"]).lower(): sample
        for sample in dataset
    }

    transcripts = []
    rtfx_score = []
    for _, row in tqdm.tqdm(df.iterrows(), total=len(df)):
        filename = row[filename_column] + ".wav"
        sample = dataset_map.get(filename)
        if sample is None:
            print(f"❌ File not found in dataset: {filename}")
            transcripts.append("")
            rtfx_score.append(0)
            continue
        try:
            # Decode audio only now, for the files we actually need.
            # NOTE(review): assumes sample["audio"]["path"] is a readable
            # local file path — confirm for archive-backed datasets.
            audio_array, sample_rate = sf.read(sample["audio"]["path"])

            start_time = time.time()
            result = pipe({"array": audio_array, "sampling_rate": sample_rate})
            end_time = time.time()

            duration = len(audio_array) / sample_rate
            rtfx = (end_time - start_time) / duration if duration > 0 else 0
            transcripts.append(result["text"])
            rtfx_score.append(rtfx)
            print(f"✅ {filename}: RTFX = {rtfx:.2f}")
        except Exception as e:
            # Best-effort: record an empty result and keep processing.
            print(f"❌ Error with {filename}: {e}")
            transcripts.append("")
            rtfx_score.append(0)

    # Append results and persist next to the metadata.
    df["transcript"] = transcripts
    df["rtfx"] = rtfx_score
    df.to_csv(output_csv_path, index=False)
    print(f"\n📄 Transcripts saved to: {output_csv_path}")


app = Flask(__name__)


@app.route("/")
def home():
    """Landing endpoint pointing users at the ASR-FairBench application."""
    return jsonify(
        {
            "message": "Welcome to the ASR Server! Please use the App link to access ASR-FairBench application.",
            "App link": "https://github.com/SatyamR196/ASR-FairBench"
        }
    )


@app.route("/asr_models")
def asr_models():
    """List known ASR model families and kick off whisper-base transcription."""
    models = [
        "DeepSpeech", "Wav2Vec", "Jasper", "QuartzNet", "Conformer",
        "whisper", "Kaldi", "SpeechBrain", "Fairseq S2T", "ESPnet"
    ]
    # NOTE(review): transcription runs synchronously inside the request —
    # the response is delayed until it finishes (or the cached CSV is found).
    generateTranscript("openai/whisper-base", dataset, csv_path, output_dir="./")
    return jsonify({"asr_models": models})


# Entry point left commented out: deployment presumably runs the app via a
# WSGI server (e.g. gunicorn) rather than `python` directly.
# if __name__ == "__main__":
#     app.run(debug=True)