# Propensity Score Weighting (IPW) Implementation import pandas as pd import numpy as np import statsmodels.api as sm from typing import Dict, List, Optional, Any from .base import estimate_propensity_scores, format_ps_results, select_propensity_model from .diagnostics import assess_weight_distribution, plot_overlap, plot_balance # Import diagnostic functions from .llm_assist import determine_optimal_weight_type, determine_optimal_trim_threshold, get_llm_parameters # Import LLM helpers def estimate_effect(df: pd.DataFrame, treatment: str, outcome: str, covariates: List[str], **kwargs) -> Dict[str, Any]: '''Generic propensity score weighting (IPW) implementation. Args: df: Dataset containing causal variables treatment: Name of treatment variable outcome: Name of outcome variable covariates: List of covariate names **kwargs: Method-specific parameters (e.g., weight_type, trim_threshold, query) Returns: Dictionary with effect estimate and diagnostics ''' query = kwargs.get('query') # --- LLM-Assisted Parameter Optimization / Defaults --- llm_params = get_llm_parameters(df, query, "PS.Weighting") llm_suggested_params = llm_params.get("parameters", {}) # Explicitly check LLM suggestion before falling back to default helper llm_weight_type = llm_suggested_params.get('weight_type') default_weight_type = determine_optimal_weight_type(df, treatment, query) if llm_weight_type is None else llm_weight_type weight_type = kwargs.get('weight_type', default_weight_type) # Similar explicit check for trim_threshold llm_trim_thresh = llm_suggested_params.get('trim_threshold') default_trim_thresh = determine_optimal_trim_threshold(df, treatment, query=query) if llm_trim_thresh is None else llm_trim_thresh trim_threshold = kwargs.get('trim_threshold', default_trim_thresh) propensity_model_type = kwargs.get('propensity_model_type', llm_suggested_params.get('propensity_model_type', select_propensity_model(df, treatment, covariates, query))) robust_se = kwargs.get('robust_se', True) # --- Step 1: Estimate propensity scores --- propensity_scores = estimate_propensity_scores(df, treatment, covariates, model_type=propensity_model_type, **kwargs) # Pass other kwargs like C, penalty etc. df_ps = df.copy() df_ps['propensity_score'] = propensity_scores # --- Step 2: Calculate weights --- if weight_type.upper() == 'ATE': weights = np.where(df_ps[treatment] == 1, 1 / df_ps['propensity_score'], 1 / (1 - df_ps['propensity_score'])) elif weight_type.upper() == 'ATT': weights = np.where(df_ps[treatment] == 1, 1, df_ps['propensity_score'] / (1 - df_ps['propensity_score'])) # TODO: Add other weight types like ATC if needed else: raise ValueError(f"Unsupported weight type: {weight_type}") df_ps['ipw'] = weights # --- Step 3: Apply trimming if needed --- if trim_threshold is not None and trim_threshold > 0: # Trim based on propensity score percentile min_ps_thresh = np.percentile(propensity_scores, trim_threshold * 100) max_ps_thresh = np.percentile(propensity_scores, (1 - trim_threshold) * 100) keep_indices = (df_ps['propensity_score'] >= min_ps_thresh) & (df_ps['propensity_score'] <= max_ps_thresh) df_trimmed = df_ps[keep_indices].copy() print(f"Trimming {len(df_ps) - len(df_trimmed)} units ({trim_threshold*100:.1f}% percentile trim)") if df_trimmed.empty: raise ValueError("All units removed after trimming. Try a smaller trim_threshold.") df_analysis = df_trimmed else: # Trim based on weight percentile (alternative approach) # q_low, q_high = np.percentile(weights, [trim_threshold*100, (1-trim_threshold)*100]) # df_ps['ipw'] = np.clip(df_ps['ipw'], q_low, q_high) df_analysis = df_ps.copy() trim_threshold = 0 # Explicitly set for parameters output # --- Step 4: Normalize weights (optional but common) --- # Normalize weights to sum to sample size within treated/control groups if ATT if weight_type.upper() == 'ATT': sum_weights_treated = df_analysis.loc[df_analysis[treatment] == 1, 'ipw'].sum() sum_weights_control = df_analysis.loc[df_analysis[treatment] == 0, 'ipw'].sum() n_treated = (df_analysis[treatment] == 1).sum() n_control = (df_analysis[treatment] == 0).sum() if sum_weights_treated > 0: df_analysis.loc[df_analysis[treatment] == 1, 'ipw'] *= n_treated / sum_weights_treated if sum_weights_control > 0: df_analysis.loc[df_analysis[treatment] == 0, 'ipw'] *= n_control / sum_weights_control else: # ATE normalization df_analysis['ipw'] *= len(df_analysis) / df_analysis['ipw'].sum() # --- Step 5: Estimate weighted treatment effect --- X_treat = sm.add_constant(df_analysis[[treatment]]) # Use only treatment variable for direct effect wls_model = sm.WLS(df_analysis[outcome], X_treat, weights=df_analysis['ipw']) results = wls_model.fit(cov_type='HC1' if robust_se else 'nonrobust') effect = results.params[treatment] effect_se = results.bse[treatment] # --- Step 6: Validate weight quality / Diagnostics --- diagnostics = assess_weight_distribution(df_analysis['ipw'], df_analysis[treatment]) # Could also add balance assessment on the weighted sample # weighted_diagnostics = assess_balance(df, df_analysis, treatment, covariates, method="PSW", weights=df_analysis['ipw']) # diagnostics.update(weighted_diagnostics) diagnostics["propensity_score_model"] = propensity_model_type # --- Step 7: Format and return results --- return format_ps_results(effect, effect_se, diagnostics, method_details="PS.Weighting", parameters={"weight_type": weight_type, "trim_threshold": trim_threshold, "propensity_model": propensity_model_type, "robust_se": robust_se})