Ahmud's picture
Update agent.py
66f6cc6 verified
raw
history blame
5.84 kB
import os
import itertools
from typing import TypedDict, Annotated, Literal
from langchain_openai import ChatOpenAI
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langgraph.graph.message import add_messages
from langgraph.graph import MessagesState, StateGraph, START, END
from langchain_core.runnables import RunnableConfig # for LangSmith tracking
from langchain_community.tools import BraveSearch # web search
from langchain_experimental.tools.python.tool import PythonAstREPLTool # for logic/math problems
from tools import (
calculator_basic, datetime_tools, transcribe_audio,
transcribe_youtube, query_image, webpage_content, read_excel
)
from prompt import system_prompt
# --------------------------------------------------------------------
# 1. API Key Rotation Setup
# --------------------------------------------------------------------
api_keys = [
os.getenv("OPENROUTER_API_KEY"),
os.getenv("OPENROUTER_API_KEY_1")
]
if not any(api_keys):
raise EnvironmentError("No OpenRouter API keys found in environment variables.")
api_key_cycle = itertools.cycle([k for k in api_keys if k])
def get_next_api_key():
"""Get the next API key in rotation."""
return next(api_key_cycle)
class RotatingChatOpenAI(ChatOpenAI):
"""ChatOpenAI wrapper that automatically rotates API keys on failure."""
def invoke(self, *args, **kwargs):
# Try each key once per call
for _ in range(len(api_keys)):
self.api_key = get_next_api_key()
try:
return super().invoke(*args, **kwargs)
except Exception as e:
# Handle rate-limits or auth errors
if any(code in str(e) for code in ["429", "401", "403"]):
print(f"[API Key Rotation] Key {self.api_key[:5]}... failed, trying next key...")
continue
raise # Re-raise other unexpected errors
raise RuntimeError("All OpenRouter API keys failed or rate-limited.")
# --------------------------------------------------------------------
# 2. Initialize LLM with API Key Rotation
# --------------------------------------------------------------------
llm = RotatingChatOpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=get_next_api_key(), # Start with the first key
model="qwen/qwen3-coder:free", # Model must support function calling
temperature=1
)
# --------------------------------------------------------------------
# 3. Tools Setup
# --------------------------------------------------------------------
python_tool = PythonAstREPLTool()
search_tool = BraveSearch.from_api_key(
api_key=os.getenv("BRAVE_SEARCH_API"),
search_kwargs={"count": 4},
description="Web search using Brave"
)
community_tools = [search_tool, python_tool]
custom_tools = calculator_basic + datetime_tools + [
transcribe_audio, transcribe_youtube, query_image, webpage_content, read_excel
]
tools = community_tools + custom_tools
llm_with_tools = llm.bind_tools(tools)
tools_by_name = {tool.name: tool for tool in tools}
# --------------------------------------------------------------------
# 4. Define LangGraph State and Nodes
# --------------------------------------------------------------------
class MessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
# LLM Node
def llm_call(state: MessagesState):
return {
"messages": [
llm_with_tools.invoke(
[SystemMessage(content=system_prompt)] + state["messages"]
)
]
}
# Tool Node
def tool_node(state: MessagesState):
"""Executes tools requested by the LLM."""
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
return {"messages": result}
# Conditional Routing
def should_continue(state: MessagesState) -> Literal["Action", END]:
"""Route to tools if LLM made a tool call, else end."""
last_message = state["messages"][-1]
return "Action" if last_message.tool_calls else END
# --------------------------------------------------------------------
# 5. Build LangGraph Agent
# --------------------------------------------------------------------
builder = StateGraph(MessagesState)
builder.add_node("llm_call", llm_call)
builder.add_node("environment", tool_node)
builder.add_edge(START, "llm_call")
builder.add_conditional_edges(
"llm_call",
should_continue,
{"Action": "environment", END: END}
)
builder.add_edge("environment", "llm_call")
gaia_agent = builder.compile()
# --------------------------------------------------------------------
# 6. Agent Wrapper
# --------------------------------------------------------------------
class LangGraphAgent:
def __init__(self):
print("LangGraphAgent initialized with API key rotation.")
def __call__(self, question: str) -> str:
input_state = {"messages": [HumanMessage(content=question)]}
print(f"Running LangGraphAgent with input: {question[:150]}...")
config = RunnableConfig(
config={
"run_name": "GAIA Agent",
"tags": ["gaia", "langgraph", "agent"],
"metadata": {"user_input": question},
"recursion_limit": 30,
"tracing": True
}
)
result = gaia_agent.invoke(input_state, config)
final_response = result["messages"][-1].content
try:
return final_response.split("FINAL ANSWER:")[-1].strip()
except Exception:
print("Could not split on 'FINAL ANSWER:'")
return final_response