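"""GAIA Agent Evaluation Runner.

A Hugging Face Space app that builds a local-model agent system (a CodeAgent
manager plus a tool-calling sub-agent), runs it on the GAIA benchmark
questions, and submits the answers to the course scoring API.
"""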
import os

import gradio as gr
import pandas as pd
import requests
from smolagents import CodeAgent, Tool

# Import internal modules
from config import (
    DEFAULT_API_URL,
    USE_LLAMACPP,
    LLAMACPP_CONFIG,
)
from tools.tool_manager import ToolManager
from utils.llama_cpp_model import LlamaCppModel
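# Expected shape of the config module (a sketch; the actual values live in
# config.py, which is not shown here):
#   DEFAULT_API_URL  - base URL of the scoring API
#   USE_LLAMACPP     - bool, whether to run a local llama.cpp model
#   LLAMACPP_CONFIG  - dict with optional "model_path", "n_ctx",
#                      "n_gpu_layers", and "temperature" keys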


class GaiaToolCallingAgent:
    """Tool-calling agent specifically designed for the GAIA system."""

    def __init__(self, local_model=None):
        print("GaiaToolCallingAgent initialized.")
        self.tool_manager = ToolManager()
        # A name and description are required for use as a managed agent.
        self.name = "tool_agent"
        self.description = "A specialized agent that uses various tools to answer questions"
        self.local_model = local_model
        if not self.local_model:
            # Fall back to a local llama.cpp model when none is injected.
            try:
                self.local_model = LlamaCppModel(max_tokens=512)
            except Exception as e:
                print(f"Couldn't initialize local model in tool agent: {e}")
                self.local_model = None

    def run(self, query: str) -> str:
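        """Run every applicable tool on the query, then have the local model
        synthesize an answer from the collected tool output."""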
print(f"Processing query: {query}") | |
tools = self.tool_manager.get_tools() | |
context_info = [] | |
for tool in tools: | |
try: | |
if self._should_use_tool(tool, query): | |
print(f"Using tool: {tool.name}") | |
result = tool.forward(query) | |
if result: | |
context_info.append(f"{tool.name} Results:\n{result}") | |
except Exception as e: | |
print(f"Error using {tool.name}: {e}") | |
full_context = "\n\n".join(context_info) if context_info else "" | |
if full_context and self.local_model: | |
try: | |
prompt = f""" | |
Based on the following information, please provide a comprehensive answer to the question: "{query}" | |
CONTEXT INFORMATION: | |
{full_context} | |
Answer: | |
""" | |
response = self.local_model.generate(prompt) | |
return response | |
except Exception as e: | |
print(f"Error generating response with local model: {e}") | |
return full_context | |
else: | |
if not full_context: | |
return "I couldn't find any relevant information to answer your question." | |
return full_context | |

    def __call__(self, query: str) -> str:
        print(f"Tool agent received query: {query}")
        return self.run(query)

    def _should_use_tool(self, tool: Tool, query: str) -> bool:
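        """Heuristically decide whether a tool looks relevant to the query,
        based on keyword patterns; tools without a pattern are always tried."""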
        query_lower = query.lower()
        # Keyword patterns that hint a given tool is relevant.
        patterns = {
            "web_search": ["current", "latest", "recent", "who", "what", "when", "where", "how"],
            "web_content": ["content", "webpage", "website", "page"],
            "youtube_video": ["youtube.com", "youtu.be"],
            "wikipedia_search": ["wikipedia", "wiki", "article"],
            "gaia_retriever": ["gaia", "agent", "ai", "artificial intelligence"],
        }
        if tool.name not in patterns:
            return True
        return any(pattern in query_lower for pattern in patterns[tool.name])


def download_model_if_needed(model_path, model_url):
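    """Download the model file from model_url to model_path if it does not
    already exist, streaming it to disk in chunks."""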
    if not os.path.exists(model_path):
        print(f"Downloading model from {model_url}...")
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        # Stream the download so large model files are never held in memory.
        with requests.get(model_url, stream=True) as response:
            response.raise_for_status()
            with open(model_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        print("Download complete.")


def create_manager_agent() -> CodeAgent:
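    """Build the manager CodeAgent: set up a local model (llama.cpp when
    configured, otherwise a stub fallback) and attach the tool-calling
    sub-agent as a managed agent."""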
    try:
        if USE_LLAMACPP:
            # Use TheBloke's quantized Llama 2 GGUF model, downloaded on first run.
            model_path = LLAMACPP_CONFIG.get("model_path") or "./models/llama-2-7b.Q4_0.gguf"
            model_url = "https://huggingface.co/TheBloke/Llama-2-7B-GGUF/resolve/main/llama-2-7b.Q4_0.gguf"
            download_model_if_needed(model_path, model_url)
            model = LlamaCppModel(
                model_path=model_path,
                n_ctx=LLAMACPP_CONFIG.get("n_ctx", 2048),
                n_gpu_layers=LLAMACPP_CONFIG.get("n_gpu_layers", 0),
                temperature=LLAMACPP_CONFIG.get("temperature", 0.7),
            )
            print(f"Using LlamaCpp model from {model_path}")
        else:
            from smolagents import StubModel
            model = StubModel()
            print("Using StubModel as fallback")
    except Exception as e:
        print(f"Error setting up model: {e}")
        try:
            # Retry with the default LlamaCpp configuration before giving up.
            model = LlamaCppModel()
            print("Using fallback LlamaCpp model configuration")
        except Exception as e2:
            from smolagents import StubModel
            model = StubModel()
            print(f"Using StubModel due to error: {e2}")
    tool_agent = GaiaToolCallingAgent(local_model=model)
    manager_agent = CodeAgent(
        model=model,
        tools=[],
        managed_agents=[tool_agent],
        additional_authorized_imports=[
            "json", "pandas", "numpy", "re", "requests", "bs4"
        ],
        planning_interval=3,
        verbosity_level=2,
        max_steps=10,
    )
    print("Manager agent created with local model")
    return manager_agent


def create_agent():
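    """Entry point for agent creation; returns None if initialization fails."""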
    try:
        print("Initializing GAIA agent system...")
        return create_manager_agent()
    except Exception as e:
        print(f"Error creating GAIA agent: {e}")
        return None


def run_and_submit_all(profile: gr.OAuthProfile | None):
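    """Fetch all GAIA questions, run the agent on each one, submit the answers
    to the scoring API, and return a status message plus a results table."""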
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        print("Initializing GAIA agent system...")
        agent = create_agent()
        if not agent:
            return "Error: Could not initialize agent.", None
        print("GAIA agent initialization complete.")
    except Exception as e:
        print(f"Error initializing agent: {e}")
        return f"Error initializing agent: {e}", None
print(f"Fetching questions from: {questions_url}") | |
try: | |
response = requests.get(questions_url, timeout=15) | |
response.raise_for_status() | |
questions_data = response.json() | |
if not questions_data: | |
print("Fetched questions list is empty.") | |
return "Fetched questions list is empty or invalid format.", None | |
print(f"Fetched {len(questions_data)} questions.") | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching questions: {e}") | |
return f"Error fetching questions: {e}", None | |
except Exception as e: | |
print(f"An unexpected error occurred fetching questions: {e}") | |
return f"An unexpected error occurred fetching questions: {e}", None | |

    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            response = agent.run(f"Answer this question concisely: {question_text}")
            # The agent may return a dict payload or a plain string.
            if isinstance(response, dict):
                submitted_answer = response.get("answer", str(response))
            else:
                submitted_answer = str(response)
            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": submitted_answer,
            })
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
            })
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": f"AGENT ERROR: {e}",
            })

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" | |
submission_data = { | |
"username": username.strip(), | |
"agent_code": agent_code, | |
"answers": answers_payload | |
} | |
print(f"Submitting {len(answers_payload)} answers to API...") | |
try: | |
response = requests.post(submit_url, json=submission_data, timeout=60) | |
response.raise_for_status() | |
result_data = response.json() | |
status_message = ( | |
f"Submission Successful!\n" | |
f"User: {result_data.get('username')}\n" | |
f"Overall Score: {result_data.get('score', 'N/A')}% " | |
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" | |
f"Message: {result_data.get('message', 'No message received.')}" | |
) | |
print("Submission successful.") | |
return status_message, pd.DataFrame(results_log) | |
except Exception as e: | |
status_message = f"Submission Failed: {str(e)}" | |
print(f"Error during submission: {e}") | |
return status_message, pd.DataFrame(results_log) | |


with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown("""
    **Instructions:**

    1. Log in to your Hugging Face account using the button below.
    2. Click 'Run Evaluation & Submit All Answers' to fetch questions, run the agent, and see the score.
    """)

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # gr.LoginButton exposes an OAuth profile; Gradio injects it into
    # run_and_submit_all through its gr.OAuthProfile-typed parameter, so no
    # explicit inputs are wired here.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])


if __name__ == "__main__":
    print("\n" + "-" * 30 + " GAIA Agent Starting " + "-" * 30)
    demo.launch(debug=True, share=False)
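
# When run locally (`python app.py`), Gradio serves the app on
# http://127.0.0.1:7860 by default; on a Hugging Face Space the same
# launch() call is driven by the Space runtime.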