Spaces:
Running
Running
## This file contains the functions that uses the classes in generator.py to generate the synthetic data | |
from .generator import PSMGenerator, PSWGenerator, IVGenerator, RDDGenerator, RCTGenerator, DiDGenerator, MultiTreatRCTGenerator, FrontDoorGenerator | |
import pandas as pd | |
import numpy as np | |
from pathlib import Path | |
import logging | |
import logging.config | |
import json | |
from .util import export_info | |
Path("reproduce_results/logs").mkdir(parents=True, exist_ok=True) | |
logging.config.fileConfig('reproduce_results/log_config.ini') | |
def config_hyperparameters(base_seed, base_mean, base_cov_diag, max_cont, max_bin, n_obs, | |
max_obs, min_obs, max_treat=2, max_periods=5, cutoff_max=25): | |
""" | |
configure the hyperparameters for the data generation process. | |
Args: | |
base_seed (int): Base seed for random number generation | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov_diag (np.ndarray): Base (diagonal) covariance matrix for the covariates | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
n_obs (int): Number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
min_obs (int): Minimum number of observations to generate | |
max_treat (int): Maximum number of treatment groups (default is 2) | |
max_periods (int): Maximum number of periods for DiD data (default is 5) | |
cutoff_max (int): Maximum value for the cutoff in RDD data (default is 25) | |
Returns: | |
dict: A dictionary containing the hyperparameters for data generation. | |
(str) attribute -> (int) value | |
""" | |
base_cov_mat = np.diag(base_cov_diag) | |
np.random.seed(base_seed) | |
n_treat = np.random.randint(2, max_treat + 1) | |
true_effect = np.random.uniform(1, 10) | |
true_effect_vec = np.array([0] + [np.random.uniform(1, 10) for i in range(n_treat)]) | |
n_continuous = np.random.randint(2, max_cont + 1) | |
n_binary = np.random.randint(2, max_bin) | |
n_observations = np.random.randint(min_obs, max_obs + 1) | |
if n_obs is not None: | |
n_observations = n_obs | |
n_periods = np.random.randint(3, max_periods + 1) | |
cutoff = np.random.randint(2, cutoff_max + 1) | |
mean_vec = base_mean[0:n_continuous] | |
cov_mat = base_cov_mat[0:n_continuous, 0:n_continuous] | |
param_dict = {'tau': true_effect, 'continuous': n_continuous, 'binary': n_binary, | |
'obs': n_observations, 'mean': mean_vec, 'covar': cov_mat, | |
'tau_vec':true_effect_vec, "treat":n_treat, "periods": n_periods, | |
'cutoff':cutoff} | |
return param_dict | |
def generate_observational_data(base_mean, base_cov, dset_size, max_cont, max_bin, min_obs, | |
max_obs, data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generate observational data using the PSMGenerator class. | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("observational_data_logger") | |
logger.info("Generating observational data") | |
metadata_dict = {} | |
base_seed = 31 | |
for i in range(dset_size): | |
logger.info("Iteration: {}".format(i)) | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, | |
n_obs, max_obs, min_obs) | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format( | |
params['obs'], params['continuous'], params['binary'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = PSMGenerator(params['obs'], params['continuous'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect=params['tau'], seed=seed*2) | |
data = gen.generate_data() | |
name = "observational_data_{}.csv".format(i) | |
data_dict = {"true_effect": params['tau'], "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "observational"} | |
test_result = gen.test_data() | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "observational") | |
def generate_rct_data(base_mean, base_cov, dset_size, max_cont, max_bin, min_obs, max_obs, | |
data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generates RCT data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("rct_data_logger") | |
logger.info("Generating RCT data") | |
metadata_dict = {} | |
base_seed = 197 | |
for i in range(dset_size): | |
logger.info("Iteration: {}".format(i)) | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs) | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format( | |
params['obs'], params['continuous'], params['binary'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = RCTGenerator(params['obs'], params['continuous'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect=params['tau'], seed=seed) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
data_dict = {"true_effect": params['tau'], "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "rct"} | |
name = "rct_data_{}.csv".format(i) | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "rct") | |
def generate_multi_rct_data(base_mean, base_cov, dset_size, max_n_treat, max_cont, max_bin, min_obs, max_obs, | |
data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generate multi-treatment RCT data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_n_treat (int): Maximum number of treatment groups | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("multi_rct_data_logger") | |
logger.info("Generating multi-treatment RCT data") | |
metadata_dict = {} | |
base_seed = 173 | |
for i in range(dset_size): | |
logger.info("Iteration: {}".format(i)) | |
seed = (i+1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs, max_treat=max_n_treat) | |
n_treat = params['treat'] | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}, n_treat: {}".format( | |
params['obs'], params['continuous'], params['binary'], n_treat)) | |
logger.info("true_effect: {}".format(params['tau_vec'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = MultiTreatRCTGenerator(params['obs'], params['continuous'], params['treat'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect_vec=params['tau_vec'], seed=seed, | |
true_effect=params['tau']) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
data_dict = {"true_effect": list(params['tau_vec']), "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "multi_rct"} | |
name = "multi_rct_data_{}.csv".format(i) | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "multi_rct") | |
def generate_frontdoor_data(base_mean, base_cov, dset_size, max_cont, max_bin, min_obs, max_obs, | |
data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generates front-door data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Max number of continuous covariates | |
max_bin (int): Max number of binary covariates | |
min_obs (int): Minimum number of observations | |
max_obs (int): Maximum number of observations | |
data_save_loc (str): Folder to save generated CSV files | |
metadata_save_loc (str): Folder to save metadata JSON | |
n_obs (int or None): Fixed number of observations (if provided) | |
""" | |
logger = logging.getLogger("frontdoor_data_logger") | |
logger.info("Generating Front-Door synthetic data") | |
metadata_dict = {} | |
base_seed = 311 | |
for i in range(dset_size): | |
logger.info(f"Iteration: {i}") | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs) | |
logger.info("n_observations: {}, n_continuous: {}, n_binary: {}".format( | |
params['obs'], params['continuous'], params['binary'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = FrontDoorGenerator( | |
n_observations=params['obs'], | |
n_continuous_covars=params['continuous'], | |
n_binary_covars=params['binary'], | |
mean=mean_vec, | |
covar=cov_mat, | |
true_effect=params['tau'], | |
seed=seed | |
) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
logger.info("Test result: {}\n".format(test_result)) | |
# Save CSV | |
filename = f"frontdoor_data_{i}.csv" | |
gen.save_data(data_save_loc, filename) | |
# Metadata | |
data_dict = { | |
"true_effect": params['tau'], | |
"observation": params['obs'], | |
"continuous": params['continuous'], | |
"binary": params['binary'], | |
"type": "frontdoor" | |
} | |
metadata_dict[filename] = data_dict | |
# Save metadata JSON | |
export_info(metadata_dict, metadata_save_loc, "frontdoor") | |
def generate_canonical_did_data(base_mean, base_cov, dset_size, max_cont, max_bin, min_obs, max_obs, | |
data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generate canonical DiD data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("did_data_logger") | |
logger.info("Generating canonical DiD data") | |
metadata_dict = {} | |
base_seed = 281 | |
for i in range(dset_size): | |
logger.info("Iteration: {}".format(i)) | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs) | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format( | |
params['obs'], params['continuous'], params['binary'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = DiDGenerator(params['obs'], params['continuous'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect=params['tau'], seed=seed) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
data_dict = {"true_effect": params['tau'], "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "did_canonical"} | |
name = "did_canonical_data_{}.csv".format(i) | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "did_canonical") | |
def generate_data_iv(base_mean, base_cov, dset_size, max_cont, max_bin, min_obs, max_obs, | |
data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generate IV data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("iv_data_logger") | |
logger.info("Generating IV data") | |
metadata_dict = {} | |
base_seed = 343 | |
for i in range(dset_size): | |
logger.info("Iteration: {}".format(i)) | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs) | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format( | |
params['obs'], params['continuous'], params['binary'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = IVGenerator(params['obs'], params['continuous'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect=params['tau'], seed=seed) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
data_dict = {"true_effect": params['tau'], "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "IV"} | |
name = "iv_data_{}.csv".format(i) | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "iv") | |
def generate_twfe_did_data(base_mean, base_cov, dset_size, max_cont, max_bin, n_periods, | |
min_obs, max_obs, data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generate TWFE DiD data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
n_periods (int): Number of periods for the DiD data | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("did_data_logger") | |
logger.info("Generating TWFE DiD data") | |
metadata_dict = {} | |
base_seed = 447 | |
print("preiods: ", n_periods) | |
for i in range(dset_size): | |
logger.info("Iteration: {}".format(i)) | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs, max_periods=n_periods) | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}, n_periods:{}".format( | |
params['obs'], params['continuous'], params['binary'], params['periods'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = DiDGenerator(params['obs'], params['continuous'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect=params['tau'], seed=seed, | |
n_periods=n_periods) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
data_dict = {"true_effect": params['tau'], "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "did_twfe", "periods": params['periods']} | |
name = "did_twfe_data_{}.csv".format(i) | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "did_twfe") | |
def generate_encouragement_data(base_mean, base_cov, dset_size, max_cont, max_bin, min_obs, max_obs, | |
data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generate encouragement design data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("iv_data_logger") | |
logger.info("Generating encouragement design data") | |
metadata_dict = {} | |
base_seed = 571 | |
for i in range(dset_size): | |
logger.info("Iteration: {}".format(i)) | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs) | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}".format( | |
params['obs'], params['continuous'], params['binary'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = IVGenerator(params['obs'], params['continuous'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect=params['tau'], seed=seed, | |
encouragement=True) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
data_dict = {"true_effect": params['tau'], "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "encouragement"} | |
name = "iv_encouragement_data_{}.csv".format(i) | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "iv_encouragement") | |
def generate_rdd_data(base_mean, base_cov, dset_size, max_cont, max_bin, max_cutoff, | |
min_obs, max_obs, data_save_loc, metadata_save_loc, n_obs=None): | |
""" | |
Generates (sharp) RDD data | |
Args: | |
base_mean (np.ndarray): Base mean vector for the covariates | |
base_cov (np.ndarray): Base covariance matrix for the covariates | |
dset_size (int): Number of datasets to generate | |
max_cont (int): Maximum number of continuous covariates | |
max_bin (int): Maximum number of binary covariates | |
max_cutoff (int): Maximum value for the cutoff in RDD data | |
min_obs (int): Minimum number of observations to generate | |
max_obs (int): Maximum number of observations to generate | |
data_save_loc (str): Directory to save the generated data files | |
metadata_save_loc (str): Directory to save the metadata information | |
n_obs (int, None): number of observations. If None, it will be randomly | |
generated within the range of min_obs and max_obs. | |
""" | |
logger = logging.getLogger("rdd_data_logger") | |
logger.info("Generating RDD data") | |
metadata_dict = {} | |
base_seed = 683 | |
for i in range(dset_size): | |
logger.info("Iteration:{}".format(i)) | |
seed = (i + 1) * base_seed | |
params = config_hyperparameters(seed, base_mean, base_cov, max_cont, max_bin, n_obs, | |
max_obs, min_obs, cutoff_max=max_cutoff) | |
logger.info("n_observations:{}, n_continuous: {}, n_binary: {}, cutoff:{}".format( | |
params['obs'], params['continuous'], params['binary'], params['cutoff'])) | |
logger.info("true_effect: {}".format(params['tau'])) | |
mean_vec = params['mean'] | |
cov_mat = params['covar'] | |
gen = RDDGenerator(params['obs'], params['continuous'], n_binary_covars=params['binary'], | |
mean=mean_vec, covar=cov_mat, true_effect=params['tau'], seed=seed, | |
cutoff=params['cutoff'], plot=True) | |
data = gen.generate_data() | |
test_result = gen.test_data() | |
data_dict = {"true_effect": params['tau'], "observation": params['obs'], "continuous": params['continuous'], | |
"binary": params['binary'], "type": "rdd", 'cutoff': params['cutoff']} | |
name = "rdd_data_{}.csv".format(i) | |
logger.info("Test result: {}\n".format(test_result)) | |
metadata_dict[name] = data_dict | |
gen.save_data(data_save_loc, name) | |
export_info(metadata_dict, metadata_save_loc, "rdd") | |