onisj's picture
feat(advance): Deploy corrected app.py and tools fo advance functions
4701375
raw
history blame
21.7 kB
import os
import json
import logging
import asyncio
import aiohttp
import nest_asyncio
import requests
import pandas as pd
from typing import Dict, Any, List
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.graph import StateGraph, END
from sentence_transformers import SentenceTransformer
import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from state import JARVISState
from tools import (
search_tool, multi_hop_search_tool, file_parser_tool, image_parser_tool,
calculator_tool, document_retriever_tool, duckduckgo_search_tool,
weather_info_tool, hub_stats_tool, guest_info_retriever_tool
)
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Apply nest_asyncio
nest_asyncio.apply()
# Load environment variables
load_dotenv()
SPACE_ID = os.getenv("SPACE_ID", "onisj/jarvis_gaia_agent")
GAIA_API_URL = "https://agents-course-unit4-scoring.hf.space"
GAIA_FILE_URL = f"{GAIA_API_URL}/files/"
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
# Verify environment variables
if not SPACE_ID:
raise ValueError("SPACE_ID not set")
if not HF_TOKEN:
raise ValueError("HUGGINGFACEHUB_API_TOKEN not set")
logger.info(f"SPACE_ID: {SPACE_ID}")
# Initialize models
try:
llm = InferenceClient(
model="meta-llama/Meta-Llama-3-8B-Instruct",
token=HF_TOKEN,
timeout=30
)
logger.info("Hugging Face Inference LLM initialized")
except Exception as e:
logger.error(f"Failed to initialize LLM: {e}")
llm = None
try:
embedder = SentenceTransformer("all-MiniLM-L6-v2")
logger.info("Sentence transformer initialized")
except Exception as e:
logger.error(f"Failed to initialize embedder: {e}")
embedder = None
# --- Helper Functions ---
async def test_gaia_api(task_id: str, file_type: str = "txt") -> tuple[bool, str | None]:
"""Test if a file exists for the task ID."""
try:
for ext in [file_type, "txt", "csv", "xlsx", "jpg", "pdf"]:
async with aiohttp.ClientSession() as session:
async with session.get(f"{GAIA_FILE_URL}{task_id}.{ext}", timeout=5) as resp:
logger.info(f"GAIA API test for task {task_id} with .{ext}: HTTP {resp.status}")
if resp.status == 200:
file_path = f"temp_{task_id}.{ext}"
with open(file_path, "wb") as f:
f.write(await resp.read())
return True, ext
logger.info(f"No file found for task {task_id}")
return False, None
except Exception as e:
logger.warning(f"GAIA API test failed: {str(e)}")
return False, None
# --- Node Functions ---
async def parse_question(state: Dict[str, Any]) -> Dict[str, Any]:
"""Parse the question to select appropriate tools."""
try:
question = state["question"]
task_id = state["task_id"]
tools_needed = ["search_tool"]
if llm:
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""Select tools from: ['search_tool', 'multi_hop_search_tool', 'file_parser_tool', 'image_parser_tool', 'calculator_tool', 'document_retriever_tool', 'duckduckgo_search_tool', 'weather_info_tool', 'hub_stats_tool', 'guest_info_retriever_tool'].
Return JSON list, e.g., ["search_tool", "file_parser_tool"].
Rules:
- Always include "search_tool" unless purely computational.
- Use "multi_hop_search_tool" for complex queries (over 20 words).
- Use "file_parser_tool" for data, tables, or Excel.
- Use "image_parser_tool" for images/videos.
- Use "calculator_tool" for math calculations.
- Use "document_retriever_tool" for documents/PDFs.
- Use "duckduckgo_search_tool" for additional search capability.
- Use "weather_info_tool" for weather-related queries.
- Use "hub_stats_tool" for Hugging Face Hub queries.
- Use "guest_info_retriever_tool" for guest-related queries.
- Output ONLY valid JSON."""),
HumanMessage(content=f"Query: {question}")
])
try:
response = llm.chat_completion(
messages=[
{"role": "system", "content": prompt[0].content},
{"role": "user", "content": prompt[1].content}
],
max_tokens=512,
temperature=0.7
)
tools_needed = json.loads(response["choices"][0]["message"]["content"].strip())
valid_tools = {
"search_tool", "multi_hop_search_tool", "file_parser_tool", "image_parser_tool",
"calculator_tool", "document_retriever_tool", "duckduckgo_search_tool",
"weather_info_tool", "hub_stats_tool", "guest_info_retriever_tool"
}
tools_needed = [tool for tool in tools_needed if tool in valid_tools]
except Exception as e:
logger.warning(f"Task {task_id} failed: JSON parse error: {e}")
tools_needed = ["search_tool"]
# Keyword-based fallback
question_lower = question.lower()
if any(word in question_lower for word in ["image", "video"]):
tools_needed.append("image_parser_tool")
if any(word in question_lower for word in ["data", "table", "excel"]):
tools_needed.append("file_parser_tool")
if any(word in question_lower for word in ["calculate", "math"]):
tools_needed.append("calculator_tool")
if any(word in question_lower for word in ["document", "pdf"]):
tools_needed.append("document_retriever_tool")
if any(word in question_lower for word in ["weather"]):
tools_needed.append("weather_info_tool")
if any(word in question_lower for word in ["model", "huggingface"]):
tools_needed.append("hub_stats_tool")
if any(word in question_lower for word in ["guest", "name", "relation"]):
tools_needed.append("guest_info_retriever_tool")
if len(question.split()) > 20:
tools_needed.append("multi_hop_search_tool")
file_available, file_ext = await test_gaia_api(task_id)
if file_available:
if "file_parser_tool" not in tools_needed and any(word in question_lower for word in ["data", "table", "excel"]):
tools_needed.append("file_parser_tool")
if "image_parser_tool" not in tools_needed and "image" in question_lower:
tools_needed.append("image_parser_tool")
if "document_retriever_tool" not in tools_needed and file_ext == "pdf":
tools_needed.append("document_retriever_tool")
else:
tools_needed = [tool for tool in tools_needed if tool not in ["file_parser_tool", "image_parser_tool", "document_retriever_tool"]]
state["tools_needed"] = list(set(tools_needed)) # Remove duplicates
logger.info(f"Task {task_id}: Selected tools: {tools_needed}")
return state
except Exception as e:
logger.error(f"Error parsing task {task_id}: {e}")
state["tools_needed"] = ["search_tool"]
return state
async def tool_dispatcher(state: JARVISState) -> JARVISState:
"""Dispatch selected tools to process the state."""
try:
updated_state = state.copy()
file_type = "jpg" if "image" in state["question"].lower() else "txt"
if "menu" in state["question"].lower() or "report" in state["question"].lower():
file_type = "pdf"
elif "data" in state["question"].lower():
file_type = "xlsx"
can_download, file_ext = await test_gaia_api(updated_state["task_id"], file_type)
for tool in updated_state["tools_needed"]:
try:
if tool == "search_tool":
result = await search_tool.ainvoke({"query": updated_state["question"]})
updated_state["web_results"].extend([r["content"] for r in result])
elif tool == "multi_hop_search_tool":
result = await multi_hop_search_tool.ainvoke({"query": updated_state["question"], "steps": 3})
updated_state["web_results"].extend([r["content"] for r in result])
await asyncio.sleep(2) # Rate limit
elif tool == "file_parser_tool" and can_download:
result = await file_parser_tool.ainvoke({"task_id": updated_state["task_id"], "file_type": file_ext})
updated_state["file_results"] = str(result)
elif tool == "image_parser_tool" and can_download:
result = await image_parser_tool.ainvoke({
"file_path": f"temp_{updated_state['task_id']}.{file_ext}",
"task": "describe"
})
updated_state["image_results"] = str(result)
elif tool == "calculator_tool":
result = await calculator_tool.ainvoke({"expression": updated_state.get("question", "")})
updated_state["calculation_results"] = str(result)
elif tool == "document_retriever_tool" and can_download:
result = await document_retriever_tool.ainvoke({
"task_id": updated_state["task_id"],
"query": updated_state["question"],
"file_type": file_ext
})
updated_state["document_results"] = str(result)
elif tool == "duckduckgo_search_tool":
result = await duckduckgo_search_tool.run(updated_state["question"])
updated_state["web_results"].append(str(result))
elif tool == "weather_info_tool":
location = updated_state["question"].split("weather in ")[1].split()[0] if "weather in" in updated_state["question"].lower() else "Unknown"
result = await weather_info_tool.ainvoke({"location": location})
updated_state["web_results"].append(str(result))
elif tool == "hub_stats_tool":
author = updated_state["question"].split("by ")[1].split()[0] if "by" in updated_state["question"].lower() else "Unknown"
result = await hub_stats_tool.ainvoke({"author": author})
updated_state["web_results"].append(str(result))
elif tool == "guest_info_retriever_tool":
query = updated_state["question"].split("about ")[1] if "about" in updated_state["question"].lower() else updated_state["question"]
result = await guest_info_retriever_tool.ainvoke({"query": query})
updated_state["web_results"].append(str(result))
except Exception as e:
logger.warning(f"Error in tool {tool} for task {updated_state['task_id']}: {str(e)}")
updated_state[f"{tool}_results"] = f"Error: {str(e)}"
logger.info(f"Task {updated_state['task_id']}: Tool results: {updated_state}")
return updated_state
except Exception as e:
logger.error(f"Tool dispatch failed for task {state['task_id']}: {e}")
return state
async def reasoning(state: JARVISState) -> Dict[str, Any]:
"""Generate exact-match answer with specific formatting."""
try:
if not llm:
return {"answer": "LLM unavailable"}
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""Provide ONLY the exact answer (e.g., '90', 'HUE'). For USD, use two decimal places (e.g., '1234.00'). For lists, use comma-separated values (e.g., 'Smith, Lee'). For IOC codes, use three-letter codes (e.g., 'ARG'). No explanations or conversational text."""),
HumanMessage(content="""Question: {question}
Web results: {web_results}
File results: {file_results}
Image results: {image_results}
Calculation results: {calculation_results}
Document results: {document_results}""")
])
response = llm.chat_completion(
messages=[
{"role": "system", "content": prompt[0].content},
{"role": "user", "content": prompt[1].content.format(
question=state["question"],
web_results="\n".join(state["web_results"]),
file_results=state["file_results"],
image_results=state["image_results"],
calculation_results=state["calculation_results"],
document_results=state["document_results"]
)}
],
max_tokens=512,
temperature=0.7
)
answer = response["choices"][0]["message"]["content"].strip()
# Clean answer for specific formats
if "USD" in state["question"].lower():
try:
answer = f"{float(answer):.2f}"
except ValueError:
pass
if "before and after" in state["question"].lower():
answer = answer.replace(" and ", ", ")
elif "IOC code" in state["question"].lower():
answer = answer.upper()[:3]
logger.info(f"Task {state['task_id']}: Answer: {answer}")
return {"answer": answer}
except Exception as e:
logger.error(f"Reasoning failed for task {state['task_id']}: {e}")
return {"answer": f"Error: {str(e)}"}
def router(state: JARVISState) -> str:
"""Route based on tools needed."""
if state["tools_needed"]:
return "tool_dispatcher"
return "reasoning"
# --- Define StateGraph ---
workflow = StateGraph(JARVISState)
workflow.add_node("parse", parse_question)
workflow.add_node("tool_dispatcher", tool_dispatcher)
workflow.add_node("reasoning", reasoning)
workflow.set_entry_point("parse")
workflow.add_conditional_edges(
"parse",
router,
{
"tool_dispatcher": "tool_dispatcher",
"reasoning": "reasoning"
}
)
workflow.add_edge("tool_dispatcher", "reasoning")
workflow.add_edge("reasoning", END)
graph = workflow.compile()
# --- Basic Agent ---
class BasicAgent:
def __init__(self):
logger.info("BasicAgent initialized.")
async def process_question(self, task_id: str, question: str) -> str:
"""Process a single question with file handling."""
file_type = "jpg" if "image" in question.lower() else "txt"
if "menu" in question.lower() or "report" in question.lower():
file_type = "pdf"
elif "data" in question.lower():
file_type = "xlsx"
file_path = f"temp_{task_id}.{file_type}"
file_available, file_ext = await test_gaia_api(task_id, file_type)
if file_available:
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{GAIA_FILE_URL}{task_id}.{file_ext}") as resp:
if resp.status == 200:
with open(file_path, "wb") as f:
f.write(await resp.read())
else:
logger.warning(f"Failed to fetch file for {task_id}: HTTP {resp.status}")
except Exception as e:
logger.error(f"Error downloading file for task {task_id}: {str(e)}")
state = JARVISState(
task_id=task_id,
question=question,
tools_needed=["search_tool"],
web_results=[],
file_results="",
image_results="",
calculation_results="",
document_results="",
messages=[HumanMessage(content=question)],
answer=""
)
try:
result = await graph.ainvoke(state)
answer = result["answer"] or "Unknown"
logger.info(f"Task {task_id}: Final answer generated: {answer}")
return answer
except Exception as e:
logger.error(f"Error processing task {task_id}: {e}")
return f"Error: {str(e)}"
finally:
for ext in ["txt", "csv", "xlsx", "jpg", "pdf"]:
file_path = f"temp_{task_id}.{ext}"
if os.path.exists(file_path):
try:
os.remove(file_path)
except Exception as e:
logger.error(f"Error removing file {file_path}: {e}")
async def async_call(self, question: str, task_id: str) -> str:
return await self.process_question(question, task_id)
def __call__(self, question: str, task_id: str = None) -> str:
logger.info(f"Processing question: {question[:50]}...")
if task_id is None:
task_id = "unknown_task_id"
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(self.async_call(question, task_id))
# --- Evaluation and Submission ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
"""Run evaluation and submit answers to GAIA API."""
if not profile:
logger.error("User not logged in.")
return "Please Login to Hugging Face.", None
username = f"{profile.username}"
logger.info(f"User logged in: {username}")
questions_url = f"{GAIA_API_URL}/questions"
submit_url = f"{GAIA_API_URL}/submit"
agent_code = f"https://huggingface.co/spaces/{SPACE_ID}/tree/main"
try:
agent = BasicAgent()
except Exception as e:
logger.error(f"Agent initialization failed: {e}")
return f"Error initializing agent: {e}", None
logger.info(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
logger.error("Empty questions list.")
return "No questions fetched.", None
logger.info(f"Fetched {len(questions_data)} questions.")
except Exception as e:
logger.error(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None
results_log = []
answers_payload = []
logger.info(f"Processing {len(questions_data)} questions...")
for item in questions_data:
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
logger.warning(f"Skipping invalid item: {item}")
continue
try:
submitted_answer = agent(question_text, task_id)
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
except Exception as e:
logger.error(f"Error for task {task_id}: {e}")
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
if not answers_payload:
logger.error("No answers generated.")
return "No answers to submit.", pd.DataFrame(results_log)
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
logger.info(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=120)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
results_df = pd.DataFrame(results_log)
return final_status, results_df
except Exception as e:
logger.error(f"Submission failed: {e}")
results_df = pd.DataFrame(results_log)
return f"Submission Failed: {e}", results_df
# --- Gradio Interface ---
with gr.Blocks() as demo:
gr.Markdown("# Evolved JARVIS Agent Evaluation")
gr.Markdown(
"""
**Instructions:**
1. Log in to Hugging Face using the button below.
2. Click 'Run Evaluation & Submit All Answers' to process GAIA questions and submit.
---
**Disclaimers:**
Uses Hugging Face Inference, SERPAPI, and OpenWeatherMap for GAIA benchmark.
"""
)
gr.LoginButton()
run_button = gr.Button("Run Evaluation & Submit All Answers")
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
results_table = gr.DataFrame(label="Questions and Answers", wrap=True)
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table]
)
# --- Main ---
if __name__ == "__main__":
logger.info("\n" + "-"*30 + " App Starting " + "-"*30)
logger.info(f"SPACE_ID: {SPACE_ID}")
logger.info("Launching Gradio Interface...")
demo.launch(debug=True, share=False)