## Functions that use the generator classes in generator.py to produce
## synthetic causal-inference datasets (observational, RCT, multi-arm RCT,
## front-door, DiD, IV/encouragement, and RDD designs).
from .generator import (
    PSMGenerator,
    PSWGenerator,
    IVGenerator,
    RDDGenerator,
    RCTGenerator,
    DiDGenerator,
    MultiTreatRCTGenerator,
    FrontDoorGenerator,
)
import pandas as pd
import numpy as np
from pathlib import Path
import logging
import logging.config
import json
from .util import export_info

# The log directory must exist before fileConfig opens the file handlers.
Path("reproduce_results/logs").mkdir(parents=True, exist_ok=True)
logging.config.fileConfig('reproduce_results/log_config.ini')


def config_hyperparameters(base_seed, base_mean, base_cov_diag, max_cont, max_bin,
                           n_obs, max_obs, min_obs, max_treat=2, max_periods=5,
                           cutoff_max=25):
    """
    Configure the hyperparameters for the data generation process.

    Args:
        base_seed (int): Base seed for random number generation
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov_diag (np.ndarray): Base (diagonal) covariance matrix for the covariates
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        n_obs (int): Number of observations to generate
        max_obs (int): Maximum number of observations to generate
        min_obs (int): Minimum number of observations to generate
        max_treat (int): Maximum number of treatment groups (default is 2)
        max_periods (int): Maximum number of periods for DiD data (default is 5)
        cutoff_max (int): Maximum value for the cutoff in RDD data (default is 25)

    Returns:
        dict: A dictionary containing the hyperparameters for data generation.
            (str) attribute -> (int) value
    """
    base_cov_mat = np.diag(base_cov_diag)
    np.random.seed(base_seed)
    n_treat = np.random.randint(2, max_treat + 1)
    true_effect = np.random.uniform(1, 10)
    # Effect 0 for the control arm, then one random effect per treatment arm.
    true_effect_vec = np.array([0] + [np.random.uniform(1, 10) for _ in range(n_treat)])
    n_continuous = np.random.randint(2, max_cont + 1)
    # BUG FIX: the upper bound was exclusive (`randint(2, max_bin)`), so the
    # documented maximum of `max_bin` binary covariates could never be drawn.
    # Made inclusive for consistency with every other bound in this function.
    n_binary = np.random.randint(2, max_bin + 1)
    n_observations = np.random.randint(min_obs, max_obs + 1)
    if n_obs is not None:
        # A caller-supplied sample size overrides the random draw.
        n_observations = n_obs
    n_periods = np.random.randint(3, max_periods + 1)
    cutoff = np.random.randint(2, cutoff_max + 1)
    # Slice the base moments down to the drawn number of continuous covariates.
    mean_vec = base_mean[0:n_continuous]
    cov_mat = base_cov_mat[0:n_continuous, 0:n_continuous]
    param_dict = {'tau': true_effect, 'continuous': n_continuous,
                  'binary': n_binary, 'obs': n_observations, 'mean': mean_vec,
                  'covar': cov_mat, 'tau_vec': true_effect_vec,
                  "treat": n_treat, "periods": n_periods, 'cutoff': cutoff}
    return param_dict


def generate_observational_data(base_mean, base_cov, dset_size, max_cont, max_bin,
                                min_obs, max_obs, data_save_loc, metadata_save_loc,
                                n_obs=None):
    """
    Generate observational data using the PSMGenerator class.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    logger = logging.getLogger("observational_data_logger")
    logger.info("Generating observational data")
    metadata_dict = {}
    base_seed = 31
    for i in range(dset_size):
        logger.info("Iteration: {}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs)
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format(
            params['obs'], params['continuous'], params['binary']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        # seed*2 decouples the generator's stream from config_hyperparameters'.
        gen = PSMGenerator(params['obs'], params['continuous'],
                           n_binary_covars=params['binary'], mean=mean_vec,
                           covar=cov_mat, true_effect=params['tau'], seed=seed * 2)
        gen.generate_data()
        name = "observational_data_{}.csv".format(i)
        data_dict = {"true_effect": params['tau'], "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "observational"}
        test_result = gen.test_data()
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "observational")


def generate_rct_data(base_mean, base_cov, dset_size, max_cont, max_bin,
                      min_obs, max_obs, data_save_loc, metadata_save_loc,
                      n_obs=None):
    """
    Generates RCT data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    logger = logging.getLogger("rct_data_logger")
    logger.info("Generating RCT data")
    metadata_dict = {}
    base_seed = 197
    for i in range(dset_size):
        logger.info("Iteration: {}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs)
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format(
            params['obs'], params['continuous'], params['binary']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = RCTGenerator(params['obs'], params['continuous'],
                           n_binary_covars=params['binary'], mean=mean_vec,
                           covar=cov_mat, true_effect=params['tau'], seed=seed)
        gen.generate_data()
        test_result = gen.test_data()
        data_dict = {"true_effect": params['tau'], "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "rct"}
        name = "rct_data_{}.csv".format(i)
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "rct")


def generate_multi_rct_data(base_mean, base_cov, dset_size, max_n_treat, max_cont,
                            max_bin, min_obs, max_obs, data_save_loc,
                            metadata_save_loc, n_obs=None):
    """
    Generate multi-treatment RCT data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_n_treat (int): Maximum number of treatment groups
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    logger = logging.getLogger("multi_rct_data_logger")
    logger.info("Generating multi-treatment RCT data")
    metadata_dict = {}
    base_seed = 173
    for i in range(dset_size):
        logger.info("Iteration: {}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs,
                                        max_treat=max_n_treat)
        n_treat = params['treat']
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}, n_treat: {}".format(
            params['obs'], params['continuous'], params['binary'], n_treat))
        logger.info("true_effect: {}".format(params['tau_vec']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = MultiTreatRCTGenerator(params['obs'], params['continuous'],
                                     params['treat'],
                                     n_binary_covars=params['binary'],
                                     mean=mean_vec, covar=cov_mat,
                                     true_effect_vec=params['tau_vec'],
                                     seed=seed, true_effect=params['tau'])
        gen.generate_data()
        test_result = gen.test_data()
        # tau_vec is an np.ndarray; convert to list so the metadata is JSON-safe.
        data_dict = {"true_effect": list(params['tau_vec']),
                     "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "multi_rct"}
        name = "multi_rct_data_{}.csv".format(i)
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "multi_rct")


def generate_frontdoor_data(base_mean, base_cov, dset_size, max_cont, max_bin,
                            min_obs, max_obs, data_save_loc, metadata_save_loc,
                            n_obs=None):
    """
    Generates front-door data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Max number of continuous covariates
        max_bin (int): Max number of binary covariates
        min_obs (int): Minimum number of observations
        max_obs (int): Maximum number of observations
        data_save_loc (str): Folder to save generated CSV files
        metadata_save_loc (str): Folder to save metadata JSON
        n_obs (int or None): Fixed number of observations (if provided)
    """
    logger = logging.getLogger("frontdoor_data_logger")
    logger.info("Generating Front-Door synthetic data")
    metadata_dict = {}
    base_seed = 311
    for i in range(dset_size):
        logger.info(f"Iteration: {i}")
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs)
        logger.info("n_observations: {}, n_continuous: {}, n_binary: {}".format(
            params['obs'], params['continuous'], params['binary']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = FrontDoorGenerator(
            n_observations=params['obs'],
            n_continuous_covars=params['continuous'],
            n_binary_covars=params['binary'],
            mean=mean_vec,
            covar=cov_mat,
            true_effect=params['tau'],
            seed=seed
        )
        gen.generate_data()
        test_result = gen.test_data()
        logger.info("Test result: {}\n".format(test_result))
        # Save CSV
        filename = f"frontdoor_data_{i}.csv"
        gen.save_data(data_save_loc, filename)
        # Metadata
        data_dict = {
            "true_effect": params['tau'],
            "observation": params['obs'],
            "continuous": params['continuous'],
            "binary": params['binary'],
            "type": "frontdoor"
        }
        metadata_dict[filename] = data_dict
    # Save metadata JSON
    export_info(metadata_dict, metadata_save_loc, "frontdoor")


def generate_canonical_did_data(base_mean, base_cov, dset_size, max_cont, max_bin,
                                min_obs, max_obs, data_save_loc, metadata_save_loc,
                                n_obs=None):
    """
    Generate canonical DiD data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    logger = logging.getLogger("did_data_logger")
    logger.info("Generating canonical DiD data")
    metadata_dict = {}
    base_seed = 281
    for i in range(dset_size):
        logger.info("Iteration: {}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs)
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format(
            params['obs'], params['continuous'], params['binary']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = DiDGenerator(params['obs'], params['continuous'],
                           n_binary_covars=params['binary'], mean=mean_vec,
                           covar=cov_mat, true_effect=params['tau'], seed=seed)
        gen.generate_data()
        test_result = gen.test_data()
        data_dict = {"true_effect": params['tau'], "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "did_canonical"}
        name = "did_canonical_data_{}.csv".format(i)
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "did_canonical")


def generate_data_iv(base_mean, base_cov, dset_size, max_cont, max_bin,
                     min_obs, max_obs, data_save_loc, metadata_save_loc,
                     n_obs=None):
    """
    Generate IV data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    logger = logging.getLogger("iv_data_logger")
    logger.info("Generating IV data")
    metadata_dict = {}
    base_seed = 343
    for i in range(dset_size):
        logger.info("Iteration: {}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs)
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format(
            params['obs'], params['continuous'], params['binary']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = IVGenerator(params['obs'], params['continuous'],
                          n_binary_covars=params['binary'], mean=mean_vec,
                          covar=cov_mat, true_effect=params['tau'], seed=seed)
        gen.generate_data()
        test_result = gen.test_data()
        data_dict = {"true_effect": params['tau'], "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "IV"}
        name = "iv_data_{}.csv".format(i)
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "iv")


def generate_twfe_did_data(base_mean, base_cov, dset_size, max_cont, max_bin,
                           n_periods, min_obs, max_obs, data_save_loc,
                           metadata_save_loc, n_obs=None):
    """
    Generate TWFE DiD data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        n_periods (int): Number of periods for the DiD data
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    logger = logging.getLogger("did_data_logger")
    logger.info("Generating TWFE DiD data")
    metadata_dict = {}
    base_seed = 447
    # BUG FIX: replaced a stray debug `print("preiods: ", n_periods)` (typo,
    # and it bypassed the module's logging) with a proper log statement.
    logger.info("n_periods: {}".format(n_periods))
    for i in range(dset_size):
        logger.info("Iteration: {}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs,
                                        max_periods=n_periods)
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}, n_periods:{}".format(
            params['obs'], params['continuous'], params['binary'], params['periods']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = DiDGenerator(params['obs'], params['continuous'],
                           n_binary_covars=params['binary'], mean=mean_vec,
                           covar=cov_mat, true_effect=params['tau'], seed=seed,
                           n_periods=n_periods)
        gen.generate_data()
        test_result = gen.test_data()
        data_dict = {"true_effect": params['tau'], "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "did_twfe",
                     "periods": params['periods']}
        name = "did_twfe_data_{}.csv".format(i)
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "did_twfe")


def generate_encouragement_data(base_mean, base_cov, dset_size, max_cont, max_bin,
                                min_obs, max_obs, data_save_loc, metadata_save_loc,
                                n_obs=None):
    """
    Generate encouragement design data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    # Shares the IV logger: an encouragement design is an IV design variant.
    logger = logging.getLogger("iv_data_logger")
    logger.info("Generating encouragement design data")
    metadata_dict = {}
    base_seed = 571
    for i in range(dset_size):
        logger.info("Iteration: {}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs)
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format(
            params['obs'], params['continuous'], params['binary']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = IVGenerator(params['obs'], params['continuous'],
                          n_binary_covars=params['binary'], mean=mean_vec,
                          covar=cov_mat, true_effect=params['tau'], seed=seed,
                          encouragement=True)
        gen.generate_data()
        test_result = gen.test_data()
        data_dict = {"true_effect": params['tau'], "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "encouragement"}
        name = "iv_encouragement_data_{}.csv".format(i)
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "iv_encouragement")


def generate_rdd_data(base_mean, base_cov, dset_size, max_cont, max_bin,
                      max_cutoff, min_obs, max_obs, data_save_loc,
                      metadata_save_loc, n_obs=None):
    """
    Generates (sharp) RDD data.

    Args:
        base_mean (np.ndarray): Base mean vector for the covariates
        base_cov (np.ndarray): Base covariance matrix for the covariates
        dset_size (int): Number of datasets to generate
        max_cont (int): Maximum number of continuous covariates
        max_bin (int): Maximum number of binary covariates
        max_cutoff (int): Maximum value for the cutoff in RDD data
        min_obs (int): Minimum number of observations to generate
        max_obs (int): Maximum number of observations to generate
        data_save_loc (str): Directory to save the generated data files
        metadata_save_loc (str): Directory to save the metadata information
        n_obs (int, None): number of observations. If None, it will be randomly
            generated within the range of min_obs and max_obs.
    """
    logger = logging.getLogger("rdd_data_logger")
    logger.info("Generating RDD data")
    metadata_dict = {}
    base_seed = 683
    for i in range(dset_size):
        logger.info("Iteration:{}".format(i))
        seed = (i + 1) * base_seed
        params = config_hyperparameters(seed, base_mean, base_cov, max_cont,
                                        max_bin, n_obs, max_obs, min_obs,
                                        cutoff_max=max_cutoff)
        logger.info("n_observations:{}, n_continuous: {}, n_binary: {}, cutoff:{}".format(
            params['obs'], params['continuous'], params['binary'], params['cutoff']))
        logger.info("true_effect: {}".format(params['tau']))
        mean_vec = params['mean']
        cov_mat = params['covar']
        gen = RDDGenerator(params['obs'], params['continuous'],
                           n_binary_covars=params['binary'], mean=mean_vec,
                           covar=cov_mat, true_effect=params['tau'], seed=seed,
                           cutoff=params['cutoff'], plot=True)
        gen.generate_data()
        test_result = gen.test_data()
        data_dict = {"true_effect": params['tau'], "observation": params['obs'],
                     "continuous": params['continuous'],
                     "binary": params['binary'], "type": "rdd",
                     'cutoff': params['cutoff']}
        name = "rdd_data_{}.csv".format(i)
        logger.info("Test result: {}\n".format(test_result))
        metadata_dict[name] = data_dict
        gen.save_data(data_save_loc, name)
    export_info(metadata_dict, metadata_save_loc, "rdd")