# NOTE(review): removed non-code artifact lines ("Spaces:" / "Running" x2)
# that appear to be log/extraction residue — they are not valid Python and
# would break parsing of this module.
# Standard library
import json
import random
import re

# Third-party
import numpy as np
import pandas as pd
import torch
import yaml
from sklearn.preprocessing import MinMaxScaler

# Project-local
from .graph_nn import form_data, GNN_prediction
from ..data_processing.utils import savejson, loadjson, savepkl, loadpkl

# Run on GPU when one is available, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
class graph_router_prediction:
    """Route queries to LLMs by scoring (query, LLM) edges with a GNN.

    The input dataframes contain one row per (query, LLM) pair, so every
    query contributes exactly ``num_llms`` consecutive rows.  The class
    parses stringified embeddings, builds a bipartite query→LLM edge list
    with train/validate/test masks, and runs a pre-trained GNN in test mode.

    NOTE(review): ``__init__`` kicks off the whole pipeline (data prep,
    split, GNN inference) as a side effect of construction.
    """

    def __init__(self, router_data_train, router_data_test, llm_path, llm_embedding_path, config):
        """Prepare features, split edges, and immediately run the test pass.

        Args:
            router_data_train: DataFrame of (query, LLM) training rows.
            router_data_test: DataFrame of (query, LLM) test rows.
            llm_path: path to a JSON file mapping LLM name -> description.
            llm_embedding_path: path to a pickle of LLM description embeddings.
            config: dict with at least 'num_task', 'split_ratio', 'scenario',
                'embedding_dim', 'edge_dim', and 'model_path'.
        """
        self.config = config
        self.router_data_train, self.router_data_test = router_data_train, router_data_test
        # Train rows first, test rows appended after — split_data() relies on
        # this ordering when it masks the trailing test edges.
        self.data_df = pd.concat([self.router_data_train, self.router_data_test], ignore_index=True)
        self.llm_description = loadjson(llm_path)
        self.llm_names = list(self.llm_description.keys())
        self.num_llms = len(self.llm_names)
        # Each query occupies num_llms rows, so row counts divide evenly.
        self.num_query = len(self.data_df) // self.num_llms
        self.num_query_train = len(self.router_data_train) // self.num_llms
        self.num_query_test = len(self.router_data_test) // self.num_llms
        self.num_task = config['num_task']
        self.llm_description_embedding = loadpkl(llm_embedding_path)
        self.prepare_data_for_GNN()
        self.split_data()
        self.form_data = form_data(device)
        self.query_dim = self.query_embedding_list.shape[1]
        self.llm_dim = self.llm_description_embedding.shape[1]
        self.GNN_predict = GNN_prediction(query_feature_dim=self.query_dim, llm_feature_dim=self.llm_dim,
                                          hidden_features_size=self.config['embedding_dim'],
                                          in_edges_size=self.config['edge_dim'], config=self.config, device=device)
        self.test_GNN()

    def split_data(self):
        """Build edge-level train/validate/test masks and supervision labels.

        Train/validate edges are drawn from shuffled *training* queries
        according to ``config['split_ratio']``; the test mask always covers
        the trailing test queries.  Also reweights ``effect_list`` by the
        configured cost/performance trade-off scenario and derives a one-hot
        best-LLM label per query.
        """
        split_ratio = self.config['split_ratio']
        # Sizes in units of queries (each query expands to num_llms edges).
        train_size = int(self.num_query_train * split_ratio[0])
        val_size = int(self.num_query_train * split_ratio[1])
        test_size = self.num_query_test
        all_query_indices = np.arange(self.num_query_train)
        # NOTE(review): unseeded shuffle — the split is not reproducible
        # across runs; seed np.random upstream if determinism is needed.
        np.random.shuffle(all_query_indices)
        train_query_indices = all_query_indices[:train_size]
        val_query_indices = all_query_indices[train_size:train_size + val_size]
        # Expand each selected query index to its block of num_llms edge rows.
        train_idx = []
        validate_idx = []
        for query_idx in train_query_indices:
            block_start = query_idx * self.num_llms
            train_idx.extend(range(block_start, block_start + self.num_llms))
        for query_idx in val_query_indices:
            block_start = query_idx * self.num_llms
            validate_idx.extend(range(block_start, block_start + self.num_llms))
        # Fix: original wrapped the range in a list ([range(...)]) and leaned
        # on torch's implicit nested-sequence index conversion; an explicit
        # flat index list sets the same mask positions unambiguously.
        test_idx = list(range(self.num_llms * self.num_query - self.num_llms * test_size,
                              self.num_llms * self.num_query))
        # Edge features keep the RAW (cost, performance) pair — built BEFORE
        # the scenario reweighting below mutates effect_list.
        self.combined_edge = np.concatenate((self.cost_list.reshape(-1, 1),
                                             self.effect_list.reshape(-1, 1)), axis=1)
        self.scenario = self.config['scenario']
        # Scenario-dependent scalarization of the (performance, cost) trade-off;
        # the fallback branch is the cost-first weighting.
        if self.scenario == "Performance First":
            self.effect_list = 1.0 * self.effect_list - 0.0 * self.cost_list
        elif self.scenario == "Balance":
            self.effect_list = 0.5 * self.effect_list - 0.5 * self.cost_list
        else:
            self.effect_list = 0.2 * self.effect_list - 0.8 * self.cost_list
        # One-hot label per query marking the LLM with the best reweighted score.
        effect_re = self.effect_list.reshape(-1, self.num_llms)
        self.label = np.eye(self.num_llms)[np.argmax(effect_re, axis=1)].reshape(-1, 1)
        # Edge endpoints: query node ids repeated per LLM, LLM node ids cycled
        # per query (equivalent to the original range offset by edge_org_id[0],
        # which is always 0 by construction).
        self.edge_org_id = [num for num in range(self.num_query) for _ in range(self.num_llms)]
        self.edge_des_id = list(range(self.num_llms)) * self.num_query
        self.mask_train = torch.zeros(len(self.edge_org_id))
        self.mask_train[train_idx] = 1
        self.mask_validate = torch.zeros(len(self.edge_org_id))
        self.mask_validate[validate_idx] = 1
        self.mask_test = torch.zeros(len(self.edge_org_id))
        self.mask_test[test_idx] = 1

    def check_tensor_values(self):
        """Sanity-check prepared features for NaNs and values outside [0, 1].

        Diagnostic only: prints a warning or an OK line per array instead of
        raising, so a bad range does not abort the pipeline.
        """
        def check_array(name, array):
            array = np.array(array)  # accept lists or ndarrays uniformly
            has_nan = np.isnan(array).any()
            out_of_bounds = ((array < 0) | (array > 1)).any()
            if has_nan or out_of_bounds:
                print(f"[Warning] '{name}' has invalid values:")
                if has_nan:
                    print(f" - Contains NaN values.")
                if out_of_bounds:
                    min_val = np.min(array)
                    max_val = np.max(array)
                    print(f" - Values outside [0, 1] range. Min: {min_val}, Max: {max_val}")
            else:
                print(f"[OK] '{name}' is valid (all values in [0, 1] and no NaNs).")

        check_array("query_embedding_list", self.query_embedding_list)
        check_array("task_embedding_list", self.task_embedding_list)
        check_array("effect_list", self.effect_list)
        check_array("cost_list", self.cost_list)

    def prepare_data_for_GNN(self):
        """Parse stringified embeddings and build normalized feature arrays.

        Populates ``query_embedding_list`` / ``task_embedding_list`` (one row
        per query, min-max scaled per feature) and ``effect_list`` /
        ``cost_list`` (flat per-edge arrays with NaNs replaced by 0).
        """
        query_embedding_list_raw = self.data_df['query_embedding'].tolist()
        task_embedding_list_raw = self.data_df['task_description_embedding'].tolist()
        self.query_embedding_list = []
        self.task_embedding_list = []

        def parse_embedding(tensor_str):
            # Embeddings are stored as strings, e.g. "tensor([0.1, ...])" or
            # plain "[0.1, ...]"; strip the tensor(...) wrapper, try JSON,
            # and fall back to a regex number scan for other formats.
            if pd.isna(tensor_str) or not isinstance(tensor_str, str):
                return []
            tensor_str = tensor_str.replace('tensor(', '').replace(')', '')
            try:
                values = json.loads(tensor_str)
            except ValueError:  # fix: was a bare except; JSONDecodeError is a ValueError
                numbers = re.findall(r'[-+]?\d*\.\d+|\d+', tensor_str)
                values = [float(x) for x in numbers]
            return np.nan_to_num(values, nan=0.0).tolist()

        # The dataframe repeats each query num_llms times; take the first row
        # of every group so the result is one embedding per query.
        for i in range(0, len(query_embedding_list_raw), self.num_llms):
            self.query_embedding_list.append(parse_embedding(query_embedding_list_raw[i]))
        for i in range(0, len(task_embedding_list_raw), self.num_llms):
            self.task_embedding_list.append(parse_embedding(task_embedding_list_raw[i]))

        # Diagnostics: ragged lengths here would make np.array() produce an
        # object array and break downstream .shape[1] access.
        print("\nAttempting to convert to numpy arrays...")
        print(f"Query embedding list lengths: {set(len(x) for x in self.query_embedding_list)}")
        print(f"Task embedding list lengths: {set(len(x) for x in self.task_embedding_list)}")
        self.query_embedding_list = np.array(self.query_embedding_list)
        self.task_embedding_list = np.array(self.task_embedding_list)

        # Scale every feature column into [0, 1] independently.
        def normalize_array(arr):
            scaler = MinMaxScaler()
            return scaler.fit_transform(arr)

        self.query_embedding_list = normalize_array(self.query_embedding_list)
        self.task_embedding_list = normalize_array(self.task_embedding_list)

        # Per-edge targets: zero out NaNs, then flatten to 1-D.
        effect_raw = np.nan_to_num(self.data_df['normalized_performance'].tolist(), nan=0.0)
        cost_raw = np.nan_to_num(self.data_df['normalized_cost'].tolist(), nan=0.0)
        self.effect_list = effect_raw.flatten()
        self.cost_list = cost_raw.flatten()
        self.check_tensor_values()

    def test_GNN(self):
        """Assemble the full graph and run the trained GNN in test mode.

        Returns whatever ``GNN_prediction.test`` reports as the best LLM
        choice(s) for the masked test edges, loading weights from
        ``config['model_path']``.
        """
        self.data_for_test = self.form_data.formulation(task_id=self.task_embedding_list,
                                                        query_feature=self.query_embedding_list,
                                                        llm_feature=self.llm_description_embedding,
                                                        org_node=self.edge_org_id,
                                                        des_node=self.edge_des_id,
                                                        edge_feature=self.effect_list, edge_mask=self.mask_test,
                                                        label=self.label, combined_edge=self.combined_edge,
                                                        train_mask=self.mask_train, valide_mask=self.mask_validate,
                                                        test_mask=self.mask_test)
        best_llm = self.GNN_predict.test(data=self.data_for_test, model_path=self.config['model_path'],
                                         llm_names=self.llm_names)
        return best_llm