# Import necessary libraries (ensure all required imports are at the top)
import logging
import os
import random
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import pandas as pd
import uvicorn
from datasets import Dataset, DatasetDict, load_dataset
from fastapi import Body, FastAPI, HTTPException
from huggingface_hub import HfApi, hf_hub_download
from pydantic import BaseModel, Field

# --- Constants and Config ---
HF_DATASET_ID = "agents-course/unit4-students-scores"

# A GAIA question is served only if its annotator metadata reports fewer than
# `tool_threshold` tools AND fewer than `step_threshold` steps.
tool_threshold = 3
step_threshold = 5

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Data Structures ---
# Question records exposed by the API (task_id, question, Level, file_name,
# file_path). load_questions() mutates this list in place instead of
# rebinding it, so references held elsewhere stay current.
questions_for_api: List[Dict[str, Any]] = []
# task_id -> expected final answer; used for scoring, never sent to clients.
ground_truth_answers: Dict[str, str] = {}
# Raw GAIA items that passed the tool/step filter; set by load_questions().
filtered_dataset = None


def _load_gaia_validation_split():
    """Load the GAIA ``2023_level1`` validation split from the Hugging Face Hub.

    Returns:
        The loaded ``datasets.Dataset``.

    Raises:
        RuntimeError: If loading fails; the original exception is chained.
    """
    try:
        dataset = load_dataset(
            "gaia-benchmark/GAIA", "2023_level1", split="validation", trust_remote_code=True
        )
    except Exception as e:
        logger.error(f"Failed to load GAIA dataset: {e}", exc_info=True)
        raise RuntimeError("Could not load the primary GAIA dataset.") from e
    logger.info(f"GAIA dataset validation split loaded. Features: {dataset.features}")
    return dataset


def _passes_difficulty_filter(item: Dict[str, Any]) -> bool:
    """Return True if *item* needs fewer than ``tool_threshold`` tools and
    fewer than ``step_threshold`` steps according to its annotator metadata.

    Items are rejected when:
    - metadata is missing (logged as a warning);
    - either count is missing (rejected silently);
    - a count cannot be converted to ``int`` (logged as a warning).
    """
    task_label = item.get('task_id', 'N/A')
    metadata = item.get('Annotator Metadata')
    if not metadata:
        logger.warning(f"Skipping Task ID: {task_label} - Missing 'Annotator Metadata'.")
        return False

    num_tools_str = metadata.get('Number of tools')
    num_steps_str = metadata.get('Number of steps')
    if num_tools_str is None or num_steps_str is None:
        return False

    try:
        num_tools = int(num_tools_str)
        num_steps = int(num_steps_str)
    except (TypeError, ValueError):
        # Catch TypeError too: an unexpected value type should skip this
        # one item rather than abort the whole dataset load.
        logger.warning(
            f"Skipping Task ID: {task_label} - Could not convert tool/step count: "
            f"tools='{num_tools_str}', steps='{num_steps_str}'."
        )
        return False

    return num_tools < tool_threshold and num_steps < step_threshold


def load_questions():
    """Load, filter and index the GAIA questions served by the API.

    Populates the module-level ``filtered_dataset``, ``questions_for_api`` and
    ``ground_truth_answers``. The new data is first built in local containers.
    It is published only after the dataset has loaded successfully, so a
    failed reload keeps the previously loaded questions instead of emptying
    them.

    Raises:
        RuntimeError: If the GAIA dataset cannot be loaded.
    """
    global filtered_dataset

    logger.info("Starting to load and filter GAIA dataset (validation split)...")
    dataset = _load_gaia_validation_split()

    filtered = [item for item in dataset if _passes_difficulty_filter(item)]
    logger.info(
        f"Found {len(filtered)} questions matching the criteria "
        f"(tools < {tool_threshold}, steps < {step_threshold})."
    )

    new_questions: List[Dict[str, Any]] = []
    new_answers: Dict[str, str] = {}
    for item in filtered:
        task_id = item.get('task_id')
        question_text = item.get('Question')
        final_answer = item.get('Final answer')
        # task_id and question must be non-empty. The answer is only checked
        # against None, so an empty-string answer is still accepted.
        if not (task_id and question_text and final_answer is not None):
            logger.warning(
                "Skipping item due to missing essential fields "
                f"(task_id, Question, or Final answer): task_id={task_id}"
            )
            continue
        new_questions.append({
            "task_id": str(task_id),
            "question": str(question_text),
            # Optional fields; None when absent from the source item.
            "Level": item.get("Level"),
            "file_name": item.get("file_name"),
            "file_path": item.get("file_path"),
        })
        new_answers[str(task_id)] = str(final_answer)

    # Publish by mutating the existing containers in place, so code that
    # holds references to them sees the new data.
    filtered_dataset = filtered
    questions_for_api[:] = new_questions
    ground_truth_answers.clear()
    ground_truth_answers.update(new_answers)

    logger.info(f"Successfully processed {len(new_questions)} questions for the API matching the Pydantic model.")
    if not questions_for_api:
        logger.error("CRITICAL: No valid questions loaded after filtering and processing. API endpoints needing questions will fail.")


# --- Pydantic Models ---
class Question(BaseModel):
    """Question record; same field names as the entries in questions_for_api."""
    task_id: str
    question: str
    Level: Optional[str] = None
    file_name: Optional[str] = None
    file_path: Optional[str] = None


class AnswerItem(BaseModel):
    """One agent answer for a single task."""
    task_id: str
    submitted_answer: str = Field(..., description="The agent's answer for the task_id")


class Submission(BaseModel):
    """Request body for POST /submit."""
    username: str = Field(..., description="Hugging Face username", min_length=1)
    agent_code: str = Field(..., description="The Python class code for the agent", min_length=10)  # Basic check
    answers: List[AnswerItem] = Field(..., description="List of answers submitted by the agent")


class ScoreResponse(BaseModel):
    """Response body for POST /submit."""
    username: str
    score: float
    correct_count: int
    total_attempted: int
    message: str
    timestamp: str


class ErrorResponse(BaseModel):
    """Error payload shape referenced in the endpoints' documented responses."""
    detail: str
# --- FastAPI Application ---
# (Request/response models are defined once, above, alongside Question.)
app = FastAPI(
    title="Agent Evaluation API",
    description="API to fetch questions and submit agent answers for scoring.",
)


# --- Startup Event ---
@app.on_event("startup")
async def startup_event():
    """Load the GAIA questions when the server starts.

    If questions are already loaded, the load is skipped so the dataset is not
    fetched and filtered twice. This happens with the ``__main__`` entry point
    below, which calls ``load_questions()`` before starting Uvicorn.

    Load failures are logged rather than raised, so the server still starts;
    the question endpoints then return 404.
    """
    if questions_for_api:
        logger.info(f"Questions already loaded ({len(questions_for_api)}); skipping startup load.")
        return

    logger.info("Application startup: Loading questions...")
    try:
        load_questions()
    except Exception as e:
        logger.error(f"CRITICAL ERROR DURING STARTUP while loading questions: {e}", exc_info=True)
        return

    if not questions_for_api:
        logger.error("CRITICAL: No questions were loaded during startup.")
    else:
        logger.info(f"Successfully loaded {len(questions_for_api)} questions.")


# --- Leaderboard helpers ---
def _empty_scores_frame() -> pd.DataFrame:
    """Return an empty leaderboard DataFrame with username/score/timestamp columns."""
    return pd.DataFrame({
        'username': pd.Series(dtype='str'),
        'score': pd.Series(dtype='float'),
        'timestamp': pd.Series(dtype='str'),
    })


def _load_scores_frame() -> pd.DataFrame:
    """Load the leaderboard's 'train' split as a DataFrame.

    Returns an empty leaderboard frame when the dataset has no 'train' split,
    or when loading fails for any reason (missing repo or file, network error,
    ...). Either way, "no leaderboard yet" is treated as an empty table.
    """
    logger.info(f"Loading dataset '{HF_DATASET_ID}'...")
    try:
        # Probe for the parquet shard first so an empty/new repo fails here.
        # This assumes the default 'train' split stored as a single parquet file.
        hf_hub_download(repo_id=HF_DATASET_ID, filename="data/train-00000-of-00001.parquet", repo_type="dataset")
        ds_dict = load_dataset(HF_DATASET_ID)
        logger.info("Dataset loaded successfully.")
        if "train" not in ds_dict:
            logger.warning(f"Dataset '{HF_DATASET_ID}' does not contain a 'train' split. Creating one.")
            return _empty_scores_frame()
        return ds_dict['train'].to_pandas()
    except Exception as load_error:
        logger.warning(f"Could not load dataset '{HF_DATASET_ID}' or it might be empty/new ({load_error}). Creating structure.")
        return _empty_scores_frame()


def update_huggingface_dataset(username: str, score: float):
    """Record *score* for *username* on the leaderboard if it beats their best.

    Loads the leaderboard, or starts an empty one. Then:
    - for a new user, appends a row;
    - for an existing user whose *score* is strictly higher than their
      previous maximum, replaces all of that user's rows with one new row.

    It then builds the dataset that would be pushed to the Hub.

    NOTE: the ``push_to_hub`` call is commented out, so the Hub dataset is not
    actually modified. The function still returns True when an update would
    have been pushed.

    Returns:
        True if the leaderboard changed, False if the score was not higher.

    Raises:
        HTTPException: Status 500, wrapping any unexpected error.
    """
    try:
        df = _load_scores_frame()

        # Normalise the schema: add missing columns, then coerce scores to
        # float. Unparsable or missing scores count as 0.0.
        for col, dtype in [('username', 'str'), ('score', 'float'), ('timestamp', 'str')]:
            if col not in df.columns:
                logger.warning(f"Column '{col}' not found in dataset. Adding it.")
                df[col] = pd.Series(dtype=dtype)
        df['score'] = pd.to_numeric(df['score'], errors='coerce').fillna(0.0)

        existing_entries = df[df['username'] == username]
        current_timestamp = datetime.now(timezone.utc).isoformat()
        new_entry = pd.DataFrame([{'username': username, 'score': score, 'timestamp': current_timestamp}])

        if existing_entries.empty:
            logger.info(f"User {username} not found. Adding new entry.")
            df = pd.concat([df, new_entry], ignore_index=True)
        else:
            max_existing_score = existing_entries['score'].max()
            if score > max_existing_score:
                logger.info(f"New score {score} is higher than existing max {max_existing_score} for {username}. Updating.")
                # Keep a single row per user: drop the old rows, append the new one.
                df = pd.concat([df[df['username'] != username], new_entry], ignore_index=True)
            else:
                logger.info(f"New score {score} is not higher than existing max {max_existing_score} for {username}. No update needed.")
                return False

        logger.info(f"Pushing updated dataset to '{HF_DATASET_ID}'...")
        # Pin dtypes so Dataset.from_pandas infers a stable schema.
        df['username'] = df['username'].astype(str)
        df['score'] = df['score'].astype(float)
        df['timestamp'] = df['timestamp'].astype(str)
        updated_ds = DatasetDict({'train': Dataset.from_pandas(df)})
        logger.info(f"Dataset to push: {updated_ds}")
        # updated_ds.push_to_hub(HF_DATASET_ID)  # Uncomment this line to enable leaderboard updates
        logger.warning("Dataset push to hub is currently commented out. Uncomment the line above to enable leaderboard updates.")
        logger.info("Dataset push simulated/attempted.")
        return True
    except Exception as e:
        logger.error(f"Error interacting with Hugging Face dataset '{HF_DATASET_ID}': {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to update Hugging Face dataset: {e}") from e


# --- API Endpoints ---
@app.get("/questions",
         response_model=List[Dict[str, Any]],
         summary="Get All Filtered Questions (Full Data)",
         description="Returns the complete list of questions with all associated data (excluding answer/annotation) filtered based on criteria.")
async def get_questions():
    """Return every loaded question record; 404 if none are loaded."""
    if not questions_for_api:
        logger.error("GET /questions requested but no questions are loaded.")
        raise HTTPException(status_code=404, detail="No questions available.")
    return questions_for_api


@app.get("/random-question",
         response_model=Dict[str, Any],
         summary="Get One Random Question (Full Data)",
         description="Returns a single random question with all associated data (excluding answer/annotation) from the available filtered set.",
         responses={
             200: {"description": "A random question with its full data."},
             404: {"model": ErrorResponse, "description": "No questions available to choose from."}
         })
async def get_random_question():
    """Return one uniformly random question record; 404 if none are loaded."""
    if not questions_for_api:
        logger.warning("GET /random-question requested but no questions are loaded.")
        raise HTTPException(status_code=404, detail="No questions available to choose from.")
    random_question = random.choice(questions_for_api)
    logger.info(f"Returning random question with task_id: {random_question.get('task_id', 'N/A')}")
    return random_question


@app.post("/submit",
          response_model=ScoreResponse,
          summary="Submit Agent Answers",
          description="Submit answers from an agent, calculate score, and update leaderboard on Hugging Face.",
          responses={
              200: {"description": "Submission successful, score calculated."},
              400: {"model": ErrorResponse, "description": "Invalid input data."},
              404: {"model": ErrorResponse, "description": "Task ID not found in submission or ground truth."},
              500: {"model": ErrorResponse, "description": "Server error (e.g., failed to update dataset)."}
          })
def submit_answers(submission: Submission = Body(...)):
    """Score an agent submission and update the leaderboard.

    Steps:
    - Validates input and checks that agent code is present (basic anti-cheat).
    - Compares each answer with the ground truth, case-insensitively and after
      stripping whitespace. Duplicate and unknown task IDs are ignored.
    - Score = correct answers / total loaded questions * 100, rounded to 2
      decimals.
    - Updates the Hugging Face leaderboard if this is a new high score.

    Declared as a plain ``def`` (not ``async def``) so FastAPI runs it in its
    worker threadpool. update_huggingface_dataset() does blocking Hub I/O,
    which would otherwise stall the event loop for every other request.
    """
    logger.info(f"Received submission from username: {submission.username}")

    if not submission.agent_code or len(submission.agent_code.strip()) < 10:
        logger.warning(f"Submission rejected for {submission.username}: Agent code missing or too short.")
        raise HTTPException(status_code=400, detail="Agent code is required and must be sufficiently long.")

    if not submission.answers:
        logger.warning(f"Submission rejected for {submission.username}: No answers provided.")
        raise HTTPException(status_code=400, detail="No answers provided in the submission.")

    correct_count = 0
    total_attempted_in_payload = len(submission.answers)
    valid_attempted_count = 0  # answers with a known, not-yet-seen task_id
    processed_ids = set()

    for answer_item in submission.answers:
        task_id = str(answer_item.task_id)
        submitted = str(answer_item.submitted_answer)

        # Only the first answer per task_id counts.
        if task_id in processed_ids:
            logger.warning(f"Duplicate task_id '{task_id}' in submission from {submission.username}. Skipping.")
            continue
        processed_ids.add(task_id)

        ground_truth = ground_truth_answers.get(task_id)
        if ground_truth is None:
            logger.warning(f"Task ID '{task_id}' submitted by {submission.username} not found in ground truth list. Skipping this answer.")
            continue

        valid_attempted_count += 1
        if submitted.strip().lower() == ground_truth.strip().lower():
            correct_count += 1
            logger.debug(f"Correct answer for {task_id} from {submission.username}")
        else:
            logger.debug(f"Incorrect answer for {task_id} from {submission.username}. Submitted: '{submitted}', Expected: '{ground_truth}'")

    # Check for missing ground truth first: when it is empty, no answer can be
    # valid, so a later check would never be reached.
    total_questions = len(ground_truth_answers)
    if total_questions == 0:
        score = 0.0
        message = "Score cannot be calculated because no ground truth answers are loaded."
        logger.error(f"Cannot calculate score for {submission.username}: ground_truth_answers is empty.")
    elif valid_attempted_count == 0:
        score = 0.0
        message = f"Submission received, but no valid/matching task IDs were found in the {total_attempted_in_payload} answers provided."
        logger.warning(f"No valid answers processed for {submission.username} out of {total_attempted_in_payload} submitted.")
    else:
        # Denominator is the whole filtered question set, not just the
        # attempted tasks.
        score = round((correct_count / total_questions) * 100, 2)
        message = f"Score calculated successfully: {correct_count}/{total_questions} total questions answered correctly ({valid_attempted_count} valid tasks attempted)."
        if valid_attempted_count < total_attempted_in_payload:
            message += f" ({total_attempted_in_payload - valid_attempted_count} submitted answers had invalid or duplicate task IDs)."
    logger.info(f"Score for {submission.username}: {score}% ({correct_count}/{total_questions} correct, based on {valid_attempted_count} valid attempts)")

    try:
        updated = update_huggingface_dataset(submission.username, score)
    except HTTPException:
        # Already a client-facing error (e.g. 500 from the helper); propagate unchanged.
        raise
    except Exception as e:
        logger.error(f"Unexpected error during dataset update for {submission.username}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An unexpected error occurred while updating the leaderboard.") from e

    if updated:
        message += " High score updated on leaderboard."
        logger.info(f"Leaderboard updated for {submission.username}.")
    else:
        message += " Score did not improve previous record, leaderboard not updated."
        logger.info(f"Leaderboard not updated for {submission.username} as score was not higher.")

    return ScoreResponse(
        username=submission.username,
        score=score,
        correct_count=correct_count,
        # Report only the valid attempts (unknown/duplicate IDs excluded).
        total_attempted=valid_attempted_count,
        message=message,
        timestamp=datetime.now(timezone.utc).isoformat()
    )


# --- Run the application ---
if __name__ == "__main__":
    logger.info("Starting FastAPI server for local development...")
    try:
        # Load before starting the server; startup_event sees the loaded data
        # and skips a second load.
        load_questions()
        if not questions_for_api:
            logger.error("EXITING: Cannot start server without loaded questions.")
        else:
            local_port = int(os.getenv("PORT", "8000"))
            logger.info(f"Running Uvicorn locally on http://127.0.0.1:{local_port}")
            uvicorn.run(app, host="127.0.0.1", port=local_port, log_level="info")
    except Exception as e:
        logger.error(f"Failed to start server: {e}", exc_info=True)