# ASR-FairBench-Server/utils/generate_results.py
import pandas as pd
import re
import numpy as np
from jiwer import wer
import statsmodels.api as sm
import statsmodels.formula.api as smf
from utils.load_csv import download_csv, upload_csv
def generateResults(ASR_model):
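    """
    Compute per-utterance WER, per-group fairness metrics, and the Fairness Adjusted
    ASR Score (FAAS) for the given ASR model, using the transcript CSV previously
    uploaded to the dataset repo. Returns a dict of results, or None if the transcript
    CSV (or its 'transcript' column) is missing.
    """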
# Define normalization function
def normalize_text(text):
"""
Normalize text by converting to lowercase, removing special characters,
except digits, and handling None or float values.
"""
if text is None or pd.isna(text): # Check for None or NaN
return ""
if isinstance(text, float): # Check for floats and convert them to empty string
return ""
text = text.lower() # Convert to lowercase
text = re.sub(r'[^a-z0-9\s]', '', text) # Keep only letters, digits, and spaces
return text.strip() # Remove leading/trailing spaces
# Load the CSV with whisper transcripts f"test_with_{ASR_model}.csv"
csv_transcript = f'test_with_{ASR_model.replace("/","_")}.csv'
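    # Model IDs may contain "/" (e.g. an org/model name), so it is replaced with "_"
    # to produce a valid flat filename for the dataset repo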
# Read the CSV file
df = download_csv(csv_transcript)
    if df is None:
        print(f"{csv_transcript} not found in the dataset repo. Please generate the transcript file first.")
        return
# Normalize original text and whisper transcripts
    df['normalized_transcription'] = df[df.columns[1]].apply(normalize_text)  # Column 1 holds the reference (ground-truth) transcription
# Check if whisper transcript column exists
if 'transcript' in df.columns:
        df['normalized_transcript'] = df['transcript'].apply(normalize_text)  # Use the column checked for above
# Calculate WER
wer_scores = []
for index, row in df.iterrows():
original = row['normalized_transcription']
transcript = row['normalized_transcript']
if original and transcript:
wer_score = wer(original, transcript)
else:
                wer_score = 1.0  # Treat a missing reference or hypothesis as 100% error
wer_scores.append(wer_score)
df['WER'] = wer_scores
# Compute IQR
Q1 = df['WER'].quantile(0.25)
Q3 = df['WER'].quantile(0.75)
IQR = Q3 - Q1
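        # Tukey's rule: rows whose WER lies more than 1.5 * IQR beyond the quartiles
        # are treated as outliers and dropped below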
# Define outlier range
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# Remove outliers
df = df[(df['WER'] >= lower_bound) & (df['WER'] <= upper_bound)]
    else:
        print("Column 'transcript' not found in CSV")
        return
# Save the updated CSV
csv_result = f'test_with_{ASR_model.replace("/","_")}_WER.csv'
    upload_csv(df, csv_result)
print(f"WER calculations saved to {csv_result}")
avg_wer = df["WER"].mean()
avg_rtfx = df["rtfx"].mean()
print(f"Average WER: {avg_wer} and Avg RTFX : {avg_rtfx}")
#----------------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------------
# Define protected attributes and label columns
protected_attributes = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
label_column = 'normalized_transcription'
prediction_column = 'normalized_transcript'
wer_column = 'WER'
data = df
# Function to calculate WER disparity
def calculate_wer_disparity(data, protected_attribute, wer_column):
groups = data[protected_attribute].unique()
wer_disparity = {}
for group in groups:
group_data = data[data[protected_attribute] == group]
avg_wer = group_data[wer_column].mean()
wer_disparity[group] = avg_wer
return wer_disparity
# Calculate WER disparity for each protected attribute
for attribute in protected_attributes:
disparity = calculate_wer_disparity(data, attribute, wer_column)
print(f"WER Disparity for {attribute}:", disparity)
#-------------------------------------------------------------------------------------------------------
#-------------------------------------------------------------------------------------------------------
data["Reference_words"] = data["normalized_transcription"].str.split().str.len()
# Compute word error count (WER_count)
data["WER_count"] = data["Reference_words"] * data["WER"]
df = data
categorical_cols = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
for col in categorical_cols:
df[col] = df[col].astype("category")
# Offset: log of reference word count (to adjust for different transcript lengths)
df["log_Ref_Words"] = np.log(df["Reference_words"] + 1) # Adding 1 to avoid log(0)
    # Fit a mixed-effects regression model on the error counts
    # (statsmodels MixedLM is a linear mixed model, used here as an approximation
    # to a mixed-effects Poisson count model)
    mixed_model = smf.mixedlm(
        formula="WER_count ~ log_Ref_Words + age + gender + first_language + socioeconomic_bkgd + ethnicity",  # Fixed effects
        data=df,
        groups=df["combined_column"]  # Random intercept per speaker
    ).fit()
# Display results
# print(mixed_model.summary())
#--------------------------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------------------------
    from scipy.stats import chi2

    # Fixed-effect coefficients of the fitted mixed model, used below to predict
    # error rates for each demographic subgroup
params = mixed_model.params
# Set fixed values for continuous predictors:
fixed_log_ref = df["log_Ref_Words"].mean()
baseline_log = params["Intercept"] + params["log_Ref_Words"] * fixed_log_ref
exposure = np.exp(fixed_log_ref) - 1
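    # baseline_log: linear predictor for the baseline (reference) levels at the average
    # utterance length; exposure: the word count implied by that average (undoing the +1
    # inside the log), used to convert predicted counts back into per-word error rates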
def compute_predicted_error_rate(category, level, params, baseline_log, exposure):
"""Computes the predicted WER (error rate) for a given level of a demographic attribute."""
coef_name = f"{category}[T.{level}]"
effect = params.get(coef_name, 0) # For the baseline level, effect is 0.
pred_log = baseline_log + effect
pred_count = np.exp(pred_log)
return pred_count / exposure
def compute_category_fairness(category, params, baseline_log, exposure, df):
"""
For a given category, compute:
- Predicted error rates for each subgroup level.
- Raw fairness scores (0-100 scale: 100 = best, 0 = worst) based on linear scaling.
- A weighted category fairness score using group proportions.
"""
levels = df[category].cat.categories
predictions = {}
for lvl in levels:
predictions[lvl] = compute_predicted_error_rate(category, lvl, params, baseline_log, exposure)
# Convert predictions to a Series.
pred_series = pd.Series(predictions)
min_pred, max_pred = pred_series.min(), pred_series.max()
# Compute raw fairness scores: if all levels are identical, assign 100 to everyone.
if max_pred == min_pred:
raw_fairness = pred_series.apply(lambda x: 100.0)
else:
raw_fairness = pred_series.apply(lambda x: 100 * (1 - (x - min_pred) / (max_pred - min_pred)))
# Weight the subgroup fairness scores by their sample proportions in the dataset.
group_proportions = df[category].value_counts(normalize=True)
# Ensure ordering matches the fairness scores index:
group_proportions = group_proportions.reindex(raw_fairness.index, fill_value=0)
weighted_category_fairness = np.average(raw_fairness, weights=group_proportions)
return pred_series, raw_fairness, weighted_category_fairness
    def perform_lrt(attribute, df):
        """Performs a Likelihood Ratio Test (LRT) for the overall significance of an attribute.

        Both models are fit by maximum likelihood (reml=False) so that their
        log-likelihoods are comparable when the fixed effects differ.
        """
        full_model = smf.mixedlm(f"WER ~ {attribute} + log_Ref_Words", df, groups=df["combined_column"]).fit(reml=False)
        reduced_model = smf.mixedlm("WER ~ log_Ref_Words", df, groups=df["combined_column"]).fit(reml=False)
        lr_stat = 2 * (full_model.llf - reduced_model.llf)
        df_diff = full_model.df_modelwc - reduced_model.df_modelwc
        p_value = chi2.sf(lr_stat, df_diff)
        return p_value
# List of attributes to evaluate
categories = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
results = {}
adjusted_category_scores = [] # To store adjusted fairness scores for each category.
weights_for_categories = [] # Weight each category based on significance if desired.
for cat in categories:
preds, raw_fairness, category_raw_score = compute_category_fairness(cat, params, baseline_log, exposure, df)
# Perform LRT to get overall significance for this attribute.
lrt_p_value = perform_lrt(cat, df)
# Compute multiplier based on significance.
# If p-value < 0.05, we penalize the fairness score proportionally.
multiplier = (lrt_p_value / 0.05) if lrt_p_value < 0.05 else 1.0
# Adjusted fairness score for the category:
adjusted_score = category_raw_score * multiplier
# Save results.
results[cat] = {
'Predicted Error Rates': preds,
'Raw Fairness Scores': raw_fairness,
# 'Weighted Raw Fairness Score': category_raw_score,
# 'LRT p-value': lrt_p_value,
'Adjusted Category Fairness Score': adjusted_score
}
# For overall score, we could weight categories (here we simply use the adjusted score).
adjusted_category_scores.append(adjusted_score)
# Optionally, use multiplier as a weight for overall aggregation.
weights_for_categories.append(multiplier)
# Compute overall fairness score across attributes using the adjusted category scores.
overall_fairness_score = np.average(adjusted_category_scores)
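    # Note: this is a simple (unweighted) mean over the four attributes; the multipliers
    # collected in weights_for_categories are not applied here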
    # FAAS is the Fairness Adjusted ASR Score based on which models will be ranked
    faas = 10 * np.log10(overall_fairness_score / avg_wer)
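    # A decibel-style ratio: higher overall fairness and lower average WER both raise FAAS;
    # e.g. a fairness score of 90 with an average WER of 0.15 gives 10*log10(600) ~= 27.8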
print("Fairness Adjusted ASR Score for the model is", faas)
# print("\nFinal Overall Fairness Score (Weighted Average over Categories):", overall_fairness_score) # used for summary_speedometer,Leaderboard
# print(results['gender'])
# print(results['gender']['Predicted Error Rates'])
# print(results['gender']['Adjusted Category Fairness Score'])
print("________________________________")
Results = {
'Predicted Error Rates': {
'gender': results['gender']['Predicted Error Rates'].to_dict(), # Convert Series to dict
'first_language': results['first_language']['Predicted Error Rates'].to_dict(),
'socioeconomic_bkgd': results['socioeconomic_bkgd']['Predicted Error Rates'].to_dict(),
'ethnicity': results['ethnicity']['Predicted Error Rates'].to_dict()
},
'Raw Fairness Scores': {
'gender': results['gender']['Raw Fairness Scores'].to_dict(),
'first_language': results['first_language']['Raw Fairness Scores'].to_dict(),
'socioeconomic_bkgd': results['socioeconomic_bkgd']['Raw Fairness Scores'].to_dict(),
'ethnicity': results['ethnicity']['Raw Fairness Scores'].to_dict()
},
'Adjusted Category Fairness Score': {
'gender': float(results['gender']['Adjusted Category Fairness Score']), # Convert NumPy float to Python float
'first_language': float(results['first_language']['Adjusted Category Fairness Score']),
'socioeconomic_bkgd': float(results['socioeconomic_bkgd']['Adjusted Category Fairness Score']),
'ethnicity': float(results['ethnicity']['Adjusted Category Fairness Score'])
},
        'Overall Fairness Score': float(overall_fairness_score),  # Convert NumPy floats to plain Python floats
        'Avg_wer': float(avg_wer),
        'Avg_rtfx': float(avg_rtfx),
        'FAAS': float(faas),
'ASR_model': ASR_model,
}
# print(Results)
return Results