import logging import os import re import uuid from langgraph.types import Command from graph import agent_graph # Configure logging logging.basicConfig(level=logging.INFO) # Default to INFO level logger = logging.getLogger(__name__) # Enable LiteLLM debug logging only if environment variable is set import litellm if os.getenv("LITELLM_DEBUG", "false").lower() == "true": litellm.set_verbose = True logger.setLevel(logging.DEBUG) else: litellm.set_verbose = False logger.setLevel(logging.INFO) class AgentRunner: """Runner class for the code agent.""" def __init__(self): """Initialize the agent runner with graph and tools.""" logger.info("Initializing AgentRunner") self.graph = agent_graph self.last_state = None # Store the last state for testing/debugging self.thread_id = str( uuid.uuid4() ) # Generate a unique thread_id for this runner logger.info(f"Created AgentRunner with thread_id: {self.thread_id}") def _extract_answer(self, state: dict) -> str: """Extract the answer from the state.""" if not state: return None # First try to get answer from direct answer field if "answer" in state and state["answer"]: logger.info(f"Found answer in direct field: {state['answer']}") return state["answer"] # Then try to get answer from messages if "messages" in state and state["messages"]: for msg in reversed(state["messages"]): if hasattr(msg, "content") and msg.content: # Look for code blocks that might contain the answer if "```" in msg.content: # Extract code between ```py and ``` or ```python and ``` code_match = re.search( r"```(?:py|python)?\s*\n(.*?)\n```", msg.content, re.DOTALL ) if code_match: code = code_match.group(1) # Look for final_answer call final_answer_match = re.search( r"final_answer\((.*?)\)", code ) if final_answer_match: answer = final_answer_match.group(1) logger.info( f"Found answer in final_answer call: {answer}" ) return answer # If no code block with final_answer, use the content logger.info(f"Found answer in message: {msg.content}") return msg.content return None def __call__(self, input_data) -> str: """Process a question through the agent graph and return the answer. Args: input_data: Either a question string or a Command object for resuming Returns: str: The agent's response """ try: config = {"configurable": {"thread_id": self.thread_id}} logger.info(f"Using config: {config}") if isinstance(input_data, str): # Initial question logger.info(f"Processing initial question: {input_data}") initial_state = { "question": input_data, "messages": [], "answer": None, "step_logs": [], "is_complete": False, "step_count": 0, # Initialize new memory fields "context": {}, "memory_buffer": [], "last_action": None, "action_history": [], "error_count": 0, "success_count": 0, } logger.info(f"Initial state: {initial_state}") # Use stream to get results logger.info("Starting graph stream for initial question") for chunk in self.graph.stream(initial_state, config): logger.debug(f"Received chunk: {chunk}") if isinstance(chunk, dict): if "__interrupt__" in chunk: logger.info("Detected interrupt in stream") logger.info(f"Interrupt details: {chunk['__interrupt__']}") # Let the graph handle the interrupt naturally continue answer = self._extract_answer(chunk) if answer: self.last_state = chunk # If the state is complete, return the answer if chunk.get("is_complete", False): return answer else: logger.debug(f"Skipping chunk without answer: {chunk}") else: # Resuming from interrupt logger.info(f"Resuming from interrupt with input: {input_data}") for result in self.graph.stream(input_data, config): logger.debug(f"Received resume result: {result}") if isinstance(result, dict): answer = self._extract_answer(result) if answer: self.last_state = result # If the state is complete, return the answer if result.get("is_complete", False): return answer else: logger.debug(f"Skipping result without answer: {result}") # If we get here, we didn't find an answer logger.warning("No answer generated from stream") return "No answer generated" except Exception as e: logger.error(f"Error processing input: {str(e)}") raise if __name__ == "__main__": import argparse from langgraph.types import Command # Set up argument parser parser = argparse.ArgumentParser(description="Run the agent with a question") parser.add_argument("question", type=str, help="The question to ask the agent") parser.add_argument( "--resume", type=str, help="Value to resume with after an interrupt", default=None, ) args = parser.parse_args() # Create agent runner runner = AgentRunner() if args.resume: # Resume from interrupt with provided value print(f"\nResuming with value: {args.resume}") response = runner(Command(resume=args.resume)) else: # Initial run with question print(f"\nAsking question: {args.question}") response = runner(args.question) print(f"\nFinal response: {response}")