import logging
import os
import uuid

import litellm
from langgraph.types import Command

from services.graph import agent_graph

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Toggle verbose LiteLLM output via the LITELLM_DEBUG environment variable.
# Note: newer litellm releases prefer os.environ["LITELLM_LOG"] = "DEBUG"
# over the deprecated set_verbose flag.
if os.getenv("LITELLM_DEBUG", "false").lower() == "true":
    litellm.set_verbose = True
    logger.setLevel(logging.DEBUG)
else:
    litellm.set_verbose = False
    logger.setLevel(logging.INFO)


class AgentRunner:
    """Runner class for the code agent."""

    def __init__(self):
        """Initialize the agent runner with the shared agent graph."""
        logger.info("Initializing AgentRunner")
        self.graph = agent_graph
        self.last_state = None
        # A fresh thread_id per runner keeps this conversation's checkpointed
        # state separate from other runs.
        self.thread_id = str(uuid.uuid4())
        logger.info(f"Created AgentRunner with thread_id: {self.thread_id}")

    def _extract_answer(self, state: dict) -> str | None:
        """Extract the answer from a state chunk, or return None if absent."""
        if not state:
            return None

        # Prefer the explicit "answer" field when the graph has populated it.
        if "answer" in state and state["answer"]:
            logger.info(f"Found answer in direct field: {state['answer']}")
            return state["answer"]

        # Otherwise fall back to the most recent message with content.
        if "messages" in state and state["messages"]:
            for msg in reversed(state["messages"]):
                if hasattr(msg, "content") and msg.content:
                    logger.info(f"Found answer in message: {msg.content}")
                    return msg.content

        return None

    def __call__(self, input_data: str | Command) -> str:
        """Process a question through the agent graph and return the answer.

        Args:
            input_data: Either a question string or a Command object for
                resuming after an interrupt.

        Returns:
            str: The agent's response.
        """
        try:
            config = {"configurable": {"thread_id": self.thread_id}}
            logger.info(f"Using config: {config}")

            if isinstance(input_data, str):
                # Fresh question: seed the graph with a complete initial state.
                logger.info(f"Processing initial question: {input_data}")
                initial_state = {
                    "question": input_data,
                    "messages": [],
                    "answer": None,
                    "step_logs": [],
                    "is_complete": False,
                    "step_count": 0,
                    "context": {},
                    "memory_buffer": [],
                    "last_action": None,
                    "action_history": [],
                    "error_count": 0,
                    "success_count": 0,
                }
                logger.info(f"Initial state: {initial_state}")

                logger.info("Starting graph stream for initial question")
                for chunk in self.graph.stream(initial_state, config):
                    logger.debug(f"Received chunk: {chunk}")
                    if isinstance(chunk, dict):
                        if "__interrupt__" in chunk:
                            # The graph paused for human input; the caller must
                            # call this runner again with a Command to resume.
                            logger.info("Detected interrupt in stream")
                            logger.info(f"Interrupt details: {chunk['__interrupt__']}")
                            continue
                        answer = self._extract_answer(chunk)
                        if answer:
                            self.last_state = chunk
                            return answer
                        logger.debug(f"Skipping chunk without answer: {chunk}")
            else:
                # Resume an interrupted run; input_data is a Command carrying
                # the human's response.
                logger.info(f"Resuming from interrupt with input: {input_data}")
                for result in self.graph.stream(input_data, config):
                    logger.debug(f"Received resume result: {result}")
                    if isinstance(result, dict):
                        answer = self._extract_answer(result)
                        if answer:
                            self.last_state = result
                            return answer
                        logger.debug(f"Skipping result without answer: {result}")

            logger.warning("No answer generated from stream")
            return "No answer generated"

        except Exception as e:
            logger.error(f"Error processing input: {e}")
            raise
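

# A minimal usage sketch (assumes this module is run directly and that
# services.graph.agent_graph is a compiled graph with a checkpointer; the
# question text and the Command(resume=...) payload are illustrative, since
# the resume value's shape depends on how the graph's interrupt is written).
if __name__ == "__main__":
    runner = AgentRunner()
    answer = runner("What does the repository's main module do?")
    print(answer)

    # If the run above stopped at an interrupt, resume it by calling the
    # runner again with a Command carrying the human's reply, e.g.:
    # answer = runner(Command(resume="yes, go ahead"))
    # print(answer)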