import os
import argparse

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.metrics import roc_curve, auc, precision_recall_curve
import statsmodels.api as sm

# import warnings
# warnings.filterwarnings('ignore')


def load_data(file_path):
    """Load the feature table; the first CSV column holds the drug-cell_line index."""
    df = pd.read_csv(file_path, index_col=0)
    return df


def explore_data(df):
    print("Dataset shape:", df.shape)
    print("\nFeature data types:")
    print(df.dtypes)
    print("\nCheck for missing values:")
    print(df.isnull().sum())

    # Check class balance for the target variable
    print("\nTarget variable distribution:")
    target_dist = df['is_effective'].value_counts(normalize=True) * 100
    print(target_dist)
    return df


def analyze_feature_correlations(df):
    numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()
    corr_matrix = df[numeric_cols].corr(method='spearman')

    plt.figure(figsize=(5, 5))
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', linewidths=0.5,
                fmt='.2f', annot_kws={"size": 8})
    plt.title('Feature Correlation Matrix')
    plt.tight_layout()
    plt.savefig('feature_correlation_matrix.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Show correlations with the target variable
    target_corr = corr_matrix['is_effective'].sort_values(ascending=False)
    print("Correlations with target variable 'is_effective':")
    print(target_corr)
    return corr_matrix


def prepare_data(X, y, split_seed=42):
    """
    Prepare data for model training by scaling numeric features and encoding
    categorical features, then splitting train/test at the drug level.

    Args:
        X: DataFrame of features. Expects a 'drug_CVCL_xxxx' (drug-cell_line) index.
        y: Series containing the target variable, with the same index as X.
        split_seed: Random seed for the drug-level train-test split.

    Returns:
        X_train, X_test, y_train, y_test: Train-test split of the prepared data.
    """
    # X and y must describe the same drug-cell_line pairs
    assert X.index.equals(y.index), "X and y must have the same indices"

    # Extract drug and cell line info from the index
    drug_name = X.index.str.split('_CVCL_').str[0]
    cell_line_name = 'CVCL_' + X.index.str.split('_CVCL_').str[1]  # currently unused

    cat_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()
    num_cols = X.select_dtypes(include=['int64', 'float64']).columns.tolist()

    # Work on a copy to avoid modifying the original
    X_processed = X.copy()

    # NOTE: the scaler and encoder are fit on all rows before the split below;
    # fit them on X_train only if strict train/test separation is required.
    scaler = StandardScaler()
    if num_cols:
        X_processed[num_cols] = scaler.fit_transform(X_processed[num_cols])

    if cat_cols:
        encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore', drop='first')
        encoded_cats = encoder.fit_transform(X_processed[cat_cols])
        feature_names = encoder.get_feature_names_out(cat_cols)
        encoded_df = pd.DataFrame(
            encoded_cats,
            columns=feature_names,
            index=X_processed.index
        )
        X_processed = X_processed.drop(cat_cols, axis=1)
        X_processed = pd.concat([X_processed, encoded_df], axis=1)

    # Split on unique drugs to prevent leakage: 80% of drugs are held out for testing
    unique_drugs = drug_name.unique()
    train_drugs, test_drugs = train_test_split(
        unique_drugs, test_size=0.8, random_state=split_seed
    )

    # Create train/test masks based on the drug split
    train_mask = drug_name.isin(train_drugs)
    test_mask = drug_name.isin(test_drugs)

    X_train, X_test = X_processed.loc[train_mask], X_processed.loc[test_mask]
    y_train, y_test = y.loc[train_mask], y.loc[test_mask]

    # # Verify stratification is reasonable
    # print(f"Train set positive rate: {y_train.mean():.4f}")
    # print(f"Test set positive rate: {y_test.mean():.4f}")
    # print(f"Training set: {X_train.shape}")
    # print(f"Testing set: {X_test.shape}")

    return X_train, X_test, y_train, y_test
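
# Illustrative usage sketch for prepare_data (hypothetical toy data, not from
# this project): the function assumes a 'DRUG_CVCL_XXXX'-style index so rows
# can be grouped by drug, and every row for a given drug lands entirely in
# train or entirely in test.
#
#   drugs = ['erlotinib', 'gefitinib', 'sorafenib', 'lapatinib', 'imatinib']
#   X_demo = pd.DataFrame(
#       {'dose': [0.1, 0.5, 0.9, 0.2, 0.7], 'assay': ['a', 'a', 'b', 'b', 'a']},
#       index=[f'{d}_CVCL_000{i}' for i, d in enumerate(drugs)])
#   y_demo = pd.Series([0, 1, 1, 0, 1], index=X_demo.index, name='is_effective')
#   X_tr, X_te, y_tr, y_te = prepare_data(X_demo, y_demo)
#   train_drugs = set(X_tr.index.str.split('_CVCL_').str[0])
#   test_drugs = set(X_te.index.str.split('_CVCL_').str[0])
#   assert train_drugs.isdisjoint(test_drugs)  # no drug appears in both splits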
def evaluate_model(model, X_test, y_test):
    """Evaluate the model on held-out data; returns predictions, predicted
    probabilities, PR AUC, and the no-skill PR AUC baseline."""
    X_test_const = sm.add_constant(X_test, has_constant='add')
    y_prob = model.predict(X_test_const)
    y_pred = (y_prob > 0.5).astype(int)

    # ROC AUC for the model
    fpr, tpr, _ = roc_curve(y_test, y_prob)
    roc_auc = auc(fpr, tpr)
    print(f"\nROC AUC: {roc_auc:.4f}")

    # Baseline ROC AUC (random classifier)
    no_skill_prob = np.ones(len(y_test)) * np.mean(y_test)
    fpr_baseline, tpr_baseline, _ = roc_curve(y_test, no_skill_prob)
    roc_auc_baseline = auc(fpr_baseline, tpr_baseline)
    print(f"Baseline ROC AUC: {roc_auc_baseline:.4f}")

    # PR AUC for the model
    precision, recall, _ = precision_recall_curve(y_test, y_prob)
    pr_auc = auc(recall, precision)
    print(f"PR AUC: {pr_auc:.4f}")

    # Baseline PR AUC: a no-skill classifier's precision equals the positive rate
    no_skill = y_test.mean()
    precision_baseline = np.ones_like(recall) * no_skill
    pr_auc_baseline = auc(recall, precision_baseline)
    print(f"Baseline PR AUC: {pr_auc_baseline:.4f}")

    # # ROC curve with baseline comparison
    # plt.figure(figsize=(5, 5))
    # plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'Model ROC (AUC = {roc_auc:.2f})')
    # plt.plot(fpr_baseline, tpr_baseline, color='green', lw=2, linestyle='--',
    #          label=f'Baseline ROC (AUC = {roc_auc_baseline:.2f})')
    # plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    # plt.xlim([0.0, 1.0])
    # plt.ylim([0.0, 1.05])
    # plt.xlabel('False Positive Rate')
    # plt.ylabel('True Positive Rate')
    # plt.title('Receiver Operating Characteristic')
    # plt.legend(loc='lower right')
    # plt.tight_layout()
    # plt.savefig('roc_curve.png')

    # Precision-Recall curve with no-skill baseline
    plt.figure(figsize=(5, 5))
    plt.plot(recall, precision, color='blue', lw=2, label=f'Model (AUC = {pr_auc:.2f})')
    plt.plot([0, 1], [no_skill, no_skill], linestyle='--', color='red',
             label=f'No Skill ({no_skill:.2f})')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.legend()
    plt.tight_layout()
    plt.savefig('precision_recall_curve.png')
    plt.close()

    return y_pred, y_prob, pr_auc, pr_auc_baseline


def train_linear_model(X_train, y_train, alpha=0.01):
    X_train_const = sm.add_constant(X_train, has_constant='add')
    final_logit = sm.Logit(y_train, X_train_const)

    # Fit with L1 (lasso) regularization. NOTE: Logit.fit_regularized supports
    # only L1-type penalties ('l1' via SLSQP or 'l1_cvxopt_cp' via CVXOPT) and
    # does not accept a cov_type argument, so robust (HC) standard errors are
    # not available here.
    final_model = final_logit.fit_regularized(
        method='l1',
        alpha=alpha,
        trim_mode='auto',
        maxiter=10_000
    )

    # Fall back to the CVXOPT L1 solver if the default solver fails to converge
    # (requires the optional cvxopt package)
    converged = getattr(final_model, 'mle_retvals', {}).get('converged', True)
    if not converged:
        print("Default L1 solver did not converge, trying l1_cvxopt_cp...")
        final_model = final_logit.fit_regularized(
            method='l1_cvxopt_cp',
            alpha=alpha,
            trim_mode='auto',
            maxiter=10_000
        )

    train_pred_proba = final_model.predict(X_train_const)
    train_precision, train_recall, _ = precision_recall_curve(y_train, train_pred_proba)
    train_pr_auc = auc(train_recall, train_precision)
    print(f"Model training Precision-Recall AUC: {train_pr_auc:.4f}")

    fpr, tpr, _ = roc_curve(y_train, train_pred_proba)
    train_roc_auc = auc(fpr, tpr)
    print(f"Model training ROC AUC: {train_roc_auc:.4f}")
    print(f"Regularization: alpha={alpha}")

    return final_model
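
# A minimal sketch (not part of the original pipeline) of choosing the L1
# strength by validation PR AUC: refit train_linear_model over a small alpha
# grid and keep the best-scoring value. It assumes a separate validation split
# (X_val, y_val) carved out of the training drugs, and uses only functions and
# imports defined above.
#
#   def select_alpha(X_tr, y_tr, X_val, y_val, alphas=(0.001, 0.01, 0.1, 1.0)):
#       best_alpha, best_auc = None, -np.inf
#       for a in alphas:
#           model = train_linear_model(X_tr, y_tr, alpha=a)
#           prob = model.predict(sm.add_constant(X_val, has_constant='add'))
#           prec, rec, _ = precision_recall_curve(y_val, prob)
#           score = auc(rec, prec)
#           if score > best_auc:
#               best_alpha, best_auc = a, score
#       return best_alpha, best_auc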
def analyze_coefficients(model, feature_names):
    feature_names = ['const'] + feature_names.tolist()
    coefficients = model.params
    odds_ratios = np.exp(coefficients)

    coef_df = pd.DataFrame({
        'Feature': feature_names,
        'Coefficient': coefficients,
        'Odds_Ratio': odds_ratios,
        'Std_Error': model.bse,
        'z_value': model.tvalues,
        'p_value': model.pvalues,
        'CI_Lower_95': model.conf_int(alpha=0.05).iloc[:, 0],
        'CI_Upper_95': model.conf_int(alpha=0.05).iloc[:, 1]
    })

    # Sort by absolute coefficient value (excluding the constant)
    coef_df['Abs_Coefficient'] = np.abs(coef_df['Coefficient'])
    coef_df = coef_df.sort_values('Abs_Coefficient', ascending=False).drop('Abs_Coefficient', axis=1)

    # Flag significant coefficients
    coef_df['Significant'] = coef_df['p_value'] < 0.05

    # Print the top significant coefficients (excluding the constant)
    print("\nTop significant features (p < 0.05):")
    sig_features = coef_df[coef_df['Significant'] & (coef_df['Feature'] != 'const')]
    if len(sig_features) > 0:
        print(sig_features.head(10)[['Feature', 'Coefficient', 'p_value',
                                     'Odds_Ratio', 'CI_Lower_95', 'CI_Upper_95']])
    else:
        print("No statistically significant features found.")

    # Count significant coefficients (excluding the constant)
    n_significant = sum(coef_df['Significant'] & (coef_df['Feature'] != 'const'))
    n_features = sum(coef_df['Feature'] != 'const')
    print(f"\nNumber of significant coefficients (p < 0.05): {n_significant} out of {n_features}")

    # Plot the top coefficients with confidence intervals (excluding the constant)
    plt.figure(figsize=(5, 5))

    # Top 15 features by absolute coefficient value
    top_coefs = coef_df[coef_df['Feature'] != 'const'].head(15)

    # Significant coefficients in blue, the rest in gray
    colors = ['blue' if sig else 'gray' for sig in top_coefs['Significant']]
    for i, (coef, ci_lower, ci_upper, color) in enumerate(zip(
            top_coefs['Coefficient'], top_coefs['CI_Lower_95'],
            top_coefs['CI_Upper_95'], colors)):
        plt.errorbar(
            x=coef, y=i,
            xerr=[[coef - ci_lower], [ci_upper - coef]],
            fmt='o', capsize=5, color=color
        )

    # Feature names as y-tick labels
    plt.yticks(range(len(top_coefs)), top_coefs['Feature'])
    plt.axvline(x=0, color='black', linestyle='-', alpha=0.3)
    plt.title('Top Logistic Regression Coefficients with 95% CIs')
    plt.xlabel('Coefficient Value')
    plt.tight_layout()
    plt.savefig('top_coefficients.png', dpi=300, bbox_inches='tight')
    plt.close()

    return coef_df


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--file_path', type=str, help='Path to the input CSV file')
    parser.add_argument('--output_dir', type=str, default='results',
                        help='Directory to save results')
    args = parser.parse_args()

    if not os.path.exists(args.output_dir):
        os.makedirs(args.output_dir)
    # NOTE: the plotting helpers above save figures to the current working directory

    df = load_data(args.file_path)
    df = explore_data(df)
    corr_matrix = analyze_feature_correlations(df)

    # Separate features from the target before preparing the data
    X = df.drop(columns=['is_effective'])
    y = df['is_effective']
    X_train, X_test, y_train, y_test = prepare_data(X, y)

    lr_model = train_linear_model(X_train, y_train)
    lr_pred, lr_prob, pr_auc, pr_auc_baseline = evaluate_model(lr_model, X_test, y_test)
    coef_df = analyze_coefficients(lr_model, X_train.columns)

    results = {
        'logistic_regression': lr_model,
        'lr_coefficients': coef_df,
    }
    return results, df


if __name__ == "__main__":
    results, df = main()
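
# Example invocation (the script and data file names are hypothetical):
#   python drug_response_logit.py --file_path data/features.csv --output_dir results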