""" Populate the GuardBench leaderboard from HuggingFace datasets. """ import json import os import pandas as pd import tempfile from typing import Dict, Tuple, List from glob import glob from huggingface_hub import snapshot_download, hf_hub_download, HfApi from datasets import load_dataset from src.display.utils import GUARDBENCH_COLUMN, DISPLAY_COLS, CATEGORIES from src.envs import RESULTS_DATASET_ID, TOKEN, LEADERBOARD_FILE, CACHE_PATH from src.leaderboard.processor import leaderboard_to_dataframe, load_leaderboard_data, save_leaderboard_data, process_jsonl_submission, add_entries_to_leaderboard def get_versioned_leaderboard_file(version="v0"): """ Get the versioned leaderboard file path. """ base_name, ext = os.path.splitext(LEADERBOARD_FILE) return f"{base_name}_{version}{ext}" def download_leaderboard_data(version="v0") -> bool: """ Download the latest leaderboard data from HuggingFace. Args: version: The dataset version to download """ try: # Create a temporary directory to download the submissions temp_dir = os.path.join(CACHE_PATH, f"temp_submissions_{version}") os.makedirs(temp_dir, exist_ok=True) # Get the versioned leaderboard file leaderboard_file = get_versioned_leaderboard_file(version) # Download the entire repository try: snapshot_path = snapshot_download( repo_id=RESULTS_DATASET_ID, repo_type="dataset", local_dir=temp_dir, token=TOKEN, ignore_patterns=["*.md", ".*"], etag_timeout=30 ) # Process all submission files all_entries = [] submission_files = [] # Look for submission files in the submissions directory submissions_dir = os.path.join(snapshot_path, "submissions") version_submissions_dir = os.path.join(snapshot_path, f"submissions_{version}") # Check both standard and versioned submission directories if os.path.exists(submissions_dir): submission_files.extend(glob(os.path.join(submissions_dir, "*.jsonl"))) if os.path.exists(version_submissions_dir): submission_files.extend(glob(os.path.join(version_submissions_dir, "*.jsonl"))) # Also look for any versioned JSONL files in the root submission_files.extend(glob(os.path.join(snapshot_path, f"*_{version}.jsonl"))) # If we're looking for v0 and no versioned files found, use generic ones if version == "v0" and not submission_files: submission_files.extend(glob(os.path.join(snapshot_path, "*.jsonl"))) # Process each submission file for file_path in submission_files: entries, _ = process_jsonl_submission(file_path) # Filter entries to those that match the version or don't have version specified filtered_entries = [ entry for entry in entries if entry.get("version", "v0") == version or "version" not in entry ] all_entries.extend(filtered_entries) # Create leaderboard data structure leaderboard_data = { "entries": all_entries, "last_updated": pd.Timestamp.now().isoformat(), "version": version } # Save to local file save_leaderboard_data(leaderboard_data, leaderboard_file) return True except Exception as e: print(f"Error downloading repository: {e}") # If we can't download the repository, try to download individual files try: api = HfApi(token=TOKEN) files = api.list_repo_files(repo_id=RESULTS_DATASET_ID, repo_type="dataset") # Look for versioned and regular files submission_files = [ f for f in files if (f.endswith(f'_{version}.jsonl') or f.startswith(f'submissions_{version}/') or (version == "v0" and f.endswith('.jsonl'))) ] all_entries = [] for file_path in submission_files: try: local_path = hf_hub_download( repo_id=RESULTS_DATASET_ID, filename=file_path, repo_type="dataset", token=TOKEN ) entries, _ = 
process_jsonl_submission(local_path) # Filter entries to those that match the version or don't have version specified filtered_entries = [ entry for entry in entries if entry.get("version", "v0") == version or "version" not in entry ] all_entries.extend(filtered_entries) except Exception as file_error: print(f"Error downloading file {file_path}: {file_error}") # Create leaderboard data structure leaderboard_data = { "entries": all_entries, "last_updated": pd.Timestamp.now().isoformat(), "version": version } # Save to local file save_leaderboard_data(leaderboard_data, leaderboard_file) return True except Exception as list_error: print(f"Error listing repository files: {list_error}") # If we can't download anything, create an empty leaderboard if not os.path.exists(leaderboard_file): empty_data = { "entries": [], "last_updated": pd.Timestamp.now().isoformat(), "version": version } save_leaderboard_data(empty_data, leaderboard_file) return False except Exception as e: print(f"Error downloading leaderboard data: {e}") # Ensure we have at least an empty leaderboard file leaderboard_file = get_versioned_leaderboard_file(version) if not os.path.exists(leaderboard_file): empty_data = { "entries": [], "last_updated": pd.Timestamp.now().isoformat(), "version": version } save_leaderboard_data(empty_data, leaderboard_file) return False def get_leaderboard_df(version="v0") -> pd.DataFrame: """ Get the leaderboard data as a DataFrame. Args: version: The dataset version to retrieve """ # Try to download the latest data download_leaderboard_data(version=version) # Load from local file leaderboard_file = get_versioned_leaderboard_file(version) leaderboard_data = load_leaderboard_data(leaderboard_file) # Convert to DataFrame df = leaderboard_to_dataframe(leaderboard_data) return df def get_category_leaderboard_df(category: str, version="v0") -> pd.DataFrame: """ Get the leaderboard data filtered by a specific category. 
Args: category: The category to filter by (e.g., "Criminal, Violent, and Terrorist Activity") version: The dataset version to retrieve Returns: DataFrame with metrics for the specified category """ # Load the leaderboard data leaderboard_file = get_versioned_leaderboard_file(version) leaderboard_data = load_leaderboard_data(leaderboard_file) # Filter entries to only include those with data for the specified category filtered_entries = [] for entry in leaderboard_data.get("entries", []): # Check if the entry has data for this category if "per_category_metrics" in entry and category in entry["per_category_metrics"]: # Create a new entry with just the overall info and this category's metrics filtered_entry = { "model_name": entry.get("model_name", "Unknown Model"), "model_type": entry.get("model_type", "Unknown"), "submission_date": entry.get("submission_date", ""), "version": entry.get("version", version), } # Extract metrics for this category category_metrics = entry["per_category_metrics"][category] # Add metrics for each test type for test_type in category_metrics: if test_type and isinstance(category_metrics[test_type], dict): for metric, value in category_metrics[test_type].items(): col_name = f"{test_type}_{metric}" filtered_entry[col_name] = value # Calculate average F1 for this category f1_values = [] for test_type in category_metrics: if test_type and isinstance(category_metrics[test_type], dict) and "f1_binary" in category_metrics[test_type]: f1_values.append(category_metrics[test_type]["f1_binary"]) if f1_values: filtered_entry["average_f1"] = sum(f1_values) / len(f1_values) # Add specific test type F1 scores for display for test_type in ["default_prompts", "jailbreaked_prompts", "default_answers", "jailbreaked_answers"]: if test_type in category_metrics and "f1_binary" in category_metrics[test_type]: filtered_entry[f"{test_type}_f1"] = category_metrics[test_type]["f1_binary"] filtered_entries.append(filtered_entry) # Create a new leaderboard data structure with the filtered entries filtered_leaderboard = { "entries": filtered_entries, "last_updated": leaderboard_data.get("last_updated", pd.Timestamp.now().isoformat()), "version": version } # Convert to DataFrame df = leaderboard_to_dataframe(filtered_leaderboard) return df def get_detailed_model_data(model_name: str, version="v0") -> Dict: """ Get detailed data for a specific model. Args: model_name: The name of the model to get data for version: The dataset version to retrieve """ leaderboard_file = get_versioned_leaderboard_file(version) leaderboard_data = load_leaderboard_data(leaderboard_file) for entry in leaderboard_data.get("entries", []): # Check both the model name and version entry_version = entry.get("version", "v0") if entry.get("model_name") == model_name and (entry_version == version or entry_version is None): return entry return {}
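

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): refresh and preview the v0 leaderboard.
    # Assumes src.envs is configured with a reachable RESULTS_DATASET_ID and a valid TOKEN.
    overall_df = get_leaderboard_df(version="v0")
    print(f"Loaded {len(overall_df)} leaderboard entries")
    print(overall_df.head())

    # Per-category view, using the example category from get_category_leaderboard_df's docstring.
    category_df = get_category_leaderboard_df("Criminal, Violent, and Terrorist Activity", version="v0")
    print(category_df.head())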