# Spaces: Running on Zero
import random | |
import numpy as np | |
import torch | |
from transformers import AutoTokenizer, T5ForConditionalGeneration | |
import spaces | |
# Available models.
# Maps the `model_choice` label accepted by generate_response_tuned() to the
# model identifier that get_model_and_tokenizer() passes to from_pretrained().
MODEL_OPTIONS_INSTRUCT_TUNED = {
    "EXT1 Model": "alakxender/flan-t5-base-alpaca-dv-ext"
}
# Cache for loaded models/tokenizers.
# Keyed by model identifier; each value is the (tokenizer, model) tuple stored
# by get_model_and_tokenizer(), so a given model is only loaded once.
MODEL_CACHE = {}
def get_model_and_tokenizer(model_dir):
    """Return the ``(tokenizer, model)`` pair for ``model_dir``, loading it on first use.

    The pair is memoised in the module-level ``MODEL_CACHE`` dict under
    ``model_dir``; later calls with the same identifier return the stored
    tuple without reloading anything.

    Args:
        model_dir: Identifier handed to ``from_pretrained`` for both the
            tokenizer and the model.

    Returns:
        A ``(tokenizer, model)`` tuple. On first load the model is moved to
        CUDA when ``torch.cuda.is_available()`` is true, otherwise to CPU.
    """
    # Cache hit: nothing to load.
    if model_dir in MODEL_CACHE:
        return MODEL_CACHE[model_dir]

    print(f"Loading model: {model_dir}")
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = T5ForConditionalGeneration.from_pretrained(model_dir)

    # Pick the target device once, at load time.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    print(f"Moving model to device: {device}")
    model.to(device)

    entry = (tokenizer, model)
    MODEL_CACHE[model_dir] = entry
    return entry
def generate_response_tuned(instruction, input_text, seed, model_choice, max_tokens, temperature, num_beams):
    """Generate a reply from the selected instruction-tuned T5 model.

    Args:
        instruction: Instruction text; surrounding whitespace is stripped.
        input_text: Optional extra text appended after the instruction,
            separated by a single space. ``None``, empty and whitespace-only
            values are ignored.
        seed: Seed applied to ``random``, NumPy and torch (and to all CUDA
            devices when CUDA is available) before generating.
        model_choice: Key into ``MODEL_OPTIONS_INSTRUCT_TUNED``.
        max_tokens: Forwarded to ``model.generate`` as ``max_length``.
        temperature: Sampling is enabled only when this is greater than 0;
            the value is then forwarded together with ``top_p=0.95`` and
            ``top_k=50``. Otherwise no sampling arguments are passed.
        num_beams: Forwarded to ``model.generate`` as ``num_beams``.

    Returns:
        The decoded first sequence with special tokens removed, cut off after
        its last ``"."`` (when it contains one) and with runs of whitespace
        collapsed to single spaces.

    Raises:
        KeyError: If ``model_choice`` is not in ``MODEL_OPTIONS_INSTRUCT_TUNED``.
    """
    # Seed every RNG that may be involved so sampling is reproducible.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    model_dir = MODEL_OPTIONS_INSTRUCT_TUNED[model_choice]
    tokenizer, model = get_model_and_tokenizer(model_dir)

    # Strip before testing for emptiness so a whitespace-only input does not
    # leave a dangling separator at the end of the prompt.
    instruction = (instruction or "").strip()
    input_text = (input_text or "").strip()
    combined_input = f"{instruction} {input_text}" if input_text else instruction

    inputs = tokenizer(
        combined_input,
        return_tensors="pt",
        truncation=True,
    )
    # Follow the device the cached model actually lives on instead of
    # re-deriving it from CUDA availability, which could disagree with the
    # placement chosen when the model was loaded.
    device = model.device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    do_sample = temperature > 0.0
    gen_kwargs = {
        **inputs,
        "max_length": max_tokens,
        "num_beams": num_beams,
        "do_sample": do_sample,
        "repetition_penalty": 1.2,
    }
    if do_sample:
        # Sampling-only knobs: passing them (notably temperature=0) while
        # do_sample is False is flagged as an inconsistent generation config.
        gen_kwargs.update(temperature=temperature, top_p=0.95, top_k=50)

    with torch.no_grad():
        outputs = model.generate(**gen_kwargs)
    decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Trim to the last period so the reply does not end mid-sentence.
    last_period = decoded_output.rfind('.')
    if last_period != -1:
        decoded_output = decoded_output[:last_period + 1]

    # Collapse any run of whitespace into a single space.
    return ' '.join(decoded_output.split())