File size: 10,763 Bytes
1721aea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# IV-specific diagnostic functions: first-stage F-statistic and Sargan overidentification test
import pandas as pd
import statsmodels.api as sm
from statsmodels.regression.linear_model import OLS
# from statsmodels.sandbox.regression.gmm import IV2SLSResults # Removed problematic import
from typing import Dict, Any, List, Tuple, Optional
import logging # Import logging
import numpy as np # Import numpy for np.zeros

# Configure logger
logger = logging.getLogger(__name__)

def calculate_first_stage_f_statistic(df: pd.DataFrame, treatment: str, instruments: List[str], covariates: List[str]) -> Tuple[Optional[float], Optional[float]]:
    """
    Calculates the F-statistic for instrument relevance in the first stage regression.

    Regresses treatment ~ intercept + covariates + instruments by OLS and tests
    the joint null hypothesis that every instrument coefficient is zero.

    Args:
        df: Input DataFrame. A copy is used; the caller's frame is not modified.
        treatment: Name of the treatment variable.
        instruments: List of instrument variable names. Duplicate names are ignored.
        covariates: List of covariate names.

    Returns:
        A tuple containing (F-statistic, p-value). Returns (None, None) when no
        instruments are supplied, when the resulting statistic is NaN, or on any
        error (errors are logged rather than raised).
    """
    logger.info("Diagnostics: Calculating First-Stage F-statistic...")
    try:
        # De-duplicate while keeping order: a repeated name would put two identical
        # rows in the restriction matrix and make the joint test degenerate.
        unique_instruments = list(dict.fromkeys(instruments))
        if not unique_instruments:
            # Nothing to test, so skip the regression entirely.
            logger.warning("No instruments provided for F-statistic calculation.")
            return None, None

        df_copy = df.copy()
        df_copy['intercept'] = 1
        exog_vars = ['intercept'] + covariates
        all_first_stage_exog = list(dict.fromkeys(exog_vars + unique_instruments)) # Ensure unique columns

        endog = df_copy[treatment]
        exog = df_copy[all_first_stage_exog]

        # Collinearity screen on the non-intercept columns only. The intercept is
        # constant by construction, so its correlations are always NaN and would
        # otherwise make this warning fire on every call.
        regressors = exog.drop(columns=['intercept'])
        corr_values = regressors.corr().to_numpy()
        off_diagonal = ~np.eye(corr_values.shape[0], dtype=bool)
        has_constant_column = bool(np.isnan(corr_values).any())
        has_near_duplicate = bool((np.abs(corr_values[off_diagonal]) > 0.9999).any())
        if has_constant_column or has_near_duplicate:
            logger.warning("High multicollinearity or constant column detected in first stage exogenous variables.")
            # Note: the fit below still runs, but the F-test might be unreliable.

        first_stage_model = OLS(endog, exog).fit()

        # Only test instruments that are present among the fitted model's regressors.
        fitted_exog_names = first_stage_model.model.exog_names
        valid_instruments = [inst for inst in unique_instruments if inst in fitted_exog_names]
        if not valid_instruments:
            logger.error("None of the provided instruments were included in the first-stage regression model (possibly due to collinearity).")
            return None, None
        if len(valid_instruments) < len(unique_instruments):
            logger.warning(f"Instruments dropped by OLS: {set(unique_instruments) - set(valid_instruments)}")

        # Restriction matrix R for H0: R @ beta = 0, one row per tested instrument.
        R = np.zeros((len(valid_instruments), len(fitted_exog_names)))
        for row, inst in enumerate(valid_instruments):
            R[row, fitted_exog_names.index(inst)] = 1

        # Perform F-test
        f_test_result = first_stage_model.f_test(R)

        # The statistic may come back as a scalar or as a 1x1 array; squeeze first
        # so float() never has to convert a multi-dimensional array.
        f_statistic = float(np.squeeze(f_test_result.fvalue))
        p_value = float(np.squeeze(f_test_result.pvalue))

        if np.isnan(f_statistic):
            # A NaN compares False against any threshold, so a caller checking
            # "F < 10" would silently treat it as a strong instrument.
            logger.warning("First-stage F-statistic is NaN (degenerate first-stage regression).")
            return None, None

        logger.info(f"  F-statistic: {f_statistic:.4f}, p-value: {p_value:.4f}")
        return f_statistic, p_value

    except Exception as e:
        logger.error(f"Error calculating first-stage F-statistic: {e}", exc_info=True)
        return None, None

def run_overidentification_test(sm_results: Optional[Any], df: pd.DataFrame, treatment: str, outcome: str, instruments: List[str], covariates: List[str]) -> Tuple[Optional[float], Optional[float], Optional[str]]:
    """
    Runs an overidentification test (Sargan) if applicable.

    The statistic is N * R^2 from an auxiliary OLS regression of the IV
    residuals on the intercept, the covariates and the instruments. It is
    compared against a chi-squared distribution whose degrees of freedom equal
    the number of excluded instruments minus the number of endogenous
    regressors (assumed to be 1, the treatment variable).

    Args:
        sm_results: Fitted results object exposing a `resid` attribute
            (e.g. from statsmodels IV2SLS.fit()).
        df: Input DataFrame. A copy is used; the caller's frame is not modified.
        treatment: Name of the treatment variable (not used in the calculation).
        outcome: Name of the outcome variable (not used in the calculation).
        instruments: List of instrument variable names.
        covariates: List of covariate names.

    Returns:
        Tuple: (test_statistic, p_value, status_message) on success, or
        (None, None, message) when the test is not applicable, the results
        object is unusable, or an error occurs (errors are logged, not raised).
    """
    logger.info("Diagnostics: Running Overidentification Test...")
    num_endog = 1 # Assuming only one treatment variable is endogenous

    # Only distinct instruments that are not already included regressors add
    # overidentifying restrictions; counting duplicates or covariates would
    # inflate the degrees of freedom and distort the p-value.
    included_exog = {'intercept', *(covariates or [])}
    excluded_instruments = [inst for inst in dict.fromkeys(instruments) if inst not in included_exog]
    num_instruments = len(excluded_instruments)

    if num_instruments <= num_endog:
        logger.info("  Over-ID test not applicable (model is exactly identified or underidentified).")
        return None, None, "Test not applicable (Need more instruments than endogenous regressors)"

    if sm_results is None or not hasattr(sm_results, 'resid'):
        logger.warning("  Over-ID test requires valid statsmodels results object with residuals.")
        return None, None, "Statsmodels results object not available or invalid for test."

    try:
        residuals = sm_results.resid
        df_copy = df.copy()
        df_copy['intercept'] = 1
        exog_vars = ['intercept'] + covariates
        all_exog_instruments = list(dict.fromkeys(exog_vars + instruments))

        # Ensure columns exist in the dataframe before selecting
        missing_cols = [col for col in all_exog_instruments if col not in df_copy.columns]
        if missing_cols:
            raise ValueError(f"Missing columns required for Over-ID test: {missing_cols}")

        exog_for_test = df_copy[all_exog_instruments]

        # The main estimator may have dropped rows (e.g. NAs); if so, align on the index.
        if len(residuals) != exog_for_test.shape[0]:
            logger.warning(f"Residual length ({len(residuals)}) differs from exog_for_test rows ({exog_for_test.shape[0]}). Trying to align indices.")
            if not hasattr(residuals, 'index'):
                # Plain arrays carry no row labels, so there is nothing to align on.
                raise ValueError("Cannot align residuals and exogenous variables for Over-ID test: residuals have no index.")
            common_index = residuals.index.intersection(exog_for_test.index)
            if len(common_index) == 0:
                raise ValueError("Cannot align residuals and exogenous variables for Over-ID test after NA handling.")
            residuals = residuals.loc[common_index]
            exog_for_test = exog_for_test.loc[common_index]
            logger.warning(f"Aligned to {len(common_index)} common observations.")

        # Auxiliary regression: residuals on all exogenous variables and instruments.
        aux_model = OLS(residuals, exog_for_test).fit()
        n_obs = len(residuals) # Use length of residuals after potential alignment
        test_statistic = float(n_obs * aux_model.rsquared)

        if np.isnan(test_statistic):
            # Typically caused by missing values or a degenerate auxiliary fit;
            # reporting "Test successful" with NaN values would be misleading.
            raise ValueError("Sargan test statistic is NaN (check for missing values in instruments/covariates).")

        # Calculate p-value from Chi-squared distribution.
        # The early return above guarantees at least one degree of freedom here.
        from scipy.stats import chi2
        degrees_of_freedom = num_instruments - num_endog
        p_value = float(chi2.sf(test_statistic, degrees_of_freedom))

        logger.info(f"  Sargan Test Statistic: {test_statistic:.4f}, p-value: {p_value:.4f}, df: {degrees_of_freedom}")
        return test_statistic, p_value, "Test successful"

    except Exception as e:
        logger.error(f"Error running overidentification test: {e}", exc_info=True)
        return None, None, f"Error during test: {e}"

def run_iv_diagnostics(df: pd.DataFrame, treatment: str, outcome: str, instruments: List[str], covariates: List[str], sm_results: Optional[Any] = None, dw_results: Optional[Any] = None) -> Dict[str, Any]:
    """
    Runs standard IV diagnostic checks.

    Combines the first-stage F-test (instrument relevance) and the
    overidentification test into a single dictionary, and adds a fixed note
    about the exclusion restriction.

    Args:
        df: Input DataFrame.
        treatment: Name of the treatment variable.
        outcome: Name of the outcome variable.
        instruments: List of instrument variable names.
        covariates: List of covariate names.
        sm_results: Optional fitted results object from statsmodels IV2SLS.fit().
        dw_results: Optional results object from DoWhy (currently unused here).

    Returns:
        Dictionary containing diagnostic results, with keys:
        'first_stage_f_statistic', 'first_stage_p_value', 'is_instrument_weak',
        'weak_instrument_test_status', 'overid_test_statistic',
        'overid_test_p_value', 'overid_test_status', 'overid_test_applicable'
        and 'exclusion_restriction_assumption'.
    """
    diagnostics: Dict[str, Any] = {}

    # 1. Instrument Relevance / Weak Instrument Test (First-Stage F-statistic)
    f_stat, f_p_val = calculate_first_stage_f_statistic(df, treatment, instruments, covariates)
    diagnostics['first_stage_f_statistic'] = f_stat
    diagnostics['first_stage_p_value'] = f_p_val

    # A NaN statistic compares False against the threshold, which would wrongly
    # report a strong instrument; treat it the same as a failed calculation.
    f_stat_usable = f_stat is not None and not np.isnan(f_stat)
    is_weak = bool(f_stat < 10) if f_stat_usable else None # Common rule of thumb
    diagnostics['is_instrument_weak'] = is_weak
    if is_weak is None:
        diagnostics['weak_instrument_test_status'] = "Error during calculation"
    elif is_weak:
        diagnostics['weak_instrument_test_status'] = "Warning: Instrument(s) may be weak (F < 10)"
    else:
        diagnostics['weak_instrument_test_status'] = "Instrument(s) appear sufficiently strong (F >= 10)"

    # 2. Overidentification Test (e.g., Sargan-Hansen)
    overid_stat, overid_p_val, overid_status = run_overidentification_test(sm_results, df, treatment, outcome, instruments, covariates)
    diagnostics['overid_test_statistic'] = overid_stat
    diagnostics['overid_test_p_value'] = overid_p_val
    diagnostics['overid_test_status'] = overid_status
    # Applicable unless the status is missing or explicitly says "not applicable".
    diagnostics['overid_test_applicable'] = bool(overid_status) and "not applicable" not in overid_status.lower()

    # 3. Exogeneity/Exclusion Restriction (Conceptual Check)
    diagnostics['exclusion_restriction_assumption'] = "Assumed based on graph/input; cannot be statistically tested directly. Qualitative LLM check recommended."

    # Potential future additions:
    # - Endogeneity tests (e.g., Hausman test - requires comparing OLS and IV estimates)

    return diagnostics