import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from collections import Counter
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import re
import logging
from typing import List, Dict, Any
import gc
import os
import psutil


def get_memory_usage():
    """Return (gpu_mem_used_MB, gpu_mem_total_MB, ram_used_MB, ram_total_MB)."""
    # System RAM
    vm = psutil.virtual_memory()
    ram_used_mb = vm.used / (1024 ** 2)
    ram_total_mb = vm.total / (1024 ** 2)

    # GPU memory
    if torch.cuda.is_available():
        gpu_idx = torch.cuda.current_device()
        torch.cuda.synchronize()
        gpu_mem_alloc = torch.cuda.memory_allocated(gpu_idx) / (1024 ** 2)
        gpu_mem_reserved = torch.cuda.memory_reserved(gpu_idx) / (1024 ** 2)
        gpu_mem_total = torch.cuda.get_device_properties(gpu_idx).total_memory / (1024 ** 2)
        gpu_mem_used = max(gpu_mem_alloc, gpu_mem_reserved)  # conservative estimate
    else:
        gpu_mem_used = 0
        gpu_mem_total = 0

    return gpu_mem_used, gpu_mem_total, ram_used_mb, ram_total_mb


# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Predefined models, listed by their Hugging Face Hub paths
PREDEFINED_MODELS = [
    "meta-llama/Llama-3.2-1B",
    "google/gemma-2-2b",
    "Qwen/Qwen3-0.6B",
    "Qwen/Qwen2.5-0.5B",
    "Qwen/Qwen2.5-1.5B",
    "bigscience/bloom-560m",
    "CohereForAI/aya-expanse-8b",
    "common-pile/comma-v0.1-2t",
    "google/byt5-small",
    "gsaltintas/supertoken_models-llama_gpt2",
    "gsaltintas/supertoken_models-llama_google-gemma-2-2b",
]

# Global cache for loaded models
model_cache = {}


def parse_dataset(text):
    """Parse the input dataset text into structured questions."""
    if not text.strip():
        return [], "Please enter your dataset"

    lines = text.strip().split('\n')
    if len(lines) < 2:
        return [], "Dataset must have at least a header and one question"

    # Skip the header and detect the delimiter from the first data line
    first_data_line = lines[1] if len(lines) > 1 else lines[0]
    delimiter = '\t' if '\t' in first_data_line else ','

    questions = []
    errors = []
    for i, line in enumerate(lines[1:], 2):  # Start from line 2 (after header)
        line = line.strip()
        if not line:
            continue

        parts = [part.strip().strip('"') for part in line.split(delimiter)]
        if len(parts) < 5:
            errors.append(f"Line {i}: Not enough columns (need 5, got {len(parts)})")
            continue

        question = {
            'question': parts[0],
            'correct_answer': parts[1],
            'choices': [parts[2], parts[3], parts[4]],
        }
        # Ensure the correct answer is among the choices
        if question['correct_answer'] not in question['choices']:
            question['choices'].append(question['correct_answer'])
        questions.append(question)

    error_msg = '\n'.join(errors) if errors else ""
    return questions, error_msg


def setup_tokenizer(model_path):
    """Resolve and load the tokenizer for a model, handling supertoken checkpoints."""
    tokenizer_name = model_path
    if "supertoken" in model_path:
        import json

        from huggingface_hub import hf_hub_download, list_repo_files

        files = list_repo_files(model_path)
        if "tokenizer_config.json" in files:
            tokenizer_path = hf_hub_download(repo_id=model_path, filename="tokenizer_config.json")
            with open(tokenizer_path) as f:
                tok_config = json.load(f)["data"]["tokenizer"]
            if tok_config["name"] == "huggingface":
                tokenizer_name = tok_config["path"]
            # todo: tiktoken
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, trust_remote_code=True, legacy=True)
    return tokenizer
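
# Illustrative only: a minimal sketch of the dataset format parse_dataset() expects,
# namely a header row followed by one question per row with the columns
# question, correct_answer, choice1, choice2, choice3 (tab- or comma-separated).
# The sample rows below are made-up placeholders, not part of the original app.
_EXAMPLE_DATASET = (
    "question\tcorrect_answer\tchoice1\tchoice2\tchoice3\n"
    "What is the capital of France?\tParis\tParis\tLondon\tBerlin\n"
    "How many legs does a spider have?\t8\t6\t8\t10\n"
)
# parse_dataset(_EXAMPLE_DATASET) returns two question dicts and an empty error string.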
def load_model_and_tokenizer(model_path, progress_callback=None):
    """Load model and tokenizer, caching them when memory allows."""
    global model_cache

    # Decide caching strategy based on current memory usage
    gpu_used, gpu_total, ram_used, ram_total = get_memory_usage()
    logger.info(f"Current GPU memory: {gpu_used:.1f}/{gpu_total:.1f} MB")
    logger.info(f"Current RAM: {ram_used:.1f}/{ram_total:.1f} MB")

    use_cache = not (
        (gpu_total > 0 and gpu_used / gpu_total > 0.8)
        or (ram_used / ram_total > 0.8)
    ) or model_path in model_cache
    if not use_cache:
        logger.warning("High memory usage detected; disabling model cache.")

    if use_cache and model_path in model_cache:
        logger.info(f"Using cached model: {model_path}")
        if progress_callback:
            progress_callback(1.0, f"✅ Using cached model: {model_path}")
        return model_cache[model_path]

    try:
        if progress_callback:
            progress_callback(0.1, f"🚀 Starting to load model: {model_path}")

        # Check if CUDA is available
        device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Loading model: {model_path} using device: {device}")

        if progress_callback:
            progress_callback(0.2, f"📥 Loading tokenizer for {model_path}...")

        # Load tokenizer
        tokenizer = setup_tokenizer(model_path)

        # Add pad token if missing
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        if progress_callback:
            progress_callback(0.5, f"🔧 Loading model weights for {model_path}... (this may take a while)")
        logger.info(os.getcwd())

        # Load model with appropriate settings
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            device_map="auto" if device == "cuda" else None,
            trust_remote_code=True,
            low_cpu_mem_usage=True,
        )

        model_info = {
            'tokenizer': tokenizer,
            'model': model,
            'device': device,
        }
        if use_cache:
            model_cache[model_path] = model_info

        if progress_callback:
            progress_callback(1.0, f"✅ Successfully loaded model: {model_path}")
        return model_info

    except Exception as e:
        error_msg = f"❌ Error loading model {model_path}: {str(e)}"
        logger.error(error_msg)
        if progress_callback:
            progress_callback(0.0, error_msg)
        return None


def calculate_choice_likelihood(model, tokenizer, question, choice):
    """Calculate the log-likelihood of the choice given the question prompt."""
    try:
        # The raw question text is used directly as the prompt
        prompt = question
        full_text = f"{prompt} {choice}"

        # Tokenize the full input (prompt + answer) and the prompt alone
        input_ids = tokenizer.encode(full_text, return_tensors="pt", add_special_tokens=False).to(model.device)
        prompt_ids = tokenizer.encode(prompt, return_tensors="pt", add_special_tokens=False).to(model.device)

        if input_ids.size(1) <= prompt_ids.size(1):
            logger.warning("Answer tokens are empty after tokenization.")
            return float("-inf")

        with torch.no_grad():
            outputs = model(input_ids)
            logits = outputs.logits

        # Score only the answer tokens
        answer_len = input_ids.size(1) - prompt_ids.size(1)
        target_ids = input_ids[:, -answer_len:]
        logits = logits[:, prompt_ids.size(1) - 1:-1, :]  # shifted for next-token prediction

        log_probs = torch.nn.functional.log_softmax(logits, dim=-1)
        token_log_probs = log_probs.gather(2, target_ids.unsqueeze(-1)).squeeze(-1)
        total_log_prob = token_log_probs.sum().item()
        return total_log_prob

    except Exception as e:
        logger.error(f"Error calculating likelihood for choice '{choice}': {str(e)}")
        return float("-inf")
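
# Illustrative only: a minimal, hedged sketch of how the two helpers above combine to
# score one multiple-choice question. The default model path and the question text are
# placeholders chosen for the example; this function is not called anywhere in the app.
def _score_single_question_example(model_path="Qwen/Qwen2.5-0.5B"):
    question = "The chemical symbol for gold is"
    choices = ["Au", "Ag", "Gd"]
    model_info = load_model_and_tokenizer(model_path)
    if model_info is None:
        return None
    scores = {
        choice: calculate_choice_likelihood(model_info['model'], model_info['tokenizer'], question, choice)
        for choice in choices
    }
    # The prediction is the choice whose continuation the model finds most likely.
    return max(scores, key=scores.get)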
def evaluate_model_on_questions(model_path, questions, progress_callback=None):
    """Evaluate a single model on all questions using likelihood-based scoring."""
    model_info = load_model_and_tokenizer(model_path, progress_callback=progress_callback)
    if model_info is None:
        return [{'error': f'Failed to load model {model_path}'}] * len(questions)

    results = []
    model = model_info['model']
    tokenizer = model_info['tokenizer']

    for i, question in enumerate(questions):
        try:
            # Calculate the likelihood of each choice
            choice_likelihoods = {}
            choice_probs = {}
            for choice in question['choices']:
                likelihood = calculate_choice_likelihood(model, tokenizer, question['question'], choice)
                choice_likelihoods[choice] = likelihood

            # Convert log-probabilities to probabilities for confidence scoring
            max_log_prob = max(choice_likelihoods.values())
            choice_probs = {
                choice: torch.exp(torch.tensor(log_prob - max_log_prob)).item()
                for choice, log_prob in choice_likelihoods.items()
            }

            # Normalize probabilities over the choices
            total_prob = sum(choice_probs.values())
            if total_prob > 0:
                choice_probs = {choice: prob / total_prob for choice, prob in choice_probs.items()}

            # Select the choice with the highest likelihood
            predicted_choice = max(choice_likelihoods.keys(), key=lambda x: choice_likelihoods[x])
            is_correct = predicted_choice == question['correct_answer']

            # Confidence is the normalized probability of the selected choice
            confidence = choice_probs.get(predicted_choice, 0.0)

            results.append({
                'question_idx': i,
                'predicted': predicted_choice,
                'correct': is_correct,
                'confidence': confidence,
                'choice_likelihoods': choice_likelihoods,
                'choice_probabilities': choice_probs,
                'raw_response': f"Likelihoods: {choice_likelihoods}",
            })

            if progress_callback:
                # Use the remaining 80% of the bar for evaluation progress
                evaluation_progress = 0.2 + (i + 1) / len(questions) * 0.8
                progress_callback(
                    evaluation_progress,
                    f"📊 Evaluating {model_path}: {i + 1}/{len(questions)} questions (likelihood-based)",
                )

        except Exception as e:
            logger.error(f"Error evaluating question {i} with {model_path}: {str(e)}")
            results.append({
                'question_idx': i,
                'predicted': question['choices'][0] if question['choices'] else '',
                'correct': False,
                'confidence': 0.0,
                'choice_likelihoods': {},
                'choice_probabilities': {},
                'raw_response': f"Error: {str(e)}",
            })

    return results

def run_evaluation(dataset_text, selected_predefined, custom_models_text="", progress=gr.Progress()):
    """Main evaluation function."""
    if not dataset_text.strip():
        return (
            "Please enter your dataset",
            "No data provided",
            None,
            None,
            gr.update(visible=True),
        )

    # Parse custom models
    custom_models = []
    if custom_models_text is None:
        custom_models_text = ""
    if custom_models_text.strip():
        custom_models = [model.strip() for model in custom_models_text.strip().split('\n') if model.strip()]

    # Combine predefined and custom models
    all_models = []
    all_models.extend(selected_predefined)
    all_models.extend(custom_models)

    if not all_models:
        return (
            "Please select at least one model or add custom models",
            "No models selected",
            None,
            None,
            gr.update(visible=False),
        )

    # Parse dataset
    questions, parse_error = parse_dataset(dataset_text)
    if parse_error:
        return (
            f"Dataset parsing error:\n{parse_error}",
            "Failed to parse dataset",
            None,
            None,
            gr.update(visible=True),
        )

    if not questions:
        return (
            "No valid questions found in dataset",
            "No questions to evaluate",
            None,
            None,
            gr.update(visible=True),
        )

    # Run evaluation
    progress(0, "Starting evaluation...")
    results = {}
    total_steps = len(all_models) * len(questions)

    for model_idx, model_path in enumerate(all_models):
        display_name = model_path.split('/')[-1] if '/' in model_path else model_path
        try:
            def model_progress(p, msg):
                # Offset per-model progress by the models already evaluated
                overall_progress = (model_idx * len(questions) + p * len(questions)) / total_steps
                progress(overall_progress, msg)

            model_results = evaluate_model_on_questions(model_path, questions, model_progress)
            results[display_name] = model_results
        except Exception as e:
            logger.error(f"Failed to evaluate {display_name}: {str(e)}")
            results[display_name] = [{'error': str(e)}] * len(questions)

        # Clean up GPU memory between models
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    # Generate outputs
    summary_stats = generate_summary_stats(questions, results)
    summary_md = create_summary_markdown(summary_stats)
    detailed_html = create_detailed_results_html(questions, results)
    accuracy_chart = create_accuracy_chart(summary_stats)
    confidence_chart = create_confidence_chart(results)

    return (
        summary_md,
        detailed_html,
        accuracy_chart,
        confidence_chart,
        gr.update(visible=True),
    )


def generate_summary_stats(questions, results):
    """Generate summary statistics for all models."""
    summary = {}
    for model, model_results in results.items():
        if not model_results or 'error' in model_results[0]:
            summary[model] = {
                'accuracy': 0.0,
                'correct': 0,
                'total': len(questions),
                'avg_confidence': 0.0,
                'error': model_results[0].get('error', 'Unknown error') if model_results else 'No results',
            }
            continue

        correct_count = sum(1 for r in model_results if r.get('correct', False))
        total_count = len(model_results)
        accuracy = correct_count / total_count if total_count > 0 else 0

        # Average confidence across questions
        avg_confidence = sum(r.get('confidence', 0) for r in model_results) / total_count if total_count > 0 else 0

        summary[model] = {
            'accuracy': accuracy,
            'correct': correct_count,
            'total': total_count,
            'avg_confidence': avg_confidence,
        }
    return summary


def create_summary_markdown(summary_stats):
    """Create a markdown summary of results."""
    if not summary_stats:
        return "No results available"

    # Sort models by accuracy, best first
    sorted_models = sorted(summary_stats.items(), key=lambda x: x[1]['accuracy'], reverse=True)

    lines = ["## 🏆 Model Performance Summary\n"]
    for i, (model, stats) in enumerate(sorted_models):
        if 'error' in stats:
            lines.append(f"❌ **{model}**: Error - {stats['error']}")
            continue

        accuracy_pct = stats['accuracy'] * 100
        medal = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else f"{i + 1}."
        lines.append(
            f"{medal} **{model}**: {accuracy_pct:.1f}% "
            f"({stats['correct']}/{stats['total']} correct, "
            f"avg confidence: {stats['avg_confidence']:.2f})"
        )
    return "\n".join(lines)
" html_parts = [""" """] for q_idx, question in enumerate(questions): html_parts.append(f"""Detailed results will appear here...
", label="Detailed Question-by-Question Results" ) # Event handlers def update_dataset_from_sample(sample_name): if sample_name in SAMPLE_DATASETS: return gr.update(value=SAMPLE_DATASETS[sample_name]) return gr.update() sample_selector.change( fn=update_dataset_from_sample, inputs=sample_selector, outputs=dataset_input ) evaluate_btn.click( fn=run_evaluation, inputs=[dataset_input, predefined_selector, custom_models_input], outputs=[summary_output, detailed_results, accuracy_plot, confidence_plot, results_section] ) gr.Markdown(""" --- ### About Model Evaluation This tool loads and runs HuggingFace models for evaluation: **ποΈ How it works**: - Downloads models from HuggingFace Hub - Formats questions as prompts for each model - Runs likelihood based evaluation **β‘ Performance Tips**: - Use smaller models for testing - Larger models (7B+) require significant GPU memory - Models are cached after first load **π§ Supported Models**: - Any HuggingFace autoregressive language model - Both instruction-tuned and base models - Custom fine-tuned models via HF paths """) if __name__ == "__main__": demo.launch()