|
|
|
import gradio as gr |
|
import torch |
|
|
|
|
|
import pytorch_lightning as pl |
|
import os |
|
import json |
|
import logging |
|
from tokenizers import Tokenizer |
|
from huggingface_hub import hf_hub_download |
|
import gc |
|
from rdkit.Chem import CanonSmiles |
|
|
|
|
|
|
|
MODEL_REPO_ID = ( |
|
"AdrianM0/smiles-to-iupac-translator" |
|
) |
|
CHECKPOINT_FILENAME = "last.ckpt" |
|
SMILES_TOKENIZER_FILENAME = "smiles_bytelevel_bpe_tokenizer_scaled.json" |
|
IUPAC_TOKENIZER_FILENAME = "iupac_unigram_tokenizer_scaled.json" |
|
CONFIG_FILENAME = "config.json" |
|
|
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" |
|
) |
|
|
|
|
|
try: |
|
from enhanced_trainer import SmilesIupacLitModule, generate_square_subsequent_mask |
|
|
|
logging.info("Successfully imported from enhanced_trainer.py.") |
|
except ImportError as e: |
|
logging.error( |
|
f"Failed to import helper code from enhanced_trainer.py: {e}. " |
|
f"Make sure enhanced_trainer.py is in the root of the Hugging Face repo '{MODEL_REPO_ID}'." |
|
) |
|
raise gr.Error( |
|
f"Initialization Error: Could not load necessary Python modules (enhanced_trainer.py). Check Space logs. Error: {e}" |
|
) |
|
except Exception as e: |
|
logging.error( |
|
f"An unexpected error occurred during helper code import: {e}", exc_info=True |
|
) |
|
raise gr.Error( |
|
f"Initialization Error: An unexpected error occurred loading helper modules. Check Space logs. Error: {e}" |
|
) |
|
|
|
|
|
model: pl.LightningModule | None = None |
|
smiles_tokenizer: Tokenizer | None = None |
|
iupac_tokenizer: Tokenizer | None = None |
|
device: torch.device | None = None |
|
config: dict | None = None |
|
|
|
|
|
|
|
def greedy_decode( |
|
model: pl.LightningModule, |
|
src: torch.Tensor, |
|
src_padding_mask: torch.Tensor, |
|
max_len: int, |
|
sos_idx: int, |
|
eos_idx: int, |
|
device: torch.device, |
|
) -> torch.Tensor: |
|
""" |
|
Performs greedy decoding using the LightningModule's model. |
|
""" |
|
model.eval() |
|
transformer_model = model.model |
|
|
|
try: |
|
with torch.no_grad(): |
|
|
|
memory = transformer_model.encode( |
|
src, src_padding_mask |
|
) |
|
memory = memory.to(device) |
|
memory_key_padding_mask = src_padding_mask.to(memory.device) |
|
|
|
|
|
|
|
ys = torch.ones(1, 1, dtype=torch.long, device=device).fill_( |
|
sos_idx |
|
) |
|
|
|
|
|
for _ in range(max_len - 1): |
|
tgt_seq_len = ys.shape[1] |
|
tgt_mask = generate_square_subsequent_mask(tgt_seq_len, device).to( |
|
device |
|
) |
|
|
|
tgt_padding_mask = torch.zeros( |
|
ys.shape, dtype=torch.bool, device=device |
|
) |
|
|
|
|
|
decoder_output = transformer_model.decode( |
|
tgt=ys, |
|
memory=memory, |
|
tgt_mask=tgt_mask, |
|
tgt_padding_mask=tgt_padding_mask, |
|
memory_key_padding_mask=memory_key_padding_mask, |
|
) |
|
|
|
|
|
next_token_logits = transformer_model.generator( |
|
decoder_output[ |
|
:, -1, : |
|
] |
|
) |
|
|
|
|
|
|
|
|
|
next_word_id_tensor = torch.argmax(next_token_logits, dim=1) |
|
next_word_id = next_word_id_tensor.item() |
|
|
|
|
|
ys = torch.cat( |
|
[ |
|
ys, |
|
torch.ones(1, 1, dtype=torch.long, device=device).fill_( |
|
next_word_id |
|
), |
|
], |
|
dim=1, |
|
) |
|
|
|
|
|
if next_word_id == eos_idx: |
|
break |
|
|
|
|
|
return ys[:, 1:] |
|
|
|
except RuntimeError as e: |
|
logging.error(f"Runtime error during greedy decode: {e}", exc_info=True) |
|
if "CUDA out of memory" in str(e) and device.type == "cuda": |
|
gc.collect() |
|
torch.cuda.empty_cache() |
|
return torch.empty( |
|
(1, 0), dtype=torch.long, device=device |
|
) |
|
except Exception as e: |
|
logging.error(f"Unexpected error during greedy decode: {e}", exc_info=True) |
|
return torch.empty((1, 0), dtype=torch.long, device=device) |
|
|
|
|
|
|
|
def translate( |
|
model: pl.LightningModule, |
|
src_sentence: str, |
|
smiles_tokenizer: Tokenizer, |
|
iupac_tokenizer: Tokenizer, |
|
device: torch.device, |
|
max_len: int, |
|
sos_idx: int, |
|
eos_idx: int, |
|
pad_idx: int, |
|
) -> str: |
|
""" |
|
Translates a single SMILES string using greedy decoding. |
|
""" |
|
model.eval() |
|
|
|
|
|
try: |
|
|
|
smiles_tokenizer.enable_truncation( |
|
max_length=max_len |
|
) |
|
src_encoded = smiles_tokenizer.encode(src_sentence) |
|
if not src_encoded or not src_encoded.ids: |
|
logging.warning(f"Encoding failed or empty for SMILES: {src_sentence}") |
|
return "[Encoding Error]" |
|
|
|
src_ids = src_encoded.ids |
|
except Exception as e: |
|
logging.error(f"Error tokenizing SMILES '{src_sentence}': {e}", exc_info=True) |
|
return "[Encoding Error]" |
|
|
|
|
|
src = ( |
|
torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(device) |
|
) |
|
|
|
src_padding_mask = (src == pad_idx).to(device) |
|
|
|
|
|
|
|
|
|
generation_max_len = config.get( |
|
"max_len", 256 |
|
) |
|
tgt_tokens_tensor = greedy_decode( |
|
model=model, |
|
src=src, |
|
src_padding_mask=src_padding_mask, |
|
max_len=generation_max_len, |
|
sos_idx=sos_idx, |
|
eos_idx=eos_idx, |
|
|
|
device=device, |
|
) |
|
|
|
|
|
if tgt_tokens_tensor is None or tgt_tokens_tensor.numel() == 0: |
|
logging.warning( |
|
f"Greedy decode returned empty tensor for SMILES: {src_sentence}" |
|
) |
|
return "[Decoding Error - Empty Output]" |
|
|
|
tgt_tokens = tgt_tokens_tensor.flatten().cpu().numpy().tolist() |
|
try: |
|
|
|
translation = iupac_tokenizer.decode(tgt_tokens, skip_special_tokens=True) |
|
return translation |
|
except Exception as e: |
|
logging.error( |
|
f"Error decoding target tokens {tgt_tokens}: {e}", |
|
exc_info=True, |
|
) |
|
return "[Decoding Error]" |
|
|
|
|
|
|
|
def load_model_and_tokenizers(): |
|
"""Loads tokenizers, config, and model from Hugging Face Hub.""" |
|
global model, smiles_tokenizer, iupac_tokenizer, device, config |
|
if model is not None: |
|
logging.info("Model and tokenizers already loaded.") |
|
return |
|
|
|
logging.info(f"Starting model and tokenizer loading from {MODEL_REPO_ID}...") |
|
try: |
|
|
|
if torch.cuda.is_available(): |
|
logging.warning( |
|
"CUDA is available, but forcing CPU for Gradio app simplicity. Modify if GPU is intended." |
|
) |
|
device = torch.device("cpu") |
|
|
|
|
|
else: |
|
device = torch.device("cpu") |
|
logging.info("CUDA not available, using CPU.") |
|
|
|
|
|
logging.info("Downloading files from Hugging Face Hub...") |
|
try: |
|
cache_dir = os.environ.get("GRADIO_CACHE", "./hf_cache") |
|
os.makedirs(cache_dir, exist_ok=True) |
|
logging.info(f"Using cache directory: {cache_dir}") |
|
|
|
checkpoint_path = hf_hub_download( |
|
repo_id=MODEL_REPO_ID, filename=CHECKPOINT_FILENAME, cache_dir=cache_dir |
|
) |
|
smiles_tokenizer_path = hf_hub_download( |
|
repo_id=MODEL_REPO_ID, |
|
filename=SMILES_TOKENIZER_FILENAME, |
|
cache_dir=cache_dir, |
|
) |
|
iupac_tokenizer_path = hf_hub_download( |
|
repo_id=MODEL_REPO_ID, |
|
filename=IUPAC_TOKENIZER_FILENAME, |
|
cache_dir=cache_dir, |
|
) |
|
config_path = hf_hub_download( |
|
repo_id=MODEL_REPO_ID, filename=CONFIG_FILENAME, cache_dir=cache_dir |
|
) |
|
logging.info("Files downloaded successfully.") |
|
except Exception as e: |
|
logging.error( |
|
f"Failed to download files from {MODEL_REPO_ID}. Check filenames and repo status. Error: {e}", |
|
exc_info=True, |
|
) |
|
raise gr.Error( |
|
f"Download Error: Could not download required files from {MODEL_REPO_ID}. Check Space logs. Error: {e}" |
|
) |
|
|
|
|
|
logging.info("Loading configuration...") |
|
try: |
|
with open(config_path, "r") as f: |
|
config = json.load(f) |
|
logging.info("Configuration loaded.") |
|
|
|
required_keys = [ |
|
"src_vocab_size", |
|
"tgt_vocab_size", |
|
"emb_size", |
|
"nhead", |
|
"ffn_hid_dim", |
|
"num_encoder_layers", |
|
"num_decoder_layers", |
|
"dropout", |
|
"max_len", |
|
"pad_token_id", |
|
"bos_token_id", |
|
"eos_token_id", |
|
] |
|
|
|
config_key_mapping = { |
|
"src_vocab_size": config.get( |
|
"src_vocab_size", config.get("src_vocab_size") |
|
), |
|
"tgt_vocab_size": config.get( |
|
"tgt_vocab_size", config.get("tgt_vocab_size") |
|
), |
|
|
|
} |
|
config.update(config_key_mapping) |
|
|
|
missing_keys = [key for key in required_keys if config.get(key) is None] |
|
if missing_keys: |
|
raise ValueError( |
|
f"Config file '{CONFIG_FILENAME}' is missing required keys: {missing_keys}. " |
|
f"Ensure these were saved in the hyperparameters during training." |
|
) |
|
|
|
logging.info( |
|
f"Using config values: src_vocab={config['src_vocab_size']}, tgt_vocab={config['tgt_vocab_size']}, " |
|
f"emb={config['emb_size']}, nhead={config['nhead']}, enc={config['num_encoder_layers']}, dec={config['num_decoder_layers']}, " |
|
f"pad={config['pad_token_id']}, sos={config['bos_token_id']}, eos={config['eos_token_id']}, max_len={config['max_len']}" |
|
) |
|
|
|
except FileNotFoundError: |
|
logging.error(f"Config file not found: {config_path}") |
|
raise gr.Error(f"Config Error: Config file '{CONFIG_FILENAME}' not found.") |
|
except json.JSONDecodeError as e: |
|
logging.error(f"Error decoding JSON from config file {config_path}: {e}") |
|
raise gr.Error( |
|
f"Config Error: Could not parse '{CONFIG_FILENAME}'. Error: {e}" |
|
) |
|
except ValueError as e: |
|
logging.error(f"Config validation error: {e}") |
|
raise gr.Error(f"Config Error: {e}") |
|
except Exception as e: |
|
logging.error(f"Unexpected error loading config: {e}", exc_info=True) |
|
raise gr.Error(f"Config Error: Unexpected error. Check logs. Error: {e}") |
|
|
|
|
|
logging.info("Loading tokenizers...") |
|
try: |
|
smiles_tokenizer = Tokenizer.from_file(smiles_tokenizer_path) |
|
iupac_tokenizer = Tokenizer.from_file(iupac_tokenizer_path) |
|
logging.info("Tokenizers loaded.") |
|
|
|
|
|
pad_token = "<pad>" |
|
sos_token = "<sos>" |
|
eos_token = "<eos>" |
|
unk_token = "<unk>" |
|
issues = [] |
|
|
|
if smiles_tokenizer.token_to_id(pad_token) != config["pad_token_id"]: |
|
issues.append(f"SMILES PAD ID mismatch") |
|
if smiles_tokenizer.token_to_id(unk_token) is None: |
|
issues.append("SMILES UNK token not found") |
|
if iupac_tokenizer.token_to_id(pad_token) != config["pad_token_id"]: |
|
issues.append(f"IUPAC PAD ID mismatch") |
|
if iupac_tokenizer.token_to_id(sos_token) != config["bos_token_id"]: |
|
issues.append(f"IUPAC SOS ID mismatch") |
|
if iupac_tokenizer.token_to_id(eos_token) != config["eos_token_id"]: |
|
issues.append(f"IUPAC EOS ID mismatch") |
|
if iupac_tokenizer.token_to_id(unk_token) is None: |
|
issues.append("IUPAC UNK token not found") |
|
if issues: |
|
logging.warning("Tokenizer validation issues: " + "; ".join(issues)) |
|
|
|
except Exception as e: |
|
logging.error(f"Failed to load tokenizers: {e}", exc_info=True) |
|
raise gr.Error( |
|
f"Tokenizer Error: Could not load tokenizers. Check logs. Error: {e}" |
|
) |
|
|
|
|
|
logging.info("Loading model from checkpoint...") |
|
try: |
|
|
|
model = SmilesIupacLitModule.load_from_checkpoint( |
|
checkpoint_path, |
|
|
|
src_vocab_size=config["src_vocab_size"], |
|
tgt_vocab_size=config["tgt_vocab_size"], |
|
|
|
|
|
**config, |
|
map_location=device, |
|
strict=True, |
|
) |
|
|
|
model.to(device) |
|
model.eval() |
|
model.freeze() |
|
logging.info( |
|
f"Model loaded successfully from {checkpoint_path}, set to eval mode, frozen, and moved to device '{device}'." |
|
) |
|
|
|
except FileNotFoundError: |
|
logging.error(f"Checkpoint file not found: {checkpoint_path}") |
|
raise gr.Error( |
|
f"Model Error: Checkpoint file '{CHECKPOINT_FILENAME}' not found." |
|
) |
|
except Exception as e: |
|
logging.error( |
|
f"Error loading model checkpoint {checkpoint_path}: {e}", exc_info=True |
|
) |
|
if "size mismatch" in str(e): |
|
error_detail = f"Potential size mismatch. Check vocab sizes in config.json (src={config.get('src_vocab_size')}, tgt={config.get('tgt_vocab_size')}) vs checkpoint." |
|
logging.error(error_detail) |
|
raise gr.Error(f"Model Error: {error_detail} Original error: {e}") |
|
elif "memory" in str(e).lower(): |
|
logging.warning("Potential OOM error during model loading.") |
|
gc.collect() |
|
torch.cuda.empty_cache() if device.type == "cuda" else None |
|
raise gr.Error( |
|
f"Model Error: OOM loading model. Check Space resources. Error: {e}" |
|
) |
|
else: |
|
raise gr.Error( |
|
f"Model Error: Failed to load checkpoint. Check logs. Error: {e}" |
|
) |
|
|
|
except gr.Error: |
|
raise |
|
except Exception as e: |
|
logging.error(f"Unexpected error during loading: {e}", exc_info=True) |
|
raise gr.Error( |
|
f"Initialization Error: Unexpected error. Check logs. Error: {e}" |
|
) |
|
|
|
|
|
|
|
def predict_iupac(smiles_string): |
|
""" |
|
Performs SMILES to IUPAC translation using the loaded model and greedy decoding. |
|
""" |
|
global model, smiles_tokenizer, iupac_tokenizer, device, config |
|
|
|
if not all([model, smiles_tokenizer, iupac_tokenizer, device, config]): |
|
error_msg = "Error: Model or tokenizers not loaded properly. App initialization might have failed. Check Space logs." |
|
logging.error(error_msg) |
|
return f"Error: {error_msg}" |
|
|
|
if not smiles_string or not smiles_string.strip(): |
|
error_msg = "Error: Please enter a valid SMILES string." |
|
return f"Error: {error_msg}" |
|
|
|
smiles_input = smiles_string.strip() |
|
|
|
try: |
|
|
|
sos_idx = config["bos_token_id"] |
|
eos_idx = config["eos_token_id"] |
|
pad_idx = config["pad_token_id"] |
|
gen_max_len = config["max_len"] |
|
|
|
predicted_name = translate( |
|
model=model, |
|
src_sentence=smiles_input, |
|
smiles_tokenizer=smiles_tokenizer, |
|
iupac_tokenizer=iupac_tokenizer, |
|
device=device, |
|
max_len=gen_max_len, |
|
sos_idx=sos_idx, |
|
eos_idx=eos_idx, |
|
pad_idx=pad_idx, |
|
) |
|
logging.info(f"Prediction returned: {predicted_name}") |
|
|
|
|
|
if "[Error]" in predicted_name: |
|
output_text = ( |
|
f"Input SMILES: {smiles_input}\n\nPrediction Failed: {predicted_name}" |
|
) |
|
elif not predicted_name: |
|
output_text = f"Input SMILES: {smiles_input}\n\nNo prediction generated (decoding might have failed)." |
|
else: |
|
output_text = ( |
|
f"Input SMILES: {smiles_input}\n\n" |
|
f"Predicted IUPAC Name (Greedy Decode):\n" |
|
f"{predicted_name}" |
|
) |
|
return output_text |
|
|
|
except RuntimeError as e: |
|
logging.error(f"Runtime error during translation: {e}", exc_info=True) |
|
return f"Error: {error_msg}" |
|
|
|
except Exception as e: |
|
logging.error(f"Unexpected error during translation: {e}", exc_info=True) |
|
error_msg = f"Unexpected Error during translation: {e}" |
|
return f"Error: {error_msg}" |
|
|
|
|
|
|
|
try: |
|
load_model_and_tokenizers() |
|
except gr.Error as ge: |
|
logging.error(f"Gradio Initialization Error: {ge}") |
|
pass |
|
except Exception as e: |
|
logging.error(f"Critical error during initial model loading: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
|
title = "SMILES to IUPAC Name Translator (Greedy Decoding)" |
|
description = f""" |
|
Enter a SMILES string to translate it into its IUPAC chemical name using a Transformer model ({MODEL_REPO_ID}) trained via PyTorch Lightning. |
|
Translation uses **greedy decoding** (picks the most likely next word at each step). |
|
**Note:** Model loaded on **{str(device).upper() if device else "N/A"}**. Performance may vary. Check `config.json` in the repo for model details. |
|
""" |
|
|
|
|
|
smiles_input = gr.Textbox( |
|
label="SMILES String", |
|
placeholder="Enter SMILES string here (e.g., CCO for Ethanol)", |
|
lines=1, |
|
) |
|
smiles_input = CanonSmiles(smiles_input) |
|
|
|
output_text = gr.Textbox( |
|
label="Predicted IUPAC Name", |
|
lines=3, |
|
show_copy_button=True, |
|
) |
|
|
|
|
|
iface = gr.Interface( |
|
fn=predict_iupac, |
|
inputs=smiles_input, |
|
outputs=output_text, |
|
title=title, |
|
description=description, |
|
allow_flagging="never", |
|
theme=gr.themes.Soft(primary_hue="blue", secondary_hue="cyan"), |
|
article=""" |
|
**Limitations:** Translation quality depends heavily on the model size, training data, and the complexity of the SMILES input. |
|
Very long or unusual SMILES strings may result in errors, timeouts, or inaccurate translations. Greedy decoding can sometimes get stuck in repetitive loops or produce suboptimal results compared to beam search. |
|
""", |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
iface.launch(share=True) |
|
|