import glob
import json
import math
import os
from dataclasses import dataclass
import dateutil
import numpy as np
from src.display.formatting import make_clickable_model, make_contamination_red
from src.display.utils import AutoEvalColumn, EvalDimensions#, ModelType, Precision, WeightType
from src.submission.check_validity import is_model_on_hub


@dataclass
class EvalResult:
"""Represents one full evaluation. Built from a combination of the result and request file for a given run.
"""
eval_name: str # org_model_precision (uid)
full_model: str # org/model (path on hub)
org: str
model: str
#revision: str # commit hash, "" if main
results: dict
#precision: Precision = Precision.Unknown
#model_type: ModelType = ModelType.Unknown # Pretrained, fine tuned, ...
model_source: str = "" # HF, API, ...
model_category: str = "" #Nano, Small, Medium, Large
#weight_type: WeightType = WeightType.Original # Original or Adapter
#architecture: str = "Unknown"
#license: str = "?"
#likes: int = 0
#num_params: int = 0
date: str = "" # submission date of request file
still_on_hub: bool = False
@classmethod
    def init_from_json_file(cls, json_filepath):
"""Inits the result from the specific model result file"""
with open(json_filepath) as fp:
data = json.load(fp)
config = data.get("config")
# Precision
#precision = Precision.from_str(config.get("model_dtype"))
# Get model and org
org_and_model = config.get("model", config.get("model_args", None))
print("******* org_and_model **********", config)
org_and_model = org_and_model.split("/", 1)
if len(org_and_model) == 1:
org = None
model = org_and_model[0]
result_key = f"{model}"#_{precision.value.name}
else:
org = org_and_model[0]
model = org_and_model[1]
result_key = f"{org}_{model}"#_{precision.value.name}
full_model = "/".join(org_and_model)
still_on_hub, _, model_config = is_model_on_hub(
full_model, config.get("model_sha", "main"), trust_remote_code=True, test_tokenizer=False
)
"""
architecture = "?"
if model_config is not None:
architectures = getattr(model_config, "architectures", None)
if architectures:
architecture = ";".join(architectures)
"""
# Extract results available in this file (some results are split in several files)
results = {}
results_obj = data.get("results")
print(results_obj)
results["average_score"] = results_obj.get("average_score")
results["speed"] = results_obj.get("speed")
results["contamination_score"] = results_obj.get("contamination_score")
scores_by_category = results_obj.get("scores_by_category")
for category_obj in scores_by_category:
category = category_obj["category"]
average_score = category_obj["average_score"]
results[category.lower()] = average_score
        return cls(
eval_name=result_key,
full_model=full_model,
org=org,
model=model,
model_source=config.get("model_source", ""),
model_category=config.get("model_category", ""),
#num_params=config.get("params", 0),
#license=config.get("license", "?"),
#likes=config.get("likes", -1),
results=results,
#precision=precision,
#revision= config.get("model_sha", ""),
still_on_hub=still_on_hub,
#architecture=architecture
)
def update_with_request_file(self, requests_path):
"""Finds the relevant request file for the current model and updates info with it"""
request_file = get_request_file_for_model(requests_path, self.full_model) #, self.precision.value.name
try:
with open(request_file, "r") as f:
request = json.load(f)
#self.model_type = ModelType.from_str(request.get("model_type", ""))
#self.weight_type = WeightType[request.get("weight_type", "Original")]
#self.license = request.get("license", "?")
#self.likes = request.get("likes", 0)
#self.params = request.get("params", 0)
self.date = request.get("submitted_time", "")
except Exception:
print(f"Could not find request file for {self.org}/{self.model}") # with precision {self.precision.value.name}
def to_dict(self):
"""Converts the Eval Result to a dict compatible with our dataframe display"""
average_score = self.results["average_score"]
data_dict = {
"eval_name": self.eval_name, # not a column, just a save name,
#AutoEvalColumn.precision.name: self.precision.value.name,
AutoEvalColumn.model_source.name: self.model_source,
AutoEvalColumn.model_category.name: self.model_category,
#AutoEvalColumn.model_type_symbol.name: self.model_type.value.symbol,
#AutoEvalColumn.weight_type.name: self.weight_type.value.name,
#AutoEvalColumn.architecture.name: self.architecture,
AutoEvalColumn.model.name: make_clickable_model(self.full_model),
#AutoEvalColumn.revision.name: self.revision,
AutoEvalColumn.average_score.name: average_score,
#AutoEvalColumn.license.name: self.license,
#AutoEvalColumn.likes.name: self.likes,
#AutoEvalColumn.params.name: self.num_params,
#AutoEvalColumn.still_on_hub.name: self.still_on_hub,
}
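        # Add one column per evaluation dimension; metrics missing from this result file default to 0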
for eval_dim in EvalDimensions:
dimension_name = eval_dim.value.col_name
try:
dimension_value = self.results[eval_dim.value.metric]
except KeyError:
dimension_value = 0
            if dimension_name == "Contamination Score":
                # treat missing scores as 0 and clip negatives, otherwise round to 2 decimals
                dimension_value = 0 if dimension_value is None or dimension_value < 0 else round(dimension_value, 2)
data_dict[dimension_name] = dimension_value
        return data_dict


def get_request_file_for_model(requests_path, model_name): #,precision
"""Selects the correct request file for a given model. Only keeps runs tagged as FINISHED"""
request_files = os.path.join(
requests_path,
f"{model_name}_eval_request_*.json",
)
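    # Expand the glob pattern to every request file ever submitted for this model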
request_files = glob.glob(request_files)
# Select correct request file (precision)
request_file = ""
request_files = sorted(request_files, reverse=True)
for tmp_request_file in request_files:
with open(tmp_request_file, "r") as f:
req_content = json.load(f)
if (
req_content["status"] in ["FINISHED"]
#and req_content["precision"] == precision.split(".")[-1]
):
request_file = tmp_request_file
    return request_file


def get_raw_eval_results(results_path: str, requests_path: str) -> list[EvalResult]:
"""From the path of the results folder root, extract all needed info for results"""
model_result_filepaths = []
for root, _, files in os.walk(results_path):
print("HERE",files)
        # Model result folders may also contain HTML answer files, so keep only the JSON result files
        #if len(files) == 0 or any([not f.endswith(".json") for f in files]):
        #    continue
        files = [f for f in files if f.endswith(".json")]
        if not files:
            continue
# Sort the files by date
try:
files.sort(key=lambda x: x.removesuffix(".json").removeprefix("results_")[:-7])
except dateutil.parser._parser.ParserError as e:
print("Error",e)
files = [files[-1]]
print(files)
for file in files:
model_result_filepaths.append(os.path.join(root, file))
eval_results = {}
for model_result_filepath in model_result_filepaths:
# Creation of result
eval_result = EvalResult.init_from_json_file(model_result_filepath)
eval_result.update_with_request_file(requests_path)
# Store results of same eval together
eval_name = eval_result.eval_name
if eval_name in eval_results.keys():
eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None})
else:
eval_results[eval_name] = eval_result
results = []
#print(eval_results.values())
for v in eval_results.values():
try:
print(v.to_dict())
v.to_dict() # we test if the dict version is complete
results.append(v)
except KeyError: # not all eval values present
print("Key error in eval result, skipping")
print(v)
print(v.to_dict())
continue
print(results)
    return results


def get_model_answers_html_file(results_path, model_name):
model_org,model_name_only = model_name.split("/")
model_answers_prefix = f"{results_path}/{model_org}/"
html_file_content = "EMPTY"
download_file_path = "https://huggingface.co/spaces/silma-ai/Arabic-LLM-Broad-Leaderboard/raw/main/"
for root, _, files in os.walk(model_answers_prefix):
for file_name in files:
if file_name.startswith(f"{model_name_only}_abb_benchmark_answers_"):
file_path = os.path.join(root, file_name)
with open(file_path, "r") as f:
html_file_content = f.read()
download_file_path = download_file_path + file_path.replace("./", "")
break
return html_file_content,download_file_path
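

# Minimal usage sketch, not part of the leaderboard app itself: the "./eval-results" and
# "./eval-queue" folder names below are illustrative placeholders, not the app's real paths.
if __name__ == "__main__":
    raw_results = get_raw_eval_results(results_path="./eval-results", requests_path="./eval-queue")
    for res in raw_results:
        # Each EvalResult converts into one leaderboard row keyed by column names
        print(res.to_dict())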