import json
import os
import sys
import io
from datetime import datetime, timezone
import hmac
import hashlib
import time

import requests
import pandas as pd
from colorama import Fore
from huggingface_hub import HfApi, snapshot_download

current_script_path = os.path.abspath(__file__)
src_directory = os.path.join(os.path.dirname(current_script_path), '..', '..')
sys.path.append(src_directory)
# print(sys.path)
from src.display.utils import EVAL_COLS, BENCHMARK_COLS, COLS
from src.envs import API, EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_FILE_PATH, DYNAMIC_INFO_PATH, EVAL_RESULTS_PATH, TOKEN, IS_PUBLIC, QUEUE_REPO, REPO_ID, RESULTS_REPO
status_mapping = {
    'P': 'PENDING',
    'R': 'RUNNING',
    'S': 'FINISHED',
    'F': 'FAILED',
    'C': 'CANCELLED'
}
dataset_metric_mapping = {
    'ChartQA': ('accuracy', 'acc'),
    'CMMMU': ('accuracy', 'acc'),
    'CMMU': ('accuracy', 'acc'),
    'MMMU': ('accuracy', 'acc'),
    'MMMU_Pro_standard': ('accuracy', 'acc'),
    'MMMU_Pro_vision': ('accuracy', 'acc'),
    'OCRBench': ('accuracy', 'acc'),
    'MathVision': ('accuracy', 'acc'),
    'CII-Bench': ('accuracy', 'acc'),
    'Blink': ('accuracy', 'acc'),
}
failed_mapping = {}

# Example usage
# Production environment
#base_url = 'https://flageval.baai.ac.cn/api/hf'
#secret = b'M2L84t36MdzwS1Lb'
# Test environment
base_url = 'http://120.92.17.239:8080/api/hf'
secret = b'Dn29TMCxzvKBGMS8'
# model_id = 'Qwen/Qwen1.5-0.5B'

MAX_GPU_USAGE = 20
LC_A800_QUEUE_ID = "877467e6-808b-487e-8a06-af8e96c83fa6"
A800_QUEUE_ID = "f016ff98-6ec8-4b1e-aed2-9a93753119b2"
A100_QUEUE_ID = "7f8cb309-295f-4f56-8159-f43f60f03f9c"
MAX_A800_USAGE = 1
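# MAX_GPU_USAGE caps the total number of GPUs this scheduler keeps busy at once, while
# MAX_A800_USAGE caps how many evaluations may occupy the LC A800 queue concurrently.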
def get_gpu_number(params=0):
    # Map the model's parameter count (in billions) to (GPU count, queue id).
    # Earlier approach: ceil-divide the parameter count (minimum 1 even when params is 0), e.g.
    # return -(-params // 35)
    if params == 0:
        return 0, A100_QUEUE_ID
    elif params < 9:
        return 1, A100_QUEUE_ID
    elif params < 15:
        return 2, A100_QUEUE_ID
    elif params < 35:
        return 4, A100_QUEUE_ID
    elif params < 70:
        return 3, LC_A800_QUEUE_ID
    elif params < 100:
        return 5, LC_A800_QUEUE_ID
    elif params < 140:
        return 6, LC_A800_QUEUE_ID
    else:
        return 8, LC_A800_QUEUE_ID
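# Illustrative examples of the mapping above (inputs are parameter counts in billions,
# taken from the 'params' field of each eval request):
#   get_gpu_number(7)  -> (1, A100_QUEUE_ID)
#   get_gpu_number(72) -> (5, LC_A800_QUEUE_ID)
#   get_gpu_number(0)  -> (0, A100_QUEUE_ID)  # unknown size: no GPUs reserved in the local tally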
def generate_signature(secret, url, body):
    timestamp = str(int(time.time()))
    to_sign = f'{timestamp}{url}{body}'
    h = hmac.new(secret, to_sign.encode('utf-8'), digestmod=hashlib.sha256)
    sign = h.hexdigest()
    return sign, timestamp
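# The signed string is the concatenation of the Unix timestamp, the full request URL and
# the raw JSON body (empty for GET requests). For example (illustrative values only), a
# POST to f'{base_url}/mm/batches' with body '{"modelId": "org/model"}' at timestamp
# 1700000000 signs:
#   '1700000000http://120.92.17.239:8080/api/hf/mm/batches{"modelId": "org/model"}'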
def submit_evaluation(base_url, secret, model_id, require_gpus=None, priority=None, gpus_queue_id=None, hf_user_id=None):
    url = f'{base_url}/mm/batches'
    data = {'modelId': model_id}
    if require_gpus is not None:
        data['requireGpus'] = require_gpus
    if priority is not None:
        data['priority'] = priority
    if gpus_queue_id is not None:
        data['gpus_queue_id'] = gpus_queue_id
    if hf_user_id is not None:
        data['hfUserId'] = hf_user_id
    raw_body = json.dumps(data)
    sign, timestamp = generate_signature(secret, url, raw_body)
    headers = {
        'Content-Type': 'application/json',
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }
    response = requests.post(url, data=raw_body, headers=headers)
    print("submit_evaluation response", response)
    response_data = response.json()
    evaluation_info = {
        'evaluationId': response_data.get('evaluationId'),
        'eval_id': response_data.get('id')
    }
    return evaluation_info
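# Hypothetical usage (the model id is illustrative): the returned dict carries the
# backend's 'evaluationId' plus the batch 'id', which the main loop stores as eval_id
# and later polls with poll_evaluation_progress().
#   info = submit_evaluation(base_url, secret, 'org/some-model-7b',
#                            require_gpus=1, priority='high', gpus_queue_id=A100_QUEUE_ID)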
def poll_evaluation_progress(base_url, secret, batch_id):
    url = f'{base_url}/mm/batches/{int(batch_id)}'
    sign, timestamp = generate_signature(secret, url, '')
    headers = {
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # raises HTTPError if the response status is not 2xx
        response_data = response.json()
        evaluation_progress = {
            'evaluationId': response_data.get('evaluationId'),
            'eval_id': response_data.get('batchId'),
            'status': response_data.get('status'),
            'details': response_data.get('details', [])
        }
        return evaluation_progress
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    except ValueError:
        print(f"Error parsing JSON: {response}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return {'status': 'REQUEST_FAILED'}
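# On success the returned dict mirrors the backend response: 'status' is one of the short
# codes in status_mapping ('P', 'R', 'S', 'F', 'C', plus 'DI', which the main loop treats
# as terminal), and 'details' holds the per-dataset results consumed by
# save_and_upload_results(). On any failure a sentinel {'status': 'REQUEST_FAILED'} is
# returned; it matches no terminal code, so the evaluation is simply polled again later.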
def update_gpu_usage(change):
    global current_gpu_usage
    current_gpu_usage += change
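# current_gpu_usage is a module-level tally: it is recomputed from the running queue at
# the top of every main-loop iteration and adjusted here when jobs are submitted or finish.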
def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    all_evals = []
    for root, dirs, files in os.walk(save_path):
        for file in files:
            if file.endswith(".json"):
                file_path = os.path.join(root, file)
                with open(file_path) as fp:
                    data = json.load(fp)
                # Make sure every requested column exists; missing ones default to None
                # (failed_status defaults to 0).
                for col in cols:
                    if col not in data:
                        if col == "failed_status":
                            data[col] = 0
                        else:
                            data[col] = None
                all_evals.append(data)
    # Order all_evals by submitted_time, then order the pending list by params and failed_status.
    all_evals = sorted(all_evals, key=lambda x: x['submitted_time'])
    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    pending_list = sorted(pending_list, key=lambda x: x['params'])
    pending_list = sorted(pending_list, key=lambda x: x['failed_status'])
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]
    df_pending = pd.DataFrame(pending_list) if pending_list else pd.DataFrame(columns=cols)
    df_running = pd.DataFrame(running_list) if running_list else pd.DataFrame(columns=cols)
    df_finished = pd.DataFrame(finished_list) if finished_list else pd.DataFrame(columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]
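# Note the return order: (finished, running, pending). The main loop below unpacks the
# three DataFrames in exactly this order, and the pending frame is pre-sorted so that
# submissions with a higher failed_status sink towards the back of the queue.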
def update_evaluation_queue(model_name, nstatus, eval_id=None, flageval_id=None):
    print("update_evaluation_queue", model_name, nstatus, eval_id)
    fail_status = -1
    if len(nstatus.split("_")) == 2:
        status, fail_status = nstatus.split("_")[0], int(nstatus.split("_")[1])
    else:
        status = nstatus
    user_name, model_path = model_name.split("/") if "/" in model_name else ("", model_name)
    out_dir = f"{EVAL_REQUESTS_PATH}/{user_name}"
    json_files = [f for f in os.listdir(out_dir) if f.startswith(model_path + '_') and f.endswith(".json")]
    if not json_files:
        print(f"No JSON file found for model {model_name}")
        return
    for json_file in json_files:
        json_path = os.path.join(out_dir, json_file)
        with open(json_path, "r") as f:
            eval_entry = json.load(f)
        print("before update_evaluation_queue", eval_entry['status'], eval_entry['failed_status'])
        eval_entry['status'] = status
        if fail_status >= 0:
            eval_entry['failed_status'] = fail_status
        if eval_id is not None:
            eval_entry['eval_id'] = eval_id
        if flageval_id is not None:
            eval_entry['flageval_id'] = flageval_id
        print("after update_evaluation_queue status change", eval_entry['status'], eval_entry['failed_status'])
        with open(json_path, "w") as f:
            # f.write(json.dumps(eval_entry))
            json.dump(eval_entry, f, indent=4)
        api.upload_file(
            path_or_fileobj=json_path,
            path_in_repo=json_path.split(f"{EVAL_REQUESTS_PATH}/")[1],
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Update {model_name} status to {status}",
        )
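# nstatus is either a plain status ('RUNNING', 'CANCELLED', 'PENDING', ...) or a status
# with a retry counter appended, e.g. 'PENDING_2', in which case the numeric suffix is
# written back to the request file as failed_status. The updated request JSON is then
# pushed to QUEUE_REPO so the leaderboard front end picks up the change.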
def save_and_upload_results(model_name, details):
    converted_details = {
        "config_general": {
            "model_name": model_name,
            "model_dtype": "float16",
            "model_size": 0
        },
        "results": {},
        "versions": {},
        "config_tasks": {},
        "summary_tasks": {},
        "summary_general": {}
    }
    for detail in details:
        dataset = detail['dataset']
        status = detail['status']
        # accuracy = detail['accuracy']
        if status == 'S' and dataset in dataset_metric_mapping.keys():
            # dataset_key = f"harness|{dataset}|5"
            acc_key = dataset_metric_mapping[dataset][0]
            acc = detail['accuracy'] if acc_key == 'accuracy' else detail['rawDetails'][acc_key]
            converted_details['results'][dataset] = {
                dataset_metric_mapping[dataset][1]: acc,
                "acc_stderr": 0
            }
            # Add the detailed per-metric values as well
            for metric, value in detail['rawDetails'].items():
                converted_details['results'][dataset][metric] = value
    out_dir = f"{EVAL_RESULTS_PATH}/{model_name}"
    os.makedirs(out_dir, exist_ok=True)
    result_path = os.path.join(out_dir, f"results_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S.%f')}.json")
    with open(result_path, "w") as f:
        json.dump(converted_details, f, indent=4)
    api.upload_file(
        path_or_fileobj=result_path,
        path_in_repo=result_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model_name}",
    )
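# Sketch of the uploaded result file, assuming a single successful ('S') ChartQA entry in
# details (field values are illustrative; the rawDetails keys depend on the backend):
# {
#   "config_general": {"model_name": "org/some-model", "model_dtype": "float16", "model_size": 0},
#   "results": {"ChartQA": {"acc": 0.61, "acc_stderr": 0, "<rawDetails metric>": 0.61}},
#   "versions": {}, "config_tasks": {}, "summary_tasks": {}, "summary_general": {}
# }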
from tqdm.auto import tqdm

class SilentTqdm(tqdm):
    def __init__(self, *args, **kwargs):
        kwargs['bar_format'] = ''
        kwargs['leave'] = False
        super().__init__(*args, **kwargs, file=io.StringIO())
    def update(self, n=1):
        pass
    def close(self):
        pass
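# SilentTqdm is passed as tqdm_class to snapshot_download below so the periodic repo
# syncs don't flood the logs with progress bars: output goes to a throwaway StringIO and
# update()/close() are no-ops.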
def snapshot_download_with_retry(max_retries, wait_time, *args, **kwargs):
    for i in range(max_retries):
        try:
            return snapshot_download(*args, **kwargs)
        except Exception as e:
            if i < max_retries - 1:  # i is zero indexed
                print(f"Error occurred: {e}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries reached. Raising exception.")
                raise
api = HfApi()

print(EVAL_REQUESTS_PATH)
print(DYNAMIC_INFO_PATH)
print(EVAL_RESULTS_PATH)

prev_running_models = ''
while True:
    snapshot_download_with_retry(5, 10, repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    (
        finished_eval_queue_df,
        running_eval_queue_df,
        pending_eval_queue_df,
    ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, ['model', 'status', 'params', 'eval_id', 'failed_status'])

    ## pending list test
    pending_list = [row for _, row in pending_eval_queue_df.iterrows()]
    for pend in pending_list:
        print("pending", pend)
    # Update current GPU usage from the running evaluation queue
    current_gpu_usage = 0
    current_A800gpu_usage = 0
    for _, row in running_eval_queue_df.iterrows():
        print(get_gpu_number(row['params']), row['params'])
        gpus_num, gpus_queue_id = get_gpu_number(row['params'])
        current_gpu_usage += gpus_num
        if gpus_queue_id == LC_A800_QUEUE_ID:
            current_A800gpu_usage += 1
    # print(f'Current GPU usage: {current_gpu_usage}/{MAX_GPU_USAGE}')
    running_models = ", ".join([row["model"] for _, row in running_eval_queue_df.iterrows()])
    if running_models != prev_running_models:
        print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | GPU usage: {current_gpu_usage}/{MAX_GPU_USAGE} | Running models: {running_models}')
        prev_running_models = running_models
    print("current A800 GPU usage", current_A800gpu_usage)
    # Walk the pending evaluations in pending_eval_queue_df and submit as many as fit
    # within the GPU budget (the early-exit limit below is currently disabled).
    if not pending_eval_queue_df.empty:
        for i, row in pending_eval_queue_df.iterrows():
            #if i >= 3 : break
            required_gpus, gpus_queue_id = get_gpu_number(row['params'])
            if gpus_queue_id == LC_A800_QUEUE_ID:
                if current_A800gpu_usage >= MAX_A800_USAGE:
                    print(current_A800gpu_usage >= MAX_A800_USAGE, row['model'])
                    continue
            if "princeton-nlp/Llama-3-8B-ProLong-512k" in row['model']:
                required_gpus += 1
            if current_gpu_usage + required_gpus <= MAX_GPU_USAGE:
                # Guard against duplicate submissions
                if row['model'] in [r["model"] for _, r in running_eval_queue_df.iterrows()]:
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} is already running')
                    update_evaluation_queue(row['model'], 'CANCELLED')
                    continue
                # Submit the evaluation
                try:
                    evaluation_info = submit_evaluation(base_url, secret, row['model'], require_gpus=required_gpus, priority='high', gpus_queue_id=gpus_queue_id)
                    update_evaluation_queue(row['model'], 'RUNNING', evaluation_info['eval_id'], evaluation_info['evaluationId'])
                    update_gpu_usage(required_gpus)
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Submitted evaluation {row["model"]} with {required_gpus} GPUs, submit info: {evaluation_info}')
                except Exception as e:
                    print(e)
                    continue
    # Poll the status of running evaluations
    for _, row in running_eval_queue_df.iterrows():
        progress = poll_evaluation_progress(base_url, secret, row['eval_id'])
        if progress['status'] in ['S', 'F', 'C', 'DI']:
            new_status = status_mapping.get(progress['status'], 'FINISHED')
            update_evaluation_queue(row['model'], new_status)
            gpus_num, gpus_queue_id = get_gpu_number(row['params'])
            update_gpu_usage(-gpus_num)
            if gpus_queue_id == LC_A800_QUEUE_ID:
                current_A800gpu_usage -= 1
            print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} finished with status {progress["status"]}')
            if new_status == 'FAILED':
                print("failed_mapping0", failed_mapping)
                if row['model'] in failed_mapping:
                    failed_mapping[row['model']] += 1
                else:
                    failed_mapping[row['model']] = 1
                print("failed_mapping add", failed_mapping, row['failed_status'])
                if failed_mapping[row['model']] == 5:
                    del failed_mapping[row['model']]
                    update_evaluation_queue(row['model'], 'PENDING_' + str(int(row['failed_status'] + 1)))
                else:
                    update_evaluation_queue(row['model'], 'PENDING')
                print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |--------------- RePending {row["model"]} ------------ ')
            elif new_status == 'FINISHED':
                print(progress)
                save_and_upload_results(row['model'], progress['details'])

    time.sleep(300)  # interval between queue checks