import json
import os
import sys
import io
import hmac
import hashlib
import time
from datetime import datetime, timezone

import requests
import pandas as pd
from colorama import Fore
from huggingface_hub import HfApi, snapshot_download
from tqdm.auto import tqdm


current_script_path = os.path.abspath(__file__)
src_directory = os.path.join(os.path.dirname(current_script_path), '..', '..')
sys.path.append(src_directory)

from src.display.utils import EVAL_COLS, BENCHMARK_COLS, COLS
from src.envs import API, EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_FILE_PATH, DYNAMIC_INFO_PATH, EVAL_RESULTS_PATH, TOKEN, IS_PUBLIC, QUEUE_REPO, REPO_ID, RESULTS_REPO



# FlagEval batch status codes mapped to leaderboard queue statuses.
status_mapping = {
    'P': 'PENDING',
    'R': 'RUNNING',
    'S': 'FINISHED',
    'F': 'FAILED',
    'C': 'CANCELLED'
}

# Maps each dataset to (metric key in the evaluation response, key written to results).
dataset_metric_mapping = {
    'ChartQA': ('accuracy','acc'),
    'CMMMU': ('accuracy','acc'),
    'CMMU': ('accuracy','acc'),
    'MMMU': ('accuracy','acc'),
    'MMMU_Pro_standard': ('accuracy','acc'),
    'MMMU_Pro_vision': ('accuracy','acc'),
    'OCRBench': ('accuracy','acc'),
    'MathVision': ('accuracy','acc'),
    'CII-Bench': ('accuracy','acc'),
    'Blink': ('accuracy','acc'),
}


failed_mapping = {}
# Example model id: 'Qwen/Qwen1.5-0.5B'
# Production environment:
# base_url = 'https://flageval.baai.ac.cn/api/hf'
# secret = b'M2L84t36MdzwS1Lb'
# Test environment:
base_url = 'http://120.92.17.239:8080/api/hf'
secret = b'Dn29TMCxzvKBGMS8'

MAX_GPU_USAGE = 20
LC_A800_QUEUE_ID = "877467e6-808b-487e-8a06-af8e96c83fa6"
A800_QUEUE_ID = "f016ff98-6ec8-4b1e-aed2-9a93753119b2"
A100_QUEUE_ID = "7f8cb309-295f-4f56-8159-f43f60f03f9c"
MAX_A800_USAGE = 1

def get_gpu_number(params=0):
    # Map a model's parameter count (in billions) to a GPU count and queue.
    # Models up to ~35B run on A100s; larger models go to the LC A800 queue.
    if params == 0:
        return 0, A100_QUEUE_ID
    if params < 9:
        return 1, A100_QUEUE_ID
    elif params < 15:
        return 2, A100_QUEUE_ID
    elif params < 35:
        return 4, A100_QUEUE_ID
    elif params < 70:
        return 3, LC_A800_QUEUE_ID
    elif params < 100:
        return 5, LC_A800_QUEUE_ID
    elif params < 140:
        return 6, LC_A800_QUEUE_ID
    else:
        return 8, LC_A800_QUEUE_ID
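
# A couple of illustrative calls (values read straight off the tiers above):
#   get_gpu_number(7)  -> (1, A100_QUEUE_ID)      # under 9B fits on one A100
#   get_gpu_number(72) -> (5, LC_A800_QUEUE_ID)   # 70-100B tier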

def generate_signature(secret, url, body):
    # Sign "{timestamp}{url}{body}" with HMAC-SHA256 so the server can verify
    # both the payload and its freshness.
    timestamp = str(int(time.time()))
    to_sign = f'{timestamp}{url}{body}'
    h = hmac.new(secret, to_sign.encode('utf-8'), digestmod=hashlib.sha256)
    sign = h.hexdigest()
    return sign, timestamp
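
# A minimal sketch of what verification on the receiving side could look like,
# assuming the server recomputes the same digest; this helper is illustrative
# and not part of the FlagEval API:
def _verify_signature(secret, url, body, sign, timestamp, max_skew=300):
    # Reject stale timestamps to limit replay of captured requests.
    if abs(time.time() - int(timestamp)) > max_skew:
        return False
    expected = hmac.new(secret, f'{timestamp}{url}{body}'.encode('utf-8'),
                        digestmod=hashlib.sha256).hexdigest()
    # compare_digest avoids leaking a match prefix through timing differences.
    return hmac.compare_digest(expected, sign)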

def submit_evaluation(base_url, secret, model_id, require_gpus=None, priority=None, gpus_queue_id=None, hf_user_id=None):
    url = f'{base_url}/mm/batches'
    data = {'modelId': model_id}
    if require_gpus is not None:
        data['requireGpus'] = require_gpus
    if priority is not None:
        data['priority'] = priority
    if gpus_queue_id is not None:
        data['gpus_queue_id'] = gpus_queue_id
    if hf_user_id is not None:
        data['hfUserId'] = hf_user_id

    raw_body = json.dumps(data)
    sign, timestamp = generate_signature(secret, url, raw_body)

    headers = {
        'Content-Type': 'application/json',
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }

    response = requests.post(url, data=raw_body, headers=headers)
    print("submit_evaluation response",response)
    response_data = response.json()

    evaluation_info = {
        'evaluationId': response_data.get('evaluationId'),
        'eval_id': response_data.get('id')
    }
    return evaluation_info

def poll_evaluation_progress(base_url, secret, batch_id):
    url = f'{base_url}/mm/batches/{int(batch_id)}'
    sign, timestamp = generate_signature(secret, url, '')

    headers = {
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # raises HTTPError if the status is not 2xx

        response_data = response.json()

        evaluation_progress = {
            'evaluationId': response_data.get('evaluationId'),
            'eval_id': response_data.get('batchId'),
            'status': response_data.get('status'),
            'details': response_data.get('details', [])
        }
        return evaluation_progress

    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    except ValueError:
        print(f"Error parsing JSON response: {response}")
    except Exception as e:
        print(f"Unexpected error: {e}")

    # Not a terminal status code, so the caller simply polls again next cycle.
    return {'status': 'REQUEST_FAILED'}
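
# A minimal, non-invoked sketch of how submit_evaluation and
# poll_evaluation_progress chain together for a single model (the model id and
# the 60s cadence are illustrative; the real loop below polls the whole queue
# every 300s):
def _example_submit_and_poll(model_id='Qwen/Qwen1.5-0.5B'):
    info = submit_evaluation(base_url, secret, model_id,
                             require_gpus=1, priority='high',
                             gpus_queue_id=A100_QUEUE_ID)
    while True:
        progress = poll_evaluation_progress(base_url, secret, info['eval_id'])
        if progress['status'] in ('S', 'F', 'C'):  # finished/failed/cancelled
            return progress
        time.sleep(60)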

def update_gpu_usage(change):
    # Positive when an evaluation is submitted, negative when one finishes.
    global current_gpu_usage
    current_gpu_usage += change

def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    all_evals = []

    for root, dirs, files in os.walk(save_path):
        for file in files:
            if file.endswith(".json"):
                file_path = os.path.join(root, file)
                with open(file_path) as fp:
                    data = json.load(fp)
                    # Make sure every requested column exists; missing ones
                    # default to None (failed_status defaults to 0).
                    for col in cols:
                        if col not in data:
                            if col == "failed_status":
                                data[col] = 0
                            else:
                                data[col] = None

                    all_evals.append(data)
    # Order all entries by submission time.
    all_evals = sorted(all_evals, key=lambda x: x['submitted_time'])

    # Python's sort is stable, so sorting by params and then by failed_status
    # orders pending entries by failed_status first, params second.
    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    pending_list = sorted(pending_list, key=lambda x: x['params'])
    pending_list = sorted(pending_list, key=lambda x: x['failed_status'])
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]

    df_pending = pd.DataFrame(pending_list) if pending_list else pd.DataFrame(columns=cols)
    df_running = pd.DataFrame(running_list) if running_list else pd.DataFrame(columns=cols)
    df_finished = pd.DataFrame(finished_list) if finished_list else pd.DataFrame(columns=cols)

    return df_finished[cols], df_running[cols], df_pending[cols]

def update_evaluation_queue(model_name, nstatus, eval_id=None, flageval_id=None):
    # nstatus is either a plain status ("RUNNING") or "STATUS_<n>", where <n>
    # overwrites the entry's failed_status counter (e.g. "PENDING_2").
    print("update_evaluation_queue", model_name, nstatus, eval_id)
    fail_status = -1
    if len(nstatus.split("_")) == 2:
        status, fail_status = nstatus.split("_")[0], int(nstatus.split("_")[1])
    else:
        status = nstatus
    user_name, model_path = model_name.split("/") if "/" in model_name else ("", model_name)
    out_dir = f"{EVAL_REQUESTS_PATH}/{user_name}"
    json_files = [f for f in os.listdir(out_dir) if f.startswith(model_path + '_') and f.endswith(".json")]

    if not json_files:
        print(f"No JSON file found for model {model_name}")
        return

    for json_file in json_files:
        json_path = os.path.join(out_dir, json_file)
        with open(json_path, "r") as f:
            eval_entry = json.load(f)

        print("befor update_evaluation_queue", eval_entry['status'], eval_entry['failed_status'])
        eval_entry['status'] = status
        if fail_status >=0:
            eval_entry['failed_status'] = fail_status
        if eval_id is not None:
            eval_entry['eval_id'] = eval_id
        if flageval_id is not None:
            eval_entry['flageval_id'] = flageval_id
        print("after update_evaluation_queue status change", eval_entry['status'], eval_entry['failed_status'])
        with open(json_path, "w") as f:
            json.dump(eval_entry, f, indent=4)

        api.upload_file(
            path_or_fileobj=json_path,
            path_in_repo=json_path.split(f"{EVAL_REQUESTS_PATH}/")[1],
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Update {model_name} status to {status}",
        )
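
# For illustration: update_evaluation_queue("org/model", "PENDING_2") re-pends
# the entry with failed_status=2, while
# update_evaluation_queue("org/model", "RUNNING", eval_id=..., flageval_id=...)
# records the batch ids returned by submit_evaluation.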

def save_and_upload_results(model_name, details):
    converted_details = {
        "config_general": {
            "model_name": model_name,
            "model_dtype": "float16",
            "model_size": 0
        },
        "results": {},
        "versions": {},
        "config_tasks": {},
        "summary_tasks": {},
        "summary_general": {}
    }
    for detail in details:
        dataset = detail['dataset']
        status = detail['status']
        if status == 'S' and dataset in dataset_metric_mapping.keys():
            acc_key = dataset_metric_mapping[dataset][0]
            acc = detail['accuracy'] if acc_key == 'accuracy' else detail['rawDetails'][acc_key]
            converted_details['results'][dataset] = {
                dataset_metric_mapping[dataset][1]: acc,
                "acc_stderr": 0
            }
            # Copy every raw metric into the results as well.
            for metric, value in detail['rawDetails'].items():
                converted_details['results'][dataset][metric] = value

    out_dir = f"{EVAL_RESULTS_PATH}/{model_name}"
    os.makedirs(out_dir, exist_ok=True)
    result_path = os.path.join(out_dir, f"results_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S.%f')}.json")

    with open(result_path, "w") as f:
        json.dump(converted_details, f, indent=4)

    api.upload_file(
        path_or_fileobj=result_path,
        path_in_repo=result_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model_name}",
    )


class SilentTqdm(tqdm):
    # No-op progress bar; passed to snapshot_download via tqdm_class below to
    # keep the polling loop's console output readable.
    def __init__(self, *args, **kwargs):
        kwargs['bar_format'] = ''
        kwargs['leave'] = False
        super().__init__(*args, **kwargs, file=io.StringIO())

    def update(self, n=1):
        pass

    def close(self):
        pass


def snapshot_download_with_retry(max_retries, wait_time, *args, **kwargs):
    # Hub downloads occasionally fail transiently; retry a few times with a
    # fixed wait between attempts.
    for i in range(max_retries):
        try:
            return snapshot_download(*args, **kwargs)
        except Exception as e:
            if i < max_retries - 1:  # i is zero indexed
                print(f"Error occurred: {e}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries reached. Raising exception.")
                raise


api = HfApi()

print(EVAL_REQUESTS_PATH)
print(DYNAMIC_INFO_PATH)
print(EVAL_RESULTS_PATH)

prev_running_models = ''

while True:

    snapshot_download_with_retry(5, 10, repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)

    (
        finished_eval_queue_df,
        running_eval_queue_df,
        pending_eval_queue_df,
    ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, ['model','status','params','eval_id', 'failed_status'])
    ## Debug: dump the pending list.
    pending_list = [row for _, row in pending_eval_queue_df.iterrows()]

    for pend in pending_list:
        print("pending", pend)

    # Recompute current GPU usage from the running evaluation queue.
    current_gpu_usage = 0
    current_A800gpu_usage = 0
    for _, row in running_eval_queue_df.iterrows():
        print(get_gpu_number(row['params']), row['params'])
        gpus_num, gpus_queue_id = get_gpu_number(row['params'])
        current_gpu_usage += gpus_num
        if gpus_queue_id == LC_A800_QUEUE_ID:
            current_A800gpu_usage += 1
    # print(f'Current GPU usage: {current_gpu_usage}/{MAX_GPU_USAGE}')
    running_models = ", ".join([row["model"] for _, row in running_eval_queue_df.iterrows()])
    if running_models != prev_running_models:
        print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | GPU usage: {current_gpu_usage}/{MAX_GPU_USAGE} | Running models: {running_models}')
        prev_running_models = running_models

    print("current A800 GPU usage", current_A800gpu_usage)
    # Walk the pending evaluations in order and submit as many as the GPU
    # budget allows.
    if not pending_eval_queue_df.empty:
        for i, row in pending_eval_queue_df.iterrows():
            required_gpus, gpus_queue_id = get_gpu_number(row['params'])
            if gpus_queue_id == LC_A800_QUEUE_ID:
                if current_A800gpu_usage >= MAX_A800_USAGE:
                    print(f"A800 queue full, skipping {row['model']}")
                    continue
            # Special case: this model needs one extra GPU.
            if "princeton-nlp/Llama-3-8B-ProLong-512k" in row['model']:
                required_gpus += 1
            if current_gpu_usage + required_gpus <= MAX_GPU_USAGE:
                # Guard against duplicate submissions of a model that is
                # already running.
                if row['model'] in [r["model"] for _, r in running_eval_queue_df.iterrows()]:
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} is already running')
                    update_evaluation_queue(row['model'], 'CANCELLED')
                    continue
                # Submit the evaluation.
                try:
                    evaluation_info = submit_evaluation(base_url, secret, row['model'], require_gpus=required_gpus, priority='high', gpus_queue_id=gpus_queue_id)
                    update_evaluation_queue(row['model'], 'RUNNING', evaluation_info['eval_id'], evaluation_info['evaluationId'])
                    update_gpu_usage(required_gpus)
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Submitted evaluation {row["model"]} with {required_gpus} GPUs, submit info: {evaluation_info}')
                except Exception as e:
                    print(e)
                    continue
    
    # Poll the status of each running evaluation.
    for _, row in running_eval_queue_df.iterrows():
        progress = poll_evaluation_progress(base_url, secret, row['eval_id'])
        if progress['status'] in ['S', 'F', 'C', 'DI']:
            new_status = status_mapping.get(progress['status'], 'FINISHED')
            update_evaluation_queue(row['model'], new_status)
            gpus_num, gpus_queue_id = get_gpu_number(row['params'])
            update_gpu_usage(-gpus_num)
            if gpus_queue_id == LC_A800_QUEUE_ID:
                current_A800gpu_usage -= 1
            print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} finished with status {progress["status"]}')
            if new_status == 'FAILED':
                print("failed_mapping before:", failed_mapping)
                if row['model'] in failed_mapping:
                    failed_mapping[row['model']] += 1
                else:
                    failed_mapping[row['model']] = 1
                print("failed_mapping after:", failed_mapping, row['failed_status'])
                # After 5 consecutive failures, re-pend with an incremented
                # failed_status so the entry sorts behind fresher submissions.
                if failed_mapping[row['model']] == 5:
                    del failed_mapping[row['model']]
                    update_evaluation_queue(row['model'], 'PENDING_' + str(int(row['failed_status'] + 1)))
                else:
                    update_evaluation_queue(row['model'], 'PENDING')
                print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |--------------- RePending {row["model"]} ------------ ')
            elif new_status == 'FINISHED':
                print(progress)
                save_and_upload_results(row['model'], progress['details'])

    time.sleep(300)  # queue-check interval