# Propensity Score Matching Implementation
import pandas as pd
import numpy as np
from sklearn.neighbors import NearestNeighbors
import statsmodels.api as sm # For bias adjustment regression
import logging # For logging fallback
from typing import Dict, List, Optional, Any
# Import DoWhy
from dowhy import CausalModel
from .base import estimate_propensity_scores, format_ps_results, select_propensity_model
from .diagnostics import assess_balance  # plot_overlap and plot_balance are also available here
# determine_optimal_caliper was replaced by the 0.2 * SD(logit(PS)) heuristic in estimate_effect
from .llm_assist import get_llm_parameters  # LLM helpers
logger = logging.getLogger(__name__)
def _calculate_logit(pscore):
    """Calculate the logit of a propensity score, clipping to avoid inf."""
    # Clip pscore to prevent log(0) or log(1), which would produce +/-inf
    epsilon = 1e-7
    pscore_clipped = np.clip(pscore, epsilon, 1 - epsilon)
    return np.log(pscore_clipped / (1 - pscore_clipped))
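# For intuition: with epsilon = 1e-7, _calculate_logit(np.array([0.0, 0.5, 1.0]))
# yields roughly [-16.12, 0.0, 16.12] -- extreme scores map to large finite
# logits instead of -inf/+inf.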
def _perform_matching_and_get_att(
    df_sample: pd.DataFrame,
    treatment: str,
    outcome: str,
    covariates: List[str],
    propensity_model_type: str,
    n_neighbors: int,
    caliper: Optional[float],
    perform_bias_adjustment: bool,
    **kwargs
) -> float:
    """
    Perform custom KNN propensity score matching and return the ATT estimate,
    optionally with bias adjustment. Returns np.nan on any failure.
    """
    df_ps = df_sample.copy()
    try:
        propensity_scores = estimate_propensity_scores(
            df_ps, treatment, covariates, model_type=propensity_model_type, **kwargs
        )
    except Exception as e:
        logger.warning(f"Propensity score estimation failed in helper: {e}")
        return np.nan  # Cannot proceed without propensity scores
    df_ps['propensity_score'] = propensity_scores
    treated = df_ps[df_ps[treatment] == 1]
    control = df_ps[df_ps[treatment] == 0]
    if treated.empty or control.empty:
        return np.nan
    # Match on the propensity score only; with a single feature, the Minkowski
    # p=2 metric reduces to |ps_treated - ps_control|. The caliper is enforced
    # manually below, since kneighbors() does not use a radius argument.
    nn = NearestNeighbors(n_neighbors=n_neighbors, metric='minkowski', p=2)
    try:
        # Ensure control PS are valid before fitting
        control_ps_values = control[['propensity_score']].values
        if np.isnan(control_ps_values).any():
            logger.warning("NaN values found in control propensity scores before NN fitting.")
            return np.nan
        nn.fit(control_ps_values)
        # Ensure treated PS are valid before querying
        treated_ps_values = treated[['propensity_score']].values
        if np.isnan(treated_ps_values).any():
            logger.warning("NaN values found in treated propensity scores before NN query.")
            return np.nan
        distances, indices = nn.kneighbors(treated_ps_values)
    except ValueError as e:
        # Handles the case where the control group is too small for n_neighbors
        logger.warning(f"NearestNeighbors fitting/query failed: {e}")
        return np.nan
    matched_outcomes_treated = []
    matched_outcomes_control_means = []
    propensity_diffs = []
    for i in range(len(treated)):
        treated_unit = treated.iloc[[i]]
        valid_neighbors_mask = distances[i] <= (caliper if caliper is not None else np.inf)
        valid_neighbors_idx = indices[i][valid_neighbors_mask]
        if len(valid_neighbors_idx) > 0:
            matched_controls_for_this_treated = control.iloc[valid_neighbors_idx]
            if matched_controls_for_this_treated.empty:
                continue  # Defensive; should not happen given the length check above
            matched_outcomes_treated.append(treated_unit[outcome].values[0])
            matched_outcomes_control_means.append(matched_controls_for_this_treated[outcome].mean())
            if perform_bias_adjustment:
                # Ensure PS values are valid before computing the difference
                treated_ps = treated_unit['propensity_score'].values[0]
                control_ps_mean = matched_controls_for_this_treated['propensity_score'].mean()
                if np.isnan(treated_ps) or np.isnan(control_ps_mean):
                    logger.warning("NaN propensity score encountered during bias adjustment calculation.")
                    # Skip this unit's propensity diff; the length check below
                    # will then disable bias adjustment entirely.
                    continue
                propensity_diff = treated_ps - control_ps_mean
                propensity_diffs.append(propensity_diff)
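    # Note: matching here is with replacement -- the same control unit can serve
    # as a neighbor for several treated units, which tends to reduce bias at the
    # cost of higher variance.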
    if not matched_outcomes_treated:
        return np.nan
    raw_att_components = np.array(matched_outcomes_treated) - np.array(matched_outcomes_control_means)
    if perform_bias_adjustment:
        # Lengths must still match *after* any skips due to NaNs above
        if not propensity_diffs or len(raw_att_components) != len(propensity_diffs):
            logger.warning("Bias adjustment skipped due to inconsistent data lengths after NaN checks.")
            return np.mean(raw_att_components)
        try:
            X_bias_adj = sm.add_constant(np.array(propensity_diffs))
            y_bias_adj = raw_att_components
            # Guard against NaNs/Infs in the OLS inputs
            if np.isnan(X_bias_adj).any() or np.isnan(y_bias_adj).any() or \
               np.isinf(X_bias_adj).any() or np.isinf(y_bias_adj).any():
                logger.warning("NaN/Inf values detected in OLS inputs for bias adjustment. Falling back.")
                return np.mean(raw_att_components)
            bias_model = sm.OLS(y_bias_adj, X_bias_adj).fit()
            bias_adjusted_att = bias_model.params[0]  # Intercept: ATT at zero PS difference
            return bias_adjusted_att
        except Exception as e:
            logger.warning(f"OLS for bias adjustment failed: {e}. Falling back to raw ATT.")
            return np.mean(raw_att_components)
    else:
        return np.mean(raw_att_components)
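# A note on the bias adjustment in _perform_matching_and_get_att: regressing the
# per-pair outcome differences on the treated-minus-matched-control propensity
# score differences and keeping the intercept estimates the ATT at zero score
# discrepancy, in the spirit of the Abadie-Imbens regression correction for
# inexact matches.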
def estimate_effect(df: pd.DataFrame, treatment: str, outcome: str,
                    covariates: List[str], **kwargs) -> Dict[str, Any]:
    """Estimate the ATT using Propensity Score Matching.

    Tries DoWhy's PSM first and falls back to the custom implementation if
    DoWhy fails. Standard errors are always computed by bootstrapping the
    custom implementation.
    """
    query = kwargs.get('query')
    n_bootstraps = kwargs.get('n_bootstraps', 100)
    # --- Parameter setup ---
    llm_params = get_llm_parameters(df, query, "PS.Matching")
    llm_suggested_params = llm_params.get("parameters", {})
    caliper = kwargs.get('caliper', llm_suggested_params.get('caliper'))
    temp_propensity_scores_for_caliper = None
    try:
        temp_propensity_scores_for_caliper = estimate_propensity_scores(
            df, treatment, covariates,
            model_type=llm_suggested_params.get('propensity_model_type', 'logistic'),
            **kwargs
        )
        if caliper is None and temp_propensity_scores_for_caliper is not None:
            logit_ps = _calculate_logit(temp_propensity_scores_for_caliper)
            if not np.isnan(logit_ps).all():  # Check that the logit calculation succeeded
                # Heuristic caliper: 0.2 * SD of the logit of the PS (Austin, 2011)
                caliper = 0.2 * np.nanstd(logit_ps)  # nanstd for robustness
            else:
                logger.warning("Logit of propensity scores resulted in NaNs, cannot calculate heuristic caliper.")
                caliper = None
        elif caliper is None:
            logger.warning("Could not estimate propensity scores for caliper heuristic.")
            caliper = None
    except Exception as e:
        logger.warning(f"Failed to estimate initial propensity scores for caliper heuristic: {e}. Caliper set to None.")
        caliper = None  # Proceed without a caliper if the heuristic fails
    n_neighbors = kwargs.get('n_neighbors', llm_suggested_params.get('n_neighbors', 1))
    propensity_model_type = kwargs.get('propensity_model_type',
                                       llm_suggested_params.get('propensity_model_type',
                                                                select_propensity_model(df, treatment, covariates, query)))
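    # Parameter precedence throughout: explicit kwargs override LLM-suggested
    # values, which in turn override the built-in heuristics and defaults.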
    # --- Attempt DoWhy PSM for the point estimate ---
    att_estimate = np.nan
    method_used_for_att = "Fallback Custom PSM"
    dowhy_model = None
    identified_estimand = None
    try:
        logger.info("Attempting estimation using DoWhy Propensity Score Matching...")
        dowhy_model = CausalModel(
            data=df,
            treatment=treatment,
            outcome=outcome,
            common_causes=covariates,  # List of confounder column names
            estimand_type='nonparametric-ate'
        )
        # Identify the estimand (optional, but good practice)
        identified_estimand = dowhy_model.identify_effect(proceed_when_unidentifiable=True)
        logger.info(f"DoWhy identified estimand: {identified_estimand}")
        # Estimate the effect using DoWhy's PSM
        estimate = dowhy_model.estimate_effect(
            identified_estimand,
            method_name="backdoor.propensity_score_matching",
            target_units="att",
            method_params={}
        )
        att_estimate = estimate.value
        method_used_for_att = "DoWhy PSM"
        logger.info(f"DoWhy PSM successful. ATT Estimate: {att_estimate}")
    except Exception as e:
        logger.warning(f"DoWhy PSM failed: {e}. Falling back to custom PSM implementation.")
        # The fallback below triggers whenever att_estimate is still NaN
    # --- Fallback: custom PSM if DoWhy failed ---
    if np.isnan(att_estimate):
        logger.info("Calculating ATT estimate using fallback custom PSM...")
        att_estimate = _perform_matching_and_get_att(
            df, treatment, outcome, covariates,
            propensity_model_type, n_neighbors, caliper,
            perform_bias_adjustment=True, **kwargs  # Bias-adjust the fallback estimate
        )
        method_used_for_att = "Fallback Custom PSM"
        if np.isnan(att_estimate):
            raise ValueError("Fallback custom PSM estimation also failed. Cannot proceed.")
        logger.info(f"Fallback Custom PSM successful. ATT Estimate: {att_estimate}")
    # --- Bootstrap SE (using the custom helper for consistency) ---
    logger.info(f"Calculating Bootstrap SE using custom helper ({n_bootstraps} iterations)...")
    bootstrap_atts = []
    for i in range(n_bootstraps):
        try:
            # Draw a bootstrap sample of the same size, with replacement
            df_boot = df.sample(n=len(df), replace=True, random_state=np.random.randint(1000000) + i)
            # Bias adjustment is disabled inside the bootstrap for speed
            boot_att = _perform_matching_and_get_att(
                df_boot, treatment, outcome, covariates,
                propensity_model_type, n_neighbors, caliper,
                perform_bias_adjustment=False, **kwargs
            )
            if not np.isnan(boot_att):
                bootstrap_atts.append(boot_att)
        except Exception as boot_e:
            logger.warning(f"Bootstrap iteration {i+1} failed: {boot_e}")
            continue  # Skip this failed bootstrap iteration
    att_se = np.nanstd(bootstrap_atts) if bootstrap_atts else np.nan
    actual_bootstrap_iterations = len(bootstrap_atts)
    logger.info(f"Bootstrap SE calculated: {att_se} from {actual_bootstrap_iterations} successful iterations.")
    # --- Diagnostics (using the custom matching logic for consistency) ---
    logger.info("Performing diagnostic checks using custom matching logic...")
    diagnostics = {"error": "Diagnostics failed to run."}
    propensity_scores_orig = temp_propensity_scores_for_caliper  # Reuse if available
    if propensity_scores_orig is None:
        try:
            propensity_scores_orig = estimate_propensity_scores(
                df, treatment, covariates, model_type=propensity_model_type, **kwargs
            )
        except Exception as e:
            logger.error(f"Failed to estimate propensity scores for diagnostics: {e}")
            propensity_scores_orig = None
    if propensity_scores_orig is not None and not np.isnan(propensity_scores_orig).all():
        df_ps_orig = df.copy()
        df_ps_orig['propensity_score'] = propensity_scores_orig
        treated_orig = df_ps_orig[df_ps_orig[treatment] == 1]
        control_orig = df_ps_orig[df_ps_orig[treatment] == 0]
        unmatched_treated_count = 0
        # Drop rows with NaN propensity scores before diagnostics
        treated_orig = treated_orig.dropna(subset=['propensity_score'])
        control_orig = control_orig.dropna(subset=['propensity_score'])
        if not treated_orig.empty and not control_orig.empty:
            try:
                # Same matching scheme as the estimator; the caliper is again
                # enforced manually on the kneighbors distances
                nn_diag = NearestNeighbors(n_neighbors=n_neighbors, metric='minkowski', p=2)
                nn_diag.fit(control_orig[['propensity_score']].values)
                distances_diag, indices_diag = nn_diag.kneighbors(treated_orig[['propensity_score']].values)
                matched_treated_indices_diag = []
                matched_control_indices_diag = []
                for i in range(len(treated_orig)):
                    valid_neighbors_mask_diag = distances_diag[i] <= (caliper if caliper is not None else np.inf)
                    valid_neighbors_idx_diag = indices_diag[i][valid_neighbors_mask_diag]
                    if len(valid_neighbors_idx_diag) > 0:
                        # Map iloc positions back to original DataFrame indices in control_orig
                        selected_control_original_indices = control_orig.index[valid_neighbors_idx_diag]
                        matched_treated_indices_diag.extend([treated_orig.index[i]] * len(selected_control_original_indices))
                        matched_control_indices_diag.extend(selected_control_original_indices)
                    else:
                        unmatched_treated_count += 1
                if matched_control_indices_diag:
                    # Use unique indices when building the diagnostic dataframe
                    unique_matched_control_indices = list(set(matched_control_indices_diag))
                    unique_matched_treated_indices = list(set(matched_treated_indices_diag))
                    matched_control_df_diag = df.loc[unique_matched_control_indices]
                    matched_treated_df_for_diag = df.loc[unique_matched_treated_indices]
                    matched_df_diag = pd.concat([matched_treated_df_for_diag, matched_control_df_diag]).drop_duplicates()
                    # Retrieve propensity scores for the matched units; index via
                    # df_ps_orig so this works whether the estimator returned a
                    # Series or a plain array
                    ps_matched_for_diag = df_ps_orig.loc[matched_df_diag.index, 'propensity_score']
                    diagnostics = assess_balance(df, matched_df_diag, treatment, covariates,
                                                 method="PSM",
                                                 propensity_scores_original=propensity_scores_orig,
                                                 propensity_scores_matched=ps_matched_for_diag)
                else:
                    diagnostics = {"message": "No units could be matched for diagnostic assessment."}
                    # If no controls were matched, every treated unit is unmatched
                    unmatched_treated_count = len(treated_orig)
            except Exception as diag_e:
                logger.error(f"Error during diagnostic matching/balance assessment: {diag_e}")
                diagnostics = {"error": f"Diagnostics failed: {diag_e}"}
        else:
            diagnostics = {"message": "Treatment or control group empty after dropping NaN PS, diagnostics skipped."}
            unmatched_treated_count = len(treated_orig)
        # unmatched_treated_count is initialized above, so these lookups are always safe
        diagnostics["unmatched_treated_count"] = unmatched_treated_count
        diagnostics["percent_treated_matched"] = (len(treated_orig) - unmatched_treated_count) / len(treated_orig) * 100 if len(treated_orig) > 0 else 0
    else:
        diagnostics = {"error": "Propensity scores could not be estimated for diagnostics."}
    # Add final details to the diagnostics
    diagnostics["att_estimation_method"] = method_used_for_att
    diagnostics["propensity_score_model"] = propensity_model_type
    diagnostics["bootstrap_iterations_for_se"] = actual_bootstrap_iterations
    diagnostics["final_caliper_used"] = caliper
    # --- Format and return results ---
    logger.info(f"Formatting results. ATT Estimate: {att_estimate}, SE: {att_se}, Method: {method_used_for_att}")
    return format_ps_results(att_estimate, att_se, diagnostics,
                             method_details=f"PSM ({method_used_for_att})",
                             parameters={"caliper": caliper,
                                         "n_neighbors": n_neighbors,  # Used in fallback/bootstrap/diagnostics
                                         "propensity_model": propensity_model_type,
                                         "n_bootstraps_config": n_bootstraps})