tatianija's picture
Update app.py
46a7b3e verified
raw
history blame
13.9 kB
import os
import gradio as gr
import requests
import inspect
import time
import pandas as pd
from smolagents import DuckDuckGoSearchTool
import threading
from typing import Dict, List, Optional, Tuple
import json
from huggingface_hub import InferenceClient
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Global Cache for Answers ---
cached_answers = {}
cached_questions = []
processing_status = {"is_processing": False, "progress": 0, "total": 0}
# --- Basic Agent Definition ---
class BasicAgent:
def __init__(self, debug: bool = False):
self.search = DuckDuckGoSearchTool()
self.debug = debug
if self.debug:
print("BasicAgent initialized.")
def __call__(self, question: str) -> str:
if self.debug:
print(f"Agent received question: {question}")
# Early validation
if not question or not question.strip():
return "Please provide a valid question."
try:
time.sleep(1)
results = self.search(question)
# Use truthfulness check and early return
if not results:
return "No results found for that query."
# Direct access with get() method chaining
top = results[0]
title = top.get("title") or "No title"
snippet = top.get("snippet", "").strip()
link = top.get("link", "")
# Build answer more efficiently
parts = [f"**{title}**"]
if snippet:
parts.append(snippet)
if link:
parts.append(f"Source: {link}")
answer = "\n".join(parts)
except (IndexError, KeyError, AttributeError):
# More specific exception handling
answer = "Sorry, I couldn't process the search results properly."
except Exception as e:
answer = f"Sorry, I couldn't fetch results due to: {e}"
if self.debug:
print(f"Agent returning answer: {answer}")
return answer
def fetch_questions() -> Tuple[str, Optional[pd.DataFrame]]:
"""
Fetch questions from the API and cache them.
"""
global cached_questions
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
return "Fetched questions list is empty.", None
cached_questions = questions_data
# Create DataFrame for display
display_data = []
for item in questions_data:
display_data.append({
"Task ID": item.get("task_id", "Unknown"),
"Question": item.get("question", "")
})
df = pd.DataFrame(display_data)
status_msg = f"Successfully fetched {len(questions_data)} questions. Ready to generate answers."
return status_msg, df
except requests.exceptions.RequestException as e:
return f"Error fetching questions: {e}", None
except Exception as e:
return f"An unexpected error occurred: {e}", None
def generate_answers_async(progress_callback=None):
"""
Generate answers for all cached questions asynchronously.
"""
global cached_answers, processing_status
if not cached_questions:
return "No questions available. Please fetch questions first."
processing_status["is_processing"] = True
processing_status["progress"] = 0
processing_status["total"] = len(cached_questions)
try:
agent = BasicAgent()
cached_answers = {}
for i, item in enumerate(cached_questions):
if not processing_status["is_processing"]: # Check if cancelled
break
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
continue
try:
answer = agent(question_text)
cached_answers[task_id] = {
"question": question_text,
"answer": answer
}
except Exception as e:
cached_answers[task_id] = {
"question": question_text,
"answer": f"AGENT ERROR: {e}"
}
processing_status["progress"] = i + 1
if progress_callback:
progress_callback(i + 1, len(cached_questions))
except Exception as e:
print(f"Error in generate_answers_async: {e}")
finally:
processing_status["is_processing"] = False
def start_answer_generation():
"""
Start the answer generation process in a separate thread.
"""
if processing_status["is_processing"]:
return "Answer generation is already in progress.", None
if not cached_questions:
return "No questions available. Please fetch questions first.", None
# Start generation in background thread
thread = threading.Thread(target=generate_answers_async)
thread.daemon = True
thread.start()
return "Answer generation started. Check progress below.", None
def get_generation_progress():
"""
Get the current progress of answer generation.
"""
if not processing_status["is_processing"] and processing_status["progress"] == 0:
return "Not started", None
if processing_status["is_processing"]:
progress = processing_status["progress"]
total = processing_status["total"]
status_msg = f"Generating answers... {progress}/{total} completed"
return status_msg, None
else:
# Generation completed
if cached_answers:
# Create DataFrame with results
display_data = []
for task_id, data in cached_answers.items():
display_data.append({
"Task ID": task_id,
"Question": data["question"][:100] + "..." if len(data["question"]) > 100 else data["question"],
"Generated Answer": data["answer"][:200] + "..." if len(data["answer"]) > 200 else data["answer"]
})
df = pd.DataFrame(display_data)
status_msg = f"Answer generation completed! {len(cached_answers)} answers ready for submission."
return status_msg, df
else:
return "Answer generation completed but no answers were generated.", None
def submit_cached_answers(profile: gr.OAuthProfile | None):
"""
Submit the cached answers to the evaluation API.
"""
global cached_answers
if not profile:
return "Please log in to Hugging Face first.", None
if not cached_answers:
return "No cached answers available. Please generate answers first.", None
username = profile.username
space_id = os.getenv("SPACE_ID")
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "Unknown"
# Prepare submission payload
answers_payload = []
for task_id, data in cached_answers.items():
answers_payload.append({
"task_id": task_id,
"submitted_answer": data["answer"]
})
submission_data = {
"username": username.strip(),
"agent_code": agent_code,
"answers": answers_payload
}
# Submit to API
api_url = DEFAULT_API_URL
submit_url = f"{api_url}/submit"
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
# Create results DataFrame
results_log = []
for task_id, data in cached_answers.items():
results_log.append({
"Task ID": task_id,
"Question": data["question"],
"Submitted Answer": data["answer"]
})
results_df = pd.DataFrame(results_log)
return final_status, results_df
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except:
error_detail += f" Response: {e.response.text[:500]}"
return f"Submission Failed: {error_detail}", None
except requests.exceptions.Timeout:
return "Submission Failed: The request timed out.", None
except Exception as e:
return f"Submission Failed: {e}", None
def clear_cache():
"""
Clear all cached data.
"""
global cached_answers, cached_questions, processing_status
cached_answers = {}
cached_questions = []
processing_status = {"is_processing": False, "progress": 0, "total": 0}
return "Cache cleared successfully.", None
# --- Enhanced Gradio Interface ---
with gr.Blocks(title="Enhanced Agent Evaluation Runner") as demo:
gr.Markdown("# Enhanced Agent Evaluation Runner with Answer Caching")
gr.Markdown(
"""
**Enhanced Instructions:**
1. **Clone and Modify**: Clone this space and modify the agent logic as needed.
2. **Login**: Log in to your Hugging Face account.
3. **Fetch Questions**: Load all questions from the evaluation API.
4. **Generate Answers**: Create answers for all questions (runs in background).
5. **Review Results**: Check the generated answers before submission.
6. **Submit**: Submit your answers when ready.
**Benefits of this approach:**
- ✅ Faster user feedback (separate steps)
- ✅ Ability to review answers before submission
- ✅ Progress tracking during answer generation
- ✅ Cache management for multiple runs
---
"""
)
with gr.Row():
gr.LoginButton()
clear_btn = gr.Button("Clear Cache", variant="secondary")
with gr.Tab("Step 1: Fetch Questions"):
gr.Markdown("### Fetch Questions from API")
fetch_btn = gr.Button("Fetch Questions", variant="primary")
fetch_status = gr.Textbox(label="Fetch Status", lines=2, interactive=False)
questions_table = gr.DataFrame(label="Available Questions", wrap=True)
fetch_btn.click(
fn=fetch_questions,
outputs=[fetch_status, questions_table]
)
with gr.Tab("Step 2: Generate Answers"):
gr.Markdown("### Generate Answers (Background Processing)")
with gr.Row():
generate_btn = gr.Button("Start Answer Generation", variant="primary")
refresh_btn = gr.Button("Refresh Progress", variant="secondary")
generation_status = gr.Textbox(label="Generation Status", lines=2, interactive=False)
answers_preview = gr.DataFrame(label="Generated Answers Preview", wrap=True)
generate_btn.click(
fn=start_answer_generation,
outputs=[generation_status, answers_preview]
)
refresh_btn.click(
fn=get_generation_progress,
outputs=[generation_status, answers_preview]
)
with gr.Tab("Step 3: Submit Results"):
gr.Markdown("### Submit Generated Answers")
submit_btn = gr.Button("Submit Cached Answers", variant="primary")
submission_status = gr.Textbox(label="Submission Status", lines=5, interactive=False)
final_results = gr.DataFrame(label="Final Submission Results", wrap=True)
submit_btn.click(
fn=submit_cached_answers,
outputs=[submission_status, final_results]
)
# Clear cache functionality
clear_btn.click(
fn=clear_cache,
outputs=[fetch_status, questions_table]
)
# Auto-refresh progress every 5 seconds when generation is active
demo.load(
fn=get_generation_progress,
outputs=[generation_status, answers_preview]
)
if __name__ == "__main__":
print("\n" + "-"*30 + " Enhanced App Starting " + "-"*30)
space_host_startup = os.getenv("SPACE_HOST")
space_id_startup = os.getenv("SPACE_ID")
if space_host_startup:
print(f"✅ SPACE_HOST found: {space_host_startup}")
print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
else:
print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
if space_id_startup:
print(f"✅ SPACE_ID found: {space_id_startup}")
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
else:
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
print("-"*(60 + len(" Enhanced App Starting ")) + "\n")
print("Launching Enhanced Gradio Interface...")
demo.launch(debug=True, share=False)