import pandas as pd
import re
import numpy as np
from jiwer import wer
import statsmodels.api as sm
import statsmodels.formula.api as smf
from utils.load_csv import download_csv, upload_csv
def generateResults(ASR_model):
    # Define normalization function
    def normalize_text(text):
        """
        Normalize text by converting to lowercase, keeping only letters, digits,
        and whitespace, and handling None, NaN, or float values.
        """
        if text is None or pd.isna(text):  # Check for None or NaN
            return ""
        if isinstance(text, float):  # Non-string floats (e.g. empty cells) become empty strings
            return ""
        text = text.lower()  # Convert to lowercase
        text = re.sub(r'[^a-z0-9\s]', '', text)  # Keep only letters, digits, and spaces
        return text.strip()  # Remove leading/trailing spaces
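    # For illustration: normalize_text("Hello, World! 123") returns "hello world 123",
    # and normalize_text(None) or normalize_text(float("nan")) returns "".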
    # Load the CSV with the ASR transcripts, f"test_with_{ASR_model}.csv"
    csv_transcript = f'test_with_{ASR_model.replace("/","_")}.csv'
    # Read the CSV file
    df = download_csv(csv_transcript)
    if df is None:
        print("CSV not found in the dataset repo. Please generate the transcript file first.")
        return
    # Normalize the reference text and the ASR transcripts
    df['normalized_transcription'] = df[df.columns[1]].apply(normalize_text)  # Reference text is taken positionally (second column)
    # Check if the ASR transcript column exists
    if 'transcript' in df.columns:
        df['normalized_transcript'] = df[df.columns[8]].apply(normalize_text)  # Transcript is taken positionally (ninth column)
        # Calculate WER
        wer_scores = []
        for index, row in df.iterrows():
            original = row['normalized_transcription']
            transcript = row['normalized_transcript']
            if original and transcript:
                wer_score = wer(original, transcript)
            else:
                wer_score = 1.0  # Maximum error if one text is missing
            wer_scores.append(wer_score)
        df['WER'] = wer_scores
        # Compute IQR
        Q1 = df['WER'].quantile(0.25)
        Q3 = df['WER'].quantile(0.75)
        IQR = Q3 - Q1
        # Define outlier range
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        # Remove outliers
        df = df[(df['WER'] >= lower_bound) & (df['WER'] <= upper_bound)]
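        # For illustration: with Q1 = 0.10 and Q3 = 0.30, IQR = 0.20, so rows whose WER
        # falls outside [-0.20, 0.60] (i.e. WER > 0.60 here) are dropped as outliers.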
    else:
        print("Column 'transcript' not found in CSV")
        return  # Without transcripts there is no WER column to analyse
    # Save the updated CSV
    csv_result = f'test_with_{ASR_model.replace("/","_")}_WER.csv'
    upload_csv(df, csv_result)
    print(f"WER calculations saved to {csv_result}")
    avg_wer = df["WER"].mean()
    avg_rtfx = df["rtfx"].mean()
    print(f"Average WER: {avg_wer} and Avg RTFX : {avg_rtfx}")
    # ----------------------------------------------------------------------------------------------------------
    # ----------------------------------------------------------------------------------------------------------
    # Define protected attributes and label columns
    protected_attributes = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    label_column = 'normalized_transcription'
    prediction_column = 'normalized_transcript'
    wer_column = 'WER'
    data = df
    # Function to calculate WER disparity
    def calculate_wer_disparity(data, protected_attribute, wer_column):
        groups = data[protected_attribute].unique()
        wer_disparity = {}
        for group in groups:
            group_data = data[data[protected_attribute] == group]
            avg_wer = group_data[wer_column].mean()
            wer_disparity[group] = avg_wer
        return wer_disparity

    # Calculate WER disparity for each protected attribute
    for attribute in protected_attributes:
        disparity = calculate_wer_disparity(data, attribute, wer_column)
        print(f"WER Disparity for {attribute}:", disparity)
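    # Example of the printed output (illustrative values only):
    #   WER Disparity for gender: {'female': 0.12, 'male': 0.15}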
    # -------------------------------------------------------------------------------------------------------
    # -------------------------------------------------------------------------------------------------------
    data["Reference_words"] = data["normalized_transcription"].str.split().str.len()
    # Compute word error count (WER_count)
    data["WER_count"] = data["Reference_words"] * data["WER"]
    df = data
    categorical_cols = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    for col in categorical_cols:
        df[col] = df[col].astype("category")
    # Offset: log of reference word count (to adjust for different transcript lengths)
    df["log_Ref_Words"] = np.log(df["Reference_words"] + 1)  # Adding 1 to avoid log(0)
    # Fit a mixed-effects regression model on the word error counts.
    # (statsmodels' mixedlm fits a linear mixed model; it is used here in place of a
    # mixed-effects Poisson model, with log_Ref_Words adjusting for transcript length.)
    mixed_model = smf.mixedlm(
        formula="WER_count ~ log_Ref_Words + age + gender + first_language + socioeconomic_bkgd + ethnicity",  # Fixed effects
        data=df,
        groups=df["combined_column"]  # Random intercept per speaker (speaker-identifier column)
    ).fit()
    # Display results
    # print(mixed_model.summary())
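    # Reading the fit (illustrative): a positive coefficient on a level such as
    # gender[T.male] would indicate more predicted word errors for that group than for
    # the baseline level, holding log_Ref_Words and the other attributes fixed.
    # The actual level names depend on the categories present in the CSV.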
    # --------------------------------------------------------------------------------------------------------------------------
    # --------------------------------------------------------------------------------------------------------------------------
    from scipy.stats import chi2
    # 'mixed_model' is the fitted mixed-effects model, 'df' the working DataFrame,
    # and df["log_Ref_Words"] = np.log(df["Reference_words"] + 1) as above.
    params = mixed_model.params
    # Set fixed values for continuous predictors:
    fixed_log_ref = df["log_Ref_Words"].mean()
    baseline_log = params["Intercept"] + params["log_Ref_Words"] * fixed_log_ref
    exposure = np.exp(fixed_log_ref) - 1
    def compute_predicted_error_rate(category, level, params, baseline_log, exposure):
        """Computes the predicted WER (error rate) for a given level of a demographic attribute."""
        coef_name = f"{category}[T.{level}]"
        effect = params.get(coef_name, 0)  # For the baseline level, effect is 0.
        pred_log = baseline_log + effect
        pred_count = np.exp(pred_log)
        return pred_count / exposure
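    # For illustration (made-up numbers): with baseline_log = 2.0, a level effect of 0.1
    # and an exposure of 40 reference words, the predicted count is exp(2.1) ≈ 8.17 and
    # the predicted error rate is 8.17 / 40 ≈ 0.20.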
    def compute_category_fairness(category, params, baseline_log, exposure, df):
        """
        For a given category, compute:
          - Predicted error rates for each subgroup level.
          - Raw fairness scores (0-100 scale: 100 = best, 0 = worst) based on linear scaling.
          - A weighted category fairness score using group proportions.
        """
        levels = df[category].cat.categories
        predictions = {}
        for lvl in levels:
            predictions[lvl] = compute_predicted_error_rate(category, lvl, params, baseline_log, exposure)
        # Convert predictions to a Series.
        pred_series = pd.Series(predictions)
        min_pred, max_pred = pred_series.min(), pred_series.max()
        # Compute raw fairness scores: if all levels are identical, assign 100 to everyone.
        if max_pred == min_pred:
            raw_fairness = pred_series.apply(lambda x: 100.0)
        else:
            raw_fairness = pred_series.apply(lambda x: 100 * (1 - (x - min_pred) / (max_pred - min_pred)))
        # Weight the subgroup fairness scores by their sample proportions in the dataset.
        group_proportions = df[category].value_counts(normalize=True)
        # Ensure ordering matches the fairness scores index:
        group_proportions = group_proportions.reindex(raw_fairness.index, fill_value=0)
        weighted_category_fairness = np.average(raw_fairness, weights=group_proportions)
        return pred_series, raw_fairness, weighted_category_fairness
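    # For illustration (made-up numbers): predicted error rates of 0.10, 0.15 and 0.20 for
    # three levels map to raw fairness scores of 100, 50 and 0; the category score is then
    # their average weighted by each group's share of the dataset.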
    def perform_lrt(attribute, df):
        """Performs a likelihood ratio test (LRT) to test the overall significance of an attribute."""
        full_model = smf.mixedlm(f"WER ~ {attribute} + log_Ref_Words", df, groups=df["combined_column"]).fit()
        reduced_model = smf.mixedlm("WER ~ log_Ref_Words", df, groups=df["combined_column"]).fit()
        lr_stat = 2 * (full_model.llf - reduced_model.llf)
        df_diff = full_model.df_modelwc - reduced_model.df_modelwc
        p_value = chi2.sf(lr_stat, df_diff)
        return p_value
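    # Note (best-practice suggestion, not a change to the code above): statsmodels fits
    # MixedLM by REML by default; for a likelihood ratio test comparing models with
    # different fixed effects, refitting both models with maximum likelihood
    # (.fit(reml=False)) is generally recommended.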
    # List of attributes to evaluate
    categories = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    results = {}
    adjusted_category_scores = []  # To store adjusted fairness scores for each category.
    weights_for_categories = []  # Weight each category based on significance if desired.
    for cat in categories:
        preds, raw_fairness, category_raw_score = compute_category_fairness(cat, params, baseline_log, exposure, df)
        # Perform LRT to get overall significance for this attribute.
        lrt_p_value = perform_lrt(cat, df)
        # Compute multiplier based on significance.
        # If p-value < 0.05, we penalize the fairness score proportionally.
        multiplier = (lrt_p_value / 0.05) if lrt_p_value < 0.05 else 1.0
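        # For illustration: p = 0.01 gives multiplier 0.2 (strong penalty), p = 0.04 gives 0.8,
        # and any p >= 0.05 leaves the category score unchanged.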
        # Adjusted fairness score for the category:
        adjusted_score = category_raw_score * multiplier
        # Save results.
        results[cat] = {
            'Predicted Error Rates': preds,
            'Raw Fairness Scores': raw_fairness,
            # 'Weighted Raw Fairness Score': category_raw_score,
            # 'LRT p-value': lrt_p_value,
            'Adjusted Category Fairness Score': adjusted_score
        }
        # For overall score, we could weight categories (here we simply use the adjusted score).
        adjusted_category_scores.append(adjusted_score)
        # Optionally, use multiplier as a weight for overall aggregation.
        weights_for_categories.append(multiplier)

    # Compute overall fairness score across attributes using the adjusted category scores.
    overall_fairness_score = np.average(adjusted_category_scores)
    # FAAS is the Fairness Adjusted ASR Score based on which models will be ranked
    faas = 10 * np.log10(overall_fairness_score / avg_wer)
    print("Fairness Adjusted ASR Score for the model is", faas)
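    # For illustration: an overall fairness score of 80 and an average WER of 0.20 give
    # FAAS = 10 * log10(80 / 0.20) = 10 * log10(400) ≈ 26.0.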
    # print("\nFinal Overall Fairness Score (Weighted Average over Categories):", overall_fairness_score)  # used for summary_speedometer, Leaderboard
    # print(results['gender'])
    # print(results['gender']['Predicted Error Rates'])
    # print(results['gender']['Adjusted Category Fairness Score'])
    print("________________________________")
    Results = {
        'Predicted Error Rates': {
            'gender': results['gender']['Predicted Error Rates'].to_dict(),  # Convert Series to dict
            'first_language': results['first_language']['Predicted Error Rates'].to_dict(),
            'socioeconomic_bkgd': results['socioeconomic_bkgd']['Predicted Error Rates'].to_dict(),
            'ethnicity': results['ethnicity']['Predicted Error Rates'].to_dict()
        },
        'Raw Fairness Scores': {
            'gender': results['gender']['Raw Fairness Scores'].to_dict(),
            'first_language': results['first_language']['Raw Fairness Scores'].to_dict(),
            'socioeconomic_bkgd': results['socioeconomic_bkgd']['Raw Fairness Scores'].to_dict(),
            'ethnicity': results['ethnicity']['Raw Fairness Scores'].to_dict()
        },
        'Adjusted Category Fairness Score': {
            'gender': float(results['gender']['Adjusted Category Fairness Score']),  # Convert NumPy floats to Python floats
            'first_language': float(results['first_language']['Adjusted Category Fairness Score']),
            'socioeconomic_bkgd': float(results['socioeconomic_bkgd']['Adjusted Category Fairness Score']),
            'ethnicity': float(results['ethnicity']['Adjusted Category Fairness Score'])
        },
        'Overall Fairness Score': float(overall_fairness_score),
        'Avg_wer': float(avg_wer),
        'Avg_rtfx': float(avg_rtfx),
        'FAAS': float(faas),
        'ASR_model': ASR_model,
    }
    # print(Results)
    return Results
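
# Minimal usage sketch (assumptions: the transcript CSV for the chosen model has already
# been generated and uploaded, and "openai/whisper-large-v3" is only an example identifier;
# in the Space this function is normally called from the app code rather than run directly).
if __name__ == "__main__":
    model_results = generateResults("openai/whisper-large-v3")
    if model_results is not None:
        print(model_results["FAAS"], model_results["Overall Fairness Score"])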