"""Gradio app for the agents-course evaluation.

Workflow: fetch the question list from the scoring API, generate answers in a
background thread with a search-backed agent, review them, then submit the
cached answers.
"""

# NOTE(review): `inspect`, `json`, `Dict`, `List` and `InferenceClient` are not
# referenced by the code in this file; they are kept because removing
# file-level imports is out of scope for this change.
import inspect
import json
import os
import threading
import time
from typing import Dict, List, Optional, Tuple

import gradio as gr
import pandas as pd
import requests
from huggingface_hub import InferenceClient
from smolagents import DuckDuckGoSearchTool

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# --- Global Cache for Answers ---
cached_answers = {}
cached_questions = []
processing_status = {"is_processing": False, "progress": 0, "total": 0}

# Guards state transitions (start / worker setup / clear) that are shared
# between Gradio callbacks and the background generation thread.
_state_lock = threading.Lock()


def _truncate(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, with "..." appended if cut."""
    return text[:limit] + "..." if len(text) > limit else text


# --- Basic Agent Definition ---
class BasicAgent:
    """Answer a question with the top DuckDuckGo search result."""

    def __init__(self, debug: bool = False):
        self.search = DuckDuckGoSearchTool()
        self.debug = debug
        if self.debug:
            print("BasicAgent initialized.")

    def __call__(self, question: str) -> str:
        """Search for *question* and return a short, human-readable answer.

        Never raises: every failure is converted into an explanatory string.
        """
        if self.debug:
            print(f"Agent received question: {question}")

        # Early validation
        if not question or not question.strip():
            return "Please provide a valid question."

        try:
            time.sleep(1)  # crude spacing between consecutive search requests
            answer = self._format_results(self.search(question))
        except (IndexError, KeyError, AttributeError, TypeError):
            answer = "Sorry, I couldn't process the search results properly."
        except Exception as e:
            answer = f"Sorry, I couldn't fetch results due to: {e}"

        if self.debug:
            print(f"Agent returning answer: {answer}")
        return answer

    @staticmethod
    def _format_results(results) -> str:
        """Turn raw search output into an answer string.

        smolagents' DuckDuckGoSearchTool returns a single markdown string
        (a heading followed by blank-line separated entries), so indexing it
        like a list of dicts always failed. String output is handled first;
        the list-of-dicts shape is still accepted for other search backends.
        """
        no_results = "No results found for that query."
        if not results:
            return no_results

        if isinstance(results, str):
            # Keep the "top result only" behavior: drop heading lines and
            # return the first remaining entry.
            entries = [
                chunk.strip()
                for chunk in results.split("\n\n")
                if chunk.strip() and not chunk.lstrip().startswith("#")
            ]
            return entries[0] if entries else no_results

        top = results[0]
        title = top.get("title") or "No title"
        snippet = (top.get("snippet") or "").strip()
        link = top.get("link") or ""

        parts = [f"**{title}**"]
        if snippet:
            parts.append(snippet)
        if link:
            parts.append(f"Source: {link}")
        return "\n".join(parts)


def fetch_questions() -> Tuple[str, Optional[pd.DataFrame]]:
    """Fetch questions from the API and cache them.

    Returns:
        A ``(status message, DataFrame or None)`` pair for the Gradio outputs.
        The DataFrame is ``None`` whenever fetching failed or the list is empty.
    """
    global cached_questions

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    print(f"Fetching questions from: {questions_url}")

    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()

        if not questions_data:
            return "Fetched questions list is empty.", None

        # Build the display table first so a malformed payload is reported
        # as an error instead of being cached.
        df = pd.DataFrame(
            [
                {
                    "Task ID": item.get("task_id", "Unknown"),
                    "Question": item.get("question", ""),
                }
                for item in questions_data
            ]
        )
        cached_questions = questions_data

        status_msg = (
            f"Successfully fetched {len(questions_data)} questions. "
            "Ready to generate answers."
        )
        return status_msg, df

    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred: {e}", None


def generate_answers_async(progress_callback=None):
    """Generate answers for all cached questions.

    Intended to run in a background thread (see ``start_answer_generation``).
    Results are published through the module-level ``cached_answers`` and
    progress through ``processing_status``.

    Args:
        progress_callback: Optional callable invoked as
            ``progress_callback(done, total)`` after each answered question.

    Returns:
        An explanatory string when there are no cached questions,
        otherwise ``None``.
    """
    global cached_answers

    with _state_lock:
        # Work on private references: if clear_cache() swaps the globals
        # mid-run, this worker keeps writing to the old (discarded) objects
        # instead of polluting the fresh state.
        status = processing_status
        questions = list(cached_questions)
        if not questions:
            status["is_processing"] = False
            return "No questions available. Please fetch questions first."

        status["is_processing"] = True
        status["progress"] = 0
        status["total"] = len(questions)

    total = len(questions)
    try:
        agent = BasicAgent()
        answers = {}
        with _state_lock:
            # Publish only if this run has not been cancelled/cleared.
            if status is processing_status:
                cached_answers = answers

        for done, item in enumerate(questions, start=1):
            if not status["is_processing"]:  # Check if cancelled
                break

            task_id = item.get("task_id")
            question_text = item.get("question")

            if not task_id or question_text is None:
                continue

            try:
                answer = agent(question_text)
            except Exception as e:
                answer = f"AGENT ERROR: {e}"
            answers[task_id] = {"question": question_text, "answer": answer}

            status["progress"] = done
            if progress_callback:
                progress_callback(done, total)

    except Exception as e:
        print(f"Error in generate_answers_async: {e}")
    finally:
        status["is_processing"] = False


def start_answer_generation():
    """Start the answer generation process in a separate thread.

    Returns:
        A ``(status message, None)`` pair for the Gradio outputs.
    """
    with _state_lock:
        if processing_status["is_processing"]:
            return "Answer generation is already in progress.", None

        if not cached_questions:
            return "No questions available. Please fetch questions first.", None

        # Claim the "in progress" state before the thread starts so a second
        # click cannot launch a second worker, and an immediate refresh does
        # not report "Not started".
        processing_status["is_processing"] = True
        processing_status["progress"] = 0
        processing_status["total"] = len(cached_questions)

    # Start generation in background thread
    thread = threading.Thread(target=generate_answers_async)
    thread.daemon = True
    thread.start()

    return "Answer generation started. Check progress below.", None


def get_generation_progress():
    """Get the current progress of answer generation.

    Returns:
        A ``(status message, DataFrame or None)`` pair. The DataFrame with a
        truncated preview of the answers is only returned once generation has
        finished and produced at least one answer.
    """
    status = processing_status
    answers = dict(cached_answers)  # snapshot; the worker may still be writing

    if status["is_processing"]:
        return (
            f"Generating answers... {status['progress']}/{status['total']} completed",
            None,
        )

    if status["progress"] == 0:
        return "Not started", None

    # Generation completed
    if not answers:
        return "Answer generation completed but no answers were generated.", None

    df = pd.DataFrame(
        [
            {
                "Task ID": task_id,
                "Question": _truncate(data["question"], 100),
                "Generated Answer": _truncate(data["answer"], 200),
            }
            for task_id, data in answers.items()
        ]
    )
    status_msg = (
        f"Answer generation completed! {len(answers)} answers ready for submission."
    )
    return status_msg, df


def submit_cached_answers(profile: gr.OAuthProfile | None):
    """Submit the cached answers to the evaluation API.

    Args:
        profile: The logged-in Hugging Face profile injected by Gradio, or
            ``None`` when the user is not logged in.

    Returns:
        A ``(status message, DataFrame or None)`` pair for the Gradio outputs.
    """
    if not profile:
        return "Please log in to Hugging Face first.", None

    # Snapshot so a still-running worker cannot change the dict while the
    # payload and the results table are being built.
    answers = dict(cached_answers)
    if not answers:
        return "No cached answers available. Please generate answers first.", None

    username = profile.username
    space_id = os.getenv("SPACE_ID")
    agent_code = (
        f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "Unknown"
    )

    # Prepare submission payload
    answers_payload = [
        {"task_id": task_id, "submitted_answer": data["answer"]}
        for task_id, data in answers.items()
    ]
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }

    # Submit to API
    api_url = DEFAULT_API_URL
    submit_url = f"{api_url}/submit"
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")

    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )

        # Create results DataFrame
        results_df = pd.DataFrame(
            [
                {
                    "Task ID": task_id,
                    "Question": data["question"],
                    "Submitted Answer": data["answer"],
                }
                for task_id, data in answers.items()
            ]
        )
        return final_status, results_df

    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (ValueError, AttributeError):
            # Body is not JSON (ValueError) or not a JSON object
            # (AttributeError on .get): fall back to the raw text.
            error_detail += f" Response: {e.response.text[:500]}"
        return f"Submission Failed: {error_detail}", None

    except requests.exceptions.Timeout:
        return "Submission Failed: The request timed out.", None

    except Exception as e:
        return f"Submission Failed: {e}", None


def clear_cache():
    """Clear all cached data and cancel any running generation.

    Returns:
        A ``(status message, None)`` pair for the Gradio outputs.
    """
    global cached_answers, cached_questions, processing_status

    with _state_lock:
        # Tell a running worker (which holds a reference to the old status
        # dict) to stop, then swap in fresh state objects.
        processing_status["is_processing"] = False
        cached_answers = {}
        cached_questions = []
        processing_status = {"is_processing": False, "progress": 0, "total": 0}

    return "Cache cleared successfully.", None


# --- Enhanced Gradio Interface ---
with gr.Blocks(title="Enhanced Agent Evaluation Runner") as demo:
    gr.Markdown("# Enhanced Agent Evaluation Runner with Answer Caching")
    gr.Markdown(
        """
        **Enhanced Instructions:**

        1. **Clone and Modify**: Clone this space and modify the agent logic as needed.
        2. **Login**: Log in to your Hugging Face account.
        3. **Fetch Questions**: Load all questions from the evaluation API.
        4. **Generate Answers**: Create answers for all questions (runs in background).
        5. **Review Results**: Check the generated answers before submission.
        6. **Submit**: Submit your answers when ready.

        **Benefits of this approach:**

        - ✅ Faster user feedback (separate steps)
        - ✅ Ability to review answers before submission
        - ✅ Progress tracking during answer generation
        - ✅ Cache management for multiple runs

        ---
        """
    )

    with gr.Row():
        gr.LoginButton()
        clear_btn = gr.Button("Clear Cache", variant="secondary")

    with gr.Tab("Step 1: Fetch Questions"):
        gr.Markdown("### Fetch Questions from API")
        fetch_btn = gr.Button("Fetch Questions", variant="primary")
        fetch_status = gr.Textbox(label="Fetch Status", lines=2, interactive=False)
        questions_table = gr.DataFrame(label="Available Questions", wrap=True)

        fetch_btn.click(
            fn=fetch_questions,
            outputs=[fetch_status, questions_table]
        )

    with gr.Tab("Step 2: Generate Answers"):
        gr.Markdown("### Generate Answers (Background Processing)")

        with gr.Row():
            generate_btn = gr.Button("Start Answer Generation", variant="primary")
            refresh_btn = gr.Button("Refresh Progress", variant="secondary")

        generation_status = gr.Textbox(label="Generation Status", lines=2, interactive=False)
        answers_preview = gr.DataFrame(label="Generated Answers Preview", wrap=True)

        generate_btn.click(
            fn=start_answer_generation,
            outputs=[generation_status, answers_preview]
        )

        refresh_btn.click(
            fn=get_generation_progress,
            outputs=[generation_status, answers_preview]
        )

    with gr.Tab("Step 3: Submit Results"):
        gr.Markdown("### Submit Generated Answers")
        submit_btn = gr.Button("Submit Cached Answers", variant="primary")
        submission_status = gr.Textbox(label="Submission Status", lines=5, interactive=False)
        final_results = gr.DataFrame(label="Final Submission Results", wrap=True)

        submit_btn.click(
            fn=submit_cached_answers,
            outputs=[submission_status, final_results]
        )

    # Clear cache functionality
    clear_btn.click(
        fn=clear_cache,
        outputs=[fetch_status, questions_table]
    )

    # Populate the progress widgets once when the page loads. This is NOT a
    # periodic refresh; users click "Refresh Progress" for later updates.
    demo.load(
        fn=get_generation_progress,
        outputs=[generation_status, answers_preview]
    )

if __name__ == "__main__":
    print("\n" + "-"*30 + " Enhanced App Starting " + "-"*30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" Enhanced App Starting ")) + "\n")
    print("Launching Enhanced Gradio Interface...")
    demo.launch(debug=True, share=False)