DIY_assistant / graph.py
wishwakankanamg's picture
this is the one that hit
40babd7
raw
history blame
63.5 kB
import logging
import os
import uuid
import aiohttp
import json
import httpx
import io
import requests
from urllib.parse import quote
from typing import Annotated
from typing import TypedDict, List, Optional, Literal
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from trafilatura import extract
from huggingface_hub import InferenceClient
from langchain_core.messages import AIMessage, HumanMessage, AnyMessage, ToolCall, SystemMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_community.tools import TavilySearchResults
from langgraph.graph.state import CompiledStateGraph
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command, interrupt
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from mistralai import Mistral
from langchain.chat_models import init_chat_model
from langchain_core.messages.utils import convert_to_openai_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
class DebugToolNode(ToolNode):
async def invoke(self, state, config=None):
print("🛠️ ToolNode activated")
print(f"Available tools: {[tool.name for tool in self.tool_map.values()]}")
print(f"Tool calls in last message: {state.messages[-1].tool_calls}")
return await super().invoke(state, config)
logger = logging.getLogger(__name__)
ASSISTANT_SYSTEM_PROMPT_BASE = """"""
search_enabled = bool(os.environ.get("TAVILY_API_KEY"))
try:
with open('brainstorming_system_prompt.txt', 'r') as file:
brainstorming_system_prompt = file.read()
except FileNotFoundError:
print("File 'system_prompt.txt' not found!")
except Exception as e:
print(f"Error reading file: {e}")
def evaluate_idea_completion(response) -> bool:
"""
Evaluates whether the assistant's response indicates a complete DIY project idea.
You can customize the logic based on your specific criteria.
"""
# Example logic: Check if the response contains certain keywords
required_keywords = ["materials", "dimensions", "tools", "steps"]
# Determine the type of response and extract text accordingly
if isinstance(response, dict):
# If response is a dictionary, extract values and join them into a single string
response_text = ' '.join(str(value).lower() for value in response.values())
elif isinstance(response, str):
# If response is a string, convert it to lowercase
response_text = response.lower()
else:
# If response is of an unexpected type, convert it to string and lowercase
response_text = str(response).lower()
return all(keyword in response_text for keyword in required_keywords)
@tool
async def human_assistance(query: str) -> str:
"""Request assistance from a human."""
human_response = await interrupt({"query": query}) # async wait
return human_response["data"]
@tool
async def download_website_text(url: str) -> str:
"""Download the text from a website"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
downloaded = await response.text()
result = extract(downloaded, include_formatting=True, include_links=True, output_format='json', with_metadata=True)
return result or "No text found on the website"
except Exception as e:
logger.error(f"Failed to download {url}: {str(e)}")
return f"Error retrieving website content: {str(e)}"
@tool
async def finalize_idea() -> str:
"""Marks the brainstorming phase as complete. This function does nothing else."""
return "Brainstorming finalized."
tools = [download_website_text, human_assistance,finalize_idea]
memory = MemorySaver()
if search_enabled:
tavily_search_tool = TavilySearchResults(
max_results=5,
search_depth="advanced",
include_answer=True,
include_raw_content=True,
)
tools.append(tavily_search_tool)
else:
print("TAVILY_API_KEY environment variable not found. Websearch disabled")
weak_model = ChatOpenAI(
model="gpt-4o",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars
# base_url="...",
# organization="...",
# other params...
)
api_key = os.environ["MISTRAL_API_KEY"]
model = "mistral-large-latest"
client = Mistral(api_key=api_key)
# ChatAnthropic(
# model="claude-3-5-sonnet-20240620",
# temperature=0,
# max_tokens=1024,
# timeout=None,
# max_retries=2,
# # other params...
# )
search_enabled = bool(os.environ.get("TAVILY_API_KEY"))
if not os.environ.get("OPENAI_API_KEY"):
print('Open API key not found')
prompt_planning_model = ChatOpenAI(
model="gpt-4o",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars
# base_url="...",
# organization="...",
# other params...
)
threed_object_gen_model = ChatOpenAI(
model="gpt-4o",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars
# base_url="...",
# organization="...",
# other params...
)
huggingfaceclient = InferenceClient(
provider="hf-inference",
api_key=os.environ["HF_TOKEN"],
)
model = weak_model
assistant_model = weak_model
class GraphProcessingState(BaseModel):
# user_input: str = Field(default_factory=str, description="The original user input")
messages: Annotated[list[AnyMessage], add_messages] = Field(default_factory=list)
prompt: str = Field(default_factory=str, description="The prompt to be used for the model")
tools_enabled: dict = Field(default_factory=dict, description="The tools enabled for the assistant")
search_enabled: bool = Field(default=True, description="Whether to enable search tools")
next_stage: str = Field(default="", description="The next stage to execute, decided by the guidance node.")
tool_call_required: bool = Field(default=False, description="Whether a tool should be called from brainstorming.")
loop_brainstorming: bool = Field(default=False, description="Whether to loop back to brainstorming for further iteration.")
# Completion flags for each stage
idea_complete: bool = Field(default=False)
brainstorming_complete: bool = Field(default=False)
planning_complete: bool = Field(default=False)
drawing_complete: bool = Field(default=False)
product_searching_complete: bool = Field(default=False)
purchasing_complete: bool = Field(default=False)
generated_image_url_from_dalle: str = Field(default="", description="The generated_image_url_from_dalle.")
async def guidance_node(state: GraphProcessingState, config=None):
# print(f"Prompt: {state.prompt}")
# print(f"Prompt: {state.prompt}")
# # print(f"Message: {state.messages}")
# print(f"Tools Enabled: {state.tools_enabled}")
# print(f"Search Enabled: {state.search_enabled}")
# for message in state.messages:
# print(f'\ncomplete message', message)
# if isinstance(message, HumanMessage):
# print(f"Human: {message.content}\n")
# elif isinstance(message, AIMessage):
# # Check if content is non-empty
# if message.content:
# # If content is a list (e.g., list of dicts), extract text
# if isinstance(message.content, list):
# texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item]
# if texts:
# print(f"AI: {' '.join(texts)}\n")
# elif isinstance(message.content, str):
# print(f"AI: {message.content}")
# elif isinstance(message, SystemMessage):
# print(f"System: {message.content}\n")
# elif isinstance(message, ToolMessage):
# print(f"Tool: {message.content}\n")
print("\n🕵️‍♀️🕵️‍♀️ | start | progress checking nodee \n") # Added a newline for clarity
# print(f"Prompt: {state.prompt}\n")
if state.messages:
last_message = state.messages[-1]
if isinstance(last_message, HumanMessage):
print(f"🧑 Human: {last_message.content}\n")
elif isinstance(last_message, AIMessage):
if last_message.content:
if isinstance(last_message.content, list):
texts = [item.get('text', '') for item in last_message.content if isinstance(item, dict) and 'text' in item]
if texts:
print(f"🤖 AI: {' '.join(texts)}\n")
elif isinstance(last_message.content, str):
print(f"🤖 AI: {last_message.content}\n")
elif isinstance(last_message, SystemMessage):
print(f"⚙️ System: {last_message.content}\n")
elif isinstance(last_message, ToolMessage):
print(f"🛠️ Tool: {last_message.content}\n")
else:
print("\n(No messages found.)")
# Log boolean completion flags
# Define the order of stages
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
# Identify completed and incomplete stages
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)]
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]
# Determine the next stage
if not incomplete:
# All stages are complete
return {
"messages": [AIMessage(content="All DIY project stages are complete!")],
"next_stage": "end_project",
"pending_approval_stage": None,
}
else:
# Set the next stage to the first incomplete stage
next_stage = incomplete[0]
print(f"Next Stage: {state.next_stage}")
print("\n🕵️‍♀️🕵️‍♀️ | end | progress checking nodee \n") # Added a newline for clarity
return {
"messages": [],
"next_stage": next_stage,
"pending_approval_stage": None,
}
def guidance_routing(state: GraphProcessingState) -> str:
print("\n🔀🔀 Routing checkpoint 🔀🔀\n")
print(f"Next Stage: {state.next_stage}\n")
print(f"Brainstorming complete: {state.brainstorming_complete}")
print(f"Prompt planing: {state.planning_complete}")
print(f"Drwaing 3d model: {state.drawing_complete}")
print(f"Finding products: {state.product_searching_complete}\n")
next_stage = state.next_stage
if next_stage == "brainstorming":
return "brainstorming_node"
elif next_stage == "planning":
# return "generate_3d_node"
return "prompt_planning_node"
elif next_stage == "drawing":
return "generate_3d_node"
elif next_stage == "product_searching":
print('\n may day may day may day may day may day')
print(f"Prompt: {state.prompt}")
print(f"Prompt: {state.prompt}")
# print(f"Message: {state.messages}")
print(f"Tools Enabled: {state.tools_enabled}")
print(f"Search Enabled: {state.search_enabled}")
for message in state.messages:
print(f'\ncomplete message', message)
if isinstance(message, HumanMessage):
print(f"Human: {message.content}\n")
elif isinstance(message, AIMessage):
# Check if content is non-empty
if message.content:
# If content is a list (e.g., list of dicts), extract text
if isinstance(message.content, list):
texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item]
if texts:
print(f"AI: {' '.join(texts)}\n")
elif isinstance(message.content, str):
print(f"AI: {message.content}")
elif isinstance(message, SystemMessage):
print(f"System: {message.content}\n")
elif isinstance(message, ToolMessage):
print(f"Tool: {message.content}\n")
# return "drawing_node"
# elif next_stage == "product_searching":
# return "product_searching"
# elif next_stage == "purchasing":
# return "purchasing_node"
return END
async def brainstorming_node(state: GraphProcessingState, config=None):
print("\n🧠🧠 | start | brainstorming Node \n") # Added a newline for clarity
# Check if model is available
if not model:
return {"messages": [AIMessage(content="Model not available for brainstorming.")]}
# Filter out messages with empty content
filtered_messages = [
message for message in state.messages
if isinstance(message, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and message.content
]
# Ensure there is at least one message with content
if not filtered_messages:
filtered_messages.append(AIMessage(content="No valid messages provided."))
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)]
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]
if not incomplete:
print("All stages complete!")
# Handle case where all stages are complete
# You might want to return a message and end, or set proposed_next_stage to a special value
ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!")
return {
"messages": current_messages + [ai_all_complete_msg],
"next_stage": "end_project", # Or None, or a final summary node
"pending_approval_stage": None,
}
else:
# THIS LINE DEFINES THE VARIABLE
proposed_next_stage = incomplete[0]
guidance_prompt_text = (
"""
You are a warm, encouraging, and knowledgeable AI assistant, acting as a **Creative DIY Collaborator**. Your primary goal is to guide the user through a friendly and inspiring conversation to finalize **ONE specific, viable DIY project idea**. While we want to be efficient, the top priority is making the user feel heard, understood, and confident in their final choice.
⚠️ Your core directive remains speed and convergence: If you identify an idea that clearly meets ALL **Critical Criteria** and the user seems positive or neutral, you must suggest finalizing it **immediately**. Do NOT delay by offering too many alternatives once a solid candidate emerges. Your goal is to converge on a "good enough" idea the user is happy with, not to explore every possibility.
**Your Conversational Style & Strategy:**
1. **Be an Active Listener:** Start by acknowledging and validating the user's input. Show you understand their core desire (e.g., "That sounds like a fun goal! Creating a custom piece for your living room is always rewarding.").
2. **Ask Inspiring, Open-Ended Questions:** Instead of generic questions, make them feel personal and insightful.
* *Instead of:* "What do you want to build?"
* *Try:* "What part of your home are you dreaming of improving?" or "Are you thinking of a gift for someone special, or a project just for you?"
3. **Act as a Knowledgeable Guide:** When a user is unsure, proactively suggest appealing ideas based on their subtle clues. Connect their interests to tangible projects.
* *Example:* If the user mentions liking plants and having a small balcony, you could suggest: "That's great! We could think about a vertical herb garden to save space, or maybe some simple, stylish hanging macrame planters. Does either of those spark your interest?"
4. **Guide, Don't Just Gatekeep:** When an idea *almost* meets the criteria, don't just reject it. Gently guide it towards feasibility.
* *Example:* "A full-sized dining table might require some specialized tools. How about we adapt that idea into a beautiful, buildable coffee table or a set of side tables using similar techniques?"
**Critical Criteria for the Final DIY Project Idea (Your non-negotiable checklist):**
1. **Buildable:** Achievable by an average person with basic DIY skills.
2. **Common Materials/Tools:** Uses only materials (e.g., wood, screws, glue, paint, fabric, cardboard) and basic hand tools (e.g., screwdrivers, hammers, saws, drills) commonly available in general hardware stores, craft stores, or supermarkets worldwide.
3. **Avoid Specializations:** Explicitly AVOID projects requiring specialized electronic components, 3D printing, specific brand items not universally available, or complex machinery.
4. **Tangible Product:** The final result must be a physical, tangible item.
**Your Internal Process (How you think on each turn):**
1. **THOUGHT:**
* Clearly state your understanding of the user’s current input and conversational state.
* Outline your plan: Engage with their latest input using your **Conversational Style**. Propose or refine an idea to meet the **Critical Criteria**.
* **Tool Identification (`human_assistance`):** Decide if you need to ask a question. The question should be formulated according to the "Inspiring, Open-Ended Questions" principle. Clearly state your intention to use the `human_assistance` tool with the exact friendly and natural-sounding question as the `query`.
* **Idea Finalization Check:** Check if the current idea satisfies ALL **Critical Criteria**. If yes, and the user shows no objection, move to finalize immediately. Remember: **good enough is final enough**.
2. **TOOL USE (`human_assistance` - If Needed):**
* Invoke `human_assistance` with your well-formulated, friendly query.
3. **RESPONSE SYNTHESIS / IDEA FINALIZATION:**
* **If an idea is finalized:** Respond *only* with the exact phrase:
`IDEA FINALIZED: [Name of the Idea]`
(e.g., `IDEA FINALIZED: Simple Wooden Spice Rack`)
* **If brainstorming continues:**
* Provide your engaging suggestions or refinements based on your **Conversational Style**.
* Await the user response.
**General Guidelines (Your core principles):**
* **Empathy Over Pure Efficiency:** A positive, collaborative experience is the primary goal. Don't rush the user if they are still exploring.
* **Criteria Focused:** Always gently guide ideas toward the **Critical Criteria**.
* **One Main Idea at a Time:** Focus the conversation on a single project idea to avoid confusion.
* **Rapid Convergence:** Despite the friendly tone, always be looking for the fastest path to a final, viable idea.
"""
)
if state.prompt:
final_prompt = "\n".join([ guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
else:
final_prompt = "\n".join([ guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE])
prompt = ChatPromptTemplate.from_messages(
[
("system", final_prompt),
MessagesPlaceholder(variable_name="messages"),
]
)
# Tools allowed for brainstorming
node_tools = [human_assistance]
if state.search_enabled and tavily_search_tool: # only add search tool if enabled and initialized
node_tools.append(tavily_search_tool)
mistraltools = [
{
"type": "function",
"function": {
"name": "human_assistance",
"description": "Ask a question from the user",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"query": "The transaction id.",
}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "finalize_idea",
"description": "Handles finalized ideas. Saves or dispatches the confirmed idea for the next steps. but make sure you give your response with key word IDEA FINALIZED",
"parameters": {
"type": "object",
"properties": {
"idea_name": {
"type": "string",
"description": "The name of the finalized DIY idea.",
}
},
"required": ["idea_name"]
}
}
}
]
llm = init_chat_model("mistral-large-latest", model_provider="mistralai")
llm_with_tools = llm.bind_tools(mistraltools)
chain = prompt | llm_with_tools
openai_messages = convert_to_openai_messages(state.messages)
openai_messages_with_prompt = [
{"role": "system", "content": final_prompt}, # your guidance prompt
*openai_messages # history you’ve already converted
]
print('open ai formatted', openai_messages_with_prompt[-1])
for msg in openai_messages_with_prompt:
print(msg)
mistralmodel = "mistral-saba-2502"
# Pass filtered messages to the chain
try:
# response = await chain.ainvoke({"messages": filtered_messages}, config=config)
response = client.chat.complete(
model = mistralmodel,
messages = openai_messages_with_prompt,
tools = mistraltools,
tool_choice = "any",
parallel_tool_calls = False,
)
mistral_message = response.choices[0].message
tool_call = response.choices[0].message.tool_calls[0]
function_name = tool_call.function.name
function_params = json.loads(tool_call.function.arguments)
ai_message = AIMessage(
content=mistral_message.content or "", # Use empty string if blank
additional_kwargs={
"tool_calls": [
{
"id": tool_call.id,
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
},
"type": "function", # Add this if your chain expects it
}
]
}
)
updates = {
"messages": [ai_message],
"tool_calls": [
{
"name": function_name,
"arguments": function_params,
}
],
"next": function_name,
}
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params)
print('\n🔍 response from brainstorm\n', updates)
if function_name == "finalize_idea":
print('finalazing idea')
state.brainstorming_complete = True
updates["brainstorming_complete"] = True
if isinstance(response, AIMessage) and response.content:
print(' Identified last AI message', response)
if isinstance(response.content, str):
content = response.content.strip()
elif isinstance(response.content, list):
texts = [item.get("text", "") for item in response.content if isinstance(item, dict)]
content = " ".join(texts).strip()
else:
content = str(response.content).strip()
print('content for idea finalizing:', content)
if "finalize_idea:" in content: # Use 'in' instead of 'startswith'
print('✅ final idea')
updates.update({
"brainstorming_complete": True,
"tool_call_required": False,
"loop_brainstorming": False,
})
return updates
else:
# tool_calls = getattr(response, "tool_calls", None)
if tool_call:
print('🛠️ tool call requested at brainstorming node')
updates.update({
"tool_call_required": True,
"loop_brainstorming": False,
})
if tool_call:
tool_call = response.choices[0].message.tool_calls[0]
function_name = tool_call.function.name
function_params = json.loads(tool_call.function.arguments)
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params)
# for tool_call in response.tool_calls:
# tool_name = tool_call['name']
# if tool_name == "human_assistance":
# query = tool_call['args']['query']
# print(f"Human input needed: {query}")
# for tool_call in tool_calls:
# if isinstance(tool_call, dict) and 'name' in tool_call and 'args' in tool_call:
# print(f"🔧 Tool Call (Dict): {tool_call.get('name')}, Args: {tool_call.get('args')}")
# else:
# print(f"🔧 Unknown tool_call format: {tool_call}")
else:
print('💬 decided tp keep brainstorming')
updates.update({
"tool_call_required": False,
"loop_brainstorming": True,
})
print(f"Brainstorming continues: {content}")
else:
# If no proper response, keep looping brainstorming
updates["tool_call_required"] = False
updates["loop_brainstorming"] = True
print("\n🧠🧠 | end | brainstorming Node \n")
return updates
except Exception as e:
print(f"Error: {e}")
return {
"messages": [AIMessage(content="Error.")],
"next_stage": "brainstorming"
}
async def prompt_planning_node(state: GraphProcessingState, config=None):
print("\n🚩🚩 | start | prompt planing Node \n")
# Ensure we have a model
if not model:
return {"messages": [AIMessage(content="Model not available for planning.")]}
filtered_messages = state.messages
# Filter out empty messages
# filtered_messages = [
# msg for msg in state.messages
# if isinstance(msg, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and msg.content
# ]
# filtered_messages = []
# for msg in state.messages:
# if isinstance(msg, ToolMessage):
# # 🛠️ ToolMessage needs to be paired with a prior assistant message that called the tool
# tool_name = msg.name or "unknown_tool"
# tool_call_id = msg.tool_call_id or "tool_call_id_missing"
# # Simulated assistant message that initiated the tool call
# fake_assistant_msg = AIMessage(
# content="",
# additional_kwargs={
# "tool_calls": [
# {
# "id": tool_call_id,
# "type": "function",
# "function": {
# "name": tool_name,
# "arguments": json.dumps({"content": msg.content or ""}),
# }
# }
# ]
# }
# )
# # Append both in correct sequence
# filtered_messages.append(fake_assistant_msg)
# filtered_messages.append(msg)
# elif isinstance(msg, (HumanMessage, AIMessage, SystemMessage)) and msg.content:
# filtered_messages.append(msg)
# Fallback if list ends up empty
if not filtered_messages:
filtered_messages.append(AIMessage(content="No valid messages provided."))
# Define the system prompt for planning
guidance_prompt_text = """
You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming & 3D-Prompt Generator**. Your mission is to collaborate with the user to:
1. Brainstorm and refine one specific, viable DIY project idea.
2. Identify the single key component from that idea that should be 3D-modeled.
3. Produce a final, precise text prompt for an OpenAI 3D-generation endpoint.
---
**Critical Criteria for the DIY Project** (must be met):
• Buildable by an average person with only basic DIY skills.
• Uses common materials/tools (e.g., wood, screws, glue, paint; hammer, saw, drill).
• No specialized electronics, 3D printers, or proprietary parts.
• Results in a tangible, physical item.
---
**Available Tools**
• human_assistance – ask the user clarifying questions.
• (optional) your project-specific search tool – look up inspiration or standard dimensions if needed.
---
**When the DIY idea is fully detailed and meets all criteria, output exactly and only:**
ACCURATE PROMPT FOR MODEL GENERATING: [Your final single-paragraph prompt here]
"""
# Build final prompt
if state.prompt:
final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
else:
final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE])
prompt = ChatPromptTemplate.from_messages([
("system", final_prompt),
MessagesPlaceholder(variable_name="messages"),
])
# Bind tools
node_tools = [human_assistance]
if state.search_enabled and tavily_search_tool:
node_tools.append(tavily_search_tool)
llm_with_tools = prompt_planning_model.bind_tools(node_tools)
chain = prompt | llm_with_tools
# print(' 👾👾👾👾Debugging the request going in to prompt planing model')
# print("Prompt: ", prompt)
# print("chain: ", chain)
for msg in filtered_messages:
print('✨msg : ',msg)
print('\n')
try:
response = await chain.ainvoke({"messages": filtered_messages}, config=config)
print('\nresponse ->: ', response)
# Log any required human assistance query
if hasattr(response, "tool_calls"):
for call in response.tool_calls:
if call.get("name") == "human_assistance":
print(f"Human input needed: {call['args']['query']}")
updates = {"messages": [response]}
# Extract response text
content = ""
if isinstance(response.content, str):
content = response.content.strip()
elif isinstance(response.content, list):
content = " ".join(item.get("text","") for item in response.content if isinstance(item, dict)).strip()
# Check for finalization signalif "finalize_idea:" in content:
if "ACCURATE PROMPT FOR MODEL GENERATING" in content:
dalle_prompt_text = content.replace("ACCURATE PROMPT FOR MODEL GENERATING:", "").strip()
print(f"\n🤖🤖🤖🤖Extracted DALL-E prompt: {dalle_prompt_text}")
generated_image_url = None
generated_3d_model_url = None # This will store the final 3D model URL
# --- START: New code for DALL-E and Trellis API calls ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
print("Error: OPENAI_API_KEY environment variable not set.")
updates["messages"].append(AIMessage(content="OpenAI API key not configured. Cannot generate image."))
else:
# try:
# --- Your existing client setup ---
# prompt = dalle_prompt_text
# model_id = "black-forest-labs/FLUX.1-dev" # or any other model
# print(f"Generating image for prompt: '{prompt}' with model '{model_id}'...")
# # output is a PIL.Image object
# image = huggingfaceclient.text_to_image(
# prompt,
# model=model_id,
# )
# print("Image generated successfully.")
# # --- Code to save the image ---
# # 1. Define the directory name
# output_directory = "files"
# os.makedirs(output_directory, exist_ok=True)
# print(f"Ensured directory '{output_directory}' exists.")
# image_filename = "astronaut_horse.png"
# full_save_path = os.path.join(output_directory, image_filename)
# # 5. Save the PIL.Image object
# # The image object (if it's a PIL.Image) has a .save() method
# image.save(full_save_path)
# print(f"Image saved successfully to: {full_save_path}")
# if image:
# print("\nAttempting to upload generated image to Supabase...")
# # Define the filename for Supabase (can include a path prefix)
# supabase_target_filename = f"hf_generated_{uuid}" # Example: put in a 'hf_generated' folder
# # 1. Save the PIL image to a temporary in-memory buffer
# img_byte_arr = io.BytesIO()
# image.save(img_byte_arr, format='JPEG') # Match Dart's 'image/jpeg'
# img_byte_arr.seek(0) # Reset buffer's position to the beginning
# # Prepare the file for the multipart/form-data request
# # The field name 'file' and 'filename' should match what your Edge Function expects.
# files_payload = {
# 'file': (supabase_target_filename, img_byte_arr, 'image/jpeg')
# }
# # Headers (Content-Type for multipart/form-data is set automatically by requests
# # when using the `files` parameter, but you can set other headers if your edge function needs them)
# upload_headers = {
# # 'Authorization': 'Bearer YOUR_SUPABASE_ANON_KEY_OR_SERVICE_KEY_IF_EDGE_FUNCTION_NEEDS_IT'
# }
# print(f"Uploading image to Supabase Edge Function: as {supabase_target_filename}...")
# supabase_public_url = None
# try:
# upload_response = requests.post(
# 'https://yqewezudxihyadvmfovd.supabase.co/functions/v1/storage-upload',
# files=files_payload,
# headers=upload_headers
# )
# upload_response.raise_for_status() # Raise an HTTPError for bad responses (4XX or 5XX)
# # 2. Parse the response from the Edge Function
# # The Dart code expects: imgresponse.data['data']['path']
# response_json = upload_response.json()
# if 'data' in response_json and 'path' in response_json['data']:
# raw_path = response_json['data']['path']
# print(f"Edge function returned raw path: {raw_path}")
# # 3. Construct the public URL
# # The public URL format for Supabase Storage is:
# # SUPABASE_URL/storage/v1/object/public/BUCKET_NAME/FILE_PATH
# # The FILE_PATH needs to be URL encoded.
# encoded_path = quote(raw_path)
# # generated_image_url = f"{encoded_path}"'https://yqewezudxihyadvmfovd.supabase.co/storage/v1/object/public/product_images/$encodedPath';
# generated_image_url =f"https://yqewezudxihyadvmfovd.supabase.co/storage/v1/object/public/product_images/{encoded_path}"
# print(f"\nSuccessfully uploaded to Supabase!")
# print(f"Public URL: {generated_image_url}")
# else:
# print(f"Error: Unexpected response format from Edge Function: {response_json}")
# print("\nFailed to upload image to Supabase.")
# except requests.exceptions.RequestException as e_upload:
# print(f"Error uploading to Supabase: {e_upload}")
# if hasattr(e_upload, 'response') and e_upload.response is not None:
# print(f"Supabase Response status: {e_upload.response.status_code}")
# print(f"Supabase Response text: {e_upload.response.text}")
# print("\nFailed to upload image to Supabase.")
# except Exception as e_upload_generic:
# print(f"An unexpected error occurred during Supabase upload: {e_upload_generic}")
# print("\nFailed to upload image to Supabase.")
# else:
# print("No image was generated, skipping Supabase upload.")
# except KeyError:
# print("Error: The HF_TOKEN environment variable is not set.")
# print("Please set it before running the script. For example:")
# print(" export HF_TOKEN='your_hugging_face_api_token'")
# except ImportError:
# print("Error: The Pillow (PIL) library might not be installed correctly.")
# print("If 'image' is a PIL.Image object, Pillow is required to save it.")
# print("You might need to install it: pip install Pillow huggingface_hub")
# except Exception as e:
# print(f"An error occurred: {e}")
# print("Make sure your API token is valid, has the necessary permissions,")
# print(f"and the model '{model_id}' is accessible and compatible.")
# 1. Call DALL-E API
dalle_api_url = "https://api.openai.com/v1/images/generations"
dalle_headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
}
_model_to_use_for_dalle_call = "dall-e-2" # <<< IMPORTANT: Set this to "dall-e-2" or "dall-e-3"
_processed_prompt_text = dalle_prompt_text # Start with the original prompt
_prompt_was_trimmed_or_issue_found = False
_warning_or_error_message_for_updates = None
max_prompt_lengths = {
"dall-e-2": 1000,
"dall-e-3": 4000,
"gpt-image-1": 32000 # Included for completeness, though payload is for DALL-E
}
if not _processed_prompt_text: # Check for empty prompt
_message = f"Error: The DALL-E prompt for model '{_model_to_use_for_dalle_call}' cannot be empty. API call will likely fail."
print(f"\n🛑🛑🛑🛑 {_message}")
_warning_or_error_message_for_updates = _message
_prompt_was_trimmed_or_issue_found = True
# NOTE: OpenAI API will return an error for an empty prompt.
# If you want to prevent the call entirely here, you could add:
# updates["messages"].append(AIMessage(content=_message))
# return # or raise an exception
elif _model_to_use_for_dalle_call in max_prompt_lengths:
_max_len = max_prompt_lengths[_model_to_use_for_dalle_call]
_original_len = len(_processed_prompt_text)
if _original_len > _max_len:
_processed_prompt_text = _processed_prompt_text[:_max_len]
_message = (
f"Warning: Prompt for DALL-E ({_model_to_use_for_dalle_call}) was {_original_len} characters. "
f"It has been TRUNCATED to the maximum of {_max_len} characters."
)
print(f"\n⚠️⚠️⚠️⚠️ {_message}")
_warning_or_error_message_for_updates = _message
_prompt_was_trimmed_or_issue_found = True
else:
# Model specified in _model_to_use_for_dalle_call is not in our length check dictionary
_message = (
f"Notice: Model '{_model_to_use_for_dalle_call}' not found in pre-defined prompt length limits. "
"Proceeding with the original prompt. API may reject if prompt is too long for this model."
)
print(f"\nℹ️ℹ️ℹ️ℹ️ {_message}")
# You might not want to add this specific notice to 'updates["messages"]' unless it's critical
# _warning_or_error_message_for_updates = _message
# _prompt_was_trimmed_or_issue_found = True # Or not, depending on how you view this
# Add warning/error to updates if one was generated
if _warning_or_error_message_for_updates:
# Check if 'updates' and 'AIMessage' are available in the current scope to avoid errors
if 'updates' in locals() and isinstance(updates, dict) and 'messages' in updates and 'AIMessage' in globals():
updates["messages"].append(AIMessage(content=_warning_or_error_message_for_updates))
elif 'updates' in globals() and isinstance(updates, dict) and 'messages' in updates: # If AIMessage isn't defined, just append string
updates["messages"].append(_warning_or_error_message_for_updates)
# --- Prompt Trimming Logic END ---
dalle_payload = {
"model": _model_to_use_for_dalle_call, # Use the model determined above
"prompt": _processed_prompt_text, # Use the processed (potentially trimmed) prompt
"n": 1,
"size": "1024x1024"
# You can add other DALL-E 3 specific params if _model_to_use_for_dalle_call is "dall-e-3"
# e.g., "quality": "hd", "style": "vivid"
}
print(f"\n🤖🤖🤖🤖Calling DALL-E with prompt: {dalle_prompt_text}")
async with aiohttp.ClientSession() as session:
try:
async with session.post(dalle_api_url, headers=dalle_headers, json=dalle_payload) as dalle_response:
dalle_response.raise_for_status() # Raise an exception for HTTP errors
dalle_data = await dalle_response.json()
if dalle_data.get("data") and len(dalle_data["data"]) > 0:
generated_image_url = dalle_data["data"][0].get("url")
print(f"DALL-E generated image URL: {generated_image_url}")
updates["messages"].append(AIMessage(content=f"Image generated by DALL-E: {generated_image_url}"))
else:
print("Error: DALL-E API did not return image data.")
updates["messages"].append(AIMessage(content="Failed to get image from DALL-E."))
except aiohttp.ClientError as e:
print(f"DALL-E API call error: {e}")
updates["messages"].append(AIMessage(content=f"Error calling DALL-E: {e}"))
except json.JSONDecodeError as e:
print(f"DALL-E API JSON decode error: {e}. Response: {await dalle_response.text()}")
updates["messages"].append(AIMessage(content=f"Error decoding DALL-E response: {e}"))
except Exception as e:
print(f"Unexpected error during DALL-E processing: {e}")
updates["messages"].append(AIMessage(content=f"Unexpected error with DALL-E: {e}"))
updates.update({
"generated_image_url_from_dalle": generated_image_url,
"planning_complete": True,
"tool_call_required": False,
"loop_planning": False,
})
else:
# Check if a tool call was requested
if getattr(response, "tool_calls", None):
updates.update({
"tool_call_required": True,
"loop_planning": False,
})
else:
updates.update({
"tool_call_required": False,
"loop_planning": True,
})
print("\n🚩🚩 | end | prompt planing Node \n")
return updates
except Exception as e:
print(f"Error in prompt_planning node: {e}")
return {
"messages": [AIMessage(content="Error in prompt_planning node.")],
"next_stage": state.next_stage or "planning"
}
async def generate_3d_node(state: GraphProcessingState, config=None):
print("\n🚀🚀🚀 | start | Generate 3D Node 🚀🚀🚀\n")
# 1. Get the image URL
# For now, using a hardcoded URL as requested for testing.
# In a real scenario, you might get this from the state:
# image_url = state.get("image_url_for_3d")
# if not image_url:
# print("No image_url_for_3d found in state.")
# return {"messages": [AIMessage(content="No image URL found for 3D generation.")]}
hardcoded_image_url = state.generated_image_url_from_dalle
print(f"Using hardcoded image_url: {hardcoded_image_url}")
# 2. Define API endpoint and parameters
api_base_url = "https://wishwa-code--trellis-3d-model-generate-dev.modal.run/"
params = {
"image_url": hardcoded_image_url,
"simplify": "0.95",
"texture_size": "1024",
"sparse_sampling_steps": "12",
"sparse_sampling_cfg": "7.5",
"slat_sampling_steps": "12",
"slat_sampling_cfg": "3",
"seed": "42",
"output_format": "glb"
}
# Create a directory to store generated models if it doesn't exist
output_dir = "generated_3d_models"
os.makedirs(output_dir, exist_ok=True)
# 3. Attempt generation with retries
for attempt in range(1, 2):
print(f"Attempt {attempt} to call 3D generation API...")
try:
# Note: The API call can take a long time (1.5 mins in your curl example)
# Ensure your HTTP client timeout is sufficient.
# httpx default timeout is 5 seconds, which is too short.
async with httpx.AsyncClient(timeout=120.0) as client: # Timeout set to 120 seconds
response = await client.get(api_base_url, params=params)
response.raise_for_status() # Raises an HTTPStatusError for 4XX/5XX responses
# Successfully got a response
if response.status_code == 200:
# Assuming the response body is the .glb file content
file_name = f"model_{uuid.uuid4()}.glb"
file_path = os.path.join(output_dir, file_name)
with open(file_path, "wb") as f:
f.write(response.content)
print(f"Success: 3D model saved to {file_path}")
return {
"messages": [AIMessage(content=f"3D object generation successful: {file_path}")],
"generate_3d_complete": True,
"three_d_model_path": file_path,
"next_stage": state.get("next_stage") or 'end' # Use .get for safer access
}
else:
# This case might not be reached if raise_for_status() is used effectively,
# but good for explicit handling.
error_message = f"API returned status {response.status_code}: {response.text}"
print(error_message)
if attempt == 3: # Last attempt
return {"messages": [AIMessage(content=f"Failed to generate 3D object. Last error: {error_message}")]}
except httpx.HTTPStatusError as e:
error_message = f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
print(error_message)
if attempt == 3:
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last HTTP error: {error_message}")]}
except httpx.RequestError as e: # Catches network errors, timeout errors etc.
error_message = f"Request error occurred: {str(e)}"
print(error_message)
if attempt == 3:
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last request error: {error_message}")]}
except Exception as e:
error_message = f"An unexpected error occurred: {str(e)}"
print(error_message)
if attempt == 3:
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last unexpected error: {error_message}")]}
if attempt < 2:
print("Retrying...")
else:
print("Max retries reached.")
# Failed after retries (this path should ideally be covered by returns in the loop)
return {"messages": [AIMessage(content="Failed to generate a valid 3D object after 3 attempts.")]}
def define_workflow() -> CompiledStateGraph:
"""Defines the workflow graph"""
# Initialize the graph
workflow = StateGraph(GraphProcessingState)
# Add nodes
workflow.add_node("tools", DebugToolNode(tools))
workflow.add_node("guidance_node", guidance_node)
workflow.add_node("brainstorming_node", brainstorming_node)
workflow.add_node("prompt_planning_node", prompt_planning_node)
workflow.add_node("generate_3d_node", generate_3d_node)
# workflow.add_node("planning_node", planning_node)
# Edges
workflow.add_conditional_edges(
"guidance_node",
guidance_routing,
{
"brainstorming_node" : "brainstorming_node",
"prompt_planning_node" : "prompt_planning_node",
"generate_3d_node" : "generate_3d_node"
}
)
workflow.add_conditional_edges(
"brainstorming_node",
tools_condition,
)
workflow.add_conditional_edges(
"prompt_planning_node",
tools_condition,
)
workflow.add_edge("tools", "guidance_node")
workflow.add_edge("brainstorming_node", "guidance_node")
workflow.add_edge("prompt_planning_node", "guidance_node")
workflow.add_edge("generate_3d_node", "guidance_node")
# workflow.add_conditional_edges(
# "guidance_node", # The source node
# custom_route_after_guidance, # Your custom condition function
# {
# # "Path name": "Destination node name"
# "execute_tools": "tools", # If function returns "execute_tools"
# "proceed_to_next_stage": "planning_node" # If function returns "proceed_to_next_stage"
# # Or this could be another router, or END
# }
# )
# workflow.add_conditional_edges("guidance_node", guidance_routing)
# workflow.add_conditional_edges("brainstorming_node", brainstorming_routing)
# # Set end nodes
workflow.set_entry_point("guidance_node")
# workflow.set_finish_point("assistant_node")
compiled_graph = workflow.compile(checkpointer=memory)
try:
img_bytes = compiled_graph.get_graph().draw_mermaid_png()
with open("graph.png", "wb") as f:
f.write(img_bytes)
print("Graph image saved as graph.png")
except Exception as e:
print("Can't print the graph:")
print(e)
return compiled_graph
graph = define_workflow()
# async def assistant_node(state: GraphProcessingState, config=None):
# print("\n--- Assistance Node (Debug via print) ---") # Added a newline for clarity
# print(f"Prompt: {state.prompt}")
# print(f"Tools Enabled: {state.tools_enabled}")
# print(f"Search Enabled: {state.search_enabled}")
# print(f"Next Stage: {state.next_stage}")
# # Log boolean completion flags
# print(f"Idea Complete: {state.idea_complete}")
# print(f"Brainstorming Complete: {state.brainstorming_complete}")
# print(f"Planning Complete: {state.planning_complete}")
# print(f"Drawing Complete: {state.drawing_complete}")
# print(f"Product Searching Complete: {state.product_searching_complete}")
# print(f"Purchasing Complete: {state.purchasing_complete}")
# print("--- End Guidance Node Debug ---") # Added for clarity
# print(f"\nMessage: {state.messages}")
# assistant_tools = []
# if state.tools_enabled.get("download_website_text", True):
# assistant_tools.append(download_website_text)
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True):
# assistant_tools.append(tavily_search_tool)
# assistant_model = model.bind_tools(assistant_tools)
# if state.prompt:
# final_prompt = "\n".join([state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = ASSISTANT_SYSTEM_PROMPT_BASE
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# chain = prompt | assistant_model
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# for msg in response:
# if isinstance(msg, HumanMessage):
# print("Human:", msg.content)
# elif isinstance(msg, AIMessage):
# if isinstance(msg.content, list):
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)]
# print("AI:", " ".join(ai_texts))
# else:
# print("AI:", msg.content)
# idea_complete = evaluate_idea_completion(response)
# return {
# "messages": response,
# "idea_complete": idea_complete
# }
# # message = llm_with_tools.invoke(state["messages"])
# # Because we will be interrupting during tool execution,
# # we disable parallel tool calling to avoid repeating any
# # tool invocations when we resume.
# assert len(response.tool_calls) <= 1
# idea_complete = evaluate_idea_completion(response)
# return {
# "messages": response,
# "idea_complete": idea_complete
# }
#
# async def planning_node(state: GraphProcessingState, config=None):
# # Define the system prompt for planning
# planning_prompt = "Based on the user's idea, create a detailed step-by-step plan to build the DIY product."
# # Combine the planning prompt with any existing prompts
# if state.prompt:
# final_prompt = "\n".join([planning_prompt, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = "\n".join([planning_prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# # Create the prompt template
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# # Bind tools if necessary
# assistant_tools = []
# if state.tools_enabled.get("download_website_text", True):
# assistant_tools.append(download_website_text)
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True):
# assistant_tools.append(tavily_search_tool)
# assistant_model = model.bind_tools(assistant_tools)
# # Create the chain and invoke it
# chain = prompt | assistant_model
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# return {
# "messages": response
# }
# async def guidance_node(state: GraphProcessingState, config=None):
# print("\n--- Guidance Node (Debug via print) ---")
# print(f"Prompt: {state.prompt}")
# for message in state.messages:
# if isinstance(message, HumanMessage):
# print(f"Human: {message.content}")
# elif isinstance(message, AIMessage):
# if message.content:
# if isinstance(message.content, list):
# texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item]
# if texts:
# print(f"AI: {' '.join(texts)}")
# elif isinstance(message.content, str):
# print(f"AI: {message.content}")
# elif isinstance(message, SystemMessage):
# print(f"System: {message.content}")
# elif isinstance(message, ToolMessage):
# print(f"Tool: {message.content}")
# print(f"Tools Enabled: {state.tools_enabled}")
# print(f"Search Enabled: {state.search_enabled}")
# print(f"Next Stage: {state.next_stage}")
# print(f"Brainstorming Complete: {state.brainstorming_complete}")
# guidance_node.count = getattr(guidance_node, 'count', 0) + 1
# print('\nGuidance Node called count', guidance_node.count)
# print("\n--- End Guidance Node Debug ---")
# stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
# completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)]
# incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]
# if not incomplete:
# print("All stages complete!")
# # Handle case where all stages are complete
# # You might want to return a message and end, or set proposed_next_stage to a special value
# ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!")
# return {
# "messages": current_messages + [ai_all_complete_msg],
# "next_stage": "end_project", # Or None, or a final summary node
# "pending_approval_stage": None,
# }
# else:
# # THIS LINE DEFINES THE VARIABLE
# proposed_next_stage = incomplete[0]
# print(f"Proposed next stage: {proposed_next_stage}")
# status_summary = f"Completed stages: {completed}\nIncomplete stages: {incomplete}"
# guidance_prompt_text = (
# "You are the Guiding Assistant for a DIY project. Your primary responsibility is to determine the next logical step "
# "and then **obtain the user's explicit approval** before proceeding.\n\n"
# f"CURRENT PROJECT STATUS:\n{status_summary}\n\n"
# f"Based on the status, the most logical next stage appears to be: **'{proposed_next_stage}'**.\n\n"
# "YOUR TASK:\n"
# f"1. Formulate a clear and concise question for the user, asking if they agree to proceed to the **'{proposed_next_stage}'** stage. For example: 'It looks like '{proposed_next_stage}' is next. Shall we proceed with that?' or 'Are you ready to move on to {proposed_next_stage}?'\n"
# "2. **You MUST use the 'human_assistance' tool to ask this question.** Do not answer directly. Invoke the tool with your question.\n"
# "Example of tool usage (though you don't write this, you *call* the tool):\n"
# "Tool Call: human_assistance(query='The next stage is planning. Do you want to proceed with planning?')\n\n"
# "Consider the user's most recent message if it provides any preference."
# )
# if state.prompt:
# final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE])
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# assistant_model = model.bind_tools([human_assistance])
# chain = prompt | assistant_model
# try:
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# for msg in response:
# if isinstance(msg, HumanMessage):
# print("Human:", msg.content)
# elif isinstance(msg, AIMessage):
# if isinstance(msg.content, list):
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)]
# print("AI:", " ".join(ai_texts))
# else:
# print("AI:", msg.content)
# # Check for tool calls in the response
# if hasattr(response, "tool_calls"):
# for tool_call in response.tool_calls:
# tool_name = tool_call['name']
# if tool_name == "human_assistance":
# query = tool_call['args']['query']
# print(f"Human input needed: {query}")
# # Handle human assistance tool call
# # You can pause execution and wait for user input here
# return {
# "messages": [response],
# "next_stage": incomplete[0] if incomplete else "brainstorming"
# }
# except Exception as e:
# print(f"Error in guidance node: {e}")
# return {
# "messages": [AIMessage(content="Error in guidance node.")],
# "next_stage": "brainstorming"
# }