Spaces:
Running
Running
# Propensity Score Weighting (IPW) Implementation | |
import pandas as pd | |
import numpy as np | |
import statsmodels.api as sm | |
from typing import Dict, List, Optional, Any | |
from .base import estimate_propensity_scores, format_ps_results, select_propensity_model | |
from .diagnostics import assess_weight_distribution, plot_overlap, plot_balance # Import diagnostic functions | |
from .llm_assist import determine_optimal_weight_type, determine_optimal_trim_threshold, get_llm_parameters # Import LLM helpers | |
def estimate_effect(df: pd.DataFrame, treatment: str, outcome: str, | |
covariates: List[str], **kwargs) -> Dict[str, Any]: | |
'''Generic propensity score weighting (IPW) implementation. | |
Args: | |
df: Dataset containing causal variables | |
treatment: Name of treatment variable | |
outcome: Name of outcome variable | |
covariates: List of covariate names | |
**kwargs: Method-specific parameters (e.g., weight_type, trim_threshold, query) | |
Returns: | |
Dictionary with effect estimate and diagnostics | |
''' | |
query = kwargs.get('query') | |
# --- LLM-Assisted Parameter Optimization / Defaults --- | |
llm_params = get_llm_parameters(df, query, "PS.Weighting") | |
llm_suggested_params = llm_params.get("parameters", {}) | |
# Explicitly check LLM suggestion before falling back to default helper | |
llm_weight_type = llm_suggested_params.get('weight_type') | |
default_weight_type = determine_optimal_weight_type(df, treatment, query) if llm_weight_type is None else llm_weight_type | |
weight_type = kwargs.get('weight_type', default_weight_type) | |
# Similar explicit check for trim_threshold | |
llm_trim_thresh = llm_suggested_params.get('trim_threshold') | |
default_trim_thresh = determine_optimal_trim_threshold(df, treatment, query=query) if llm_trim_thresh is None else llm_trim_thresh | |
trim_threshold = kwargs.get('trim_threshold', default_trim_thresh) | |
propensity_model_type = kwargs.get('propensity_model_type', | |
llm_suggested_params.get('propensity_model_type', | |
select_propensity_model(df, treatment, covariates, query))) | |
robust_se = kwargs.get('robust_se', True) | |
# --- Step 1: Estimate propensity scores --- | |
propensity_scores = estimate_propensity_scores(df, treatment, covariates, | |
model_type=propensity_model_type, | |
**kwargs) # Pass other kwargs like C, penalty etc. | |
df_ps = df.copy() | |
df_ps['propensity_score'] = propensity_scores | |
# --- Step 2: Calculate weights --- | |
if weight_type.upper() == 'ATE': | |
weights = np.where(df_ps[treatment] == 1, | |
1 / df_ps['propensity_score'], | |
1 / (1 - df_ps['propensity_score'])) | |
elif weight_type.upper() == 'ATT': | |
weights = np.where(df_ps[treatment] == 1, | |
1, | |
df_ps['propensity_score'] / (1 - df_ps['propensity_score'])) | |
# TODO: Add other weight types like ATC if needed | |
else: | |
raise ValueError(f"Unsupported weight type: {weight_type}") | |
df_ps['ipw'] = weights | |
# --- Step 3: Apply trimming if needed --- | |
if trim_threshold is not None and trim_threshold > 0: | |
# Trim based on propensity score percentile | |
min_ps_thresh = np.percentile(propensity_scores, trim_threshold * 100) | |
max_ps_thresh = np.percentile(propensity_scores, (1 - trim_threshold) * 100) | |
keep_indices = (df_ps['propensity_score'] >= min_ps_thresh) & (df_ps['propensity_score'] <= max_ps_thresh) | |
df_trimmed = df_ps[keep_indices].copy() | |
print(f"Trimming {len(df_ps) - len(df_trimmed)} units ({trim_threshold*100:.1f}% percentile trim)") | |
if df_trimmed.empty: | |
raise ValueError("All units removed after trimming. Try a smaller trim_threshold.") | |
df_analysis = df_trimmed | |
else: | |
# Trim based on weight percentile (alternative approach) | |
# q_low, q_high = np.percentile(weights, [trim_threshold*100, (1-trim_threshold)*100]) | |
# df_ps['ipw'] = np.clip(df_ps['ipw'], q_low, q_high) | |
df_analysis = df_ps.copy() | |
trim_threshold = 0 # Explicitly set for parameters output | |
# --- Step 4: Normalize weights (optional but common) --- | |
# Normalize weights to sum to sample size within treated/control groups if ATT | |
if weight_type.upper() == 'ATT': | |
sum_weights_treated = df_analysis.loc[df_analysis[treatment] == 1, 'ipw'].sum() | |
sum_weights_control = df_analysis.loc[df_analysis[treatment] == 0, 'ipw'].sum() | |
n_treated = (df_analysis[treatment] == 1).sum() | |
n_control = (df_analysis[treatment] == 0).sum() | |
if sum_weights_treated > 0: | |
df_analysis.loc[df_analysis[treatment] == 1, 'ipw'] *= n_treated / sum_weights_treated | |
if sum_weights_control > 0: | |
df_analysis.loc[df_analysis[treatment] == 0, 'ipw'] *= n_control / sum_weights_control | |
else: # ATE normalization | |
df_analysis['ipw'] *= len(df_analysis) / df_analysis['ipw'].sum() | |
# --- Step 5: Estimate weighted treatment effect --- | |
X_treat = sm.add_constant(df_analysis[[treatment]]) # Use only treatment variable for direct effect | |
wls_model = sm.WLS(df_analysis[outcome], X_treat, weights=df_analysis['ipw']) | |
results = wls_model.fit(cov_type='HC1' if robust_se else 'nonrobust') | |
effect = results.params[treatment] | |
effect_se = results.bse[treatment] | |
# --- Step 6: Validate weight quality / Diagnostics --- | |
diagnostics = assess_weight_distribution(df_analysis['ipw'], df_analysis[treatment]) | |
# Could also add balance assessment on the weighted sample | |
# weighted_diagnostics = assess_balance(df, df_analysis, treatment, covariates, method="PSW", weights=df_analysis['ipw']) | |
# diagnostics.update(weighted_diagnostics) | |
diagnostics["propensity_score_model"] = propensity_model_type | |
# --- Step 7: Format and return results --- | |
return format_ps_results(effect, effect_se, diagnostics, | |
method_details="PS.Weighting", | |
parameters={"weight_type": weight_type, | |
"trim_threshold": trim_threshold, | |
"propensity_model": propensity_model_type, | |
"robust_se": robust_se}) |