Spaces:
Sleeping
Sleeping
""" | |
app.py | |
This script provides the Gradio web interface to run the evaluation. | |
This version properly handles multimodal inputs including images, videos, and audio. | |
""" | |
import os | |
import re | |
import gradio as gr | |
import requests | |
import pandas as pd | |
from urllib.parse import urlparse | |
from agent import create_agent_executor | |
# --- Constants ---
# Base URL of the scoring service. The "/questions" and "/submit" endpoint
# URLs used by run_and_submit_all are built by appending to this value.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Helper function to parse the agent's output --- | |
def parse_final_answer(agent_response: str) -> str:
    """Extract the answer to submit from the agent's raw text response.

    If the response contains a ``FINAL ANSWER:`` marker (case-insensitive),
    everything after the first marker -- including any following lines -- is
    returned with surrounding whitespace removed. Otherwise the last line
    that has visible content is returned. If the response has no such line,
    a fixed placeholder message is returned.
    """
    marker = re.search(r"FINAL ANSWER:\s*(.*)", agent_response, re.IGNORECASE | re.DOTALL)
    if marker is not None:
        return marker.group(1).strip()

    # No marker present: walk backwards to find the last non-blank line.
    for candidate in reversed(agent_response.split('\n')):
        stripped = candidate.strip()
        if stripped:
            return stripped

    return "Could not parse a final answer."
def detect_file_type(url: str) -> str: | |
"""Detect the type of file from URL.""" | |
if not url: | |
return "unknown" | |
url_lower = url.lower() | |
# Image extensions | |
if any(ext in url_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg']): | |
return "image" | |
# Video extensions and YouTube | |
if any(domain in url_lower for domain in ['youtube.com', 'youtu.be', 'vimeo.com']): | |
return "youtube" | |
if any(ext in url_lower for ext in ['.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm']): | |
return "video" | |
# Audio extensions | |
if any(ext in url_lower for ext in ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a']): | |
return "audio" | |
# Try to detect from headers if possible | |
try: | |
response = requests.head(url, timeout=5) | |
content_type = response.headers.get('content-type', '').lower() | |
if 'image' in content_type: | |
return "image" | |
elif 'audio' in content_type: | |
return "audio" | |
elif 'video' in content_type: | |
return "video" | |
except: | |
pass | |
return "unknown" | |
def create_enhanced_prompt(question_text: str, file_url: str = None) -> str:
    """Build the prompt for the agent, adding tool guidance for an attachment.

    With no attachment URL the question is returned unchanged. Otherwise the
    attachment is classified with ``detect_file_type`` and two lines are
    appended to the question: a bracketed header carrying the URL, and an
    instruction naming the tool the agent must use. Types without a dedicated
    entry (including "video" and "unknown") get the generic attachment text.
    """
    if not file_url:
        return question_text

    # (header, instruction line) for each attachment type with dedicated guidance.
    guidance_by_type = {
        "image": (
            "[IMAGE ATTACHMENT]",
            "INSTRUCTION: There is an image attached to this question. You MUST use the 'describe_image' tool to analyze this image before answering the question.",
        ),
        "youtube": (
            "[YOUTUBE VIDEO]",
            "INSTRUCTION: There is a YouTube video attached to this question. You MUST use the 'process_youtube_video' tool to analyze this video before answering the question.",
        ),
        "audio": (
            "[AUDIO FILE]",
            "INSTRUCTION: There is an audio file attached to this question. You MUST use the 'process_audio_file' tool to analyze this audio before answering the question.",
        ),
    }
    generic_guidance = (
        "[ATTACHMENT]",
        "INSTRUCTION: There is a file attachment. Analyze the URL and use the appropriate tool to process this content before answering the question.",
    )

    header, instruction = guidance_by_type.get(detect_file_type(file_url), generic_guidance)
    return f"{question_text}\n{header}: {file_url}\n{instruction}"
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the agent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in Hugging Face user, or None when nobody is
            logged in. Only ``profile.username`` is read.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is None when the
        user is not logged in or the agent could not be created, an empty
        DataFrame when fetching questions failed, and otherwise a DataFrame
        with one row per attempted task.

    Every failure (agent creation, fetching, a single task, submission) is
    caught and reported through the status message rather than raised. A task
    whose agent call fails is still submitted, with the error text as its
    answer.
    """
    if not profile:
        return "Please log in to Hugging Face with the button above to submit.", None

    username = profile.username
    print(f"User logged in: {username}")

    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    # 1. Instantiate Agent
    print("Initializing your custom agent...")
    try:
        agent_executor = create_agent_executor(provider="google")  # Using Google for better multimodal support
    except Exception as e:
        return f"Fatal Error: Could not initialize agent. Check logs. Details: {e}", None

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=20)
        response.raise_for_status()
        questions_data = response.json()
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", pd.DataFrame()

    # 3. Run your Agent
    results_log, answers_payload = [], []
    # Counted explicitly: inferring success from the answer text would
    # miscount a legitimate answer that happens to contain "ERROR".
    successful_tasks = 0
    print(f"Running agent on {len(questions_data)} questions...")
    for i, item in enumerate(questions_data):
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue

        print(f"\n--- Running Task {i+1}/{len(questions_data)} (ID: {task_id}) ---")

        # Get file URL if it exists
        file_url = item.get("file_url")

        # Create enhanced prompt that instructs the agent to use appropriate tools
        full_question_text = create_enhanced_prompt(question_text, file_url)

        # Classify the attachment once and reuse the result for the log line
        # and the results table; detection may issue a network request.
        file_type = detect_file_type(file_url) if file_url else "None"
        if file_url:
            print(f"File detected: {file_url} (Type: {file_type})")

        print(f"Enhanced Prompt for Agent:\n{full_question_text}")

        try:
            # Pass the enhanced question to the agent
            result = agent_executor.invoke({"messages": [("user", full_question_text)]})
            raw_answer = result['messages'][-1].content
            submitted_answer = parse_final_answer(raw_answer)
            print(f"Raw LLM Response: '{raw_answer}'")
            print(f"PARSED FINAL ANSWER: '{submitted_answer}'")
            successful_tasks += 1
        except Exception as e:
            # Best-effort per task: record the failure as the answer and move on.
            print(f"!! AGENT ERROR on task {task_id}: {e}")
            submitted_answer = f"AGENT RUNTIME ERROR: {e}"

        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "File URL": file_url or "None",
            "File Type": file_type,
            "Submitted Answer": submitted_answer,
        })

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare and 5. Submit
    submission_data = {"username": username, "agent_code": agent_code, "answers": answers_payload}
    print(f"\nSubmitting {len(answers_payload)} answers for user '{username}'...")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (f"Submission Successful!\nUser: {result_data.get('username')}\n"
                        f"Overall Score: {result_data.get('score', 'N/A')}%\n"
                        f"Processed {successful_tasks} successful tasks")
        return final_status, pd.DataFrame(results_log)
    except Exception as e:
        status_message = f"Submission Failed: {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
# --- Gradio UI --- | |
# Components are created in display order inside the Blocks context:
# heading, description, login button, run button, status box, results table.
with gr.Blocks(title="Multimodal Agent Evaluation") as demo:
    gr.Markdown("# Multimodal Agent Evaluation Runner")
    gr.Markdown("This agent can process images, YouTube videos, audio files, and perform web searches.")

    gr.LoginButton()

    run_button = gr.Button(
        "Run Evaluation & Submit All Answers",
        variant="primary",
    )

    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=6,
        interactive=False,
    )

    # One width per column of the rows built in run_and_submit_all.
    results_table = gr.DataFrame(
        column_widths=[80, 200, 150, 80, 200],
        row_count=10,
        wrap=True,
        label="Questions and Agent Answers",
    )

    # No explicit inputs are wired. NOTE(review): the handler's `profile`
    # argument is presumably supplied by Gradio's OAuth integration -- confirm
    # against the Gradio docs.
    run_button.click(
        fn=run_and_submit_all,
        inputs=None,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Print a start-up banner, then start the Gradio app.
    rule = "-" * 30
    print(f"\n{rule} Multimodal App Starting {rule}")
    demo.launch()