# Propensity Score Matching Implementation import pandas as pd import numpy as np from sklearn.neighbors import NearestNeighbors import statsmodels.api as sm # For bias adjustment regression import logging # For logging fallback from typing import Dict, List, Optional, Any # Import DoWhy from dowhy import CausalModel from .base import estimate_propensity_scores, format_ps_results, select_propensity_model from .diagnostics import assess_balance #, plot_overlap, plot_balance # Import diagnostic functions # Remove determine_optimal_caliper, it will be replaced by a heuristic from .llm_assist import get_llm_parameters # Import LLM helpers logger = logging.getLogger(__name__) def _calculate_logit(pscore): """Calculate logit of propensity score, clipping to avoid inf.""" # Clip pscore to prevent log(0) or log(1) issues which lead to inf epsilon = 1e-7 pscore_clipped = np.clip(pscore, epsilon, 1 - epsilon) return np.log(pscore_clipped / (1 - pscore_clipped)) def _perform_matching_and_get_att( df_sample: pd.DataFrame, treatment: str, outcome: str, covariates: List[str], propensity_model_type: str, n_neighbors: int, caliper: float, perform_bias_adjustment: bool, **kwargs ) -> float: """ Helper to perform Custom KNN PSM and calculate ATT, potentially with bias adjustment. Returns the ATT estimate. """ df_ps = df_sample.copy() try: propensity_scores = estimate_propensity_scores( df_ps, treatment, covariates, model_type=propensity_model_type, **kwargs ) except Exception as e: logger.warning(f"Propensity score estimation failed in helper: {e}") return np.nan # Cannot proceed without propensity scores df_ps['propensity_score'] = propensity_scores treated = df_ps[df_ps[treatment] == 1] control = df_ps[df_ps[treatment] == 0] if treated.empty or control.empty: return np.nan nn = NearestNeighbors(n_neighbors=n_neighbors, radius=caliper if caliper is not None else np.inf, metric='minkowski', p=2) try: # Ensure control PS are valid before fitting control_ps_values = control[['propensity_score']].values if np.isnan(control_ps_values).any(): logger.warning("NaN values found in control propensity scores before NN fitting.") return np.nan nn.fit(control_ps_values) # Ensure treated PS are valid before querying treated_ps_values = treated[['propensity_score']].values if np.isnan(treated_ps_values).any(): logger.warning("NaN values found in treated propensity scores before NN query.") return np.nan distances, indices = nn.kneighbors(treated_ps_values) except ValueError as e: # Handles case where control group might be too small or have NaN PS scores logger.warning(f"NearestNeighbors fitting/query failed: {e}") return np.nan matched_outcomes_treated = [] matched_outcomes_control_means = [] propensity_diffs = [] for i in range(len(treated)): treated_unit = treated.iloc[[i]] valid_neighbors_mask = distances[i] <= (caliper if caliper is not None else np.inf) valid_neighbors_idx = indices[i][valid_neighbors_mask] if len(valid_neighbors_idx) > 0: matched_controls_for_this_treated = control.iloc[valid_neighbors_idx] if matched_controls_for_this_treated.empty: continue # Should not happen with valid_neighbors_idx check, but safety matched_outcomes_treated.append(treated_unit[outcome].values[0]) matched_outcomes_control_means.append(matched_controls_for_this_treated[outcome].mean()) if perform_bias_adjustment: # Ensure PS scores are valid before calculating difference treated_ps = treated_unit['propensity_score'].values[0] control_ps_mean = matched_controls_for_this_treated['propensity_score'].mean() if np.isnan(treated_ps) or np.isnan(control_ps_mean): logger.warning("NaN propensity score encountered during bias adjustment calculation.") # Cannot perform bias adjustment for this unit, potentially skip or handle # For now, let's skip adding to propensity_diffs if NaN found continue propensity_diff = treated_ps - control_ps_mean propensity_diffs.append(propensity_diff) if not matched_outcomes_treated: return np.nan raw_att_components = np.array(matched_outcomes_treated) - np.array(matched_outcomes_control_means) if perform_bias_adjustment: # Ensure lengths match *after* potential skips due to NaNs if not propensity_diffs or len(raw_att_components) != len(propensity_diffs): logger.warning("Bias adjustment skipped due to inconsistent data lengths after NaN checks.") return np.mean(raw_att_components) try: X_bias_adj = sm.add_constant(np.array(propensity_diffs)) y_bias_adj = raw_att_components # Add check for NaNs/Infs in inputs to OLS if np.isnan(X_bias_adj).any() or np.isnan(y_bias_adj).any() or \ np.isinf(X_bias_adj).any() or np.isinf(y_bias_adj).any(): logger.warning("NaN/Inf values detected in OLS inputs for bias adjustment. Falling back.") return np.mean(raw_att_components) bias_model = sm.OLS(y_bias_adj, X_bias_adj).fit() bias_adjusted_att = bias_model.params[0] return bias_adjusted_att except Exception as e: logger.warning(f"OLS for bias adjustment failed: {e}. Falling back to raw ATT.") return np.mean(raw_att_components) else: return np.mean(raw_att_components) def estimate_effect(df: pd.DataFrame, treatment: str, outcome: str, covariates: List[str], **kwargs) -> Dict[str, Any]: '''Estimate ATT using Propensity Score Matching. Tries DoWhy's PSM first, falls back to custom implementation if DoWhy fails. Uses bootstrap SE based on the custom implementation regardless. ''' query = kwargs.get('query') n_bootstraps = kwargs.get('n_bootstraps', 100) # --- Parameter Setup (as before) --- llm_params = get_llm_parameters(df, query, "PS.Matching") llm_suggested_params = llm_params.get("parameters", {}) caliper = kwargs.get('caliper', llm_suggested_params.get('caliper')) temp_propensity_scores_for_caliper = None try: temp_propensity_scores_for_caliper = estimate_propensity_scores( df, treatment, covariates, model_type=llm_suggested_params.get('propensity_model_type', 'logistic'), **kwargs ) if caliper is None and temp_propensity_scores_for_caliper is not None: logit_ps = _calculate_logit(temp_propensity_scores_for_caliper) if not np.isnan(logit_ps).all(): # Check if logit calculation was successful caliper = 0.2 * np.nanstd(logit_ps) # Use nanstd for robustness else: logger.warning("Logit of propensity scores resulted in NaNs, cannot calculate heuristic caliper.") caliper = None elif caliper is None: logger.warning("Could not estimate propensity scores for caliper heuristic.") caliper = None except Exception as e: logger.warning(f"Failed to estimate initial propensity scores for caliper heuristic: {e}. Caliper set to None.") caliper = None # Proceed without caliper if heuristic fails n_neighbors = kwargs.get('n_neighbors', llm_suggested_params.get('n_neighbors', 1)) propensity_model_type = kwargs.get('propensity_model_type', llm_suggested_params.get('propensity_model_type', select_propensity_model(df, treatment, covariates, query))) # --- Attempt DoWhy PSM for Point Estimate --- att_estimate = np.nan method_used_for_att = "Fallback Custom PSM" dowhy_model = None identified_estimand = None try: logger.info("Attempting estimation using DoWhy Propensity Score Matching...") dowhy_model = CausalModel( data=df, treatment=treatment, outcome=outcome, common_causes=covariates, estimand_type='nonparametric-ate' # Provide list of names directly ) # Identify estimand (optional step, but good practice) identified_estimand = dowhy_model.identify_effect(proceed_when_unidentifiable=True) logger.info(f"DoWhy identified estimand: {identified_estimand}") # Estimate effect using DoWhy's PSM estimate = dowhy_model.estimate_effect( identified_estimand, method_name="backdoor.propensity_score_matching", target_units="att", method_params={} ) att_estimate = estimate.value method_used_for_att = "DoWhy PSM" logger.info(f"DoWhy PSM successful. ATT Estimate: {att_estimate}") except Exception as e: logger.warning(f"DoWhy PSM failed: {e}. Falling back to custom PSM implementation.") # Fallback is triggered implicitly if att_estimate remains NaN # --- Fallback or if DoWhy failed --- if np.isnan(att_estimate): logger.info("Calculating ATT estimate using fallback custom PSM...") att_estimate = _perform_matching_and_get_att( df, treatment, outcome, covariates, propensity_model_type, n_neighbors, caliper, perform_bias_adjustment=True, **kwargs # Bias adjust the fallback ) method_used_for_att = "Fallback Custom PSM" # Confirm it's fallback if np.isnan(att_estimate): raise ValueError("Fallback custom PSM estimation also failed. Cannot proceed.") logger.info(f"Fallback Custom PSM successful. ATT Estimate: {att_estimate}") # --- Bootstrap SE (using custom helper for consistency) --- logger.info(f"Calculating Bootstrap SE using custom helper ({n_bootstraps} iterations)...") bootstrap_atts = [] for i in range(n_bootstraps): try: # Ensure bootstrap samples are drawn correctly df_boot = df.sample(n=len(df), replace=True, random_state=np.random.randint(1000000) + i) # Bias adjustment in bootstrap can be slow, optionally disable it boot_att = _perform_matching_and_get_att( df_boot, treatment, outcome, covariates, propensity_model_type, n_neighbors, caliper, perform_bias_adjustment=False, **kwargs # Set bias adjustment to False for speed in bootstrap ) if not np.isnan(boot_att): bootstrap_atts.append(boot_att) except Exception as boot_e: logger.warning(f"Bootstrap iteration {i+1} failed: {boot_e}") continue # Skip failed bootstrap iteration att_se = np.nanstd(bootstrap_atts) if bootstrap_atts else np.nan # Use nanstd actual_bootstrap_iterations = len(bootstrap_atts) logger.info(f"Bootstrap SE calculated: {att_se} from {actual_bootstrap_iterations} successful iterations.") # --- Diagnostics (using custom matching logic for consistency) --- logger.info("Performing diagnostic checks using custom matching logic...") diagnostics = {"error": "Diagnostics failed to run."} propensity_scores_orig = temp_propensity_scores_for_caliper # Reuse if available and not None if propensity_scores_orig is None: try: propensity_scores_orig = estimate_propensity_scores( df, treatment, covariates, model_type=propensity_model_type, **kwargs ) except Exception as e: logger.error(f"Failed to estimate propensity scores for diagnostics: {e}") propensity_scores_orig = None if propensity_scores_orig is not None and not np.isnan(propensity_scores_orig).all(): df_ps_orig = df.copy() df_ps_orig['propensity_score'] = propensity_scores_orig treated_orig = df_ps_orig[df_ps_orig[treatment] == 1] control_orig = df_ps_orig[df_ps_orig[treatment] == 0] unmatched_treated_count = 0 # Drop rows with NaN propensity scores before diagnostics treated_orig = treated_orig.dropna(subset=['propensity_score']) control_orig = control_orig.dropna(subset=['propensity_score']) if not treated_orig.empty and not control_orig.empty: try: nn_diag = NearestNeighbors(n_neighbors=n_neighbors, radius=caliper if caliper is not None else np.inf, metric='minkowski', p=2) nn_diag.fit(control_orig[['propensity_score']].values) distances_diag, indices_diag = nn_diag.kneighbors(treated_orig[['propensity_score']].values) matched_treated_indices_diag = [] matched_control_indices_diag = [] for i in range(len(treated_orig)): valid_neighbors_mask_diag = distances_diag[i] <= (caliper if caliper is not None else np.inf) valid_neighbors_idx_diag = indices_diag[i][valid_neighbors_mask_diag] if len(valid_neighbors_idx_diag) > 0: # Get original DataFrame indices from control_orig based on iloc indices selected_control_original_indices = control_orig.index[valid_neighbors_idx_diag] matched_treated_indices_diag.extend([treated_orig.index[i]] * len(selected_control_original_indices)) matched_control_indices_diag.extend(selected_control_original_indices) else: unmatched_treated_count += 1 if matched_control_indices_diag: # Use unique indices for creating the diagnostic dataframe unique_matched_control_indices = list(set(matched_control_indices_diag)) unique_matched_treated_indices = list(set(matched_treated_indices_diag)) matched_control_df_diag = df.loc[unique_matched_control_indices] matched_treated_df_for_diag = df.loc[unique_matched_treated_indices] matched_df_diag = pd.concat([matched_treated_df_for_diag, matched_control_df_diag]).drop_duplicates() # Retrieve propensity scores for the specific units in matched_df_diag ps_matched_for_diag = propensity_scores_orig.loc[matched_df_diag.index] diagnostics = assess_balance(df, matched_df_diag, treatment, covariates, method="PSM", propensity_scores_original=propensity_scores_orig, propensity_scores_matched=ps_matched_for_diag) else: diagnostics = {"message": "No units could be matched for diagnostic assessment."} # If no controls were matched, all treated were unmatched unmatched_treated_count = len(treated_orig) if not treated_orig.empty else 0 except Exception as diag_e: logger.error(f"Error during diagnostic matching/balance assessment: {diag_e}") diagnostics = {"error": f"Diagnostics failed: {diag_e}"} else: diagnostics = {"message": "Treatment or control group empty after dropping NaN PS, diagnostics skipped."} unmatched_treated_count = len(treated_orig) if not treated_orig.empty else 0 # Ensure unmatched count calculation is safe if 'unmatched_treated_count' not in locals(): unmatched_treated_count = 0 # Initialize if loop didn't run diagnostics["unmatched_treated_count"] = unmatched_treated_count diagnostics["percent_treated_matched"] = (len(treated_orig) - unmatched_treated_count) / len(treated_orig) * 100 if len(treated_orig) > 0 else 0 else: diagnostics = {"error": "Propensity scores could not be estimated for diagnostics."} # Add final details to diagnostics diagnostics["att_estimation_method"] = method_used_for_att diagnostics["propensity_score_model"] = propensity_model_type diagnostics["bootstrap_iterations_for_se"] = actual_bootstrap_iterations diagnostics["final_caliper_used"] = caliper # --- Format and return results --- logger.info(f"Formatting results. ATT Estimate: {att_estimate}, SE: {att_se}, Method: {method_used_for_att}") return format_ps_results(att_estimate, att_se, diagnostics, method_details=f"PSM ({method_used_for_att})", parameters={"caliper": caliper, "n_neighbors": n_neighbors, # n_neighbors used in fallback/bootstrap/diag "propensity_model": propensity_model_type, "n_bootstraps_config": n_bootstraps})