"""
agent.py
This file defines the core logic for a sophisticated AI agent using LangGraph.
This version injects a dynamic system prompt that explicitly lists the
available tools on every run, to combat "tool refusal".
"""
# ----------------------------------------------------------
# Section 0: Imports and Configuration (Identical to before)
# ----------------------------------------------------------
import json
import os
import pickle
import re
import subprocess
import textwrap
import base64
import functools
from io import BytesIO
from pathlib import Path
import requests
from cachetools import TTLCache
from PIL import Image
from langchain.schema import Document
from langchain.tools.retriever import create_retriever_tool
from langchain_community.vectorstores import FAISS
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.tools import Tool, tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint, ChatHuggingFace
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import ToolNode, tools_condition
from dotenv import load_dotenv
load_dotenv()
# --- Configuration and Caching (Identical) ---
JSONL_PATH = Path("metadata.jsonl")
FAISS_CACHE = Path("faiss_index.pkl")
EMBED_MODEL = "sentence-transformers/all-mpnet-base-v2"
RETRIEVER_K = 5
CACHE_TTL = 600  # seconds
API_CACHE = TTLCache(maxsize=256, ttl=CACHE_TTL)
def cached_get(key: str, fetch_fn):
    """Return the cached value for `key`, computing it via `fetch_fn` on a miss."""
    if key in API_CACHE:
        return API_CACHE[key]
    val = fetch_fn()
    API_CACHE[key] = val
    return val
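# Illustrative sketch (not used directly in this module; the URL is a placeholder):
# wrap an expensive fetch so repeats within CACHE_TTL seconds hit the cache.
#   page_text = cached_get(
#       "page:https://example.com",
#       lambda: requests.get("https://example.com", timeout=10).text,
#   )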
# ----------------------------------------------------------
# Section 2: Standalone Tool Functions (Identical to before)
# ----------------------------------------------------------
@tool
def python_repl(code: str) -> str:
"""Executes a string of Python code and returns the stdout/stderr."""
    code = textwrap.dedent(code).strip()
    try:
        result = subprocess.run(["python", "-c", code], capture_output=True, text=True, timeout=10, check=False)
        if result.returncode == 0:
            return f"Execution successful.\nSTDOUT:\n```\n{result.stdout}\n```"
        return f"Execution failed.\nSTDOUT:\n```\n{result.stdout}\n```\nSTDERR:\n```\n{result.stderr}\n```"
    except subprocess.TimeoutExpired:
        return "Execution timed out (>10s)."
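# Illustrative call: as a single-argument @tool, python_repl can be invoked with
# the raw code string, e.g. python_repl.invoke("print(2 + 2)") -> a report string
# whose STDOUT block contains "4".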
def describe_image_func(image_source: str, vision_llm_instance) -> str:
"""Describes an image from a local file path or a URL using a provided vision LLM."""
    try:
        if image_source.startswith("http"):
            img = Image.open(BytesIO(requests.get(image_source, timeout=10).content))
        else:
            img = Image.open(image_source)
        buffered = BytesIO()
        img.convert("RGB").save(buffered, format="JPEG")
        b64_string = base64.b64encode(buffered.getvalue()).decode()
        msg = HumanMessage(content=[
            {"type": "text", "text": "Describe this image in detail."},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64_string}"}},
        ])
        return vision_llm_instance.invoke([msg]).content
    except Exception as e:
        return f"Error processing image: {e}"
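# Illustrative call (hypothetical path): describe_image_func("charts/plot.png", vision_llm)
# returns the vision model's free-text description, or an "Error processing image: ..."
# string if the file or URL cannot be read.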
def web_search_func(query: str, cache_func) -> str:
"""Performs a web search using Tavily and returns a compilation of results."""
key = f"web:{query}"
results = cache_func(key, lambda: TavilySearchResults(max_results=5).invoke(query))
return "\n\n---\n\n".join([f"Source: {res['url']}\nContent: {res['content']}" for res in results])
def wiki_search_func(query: str, cache_func) -> str:
"""Searches Wikipedia and returns the top 2 results."""
key = f"wiki:{query}"
docs = cache_func(key, lambda: WikipediaLoader(query=query, load_max_docs=2, doc_content_chars_max=2000).load())
return "\n\n---\n\n".join([f"Source: {d.metadata['source']}\n\n{d.page_content}" for d in docs])
def arxiv_search_func(query: str, cache_func) -> str:
"""Searches Arxiv for scientific papers and returns the top 2 results."""
key = f"arxiv:{query}"
docs = cache_func(key, lambda: ArxivLoader(query=query, load_max_docs=2).load())
return "\n\n---\n\n".join([f"Source: {d.metadata['source']}\nPublished: {d.metadata['Published']}\nTitle: {d.metadata['Title']}\n\nSummary:\n{d.page_content}" for d in docs])
# ----------------------------------------------------------
# Section 3: NEW DYNAMIC SYSTEM PROMPT
# ----------------------------------------------------------
# This is now a template string. The {tools} section will be filled in dynamically.
SYSTEM_PROMPT_TEMPLATE = (
"""You are an expert-level research assistant. Your goal is to answer the user's question accurately.
**CRITICAL INSTRUCTIONS:**
1. **USE YOUR TOOLS:** You have been given a set of tools to find information. You MUST use them when the answer is not immediately known to you. Do not make up answers. Do not apologize or refuse to use a tool. You must try.
2. **AVAILABLE TOOLS:** Here is the exact list of tools you have access to:
{tools}
3. **REASONING:** Think step-by-step. First, analyze the user's question. Second, decide which tool is appropriate. Third, call the tool with the correct parameters. Finally, analyze the tool's output to formulate your answer.
4. **LIMITATIONS:** If a question requires a capability you absolutely do not have (e.g., watching a video, listening to audio), you must state that limitation clearly.
5. **FINAL ANSWER FORMAT:** Your final response MUST strictly follow this format and nothing else:
`FINAL ANSWER: [Your concise and accurate answer here]`
"""
)
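# Illustrative rendering: after .format(tools=...) in Section 4, the {tools}
# placeholder expands to one bullet per tool, e.g.
#   - `python_repl`: Executes a string of Python code and returns the stdout/stderr.
#   - `web_search`: Performs a web search using Tavily.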
# ----------------------------------------------------------
# Section 4: Factory Function for Agent Executor (MODIFIED)
# ----------------------------------------------------------
def create_agent_executor(provider: str = "groq"):
"""
Factory function to create and compile the LangGraph agent executor.
This version dynamically builds the system prompt with the list of tools.
"""
print(f"Initializing agent with provider: {provider}")
# Step 1: Build LLMs (Identical)
if provider == "google": main_llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro-latest", temperature=0)
elif provider == "groq": main_llm = ChatGroq(model_name="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0)
elif provider == "huggingface": main_llm = ChatHuggingFace(llm=HuggingFaceEndpoint(repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1", temperature=0.1))
else: raise ValueError("Invalid provider selected")
vision_llm = ChatGroq(model_name="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0)
# Step 2: Build Retriever (Identical)
embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
    if FAISS_CACHE.exists():
        with open(FAISS_CACHE, "rb") as f:
            vector_store = pickle.load(f)
    else:
        with open(JSONL_PATH, "rt", encoding="utf-8") as f:
            docs = [Document(page_content=f"Question: {rec['Question']}\n\nFinal answer: {rec['Final answer']}", metadata={"source": rec["task_id"]}) for rec in map(json.loads, f)]
        vector_store = FAISS.from_documents(docs, embeddings)
        with open(FAISS_CACHE, "wb") as f:
            pickle.dump(vector_store, f)
retriever = vector_store.as_retriever(search_kwargs={"k": RETRIEVER_K})
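    # Sanity check (illustrative): retriever.invoke("<question text>") should now
    # return the RETRIEVER_K most similar solved examples as Document objects.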
# Step 3: Create the final list of tools (Identical)
tools_list = [
python_repl,
Tool(name="describe_image", func=functools.partial(describe_image_func, vision_llm_instance=vision_llm), description="Describes an image from a local file path or a URL."),
Tool(name="web_search", func=functools.partial(web_search_func, cache_func=cached_get), description="Performs a web search using Tavily."),
Tool(name="wiki_search", func=functools.partial(wiki_search_func, cache_func=cached_get), description="Searches Wikipedia."),
Tool(name="arxiv_search", func=functools.partial(arxiv_search_func, cache_func=cached_get), description="Searches Arxiv for scientific papers."),
create_retriever_tool(retriever=retriever, name="retrieve_examples", description="Retrieve solved questions similar to the user's query."),
]
# --- THIS PART IS NEW ---
# 4a. Format the tool list into a string for the prompt
    tool_definitions = "\n".join(f"- `{t.name}`: {t.description}" for t in tools_list)  # avoid shadowing the @tool decorator
# 4b. Create the final, dynamic system prompt
final_system_prompt = SYSTEM_PROMPT_TEMPLATE.format(tools=tool_definitions)
# --- END NEW PART ---
llm_with_tools = main_llm.bind_tools(tools_list)
# Step 5: Define Graph Nodes (Modified to use the new prompt)
    def retriever_node(state: MessagesState):
        """Looks up similar solved examples and adds them as a context message."""
        user_query = state["messages"][-1].content
        docs = retriever.invoke(user_query)
        # MessagesState merges returned messages into the running list, so we
        # return only the *new* message instead of re-sending the whole history.
        if not docs:
            return {"messages": []}
        example_text = "\n\n---\n\n".join(d.page_content for d in docs)
        return {"messages": [AIMessage(content=f"I have found {len(docs)} similar solved examples:\n\n{example_text}", name="ExampleRetriever")]}
    def assistant_node(state: MessagesState):
        # Prepend the dynamic system prompt at call time so it always precedes
        # the conversation, whatever order messages were merged into state.
        result = llm_with_tools.invoke([SystemMessage(content=final_system_prompt)] + state["messages"])
        return {"messages": [result]}
# Step 6: Build Graph (Identical)
builder = StateGraph(MessagesState)
builder.add_node("retriever", retriever_node)
builder.add_node("assistant", assistant_node)
builder.add_node("tools", ToolNode(tools_list))
builder.add_edge(START, "retriever")
builder.add_edge("retriever", "assistant")
builder.add_conditional_edges("assistant", tools_condition, {"tools": "tools", "__end__": "__end__"})
builder.add_edge("tools", "assistant")
agent_executor = builder.compile()
print("Agent Executor created successfully.")
return agent_executor
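# Usage sketch (assumes the relevant API keys, e.g. GROQ_API_KEY and TAVILY_API_KEY,
# are available via the environment or .env):
#
#   executor = create_agent_executor(provider="groq")
#   final_state = executor.invoke({"messages": [HumanMessage(content="What is 2**10?")]})
#   print(final_state["messages"][-1].content)  # expected: "FINAL ANSWER: 1024"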
# --- Section 5 (Testing functions) remains the same ---
# ... (test_llm_connection and __main__ block)