## This code contains the base classes used in generating synthetic data

# NOTE: only numpy/pandas are needed to *generate* data. The estimation and
# plotting libraries below are used solely by the `test_data` sanity checks
# (and the optional RDD plot), so they are imported defensively: a missing
# optional dependency surfaces as a clear ImportError at the point of use
# instead of making the whole module un-importable.
try:
    from linearmodels.iv import IV2SLS
except ImportError:  # needed only by IVGenerator.test_data
    IV2SLS = None
try:
    from dowhy import CausalModel
    from dowhy import datasets as dset
except ImportError:  # not used by any class in this file
    CausalModel = None
    dset = None
try:
    from sklearn.linear_model import LogisticRegression
except ImportError:  # needed only by PSMGenerator.test_data
    LogisticRegression = None
try:
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
except ImportError:  # needed only by the OLS-based test_data methods
    sm = None
    smf = None
import numpy as np
import pandas as pd
from pathlib import Path
try:
    import matplotlib.pyplot as plt
except ImportError:  # needed only when RDDGenerator(plot=True)
    plt = None


def _require_dependency(name, obj):
    """Return *obj*, raising ImportError if the optional dependency is missing."""
    if obj is None:
        raise ImportError(
            f"'{name}' is required for this operation but is not installed."
        )
    return obj


class DataGenerator:
    """
    Base class for generating synthetic data

    Attributes:
        n_observations (int): Number of observations
        n_continuous_covars (int): Number of continuous covariates
        n_binary_covars (int): Number of binary covariates
        n_covars (int): total number of covariates (continuous + binary)
        n_treatments (int): Number of treatments
        true_effect (float): True effect size
        seed (int): Random seed for reproducibility
        data (pd.DataFrame): Generated data (None until generate_data is called)
        method (str): the causal inference method associated with the synthetic data
        mean (np.ndarray): mean of the continuous covariates
        covar (np.ndarray): covariance matrix for the continuous covariates
        heterogeneity (bool): whether or not the treatment effects are heterogeneous
            (stored only; none of the generators in this file read it)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 covar=None, n_treatments=1, true_effect=0, seed=111, heterogeneity=0):
        # Seeds numpy's *global* RNG: every draw made by the generators is
        # reproducible for a given seed, but the global state is shared.
        np.random.seed(seed)
        self.n_observations = n_observations
        self.n_continuous_covars = n_continuous_covars
        self.n_binary_covars = n_binary_covars
        self.n_covars = n_continuous_covars + n_binary_covars
        self.n_treatments = n_treatments
        self.data = None
        self.seed = seed
        self.true_effect = true_effect
        self.method = None
        self.mean = mean
        self.covar = covar
        if mean is None:
            # Random integer means in [3, 20) for the continuous covariates.
            self.mean = np.random.randint(3, 20, size=self.n_continuous_covars)
        if self.covar is None:
            # Independent, unit-variance covariates by default.
            self.covar = np.identity(self.n_continuous_covars)
        self.heterogeneity = heterogeneity

    def generate_data(self):
        """
        Generates the synthetic data

        Returns:
            pd.DataFrame: The generated data
        """
        raise NotImplementedError("Invoke the method in the subclass")

    def save_data(self, folder, filename):
        """
        Saves the generated data as a CSV file

        Args:
            folder (str): path to the folder where the data is saved
                (created, including parents, if it does not exist)
            filename (str): name of the file ('.csv' is appended if missing)

        Raises:
            ValueError: if no data has been generated yet
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        path = Path(folder)
        path.mkdir(parents=True, exist_ok=True)
        if not filename.endswith('.csv'):
            filename += '.csv'
        self.data.to_csv(path / filename, index=False)

    def test_data(self, print_=False):
        """
        Test the generated data, using the appropriate method.
        """
        raise NotImplementedError("This method should be overridden by subclasses")

    def generate_covariates(self):
        """
        Generate covariates. For continuous covariates, we use multivariate normal
        distribution, and for binary covars, we use binomial distribution (one shared
        success probability drawn from U(0.3, 0.7)).

        The continuous covariates are discretized by integer truncation
        (``astype(int)``, i.e. rounding toward zero).

        Returns:
            np.ndarray: integer array of shape (n_observations, n_covars), continuous
            columns first, binary columns last.
        """
        X_c = np.random.multivariate_normal(mean=self.mean, cov=self.covar,
                                            size=self.n_observations)
        p = np.random.uniform(0.3, 0.7)
        X_b = np.random.binomial(1, p, size=(self.n_observations, self.n_binary_covars))
        return np.hstack((X_c, X_b)).astype(int)


class MultiTreatRCTGenerator(DataGenerator):
    """
    Base class for generating synthetic data for multi-treatment RCTs

    Additional Attributes:
        true_effect_vec (np.ndarray): the treatment effect for each arm; entry ``k``
            is the effect added to the outcome of units assigned ``D == k``.
    """

    def __init__(self, n_observations, n_continuous_covars, n_treatments, n_binary_covars=2,
                 mean=None, covar=None, true_effect=1.0, true_effect_vec=None, seed=111,
                 heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity, n_treatments=n_treatments)
        self.method = "MultiTreatRCT"
        self.true_effect_vec = true_effect_vec
        ## if true effect vec is None, arm 0 is the control (effect 0) and every other
        ## arm gets the same effect
        if true_effect_vec is None:
            self.true_effect_vec = np.zeros(n_treatments)
            self.true_effect_vec[1:] = self.true_effect

    def generate_data(self):
        """
        Generate covariates, a uniformly randomized arm ``D`` and the outcome ``Y``.

        Returns:
            pd.DataFrame: columns X1..Xk, D, Y
        """
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)

        treatment_effects = np.asarray(self.true_effect_vec)
        # Draw arms from exactly the arms that have an effect defined. The previous
        # upper bound of n_treatments + 1 indexed past the end of the default
        # effect vector (length n_treatments) and raised IndexError.
        df['D'] = np.random.randint(0, len(treatment_effects), size=self.n_observations)

        vec = np.random.uniform(0, 1, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, size=self.n_observations)

        # Apply appropriate treatment effect per treatment arm
        arm_effect = treatment_effects[df['D'].to_numpy()]
        df['Y'] = intercept + X.dot(vec) + arm_effect + noise
        self.data = df
        return df

    def test_data(self, print_=False):
        """
        Regress Y on arm dummies (``Y ~ C(D)``) and return the statsmodels summary.
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        ols = _require_dependency("statsmodels", smf).ols
        model = ols('Y ~ C(D)', data=self.data).fit()
        result = model.summary()
        if print_:
            print(result)
        return result


# Front-Door Criterion Generator
class FrontDoorGenerator(DataGenerator):
    """
    Generates synthetic data satisfying the front-door criterion.

    D → M → Y, D ← U → Y

    The effect of D on Y runs entirely through the mediator M and equals
    (D→M coefficient) x (M→Y coefficient) = ``true_effect``.
    """

    # Fixed strength of the D -> M link; the M -> Y coefficient is derived from it.
    _D_TO_M = 1.5

    def __init__(self, n_observations, n_continuous_covars=2, n_binary_covars=2, mean=None,
                 covar=None, seed=111, true_effect=2.0, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, seed=seed, true_effect=true_effect,
                         n_treatments=1, heterogeneity=heterogeneity)
        self.method = "FrontDoor"

    def generate_data(self):
        """
        Generate covariates, binary treatment D, mediator M and outcome Y.

        Returns:
            pd.DataFrame: columns X1..Xk, D, M, Y (the latent confounder U is not stored)
        """
        n = self.n_observations
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)

        # Latent confounder
        U = np.random.normal(0, 1, n)

        # Treatment depends on U and X
        vec_d = np.random.uniform(0.5, 1.5, size=self.n_covars)
        latent = X @ vec_d + 0.8 * U + np.random.normal(0, 1, n)
        # The covariates have strictly positive means, so thresholding the raw index
        # at 0 would treat (almost) everyone; threshold at the sample mean instead
        # so both treatment groups are populated.
        df['D'] = (latent > latent.mean()).astype(int)

        # Mediator depends on D and X
        vec_m = np.random.uniform(0.5, 1.5, size=self.n_covars)
        df['M'] = X @ vec_m + df['D'] * self._D_TO_M + np.random.normal(0, 1, n)

        # Outcome depends on M, U and X. The M -> Y coefficient is chosen so that
        # the total D -> M -> Y effect equals the advertised true_effect.
        m_to_y = self.true_effect / self._D_TO_M
        vec_y = np.random.uniform(0.5, 1.5, size=self.n_covars)
        df['Y'] = 50 + m_to_y * df['M'] + 1.0 * U + X @ vec_y + np.random.normal(0, 1, n)

        self.data = df
        return df

    def test_data(self, print_=False):
        """
        Run the two front-door regressions (``M ~ D`` and ``Y ~ M + D``).

        Note: the covariates X are not adjusted for in these regressions, so the
        coefficients are only a rough sanity check.

        Returns:
            dict: statsmodels summaries keyed by "M~D" and "Y~M+D"
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        ols = _require_dependency("statsmodels", smf).ols
        model_m = ols("M ~ D", data=self.data).fit()
        model_y = ols("Y ~ M + D", data=self.data).fit()
        if print_:
            print("Regression: M ~ D")
            print(model_m.summary())
            print("\nRegression: Y ~ M + D")
            print(model_y.summary())
        return {"M~D": model_m.summary(), "Y~M+D": model_y.summary()}


class ObservationalDataGenerator(DataGenerator):
    """
    Generate synthetic data for observational studies.

    Additional Attributes:
        propensity (np.ndarray): the true propensity score of each observation
        weights (np.ndarray): the propensity score weights for each observation
            (1/ps for treated units, 1/(1-ps) for controls)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 covar=None, true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        # Populated by generate_data.
        self.propensity = None
        self.weights = None

    def generate_data(self):
        """
        Generate covariates, a confounded binary treatment D and the outcome Y.

        Returns:
            pd.DataFrame: columns X1..Xk, D, Y
        """
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)

        # Standardize the covariates; a constant column has zero spread, so divide
        # by 1 there (its centered values are all 0) instead of producing NaNs.
        spread = X.std(axis=0)
        spread = np.where(spread == 0, 1.0, spread)
        X_norm = (X - X.mean(axis=0)) / spread

        vec1 = np.random.normal(0, 0.5, size=self.n_covars)
        lin = X_norm @ vec1 + np.random.normal(0, 1, self.n_observations)
        ## the propensity score
        ps = 1 / (1 + np.exp(-lin))
        ## we do this for stability reasons (keeps the inverse weights bounded)
        ps = np.clip(ps, 1e-3, 1 - 1e-3)
        df['D'] = np.random.binomial(1, ps).astype(int)

        vec2 = np.random.normal(0, 0.5, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, size=self.n_observations)
        df['Y'] = intercept + X @ vec2 + self.true_effect * df['D'] + noise

        self.propensity = ps
        self.weights = np.where(df['D'] == 1, 1 / ps, 1 / (1 - ps))
        self.data = df
        return self.data


class PSMGenerator(ObservationalDataGenerator):
    """
    Generate synthetic data for Propensity Score Matching (PSM)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 covar=None, true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.method = "PSM"

    def test_data(self, print_=False):
        """
        Test the generated data: estimate the ATT by 1-nearest-neighbour matching
        (with replacement) on a logistic-regression propensity score.

        Returns:
            str: estimated vs. true effect
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        logit_cls = _require_dependency("scikit-learn", LogisticRegression)
        lr = logit_cls(solver='lbfgs')
        X = self.data[[f"X{i+1}" for i in range(self.n_covars)]]
        lr.fit(X, self.data['D'])
        ps_hat = lr.predict_proba(X)[:, 1]

        # Positional masks, so the matching does not depend on the frame's index labels.
        is_treated = (self.data['D'] == 1).to_numpy()
        is_control = (self.data['D'] == 0).to_numpy()
        treated = self.data[is_treated]
        control = self.data[is_control]

        ## perform matching using the propensity scores
        control_ps = ps_hat[is_control]  # hoisted out of the per-unit loop
        match_idxs = [np.abs(control_ps - p).argmin() for p in ps_hat[is_treated]]
        matches = control.iloc[match_idxs]

        att = treated['Y'].mean() - matches['Y'].mean()
        result = f"Estimated ATT (matching): {att:.3f} | True: {self.true_effect}"
        if print_:
            print(result)
        return result


class PSWGenerator(ObservationalDataGenerator):
    """
    Generate synthetic data for Propensity Score Weighting (PSW)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 covar=None, true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.method = "PSW"

    def test_data(self, print_=False):
        """
        Test the generated data: inverse-probability-weighted ATT and ATE estimates
        computed with the *true* propensity scores stored by generate_data.

        Returns:
            str: estimated vs. true effects
        """
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        D = self.data['D'].to_numpy()
        Y = self.data['Y'].to_numpy()
        ps = self.propensity
        treated = D == 1
        control = D == 0

        # ATT: treated units get weight 1, controls are re-weighted by the odds ps/(1-ps).
        control_odds = ps[control] / (1 - ps[control])
        Y1 = Y[treated].mean()
        Y0_weighted = np.average(Y[control], weights=control_odds)
        att = Y1 - Y0_weighted

        # ATE: Horvitz-Thompson style estimator.
        ate = np.mean(Y * D / ps - (1 - D) * Y / (1 - ps))

        result = f"Estimated ATT (IPW): {att:.3f} | True: {self.true_effect}\nEstimated ATE: {ate:.3f} | True:{self.true_effect}"
        if print_:
            print(result)
        return result


class RCTGenerator(DataGenerator):
    """
    Generate synthetic data for Randomized Controlled Trials (RCT)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 covar=None, true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.method = "RCT"

    def generate_data(self):
        """
        Generate covariates, a fair-coin treatment D and the outcome Y.

        Returns:
            pd.DataFrame: columns X1..Xk, D, Y
        """
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)
        df['D'] = np.random.binomial(1, 0.5, size=self.n_observations)

        vec = np.random.uniform(0, 1, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, size=self.n_observations)
        df['Y'] = intercept + X.dot(vec) + self.true_effect * df['D'] + noise

        self.data = df
        # Return the frame like every other generator (previously returned None).
        return self.data

    def test_data(self, print_=False, **kwargs):
        """
        Estimate the ATE with ``Y ~ D`` and report it with its 95% confidence interval.

        Args:
            print_ (bool): print the full regression summary
            **kwargs: only the legacy keyword ``print`` is accepted, as an alias of
                ``print_`` (the old parameter name shadowed the builtin and made
                ``print=True`` crash).

        Returns:
            str: true vs. estimated ATE with confidence interval
        """
        if "print" in kwargs:
            print_ = kwargs.pop("print")
        if kwargs:
            raise TypeError(f"test_data() got unexpected keyword arguments: {sorted(kwargs)}")
        if self.data is None:
            raise ValueError("Data not generated yet. Please generate data first.")
        ols = _require_dependency("statsmodels", smf).ols
        model = ols('Y ~ D', data=self.data).fit()
        if print_:
            print(model.summary())
        est = model.params['D']
        conf_int = model.conf_int().loc['D']
        result = (f"TRUE ATE: {self.true_effect:.3f}, ESTIMATED ATE: {est:.3f}, "
                  f"95% CI: [{conf_int.iloc[0]:.3f}, {conf_int.iloc[1]:.3f}]")
        return result


class IVGenerator(DataGenerator):
    """
    Generate synthetic data for Instrumental Variables (IV) analysis. We assume two forms:

    1. Encouragement Design: Z -> D -> Y
       In this setting, encouragements (Z) is randomized. For instance, consider the
       administering of vaccines. We cannot force people to take vaccines, however we can
       encourage them to take the vaccine. We could run a vaccine awareness campaign, where
       we randomly pick participants, and inform them about the benefits of vaccine. The user
       can either comply (take the vaccine) or not comply (not take the vaccine). Likewise,
       in the control group, the user can comply (not take the vaccine) or defy (take the
       vaccine)

    2. Z -> D -> Y, D <- U -> Y
       This is the classical setting where we have an unobserved confounder (U) affecting
       both treatment (D) and outcome (Y).

    Additional Attributes:
        alpha (float): the effect of the instrument on the treatment (Z on D)
        encouragement (bool): whether or not this is an encouragement design
        beta_d (float): effect of the unobserved confounder (U) on treatment (D)
        beta_y (float): effect of the unobserved confounders (U) on outcome (Y)
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 beta_d=1.0, beta_y=1.5, covar=None, true_effect=1.0, seed=111,
                 heterogeneity=0, alpha=0.5, encouragement=False):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.method = "IV"
        self.alpha = alpha
        self.encouragement = encouragement
        self.beta_d = beta_d
        self.beta_y = beta_y

    def generate_data(self):
        """
        Generate covariates, instrument Z, treatment D and outcome Y.

        Returns:
            pd.DataFrame: columns X1..Xk, Z, D, Y (the confounder U is not stored)
        """
        n = self.n_observations
        X = self.generate_covariates()

        mean = np.random.randint(8, 13)
        Z = np.random.normal(mean, 2, size=n).astype(int)
        U = np.random.normal(0, 1, size=n)

        # First stage: treatment driven by the instrument and the covariates.
        vec1 = np.random.normal(0, 0.5, size=self.n_covars)
        intercept1 = np.random.normal(30, 2)
        D = self.alpha * Z + X @ vec1 + np.random.normal(size=n) + intercept1
        if self.encouragement:
            # Binary take-up: treated when the latent index is above its mean.
            D = (D > np.mean(D)).astype(int)
        else:
            # Classical setting: the unobserved confounder also shifts treatment.
            D = D + self.beta_d * U
        D = D.astype(int)

        # Outcome equation.
        intercept2 = np.random.normal(50, 3)
        vec2 = np.random.normal(0, 0.5, size=self.n_covars)
        Y = self.true_effect * D + X @ vec2 + np.random.normal(size=n) + intercept2
        if not self.encouragement:
            Y = Y + self.beta_y * U

        df = pd.DataFrame(X, columns=[f"X{i+1}" for i in range(self.n_covars)])
        df['Z'] = Z
        df['D'] = D
        df['Y'] = Y
        self.data = df
        return self.data

    def test_data(self, print_=False):
        """
        Estimate the effect of D on Y by 2SLS with Z as the instrument.

        Returns:
            str: true vs. estimated LATE with 95% confidence interval
        """
        if self.data is None:
            raise ValueError("Data not generated yet.")
        iv2sls = _require_dependency("linearmodels", IV2SLS)
        model = iv2sls.from_formula('Y ~ 1 + [D ~ Z]', data=self.data).fit()
        est = model.params['D']
        # The interval Series is label-indexed ('lower'/'upper'); access by position.
        conf_int = model.conf_int().loc['D']
        result = (f"TRUE LATE: {self.true_effect:.3f}, ESTIMATED LATE: {est:.3f}, "
                  f"95% CI: [{conf_int.iloc[0]:.3f}, {conf_int.iloc[1]:.3f}]")
        if print_:
            print(result)
        return result


class RDDGenerator(DataGenerator):
    """
    Generate synthetic data for (sharp) Regression Discontinuity Design (RDD).

    Additional Attributes:
        cutoff (float): the cutoff for treatment assignment
        bandwidth (float): the bandwidth for the running variable we consider when
            estimating the treatment effects
        plot (bool): whether we plot the data or not
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, mean=None,
                 plot=False, covar=None, true_effect=1.0, seed=111, heterogeneity=0,
                 cutoff=10, bandwidth=0.1):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.cutoff = cutoff
        self.bandwidth = bandwidth
        self.method = "RDD"
        self.plot = plot

    def generate_data(self):
        """
        Generate covariates, a running variable centered on the cutoff, the sharp
        treatment indicator D and the outcome Y (different slopes on each side).

        Returns:
            pd.DataFrame: columns X1..Xk, running_X, D, Y
        """
        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        df = pd.DataFrame(X, columns=cols)
        df['running_X'] = np.random.normal(0, 2, size=self.n_observations) + self.cutoff
        df['D'] = (df['running_X'] >= self.cutoff).astype(int)

        intercept = 10
        coeffs = np.random.normal(0, 0.1, size=self.n_covars)
        ## slope of the line below the threshold
        m_below = 1.5
        ## slope of the line above the threshold
        m_above = 0.8

        # Use centered version for slope
        centered = df['running_X'] - self.cutoff
        df["Y"] = (intercept + self.true_effect * df['D']
                   + m_below * centered * (1 - df['D'])
                   + m_above * centered * df['D']
                   + X @ coeffs
                   + np.random.normal(0, 0.5, size=self.n_observations))

        if self.plot:
            pyplot = _require_dependency("matplotlib", plt)
            below = df[df['D'] == 0]
            above = df[df['D'] == 1]
            pyplot.figure(figsize=(10, 6))
            pyplot.scatter(below['running_X'], below['Y'], alpha=0.5, label='Control',
                           color='blue')
            pyplot.scatter(above['running_X'], above['Y'], alpha=0.5, label='Treatment',
                           color='red')
            pyplot.axvline(self.cutoff, color='black', linestyle='--', label='Cutoff')
            pyplot.legend()  # the labels above are invisible without a legend
            pyplot.show()

        self.data = df
        return self.data

    def test_data(self, print_=False):
        """
        Local linear regression within ``bandwidth`` of the cutoff.

        Returns:
            str: true vs. estimated LATE with 95% confidence interval
        """
        if self.data is None:
            raise ValueError("Data not generated yet.")
        ols = _require_dependency("statsmodels", smf).ols
        df = self.data.copy()
        df['running_adj'] = df['running_X'].astype(float) - self.cutoff
        # Keep only observations close to the cutoff.
        df = df[np.abs(df['running_adj']) <= self.bandwidth].copy()
        model = ols('Y ~ D + running_adj + D:running_adj', data=df).fit()
        est = model.params['D']
        conf_int = model.conf_int().loc['D']
        result = (f"TRUE LATE: {self.true_effect:.3f}, ESTIMATED LATE: {est:.3f}, "
                  f"95% CI: [{conf_int.iloc[0]:.3f}, {conf_int.iloc[1]:.3f}]")
        if print_:
            print(result)
        return result


class DiDGenerator(DataGenerator):
    """
    Generate synthetic data for Difference-in-Differences (DiD) analysis

    Additional Attributes:
        1. n_periods (int): number of time-periods
    """

    def __init__(self, n_observations, n_continuous_covars, n_binary_covars=2, n_periods=2,
                 mean=None, covar=None, true_effect=1.0, seed=111, heterogeneity=0):
        super().__init__(n_observations, n_continuous_covars, n_binary_covars=n_binary_covars,
                         mean=mean, covar=covar, true_effect=true_effect, seed=seed,
                         heterogeneity=heterogeneity)
        self.method = "DiD"
        self.n_periods = n_periods

    def canonical_did_model(self):
        """
        This is the classical DiD setting with two periods (pre and post treatment) and
        two groups (treatment and control)

        Returns:
            pd.DataFrame: long-format panel with columns unit_id, post, D, Y, X1..Xk
            (two rows per unit; D is the time-invariant group indicator)
        """
        n = self.n_observations
        ## fraction of observations that receives the treatment
        frac_treated = np.random.uniform(0.35, 0.65)
        n_treated = int(frac_treated * n)
        unit_ids = np.arange(n)

        treatment_status = np.zeros(n, dtype=int)
        treatment_status[:n_treated] = 1
        np.random.shuffle(treatment_status)

        X = self.generate_covariates()
        cols = [f"X{i+1}" for i in range(self.n_covars)]
        covar_df = pd.DataFrame(X, columns=cols)

        vec = np.random.normal(0, 0.1, size=self.n_covars)
        intercept = np.random.normal(50, 3)
        # Time-invariant level difference between the groups (not the causal effect).
        treat_effect = np.random.normal(0, 1)
        # Common shock shared by both groups in the post period.
        time_effect = np.random.normal(0, 1)
        covar_term = X @ vec

        pre_noise = np.random.normal(0, 1, n)
        pre_outcome = intercept + covar_term + pre_noise + treat_effect * treatment_status
        pre_data = pd.DataFrame({'unit_id': unit_ids, 'post': 0, 'D': treatment_status,
                                 'Y': pre_outcome})

        post_noise = np.random.normal(0, 1, n)
        post_outcome = (intercept + time_effect + covar_term
                        + self.true_effect * treatment_status
                        + treat_effect * treatment_status + post_noise)
        post_data = pd.DataFrame({'unit_id': unit_ids, 'post': 1, 'D': treatment_status,
                                  'Y': post_outcome})

        df = pd.concat([pre_data, post_data], ignore_index=True)
        df = df.merge(covar_df, left_on="unit_id", right_index=True)
        return df[['unit_id', 'post', 'D', 'Y'] + cols]

    def twfe_model(self):
        """
        Generate panel data for Two-Way Fixed Effects DiD model. This is a generalization
        of 2-period DiD for multi-year treatments

        Returns:
            pd.DataFrame: long-format panel with columns unit, year, D, Y, X1..Xk, where
            D is 1 from a treated unit's (random) start period onwards and 0 otherwise.
        """
        n = self.n_observations
        ## fraction of observations that receives the treatment
        frac_treated = np.random.uniform(0.35, 0.65)
        unit_ids = np.arange(1, n + 1)
        time_periods = np.arange(0, self.n_periods)

        # Long format: rows are ordered by unit, then by time within unit.
        df = pd.DataFrame({"unit": np.repeat(unit_ids, self.n_periods),
                           "year": np.tile(time_periods, n)})

        X = self.generate_covariates()
        cols = [f"X{j+1}" for j in range(self.n_covars)]

        ## Assign treatment timing
        n_treated = int(frac_treated * n)
        treated_units = np.random.choice(unit_ids, size=n_treated, replace=False)
        # One draw per treated unit, in order, to keep the random stream reproducible.
        starts = [np.random.randint(1, self.n_periods) for _ in treated_units]
        # Never-treated units get an infinite start, so `year >= start` is always False.
        start_by_unit = np.full(n, np.inf)
        start_by_unit[treated_units - 1] = starts  # unit ids are 1-based
        treat_post = (df["year"].to_numpy()
                      >= np.repeat(start_by_unit, self.n_periods)).astype(int)

        ## State fixed effects
        unit_fe = np.repeat(np.random.normal(0, 1.0, n), self.n_periods)
        ## Time fixed effects
        time_fe = np.tile(np.random.normal(0, 1, len(time_periods)), n)

        covar_effects = np.random.normal(0, 0.1, self.n_covars)
        X_long = np.repeat(X, self.n_periods, axis=0)
        covar_term = X_long @ covar_effects
        intercept = np.random.normal(50, 3)
        noise = np.random.normal(0, 1, len(df))

        df["D"] = treat_post
        df["Y"] = (intercept + covar_term + unit_fe + time_fe
                   + self.true_effect * treat_post + noise)
        for j, col in enumerate(cols):
            df[col] = X_long[:, j]
        return df[["unit", "year", "D", "Y"] + cols]

    def generate_data(self):
        """
        Generate the canonical 2x2 design when ``n_periods == 2``, otherwise the
        staggered-adoption TWFE panel.

        Returns:
            pd.DataFrame: the generated panel
        """
        if self.n_periods == 2:
            self.data = self.canonical_did_model()
        else:
            self.data = self.twfe_model()
        return self.data

    def test_data(self, print_=False):
        """
        Estimate the ATT with the regression matching the generated design.

        Returns:
            str: true vs. estimated ATT with confidence interval
        """
        if self.data is None:
            raise ValueError("Data not generated yet.")
        ols = _require_dependency("statsmodels", smf).ols
        if self.n_periods == 2:
            if print_:
                print("Testing canonical DiD model")
            model = ols('Y ~ D * post', data=self.data).fit()
            term = 'D:post'
        else:
            if print_:
                print("Testing TWFE model")
            model = ols('Y ~ D + C(unit) + C(year)', data=self.data).fit()
            term = 'D'
        estimated_att = model.params[term]
        conf_int = model.conf_int().loc[term]
        result = "TRUE ATT: {:.3f}, EMPRICAL ATT:{:.3f}\nCONFIDENCE INTERVAL:{}".format(
            self.true_effect, estimated_att, conf_int)
        if print_:
            print(result)
        return result