import json
import pickle
import re
import string
from collections import Counter
from typing import Any, List, Optional, Tuple

import numpy as np
from bert_score import score
from sentence_transformers import SentenceTransformer
from sklearn.decomposition import PCA

import litellm

# Initialize the sentence transformer model used for embeddings.
model = SentenceTransformer('all-MiniLM-L6-v2')


# File I/O functions

def loadjson(filename: str) -> dict:
    """
    Load data from a JSON file.

    Args:
        filename: Path to the JSON file

    Returns:
        Dictionary containing the loaded JSON data
    """
    with open(filename, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data


def savejson(data: dict, filename: str) -> None:
    """
    Save data to a JSON file.

    Args:
        data: Dictionary to save
        filename: Path where the JSON file will be saved
    """
    with open(filename, 'w', encoding='utf-8') as json_file:
        json.dump(data, json_file, indent=4)


def loadpkl(filename: str) -> Any:
    """
    Load data from a pickle file.

    Args:
        filename: Path to the pickle file

    Returns:
        The unpickled object
    """
    with open(filename, 'rb') as file:
        data = pickle.load(file)
    return data


def savepkl(data: Any, filename: str) -> None:
    """
    Save data to a pickle file.

    Args:
        data: Object to save
        filename: Path where the pickle file will be saved
    """
    with open(filename, 'wb') as pkl_file:
        pickle.dump(data, pkl_file)


# Text normalization and evaluation functions

def normalize_answer(s: str, normal_method: str = "") -> str:
    """
    Normalize text for evaluation.

    Args:
        s: String to normalize
        normal_method: Normalization method ("mc" for multiple choice, "" for standard)

    Returns:
        Normalized string
    """
    def remove_articles(text):
        return re.sub(r'\b(a|an|the)\b', ' ', text)

    def white_space_fix(text):
        return ' '.join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return ''.join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    def mc_remove(text):
        # Keep only the last multiple-choice marker such as "(A)"; return "" if none is found.
        matches = re.findall(r'\([a-zA-Z]\)', text)
        if len(matches) == 0:
            return ""
        return matches[-1]

    if normal_method == "mc":
        return mc_remove(s)
    return white_space_fix(remove_articles(remove_punc(lower(s))))


def f1_score(prediction: str, ground_truth: str) -> Tuple[float, float, float]:
    """
    Calculate the token-level F1 score between prediction and ground truth.

    Args:
        prediction: Predicted text
        ground_truth: Ground truth text

    Returns:
        Tuple of (f1, precision, recall)
    """
    normalized_prediction = normalize_answer(prediction)
    normalized_ground_truth = normalize_answer(ground_truth)

    ZERO_METRIC = (0, 0, 0)

    if normalized_prediction in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth:
        return ZERO_METRIC
    if normalized_ground_truth in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth:
        return ZERO_METRIC

    prediction_tokens = normalized_prediction.split()
    ground_truth_tokens = normalized_ground_truth.split()
    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return ZERO_METRIC
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1, precision, recall


def exact_match_score(prediction: str, ground_truth: str, normal_method: str = "") -> bool:
    """
    Check if prediction exactly matches ground truth after normalization.

    Args:
        prediction: Predicted text
        ground_truth: Ground truth text
        normal_method: Normalization method

    Returns:
        True if exact match, False otherwise
    """
    return (normalize_answer(prediction, normal_method=normal_method)
            == normalize_answer(ground_truth, normal_method=normal_method))
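# Illustrative sketch (not part of the original utilities): the two metric
# helpers above are typically aggregated over a whole dataset. The function
# name `aggregate_qa_metrics` and its return format are assumptions added
# here for demonstration only.
def aggregate_qa_metrics(predictions: List[str], ground_truths: List[str]) -> Tuple[float, float]:
    """Return (mean exact match, mean F1) over paired predictions and references."""
    em = [float(exact_match_score(p, g)) for p, g in zip(predictions, ground_truths)]
    f1 = [f1_score(p, g)[0] for p, g in zip(predictions, ground_truths)]
    return float(np.mean(em)), float(np.mean(f1))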
def get_bert_score(generate_response: List[str], ground_truth: List[str]) -> float:
    """
    Calculate BERTScore between generated responses and ground truths.

    Args:
        generate_response: List of generated responses
        ground_truth: List of ground truth texts

    Returns:
        Average BERTScore (F1)
    """
    F_l = []
    for generation, gt in zip(generate_response, ground_truth):
        P, R, F = score([generation], [gt], lang="en", verbose=True)
        F_l.append(F.mean().item())
    return float(np.mean(F_l))


# Embedding and dimensionality reduction

def reduce_embedding_dim(embed: np.ndarray, dim: int = 50) -> np.ndarray:
    """
    Reduce dimensionality of embeddings using PCA.

    Args:
        embed: Embedding vectors
        dim: Target dimension

    Returns:
        Reduced embeddings
    """
    pca = PCA(n_components=dim)
    reduced_embeddings = pca.fit_transform(embed)
    return reduced_embeddings


def get_embedding(instructions: List[str]) -> np.ndarray:
    """
    Get sentence-transformer embeddings for a list of texts.

    Args:
        instructions: List of texts to embed

    Returns:
        Numpy array of embeddings
    """
    emb_list = model.encode(instructions)
    return emb_list


# LLM prompting

def model_prompting(
    llm_model: str,
    prompt: str,
    return_num: Optional[int] = 1,
    max_token_num: Optional[int] = 512,
    temperature: Optional[float] = 0.0,
    top_p: Optional[float] = None,
    stream: Optional[bool] = None,
) -> str:
    """
    Get a response from an LLM using LiteLLM.

    Args:
        llm_model: Name of the model to use
        prompt: Input prompt text
        return_num: Number of completions to generate
        max_token_num: Maximum number of tokens to generate
        temperature: Sampling temperature
        top_p: Top-p sampling parameter
        stream: Whether to stream the response

    Returns:
        Generated text response
    """
    completion = litellm.completion(
        model=llm_model,
        messages=[{'role': 'user', 'content': prompt}],
        max_tokens=max_token_num,
        n=return_num,
        top_p=top_p,
        temperature=temperature,
        stream=stream,
    )
    content = completion.choices[0].message.content
    return content
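# Minimal usage sketch, run only when the module is executed directly. The
# example strings are placeholders, and the commented-out model name
# ("gpt-4o-mini") is an assumption; calling an LLM through LiteLLM also
# requires provider credentials (e.g. an API key) to be configured.
if __name__ == "__main__":
    # Embed a couple of strings with the module-level sentence transformer.
    texts = ["How many planets orbit the Sun?", "Name the capital of France."]
    embeddings = get_embedding(texts)
    print("embedding shape:", embeddings.shape)

    # String-match metrics on a toy prediction/reference pair.
    print("exact match:", exact_match_score("The Paris", "paris"))
    print("f1/precision/recall:", f1_score("capital city Paris", "Paris"))

    # Uncomment to query an LLM via LiteLLM (requires credentials).
    # print(model_prompting("gpt-4o-mini", "Say hello in one word."))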