import pandas as pd
import re
import numpy as np
from jiwer import wer
import statsmodels.api as sm
import statsmodels.formula.api as smf
from utils.load_csv import download_csv, upload_csv
def generateResults(ASR_model):
    # Define normalization function
    def normalize_text(text):
        """
        Normalize text: lowercase it, strip special characters (keeping letters,
        digits, and spaces), and handle None, NaN, or float values.
        """
        if text is None or pd.isna(text):  # Check for None or NaN
            return ""
        if isinstance(text, float):  # Convert stray floats to an empty string
            return ""
        text = text.lower()  # Convert to lowercase
        text = re.sub(r'[^a-z0-9\s]', '', text)  # Keep only letters, digits, and spaces
        return text.strip()  # Remove leading/trailing spaces
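    # Worked example (illustrative): normalize_text("Hello, World! 123") returns "hello world 123".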
    # Load the CSV with the ASR transcripts, named f"test_with_{ASR_model}.csv"
    csv_transcript = f'test_with_{ASR_model.replace("/", "_")}.csv'

    # Read the CSV file
    df = download_csv(csv_transcript)
    if df is None:
        print("CSV not found in the dataset repo. Please generate the transcript file first.")
        return
    # Normalize the reference text and the ASR transcripts
    df['normalized_transcription'] = df[df.columns[1]].apply(normalize_text)  # Column 1 holds the reference text

    # Check whether the ASR transcript column exists
    if 'transcript' in df.columns:
        df['normalized_transcript'] = df[df.columns[8]].apply(normalize_text)  # Column 8 holds the ASR transcript
        # Calculate WER for each row
        wer_scores = []
        for index, row in df.iterrows():
            original = row['normalized_transcription']
            transcript = row['normalized_transcript']
            if original and transcript:
                wer_score = wer(original, transcript)
            else:
                wer_score = 1.0  # Maximum error if one text is missing
            wer_scores.append(wer_score)
        df['WER'] = wer_scores
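        # Note: jiwer's wer() returns (substitutions + deletions + insertions) / reference words,
        # so e.g. reference "the cat sat" vs. hypothesis "the cat sit" gives WER = 1/3.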
        # Compute the interquartile range (IQR) of the per-utterance WER
        Q1 = df['WER'].quantile(0.25)
        Q3 = df['WER'].quantile(0.75)
        IQR = Q3 - Q1

        # Define the outlier range (Tukey fences)
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        # Remove outliers
        df = df[(df['WER'] >= lower_bound) & (df['WER'] <= upper_bound)]
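        # Worked example (illustrative numbers): with Q1 = 0.05 and Q3 = 0.25, IQR = 0.20,
        # so rows with WER above 0.25 + 1.5 * 0.20 = 0.55 are dropped.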
    else:
        print("Column 'transcript' not found in CSV")
        return
    # Save the updated CSV
    csv_result = f'test_with_{ASR_model.replace("/", "_")}_WER.csv'
    upload_csv(df, csv_result)
    print(f"WER calculations saved to {csv_result}")

    avg_wer = df["WER"].mean()
    avg_rtfx = df["rtfx"].mean()
    print(f"Average WER: {avg_wer} and Avg RTFX: {avg_rtfx}")
    # ----------------------------------------------------------------------------------------------------------
    # ----------------------------------------------------------------------------------------------------------
    # Define protected attributes and label columns
    protected_attributes = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    label_column = 'normalized_transcription'
    prediction_column = 'normalized_transcript'
    wer_column = 'WER'
    data = df
    # Function to calculate per-group WER disparity
    def calculate_wer_disparity(data, protected_attribute, wer_column):
        groups = data[protected_attribute].unique()
        wer_disparity = {}
        for group in groups:
            group_data = data[data[protected_attribute] == group]
            avg_wer = group_data[wer_column].mean()
            wer_disparity[group] = avg_wer
        return wer_disparity

    # Calculate WER disparity for each protected attribute
    for attribute in protected_attributes:
        disparity = calculate_wer_disparity(data, attribute, wer_column)
        print(f"WER Disparity for {attribute}:", disparity)
    # -------------------------------------------------------------------------------------------------------
    # -------------------------------------------------------------------------------------------------------
    data["Reference_words"] = data["normalized_transcription"].str.split().str.len()

    # Compute the per-utterance word error count (WER_count)
    data["WER_count"] = data["Reference_words"] * data["WER"]
    df = data

    categorical_cols = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    for col in categorical_cols:
        df[col] = df[col].astype("category")

    # Offset: log of reference word count (to adjust for different transcript lengths)
    df["log_Ref_Words"] = np.log(df["Reference_words"] + 1)  # Adding 1 to avoid log(0)
    # Fit a mixed-effects regression model on the word error counts
    # (smf.mixedlm fits a linear mixed model; the log word-count term adjusts for transcript length)
    mixed_model = smf.mixedlm(
        formula="WER_count ~ log_Ref_Words + age + gender + first_language + socioeconomic_bkgd + ethnicity",  # Fixed effects
        data=df,
        groups=df["combined_column"]  # Random intercept per speaker
    ).fit()

    # Display results
    # print(mixed_model.summary())
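    # The fitted fixed-effect coefficients can be inspected by name, e.g.
    # mixed_model.params["log_Ref_Words"] or mixed_model.params.get("gender[T.male]", 0)
    # (the exact level label depends on the dataset's category values).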
    # --------------------------------------------------------------------------------------------------------------------------
    # --------------------------------------------------------------------------------------------------------------------------
    from scipy.stats import chi2

    # 'mixed_model' is the fitted mixed-effects model from above and 'df' the working DataFrame,
    # with df["log_Ref_Words"] = np.log(df["Reference_words"] + 1) already computed.
    params = mixed_model.params

    # Hold the continuous predictor fixed at its mean:
    fixed_log_ref = df["log_Ref_Words"].mean()
    baseline_log = params["Intercept"] + params["log_Ref_Words"] * fixed_log_ref
    exposure = np.exp(fixed_log_ref) - 1  # Back-transformed reference word count at the mean
    def compute_predicted_error_rate(category, level, params, baseline_log, exposure):
        """Compute the predicted WER (error rate) for a given level of a demographic attribute."""
        coef_name = f"{category}[T.{level}]"
        effect = params.get(coef_name, 0)  # For the baseline level, the effect is 0.
        pred_log = baseline_log + effect
        pred_count = np.exp(pred_log)
        return pred_count / exposure
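    # Coefficient names follow patsy's treatment coding, e.g. "gender[T.male]"; a level
    # absent from params (the reference level) contributes no effect.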
    def compute_category_fairness(category, params, baseline_log, exposure, df):
        """
        For a given category, compute:
          - Predicted error rates for each subgroup level.
          - Raw fairness scores (0-100 scale: 100 = best, 0 = worst) based on linear scaling.
          - A weighted category fairness score using group proportions.
        """
        levels = df[category].cat.categories
        predictions = {}
        for lvl in levels:
            predictions[lvl] = compute_predicted_error_rate(category, lvl, params, baseline_log, exposure)

        # Convert predictions to a Series.
        pred_series = pd.Series(predictions)
        min_pred, max_pred = pred_series.min(), pred_series.max()

        # Compute raw fairness scores; if all levels are identical, assign 100 to every level.
        if max_pred == min_pred:
            raw_fairness = pred_series.apply(lambda x: 100.0)
        else:
            raw_fairness = pred_series.apply(lambda x: 100 * (1 - (x - min_pred) / (max_pred - min_pred)))

        # Weight the subgroup fairness scores by their sample proportions in the dataset.
        group_proportions = df[category].value_counts(normalize=True)
        # Ensure the ordering matches the fairness scores index:
        group_proportions = group_proportions.reindex(raw_fairness.index, fill_value=0)
        weighted_category_fairness = np.average(raw_fairness, weights=group_proportions)
        return pred_series, raw_fairness, weighted_category_fairness
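    # Worked example (illustrative): predicted rates {A: 0.10, B: 0.20, C: 0.30} map to
    # raw fairness scores {A: 100, B: 50, C: 0} under the linear min-max scaling above.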
    def perform_lrt(attribute, df):
        """Likelihood ratio test (LRT) for the overall significance of an attribute."""
        full_model = smf.mixedlm(f"WER ~ {attribute} + log_Ref_Words", df, groups=df["combined_column"]).fit()
        reduced_model = smf.mixedlm("WER ~ log_Ref_Words", df, groups=df["combined_column"]).fit()
        lr_stat = 2 * (full_model.llf - reduced_model.llf)
        df_diff = full_model.df_modelwc - reduced_model.df_modelwc
        p_value = chi2.sf(lr_stat, df_diff)
        return p_value
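    # For example, lr_stat = 6.5 with df_diff = 2 gives p = chi2.sf(6.5, 2) ≈ 0.039 (< 0.05).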
    # List of attributes to evaluate
    categories = ['gender', 'first_language', 'socioeconomic_bkgd', 'ethnicity']
    results = {}
    adjusted_category_scores = []  # Adjusted fairness score for each category.
    weights_for_categories = []    # Optional per-category weights based on significance.

    for cat in categories:
        preds, raw_fairness, category_raw_score = compute_category_fairness(cat, params, baseline_log, exposure, df)

        # Perform the LRT to get the overall significance of this attribute.
        lrt_p_value = perform_lrt(cat, df)

        # Compute a multiplier based on significance:
        # if the p-value is below 0.05, penalize the fairness score proportionally.
        multiplier = (lrt_p_value / 0.05) if lrt_p_value < 0.05 else 1.0
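        # For instance, p = 0.01 yields multiplier = 0.2, while p >= 0.05 leaves the score unchanged.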
        # Adjusted fairness score for the category:
        adjusted_score = category_raw_score * multiplier

        # Save results.
        results[cat] = {
            'Predicted Error Rates': preds,
            'Raw Fairness Scores': raw_fairness,
            # 'Weighted Raw Fairness Score': category_raw_score,
            # 'LRT p-value': lrt_p_value,
            'Adjusted Category Fairness Score': adjusted_score
        }

        # For the overall score we could weight categories; here we simply average the adjusted scores.
        adjusted_category_scores.append(adjusted_score)
        # Optionally, use the multiplier as a weight for overall aggregation.
        weights_for_categories.append(multiplier)
    # Compute the overall fairness score across attributes from the adjusted category scores.
    overall_fairness_score = np.average(adjusted_category_scores)

    # FAAS is the Fairness Adjusted ASR Score on which models are ranked.
    faas = 10 * np.log10(overall_fairness_score / avg_wer)
    print("Fairness Adjusted ASR Score for the model is", faas)
# print("\nFinal Overall Fairness Score (Weighted Average over Categories):", overall_fairness_score) # used for summary_speedometer,Leaderboard
# print(results['gender'])
# print(results['gender']['Predicted Error Rates'])
# print(results['gender']['Adjusted Category Fairness Score'])
print("________________________________")
    Results = {
        'Predicted Error Rates': {
            'gender': results['gender']['Predicted Error Rates'].to_dict(),  # Convert Series to dict
            'first_language': results['first_language']['Predicted Error Rates'].to_dict(),
            'socioeconomic_bkgd': results['socioeconomic_bkgd']['Predicted Error Rates'].to_dict(),
            'ethnicity': results['ethnicity']['Predicted Error Rates'].to_dict()
        },
        'Raw Fairness Scores': {
            'gender': results['gender']['Raw Fairness Scores'].to_dict(),
            'first_language': results['first_language']['Raw Fairness Scores'].to_dict(),
            'socioeconomic_bkgd': results['socioeconomic_bkgd']['Raw Fairness Scores'].to_dict(),
            'ethnicity': results['ethnicity']['Raw Fairness Scores'].to_dict()
        },
        'Adjusted Category Fairness Score': {
            'gender': float(results['gender']['Adjusted Category Fairness Score']),  # Convert NumPy floats to Python floats
            'first_language': float(results['first_language']['Adjusted Category Fairness Score']),
            'socioeconomic_bkgd': float(results['socioeconomic_bkgd']['Adjusted Category Fairness Score']),
            'ethnicity': float(results['ethnicity']['Adjusted Category Fairness Score'])
        },
        'Overall Fairness Score': float(overall_fairness_score),
        'Avg_wer': float(avg_wer),
        'Avg_rtfx': float(avg_rtfx),
        'FAAS': float(faas),
        'ASR_model': ASR_model,
    }
    # print(Results)
    return Results
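# Minimal usage sketch (hypothetical model id; any ASR model evaluated by this Space should work):
# results = generateResults("openai/whisper-large-v3")
# print(results["FAAS"], results["Overall Fairness Score"])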