import gradio as gr import json import pandas as pd from collections import defaultdict import copy as cp from urllib.request import urlopen, URLError import re from datetime import datetime import time # Constants CITATION_BUTTON_TEXT = r"""@misc{2023opencompass, title={OpenCompass: A Universal Evaluation Platform for Foundation Models}, author={OpenCompass Contributors}, howpublished = {\url{https://github.com/open-compass/opencompass}}, year={2023} }, }""" CITATION_BUTTON_LABEL = "Copy the following snippet to cite these results" OPENCOMPASS_README = ( 'https://raw.githubusercontent.com/open-compass/opencompass/main/README.md' ) GITHUB_REPO = 'https://github.com/open-compass/opencompass' GITHUB_RAW = 'https://raw.githubusercontent.com/open-compass/opencompass' GITHUB_BLOB = 'https://github.com/open-compass/opencompass/blob' # Base URL for the JSON data DATA_URL_BASE = "http://opencompass.oss-cn-shanghai.aliyuncs.com/assets/research-rank/research-data.REALTIME." def find_latest_data_url(): """Find the latest available data URL by trying different dates.""" today = datetime.now() # Try last 365 days for i in range(365): date = today.replace(day=today.day - i) date_str = date.strftime("%Y%m%d") url = f"{DATA_URL_BASE}{date_str}.json" try: urlopen(url) return url, date_str except URLError: continue # If no valid URL found, return None return None, None def get_latest_data(): """Get latest data URL and update time""" data_url, update_time = find_latest_data_url() if not data_url: raise Exception("Could not find valid data URL") formatted_update_time = datetime.strptime(update_time, "%Y%m%d").strftime("%Y-%m-%d") return data_url, formatted_update_time # Markdown content def get_leaderboard_title(update_time): return f"# CompassAcademic Leaderboard (Last Updated: {update_time})" MAIN_LEADERBOARD_DESCRIPTION = """## Main Evaluation Results The CompassAcademic currently focuses on the comprehensive reasoning abilities of LLMs. - The datasets selected so far include General Knowledge Reasoning (MMLU-Pro/GPQA-Diamond), Logical Reasoning (BBH), Mathematical Reasoning (MATH-500, AIME), Code Completion (LiveCodeBench, HumanEval), and Instruction Following (IFEval). - Currently, the evaluation primarily targets chat models, with updates featuring the latest community models at irregular intervals. - Prompts and reproduction scripts can be found in [**OpenCompass**: A Toolkit for Evaluation of LLMs](https://github.com/open-compass/opencompass)🏆. """ def fix_image_urls(content): """Fix image URLs in markdown content.""" # Handle the specific logo.svg path content = content.replace( 'docs/en/_static/image/logo.svg', 'https://raw.githubusercontent.com/open-compass/opencompass/main/docs/en/_static/image/logo.svg', ) # Replace other relative image paths with absolute GitHub URLs content = re.sub( r'!\[[^\]]*\]\((?!http)([^)]+)\)', lambda m: f'![{m.group(0)}](https://raw.githubusercontent.com/open-compass/opencompass/main/{m.group(1)})', content, ) return content MODEL_SIZE = ['<10B', '10B-70B', '>70B', 'Unknown'] MODEL_TYPE = ['API', 'OpenSource'] def load_data(data_url): response = urlopen(data_url) data = json.loads(response.read().decode('utf-8')) return data def build_main_table(data): df = pd.DataFrame(data['globalData']['OverallTable']) # Add OpenSource column based on models data models_data = data['models'] df['OpenSource'] = df['model'].apply( lambda x: 'Yes' if models_data[x]['release'] == 'OpenSource' else 'No' ) # Add Rank column based on Average Score df['Rank'] = df['Average'].rank(ascending=False, method='min').astype(int) columns = { 'Rank': 'Rank', 'model': 'Model', 'org': 'Organization', 'num': 'Parameters', 'OpenSource': 'OpenSource', 'Average': 'Average Score', 'BBH': 'BBH', 'Math-500': 'Math-500', 'AIME': 'AIME', 'MMLU-Pro': 'MMLU-Pro', 'LiveCodeBench': 'LiveCodeBench', 'HumanEval': 'HumanEval', 'GQPA-Diamond': 'GQPA-Diamond', 'IFEval': 'IFEval', } df = df[list(columns.keys())].rename(columns=columns) return df def filter_table(df, size_ranges, model_types): filtered_df = df.copy() # Filter by size if size_ranges: def get_size_in_B(param): if param == 'N/A': return None try: return float(param.replace('B', '')) except: return None filtered_df['size_in_B'] = filtered_df['Parameters'].apply( get_size_in_B ) mask = pd.Series(False, index=filtered_df.index) for size_range in size_ranges: if size_range == '<10B': mask |= (filtered_df['size_in_B'] < 10) & ( filtered_df['size_in_B'].notna() ) elif size_range == '10B-70B': mask |= (filtered_df['size_in_B'] >= 10) & ( filtered_df['size_in_B'] < 70 ) elif size_range == '>70B': mask |= filtered_df['size_in_B'] >= 70 elif size_range == 'Unknown': mask |= filtered_df['size_in_B'].isna() filtered_df = filtered_df[mask] filtered_df.drop('size_in_B', axis=1, inplace=True) # Filter by model type if model_types: type_mask = pd.Series(False, index=filtered_df.index) for model_type in model_types: if model_type == 'API': type_mask |= filtered_df['OpenSource'] == 'No' elif model_type == 'OpenSource': type_mask |= filtered_df['OpenSource'] == 'Yes' filtered_df = filtered_df[type_mask] return filtered_df def calculate_column_widths(df): """Dynamically calculate column widths based on content length.""" column_widths = [] for column in df.columns: # Get max length of column name and values header_length = len(str(column)) max_content_length = df[column].astype(str).map(len).max() # Use the larger of header or content length # Multiply by average character width (approximately 8 pixels) # Add padding (20 pixels) # Increase the multiplier for header length to ensure it fits width = max(header_length * 10, max_content_length * 8) + 20 # Set minimum width (200 pixels) width = max(160, width) # Set maximum width (400 pixels) to prevent extremely wide columns width = min(400, width) column_widths.append(width) return column_widths def create_interface(): data_url, update_time = get_latest_data() data = load_data(data_url) df = build_main_table(data) title = gr.Markdown(get_leaderboard_title(update_time)) with gr.Blocks() as demo: title_comp = gr.Markdown(get_leaderboard_title(update_time)) with gr.Tabs() as tabs: with gr.TabItem("🏅 Main Leaderboard", elem_id='main'): gr.Markdown(MAIN_LEADERBOARD_DESCRIPTION) with gr.Row(): with gr.Column(): size_filter = gr.CheckboxGroup( choices=MODEL_SIZE, value=MODEL_SIZE, label='Model Size', interactive=True, ) with gr.Column(): type_filter = gr.CheckboxGroup( choices=MODEL_TYPE, value=MODEL_TYPE, label='Model Type', interactive=True, ) with gr.Column(): table = gr.DataFrame( value=df.sort_values("Average Score", ascending=False), interactive=False, wrap=False, # 禁用自动换行 column_widths=calculate_column_widths(df), ) def update_data(): """Periodically check for new data and update the interface""" while True: time.sleep(300) # Check every 5 minutes try: new_data_url, new_update_time = get_latest_data() if new_data_url != data_url: new_data = load_data(new_data_url) new_df = build_main_table(new_data) filtered_df = filter_table(new_df, size_filter.value, type_filter.value) title_comp.value = get_leaderboard_title(new_update_time) table.value = filtered_df.sort_values("Average Score", ascending=False) except Exception as e: print(f"Error updating data: {e}") continue def update_table(size_ranges, model_types): filtered_df = filter_table(df, size_ranges, model_types) return filtered_df.sort_values( "Average Score", ascending=False ) size_filter.change( fn=update_table, inputs=[size_filter, type_filter], outputs=table, ) type_filter.change( fn=update_table, inputs=[size_filter, type_filter], outputs=table, ) # Set up periodic data update demo.load(update_data) with gr.Row(): with gr.Accordion("Citation", open=False): citation_button = gr.Textbox( value=CITATION_BUTTON_TEXT, label=CITATION_BUTTON_LABEL, elem_id='citation-button', ) return demo if __name__ == '__main__': demo = create_interface() demo.launch(server_name='0.0.0.0')