|
import os |
|
import gradio as gr |
|
import requests |
|
import pandas as pd |
|
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, tool |
|
import re |
|
import json |
|
import math |
|
import tempfile |
|
from pathlib import Path |
|
from urllib.parse import urlparse, parse_qs |
|
import yt_dlp |
|
from PIL import Image |
|
import pytesseract |
|
|
|
# Deployment settings, read once from the process environment at import time.
# Each name is None when the corresponding variable is not set.
hf_token = os.environ.get("HF_TOKEN")
SPACE_ID = os.environ.get("SPACE_ID")
SPACE_HOST = os.environ.get("SPACE_HOST")
|
|
|
@tool |
|
def web_browser(url: str) -> str:
    """
    Fetches a web page and returns its readable text content.

    Markup is removed (including the bodies of script/style elements and HTML
    comments), character entities are decoded, runs of whitespace are
    collapsed, and the text is capped at 2000 characters.

    Args:
        url: The URL to fetch content from.

    Returns:
        Text content from the webpage, with "..." appended when it was
        truncated, or an "Error accessing URL" message when the request or
        the clean-up fails.
    """
    import html  # local import: only this function needs it

    max_chars = 2000
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        content = response.text

        # Remove non-visible blocks first; otherwise inline JavaScript/CSS
        # survives the tag stripping below and uses up the character budget.
        content = re.sub(r'(?is)<(script|style)\b[^>]*>.*?</\1\s*>', ' ', content)
        content = re.sub(r'(?s)<!--.*?-->', ' ', content)
        content = re.sub(r'<[^>]+>', ' ', content)

        # Decode entities only after the tags are gone, so an escaped "&lt;b&gt;"
        # stays literal text instead of being stripped as markup.
        content = html.unescape(content)
        content = re.sub(r'\s+', ' ', content).strip()

        if len(content) > max_chars:
            return content[:max_chars] + "..."
        return content

    except Exception as e:
        # Failures are reported as text so the calling agent can react to them.
        return f"Error accessing URL: {str(e)}"
|
|
|
@tool |
|
def youtube_transcript_extractor(url: str) -> str:
    """
    Extracts information about a YouTube video and reports caption availability.

    Only metadata is requested (nothing is downloaded), so the transcript text
    itself is not part of the result.

    Args:
        url: YouTube URL in the "youtube.com/watch?v=ID" or "youtu.be/ID" form.

    Returns:
        Title, description (first 500 characters), duration and view count,
        plus a "Transcript Available" marker when subtitles or automatic
        captions exist, or an explanatory message when the URL is not
        usable or the lookup fails.
    """
    description_limit = 500
    try:
        if "youtube.com/watch" in url:
            video_id = parse_qs(urlparse(url).query).get('v', [None])[0]
        elif "youtu.be/" in url:
            # Short links carry the id as the path; strip both slashes so a
            # trailing "/" does not end up inside the id.
            video_id = urlparse(url).path.strip("/")
        else:
            return "Invalid YouTube URL format"

        if not video_id:
            return "Could not extract video ID from URL"

        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'writesubtitles': True,
            'writeautomaticsub': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(f"https://www.youtube.com/watch?v={video_id}", download=False)

        # A key can be present with a None value, in which case the default of
        # dict.get() is not used; `or` covers both the missing and None cases.
        description = info.get('description') or 'N/A'
        if len(description) > description_limit:
            # Ellipsis only when something was cut, like the other tools do.
            description = description[:description_limit] + "..."

        duration = info.get('duration')
        view_count = info.get('view_count')

        result = f"Title: {info.get('title') or 'N/A'}\n"
        result += f"Description: {description}\n"
        result += f"Duration: {'N/A' if duration is None else duration} seconds\n"
        result += f"View count: {'N/A' if view_count is None else view_count}\n"

        # Automatic captions are requested in ydl_opts, so they count as well.
        if info.get('subtitles') or info.get('automatic_captions'):
            result += "\n--- Transcript Available ---\n"

        return result

    except Exception as e:
        return f"Error extracting YouTube content: {str(e)}"
|
|
|
@tool |
|
def image_ocr_analyzer(image_path: str) -> str:
    """
    Performs OCR on images to extract text.

    Args:
        image_path: Path to the image file.

    Returns:
        Extracted text from the image, a notice when no text was recognised,
        or an "Error performing OCR" message when the image cannot be read.
    """
    try:
        # The context manager closes the underlying file as soon as OCR is
        # done instead of leaving the handle open until garbage collection.
        with Image.open(image_path) as image:
            extracted_text = pytesseract.image_to_string(image)

        cleaned_text = extracted_text.strip()
        if not cleaned_text:
            return "No text found in the image"

        return f"Extracted text:\n{cleaned_text}"

    except Exception as e:
        return f"Error performing OCR: {str(e)}"
|
|
|
@tool |
|
def pdf_text_extractor(file_path: str) -> str:
    """
    Extracts text from PDF files.

    Pages are read in order and each page's text is followed by a newline.
    Reading stops once more than 3000 characters have been collected, because
    anything beyond that limit is cut from the result anyway.

    Args:
        file_path: Path to the PDF file.

    Returns:
        Extracted text from PDF, limited to 3000 characters with "..." appended
        when truncated, or an "Error extracting PDF text" message on failure.
    """
    max_chars = 3000
    try:
        # Imported lazily so a missing PyPDF2 is reported as a tool error.
        import PyPDF2

        page_texts = []
        collected = 0
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                # `or ""` is a defensive guard in case a page yields no text object.
                page_text = (page.extract_text() or "") + "\n"
                page_texts.append(page_text)
                collected += len(page_text)
                if collected > max_chars:
                    break  # the remaining pages would be truncated away

        text = "".join(page_texts)
        return text[:max_chars] + "..." if len(text) > max_chars else text

    except Exception as e:
        return f"Error extracting PDF text: {str(e)}"
|
|
|
@tool |
|
def veterinary_document_analyzer(text: str) -> str:
    """
    Analyzes veterinary documents to extract specific information like names.

    A name is two capitalised words that directly follow one of the titles
    "Dr", "Doctor", "veterinarian" or "DVM". The title may be written in any
    letter case; the name itself must be capitalised.

    Args:
        text: Document text to analyze.

    Returns:
        Extracted veterinary information as a comma-separated list of unique names in order of discovery, or a notice when none is found.
    """
    try:
        # Only the title is case-insensitive, via a scoped (?i:...) group.
        # Applying re.IGNORECASE to the whole pattern would let [A-Z][a-z]+
        # match ordinary lowercase words ("doctor will see" -> "will see").
        # The leading \b keeps a title from matching inside a longer word.
        vet_patterns = [
            r"\b(?i:Dr)\.?\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)",
            r"\b(?i:Doctor)\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)",
            r"\b(?i:veterinarian)\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)",
            r"\b(?i:DVM)\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)",
        ]

        found_vets = []
        for pattern in vet_patterns:
            for first_name, last_name in re.findall(pattern, text):
                full_name = f"{first_name} {last_name}"
                if full_name not in found_vets:
                    found_vets.append(full_name)

        if found_vets:
            return f"Found veterinarian(s): {', '.join(found_vets)}"
        return "No veterinarian names found in the document"

    except Exception as e:
        return f"Error analyzing veterinary document: {str(e)}"
|
|
|
|
|
@tool |
|
def analyze_excel_file(file_path: str, analysis_type: str = "general") -> str:
    """
    Analyzes Excel files with multiple analysis types.

    Args:
        file_path: Path to the Excel file to load.
        analysis_type: Kind of analysis to run, one of "general", "food_sales", "summary" or "categories". Defaults to "general".

    Returns:
        A text report for the requested analysis, or a message explaining why it could not be produced.
    """
    # NOTE: every parameter is described in the Args section above because the
    # smolagents @tool decorator builds the tool schema from this docstring and
    # rejects functions with undocumented arguments.
    try:
        df = pd.read_excel(file_path)

        if analysis_type == "general":
            return f"Excel file contains {len(df)} rows and {len(df.columns)} columns. Columns: {list(df.columns)}"

        if analysis_type == "food_sales":
            required_columns = ('category', 'price', 'quantity')
            if not all(column in df.columns for column in required_columns):
                return "Required columns (category, price, quantity) not found"
            # Normalise the labels so non-string cells and stray whitespace
            # cannot break or skew the comparison.
            is_food = df['category'].astype(str).str.strip().str.lower() == 'food'
            food_df = df[is_food]
            total_sales = (food_df['price'] * food_df['quantity']).sum()
            return f"Total food sales: ${total_sales:.2f}"

        if analysis_type == "summary":
            summary = df.describe(include='all').to_string()
            return f"Data summary:\n{summary}"

        if analysis_type == "categories":
            if 'category' not in df.columns:
                return "No category column found"
            categories = df['category'].value_counts()
            return f"Categories breakdown:\n{categories.to_string()}"

        return "Unknown analysis type"

    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
|
|
|
@tool |
|
def advanced_calculator(expression: str) -> str:
    """
    Evaluates mathematical expressions, including advanced functions.

    Available names are abs, round, min, max, sum, len, sqrt, pow, log, sin,
    cos, tan, floor, ceil and the constants pi and e.

    Args:
        expression: Expression to evaluate, for example "sqrt(16) + 2^3". The "^" character is treated as exponentiation.

    Returns:
        The result converted to text, or an "Error in calculation" message when the evaluation fails.
    """
    # NOTE: the Args section above is required; the smolagents @tool decorator
    # rejects functions whose parameters are not described in the docstring.
    try:
        expression = expression.replace('^', '**')
        allowed_functions = {
            'abs': abs, 'round': round, 'min': min, 'max': max,
            'sum': sum, 'len': len,
            'sqrt': math.sqrt, 'pow': math.pow, 'log': math.log,
            'sin': math.sin, 'cos': math.cos, 'tan': math.tan,
            'pi': math.pi, 'e': math.e,
            'floor': math.floor, 'ceil': math.ceil
        }

        # SECURITY NOTE(review): eval() with emptied builtins is not a sandbox.
        # Attribute access on literals can still reach arbitrary objects, so
        # this must not be exposed to untrusted input. Consider replacing it
        # with an ast-based evaluator restricted to arithmetic nodes.
        #
        # The whitelist is passed as globals rather than locals because names
        # in eval's locals are invisible inside generator expressions, which
        # made e.g. "sum(sqrt(x) for x in [1, 4])" fail with a NameError.
        namespace = {"__builtins__": {}, **allowed_functions}
        result = eval(expression, namespace)
        return str(result)

    except Exception as e:
        return f"Error in calculation: {str(e)}"
|
|
|
@tool |
|
def smart_text_analyzer(text: str, task_type: str = "general") -> str:
    """
    Analyzes text with focus on GAIA-specific tasks.

    Args:
        text: Text to analyze.
        task_type: One of 'general', 'names', 'dates', 'numbers' or 'veterinary'. Any other value is handled like 'general'.

    Returns:
        Analysis results as text, or an "Error in text analysis" message when the analysis fails.
    """
    try:
        if task_type == "names":
            # Runs of capitalised words, e.g. "Jane Smith".
            name_pattern = r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b'
            names = re.findall(name_pattern, text)
            # dict.fromkeys removes duplicates while keeping first-seen order;
            # a plain set would make the output order vary between runs.
            unique_names = list(dict.fromkeys(names))
            return f"Found names: {unique_names}"

        if task_type == "veterinary":
            return veterinary_document_analyzer(text)

        if task_type == "dates":
            date_patterns = [
                r'\d{1,2}/\d{1,2}/\d{4}',
                r'\d{4}-\d{2}-\d{2}',
                r'\b\w+\s+\d{1,2},\s+\d{4}\b'
            ]
            # Results are grouped by pattern, not by position in the text.
            dates = [match for pattern in date_patterns for match in re.findall(pattern, text)]
            return f"Found dates: {dates}"

        if task_type == "numbers":
            numbers = re.findall(r'-?\d+\.?\d*', text)
            return f"Found numbers: {[float(n) for n in numbers if n]}"

        return f"Characters: {len(text)}, Words: {len(text.split())}, Lines: {len(text.splitlines())}"

    except Exception as e:
        return f"Error in text analysis: {str(e)}"
|
|
|
|
|
|
|
# Model wrapper handed to the agent below.
# NOTE(review): DialoGPT-medium is a small conversational model; confirm it can
# follow the tool-calling / code-writing prompts a CodeAgent relies on.
model = HfApiModel(
    model_id='microsoft/DialoGPT-medium',
    temperature=0.1,
    max_tokens=2048,
)

search_tool = DuckDuckGoSearchTool()

# Everything the agent may call: the search tool plus the functions decorated
# with @tool earlier in this module.
tools = [
    search_tool,
    web_browser, youtube_transcript_extractor,
    image_ocr_analyzer, pdf_text_extractor,
    veterinary_document_analyzer, smart_text_analyzer,
    advanced_calculator, analyze_excel_file,
]

# Shared agent instance; BasicAgent stores a reference to it.
agent_code = CodeAgent(
    model=model,
    tools=tools,
    max_steps=15,
    additional_authorized_imports=[
        "os", "tempfile", "pathlib", "re", "json", "math", "pandas",
        "requests", "PIL", "pytesseract", "PyPDF2", "yt_dlp",
    ],
)

# Base URL of the scoring service; "/questions" and "/submit" are appended to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
|
|
class BasicAgent:
    """Callable wrapper that answers one GAIA question with the shared agent."""

    def __init__(self):
        print("Enhanced GAIA Agent initialized with web browsing capabilities.")
        self.agent = agent_code

    def __call__(self, question: str) -> str:
        """
        Answers a question, retrying once with a shorter prompt on failure.

        Args:
            question: The question text.

        Returns:
            The cleaned answer, "No response generated." when the agent
            returns nothing, or "Error: ..." when both attempts fail.
        """
        try:
            enhanced_question = self._create_gaia_prompt(question)
            result = self.agent.run(enhanced_question)
            cleaned_result = self._clean_gaia_result(result)
            return cleaned_result if cleaned_result else "No response generated."

        except Exception as e:
            print(f"Agent error: {e}")

            try:
                simple_result = self.agent.run(self._create_fallback_prompt(question))
                cleaned_fallback = self._clean_gaia_result(simple_result)
                return cleaned_fallback if cleaned_fallback else f"Error: {e}"
            except Exception as fallback_error:
                # Report the first failure (as before), but leave a trace of
                # the second one instead of swallowing it silently.
                print(f"Fallback agent error: {fallback_error}")
                return f"Error: {e}"

    def _clean_gaia_result(self, result) -> str:
        """
        Turns a raw agent result into a bare answer string.

        The result may be of any type; it is converted with str(), surrounding
        whitespace is removed and a leading "Final answer:" label (any letter
        case) is dropped. None becomes an empty string.
        """
        if result is None:
            return ""
        cleaned = str(result).strip()
        cleaned = re.sub(r'^final\s+answer\s*:\s*', '', cleaned, flags=re.IGNORECASE)
        return cleaned.strip()

    def _create_fallback_prompt(self, question: str) -> str:
        """Creates the shorter prompt used for the second attempt."""
        lines = [
            "",
            f"CRITICAL GAIA TASK: {question}",
            "",
            "Use available tools to find the answer. If it's a YouTube video, use youtube_transcript_extractor.",
            "If it's about documents, use appropriate analyzers.",
            "Be precise and direct in your final answer.",
            "",
        ]
        return "\n".join(lines)

    def _create_gaia_prompt(self, question: str) -> str:
        """Creates a prompt optimized for GAIA."""
        # Built line by line so that source indentation never leaks into the
        # text sent to the model.
        lines = [
            "",
            "GAIA EVALUATION TASK - ANSWER PRECISELY",
            "",
            f"Question: {question}",
            "",
            "INSTRUCTIONS:",
            "1. If this involves a YouTube video, use youtube_transcript_extractor tool",
            "2. If this involves web content, use web_browser tool",
            "3. If this involves documents/PDFs, use appropriate analyzers",
            "4. If this involves images, use image_ocr_analyzer",
            "5. If this needs search, use the search tool",
            "6. For calculations, use advanced_calculator",
            "7. Be EXACT and SPECIFIC in your final answer",
            "8. Don't provide explanations unless asked - just the answer",
            "",
            "Work step by step and use the right tools for this task.",
            "",
        ]
        return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: Logged-in Hugging Face profile, or None when nobody is logged in.

    Returns:
        A (status message, results) pair. The results are a pandas DataFrame
        holding the per-question log, or None when the run stops before any
        question is processed.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Link to the Space's source tree, sent along with the answers. It has its
    # own name so the module-level `agent_code` object is not shadowed.
    agent_code_url = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code_url)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must come before RequestException: JSONDecodeError is one of its
        # subclasses, so the other order would make this handler unreachable.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # Failed tasks are logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code_url, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        status_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (ValueError, AttributeError):
            # ValueError: the body is not JSON. AttributeError: it is JSON but
            # not an object, so there is no .get(). Fall back to the raw text.
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)

    # Every outcome of the submission shows the same per-question log.
    return status_message, pd.DataFrame(results_log)
|
|
|
|
|
|
|
# Gradio front end: instructions, a login button, one action button and two
# read-only outputs that receive the values returned by run_and_submit_all.
with gr.Blocks() as demo:
    gr.Markdown(value="# Basic Agent Evaluation Runner")
    gr.Markdown(
        value="""
        **Instructions:**

        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
        """
    )

    gr.LoginButton()

    run_button = gr.Button(value="Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(interactive=False, lines=5, label="Run Status / Submission Result")
    results_table = gr.DataFrame(wrap=True, label="Questions and Agent Answers")

    # No explicit inputs are wired; the handler's profile argument is presumably
    # supplied by Gradio from its OAuthProfile annotation. TODO confirm.
    run_button.click(run_and_submit_all, outputs=[status_output, results_table])
|
|
|
# Script entry point: print a short diagnostic banner about the hosting
# environment, then start the Gradio app.
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # The variable may already hold a full "<name>.hf.space" host; append
        # the domain only when it is missing so the URL never ends in
        # ".hf.space.hf.space".
        host_suffix = ".hf.space"
        if space_host_startup.endswith(host_suffix):
            runtime_host = space_host_startup
        else:
            runtime_host = space_host_startup + host_suffix
        print(f" Runtime URL should be: https://{runtime_host}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    # Closing rule as wide as the opening banner line.
    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)