import re

import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
from jiwer import wer
from scipy.stats import chi2

from utils.load_csv import download_csv, upload_csv


def generateResults(ASR_model):
    # Define normalization function
    def normalize_text(text):
        """
        Lowercase the text, keep only letters, digits, and spaces,
        and map None/NaN/float values to an empty string.
        """
        if text is None or pd.isna(text):  # Check for None or NaN
            return ""
        if isinstance(text, float):  # Non-text (float) cells become an empty string
            return ""
        text = text.lower()  # Convert to lowercase
        text = re.sub(r'[^a-z0-9\s]', '', text)  # Keep only letters, digits, and spaces
        return text.strip()  # Remove leading/trailing spaces

    # Load the CSV with the model's transcripts: test_with_<ASR_model>.csv
    # (slashes in the model name are replaced with underscores).
    csv_transcript = f'test_with_{ASR_model.replace("/", "_")}.csv'
    df = download_csv(csv_transcript)
    if df is None:
        print(f"{csv_transcript} not found in the dataset repo. Please generate the transcript file first.")
        return

    # Normalize the reference text (column index 1 is assumed to hold the ground-truth transcription).
    df['normalized_transcription'] = df[df.columns[1]].apply(normalize_text)

    # Check if the ASR transcript column exists
    if 'transcript' in df.columns:
        df['normalized_transcript'] = df['transcript'].apply(normalize_text)

        # Calculate WER per utterance
        wer_scores = []
        for index, row in df.iterrows():
            original = row['normalized_transcription']
            transcript = row['normalized_transcript']
            if original and transcript:
                wer_score = wer(original, transcript)
            else:
                wer_score = 1.0  # Maximum error if one text is missing
            wer_scores.append(wer_score)
        df['WER'] = wer_scores

        # Compute IQR
        Q1 = df['WER'].quantile(0.25)
        Q3 = df['WER'].quantile(0.75)
        IQR = Q3 - Q1
        # Define outlier range
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        # Remove outliers
        df = df[(df['WER'] >= lower_bound) & (df['WER'] <= upper_bound)]
    else:
        print("Column 'transcript' not found in CSV")
        return

    # Save the updated CSV
    csv_result = f'test_with_{ASR_model.replace("/", "_")}_WER.csv'
    upload_csv(df, csv_result)
    print(f"WER calculations saved to {csv_result}")

    avg_wer = df["WER"].mean()
    avg_rtfx = df["rtfx"].mean()  # average RTFX recorded during transcription
    print(f"Average WER: {avg_wer} and Avg RTFX: {avg_rtfx}")

    # ----------------------------------------------------------------------------------------------------------
    # Define protected attributes and label columns
    protected_attributes = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    label_column = 'normalized_transcription'
    prediction_column = 'normalized_transcript'
    wer_column = 'WER'
    data = df

    # Function to calculate WER disparity
    def calculate_wer_disparity(data, protected_attribute, wer_column):
        groups = data[protected_attribute].unique()
        wer_disparity = {}
        for group in groups:
            group_data = data[data[protected_attribute] == group]
            group_avg_wer = group_data[wer_column].mean()
            wer_disparity[group] = group_avg_wer
        return wer_disparity

    # Calculate WER disparity for each protected attribute
    for attribute in protected_attributes:
        disparity = calculate_wer_disparity(data, attribute, wer_column)
        print(f"WER Disparity for {attribute}:", disparity)

    # ----------------------------------------------------------------------------------------------------------
    data["Reference_words"] = data["normalized_transcription"].str.split().str.len()
    # Compute word error count (WER_count)
    data["WER_count"] = data["Reference_words"] * data["WER"]
    df = data

    categorical_cols = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    for col in categorical_cols:
        df[col] = df[col].astype("category")

    # Offset: log of reference word count (to adjust for different transcript lengths)
    df["log_Ref_Words"] = np.log(df["Reference_words"] + 1)  # Adding 1 to avoid log(0)

    # Fit a mixed-effects model on the error counts. statsmodels' mixedlm is a linear mixed model;
    # the log reference word count enters as an offset-style covariate to mimic a Poisson-style rate model.
    mixed_model = smf.mixedlm(
        formula="WER_count ~ log_Ref_Words + age + gender + first_language + socioeconomic_bkgd + ethnicity",  # Fixed effects
        data=df,
        groups=df["combined_column"]  # Random effect on speaker
    ).fit()
    # Display results
    # print(mixed_model.summary())

    # --------------------------------------------------------------------------------------------------------------------------
    # Use the fitted mixed_model and df (with log_Ref_Words) from above.
    params = mixed_model.params
    # Set fixed values for continuous predictors:
    fixed_log_ref = df["log_Ref_Words"].mean()
    baseline_log = params["Intercept"] + params["log_Ref_Words"] * fixed_log_ref
    exposure = np.exp(fixed_log_ref) - 1

    def compute_predicted_error_rate(category, level, params, baseline_log, exposure):
        """Compute the predicted WER (error rate) for a given level of a demographic attribute."""
        coef_name = f"{category}[T.{level}]"
        effect = params.get(coef_name, 0)  # For the baseline level, the effect is 0.
        pred_log = baseline_log + effect
        pred_count = np.exp(pred_log)
        return pred_count / exposure

    def compute_category_fairness(category, params, baseline_log, exposure, df):
        """
        For a given category, compute:
          - Predicted error rates for each subgroup level.
          - Raw fairness scores (0-100 scale: 100 = best, 0 = worst) based on linear scaling.
          - A weighted category fairness score using group proportions.
        """
        levels = df[category].cat.categories
        predictions = {}
        for lvl in levels:
            predictions[lvl] = compute_predicted_error_rate(category, lvl, params, baseline_log, exposure)
        # Convert predictions to a Series.
        pred_series = pd.Series(predictions)
        min_pred, max_pred = pred_series.min(), pred_series.max()
        # Compute raw fairness scores: if all levels are identical, assign 100 to everyone.
        if max_pred == min_pred:
            raw_fairness = pred_series.apply(lambda x: 100.0)
        else:
            raw_fairness = pred_series.apply(lambda x: 100 * (1 - (x - min_pred) / (max_pred - min_pred)))
        # Weight the subgroup fairness scores by their sample proportions in the dataset.
        group_proportions = df[category].value_counts(normalize=True)
        # Ensure the ordering matches the fairness scores index:
        group_proportions = group_proportions.reindex(raw_fairness.index, fill_value=0)
        weighted_category_fairness = np.average(raw_fairness, weights=group_proportions)
        return pred_series, raw_fairness, weighted_category_fairness

    def perform_lrt(attribute, df):
        """Perform a likelihood ratio test (LRT) for the overall significance of an attribute."""
        full_model = smf.mixedlm(f"WER ~ {attribute} + log_Ref_Words", df, groups=df["combined_column"]).fit()
        reduced_model = smf.mixedlm("WER ~ log_Ref_Words", df, groups=df["combined_column"]).fit()
        lr_stat = 2 * (full_model.llf - reduced_model.llf)
        df_diff = full_model.df_modelwc - reduced_model.df_modelwc
        p_value = chi2.sf(lr_stat, df_diff)
        return p_value

    # List of attributes to evaluate
    categories = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    results = {}
    adjusted_category_scores = []  # Adjusted fairness scores for each category.
    weights_for_categories = []    # Weight each category based on significance if desired.

    for cat in categories:
        preds, raw_fairness, category_raw_score = compute_category_fairness(cat, params, baseline_log, exposure, df)
        # Perform the LRT to get the overall significance of this attribute.
        lrt_p_value = perform_lrt(cat, df)
        # Compute a multiplier based on significance:
        # if the p-value is below 0.05, the fairness score is penalized proportionally.
        multiplier = (lrt_p_value / 0.05) if lrt_p_value < 0.05 else 1.0
        # Adjusted fairness score for the category:
        adjusted_score = category_raw_score * multiplier
        # Save results.
        results[cat] = {
            'Predicted Error Rates': preds,
            'Raw Fairness Scores': raw_fairness,
            # 'Weighted Raw Fairness Score': category_raw_score,
            # 'LRT p-value': lrt_p_value,
            'Adjusted Category Fairness Score': adjusted_score
        }
        # For the overall score, categories could be weighted; here the adjusted score is used directly.
        adjusted_category_scores.append(adjusted_score)
        # Optionally, use the multiplier as a weight for overall aggregation.
        weights_for_categories.append(multiplier)

    # Compute the overall fairness score across attributes using the adjusted category scores.
    overall_fairness_score = np.average(adjusted_category_scores)

    # FAAS (Fairness Adjusted ASR Score) = 10 * log10(overall fairness score / average WER);
    # models are ranked by this value.
    faas = 10 * np.log10(overall_fairness_score / avg_wer)
    print("Fairness Adjusted ASR Score for the model is", faas)
    # print("\nFinal Overall Fairness Score (Weighted Average over Categories):", overall_fairness_score)

    # Used for summary_speedometer, Leaderboard
    # print(results['gender'])
    # print(results['gender']['Predicted Error Rates'])
    # print(results['gender']['Adjusted Category Fairness Score'])
    print("________________________________")

    Results = {
        'Predicted Error Rates': {
            'gender': results['gender']['Predicted Error Rates'].to_dict(),  # Convert Series to dict
            'first_language': results['first_language']['Predicted Error Rates'].to_dict(),
            'socioeconomic_bkgd': results['socioeconomic_bkgd']['Predicted Error Rates'].to_dict(),
            'ethnicity': results['ethnicity']['Predicted Error Rates'].to_dict()
        },
        'Raw Fairness Scores': {
            'gender': results['gender']['Raw Fairness Scores'].to_dict(),
            'first_language': results['first_language']['Raw Fairness Scores'].to_dict(),
            'socioeconomic_bkgd': results['socioeconomic_bkgd']['Raw Fairness Scores'].to_dict(),
            'ethnicity': results['ethnicity']['Raw Fairness Scores'].to_dict()
        },
        'Adjusted Category Fairness Score': {
            'gender': float(results['gender']['Adjusted Category Fairness Score']),  # Convert NumPy float to Python float
            'first_language': float(results['first_language']['Adjusted Category Fairness Score']),
            'socioeconomic_bkgd': float(results['socioeconomic_bkgd']['Adjusted Category Fairness Score']),
            'ethnicity': float(results['ethnicity']['Adjusted Category Fairness Score'])
        },
        'Overall Fairness Score': float(overall_fairness_score),
        'Avg_wer': float(avg_wer),
        'Avg_rtfx': float(avg_rtfx),
        'FAAS': float(faas),
        'ASR_model': ASR_model,
    }
    # print(Results)
    return Results
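

# --------------------------------------------------------------------------------------------------------------------------
# Illustrative usage sketch (not part of the evaluation pipeline): the model identifier below is an assumption;
# pass any ASR model name whose transcript CSV (test_with_<model>.csv) has already been generated and uploaded.
if __name__ == "__main__":
    example_results = generateResults("openai/whisper-large-v3")
    if example_results is not None:
        print("FAAS:", example_results["FAAS"])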