import os
import threading
import time
from typing import Optional, Tuple

import gradio as gr
import pandas as pd
import requests
from smolagents import DuckDuckGoSearchTool

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# --- Global Cache for Answers ---
cached_answers = {}
cached_questions = []
processing_status = {"is_processing": False, "progress": 0, "total": 0}

# --- Basic Agent Definition ---
class BasicAgent:
    def __init__(self, debug: bool = False):
        self.search = DuckDuckGoSearchTool()
        self.debug = debug
        if self.debug:
            print("BasicAgent initialized.")

    def __call__(self, question: str) -> str:
        if self.debug:
            print(f"Agent received question: {question}")
        # Early validation
        if not question or not question.strip():
            return "Please provide a valid question."
        try:
            time.sleep(1)  # crude rate limiting between searches
            results = self.search(question)
            if not results:
                return "No results found for that query."
            # smolagents' DuckDuckGoSearchTool generally returns a single
            # pre-formatted string of results; pass that through as-is.
            if isinstance(results, str):
                answer = results
            else:
                # Fallback for a list of result dicts (other tool versions).
                top = results[0]
                title = top.get("title") or "No title"
                snippet = top.get("snippet", "").strip()
                link = top.get("link", "")
                parts = [f"**{title}**"]
                if snippet:
                    parts.append(snippet)
                if link:
                    parts.append(f"Source: {link}")
                answer = "\n".join(parts)
        except (IndexError, KeyError, AttributeError):
            # Specific handling for malformed search results
            answer = "Sorry, I couldn't process the search results properly."
        except Exception as e:
            answer = f"Sorry, I couldn't fetch results due to: {e}"
        if self.debug:
            print(f"Agent returning answer: {answer}")
        return answer

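# Quick local sanity check for BasicAgent (a sketch; it needs network access
# for DuckDuckGo, so it is left commented out rather than run at import time):
#   agent = BasicAgent(debug=True)
#   print(agent("What is the capital of France?"))
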
def fetch_questions() -> Tuple[str, Optional[pd.DataFrame]]:
    """Fetch questions from the API and cache them."""
    global cached_questions
    questions_url = f"{DEFAULT_API_URL}/questions"
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
        cached_questions = questions_data
        # Create a DataFrame for display
        display_data = [
            {
                "Task ID": item.get("task_id", "Unknown"),
                "Question": item.get("question", ""),
            }
            for item in questions_data
        ]
        df = pd.DataFrame(display_data)
        status_msg = (
            f"Successfully fetched {len(questions_data)} questions. "
            "Ready to generate answers."
        )
        return status_msg, df
    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred: {e}", None

def generate_answers_async(progress_callback=None):
    """Generate answers for all cached questions (run in a background thread)."""
    global cached_answers, processing_status
    if not cached_questions:
        return "No questions available. Please fetch questions first."
    processing_status["is_processing"] = True
    processing_status["progress"] = 0
    processing_status["total"] = len(cached_questions)
    try:
        agent = BasicAgent()
        cached_answers = {}
        for i, item in enumerate(cached_questions):
            if not processing_status["is_processing"]:  # check if cancelled
                break
            task_id = item.get("task_id")
            question_text = item.get("question")
            if not task_id or question_text is None:
                continue
            try:
                answer = agent(question_text)
                cached_answers[task_id] = {
                    "question": question_text,
                    "answer": answer,
                }
            except Exception as e:
                cached_answers[task_id] = {
                    "question": question_text,
                    "answer": f"AGENT ERROR: {e}",
                }
            processing_status["progress"] = i + 1
            if progress_callback:
                progress_callback(i + 1, len(cached_questions))
    except Exception as e:
        print(f"Error in generate_answers_async: {e}")
    finally:
        processing_status["is_processing"] = False

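# Concurrency note: processing_status and cached_answers are written by the
# worker thread and read by Gradio callbacks. For this simple flag/counter
# pattern that is safe enough under CPython's GIL, and setting
# processing_status["is_processing"] = False from another thread doubles as a
# cooperative cancellation signal for the loop above.
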
def start_answer_generation():
    """Start the answer generation process in a separate thread."""
    if processing_status["is_processing"]:
        return "Answer generation is already in progress.", None
    if not cached_questions:
        return "No questions available. Please fetch questions first.", None
    # Run generation in a background daemon thread so the UI stays responsive
    thread = threading.Thread(target=generate_answers_async, daemon=True)
    thread.start()
    return "Answer generation started. Check progress below.", None

def get_generation_progress():
    """Get the current progress of answer generation."""
    if not processing_status["is_processing"] and processing_status["progress"] == 0:
        return "Not started", None
    if processing_status["is_processing"]:
        progress = processing_status["progress"]
        total = processing_status["total"]
        return f"Generating answers... {progress}/{total} completed", None
    # Generation completed
    if not cached_answers:
        return "Answer generation completed but no answers were generated.", None
    # Create a DataFrame with truncated previews of the results
    display_data = []
    for task_id, data in cached_answers.items():
        question = data["question"]
        answer = data["answer"]
        display_data.append({
            "Task ID": task_id,
            "Question": question[:100] + "..." if len(question) > 100 else question,
            "Generated Answer": answer[:200] + "..." if len(answer) > 200 else answer,
        })
    df = pd.DataFrame(display_data)
    status_msg = f"Answer generation completed! {len(cached_answers)} answers ready for submission."
    return status_msg, df

def submit_cached_answers(profile: gr.OAuthProfile | None):
    """Submit the cached answers to the evaluation API."""
    global cached_answers
    if not profile:
        return "Please log in to Hugging Face first.", None
    if not cached_answers:
        return "No cached answers available. Please generate answers first.", None
    username = profile.username
    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "Unknown"
    # Prepare the submission payload
    answers_payload = [
        {"task_id": task_id, "submitted_answer": data["answer"]}
        for task_id, data in cached_answers.items()
    ]
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    # Submit to the API
    submit_url = f"{DEFAULT_API_URL}/submit"
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        # Create a results DataFrame
        results_log = [
            {
                "Task ID": task_id,
                "Question": data["question"],
                "Submitted Answer": data["answer"],
            }
            for task_id, data in cached_answers.items()
        ]
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except ValueError:
            # Response body was not valid JSON
            error_detail += f" Response: {e.response.text[:500]}"
        return f"Submission Failed: {error_detail}", None
    except requests.exceptions.Timeout:
        return "Submission Failed: The request timed out.", None
    except Exception as e:
        return f"Submission Failed: {e}", None

def clear_cache():
    """Clear all cached data."""
    global cached_answers, cached_questions, processing_status
    cached_answers = {}
    cached_questions = []
    processing_status = {"is_processing": False, "progress": 0, "total": 0}
    return "Cache cleared successfully.", None

# --- Enhanced Gradio Interface ---
with gr.Blocks(title="Enhanced Agent Evaluation Runner") as demo:
    gr.Markdown("# Enhanced Agent Evaluation Runner with Answer Caching")
    gr.Markdown(
        """
        **Enhanced Instructions:**

        1. **Clone and Modify**: Clone this space and modify the agent logic as needed.
        2. **Login**: Log in to your Hugging Face account.
        3. **Fetch Questions**: Load all questions from the evaluation API.
        4. **Generate Answers**: Create answers for all questions (runs in background).
        5. **Review Results**: Check the generated answers before submission.
        6. **Submit**: Submit your answers when ready.

        **Benefits of this approach:**
        - ✅ Faster user feedback (separate steps)
        - ✅ Ability to review answers before submission
        - ✅ Progress tracking during answer generation
        - ✅ Cache management for multiple runs

        ---
        """
    )

    with gr.Row():
        gr.LoginButton()
        clear_btn = gr.Button("Clear Cache", variant="secondary")

    with gr.Tab("Step 1: Fetch Questions"):
        gr.Markdown("### Fetch Questions from API")
        fetch_btn = gr.Button("Fetch Questions", variant="primary")
        fetch_status = gr.Textbox(label="Fetch Status", lines=2, interactive=False)
        questions_table = gr.DataFrame(label="Available Questions", wrap=True)
        fetch_btn.click(
            fn=fetch_questions,
            outputs=[fetch_status, questions_table],
        )

    with gr.Tab("Step 2: Generate Answers"):
        gr.Markdown("### Generate Answers (Background Processing)")
        with gr.Row():
            generate_btn = gr.Button("Start Answer Generation", variant="primary")
            refresh_btn = gr.Button("Refresh Progress", variant="secondary")
        generation_status = gr.Textbox(label="Generation Status", lines=2, interactive=False)
        answers_preview = gr.DataFrame(label="Generated Answers Preview", wrap=True)
        generate_btn.click(
            fn=start_answer_generation,
            outputs=[generation_status, answers_preview],
        )
        refresh_btn.click(
            fn=get_generation_progress,
            outputs=[generation_status, answers_preview],
        )

    with gr.Tab("Step 3: Submit Results"):
        gr.Markdown("### Submit Generated Answers")
        submit_btn = gr.Button("Submit Cached Answers", variant="primary")
        submission_status = gr.Textbox(label="Submission Status", lines=5, interactive=False)
        final_results = gr.DataFrame(label="Final Submission Results", wrap=True)
        submit_btn.click(
            fn=submit_cached_answers,
            outputs=[submission_status, final_results],
        )
    # Clear cache functionality
    clear_btn.click(
        fn=clear_cache,
        outputs=[fetch_status, questions_table],
    )

    # Load the current progress once when the page loads; after that, the
    # "Refresh Progress" button is used to poll for updates.
    demo.load(
        fn=get_generation_progress,
        outputs=[generation_status, answers_preview],
    )
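    # Note (assumption): on Gradio versions whose event listeners support the
    # `every` argument, progress could be polled automatically instead of via
    # the button, e.g.:
    #   demo.load(fn=get_generation_progress,
    #             outputs=[generation_status, answers_preview], every=5)
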
if __name__ == "__main__":
    print("\n" + "-" * 30 + " Enhanced App Starting " + "-" * 30)
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-" * (60 + len(" Enhanced App Starting ")) + "\n")
    print("Launching Enhanced Gradio Interface...")
    demo.launch(debug=True, share=False)