""" agent.py This file defines the core logic for a sophisticated AI agent using LangGraph. This version uses a dynamic system prompt to explicitly list the available tools for the LLM on every run, designed to combat "tool refusal". """ # ---------------------------------------------------------- # Section 0: Imports and Configuration (Identical to before) # ---------------------------------------------------------- import json import os import pickle import re import subprocess import textwrap import base64 import functools from io import BytesIO from pathlib import Path import requests from cachetools import TTLCache from PIL import Image from langchain.schema import Document from langchain.tools.retriever import create_retriever_tool from langchain_community.vectorstores import FAISS from langchain_community.tools.tavily_search import TavilySearchResults from langchain_community.document_loaders import WikipediaLoader, ArxivLoader from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from langchain_core.tools import Tool, tool from langchain_google_genai import ChatGoogleGenerativeAI from langchain_groq import ChatGroq from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint, ChatHuggingFace from langgraph.graph import START, StateGraph, MessagesState from langgraph.prebuilt import ToolNode, tools_condition from dotenv import load_dotenv load_dotenv() # --- Configuration and Caching (Identical) --- JSONL_PATH, FAISS_CACHE, EMBED_MODEL = Path("metadata.jsonl"), Path("faiss_index.pkl"), "sentence-transformers/all-mpnet-base-v2" RETRIEVER_K, CACHE_TTL = 5, 600 API_CACHE = TTLCache(maxsize=256, ttl=CACHE_TTL) def cached_get(key: str, fetch_fn): if key in API_CACHE: return API_CACHE[key] val = fetch_fn() API_CACHE[key] = val return val # ---------------------------------------------------------- # Section 2: Standalone Tool Functions (Identical to before) # ---------------------------------------------------------- @tool def python_repl(code: str) -> str: """Executes a string of Python code and returns the stdout/stderr.""" code = textwrap.dedent(code).strip() try: result = subprocess.run(["python", "-c", code], capture_output=True, text=True, timeout=10, check=False) if result.returncode == 0: return f"Execution successful.\nSTDOUT:\n```\n{result.stdout}\n```" else: return f"Execution failed.\nSTDOUT:\n```\n{result.stdout}\n```\nSTDERR:\n```\n{result.stderr}\n```" except subprocess.TimeoutExpired: return "Execution timed out (>10s)." def describe_image_func(image_source: str, vision_llm_instance) -> str: """Describes an image from a local file path or a URL using a provided vision LLM.""" try: if image_source.startswith("http"): img = Image.open(BytesIO(requests.get(image_source, timeout=10).content)) else: img = Image.open(image_source) buffered = BytesIO() img.convert("RGB").save(buffered, format="JPEG") b64_string = base64.b64encode(buffered.getvalue()).decode() msg = HumanMessage(content=[{"type": "text", "text": "Describe this image in detail."}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64_string}"}}]) return vision_llm_instance.invoke([msg]).content except Exception as e: return f"Error processing image: {e}" def web_search_func(query: str, cache_func) -> str: """Performs a web search using Tavily and returns a compilation of results.""" key = f"web:{query}" results = cache_func(key, lambda: TavilySearchResults(max_results=5).invoke(query)) return "\n\n---\n\n".join([f"Source: {res['url']}\nContent: {res['content']}" for res in results]) def wiki_search_func(query: str, cache_func) -> str: """Searches Wikipedia and returns the top 2 results.""" key = f"wiki:{query}" docs = cache_func(key, lambda: WikipediaLoader(query=query, load_max_docs=2, doc_content_chars_max=2000).load()) return "\n\n---\n\n".join([f"Source: {d.metadata['source']}\n\n{d.page_content}" for d in docs]) def arxiv_search_func(query: str, cache_func) -> str: """Searches Arxiv for scientific papers and returns the top 2 results.""" key = f"arxiv:{query}" docs = cache_func(key, lambda: ArxivLoader(query=query, load_max_docs=2).load()) return "\n\n---\n\n".join([f"Source: {d.metadata['source']}\nPublished: {d.metadata['Published']}\nTitle: {d.metadata['Title']}\n\nSummary:\n{d.page_content}" for d in docs]) # ---------------------------------------------------------- # Section 3: NEW DYNAMIC SYSTEM PROMPT # ---------------------------------------------------------- # This is now a template string. The {tools} section will be filled in dynamically. SYSTEM_PROMPT_TEMPLATE = ( """You are an expert-level research assistant. Your goal is to answer the user's question accurately. **CRITICAL INSTRUCTIONS:** 1. **USE YOUR TOOLS:** You have been given a set of tools to find information. You MUST use them when the answer is not immediately known to you. Do not make up answers. Do not apologize or refuse to use a tool. You must try. 2. **AVAILABLE TOOLS:** Here is the exact list of tools you have access to: {tools} 3. **REASONING:** Think step-by-step. First, analyze the user's question. Second, decide which tool is appropriate. Third, call the tool with the correct parameters. Finally, analyze the tool's output to formulate your answer. 4. **LIMITATIONS:** If a question requires a capability you absolutely do not have (e.g., watching a video, listening to audio), you must state that limitation clearly. 5. **FINAL ANSWER FORMAT:** Your final response MUST strictly follow this format and nothing else: `FINAL ANSWER: [Your concise and accurate answer here]` """ ) # ---------------------------------------------------------- # Section 4: Factory Function for Agent Executor (MODIFIED) # ---------------------------------------------------------- def create_agent_executor(provider: str = "groq"): """ Factory function to create and compile the LangGraph agent executor. This version dynamically builds the system prompt with the list of tools. """ print(f"Initializing agent with provider: {provider}") # Step 1: Build LLMs (Identical) if provider == "google": main_llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro-latest", temperature=0) elif provider == "groq": main_llm = ChatGroq(model_name="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0) elif provider == "huggingface": main_llm = ChatHuggingFace(llm=HuggingFaceEndpoint(repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1", temperature=0.1)) else: raise ValueError("Invalid provider selected") vision_llm = ChatGroq(model_name="meta-llama/llama-4-maverick-17b-128e-instruct", temperature=0) # Step 2: Build Retriever (Identical) embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL) if FAISS_CACHE.exists(): with open(FAISS_CACHE, "rb") as f: vector_store = pickle.load(f) else: docs = [Document(page_content=f"Question: {rec['Question']}\n\nFinal answer: {rec['Final answer']}", metadata={"source": rec["task_id"]}) for rec in (json.loads(line) for line in open(JSONL_PATH, "rt", encoding="utf-8"))] vector_store = FAISS.from_documents(docs, embeddings) with open(FAISS_CACHE, "wb") as f: pickle.dump(vector_store, f) retriever = vector_store.as_retriever(search_kwargs={"k": RETRIEVER_K}) # Step 3: Create the final list of tools (Identical) tools_list = [ python_repl, Tool(name="describe_image", func=functools.partial(describe_image_func, vision_llm_instance=vision_llm), description="Describes an image from a local file path or a URL."), Tool(name="web_search", func=functools.partial(web_search_func, cache_func=cached_get), description="Performs a web search using Tavily."), Tool(name="wiki_search", func=functools.partial(wiki_search_func, cache_func=cached_get), description="Searches Wikipedia."), Tool(name="arxiv_search", func=functools.partial(arxiv_search_func, cache_func=cached_get), description="Searches Arxiv for scientific papers."), create_retriever_tool(retriever=retriever, name="retrieve_examples", description="Retrieve solved questions similar to the user's query."), ] # --- THIS PART IS NEW --- # 4a. Format the tool list into a string for the prompt tool_definitions = "\n".join([f"- `{tool.name}`: {tool.description}" for tool in tools_list]) # 4b. Create the final, dynamic system prompt final_system_prompt = SYSTEM_PROMPT_TEMPLATE.format(tools=tool_definitions) # --- END NEW PART --- llm_with_tools = main_llm.bind_tools(tools_list) # Step 5: Define Graph Nodes (Modified to use the new prompt) def retriever_node(state: MessagesState): user_query = state["messages"][-1].content docs = retriever.invoke(user_query) # Use the new, dynamic prompt here messages = [SystemMessage(content=final_system_prompt)] if docs: example_text = "\n\n---\n\n".join(d.page_content for d in docs) messages.append(AIMessage(content=f"I have found {len(docs)} similar solved examples:\n\n{example_text}", name="ExampleRetriever")) messages.extend(state["messages"]) return {"messages": messages} def assistant_node(state: MessagesState): result = llm_with_tools.invoke(state["messages"]) return {"messages": [result]} # Step 6: Build Graph (Identical) builder = StateGraph(MessagesState) builder.add_node("retriever", retriever_node) builder.add_node("assistant", assistant_node) builder.add_node("tools", ToolNode(tools_list)) builder.add_edge(START, "retriever") builder.add_edge("retriever", "assistant") builder.add_conditional_edges("assistant", tools_condition, {"tools": "tools", "__end__": "__end__"}) builder.add_edge("tools", "assistant") agent_executor = builder.compile() print("Agent Executor created successfully.") return agent_executor # --- Section 5 (Testing functions) remains the same --- # ... (test_llm_connection and __main__ block)