reward-bench / app.py
saumyamalik's picture
ignore open_instruct_dev models
33298d1
import os
from pathlib import Path
import gradio as gr
import numpy as np
import pandas as pd
from datasets import load_dataset
from huggingface_hub import HfApi, snapshot_download
from leaderboard.constants import example_counts, subset_mapping
from leaderboard.css import custom_css
from leaderboard.md import *
from leaderboard.utils import load_all_data
#######################################################
# Setup #
#######################################################
api = HfApi()
COLLAB_TOKEN = os.environ.get("COLLAB_TOKEN")
evals_repo = "allenai/reward-bench-2-results"
eval_set_repo = "allenai/reward-bench-2"
eval_set_repo_v1 = "allenai/reward-bench"
repo_dir_rewardbench = "./evals/rewardbench/"
print("Pulling evaluation results")
repo = snapshot_download(
local_dir=repo_dir_rewardbench,
ignore_patterns=["pref-sets-scores/*", "eval-set-scores/*", "eval-set/allenai/open_instruct_dev*"],
repo_id=evals_repo,
use_auth_token=COLLAB_TOKEN,
tqdm_class=None,
etag_timeout=30,
repo_type="dataset",
)
###########################################
# Load Data #
###########################################
def avg_over_rewardbench_v2(dataframe_core):
domain_cols = ["Factuality", "Precise IF", "Math", "Safety", "Focus", "Ties"]
domain_weights = [1, 1, 1, 1, 1, 1]
new_df = dataframe_core.copy()
# for main subsets, keys in subset_mapping, take the weighted avg by example_counts and store for the models
# Get the domain data and handle missing values
domain_data = new_df[domain_cols].values
masked_data = np.ma.masked_array(domain_data, np.isnan(domain_data))
# Calculate weighted average
average = np.ma.average(masked_data, axis=1, weights=domain_weights)
new_df["average"] = average.filled(np.nan)
# Rearrange columns for consistent output
keep_columns = ["model", "model_type", "average"] + domain_cols
new_df = new_df[keep_columns]
return new_df
def avg_over_rewardbench(dataframe_core, dataframe_prefs):
"""
Averages over the subsets alpacaeval, mt-bench, llmbar, refusals, hep and returns dataframe with only these columns.
We average over 4 core sections (per prompt weighting):
1. Chat: Includes the easy chat subsets (alpacaeval-easy, alpacaeval-length, alpacaeval-hard, mt-bench-easy, mt-bench-medium)
2. Chat Hard: Includes the hard chat subsets (mt-bench-hard, llmbar-natural, llmbar-adver-neighbor, llmbar-adver-GPTInst, llmbar-adver-GPTOut, llmbar-adver-manual)
3. Safety: Includes the safety subsets (refusals-dangerous, refusals-offensive, xstest-should-refuse, xstest-should-respond, do not answer)
4. Reasoning: Includes the code and math subsets (math-prm, hep-cpp, hep-go, hep-java, hep-js, hep-python, hep-rust)
5. Prior Sets (0.5 weight): Includes the test sets (anthropic_helpful, mtbench_human, shp, summarize)
"""
new_df = dataframe_core.copy()
dataframe_prefs = dataframe_prefs.copy()
# for main subsets, keys in subset_mapping, take the weighted avg by example_counts and store for the models
for subset, sub_subsets in subset_mapping.items():
subset_cols = [col for col in new_df.columns if col in sub_subsets]
sub_data = new_df[subset_cols].values # take the relevant column values
sub_counts = [example_counts[s] for s in subset_cols] # take the example counts
new_df[subset] = np.average(sub_data, axis=1, weights=sub_counts) # take the weighted average
# new_df[subset] = np.round(np.nanmean(new_df[subset_cols].values, axis=1), 2)
data_cols = list(subset_mapping.keys())
keep_columns = (
[
"model",
]
+ ["model_type"]
+ data_cols
)
# keep_columns = ["model", "average"] + subsets
new_df = new_df[keep_columns]
# selected average from pref_sets
pref_columns = ["anthropic_helpful", "anthropic_hhh", "shp", "summarize"]
pref_data = dataframe_prefs[pref_columns].values
# add column test sets knowing the rows are not identical, take superset
dataframe_prefs["Prior Sets (0.5 weight)"] = np.nanmean(pref_data, axis=1)
# add column Test Sets empty to new_df
new_df["Prior Sets (0.5 weight)"] = np.nan
# per row in new_df if model is in dataframe_prefs, add the value to new_df["Prior Sets (0.5 weight)"]
values = []
for i, row in new_df.iterrows():
model = row["model"]
if model in dataframe_prefs["model"].values:
values.append(dataframe_prefs[dataframe_prefs["model"] == model]["Prior Sets (0.5 weight)"].values[0])
# new_df.at[i, "Prior Sets (0.5 weight)"] = dataframe_prefs[dataframe_prefs["model"] == model]["Prior Sets (0.5 weight)"].values[0]
else:
values.append(np.nan)
new_df["Prior Sets (0.5 weight)"] = values
# add total average
data_cols += ["Prior Sets (0.5 weight)"]
final_data = new_df[data_cols].values
masked_data = np.ma.masked_array(final_data, np.isnan(final_data))
weights = [2, 2, 2, 2, 1]
average = np.ma.average(masked_data, axis=1, weights=weights)
new_df["average"] = average.filled(np.nan)
# new_df["average"] = np.nanmean(new_df[data_cols].values, axis=1)
# make average third column
keep_columns = ["model", "model_type", "average"] + data_cols
new_df = new_df[keep_columns]
return new_df
def prep_df(df):
# add column to 0th entry with count (column name itself empty)
df.insert(0, "", range(1, 1 + len(df)))
# replace "model" with "Model" and "model_type" with "Model Type" and "average" with "Average"
df = df.rename(columns={"model": "Model", "model_type": "Model Type", "average": "Average"})
# if "Model Type" in columns
if "Model Type" in df.columns:
# get model_types that have generative in them
mask = df["Model Type"].str.contains("generative", case=False, na=False)
# set these values to "Generative"
df.loc[mask, "Model Type"] = "Generative"
return df
# get v1 data
orig_data_path = "leaderboard/final-rbv1-data.csv"
rb_orig_snapshot = pd.read_csv(orig_data_path)
# rename column "Unnamed: 0" to ""
rb_orig_snapshot = rb_orig_snapshot.rename(columns={"Unnamed: 0": ""})
# rb_orig_snapshot = rb_orig_snapshot.drop(columns=["Unnamed: 0", ''])
rb_orig_snapshot.reset_index(drop=True, inplace=True)
rewardbench_data = load_all_data(repo_dir_rewardbench, subdir="eval-set").sort_values(by="average", ascending=False)
rewardbench_data_avg_intermediate = avg_over_rewardbench_v2(rewardbench_data.copy())
# Prepare RBv1 scores for merging
rb_v1_scores_to_merge = rb_orig_snapshot[["Model", "Score"]].copy()
# if " ⚠️" in rb_v1_scores_to_merge["Model"].values, shorten the model name without it
rb_v1_scores_to_merge["Model"] = rb_v1_scores_to_merge["Model"].str.replace(" ⚠️", "", regex=False)
rb_v1_scores_to_merge.rename(columns={"Score": "RBv1"}, inplace=True)
# rename rb_v1 "Model" to "model"
rb_v1_scores_to_merge.rename(columns={"Model": "model"}, inplace=True)
# Merge RBv1 scores into the v2 data
rewardbench_data_avg = pd.merge(rewardbench_data_avg_intermediate, rb_v1_scores_to_merge, on="model", how="left")
# Drop any models with only RBv1 scores and no v2 scores
rewardbench_data_avg = rewardbench_data_avg.dropna(subset=["average"])
# Sort by the v2 average
rewardbench_data_avg = rewardbench_data_avg.sort_values(by="average", ascending=False)
# add count column to all dataframes
rewardbench_data = prep_df(rewardbench_data)
rewardbench_data_avg = prep_df(rewardbench_data_avg).rename(columns={"Average": "Score"})
# Ensure RBv1 is the last column if it's not already (merge usually places it at the end of non-key columns)
# If 'RBv1' is present and not last, move it to be the last column.
if "RBv1" in rewardbench_data_avg.columns:
rbv1_col = rewardbench_data_avg.pop("RBv1")
rewardbench_data_avg["RBv1"] = rbv1_col
# save rewardbench_data_avg as csv to src/current-rbv2-data.csv
v2_data_path = "leaderboard/current-rbv2-data.csv"
rewardbench_data_avg.to_csv(v2_data_path, index=False)
col_types_rewardbench = ["number"] + ["markdown"] + ["str"] + ["number"] * (len(rewardbench_data_avg.columns) - 1)
col_types_rewardbench_v1 = ["number"] + ["markdown"] + ["str"] + ["number"] * (len(rb_orig_snapshot.columns) - 1)
# import ipdb; ipdb.set_trace()
###########################################
# Leaderboard Helpers & Setting #
###########################################
# for showing random samples
eval_set = load_dataset(eval_set_repo, use_auth_token=COLLAB_TOKEN, split="test")
eval_set_v1 = load_dataset(eval_set_repo_v1, use_auth_token=COLLAB_TOKEN, split="filtered")
subsets = eval_set.unique("subset")
subsets_v1 = eval_set_v1.unique("subset")
def random_sample(r: gr.Request, subset):
if subset is None or subset == []:
sample_index = np.random.randint(0, len(eval_set) - 1)
sample = eval_set[sample_index]
else: # filter by subsets (can be list)
if isinstance(subset, str):
subset = [subset]
# filter down dataset to only include the subset(s)
eval_set_filtered = eval_set.filter(lambda x: x["subset"] in subset)
sample_index = np.random.randint(0, len(eval_set_filtered) - 1)
sample = eval_set_filtered[sample_index]
markdown_text = "\n\n".join([f"**{key}**:\n\n{value}" for key, value in sample.items()])
return markdown_text
# Duplicating because they use global variables with gradio setup
def random_sample_v1(r: gr.Request, subset):
if subset is None or subset == []:
sample_index = np.random.randint(0, len(eval_set) - 1)
sample = eval_set[sample_index]
else: # filter by subsets (can be list)
if isinstance(subset, str):
subset = [subset]
# filter down dataset to only include the subset(s)
eval_set_filtered = eval_set.filter(lambda x: x["subset"] in subset)
sample_index = np.random.randint(0, len(eval_set_filtered) - 1)
sample = eval_set_filtered[sample_index]
markdown_text = "\n\n".join([f"**{key}**:\n\n{value}" for key, value in sample.items()])
return markdown_text
color_map = {
"Generative": "#7497db",
"Custom Classifier": "#E8ECF2",
"Seq. Classifier": "#ffcd75",
"DPO": "#75809c",
}
def color_model_type_column(df, color_map):
"""
Apply color to the 'Model Type' column of the DataFrame based on a given color mapping.
Parameters:
df (pd.DataFrame): The DataFrame containing the 'Model Type' column.
color_map (dict): A dictionary mapping model types to colors.
Returns:
pd.Styler: The styled DataFrame.
"""
# Function to apply color based on the model type
def apply_color(val):
color = color_map.get(val, "default") # Default color if not specified in color_map
return f"background-color: {color}"
# Format for different columns
format_dict = {col: "{:.1f}" for col in df.columns if col not in ["Average", "Model", "Model Type"]}
format_dict["Average"] = "{:.2f}"
format_dict[""] = "{:d}"
return df.style.applymap(apply_color, subset=["Model Type"]).format(format_dict, na_rep="")
def regex_table(dataframe, regex, filter_button, style=True):
"""
Takes a model name as a regex, then returns only the rows that has that in it.
"""
# Split regex statement by comma and trim whitespace around regexes
regex_list = [x.strip() for x in regex.split(",")]
# Join the list into a single regex pattern with '|' acting as OR
combined_regex = "|".join(regex_list)
# remove internal ai2 data
dataframe = dataframe[~dataframe["Model"].str.contains("ai2", case=False, na=False)]
# if filter_button, remove all rows with "ai2" in the model name
update_scores = False
if isinstance(filter_button, list) or isinstance(filter_button, str):
if "Prior Sets" not in filter_button and "Prior Sets (0.5 weight)" in dataframe.columns:
update_scores = True
# remove the column "Prior Sets (0.5 weight)" from the outputted table
dataframe = dataframe.drop(columns=["Prior Sets (0.5 weight)"])
if "RBv1" not in filter_button and "RBv1" in dataframe.columns:
# remove the column "Prior Sets (0.5 weight)" from the outputted table
dataframe = dataframe.drop(columns=["RBv1"])
if "Seq. Classifiers" not in filter_button:
dataframe = dataframe[~dataframe["Model Type"].str.contains("Seq. Classifier", case=False, na=False)]
if "DPO" not in filter_button:
dataframe = dataframe[~dataframe["Model Type"].str.contains("DPO", case=False, na=False)]
if "Custom Classifiers" not in filter_button:
dataframe = dataframe[~dataframe["Model Type"].str.contains("Custom Classifier", case=False, na=False)]
if "Generative" not in filter_button:
dataframe = dataframe[~dataframe["Model Type"].str.contains("generative", case=False, na=False)]
# Filter the dataframe such that 'model' contains any of the regex patterns
data = dataframe[dataframe["Model"].str.contains(combined_regex, case=False, na=False)]
# if update the score to not use prior sets, do so
if update_scores:
data["Score"] = (data["Chat"] + data["Chat Hard"] + data["Safety"] + data["Reasoning"]) / 4
# if "Prior Sets (0.5 weight)" in data.columns:
# data["Prior Sets (0.5 weight)"] = np.nan
# sort array by Score column
data = data.sort_values(by="Score", ascending=False)
data.reset_index(drop=True, inplace=True)
# replace column '' with count/rank
data[""] = np.arange(1, 1 + len(data))
# if Score exists, round to 2 decimals
if "Score" in data.columns:
data["Score"] = np.round(np.array(data["Score"].values).astype(float), 2)
if "Average" in data.columns:
data["Average"] = np.round(np.array(data["Average"].values).astype(float), 1)
# round all others to 1 decimal
for col in data.columns:
if col not in ["", "Model", "Model Type", "Score", "Average"]:
# replace any data[col].values == '' with np.nan
data[col] = data[col].replace("", np.nan)
data[col] = np.round(np.array(data[col].values).astype(float), 1)
if style:
# apply color
data = color_model_type_column(data, color_map)
return data
# import ipdb; ipdb.set_trace()
total_models = len(
regex_table(
rewardbench_data_avg.copy(), "", ["Seq. Classifiers", "DPO", "Custom Classifiers", "Generative"], style=False
).values
)
total_models_v1 = len(
regex_table(
rb_orig_snapshot.copy(), "", ["Seq. Classifiers", "DPO", "Custom Classifiers", "Generative"], style=False
).values
)
assets = Path("leaderboard").resolve() # absolute dir with the image
# Using a string for a predefined color
theme = gr.themes.Default(primary_hue="blue")
#############################################
# Gradio App #
#############################################
with gr.Blocks(theme=theme, css=custom_css) as app:
# create tabs for the app, moving the current table to one titled "rewardbench" and the benchmark_text to a tab called "About"
with gr.Row():
with gr.Column(scale=6):
gr.Markdown(TOP_TEXT)
# with gr.Column(scale=4):
# # search = gr.Textbox(label="Model Search (delimit with , )", placeholder="Regex search for a model")
# # filter_button = gr.Checkbox(label="Include AI2 training runs (or type ai2 above).", interactive=True)
# # img = gr.Image(value="https://private-user-images.githubusercontent.com/10695622/310698241-24ed272a-0844-451f-b414-fde57478703e.png", width=500)
# gr.Markdown("""
# ![](/gradio_api/file=leaderboard/logo.png)
# """)
with gr.Tabs(elem_id="outer-tabs", elem_classes="tabs-big") as tabs_big:
with gr.TabItem("🏆 RewardBench 2"):
with gr.Row():
with gr.Column(scale=7):
gr.Markdown(CAPTION_V2.format(str(total_models)))
with gr.Column(scale=3):
# search = gr.Textbox(label="Model Search (delimit with , )", placeholder="Regex search for a model")
# filter_button = gr.Checkbox(label="Include AI2 training runs (or type ai2 above).", interactive=True)
# img = gr.Image(value="https://private-user-images.githubusercontent.com/10695622/310698241-24ed272a-0844-451f-b414-fde57478703e.png", width=500)
gr.Markdown(
"""
![](/gradio_api/file=leaderboard/logo.png)
"""
)
with gr.Tabs(elem_id="inner-tabs", elem_classes="tabs-small") as tabs:
with gr.TabItem("Leaderboard"):
with gr.Row():
search_1 = gr.Textbox(
label="Model Search (delimit with , )",
placeholder="Model Search (delimit with , )",
show_label=False,
scale=8,
)
model_types_1 = gr.CheckboxGroup(
["Seq. Classifiers", "Custom Classifiers", "Generative", "RBv1"],
value=["Seq. Classifiers", "Custom Classifiers", "Generative"],
show_label=False,
scale=8,
)
# narrow, non-expanding download button
gr.DownloadButton(
label="Download CSV",
value=v2_data_path,
size="sm", # shorter height / padding
scale=0, # ← **width stays just big enough for the text**
min_width=140, # (optional) guarantee it doesn’t collapse
)
with gr.Row():
# reference data
rewardbench_table_hidden = gr.Dataframe(
rewardbench_data_avg.values,
datatype=col_types_rewardbench_v1,
headers=rewardbench_data_avg.columns.tolist(),
visible=False,
)
rewardbench_table = gr.Dataframe(
regex_table(
rewardbench_data_avg.copy(),
"",
["Seq. Classifiers", "Custom Classifiers", "Generative"],
),
datatype=col_types_rewardbench_v1,
headers=rewardbench_data_avg.columns.tolist(),
elem_id="rewardbench_dataframe_avg",
height=800, # 800 px ≈ ~25 rows on default row-height
)
with gr.TabItem("About"):
with gr.Row():
gr.Markdown(ABOUT_TEXT_V2)
with gr.TabItem("Dataset Viewer"):
with gr.Row():
# loads one sample
gr.Markdown("""## Random Dataset Sample Viewer""")
subset_selector = gr.Dropdown(subsets, label="Subset", value=None, multiselect=True)
button_data = gr.Button("Show Random Sample")
with gr.Row():
sample_display = gr.Markdown("{sampled data loads here}")
button_data.click(fn=random_sample, inputs=[subset_selector], outputs=[sample_display])
with gr.TabItem("RewardBench"):
with gr.Row():
gr.Markdown(CAPTION_V1.format(str(total_models_v1)))
with gr.Tabs(elem_id="inner-tabs", elem_classes="tabs-small") as tabs:
with gr.TabItem("Leaderboard"):
with gr.Row():
search_1_v1 = gr.Textbox(
label="Model Search (delimit with , )",
placeholder="Model Search (delimit with , )",
show_label=False,
)
model_types_1_v1 = gr.CheckboxGroup(
["Seq. Classifiers", "DPO", "Custom Classifiers", "Generative"],
value=["Seq. Classifiers", "Custom Classifiers", "Generative"],
label="Model Types",
show_label=False,
# info="Which model types to include.",
)
# narrow, non-expanding download button
gr.DownloadButton(
label="Download CSV",
value=orig_data_path,
size="sm", # shorter height / padding
scale=0, # ← **width stays just big enough for the text**
min_width=140, # (optional) guarantee it doesn’t collapse
)
with gr.Row():
# reference data
rewardbench_table_hidden_v1 = gr.Dataframe(
rb_orig_snapshot.values,
datatype=col_types_rewardbench,
headers=rb_orig_snapshot.columns.tolist(),
visible=False,
)
rewardbench_table_v1 = gr.Dataframe(
regex_table(
rb_orig_snapshot.copy(),
"",
["Seq. Classifiers", "Custom Classifiers", "Generative"],
),
datatype=col_types_rewardbench,
headers=rb_orig_snapshot.columns.tolist(),
elem_id="rewardbench_dataframe_avg_v1",
height=800, # 800 px ≈ ~25 rows on default row-height
)
with gr.TabItem("About"):
with gr.Row():
gr.Markdown(ABOUT_TEXT_V1)
with gr.TabItem("Dataset Viewer"):
with gr.Row():
# loads one sample
gr.Markdown("""## Random Dataset Sample Viewer""")
subset_selector_v1 = gr.Dropdown(subsets_v1, label="Subset", value=None, multiselect=True)
button_data_v1 = gr.Button("Show Random Sample")
with gr.Row():
sample_display_v1 = gr.Markdown("{sampled data loads here}")
button_data_v1.click(fn=random_sample_v1, inputs=[subset_selector_v1], outputs=[sample_display_v1])
search_1.change(regex_table, inputs=[rewardbench_table_hidden, search_1, model_types_1], outputs=rewardbench_table)
search_1_v1.change(
regex_table, inputs=[rewardbench_table_hidden_v1, search_1_v1, model_types_1_v1], outputs=rewardbench_table_v1
)
model_types_1.change(
regex_table, inputs=[rewardbench_table_hidden, search_1, model_types_1], outputs=rewardbench_table
)
model_types_1_v1.change(
regex_table, inputs=[rewardbench_table_hidden_v1, search_1_v1, model_types_1_v1], outputs=rewardbench_table_v1
)
with gr.Row():
with gr.Accordion("📚 Citation", open=False):
citation_button = gr.Textbox(
value=r"""@misc{RewardBench2,
title={RewardBench 2: Advancing Reward Model Evaluation},
author={Malik, Saumya and Pyatkin, Valentina and Land, Sander and Morrison, Jacob and Smith, Noah A. and Hajishirzi, Hannaneh and Lambert, Nathan},
year={2025},
howpublished={\url{https://huggingface.co/spaces/allenai/reward-bench}},
}
@misc{RewardBench,
title={RewardBench: Evaluating Reward Models for Language Modeling},
author={Lambert, Nathan and Pyatkin, Valentina and Morrison, Jacob and Miranda, LJ and Lin, Bill Yuchen and Chandu, Khyathi and Dziri, Nouha and Kumar, Sachin and Zick, Tom and Choi, Yejin and Smith, Noah A. and Hajishirzi, Hannaneh},
year={2024},
howpublished={\url{https://huggingface.co/spaces/allenai/reward-bench}
}""",
lines=7,
label="Copy the following to cite these results.",
elem_id="citation-button",
show_copy_button=True,
)
app.launch(allowed_paths=[str(assets)]) # had .queue() before launch before... not sure if that's necessary