# Utility functions for Difference-in-Differences
import logging
import math
from typing import Any

import pandas as pd

logger = logging.getLogger(__name__)


def _coerce_numeric_threshold(value: Any) -> Any:
    """Return *value* as a float when it is a string holding a finite number.

    Any other value (non-strings, non-numeric strings, "nan"/"inf") is
    returned unchanged so the caller's normal comparison rules still apply.
    """
    if not isinstance(value, str):
        return value
    try:
        number = float(value)
    except ValueError:
        return value
    return number if math.isfinite(number) else value


def _post_indicator_via_datetime(
    df: pd.DataFrame, time_var: str, treatment_period_start: Any
) -> pd.Series:
    """Build the post indicator by comparing both sides as datetimes.

    Raises:
        TypeError: If either side cannot be converted to a usable datetime
            or the datetime comparison itself fails.
    """
    try:
        # Unparseable entries become NaT; NaT >= anything is False, i.e. 0.
        time_series_dt = pd.to_datetime(df[time_var], errors='coerce')
        # treatment_period_start may be a date string; convert it as well.
        try:
            treatment_start_dt = pd.to_datetime(treatment_period_start)
        except Exception as e_conv:
            logger.error(f"Could not convert treatment_period_start '{treatment_period_start}' to datetime: {e_conv}")
            raise TypeError(f"treatment_period_start '{treatment_period_start}' could not be converted to a comparable datetime format.") from e_conv

        if time_series_dt.isna().all():  # every value became NaT
            raise ValueError(f"Time variable '{time_var}' could not be converted to datetime (all values NaT).")
        if pd.isna(treatment_start_dt):
            raise ValueError(f"Treatment start period '{treatment_period_start}' converted to NaT.")

        logger.info(f"Comparing time_var '{time_var}' (as datetime) with treatment_start_dt '{treatment_start_dt}' (as datetime).")
        return (time_series_dt >= treatment_start_dt).astype(int)
    except Exception as e:
        # Every failure on this path is reported to the caller as TypeError.
        logger.error(f"Failed to compare time variable '{time_var}' with treatment start '{treatment_period_start}' using datetime logic: {e}", exc_info=True)
        raise TypeError(f"Could not compare time variable '{time_var}' with treatment start '{treatment_period_start}'. Ensure they are comparable or convertible to datetime. Error: {e}") from e


def create_post_indicator(df: pd.DataFrame, time_var: str, treatment_period_start: Any) -> pd.Series:
    """Create the post-treatment indicator variable.

    Resolution order:

    1. A boolean column is cast to 0/1 integers.
    2. A numeric column whose non-missing values are exactly {0, 1} is
       returned as-is (cast to int).
    3. Any other numeric column is compared with ``treatment_period_start``
       using ``>=``. A string threshold holding a finite number (e.g.
       ``"2005"``) is converted to a number first; otherwise the comparison
       would fail and the datetime fallback would read the column's integers
       as nanoseconds since the epoch, yielding an all-zero indicator.
    4. A non-numeric column is compared directly with ``>=``.
    5. If the direct comparison raises ``TypeError``, both sides are
       converted with ``pd.to_datetime`` and compared as datetimes.

    Args:
        df: Data containing ``time_var``.
        time_var: Name of the time column in ``df``.
        treatment_period_start: First treated period (number, date string,
            or datetime-like value).

    Returns:
        Integer Series: 1 where the row is at/after treatment start, else 0.

    Raises:
        TypeError: If the column cannot be processed or the two sides cannot
            be compared, including after datetime conversion.
    """
    try:
        time_var_series = df[time_var]

        # Booleans are handled as 0/1 integers by the numeric checks below.
        if pd.api.types.is_bool_dtype(time_var_series):
            time_var_series = time_var_series.astype(int)

        if pd.api.types.is_numeric_dtype(time_var_series):
            unique_vals = set(time_var_series.dropna().unique())
            if unique_vals == {0, 1}:
                logger.info(f"Time variable '{time_var}' is already a binary 0/1 indicator. Using it directly as post indicator.")
                return time_var_series.astype(int)

            # Numeric but not 0/1: threshold comparison. Numeric strings are
            # coerced so they compare as numbers instead of as dates.
            threshold = _coerce_numeric_threshold(treatment_period_start)
            logger.info(f"Time variable '{time_var}' is numeric. Comparing with treatment_period_start: {treatment_period_start}")
            return (time_var_series >= threshold).astype(int)

        # Non-numeric column: try a direct comparison. Incompatible types
        # raise TypeError, which routes to the datetime fallback below.
        return (time_var_series >= treatment_period_start).astype(int)
    except TypeError:
        logger.info(f"Direct comparison/numeric check failed for time_var '{time_var}'. Attempting datetime conversion.")
        return _post_indicator_via_datetime(df, time_var, treatment_period_start)
    except Exception as ex:
        # Any other failure (e.g. missing column) is surfaced as TypeError.
        logger.error(f"Unexpected error processing time_var '{time_var}' for post indicator: {ex}", exc_info=True)
        raise TypeError(f"Unexpected error processing time_var '{time_var}': {ex}") from ex