import logging import os import uuid import aiohttp import json import httpx import io import requests from urllib.parse import quote from typing import Annotated from typing import TypedDict, List, Optional, Literal from typing_extensions import TypedDict from pydantic import BaseModel, Field from trafilatura import extract from huggingface_hub import InferenceClient from langchain_core.messages import AIMessage, HumanMessage, AnyMessage, ToolCall, SystemMessage, ToolMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.tools import tool from langchain_community.tools import TavilySearchResults from langgraph.graph.state import CompiledStateGraph from langgraph.graph import StateGraph, START, END, add_messages from langgraph.prebuilt import ToolNode from langgraph.prebuilt import ToolNode, tools_condition from langgraph.checkpoint.memory import MemorySaver from langgraph.types import Command, interrupt from langchain_anthropic import ChatAnthropic from langchain_openai import ChatOpenAI from mistralai import Mistral from langchain.chat_models import init_chat_model from langchain_core.messages.utils import convert_to_openai_messages class State(TypedDict): messages: Annotated[list, add_messages] class DebugToolNode(ToolNode): async def invoke(self, state, config=None): print("๐Ÿ› ๏ธ ToolNode activated") print(f"Available tools: {[tool.name for tool in self.tool_map.values()]}") print(f"Tool calls in last message: {state.messages[-1].tool_calls}") return await super().invoke(state, config) logger = logging.getLogger(__name__) ASSISTANT_SYSTEM_PROMPT_BASE = """""" search_enabled = bool(os.environ.get("TAVILY_API_KEY")) try: with open('brainstorming_system_prompt.txt', 'r') as file: brainstorming_system_prompt = file.read() except FileNotFoundError: print("File 'system_prompt.txt' not found!") except Exception as e: print(f"Error reading file: {e}") def evaluate_idea_completion(response) -> bool: """ Evaluates whether the assistant's response indicates a complete DIY project idea. You can customize the logic based on your specific criteria. """ # Example logic: Check if the response contains certain keywords required_keywords = ["materials", "dimensions", "tools", "steps"] # Determine the type of response and extract text accordingly if isinstance(response, dict): # If response is a dictionary, extract values and join them into a single string response_text = ' '.join(str(value).lower() for value in response.values()) elif isinstance(response, str): # If response is a string, convert it to lowercase response_text = response.lower() else: # If response is of an unexpected type, convert it to string and lowercase response_text = str(response).lower() return all(keyword in response_text for keyword in required_keywords) @tool async def human_assistance(query: str) -> str: """Request assistance from a human.""" human_response = await interrupt({"query": query}) # async wait return human_response["data"] @tool async def download_website_text(url: str) -> str: """Download the text from a website""" try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: response.raise_for_status() downloaded = await response.text() result = extract(downloaded, include_formatting=True, include_links=True, output_format='json', with_metadata=True) return result or "No text found on the website" except Exception as e: logger.error(f"Failed to download {url}: {str(e)}") return f"Error retrieving website content: {str(e)}" @tool async def finalize_idea() -> str: """Marks the brainstorming phase as complete. This function does nothing else.""" return "Brainstorming finalized." tools = [download_website_text, human_assistance,finalize_idea] memory = MemorySaver() if search_enabled: tavily_search_tool = TavilySearchResults( max_results=5, search_depth="advanced", include_answer=True, include_raw_content=True, ) tools.append(tavily_search_tool) else: print("TAVILY_API_KEY environment variable not found. Websearch disabled") weak_model = ChatOpenAI( model="gpt-4o", temperature=0, max_tokens=None, timeout=None, max_retries=2, # api_key="...", # if you prefer to pass api key in directly instaed of using env vars # base_url="...", # organization="...", # other params... ) api_key = os.environ["MISTRAL_API_KEY"] model = "mistral-large-latest" client = Mistral(api_key=api_key) # ChatAnthropic( # model="claude-3-5-sonnet-20240620", # temperature=0, # max_tokens=1024, # timeout=None, # max_retries=2, # # other params... # ) search_enabled = bool(os.environ.get("TAVILY_API_KEY")) if not os.environ.get("OPENAI_API_KEY"): print('Open API key not found') prompt_planning_model = ChatOpenAI( model="gpt-4o", temperature=0, max_tokens=None, timeout=None, max_retries=2, # api_key="...", # if you prefer to pass api key in directly instaed of using env vars # base_url="...", # organization="...", # other params... ) threed_object_gen_model = ChatOpenAI( model="gpt-4o", temperature=0, max_tokens=None, timeout=None, max_retries=2, # api_key="...", # if you prefer to pass api key in directly instaed of using env vars # base_url="...", # organization="...", # other params... ) huggingfaceclient = InferenceClient( provider="hf-inference", api_key=os.environ["HF_TOKEN"], ) model = weak_model assistant_model = weak_model class GraphProcessingState(BaseModel): # user_input: str = Field(default_factory=str, description="The original user input") messages: Annotated[list[AnyMessage], add_messages] = Field(default_factory=list) prompt: str = Field(default_factory=str, description="The prompt to be used for the model") tools_enabled: dict = Field(default_factory=dict, description="The tools enabled for the assistant") search_enabled: bool = Field(default=True, description="Whether to enable search tools") next_stage: str = Field(default="", description="The next stage to execute, decided by the guidance node.") tool_call_required: bool = Field(default=False, description="Whether a tool should be called from brainstorming.") loop_brainstorming: bool = Field(default=False, description="Whether to loop back to brainstorming for further iteration.") # Completion flags for each stage brainstorming_complete: bool = Field(default=False) planning_complete: bool = Field(default=False) drawing_complete: bool = Field(default=False) product_searching_complete: bool = Field(default=False) purchasing_complete: bool = Field(default=False) idea_complete: bool = Field(default=False) generated_image_url_from_dalle: str = Field(default="", description="The generated_image_url_from_dalle.") drawing_attempts: int = Field(default=0) drawing_failed :bool = Field(default=False) async def guidance_node(state: GraphProcessingState, config=None): # print(f"Prompt: {state.prompt}") # print(f"Prompt: {state.prompt}") # # print(f"Message: {state.messages}") # print(f"Tools Enabled: {state.tools_enabled}") # print(f"Search Enabled: {state.search_enabled}") # for message in state.messages: # print(f'\ncomplete message', message) # if isinstance(message, HumanMessage): # print(f"Human: {message.content}\n") # elif isinstance(message, AIMessage): # # Check if content is non-empty # if message.content: # # If content is a list (e.g., list of dicts), extract text # if isinstance(message.content, list): # texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] # if texts: # print(f"AI: {' '.join(texts)}\n") # elif isinstance(message.content, str): # print(f"AI: {message.content}") # elif isinstance(message, SystemMessage): # print(f"System: {message.content}\n") # elif isinstance(message, ToolMessage): # print(f"Tool: {message.content}\n") print("\n๐Ÿ•ต๏ธโ€โ™€๏ธ๐Ÿ•ต๏ธโ€โ™€๏ธ | start | progress checking nodee \n") # Added a newline for clarity # print(f"Prompt: {state.prompt}\n") if state.messages: last_message = state.messages[-1] if isinstance(last_message, HumanMessage): print(f"๐Ÿง‘ Human: {last_message.content}\n") elif isinstance(last_message, AIMessage): if last_message.content: if isinstance(last_message.content, list): texts = [item.get('text', '') for item in last_message.content if isinstance(item, dict) and 'text' in item] if texts: print(f"๐Ÿค– AI: {' '.join(texts)}\n") elif isinstance(last_message.content, str): print(f"๐Ÿค– AI: {last_message.content}\n") elif isinstance(last_message, SystemMessage): print(f"โš™๏ธ System: {last_message.content}\n") elif isinstance(last_message, ToolMessage): print(f"๐Ÿ› ๏ธ Tool: {last_message.content}\n") else: print("\n(No messages found.)") # Log boolean completion flags # Define the order of stages stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] # Identify completed and incomplete stages incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] # Determine the next stage if not incomplete: # All stages are complete return { "messages": [AIMessage(content="All DIY project stages are complete!")], "next_stage": "end_project", "pending_approval_stage": None, } else: # Set the next stage to the first incomplete stage if state.drawing_failed: next_stage = "end" else: next_stage = incomplete[0] print(f"Next Stage decided at guidance: {next_stage}") print("\n๐Ÿ•ต๏ธโ€โ™€๏ธ๐Ÿ•ต๏ธโ€โ™€๏ธ | end | progress checking nodee \n") # Added a newline for clarity return { "messages": [], "next_stage": next_stage, "pending_approval_stage": None, } def guidance_routing(state: GraphProcessingState) -> str: print("\n๐Ÿ”€๐Ÿ”€ Routing checkpoint ๐Ÿ”€๐Ÿ”€\n") print(f"Next Stage received guidance routing: {state.next_stage}\n") print(f"Brainstorming complete: {state.brainstorming_complete}") print(f"Prompt planing: {state.planning_complete}") print(f"Drwaing 3d model: {state.drawing_complete}") print(f"Finding products: {state.product_searching_complete}\n") next_stage = state.next_stage if next_stage == "brainstorming": return "brainstorming_node" elif next_stage == "planning": # return "generate_3d_node" return "prompt_planning_node" elif next_stage == "drawing": return "generate_3d_node" elif next_stage == "product_searching": return "END" elif next_stage == "purchasing": print('\n Exit status 0 )') print(f"Prompt: {state.prompt}") print(f"Prompt: {state.prompt}") # print(f"Message: {state.messages}") print(f"Tools Enabled: {state.tools_enabled}") print(f"Search Enabled: {state.search_enabled}") for message in state.messages: print(f'\ncomplete message', message) if isinstance(message, HumanMessage): print(f"Human: {message.content}\n") elif isinstance(message, AIMessage): # Check if content is non-empty if message.content: # If content is a list (e.g., list of dicts), extract text if isinstance(message.content, list): texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] if texts: print(f"AI: {' '.join(texts)}\n") elif isinstance(message.content, str): print(f"AI: {message.content}") elif isinstance(message, SystemMessage): print(f"System: {message.content}\n") elif isinstance(message, ToolMessage): print(f"Tool: {message.content}\n") # return "drawing_node" # elif next_stage == "product_searching": # return "product_searching" # elif next_stage == "purchasing": # return "purchasing_node" return "END" elif next_stage == "end": print('\n graph was forced to move to end because some error please analyze the state') print('\n Exit status 1 )') for message in state.messages: print(f'\ncomplete message', message) if isinstance(message, HumanMessage): print(f"Human: {message.content}\n") elif isinstance(message, AIMessage): # Check if content is non-empty if message.content: # If content is a list (e.g., list of dicts), extract text if isinstance(message.content, list): texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] if texts: print(f"AI: {' '.join(texts)}\n") elif isinstance(message.content, str): print(f"AI: {message.content}") elif isinstance(message, SystemMessage): print(f"System: {message.content}\n") elif isinstance(message, ToolMessage): print(f"Tool: {message.content}\n") return "END" async def brainstorming_node(state: GraphProcessingState, config=None): print("\n๐Ÿง ๐Ÿง  | start | brainstorming Node \n") # Added a newline for clarity # Check if model is available if not model: return {"messages": [AIMessage(content="Model not available for brainstorming.")]} # Filter out messages with empty content filtered_messages = [ message for message in state.messages if isinstance(message, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and message.content ] # Ensure there is at least one message with content if not filtered_messages: filtered_messages.append(AIMessage(content="No valid messages provided.")) stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] if not incomplete: print("All stages complete!") # Handle case where all stages are complete # You might want to return a message and end, or set proposed_next_stage to a special value ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") return { "messages": current_messages + [ai_all_complete_msg], "next_stage": "end_project", # Or None, or a final summary node "pending_approval_stage": None, } else: # THIS LINE DEFINES THE VARIABLE proposed_next_stage = incomplete[0] guidance_prompt_text = ( """ You are a warm, encouraging, and knowledgeable AI assistant, acting as a **Creative DIY Collaborator**. Your primary goal is to guide the user through a friendly and inspiring conversation to finalize **ONE specific, viable DIY project idea**. While we want to be efficient, the top priority is making the user feel heard, understood, and confident in their final choice. โš ๏ธ Your core directive remains speed and convergence: If you identify an idea that clearly meets ALL **Critical Criteria** and the user seems positive or neutral, you must suggest finalizing it **immediately**. Do NOT delay by offering too many alternatives once a solid candidate emerges. Your goal is to converge on a "good enough" idea the user is happy with, not to explore every possibility. **Your Conversational Style & Strategy:** 1. **Be an Active Listener:** Start by acknowledging and validating the user's input. Show you understand their core desire (e.g., "That sounds like a fun goal! Creating a custom piece for your living room is always rewarding."). 2. **Ask Inspiring, Open-Ended Questions:** Instead of generic questions, make them feel personal and insightful. * *Instead of:* "What do you want to build?" * *Try:* "What part of your home are you dreaming of improving?" or "Are you thinking of a gift for someone special, or a project just for you?" 3. **Act as a Knowledgeable Guide:** When a user is unsure, proactively suggest appealing ideas based on their subtle clues. Connect their interests to tangible projects. * *Example:* If the user mentions liking plants and having a small balcony, you could suggest: "That's great! We could think about a vertical herb garden to save space, or maybe some simple, stylish hanging macrame planters. Does either of those spark your interest?" 4. **Guide, Don't Just Gatekeep:** When an idea *almost* meets the criteria, don't just reject it. Gently guide it towards feasibility. * *Example:* "A full-sized dining table might require some specialized tools. How about we adapt that idea into a beautiful, buildable coffee table or a set of side tables using similar techniques?" **Critical Criteria for the Final DIY Project Idea (Your non-negotiable checklist):** 1. **Buildable:** Achievable by an average person with basic DIY skills. 2. **Common Materials/Tools:** Uses only materials (e.g., wood, screws, glue, paint, fabric, cardboard) and basic hand tools (e.g., screwdrivers, hammers, saws, drills) commonly available in general hardware stores, craft stores, or supermarkets worldwide. 3. **Avoid Specializations:** Explicitly AVOID projects requiring specialized electronic components, 3D printing, specific brand items not universally available, or complex machinery. 4. **Tangible Product:** The final result must be a physical, tangible item. **Your Internal Process (How you think on each turn):** 1. **THOUGHT:** * Clearly state your understanding of the userโ€™s current input and conversational state. * Outline your plan: Engage with their latest input using your **Conversational Style**. Propose or refine an idea to meet the **Critical Criteria**. * **Tool Identification (`human_assistance`):** Decide if you need to ask a question. The question should be formulated according to the "Inspiring, Open-Ended Questions" principle. Clearly state your intention to use the `human_assistance` tool with the exact friendly and natural-sounding question as the `query`. * **Idea Finalization Check:** Check if the current idea satisfies ALL **Critical Criteria**. If yes, and the user shows no objection, move to finalize immediately. Remember: **good enough is final enough**. 2. **TOOL USE (`human_assistance` - If Needed):** * Invoke `human_assistance` with your well-formulated, friendly query. 3. **RESPONSE SYNTHESIS / IDEA FINALIZATION:** * **If an idea is finalized:** Respond *only* with the exact phrase: `IDEA FINALIZED: [Name of the Idea]` (e.g., `IDEA FINALIZED: Simple Wooden Spice Rack`) * **If brainstorming continues:** * Provide your engaging suggestions or refinements based on your **Conversational Style**. * Await the user response. **General Guidelines (Your core principles):** * **Empathy Over Pure Efficiency:** A positive, collaborative experience is the primary goal. Don't rush the user if they are still exploring. * **Criteria Focused:** Always gently guide ideas toward the **Critical Criteria**. * **One Main Idea at a Time:** Focus the conversation on a single project idea to avoid confusion. * **Rapid Convergence:** Despite the friendly tone, always be looking for the fastest path to a final, viable idea. """ ) if state.prompt: final_prompt = "\n".join([ guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) else: final_prompt = "\n".join([ guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) prompt = ChatPromptTemplate.from_messages( [ ("system", final_prompt), MessagesPlaceholder(variable_name="messages"), ] ) # Tools allowed for brainstorming node_tools = [human_assistance] if state.search_enabled and tavily_search_tool: # only add search tool if enabled and initialized node_tools.append(tavily_search_tool) mistraltools = [ { "type": "function", "function": { "name": "human_assistance", "description": "Ask a question from the user", "parameters": { "type": "object", "properties": { "query": { "type": "string", "query": "The transaction id.", } }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "finalize_idea", "description": "Handles finalized ideas. Saves or dispatches the confirmed idea for the next steps. but make sure you give your response with key word IDEA FINALIZED", "parameters": { "type": "object", "properties": { "idea_name": { "type": "string", "description": "The name of the finalized DIY idea.", } }, "required": ["idea_name"] } } } ] llm = init_chat_model("mistral-large-latest", model_provider="mistralai") llm_with_tools = llm.bind_tools(mistraltools) chain = prompt | llm_with_tools openai_messages = convert_to_openai_messages(state.messages) openai_messages_with_prompt = [ {"role": "system", "content": final_prompt}, # your guidance prompt *openai_messages # history youโ€™ve already converted ] # print('open ai formatted', openai_messages_with_prompt[-1]) for msg in openai_messages_with_prompt: print(msg) mistralmodel = "mistral-saba-2502" # Pass filtered messages to the chain try: # response = await chain.ainvoke({"messages": filtered_messages}, config=config) response = client.chat.complete( model = mistralmodel, messages = openai_messages_with_prompt, tools = mistraltools, tool_choice = "any", parallel_tool_calls = False, ) mistral_message = response.choices[0].message tool_call = response.choices[0].message.tool_calls[0] function_name = tool_call.function.name function_params = json.loads(tool_call.function.arguments) ai_message = AIMessage( content=mistral_message.content or "", # Use empty string if blank additional_kwargs={ "tool_calls": [ { "id": tool_call.id, "function": { "name": tool_call.function.name, "arguments": tool_call.function.arguments, }, "type": "function", # Add this if your chain expects it } ] } ) updates = { "messages": [ai_message], "tool_calls": [ { "name": function_name, "arguments": function_params, } ], "next": function_name, } print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) print('\n๐Ÿ” response from brainstorm\n', updates) if function_name == "finalize_idea": print('finalazing idea') state.brainstorming_complete = True updates["brainstorming_complete"] = True if isinstance(response, AIMessage) and response.content: print(' Identified last AI message', response) if isinstance(response.content, str): content = response.content.strip() elif isinstance(response.content, list): texts = [item.get("text", "") for item in response.content if isinstance(item, dict)] content = " ".join(texts).strip() else: content = str(response.content).strip() print('content for idea finalizing:', content) if "finalize_idea:" in content: # Use 'in' instead of 'startswith' print('โœ… final idea') updates.update({ "brainstorming_complete": True, "tool_call_required": False, "loop_brainstorming": False, }) return updates else: # tool_calls = getattr(response, "tool_calls", None) if tool_call: print('๐Ÿ› ๏ธ tool call requested at brainstorming node') updates.update({ "tool_call_required": True, "loop_brainstorming": False, }) if tool_call: tool_call = response.choices[0].message.tool_calls[0] function_name = tool_call.function.name function_params = json.loads(tool_call.function.arguments) print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) # for tool_call in response.tool_calls: # tool_name = tool_call['name'] # if tool_name == "human_assistance": # query = tool_call['args']['query'] # print(f"Human input needed: {query}") # for tool_call in tool_calls: # if isinstance(tool_call, dict) and 'name' in tool_call and 'args' in tool_call: # print(f"๐Ÿ”ง Tool Call (Dict): {tool_call.get('name')}, Args: {tool_call.get('args')}") # else: # print(f"๐Ÿ”ง Unknown tool_call format: {tool_call}") else: print('๐Ÿ’ฌ decided tp keep brainstorming') updates.update({ "tool_call_required": False, "loop_brainstorming": True, }) print(f"Brainstorming continues: {content}") else: # If no proper response, keep looping brainstorming updates["tool_call_required"] = False updates["loop_brainstorming"] = True print("\n๐Ÿง ๐Ÿง  | end | brainstorming Node \n") return updates except Exception as e: print(f"Error: {e}") return { "messages": [AIMessage(content="Error.")], "next_stage": "brainstorming" } async def prompt_planning_node(state: GraphProcessingState, config=None): print("\n๐Ÿšฉ๐Ÿšฉ | start | prompt planing Node \n") # Ensure we have a model if not model: return {"messages": [AIMessage(content="Model not available for planning.")]} filtered_messages = state.messages # Filter out empty messages # filtered_messages = [ # msg for msg in state.messages # if isinstance(msg, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and msg.content # ] # filtered_messages = [] # for msg in state.messages: # if isinstance(msg, ToolMessage): # # ๐Ÿ› ๏ธ ToolMessage needs to be paired with a prior assistant message that called the tool # tool_name = msg.name or "unknown_tool" # tool_call_id = msg.tool_call_id or "tool_call_id_missing" # # Simulated assistant message that initiated the tool call # fake_assistant_msg = AIMessage( # content="", # additional_kwargs={ # "tool_calls": [ # { # "id": tool_call_id, # "type": "function", # "function": { # "name": tool_name, # "arguments": json.dumps({"content": msg.content or ""}), # } # } # ] # } # ) # # Append both in correct sequence # filtered_messages.append(fake_assistant_msg) # filtered_messages.append(msg) # elif isinstance(msg, (HumanMessage, AIMessage, SystemMessage)) and msg.content: # filtered_messages.append(msg) # Fallback if list ends up empty if not filtered_messages: filtered_messages.append(AIMessage(content="No valid messages provided.")) # Define the system prompt for planning guidance_prompt_text = """ You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming & 3D-Prompt Generator**. Your mission is to collaborate with the user to: 1. Brainstorm and refine one specific, viable DIY project idea. 2. Your goal is to create a prompt for generating a 3D model of the **entire DIY project**. If modeling the whole project isn't practical (e.g., it's too complex or has too many parts), then identify the **single largest or most essential component** and create the prompt for that instead. 3. Produce a final, precise text prompt for an microsoft trellis 3D-generation endpoint. --- **Critical Criteria for the DIY Project** (must be met): โ€ข Buildable by an average person with only basic DIY skills. โ€ข Uses common materials/tools (e.g., wood, screws, glue, paint; hammer, saw, drill). โ€ข No specialized electronics, 3D printers, or proprietary parts. โ€ข Results in a tangible, physical item. --- **Available Tools** โ€ข human_assistance โ€“ ask the user clarifying questions. โ€ข (optional) your project-specific search tool โ€“ look up inspiration or standard dimensions if needed. --- **When the DIY idea is fully detailed and meets all criteria, output exactly and only:** ACCURATE PROMPT FOR MODEL GENERATING: [Your final single-paragraph prompt here] """ # Build final prompt if state.prompt: final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) else: final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) prompt = ChatPromptTemplate.from_messages([ ("system", final_prompt), MessagesPlaceholder(variable_name="messages"), ]) # Bind tools node_tools = [human_assistance] if state.search_enabled and tavily_search_tool: node_tools.append(tavily_search_tool) llm_with_tools = prompt_planning_model.bind_tools(node_tools) chain = prompt | llm_with_tools # print(' ๐Ÿ‘พ๐Ÿ‘พ๐Ÿ‘พ๐Ÿ‘พDebugging the request going in to prompt planing model') # print("Prompt: ", prompt) # print("chain: ", chain) for msg in filtered_messages: print('โœจmsg : ',msg) print('\n') try: response = await chain.ainvoke({"messages": filtered_messages}, config=config) print('\nresponse ->: ', response) # Log any required human assistance query if hasattr(response, "tool_calls"): for call in response.tool_calls: if call.get("name") == "human_assistance": print(f"Human input needed: {call['args']['query']}") updates = {"messages": [response]} # Extract response text content = "" if isinstance(response.content, str): content = response.content.strip() elif isinstance(response.content, list): content = " ".join(item.get("text","") for item in response.content if isinstance(item, dict)).strip() # Check for finalization signalif "finalize_idea:" in content: if "ACCURATE PROMPT FOR MODEL GENERATING" in content: dalle_prompt_text = content.replace("ACCURATE PROMPT FOR MODEL GENERATING:", "").strip() # print(f"\n๐Ÿค–๐Ÿค–๐Ÿค–๐Ÿค–Extracted DALL-E prompt: {dalle_prompt_text}") generated_image_url = None generated_3d_model_url = None # This will store the final 3D model URL # --- START: New code for DALL-E and Trellis API calls --- OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") if not OPENAI_API_KEY: print("Error: OPENAI_API_KEY environment variable not set.") updates["messages"].append(AIMessage(content="OpenAI API key not configured. Cannot generate image.")) else: # try: # --- Your existing client setup --- # prompt = dalle_prompt_text # model_id = "black-forest-labs/FLUX.1-dev" # or any other model # print(f"Generating image for prompt: '{prompt}' with model '{model_id}'...") # # output is a PIL.Image object # image = huggingfaceclient.text_to_image( # prompt, # model=model_id, # ) # print("Image generated successfully.") # # --- Code to save the image --- # # 1. Define the directory name # output_directory = "files" # os.makedirs(output_directory, exist_ok=True) # print(f"Ensured directory '{output_directory}' exists.") # image_filename = "astronaut_horse.png" # full_save_path = os.path.join(output_directory, image_filename) # # 5. Save the PIL.Image object # # The image object (if it's a PIL.Image) has a .save() method # image.save(full_save_path) # print(f"Image saved successfully to: {full_save_path}") # if image: # print("\nAttempting to upload generated image to Supabase...") # # Define the filename for Supabase (can include a path prefix) # supabase_target_filename = f"hf_generated_{uuid}" # Example: put in a 'hf_generated' folder # # 1. Save the PIL image to a temporary in-memory buffer # img_byte_arr = io.BytesIO() # image.save(img_byte_arr, format='JPEG') # Match Dart's 'image/jpeg' # img_byte_arr.seek(0) # Reset buffer's position to the beginning # # Prepare the file for the multipart/form-data request # # The field name 'file' and 'filename' should match what your Edge Function expects. # files_payload = { # 'file': (supabase_target_filename, img_byte_arr, 'image/jpeg') # } # # Headers (Content-Type for multipart/form-data is set automatically by requests # # when using the `files` parameter, but you can set other headers if your edge function needs them) # upload_headers = { # # 'Authorization': 'Bearer YOUR_SUPABASE_ANON_KEY_OR_SERVICE_KEY_IF_EDGE_FUNCTION_NEEDS_IT' # } # print(f"Uploading image to Supabase Edge Function: as {supabase_target_filename}...") # supabase_public_url = None # try: # upload_response = requests.post( # 'https://yqewezudxihyadvmfovd.supabase.co/functions/v1/storage-upload', # files=files_payload, # headers=upload_headers # ) # upload_response.raise_for_status() # Raise an HTTPError for bad responses (4XX or 5XX) # # 2. Parse the response from the Edge Function # # The Dart code expects: imgresponse.data['data']['path'] # response_json = upload_response.json() # if 'data' in response_json and 'path' in response_json['data']: # raw_path = response_json['data']['path'] # print(f"Edge function returned raw path: {raw_path}") # # 3. Construct the public URL # # The public URL format for Supabase Storage is: # # SUPABASE_URL/storage/v1/object/public/BUCKET_NAME/FILE_PATH # # The FILE_PATH needs to be URL encoded. # encoded_path = quote(raw_path) # # generated_image_url = f"{encoded_path}"'https://yqewezudxihyadvmfovd.supabase.co/storage/v1/object/public/product_images/$encodedPath'; # generated_image_url =f"https://yqewezudxihyadvmfovd.supabase.co/storage/v1/object/public/product_images/{encoded_path}" # print(f"\nSuccessfully uploaded to Supabase!") # print(f"Public URL: {generated_image_url}") # else: # print(f"Error: Unexpected response format from Edge Function: {response_json}") # print("\nFailed to upload image to Supabase.") # except requests.exceptions.RequestException as e_upload: # print(f"Error uploading to Supabase: {e_upload}") # if hasattr(e_upload, 'response') and e_upload.response is not None: # print(f"Supabase Response status: {e_upload.response.status_code}") # print(f"Supabase Response text: {e_upload.response.text}") # print("\nFailed to upload image to Supabase.") # except Exception as e_upload_generic: # print(f"An unexpected error occurred during Supabase upload: {e_upload_generic}") # print("\nFailed to upload image to Supabase.") # else: # print("No image was generated, skipping Supabase upload.") # except KeyError: # print("Error: The HF_TOKEN environment variable is not set.") # print("Please set it before running the script. For example:") # print(" export HF_TOKEN='your_hugging_face_api_token'") # except ImportError: # print("Error: The Pillow (PIL) library might not be installed correctly.") # print("If 'image' is a PIL.Image object, Pillow is required to save it.") # print("You might need to install it: pip install Pillow huggingface_hub") # except Exception as e: # print(f"An error occurred: {e}") # print("Make sure your API token is valid, has the necessary permissions,") # print(f"and the model '{model_id}' is accessible and compatible.") # 1. Call DALL-E API dalle_api_url = "https://api.openai.com/v1/images/generations" dalle_headers = { "Content-Type": "application/json", "Authorization": f"Bearer {OPENAI_API_KEY}" } _model_to_use_for_dalle_call = "dall-e-2" # <<< IMPORTANT: Set this to "dall-e-2" or "dall-e-3" _processed_prompt_text = dalle_prompt_text # Start with the original prompt _prompt_was_trimmed_or_issue_found = False _warning_or_error_message_for_updates = None max_prompt_lengths = { "dall-e-2": 1000, "dall-e-3": 4000, "gpt-image-1": 32000 # Included for completeness, though payload is for DALL-E } if not _processed_prompt_text: # Check for empty prompt _message = f"Error: The DALL-E prompt for model '{_model_to_use_for_dalle_call}' cannot be empty. API call will likely fail." print(f"\n๐Ÿ›‘๐Ÿ›‘๐Ÿ›‘๐Ÿ›‘ {_message}") _warning_or_error_message_for_updates = _message _prompt_was_trimmed_or_issue_found = True # NOTE: OpenAI API will return an error for an empty prompt. # If you want to prevent the call entirely here, you could add: # updates["messages"].append(AIMessage(content=_message)) # return # or raise an exception elif _model_to_use_for_dalle_call in max_prompt_lengths: _max_len = max_prompt_lengths[_model_to_use_for_dalle_call] _original_len = len(_processed_prompt_text) if _original_len > _max_len: _processed_prompt_text = _processed_prompt_text[:_max_len] _message = ( f"Warning: Prompt for DALL-E ({_model_to_use_for_dalle_call}) was {_original_len} characters. " f"It has been TRUNCATED to the maximum of {_max_len} characters." ) print(f"\nโš ๏ธโš ๏ธโš ๏ธโš ๏ธ {_message}") _warning_or_error_message_for_updates = _message _prompt_was_trimmed_or_issue_found = True else: # Model specified in _model_to_use_for_dalle_call is not in our length check dictionary _message = ( f"Notice: Model '{_model_to_use_for_dalle_call}' not found in pre-defined prompt length limits. " "Proceeding with the original prompt. API may reject if prompt is too long for this model." ) print(f"\nโ„น๏ธโ„น๏ธโ„น๏ธโ„น๏ธ {_message}") # You might not want to add this specific notice to 'updates["messages"]' unless it's critical # _warning_or_error_message_for_updates = _message # _prompt_was_trimmed_or_issue_found = True # Or not, depending on how you view this # Add warning/error to updates if one was generated if _warning_or_error_message_for_updates: # Check if 'updates' and 'AIMessage' are available in the current scope to avoid errors if 'updates' in locals() and isinstance(updates, dict) and 'messages' in updates and 'AIMessage' in globals(): updates["messages"].append(AIMessage(content=_warning_or_error_message_for_updates)) elif 'updates' in globals() and isinstance(updates, dict) and 'messages' in updates: # If AIMessage isn't defined, just append string updates["messages"].append(_warning_or_error_message_for_updates) # --- Prompt Trimming Logic END --- dalle_payload = { "model": _model_to_use_for_dalle_call, # Use the model determined above "prompt": _processed_prompt_text, # Use the processed (potentially trimmed) prompt "n": 1, "size": "1024x1024" # You can add other DALL-E 3 specific params if _model_to_use_for_dalle_call is "dall-e-3" # e.g., "quality": "hd", "style": "vivid" } print(f"\n๐Ÿค–๐Ÿค–๐Ÿค–๐Ÿค–Generating image using DALL-E with this prompt: {dalle_prompt_text}") async with aiohttp.ClientSession() as session: try: async with session.post(dalle_api_url, headers=dalle_headers, json=dalle_payload) as dalle_response: dalle_response.raise_for_status() # Raise an exception for HTTP errors dalle_data = await dalle_response.json() if dalle_data.get("data") and len(dalle_data["data"]) > 0: generated_image_url = dalle_data["data"][0].get("url") print(f"DALL-E generated image URL: {generated_image_url}") updates["messages"].append(AIMessage(content=f"Image generated by DALL-E: {generated_image_url}")) else: print("Error: DALL-E API did not return image data.") updates["messages"].append(AIMessage(content="Failed to get image from DALL-E.")) except aiohttp.ClientError as e: print(f"DALL-E API call error: {e}") updates["messages"].append(AIMessage(content=f"Error calling DALL-E: {e}")) except json.JSONDecodeError as e: print(f"DALL-E API JSON decode error: {e}. Response: {await dalle_response.text()}") updates["messages"].append(AIMessage(content=f"Error decoding DALL-E response: {e}")) except Exception as e: print(f"Unexpected error during DALL-E processing: {e}") updates["messages"].append(AIMessage(content=f"Unexpected error with DALL-E: {e}")) updates.update({ "generated_image_url_from_dalle": generated_image_url, "planning_complete": True, "tool_call_required": False, "loop_planning": False, }) else: # Check if a tool call was requested if getattr(response, "tool_calls", None): updates.update({ "tool_call_required": True, "loop_planning": False, }) else: updates.update({ "tool_call_required": False, "loop_planning": True, }) print("\n๐Ÿšฉ๐Ÿšฉ | end | prompt planing Node \n") return updates except Exception as e: print(f"Error in prompt_planning node: {e}") return { "messages": [AIMessage(content="Error in prompt_planning node.")], "next_stage": state.next_stage or "planning" } async def generate_3d_node(state: GraphProcessingState, config=None): print("\n๐Ÿš€๐Ÿš€๐Ÿš€ | start | Generate 3D Node ๐Ÿš€๐Ÿš€๐Ÿš€\n") # 1. Get the image URL # For now, using a hardcoded URL as requested for testing. # In a real scenario, you might get this from the state: # image_url = state.get("image_url_for_3d") # if not image_url: # print("No image_url_for_3d found in state.") # return {"messages": [AIMessage(content="No image URL found for 3D generation.")]} # print(f"Using generated image_url: {state.generated_image_url_from_dalle}") # 2. Define API endpoint and parameters api_base_url = "https://wishwa-code--trellis-3d-model-generate.modal.run/" params = { "image_url": state.generated_image_url_from_dalle, "simplify": "0.95", "texture_size": "1024", "sparse_sampling_steps": "12", "sparse_sampling_cfg": "7.5", "slat_sampling_steps": "12", "slat_sampling_cfg": "3", "seed": "42", "output_format": "glb" } # Create a directory to store generated models if it doesn't exist output_dir = "generated_3d_models" os.makedirs(output_dir, exist_ok=True) current_attempts = state.drawing_attempts + 1 max_attempts = 2 if current_attempts > max_attempts: print(f"โŒ Max attempts ({max_attempts}) reached. Halting workflow.") return { "messages": [AIMessage(content=f"Failed to generate 3D model after {max_attempts} attempts.")], "drawing_failed": True } # 3. Attempt generation with retries print(f"\n ๐Ÿง™๐Ÿง™ Calling 3d moded generation API for the { state.drawing_attempts} time. Max attempts : {max_attempts}") try: # Note: The API call can take a long time (1.5 mins in your curl example) # Ensure your HTTP client timeout is sufficient. # httpx default timeout is 5 seconds, which is too short. async with httpx.AsyncClient(timeout=300.0) as client: # Timeout set to 120 seconds response = await client.get(api_base_url, params=params) response.raise_for_status() # Raises an HTTPStatusError for 4XX/5XX responses # Successfully got a response if response.status_code == 200: # Assuming the response body is the .glb file content file_name = f"model_{uuid.uuid4()}.glb" file_path = os.path.join(output_dir, file_name) with open(file_path, "wb") as f: f.write(response.content) print(f"Success: 3D model saved to {file_path}") return { "messages": [AIMessage(content=f"3D object generation successful: {file_path}")], "drawing_complete": True, "three_d_model_path": file_path, } else: # This case might not be reached if raise_for_status() is used effectively, # but good for explicit handling. error_message = f"API returned status {response.status_code}: {response.text}" print(error_message) if current_attempts == 2: # Last attempt return { "messages": [AIMessage(content=f"Failed to generate 3D object in {current_attempts}. Last error: {error_message}")], "drawing_attempts": current_attempts, } except httpx.HTTPStatusError as e: error_message = f"HTTP error occurred: {e.response.status_code} - {e.response.text}" print(error_message) return { "messages": [AIMessage(content=f"Failed to generate 3D object after {current_attempts} attempts. Last HTTP error: {error_message}")], "drawing_attempts": current_attempts } except httpx.RequestError as e: # Catches network errors, timeout errors etc. error_message = f"Request error occurred: {str(e)}" print(error_message) return { "messages": [AIMessage(content=f"Failed to generate 3D object after {current_attempts} attempts. Last request error: {error_message}")], "drawing_attempts": current_attempts } except Exception as e: error_message = f"An unexpected error occurred: {str(e)}" print(error_message) print("Retrying...") return { "messages": [AIMessage(content=f"Failed to generate 3D object after {current_attempts} attempts. Last unexpected error: {error_message}")], "drawing_attempts": current_attempts } # Failed after retries (this path should ideally be covered by returns in the loop) return { "messages": [AIMessage(content=f"Failed to generate a valid 3D object after {current_attempts} attempts.")], "drawing_attempts": current_attempts } async def satisfaction_clarification_node(state: GraphProcessingState): """ A simple node that first asks the user for satisfaction, then provides a response. This node demonstrates a two-step human-in-the-loop interaction: 1. โ“ Ask the user a question using the `human_assistance` tool. 2. ๐Ÿ’ฌ Process the (simulated) user response and provide a final message. """ print("๐Ÿšฉ Running Satisfaction Clarification Node...") # Get the last message from the state. In a real graph, this would be # the user's response from the tool call. last_message = state['messages'][-1] if state['messages'] else None # --- Logic to handle the conversation flow --- # Scenario 1: The node is just starting. Ask the user if they are satisfied. # We check if the last message is NOT a ToolMessage, meaning the loop is starting. if not isinstance(last_message, ToolMessage): print(" -> Step 1: Asking user for satisfaction.") # Create a tool call to ask the user for their feedback. tool_call_id = "human_assist_1" human_assistance_call = ToolMessage( tool_call_id=tool_call_id, content="Tool call to ask for user feedback." # Internal note ) response_message = AIMessage( content="", tool_calls=[{ "id": tool_call_id, "name": "human_assistance", "args": {"query": "Are you satisfied with the initial result? (Please answer 'yes' or 'no')"} }] ) # The graph would pause here, wait for the user's input, # and then that input would be added to the state as a ToolMessage. return {"messages": [response_message]} # Scenario 2: The user has responded. The last message IS a ToolMessage. else: print(f" -> Step 2: Received user response: '{last_message.content}'") user_response = last_message.content.lower().strip() final_message = "" if "yes" in user_response: final_message = "Great! I'm glad I could help. The process is now complete. โœจ" elif "no" in user_response: final_message = "I'm sorry to hear that. Let's try refining the request. What would you like to change?" else: final_message = "I didn't understand that response. Let's start over." # Create a final tool call to display the concluding message to the user. print(" -> Step 3: Displaying final message to the user.") tool_call_id = "human_assist_2" response_message = AIMessage( content="", tool_calls=[{ "id": tool_call_id, "name": "human_assistance", "args": {"query": final_message}, }] ) # In a real graph, you might end the process here or loop back. return { "messages": [response_message], "product_searching_complete": True } def define_workflow() -> CompiledStateGraph: """Defines the workflow graph""" # Initialize the graph workflow = StateGraph(GraphProcessingState) # Add nodes workflow.add_node("tools", DebugToolNode(tools)) workflow.add_node("guidance_node", guidance_node) workflow.add_node("brainstorming_node", brainstorming_node) workflow.add_node("prompt_planning_node", prompt_planning_node) workflow.add_node("generate_3d_node", generate_3d_node) workflow.add_node("user_feedback", satisfaction_clarification_node) # workflow.add_node("planning_node", planning_node) # Edges workflow.add_conditional_edges( "guidance_node", guidance_routing, { "brainstorming_node" : "brainstorming_node", "prompt_planning_node" : "prompt_planning_node", "generate_3d_node" : "generate_3d_node", "user_feedback":"user_feedback", "END": END } ) workflow.add_conditional_edges( "brainstorming_node", tools_condition, ) workflow.add_conditional_edges( "prompt_planning_node", tools_condition, ) workflow.add_edge("tools", "guidance_node") workflow.add_edge("brainstorming_node", "guidance_node") workflow.add_edge("prompt_planning_node", "guidance_node") workflow.add_edge("generate_3d_node", "guidance_node") workflow.add_edge("user_feedback", "guidance_node") # # Set end nodes workflow.set_entry_point("guidance_node") # workflow.set_finish_point("assistant_node") compiled_graph = workflow.compile(checkpointer=memory) try: img_bytes = compiled_graph.get_graph().draw_mermaid_png() with open("graph.png", "wb") as f: f.write(img_bytes) print("Graph image saved as graph.png") except Exception as e: print("Can't print the graph:") print(e) return compiled_graph graph = define_workflow() # workflow.add_conditional_edges( # "guidance_node", # The source node # custom_route_after_guidance, # Your custom condition function # { # # "Path name": "Destination node name" # "execute_tools": "tools", # If function returns "execute_tools" # "proceed_to_next_stage": "planning_node" # If function returns "proceed_to_next_stage" # # Or this could be another router, or END # } # ) # workflow.add_conditional_edges("guidance_node", guidance_routing) # workflow.add_conditional_edges("brainstorming_node", brainstorming_routing) # async def assistant_node(state: GraphProcessingState, config=None): # print("\n--- Assistance Node (Debug via print) ---") # Added a newline for clarity # print(f"Prompt: {state.prompt}") # print(f"Tools Enabled: {state.tools_enabled}") # print(f"Search Enabled: {state.search_enabled}") # print(f"Next Stage: {state.next_stage}") # # Log boolean completion flags # print(f"Idea Complete: {state.idea_complete}") # print(f"Brainstorming Complete: {state.brainstorming_complete}") # print(f"Planning Complete: {state.planning_complete}") # print(f"Drawing Complete: {state.drawing_complete}") # print(f"Product Searching Complete: {state.product_searching_complete}") # print(f"Purchasing Complete: {state.purchasing_complete}") # print("--- End Guidance Node Debug ---") # Added for clarity # print(f"\nMessage: {state.messages}") # assistant_tools = [] # if state.tools_enabled.get("download_website_text", True): # assistant_tools.append(download_website_text) # if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): # assistant_tools.append(tavily_search_tool) # assistant_model = model.bind_tools(assistant_tools) # if state.prompt: # final_prompt = "\n".join([state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) # else: # final_prompt = ASSISTANT_SYSTEM_PROMPT_BASE # prompt = ChatPromptTemplate.from_messages( # [ # ("system", final_prompt), # MessagesPlaceholder(variable_name="messages"), # ] # ) # chain = prompt | assistant_model # response = await chain.ainvoke({"messages": state.messages}, config=config) # for msg in response: # if isinstance(msg, HumanMessage): # print("Human:", msg.content) # elif isinstance(msg, AIMessage): # if isinstance(msg.content, list): # ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] # print("AI:", " ".join(ai_texts)) # else: # print("AI:", msg.content) # idea_complete = evaluate_idea_completion(response) # return { # "messages": response, # "idea_complete": idea_complete # } # # message = llm_with_tools.invoke(state["messages"]) # # Because we will be interrupting during tool execution, # # we disable parallel tool calling to avoid repeating any # # tool invocations when we resume. # assert len(response.tool_calls) <= 1 # idea_complete = evaluate_idea_completion(response) # return { # "messages": response, # "idea_complete": idea_complete # } # # async def planning_node(state: GraphProcessingState, config=None): # # Define the system prompt for planning # planning_prompt = "Based on the user's idea, create a detailed step-by-step plan to build the DIY product." # # Combine the planning prompt with any existing prompts # if state.prompt: # final_prompt = "\n".join([planning_prompt, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) # else: # final_prompt = "\n".join([planning_prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) # # Create the prompt template # prompt = ChatPromptTemplate.from_messages( # [ # ("system", final_prompt), # MessagesPlaceholder(variable_name="messages"), # ] # ) # # Bind tools if necessary # assistant_tools = [] # if state.tools_enabled.get("download_website_text", True): # assistant_tools.append(download_website_text) # if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): # assistant_tools.append(tavily_search_tool) # assistant_model = model.bind_tools(assistant_tools) # # Create the chain and invoke it # chain = prompt | assistant_model # response = await chain.ainvoke({"messages": state.messages}, config=config) # return { # "messages": response # } # async def guidance_node(state: GraphProcessingState, config=None): # print("\n--- Guidance Node (Debug via print) ---") # print(f"Prompt: {state.prompt}") # for message in state.messages: # if isinstance(message, HumanMessage): # print(f"Human: {message.content}") # elif isinstance(message, AIMessage): # if message.content: # if isinstance(message.content, list): # texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] # if texts: # print(f"AI: {' '.join(texts)}") # elif isinstance(message.content, str): # print(f"AI: {message.content}") # elif isinstance(message, SystemMessage): # print(f"System: {message.content}") # elif isinstance(message, ToolMessage): # print(f"Tool: {message.content}") # print(f"Tools Enabled: {state.tools_enabled}") # print(f"Search Enabled: {state.search_enabled}") # print(f"Next Stage: {state.next_stage}") # print(f"Brainstorming Complete: {state.brainstorming_complete}") # guidance_node.count = getattr(guidance_node, 'count', 0) + 1 # print('\nGuidance Node called count', guidance_node.count) # print("\n--- End Guidance Node Debug ---") # stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] # completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] # incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] # if not incomplete: # print("All stages complete!") # # Handle case where all stages are complete # # You might want to return a message and end, or set proposed_next_stage to a special value # ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") # return { # "messages": current_messages + [ai_all_complete_msg], # "next_stage": "end_project", # Or None, or a final summary node # "pending_approval_stage": None, # } # else: # # THIS LINE DEFINES THE VARIABLE # proposed_next_stage = incomplete[0] # print(f"Proposed next stage: {proposed_next_stage}") # status_summary = f"Completed stages: {completed}\nIncomplete stages: {incomplete}" # guidance_prompt_text = ( # "You are the Guiding Assistant for a DIY project. Your primary responsibility is to determine the next logical step " # "and then **obtain the user's explicit approval** before proceeding.\n\n" # f"CURRENT PROJECT STATUS:\n{status_summary}\n\n" # f"Based on the status, the most logical next stage appears to be: **'{proposed_next_stage}'**.\n\n" # "YOUR TASK:\n" # f"1. Formulate a clear and concise question for the user, asking if they agree to proceed to the **'{proposed_next_stage}'** stage. For example: 'It looks like '{proposed_next_stage}' is next. Shall we proceed with that?' or 'Are you ready to move on to {proposed_next_stage}?'\n" # "2. **You MUST use the 'human_assistance' tool to ask this question.** Do not answer directly. Invoke the tool with your question.\n" # "Example of tool usage (though you don't write this, you *call* the tool):\n" # "Tool Call: human_assistance(query='The next stage is planning. Do you want to proceed with planning?')\n\n" # "Consider the user's most recent message if it provides any preference." # ) # if state.prompt: # final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) # else: # final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) # prompt = ChatPromptTemplate.from_messages( # [ # ("system", final_prompt), # MessagesPlaceholder(variable_name="messages"), # ] # ) # assistant_model = model.bind_tools([human_assistance]) # chain = prompt | assistant_model # try: # response = await chain.ainvoke({"messages": state.messages}, config=config) # for msg in response: # if isinstance(msg, HumanMessage): # print("Human:", msg.content) # elif isinstance(msg, AIMessage): # if isinstance(msg.content, list): # ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] # print("AI:", " ".join(ai_texts)) # else: # print("AI:", msg.content) # # Check for tool calls in the response # if hasattr(response, "tool_calls"): # for tool_call in response.tool_calls: # tool_name = tool_call['name'] # if tool_name == "human_assistance": # query = tool_call['args']['query'] # print(f"Human input needed: {query}") # # Handle human assistance tool call # # You can pause execution and wait for user input here # return { # "messages": [response], # "next_stage": incomplete[0] if incomplete else "brainstorming" # } # except Exception as e: # print(f"Error in guidance node: {e}") # return { # "messages": [AIMessage(content="Error in guidance node.")], # "next_stage": "brainstorming" # }