# Provenance: committed by FireShadow, "Initial clean commit" (1721aea)
# Propensity Score Matching Implementation
import pandas as pd
import numpy as np
from sklearn.neighbors import NearestNeighbors
import statsmodels.api as sm # For bias adjustment regression
import logging # For logging fallback
from typing import Dict, List, Optional, Any
# Import DoWhy
from dowhy import CausalModel
from .base import estimate_propensity_scores, format_ps_results, select_propensity_model
from .diagnostics import assess_balance #, plot_overlap, plot_balance # Import diagnostic functions
# determine_optimal_caliper is intentionally not imported: estimate_effect derives the caliper from a heuristic instead.
from .llm_assist import get_llm_parameters # Import LLM helpers
logger = logging.getLogger(__name__)
def _calculate_logit(pscore):
"""Calculate logit of propensity score, clipping to avoid inf."""
# Clip pscore to prevent log(0) or log(1) issues which lead to inf
epsilon = 1e-7
pscore_clipped = np.clip(pscore, epsilon, 1 - epsilon)
return np.log(pscore_clipped / (1 - pscore_clipped))
def _perform_matching_and_get_att(
    df_sample: pd.DataFrame,
    treatment: str,
    outcome: str,
    covariates: List[str],
    propensity_model_type: str,
    n_neighbors: int,
    caliper: float,
    perform_bias_adjustment: bool,
    **kwargs
) -> float:
    """
    Helper to perform Custom KNN PSM and calculate ATT, potentially with bias adjustment.

    Each treated unit (``treatment == 1``) is matched to its ``n_neighbors``
    nearest control units (``treatment == 0``) on the estimated propensity
    score; neighbours farther away than ``caliper`` are discarded, and treated
    units left without any neighbour are dropped. The raw ATT is the mean of
    (treated outcome - mean matched control outcome).

    Args:
        df_sample: Data to match on; it is copied, never modified.
        treatment: Name of the binary treatment column.
        outcome: Name of the outcome column.
        covariates: Covariate names passed to ``estimate_propensity_scores``.
        propensity_model_type: Passed as ``model_type`` to
            ``estimate_propensity_scores``.
        n_neighbors: Number of control neighbours requested per treated unit.
        caliper: Maximum allowed propensity-score distance; ``None`` disables it.
        perform_bias_adjustment: If True, regress the per-unit differences on
            the treated-minus-control propensity-score gap and return the
            intercept instead of the raw mean.
        **kwargs: Forwarded unchanged to ``estimate_propensity_scores``.

    Returns:
        The ATT estimate, or ``np.nan`` when scores cannot be estimated, a group
        is empty, scores contain NaN, the neighbour search fails, or no treated
        unit finds a match. If the bias-adjustment regression cannot be run the
        raw (unadjusted) ATT is returned.
    """
    df_ps = df_sample.copy()
    try:
        propensity_scores = estimate_propensity_scores(
            df_ps, treatment, covariates, model_type=propensity_model_type, **kwargs
        )
    except Exception as e:
        logger.warning(f"Propensity score estimation failed in helper: {e}")
        return np.nan  # Cannot proceed without propensity scores
    df_ps['propensity_score'] = propensity_scores

    treated = df_ps[df_ps[treatment] == 1]
    control = df_ps[df_ps[treatment] == 0]
    if treated.empty or control.empty:
        return np.nan

    max_distance = caliper if caliper is not None else np.inf
    control_ps = control['propensity_score'].to_numpy()
    treated_ps = treated['propensity_score'].to_numpy()

    # `radius` is stored on the estimator but kneighbors() ignores it; the
    # caliper is enforced below by masking the returned distances.
    nn = NearestNeighbors(n_neighbors=n_neighbors, radius=max_distance, metric='minkowski', p=2)
    try:
        # Ensure control PS are valid before fitting
        if np.isnan(control_ps).any():
            logger.warning("NaN values found in control propensity scores before NN fitting.")
            return np.nan
        nn.fit(control_ps.reshape(-1, 1))
        # Ensure treated PS are valid before querying
        if np.isnan(treated_ps).any():
            logger.warning("NaN values found in treated propensity scores before NN query.")
            return np.nan
        distances, indices = nn.kneighbors(treated_ps.reshape(-1, 1))
    except ValueError as e:
        # e.g. fewer control units than n_neighbors
        logger.warning(f"NearestNeighbors fitting/query failed: {e}")
        return np.nan

    treated_outcomes = treated[outcome].to_numpy()
    control_outcomes = control[outcome]

    # One entry per matched treated unit. Both lists are appended together so
    # they are always aligned (scores were verified NaN-free above, so no
    # per-unit skip is needed).
    att_components = []
    propensity_diffs = []
    for i, (dist_row, idx_row) in enumerate(zip(distances, indices)):
        matched_idx = idx_row[dist_row <= max_distance]
        if len(matched_idx) == 0:
            continue  # treated unit has no control within the caliper
        att_components.append(treated_outcomes[i] - control_outcomes.iloc[matched_idx].mean())
        if perform_bias_adjustment:
            propensity_diffs.append(treated_ps[i] - control_ps[matched_idx].mean())

    if not att_components:
        return np.nan

    raw_att_components = np.array(att_components)
    raw_att = np.mean(raw_att_components)
    if not perform_bias_adjustment:
        return raw_att

    try:
        diffs = np.asarray(propensity_diffs, dtype=float)
        if not (np.isfinite(diffs).all() and np.isfinite(raw_att_components).all()):
            logger.warning("NaN/Inf values detected in OLS inputs for bias adjustment. Falling back.")
            return raw_att
        if np.ptp(diffs) == 0:
            # With no variation in the score gap the slope is not identified.
            # (add_constant's default has_constant='skip' would also silently
            # omit the intercept here, making params[0] the slope.)
            logger.warning("Bias adjustment skipped: propensity score differences have no variation.")
            return raw_att
        # Force an explicit intercept in column 0 so params[0] is always the
        # bias-adjusted ATT.
        X_bias_adj = sm.add_constant(diffs, has_constant='add')
        bias_model = sm.OLS(raw_att_components, X_bias_adj).fit()
        return bias_model.params[0]
    except Exception as e:
        logger.warning(f"OLS for bias adjustment failed: {e}. Falling back to raw ATT.")
        return raw_att
def _bootstrap_att_se(df, treatment, outcome, covariates, propensity_model_type,
                      n_neighbors, caliper, n_bootstraps, helper_kwargs):
    """Bootstrap the custom (non-bias-adjusted) PSM ATT; return (SE, n_successful)."""
    bootstrap_atts = []
    for i in range(n_bootstraps):
        try:
            # Resample rows with replacement, same size as the original data.
            df_boot = df.sample(n=len(df), replace=True, random_state=np.random.randint(1000000) + i)
            # Bias adjustment is disabled in the bootstrap for speed.
            boot_att = _perform_matching_and_get_att(
                df_boot, treatment, outcome, covariates,
                propensity_model_type, n_neighbors, caliper,
                perform_bias_adjustment=False, **helper_kwargs
            )
            if not np.isnan(boot_att):
                bootstrap_atts.append(boot_att)
        except Exception as boot_e:
            logger.warning(f"Bootstrap iteration {i+1} failed: {boot_e}")
            continue  # Skip failed bootstrap iteration
    successful = len(bootstrap_atts)
    # Sample standard deviation of the replicates; with fewer than two
    # replicates a spread cannot be estimated (ddof=0 would report 0.0).
    att_se = np.nanstd(bootstrap_atts, ddof=1) if successful >= 2 else np.nan
    return att_se, successful


def _matched_sample_diagnostics(df, treatment, covariates, propensity_scores,
                                n_neighbors, caliper):
    """Re-run the KNN/caliper matching on the full data and assess covariate balance."""
    df_ps = df.copy()
    df_ps['propensity_score'] = propensity_scores
    # Drop rows with NaN propensity scores before diagnostics
    treated = df_ps[df_ps[treatment] == 1].dropna(subset=['propensity_score'])
    control = df_ps[df_ps[treatment] == 0].dropna(subset=['propensity_score'])
    unmatched_treated_count = 0

    if treated.empty or control.empty:
        diagnostics = {"message": "Treatment or control group empty after dropping NaN PS, diagnostics skipped."}
        unmatched_treated_count = len(treated)
    else:
        try:
            max_distance = caliper if caliper is not None else np.inf
            nn_diag = NearestNeighbors(n_neighbors=n_neighbors, radius=max_distance, metric='minkowski', p=2)
            nn_diag.fit(control[['propensity_score']].values)
            distances, indices = nn_diag.kneighbors(treated[['propensity_score']].values)

            matched_treated_labels = []
            matched_control_labels = []
            for treated_label, dist_row, idx_row in zip(treated.index, distances, indices):
                within_caliper = idx_row[dist_row <= max_distance]
                if len(within_caliper) > 0:
                    matched_treated_labels.append(treated_label)
                    # Translate positional neighbour indices to DataFrame labels.
                    matched_control_labels.extend(control.index[within_caliper])
                else:
                    unmatched_treated_count += 1

            if matched_control_labels:
                # De-duplicate by index label (a control may serve several
                # treated units). Rows are NOT de-duplicated by value: distinct
                # units with identical data must all stay in the matched sample.
                treated_idx = pd.Index(matched_treated_labels).unique()
                control_idx = pd.Index(matched_control_labels).unique()
                matched_df_diag = pd.concat([df.loc[treated_idx], df.loc[control_idx]])
                # Look scores up through the frame column so this works whether
                # the estimator returned a Series or a plain array.
                ps_matched_for_diag = df_ps.loc[matched_df_diag.index, 'propensity_score']
                diagnostics = assess_balance(df, matched_df_diag, treatment, covariates,
                                             method="PSM",
                                             propensity_scores_original=propensity_scores,
                                             propensity_scores_matched=ps_matched_for_diag)
            else:
                diagnostics = {"message": "No units could be matched for diagnostic assessment."}
                # If no controls were matched, all treated were unmatched
                unmatched_treated_count = len(treated)
        except Exception as diag_e:
            logger.error(f"Error during diagnostic matching/balance assessment: {diag_e}")
            diagnostics = {"error": f"Diagnostics failed: {diag_e}"}

    diagnostics["unmatched_treated_count"] = unmatched_treated_count
    diagnostics["percent_treated_matched"] = (
        (len(treated) - unmatched_treated_count) / len(treated) * 100 if len(treated) > 0 else 0
    )
    return diagnostics


def estimate_effect(df: pd.DataFrame, treatment: str, outcome: str,
                    covariates: List[str], **kwargs) -> Dict[str, Any]:
    '''Estimate ATT using Propensity Score Matching.

    Tries DoWhy's PSM first, falls back to custom implementation if DoWhy fails.
    Uses bootstrap SE based on the custom implementation regardless.

    Args:
        df: Input data.
        treatment: Name of the binary treatment column.
        outcome: Name of the outcome column.
        covariates: Names of the covariates / common causes.
        **kwargs: Optional settings read here: ``query``, ``n_bootstraps``
            (default 100), ``caliper``, ``n_neighbors`` and
            ``propensity_model_type``. Explicit values take precedence over
            LLM-suggested ones. The kwargs are also forwarded to
            ``estimate_propensity_scores``.

    Returns:
        Whatever ``format_ps_results`` builds from the ATT estimate, its
        bootstrap standard error, the diagnostics dict and the parameters used.

    Raises:
        ValueError: If both DoWhy and the custom fallback fail to produce an ATT.
    '''
    query = kwargs.get('query')
    n_bootstraps = kwargs.get('n_bootstraps', 100)

    # --- Parameter Setup ---
    llm_params = get_llm_parameters(df, query, "PS.Matching")
    llm_suggested_params = llm_params.get("parameters", {})

    n_neighbors = kwargs.get('n_neighbors', llm_suggested_params.get('n_neighbors', 1))

    # Resolve the propensity model once, up front, so the scores used for the
    # caliper heuristic and the diagnostics come from the same model that is
    # reported and used for matching. select_propensity_model is only invoked
    # when neither the caller nor the LLM supplied a model type.
    propensity_model_type = kwargs.get('propensity_model_type')
    if propensity_model_type is None:
        propensity_model_type = llm_suggested_params.get('propensity_model_type')
    if propensity_model_type is None:
        propensity_model_type = select_propensity_model(df, treatment, covariates, query)

    # _perform_matching_and_get_att receives these values explicitly; passing
    # them again through **kwargs would raise "got multiple values for
    # argument", so they are filtered out of what is forwarded to it.
    helper_arg_names = {
        'df_sample', 'treatment', 'outcome', 'covariates',
        'propensity_model_type', 'n_neighbors', 'caliper', 'perform_bias_adjustment',
    }
    helper_kwargs = {k: v for k, v in kwargs.items() if k not in helper_arg_names}

    caliper = kwargs.get('caliper', llm_suggested_params.get('caliper'))
    full_sample_ps = None
    try:
        full_sample_ps = estimate_propensity_scores(
            df, treatment, covariates, model_type=propensity_model_type, **kwargs
        )
        if caliper is None and full_sample_ps is not None:
            # Heuristic: 0.2 standard deviations of the logit propensity score.
            # NOTE(review): the matching code compares raw propensity scores,
            # not logits, so this caliper is on a different scale than the
            # distances it is applied to -- confirm which scale is intended.
            logit_ps = _calculate_logit(full_sample_ps)
            if not np.isnan(logit_ps).all():  # Check if logit calculation was successful
                caliper = 0.2 * np.nanstd(logit_ps)  # Use nanstd for robustness
            else:
                logger.warning("Logit of propensity scores resulted in NaNs, cannot calculate heuristic caliper.")
        elif caliper is None:
            logger.warning("Could not estimate propensity scores for caliper heuristic.")
    except Exception as e:
        # A caliper supplied by the caller/LLM is kept; only the heuristic is lost.
        logger.warning(f"Failed to estimate initial propensity scores for caliper heuristic: {e}. "
                       f"Heuristic caliper unavailable.")

    # --- Attempt DoWhy PSM for Point Estimate ---
    att_estimate = np.nan
    method_used_for_att = "Fallback Custom PSM"
    try:
        logger.info("Attempting estimation using DoWhy Propensity Score Matching...")
        dowhy_model = CausalModel(
            data=df,
            treatment=treatment,
            outcome=outcome,
            common_causes=covariates,  # Provide list of names directly
            estimand_type='nonparametric-ate'
        )
        # Identify estimand (optional step, but good practice)
        identified_estimand = dowhy_model.identify_effect(proceed_when_unidentifiable=True)
        logger.info(f"DoWhy identified estimand: {identified_estimand}")
        # Estimate effect using DoWhy's PSM
        estimate = dowhy_model.estimate_effect(
            identified_estimand,
            method_name="backdoor.propensity_score_matching",
            target_units="att",
            method_params={}
        )
        if estimate.value is None:
            raise ValueError("DoWhy returned no estimate value.")
        att_estimate = float(estimate.value)
        method_used_for_att = "DoWhy PSM"
        logger.info(f"DoWhy PSM successful. ATT Estimate: {att_estimate}")
    except Exception as e:
        logger.warning(f"DoWhy PSM failed: {e}. Falling back to custom PSM implementation.")
        # Fallback is triggered because att_estimate remains NaN

    # --- Fallback if DoWhy failed or returned NaN ---
    if np.isnan(att_estimate):
        logger.info("Calculating ATT estimate using fallback custom PSM...")
        att_estimate = _perform_matching_and_get_att(
            df, treatment, outcome, covariates,
            propensity_model_type, n_neighbors, caliper,
            perform_bias_adjustment=True, **helper_kwargs  # Bias adjust the fallback
        )
        method_used_for_att = "Fallback Custom PSM"
        if np.isnan(att_estimate):
            raise ValueError("Fallback custom PSM estimation also failed. Cannot proceed.")
        logger.info(f"Fallback Custom PSM successful. ATT Estimate: {att_estimate}")

    # --- Bootstrap SE (using custom helper for consistency) ---
    logger.info(f"Calculating Bootstrap SE using custom helper ({n_bootstraps} iterations)...")
    att_se, actual_bootstrap_iterations = _bootstrap_att_se(
        df, treatment, outcome, covariates,
        propensity_model_type, n_neighbors, caliper, n_bootstraps, helper_kwargs
    )
    logger.info(f"Bootstrap SE calculated: {att_se} from {actual_bootstrap_iterations} successful iterations.")

    # --- Diagnostics (using custom matching logic for consistency) ---
    logger.info("Performing diagnostic checks using custom matching logic...")
    if full_sample_ps is not None and not np.isnan(full_sample_ps).all():
        diagnostics = _matched_sample_diagnostics(
            df, treatment, covariates, full_sample_ps, n_neighbors, caliper
        )
    else:
        diagnostics = {"error": "Propensity scores could not be estimated for diagnostics."}

    # Add final details to diagnostics
    diagnostics["att_estimation_method"] = method_used_for_att
    diagnostics["propensity_score_model"] = propensity_model_type
    diagnostics["bootstrap_iterations_for_se"] = actual_bootstrap_iterations
    diagnostics["final_caliper_used"] = caliper

    # --- Format and return results ---
    logger.info(f"Formatting results. ATT Estimate: {att_estimate}, SE: {att_se}, Method: {method_used_for_att}")
    return format_ps_results(att_estimate, att_se, diagnostics,
                             method_details=f"PSM ({method_used_for_att})",
                             parameters={"caliper": caliper,
                                         "n_neighbors": n_neighbors,  # used in fallback/bootstrap/diagnostics
                                         "propensity_model": propensity_model_type,
                                         "n_bootstraps_config": n_bootstraps})