import json
import re
import string
import pickle
from collections import Counter
from typing import Any, List, Optional, Tuple
import numpy as np
import torch
from sentence_transformers import SentenceTransformer
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from bert_score import score
import litellm
# Initialize the sentence transformer model
model = SentenceTransformer('all-MiniLM-L6-v2')
# File I/O functions
def loadjson(filename: str) -> dict:
"""
Load data from a JSON file.
Args:
filename: Path to the JSON file
Returns:
Dictionary containing the loaded JSON data
"""
with open(filename, 'r', encoding='utf-8') as file:
data = json.load(file)
return data
def savejson(data: dict, filename: str) -> None:
"""
Save data to a JSON file.
Args:
data: Dictionary to save
filename: Path where the JSON file will be saved
"""
    with open(filename, 'w', encoding='utf-8') as json_file:
json.dump(data, json_file, indent=4)
def loadpkl(filename: str) -> Any:
"""
Load data from a pickle file.
Args:
filename: Path to the pickle file
Returns:
The unpickled object
"""
with open(filename, 'rb') as file:
data = pickle.load(file)
return data
def savepkl(data: Any, filename: str) -> None:
"""
Save data to a pickle file.
Args:
data: Object to save
filename: Path where the pickle file will be saved
"""
with open(filename, 'wb') as pkl_file:
pickle.dump(data, pkl_file)
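# Illustrative round trip through the I/O helpers above (a minimal sketch; the
# file names are placeholders, not paths used elsewhere in this module):
def _demo_io_round_trip() -> None:
    """Save and reload a small record with the JSON and pickle helpers."""
    record = {"question": "What is 2 + 2?", "answer": "4"}
    savejson(record, "demo_record.json")    # hypothetical path
    assert loadjson("demo_record.json") == record
    savepkl(record, "demo_record.pkl")      # hypothetical path
    assert loadpkl("demo_record.pkl") == record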
# Text normalization and evaluation functions
def normalize_answer(s: str, normal_method: str = "") -> str:
"""
Normalize text for evaluation.
Args:
s: String to normalize
normal_method: Method for normalization ("mc" for multiple choice, "" for standard)
Returns:
Normalized string
"""
def remove_articles(text):
return re.sub(r'\b(a|an|the)\b', ' ', text)
def white_space_fix(text):
return ' '.join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return ''.join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
    def mc_remove(text):
        # Keep the last multiple-choice marker such as "(a)" or "(B)"; empty if none.
        matches = re.findall(r'\([a-zA-Z]\)', text)
        if not matches:
            return ""
        return matches[-1]
if normal_method == "mc":
return mc_remove(s)
return white_space_fix(remove_articles(remove_punc(lower(s))))
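# Quick sanity sketch of the normalizer (expected outputs shown as comments):
#   normalize_answer("The Answer is: Paris!")          -> "answer is paris"
#   normalize_answer("I pick (B) over (a).", "mc")     -> "(a)"  (last marker wins)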
def f1_score(prediction: str, ground_truth: str) -> Tuple[float, float, float]:
"""
Calculate F1 score between prediction and ground truth.
Args:
prediction: Predicted text
ground_truth: Ground truth text
Returns:
Tuple of (f1, precision, recall)
"""
normalized_prediction = normalize_answer(prediction)
normalized_ground_truth = normalize_answer(ground_truth)
ZERO_METRIC = (0, 0, 0)
if normalized_prediction in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth:
return ZERO_METRIC
if normalized_ground_truth in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth:
return ZERO_METRIC
prediction_tokens = normalized_prediction.split()
ground_truth_tokens = normalized_ground_truth.split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return ZERO_METRIC
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, precision, recall
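# Worked example for the token-overlap F1 above (a toy sanity check, not a
# benchmark case): prediction "Barack Obama" vs. ground truth
# "President Barack Obama" share 2 tokens, so precision = 2/2 = 1.0,
# recall = 2/3, and F1 = 2 * 1.0 * (2/3) / (1.0 + 2/3) = 0.8.
def _demo_f1_score() -> None:
    f1, precision, recall = f1_score("Barack Obama", "President Barack Obama")
    assert precision == 1.0 and abs(f1 - 0.8) < 1e-9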
def exact_match_score(prediction: str, ground_truth: str, normal_method: str = "") -> bool:
"""
Check if prediction exactly matches ground truth after normalization.
Args:
prediction: Predicted text
ground_truth: Ground truth text
normal_method: Method for normalization
Returns:
True if exact match, False otherwise
"""
return (normalize_answer(prediction, normal_method=normal_method) ==
normalize_answer(ground_truth, normal_method=normal_method))
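# e.g. exact_match_score("The Eiffel Tower", "Eiffel Tower.") is True, since
# normalization lowercases and strips articles and punctuation.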
def get_bert_score(generate_response: List[str], ground_truth: List[str]) -> float:
"""
Calculate BERT score between generated responses and ground truths.
Args:
generate_response: List of generated responses
ground_truth: List of ground truth texts
Returns:
Average BERT score (F1)
"""
    F_l = []
    for generation, gt in zip(generate_response, ground_truth):
        # score() returns precision, recall, and F1 tensors; keep the mean F1.
        P, R, F = score([generation], [gt], lang="en", verbose=True)
        F_l.append(F.mean().item())
    return float(np.mean(F_l))
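# Hedged usage note: bert_score downloads its scoring model on first use, so
# this is only a sketch of how the helper is meant to be called:
#   avg_f1 = get_bert_score(["Paris is the capital of France."],
#                           ["The capital of France is Paris."])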
# Embedding and dimensionality reduction
def reduce_embedding_dim(embed: np.ndarray, dim: int = 50) -> np.ndarray:
"""
Reduce dimensionality of embeddings using PCA.
Args:
embed: Embedding vectors
dim: Target dimension
Returns:
Reduced embeddings
"""
pca = PCA(n_components=dim)
reduced_embeddings = pca.fit_transform(embed)
return reduced_embeddings
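# Sketch of the PCA reduction on synthetic data (PCA needs at least as many
# rows as the target dimension, hence 100 random vectors here; the shapes are
# illustrative only):
def _demo_reduce_embedding_dim() -> None:
    fake_embeddings = np.random.rand(100, 384)    # 384 = all-MiniLM-L6-v2 width
    reduced = reduce_embedding_dim(fake_embeddings, dim=50)
    assert reduced.shape == (100, 50)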
def get_embedding(instructions: List[str]) -> np.ndarray:
"""
    Get sentence-transformer embeddings for a list of texts.
    Args:
        instructions: List of texts to embed
    Returns:
        Numpy array of embeddings, one row per input text
"""
emb_list = model.encode(instructions)
return emb_list
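# Minimal sketch of embedding a couple of illustrative sentences:
def _demo_get_embedding() -> None:
    texts = ["Summarize the article.", "Translate the sentence to French."]
    embeddings = get_embedding(texts)    # one 384-dim row per text for MiniLM
    assert embeddings.shape[0] == len(texts)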
# LLM prompting
def model_prompting(
llm_model: str,
prompt: str,
return_num: Optional[int] = 1,
max_token_num: Optional[int] = 512,
temperature: Optional[float] = 0.0,
top_p: Optional[float] = None,
stream: Optional[bool] = None,
) -> str:
"""
Get a response from an LLM model using LiteLLM.
Args:
llm_model: Name of the model to use
prompt: Input prompt text
return_num: Number of completions to generate
max_token_num: Maximum number of tokens to generate
temperature: Sampling temperature
top_p: Top-p sampling parameter
stream: Whether to stream the response
Returns:
Generated text response
"""
completion = litellm.completion(
model=llm_model,
messages=[{'role': 'user', 'content': prompt}],
max_tokens=max_token_num,
n=return_num,
top_p=top_p,
temperature=temperature,
stream=stream,
)
content = completion.choices[0].message.content
return content
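# Hedged usage sketch for the LiteLLM wrapper above; the model name is a
# placeholder and the matching API key must be configured in the environment:
#   answer = model_prompting(
#       llm_model="gpt-4o-mini",    # hypothetical choice; any LiteLLM model id works
#       prompt="In one sentence, what does PCA do?",
#       max_token_num=128,
#   )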