import os
import subprocess
import sys
import warnings
import pandas as pd
from rdkit import Chem, RDLogger
from sklearn.model_selection import train_test_split
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from utils import remove_atom_mapping, seed_everything
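# Fix RNG seeds so the random train/valid/test splits below are reproducible.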
seed_everything(seed=42)
# Disable RDKit warnings and Python warnings
RDLogger.DisableLog("rdApp.*")
warnings.filterwarnings("ignore")
script_dir = os.path.abspath(os.path.dirname(__file__))
project_root = os.path.abspath(os.path.join(script_dir, ".."))
data_dir = os.path.join(project_root, "data")
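# Google Drive file IDs for the raw inputs: judging by the moves and reads
# below, four ZINC SMILES shards (16_p*.smi) and the ORD reaction TSV.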
files_to_download = [
    "1ZPsoUYb4HcxFzK_ac9rb_pQj7oO3Gagh",
    "1XwkxxHiaWFbSNhGyxnv6hAliutIMNrIp",
    "1yIwUH_OhER9nuMo9HjBhBmyc6zvmrSPA",
    "1skFRirstIUijhieshvJEScBD2aB3H1YU",
    "1fa2MyLdN1vcA7Rysk8kLQENE92YejS9B",
]
for file_id in files_to_download:
    subprocess.run(
        f"gdown 'https://drive.google.com/uc?export=download&id={file_id}'", shell=True
    )
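# The gdown CLI (pip install gdown) is assumed to be on PATH; each call saves
# the file under its remote name in the current directory, which is why the
# mv commands below pick the files up from there.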
# Move downloaded files to data directory
subprocess.run("mv *.smi " + data_dir, shell=True)
subprocess.run("mv *.tsv " + data_dir, shell=True)
# Function to process SMILES files and save canonicalized versions
def process_smiles_files(file_paths):
    """Canonicalize and deduplicate SMILES from file_paths, then write the
    full set and a 90/10 train/validation split under data_dir."""
    unique_smiles = set()
    for file_path in file_paths:
        suppl = Chem.SmilesMolSupplier(file_path)
        for mol in suppl:
            if mol is None:
                continue
            try:
                unique_smiles.add(Chem.MolToSmiles(mol, canonical=True))
            except Exception:
                continue
    df = pd.DataFrame({"smiles": list(unique_smiles)})
    df.to_csv(os.path.join(data_dir, "ZINC-canonicalized.csv"), index=False)
    train, valid = train_test_split(df, test_size=0.1)
    # Save train and validation data
    train.to_csv(os.path.join(data_dir, "ZINC-canonicalized-train.csv"), index=False)
    valid.to_csv(os.path.join(data_dir, "ZINC-canonicalized-valid.csv"), index=False)
# Process 16_p files
process_smiles_files([os.path.join(data_dir, f"16_p{i}.smi") for i in range(4)])
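# Why canonicalize: RDKit maps every spelling of a molecule to one canonical
# string, so the set() in process_smiles_files dedupes true duplicates, e.g.
#   Chem.MolToSmiles(Chem.MolFromSmiles("OCC"))    -> "CCO"
#   Chem.MolToSmiles(Chem.MolFromSmiles("C(O)C"))  -> "CCO"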
# Load reaction data
ord_df = pd.read_csv(
    os.path.join(data_dir, "all_ord_reaction_uniq_with_attr20240506_v1.tsv"),
    sep="\t",
    names=["id", "input", "product", "condition"],
)
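# Expected cell formats (as assumed by data_split below):
#   input:     "."-joined "TAG:SMILES" tokens, e.g. "REACTANT:CCO.SOLVENT:O"
#   product:   "PRODUCT:SMILES" entries
#   condition: optional "YIELD:<float>" and "TEMP:<float>" tags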
def data_split(row):
    """Parse one ORD row into per-category SMILES strings plus YIELD and TEMP.

    Returns the values in the order of the module-level `categories` list below.
    """
    categories = [
        "CATALYST",
        "REACTANT",
        "REAGENT",
        "SOLVENT",
        "INTERNAL_STANDARD",
        "NoData",
    ]
    data = {cat: [] for cat in categories}
    input_data = row["input"]
    if isinstance(input_data, str):
        # Each "."-separated token looks like "TAG:SMILES"; bucket it by tag.
        for item in input_data.split("."):
            for cat in categories:
                if cat in item:
                    data[cat].append(item[item.find(":") + 1 :])
                    break
    for key, value in data.items():
        data[key] = ".".join(value)
    product_data = row["product"]
    if isinstance(product_data, str):
        # Normalize ".PRODUCT:" joins so splitting on "PRODUCT:" yields clean SMILES.
        product_data = product_data.replace(".PRODUCT", "PRODUCT")
        pro_lis = [item for item in product_data.split("PRODUCT:") if item != ""]
        data["PRODUCT"] = ".".join(pro_lis)
    else:
        data["PRODUCT"] = None
    condition_data = row["condition"]
    if isinstance(condition_data, str):
        data["YIELD"] = (
            float(condition_data.split(":")[1]) if "YIELD" in condition_data else None
        )
        temp_pos = condition_data.find("TEMP")
        data["TEMP"] = (
            float(condition_data[temp_pos:].split(":")[1])
            if "TEMP" in condition_data
            else None
        )
    else:
        data["YIELD"] = None
        data["TEMP"] = None
    return list(data.values())
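# A minimal illustration with a synthetic row (values made up):
#   data_split({"input": "REACTANT:CCO.SOLVENT:O",
#               "product": "PRODUCT:CCOC",
#               "condition": "YIELD:85.0"})
#   -> ["", "CCO", "", "O", "", "", "CCOC", 85.0, None]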
# Split data and create cleaned DataFrame
categories = [
    "CATALYST",
    "REACTANT",
    "REAGENT",
    "SOLVENT",
    "INTERNAL_STANDARD",
    "NoData",
    "PRODUCT",
    "YIELD",
    "TEMP",
]
cleaned_data = {cat: [] for cat in categories}
for _, row in ord_df.iterrows():
    split_data = data_split(row)
    for i, value in enumerate(split_data):
        cleaned_data[categories[i]].append(value)
cleaned_df = pd.DataFrame(cleaned_data)
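# Equivalent (usually faster) alternative sketch using apply instead of iterrows:
# cleaned_df = pd.DataFrame(ord_df.apply(data_split, axis=1).tolist(), columns=categories)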
# Apply remove_atom_mapping function to relevant columns
for column in [
    "CATALYST",
    "REACTANT",
    "REAGENT",
    "SOLVENT",
    "INTERNAL_STANDARD",
    "NoData",
    "PRODUCT",
]:
    cleaned_df[column] = cleaned_df[column].apply(
        lambda x: remove_atom_mapping(x) if isinstance(x, str) else None
    )
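# remove_atom_mapping comes from this repo's utils module; presumably it strips
# atom-map numbers (e.g. "[CH3:1]" -> "C") from each "."-joined SMILES string.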
# Save cleaned DataFrame (note: despite the .tsv extension, to_csv's default
# comma separator is used here)
cleaned_df.to_csv(os.path.join(data_dir, "preprocessed_ord.tsv"), index=False)
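# Carve out ~10% of all rows for validation and another ~10% for test
# (test_size is an absolute row count here, so both holdouts are 10% of the full set).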
train, valid = train_test_split(cleaned_df, test_size=int(len(cleaned_df) * 0.1))
train, test = train_test_split(train, test_size=int(len(cleaned_df) * 0.1))
# Save train, validation, and test data
train.to_csv(os.path.join(data_dir, "preprocessed_ord_train.csv"), index=False)
valid.to_csv(os.path.join(data_dir, "preprocessed_ord_valid.csv"), index=False)
test.to_csv(os.path.join(data_dir, "preprocessed_ord_test.csv"), index=False)