Spaces:
Runtime error
Runtime error
import logging | |
import os | |
import uuid | |
import aiohttp | |
import json | |
import httpx | |
from typing import Annotated | |
from typing import TypedDict, List, Optional, Literal | |
from typing_extensions import TypedDict | |
from pydantic import BaseModel, Field | |
from trafilatura import extract | |
from langchain_core.messages import AIMessage, HumanMessage, AnyMessage, ToolCall, SystemMessage, ToolMessage | |
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain_core.tools import tool | |
from langchain_community.tools import TavilySearchResults | |
from langgraph.graph.state import CompiledStateGraph | |
from langgraph.graph import StateGraph, START, END, add_messages | |
from langgraph.prebuilt import ToolNode | |
from langgraph.prebuilt import ToolNode, tools_condition | |
from langgraph.checkpoint.memory import MemorySaver | |
from langgraph.types import Command, interrupt | |
from langchain_anthropic import ChatAnthropic | |
from langchain_openai import ChatOpenAI | |
from mistralai import Mistral | |
from langchain.chat_models import init_chat_model | |
from langchain_core.messages.utils import convert_to_openai_messages | |
class State(TypedDict): | |
messages: Annotated[list, add_messages] | |
class DebugToolNode(ToolNode): | |
async def invoke(self, state, config=None): | |
print("🛠️ ToolNode activated") | |
print(f"Available tools: {[tool.name for tool in self.tool_map.values()]}") | |
print(f"Tool calls in last message: {state.messages[-1].tool_calls}") | |
return await super().invoke(state, config) | |
logger = logging.getLogger(__name__) | |
ASSISTANT_SYSTEM_PROMPT_BASE = """""" | |
search_enabled = bool(os.environ.get("TAVILY_API_KEY")) | |
try: | |
with open('brainstorming_system_prompt.txt', 'r') as file: | |
brainstorming_system_prompt = file.read() | |
except FileNotFoundError: | |
print("File 'system_prompt.txt' not found!") | |
except Exception as e: | |
print(f"Error reading file: {e}") | |
def evaluate_idea_completion(response) -> bool: | |
""" | |
Evaluates whether the assistant's response indicates a complete DIY project idea. | |
You can customize the logic based on your specific criteria. | |
""" | |
# Example logic: Check if the response contains certain keywords | |
required_keywords = ["materials", "dimensions", "tools", "steps"] | |
# Determine the type of response and extract text accordingly | |
if isinstance(response, dict): | |
# If response is a dictionary, extract values and join them into a single string | |
response_text = ' '.join(str(value).lower() for value in response.values()) | |
elif isinstance(response, str): | |
# If response is a string, convert it to lowercase | |
response_text = response.lower() | |
else: | |
# If response is of an unexpected type, convert it to string and lowercase | |
response_text = str(response).lower() | |
return all(keyword in response_text for keyword in required_keywords) | |
async def human_assistance(query: str) -> str: | |
"""Request assistance from a human.""" | |
human_response = await interrupt({"query": query}) # async wait | |
return human_response["data"] | |
async def download_website_text(url: str) -> str: | |
"""Download the text from a website""" | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url) as response: | |
response.raise_for_status() | |
downloaded = await response.text() | |
result = extract(downloaded, include_formatting=True, include_links=True, output_format='json', with_metadata=True) | |
return result or "No text found on the website" | |
except Exception as e: | |
logger.error(f"Failed to download {url}: {str(e)}") | |
return f"Error retrieving website content: {str(e)}" | |
async def finalize_idea() -> str: | |
"""Marks the brainstorming phase as complete. This function does nothing else.""" | |
return "Brainstorming finalized." | |
tools = [download_website_text, human_assistance,finalize_idea] | |
memory = MemorySaver() | |
if search_enabled: | |
tavily_search_tool = TavilySearchResults( | |
max_results=5, | |
search_depth="advanced", | |
include_answer=True, | |
include_raw_content=True, | |
) | |
tools.append(tavily_search_tool) | |
else: | |
print("TAVILY_API_KEY environment variable not found. Websearch disabled") | |
weak_model = ChatOpenAI( | |
model="gpt-4o", | |
temperature=0, | |
max_tokens=None, | |
timeout=None, | |
max_retries=2, | |
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
# base_url="...", | |
# organization="...", | |
# other params... | |
) | |
api_key = os.environ["MISTRAL_API_KEY"] | |
model = "mistral-large-latest" | |
client = Mistral(api_key=api_key) | |
# ChatAnthropic( | |
# model="claude-3-5-sonnet-20240620", | |
# temperature=0, | |
# max_tokens=1024, | |
# timeout=None, | |
# max_retries=2, | |
# # other params... | |
# ) | |
search_enabled = bool(os.environ.get("TAVILY_API_KEY")) | |
if not os.environ.get("OPENAI_API_KEY"): | |
print('Open API key not found') | |
prompt_planning_model = ChatOpenAI( | |
model="gpt-4o", | |
temperature=0, | |
max_tokens=None, | |
timeout=None, | |
max_retries=2, | |
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
# base_url="...", | |
# organization="...", | |
# other params... | |
) | |
threed_object_gen_model = ChatOpenAI( | |
model="gpt-4o", | |
temperature=0, | |
max_tokens=None, | |
timeout=None, | |
max_retries=2, | |
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
# base_url="...", | |
# organization="...", | |
# other params... | |
) | |
model = weak_model | |
assistant_model = weak_model | |
class GraphProcessingState(BaseModel): | |
# user_input: str = Field(default_factory=str, description="The original user input") | |
messages: Annotated[list[AnyMessage], add_messages] = Field(default_factory=list) | |
prompt: str = Field(default_factory=str, description="The prompt to be used for the model") | |
tools_enabled: dict = Field(default_factory=dict, description="The tools enabled for the assistant") | |
search_enabled: bool = Field(default=True, description="Whether to enable search tools") | |
next_stage: str = Field(default="", description="The next stage to execute, decided by the guidance node.") | |
tool_call_required: bool = Field(default=False, description="Whether a tool should be called from brainstorming.") | |
loop_brainstorming: bool = Field(default=False, description="Whether to loop back to brainstorming for further iteration.") | |
# Completion flags for each stage | |
idea_complete: bool = Field(default=False) | |
brainstorming_complete: bool = Field(default=False) | |
planning_complete: bool = Field(default=False) | |
drawing_complete: bool = Field(default=False) | |
product_searching_complete: bool = Field(default=False) | |
purchasing_complete: bool = Field(default=False) | |
generated_image_url_from_dalle: str = Field(default="", description="The generated_image_url_from_dalle.") | |
async def guidance_node(state: GraphProcessingState, config=None): | |
# print(f"Prompt: {state.prompt}") | |
# print(f"Prompt: {state.prompt}") | |
# # print(f"Message: {state.messages}") | |
# print(f"Tools Enabled: {state.tools_enabled}") | |
# print(f"Search Enabled: {state.search_enabled}") | |
# for message in state.messages: | |
# print(f'\ncomplete message', message) | |
# if isinstance(message, HumanMessage): | |
# print(f"Human: {message.content}\n") | |
# elif isinstance(message, AIMessage): | |
# # Check if content is non-empty | |
# if message.content: | |
# # If content is a list (e.g., list of dicts), extract text | |
# if isinstance(message.content, list): | |
# texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
# if texts: | |
# print(f"AI: {' '.join(texts)}\n") | |
# elif isinstance(message.content, str): | |
# print(f"AI: {message.content}") | |
# elif isinstance(message, SystemMessage): | |
# print(f"System: {message.content}\n") | |
# elif isinstance(message, ToolMessage): | |
# print(f"Tool: {message.content}\n") | |
print("\n🕵️♀️🕵️♀️ | start | progress checking nodee \n") # Added a newline for clarity | |
# print(f"Prompt: {state.prompt}\n") | |
if state.messages: | |
last_message = state.messages[-1] | |
if isinstance(last_message, HumanMessage): | |
print(f"🧑 Human: {last_message.content}\n") | |
elif isinstance(last_message, AIMessage): | |
if last_message.content: | |
if isinstance(last_message.content, list): | |
texts = [item.get('text', '') for item in last_message.content if isinstance(item, dict) and 'text' in item] | |
if texts: | |
print(f"🤖 AI: {' '.join(texts)}\n") | |
elif isinstance(last_message.content, str): | |
print(f"🤖 AI: {last_message.content}\n") | |
elif isinstance(last_message, SystemMessage): | |
print(f"⚙️ System: {last_message.content}\n") | |
elif isinstance(last_message, ToolMessage): | |
print(f"🛠️ Tool: {last_message.content}\n") | |
else: | |
print("\n(No messages found.)") | |
# Log boolean completion flags | |
# Define the order of stages | |
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
# Identify completed and incomplete stages | |
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
# Determine the next stage | |
if not incomplete: | |
# All stages are complete | |
return { | |
"messages": [AIMessage(content="All DIY project stages are complete!")], | |
"next_stage": "end_project", | |
"pending_approval_stage": None, | |
} | |
else: | |
# Set the next stage to the first incomplete stage | |
next_stage = incomplete[0] | |
print(f"Next Stage: {state.next_stage}") | |
print("\n🕵️♀️🕵️♀️ | end | progress checking nodee \n") # Added a newline for clarity | |
return { | |
"messages": [], | |
"next_stage": next_stage, | |
"pending_approval_stage": None, | |
} | |
def guidance_routing(state: GraphProcessingState) -> str: | |
print("\n🔀🔀 Routing checkpoint 🔀🔀\n") | |
print(f"Next Stage: {state.next_stage}\n") | |
print(f"Brainstorming complete: {state.brainstorming_complete}") | |
print(f"Prompt planing: {state.planning_complete}") | |
print(f"Drwaing 3d model: {state.drawing_complete}") | |
print(f"Finding products: {state.product_searching_complete}\n") | |
next_stage = state.next_stage | |
if next_stage == "brainstorming": | |
return "brainstorming_node" | |
elif next_stage == "planning": | |
# return "generate_3d_node" | |
return "prompt_planning_node" | |
elif next_stage == "drawing": | |
return "generate_3d_node" | |
elif next_stage == "product_searching": | |
print('\n may day may day may day may day may day') | |
print(f"Prompt: {state.prompt}") | |
print(f"Prompt: {state.prompt}") | |
# print(f"Message: {state.messages}") | |
print(f"Tools Enabled: {state.tools_enabled}") | |
print(f"Search Enabled: {state.search_enabled}") | |
for message in state.messages: | |
print(f'\ncomplete message', message) | |
if isinstance(message, HumanMessage): | |
print(f"Human: {message.content}\n") | |
elif isinstance(message, AIMessage): | |
# Check if content is non-empty | |
if message.content: | |
# If content is a list (e.g., list of dicts), extract text | |
if isinstance(message.content, list): | |
texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
if texts: | |
print(f"AI: {' '.join(texts)}\n") | |
elif isinstance(message.content, str): | |
print(f"AI: {message.content}") | |
elif isinstance(message, SystemMessage): | |
print(f"System: {message.content}\n") | |
elif isinstance(message, ToolMessage): | |
print(f"Tool: {message.content}\n") | |
# return "drawing_node" | |
# elif next_stage == "product_searching": | |
# return "product_searching" | |
# elif next_stage == "purchasing": | |
# return "purchasing_node" | |
return END | |
async def brainstorming_node(state: GraphProcessingState, config=None): | |
print("\n🧠🧠 | start | brainstorming Node \n") # Added a newline for clarity | |
# Check if model is available | |
if not model: | |
return {"messages": [AIMessage(content="Model not available for brainstorming.")]} | |
# Filter out messages with empty content | |
filtered_messages = [ | |
message for message in state.messages | |
if isinstance(message, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and message.content | |
] | |
# Ensure there is at least one message with content | |
if not filtered_messages: | |
filtered_messages.append(AIMessage(content="No valid messages provided.")) | |
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
if not incomplete: | |
print("All stages complete!") | |
# Handle case where all stages are complete | |
# You might want to return a message and end, or set proposed_next_stage to a special value | |
ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") | |
return { | |
"messages": current_messages + [ai_all_complete_msg], | |
"next_stage": "end_project", # Or None, or a final summary node | |
"pending_approval_stage": None, | |
} | |
else: | |
# THIS LINE DEFINES THE VARIABLE | |
proposed_next_stage = incomplete[0] | |
guidance_prompt_text = ( | |
""" | |
You are a warm, encouraging, and knowledgeable AI assistant, acting as a **Creative DIY Collaborator**. Your primary goal is to guide the user through a friendly and inspiring conversation to finalize **ONE specific, viable DIY project idea**. While we want to be efficient, the top priority is making the user feel heard, understood, and confident in their final choice. | |
⚠️ Your core directive remains speed and convergence: If you identify an idea that clearly meets ALL **Critical Criteria** and the user seems positive or neutral, you must suggest finalizing it **immediately**. Do NOT delay by offering too many alternatives once a solid candidate emerges. Your goal is to converge on a "good enough" idea the user is happy with, not to explore every possibility. | |
**Your Conversational Style & Strategy:** | |
1. **Be an Active Listener:** Start by acknowledging and validating the user's input. Show you understand their core desire (e.g., "That sounds like a fun goal! Creating a custom piece for your living room is always rewarding."). | |
2. **Ask Inspiring, Open-Ended Questions:** Instead of generic questions, make them feel personal and insightful. | |
* *Instead of:* "What do you want to build?" | |
* *Try:* "What part of your home are you dreaming of improving?" or "Are you thinking of a gift for someone special, or a project just for you?" | |
3. **Act as a Knowledgeable Guide:** When a user is unsure, proactively suggest appealing ideas based on their subtle clues. Connect their interests to tangible projects. | |
* *Example:* If the user mentions liking plants and having a small balcony, you could suggest: "That's great! We could think about a vertical herb garden to save space, or maybe some simple, stylish hanging macrame planters. Does either of those spark your interest?" | |
4. **Guide, Don't Just Gatekeep:** When an idea *almost* meets the criteria, don't just reject it. Gently guide it towards feasibility. | |
* *Example:* "A full-sized dining table might require some specialized tools. How about we adapt that idea into a beautiful, buildable coffee table or a set of side tables using similar techniques?" | |
**Critical Criteria for the Final DIY Project Idea (Your non-negotiable checklist):** | |
1. **Buildable:** Achievable by an average person with basic DIY skills. | |
2. **Common Materials/Tools:** Uses only materials (e.g., wood, screws, glue, paint, fabric, cardboard) and basic hand tools (e.g., screwdrivers, hammers, saws, drills) commonly available in general hardware stores, craft stores, or supermarkets worldwide. | |
3. **Avoid Specializations:** Explicitly AVOID projects requiring specialized electronic components, 3D printing, specific brand items not universally available, or complex machinery. | |
4. **Tangible Product:** The final result must be a physical, tangible item. | |
**Your Internal Process (How you think on each turn):** | |
1. **THOUGHT:** | |
* Clearly state your understanding of the user’s current input and conversational state. | |
* Outline your plan: Engage with their latest input using your **Conversational Style**. Propose or refine an idea to meet the **Critical Criteria**. | |
* **Tool Identification (`human_assistance`):** Decide if you need to ask a question. The question should be formulated according to the "Inspiring, Open-Ended Questions" principle. Clearly state your intention to use the `human_assistance` tool with the exact friendly and natural-sounding question as the `query`. | |
* **Idea Finalization Check:** Check if the current idea satisfies ALL **Critical Criteria**. If yes, and the user shows no objection, move to finalize immediately. Remember: **good enough is final enough**. | |
2. **TOOL USE (`human_assistance` - If Needed):** | |
* Invoke `human_assistance` with your well-formulated, friendly query. | |
3. **RESPONSE SYNTHESIS / IDEA FINALIZATION:** | |
* **If an idea is finalized:** Respond *only* with the exact phrase: | |
`IDEA FINALIZED: [Name of the Idea]` | |
(e.g., `IDEA FINALIZED: Simple Wooden Spice Rack`) | |
* **If brainstorming continues:** | |
* Provide your engaging suggestions or refinements based on your **Conversational Style**. | |
* Await the user response. | |
**General Guidelines (Your core principles):** | |
* **Empathy Over Pure Efficiency:** A positive, collaborative experience is the primary goal. Don't rush the user if they are still exploring. | |
* **Criteria Focused:** Always gently guide ideas toward the **Critical Criteria**. | |
* **One Main Idea at a Time:** Focus the conversation on a single project idea to avoid confusion. | |
* **Rapid Convergence:** Despite the friendly tone, always be looking for the fastest path to a final, viable idea. | |
""" | |
) | |
if state.prompt: | |
final_prompt = "\n".join([ guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
else: | |
final_prompt = "\n".join([ guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
prompt = ChatPromptTemplate.from_messages( | |
[ | |
("system", final_prompt), | |
MessagesPlaceholder(variable_name="messages"), | |
] | |
) | |
# Tools allowed for brainstorming | |
node_tools = [human_assistance] | |
if state.search_enabled and tavily_search_tool: # only add search tool if enabled and initialized | |
node_tools.append(tavily_search_tool) | |
mistraltools = [ | |
{ | |
"type": "function", | |
"function": { | |
"name": "human_assistance", | |
"description": "Ask a question from the user", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"query": { | |
"type": "string", | |
"query": "The transaction id.", | |
} | |
}, | |
"required": ["query"], | |
}, | |
}, | |
}, | |
{ | |
"type": "function", | |
"function": { | |
"name": "finalize_idea", | |
"description": "Handles finalized ideas. Saves or dispatches the confirmed idea for the next steps. but make sure you give your response with key word IDEA FINALIZED", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"idea_name": { | |
"type": "string", | |
"description": "The name of the finalized DIY idea.", | |
} | |
}, | |
"required": ["idea_name"] | |
} | |
} | |
} | |
] | |
llm = init_chat_model("mistral-large-latest", model_provider="mistralai") | |
llm_with_tools = llm.bind_tools(mistraltools) | |
chain = prompt | llm_with_tools | |
openai_messages = convert_to_openai_messages(state.messages) | |
openai_messages_with_prompt = [ | |
{"role": "system", "content": final_prompt}, # your guidance prompt | |
*openai_messages # history you’ve already converted | |
] | |
print('open ai formatted', openai_messages_with_prompt[-1]) | |
for msg in openai_messages_with_prompt: | |
print(msg) | |
mistralmodel = "mistral-saba-2502" | |
# Pass filtered messages to the chain | |
try: | |
# response = await chain.ainvoke({"messages": filtered_messages}, config=config) | |
response = client.chat.complete( | |
model = mistralmodel, | |
messages = openai_messages_with_prompt, | |
tools = mistraltools, | |
tool_choice = "any", | |
parallel_tool_calls = False, | |
) | |
mistral_message = response.choices[0].message | |
tool_call = response.choices[0].message.tool_calls[0] | |
function_name = tool_call.function.name | |
function_params = json.loads(tool_call.function.arguments) | |
ai_message = AIMessage( | |
content=mistral_message.content or "", # Use empty string if blank | |
additional_kwargs={ | |
"tool_calls": [ | |
{ | |
"id": tool_call.id, | |
"function": { | |
"name": tool_call.function.name, | |
"arguments": tool_call.function.arguments, | |
}, | |
"type": "function", # Add this if your chain expects it | |
} | |
] | |
} | |
) | |
updates = { | |
"messages": [ai_message], | |
"tool_calls": [ | |
{ | |
"name": function_name, | |
"arguments": function_params, | |
} | |
], | |
"next": function_name, | |
} | |
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) | |
print('\n🔍 response from brainstorm\n', updates) | |
if function_name == "finalize_idea": | |
print('finalazing idea') | |
state.brainstorming_complete = True | |
updates["brainstorming_complete"] = True | |
if isinstance(response, AIMessage) and response.content: | |
print(' Identified last AI message', response) | |
if isinstance(response.content, str): | |
content = response.content.strip() | |
elif isinstance(response.content, list): | |
texts = [item.get("text", "") for item in response.content if isinstance(item, dict)] | |
content = " ".join(texts).strip() | |
else: | |
content = str(response.content).strip() | |
print('content for idea finalizing:', content) | |
if "finalize_idea:" in content: # Use 'in' instead of 'startswith' | |
print('✅ final idea') | |
updates.update({ | |
"brainstorming_complete": True, | |
"tool_call_required": False, | |
"loop_brainstorming": False, | |
}) | |
return updates | |
else: | |
# tool_calls = getattr(response, "tool_calls", None) | |
if tool_call: | |
print('🛠️ tool call requested at brainstorming node') | |
updates.update({ | |
"tool_call_required": True, | |
"loop_brainstorming": False, | |
}) | |
if tool_call: | |
tool_call = response.choices[0].message.tool_calls[0] | |
function_name = tool_call.function.name | |
function_params = json.loads(tool_call.function.arguments) | |
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) | |
# for tool_call in response.tool_calls: | |
# tool_name = tool_call['name'] | |
# if tool_name == "human_assistance": | |
# query = tool_call['args']['query'] | |
# print(f"Human input needed: {query}") | |
# for tool_call in tool_calls: | |
# if isinstance(tool_call, dict) and 'name' in tool_call and 'args' in tool_call: | |
# print(f"🔧 Tool Call (Dict): {tool_call.get('name')}, Args: {tool_call.get('args')}") | |
# else: | |
# print(f"🔧 Unknown tool_call format: {tool_call}") | |
else: | |
print('💬 decided tp keep brainstorming') | |
updates.update({ | |
"tool_call_required": False, | |
"loop_brainstorming": True, | |
}) | |
print(f"Brainstorming continues: {content}") | |
else: | |
# If no proper response, keep looping brainstorming | |
updates["tool_call_required"] = False | |
updates["loop_brainstorming"] = True | |
print("\n🧠🧠 | end | brainstorming Node \n") | |
return updates | |
except Exception as e: | |
print(f"Error: {e}") | |
return { | |
"messages": [AIMessage(content="Error.")], | |
"next_stage": "brainstorming" | |
} | |
async def prompt_planning_node(state: GraphProcessingState, config=None): | |
print("\n🚩🚩 | start | prompt planing Node \n") | |
# Ensure we have a model | |
if not model: | |
return {"messages": [AIMessage(content="Model not available for planning.")]} | |
filtered_messages = state.messages | |
# Filter out empty messages | |
# filtered_messages = [ | |
# msg for msg in state.messages | |
# if isinstance(msg, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and msg.content | |
# ] | |
# filtered_messages = [] | |
# for msg in state.messages: | |
# if isinstance(msg, ToolMessage): | |
# # 🛠️ ToolMessage needs to be paired with a prior assistant message that called the tool | |
# tool_name = msg.name or "unknown_tool" | |
# tool_call_id = msg.tool_call_id or "tool_call_id_missing" | |
# # Simulated assistant message that initiated the tool call | |
# fake_assistant_msg = AIMessage( | |
# content="", | |
# additional_kwargs={ | |
# "tool_calls": [ | |
# { | |
# "id": tool_call_id, | |
# "type": "function", | |
# "function": { | |
# "name": tool_name, | |
# "arguments": json.dumps({"content": msg.content or ""}), | |
# } | |
# } | |
# ] | |
# } | |
# ) | |
# # Append both in correct sequence | |
# filtered_messages.append(fake_assistant_msg) | |
# filtered_messages.append(msg) | |
# elif isinstance(msg, (HumanMessage, AIMessage, SystemMessage)) and msg.content: | |
# filtered_messages.append(msg) | |
# Fallback if list ends up empty | |
if not filtered_messages: | |
filtered_messages.append(AIMessage(content="No valid messages provided.")) | |
# Define the system prompt for planning | |
guidance_prompt_text = """ | |
You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming & 3D-Prompt Generator**. Your mission is to collaborate with the user to: | |
1. Brainstorm and refine one specific, viable DIY project idea. | |
2. Identify the single key component from that idea that should be 3D-modeled. | |
3. Produce a final, precise text prompt for an OpenAI 3D-generation endpoint. | |
--- | |
**Critical Criteria for the DIY Project** (must be met): | |
• Buildable by an average person with only basic DIY skills. | |
• Uses common materials/tools (e.g., wood, screws, glue, paint; hammer, saw, drill). | |
• No specialized electronics, 3D printers, or proprietary parts. | |
• Results in a tangible, physical item. | |
--- | |
**Available Tools** | |
• human_assistance – ask the user clarifying questions. | |
• (optional) your project-specific search tool – look up inspiration or standard dimensions if needed. | |
--- | |
**When the DIY idea is fully detailed and meets all criteria, output exactly and only:** | |
ACCURATE PROMPT FOR MODEL GENERATING: [Your final single-paragraph prompt here] | |
""" | |
# Build final prompt | |
if state.prompt: | |
final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
else: | |
final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
prompt = ChatPromptTemplate.from_messages([ | |
("system", final_prompt), | |
MessagesPlaceholder(variable_name="messages"), | |
]) | |
# Bind tools | |
node_tools = [human_assistance] | |
if state.search_enabled and tavily_search_tool: | |
node_tools.append(tavily_search_tool) | |
llm_with_tools = prompt_planning_model.bind_tools(node_tools) | |
chain = prompt | llm_with_tools | |
# print(' 👾👾👾👾Debugging the request going in to prompt planing model') | |
# print("Prompt: ", prompt) | |
# print("chain: ", chain) | |
for msg in filtered_messages: | |
print('✨msg : ',msg) | |
print('\n') | |
try: | |
response = await chain.ainvoke({"messages": filtered_messages}, config=config) | |
print('\nresponse ->: ', response) | |
# Log any required human assistance query | |
if hasattr(response, "tool_calls"): | |
for call in response.tool_calls: | |
if call.get("name") == "human_assistance": | |
print(f"Human input needed: {call['args']['query']}") | |
updates = {"messages": [response]} | |
# Extract response text | |
content = "" | |
if isinstance(response.content, str): | |
content = response.content.strip() | |
elif isinstance(response.content, list): | |
content = " ".join(item.get("text","") for item in response.content if isinstance(item, dict)).strip() | |
# Check for finalization signalif "finalize_idea:" in content: | |
if "ACCURATE PROMPT FOR MODEL GENERATING" in content: | |
dalle_prompt_text = content.replace("ACCURATE PROMPT FOR MODEL GENERATING:", "").strip() | |
print(f"\n🤖🤖🤖🤖Extracted DALL-E prompt: {dalle_prompt_text}") | |
generated_image_url = None | |
generated_3d_model_url = None # This will store the final 3D model URL | |
# --- START: New code for DALL-E and Trellis API calls --- | |
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
if not OPENAI_API_KEY: | |
print("Error: OPENAI_API_KEY environment variable not set.") | |
updates["messages"].append(AIMessage(content="OpenAI API key not configured. Cannot generate image.")) | |
else: | |
# 1. Call DALL-E API | |
dalle_api_url = "https://api.openai.com/v1/images/generations" | |
dalle_headers = { | |
"Content-Type": "application/json", | |
"Authorization": f"Bearer {OPENAI_API_KEY}" | |
} | |
_model_to_use_for_dalle_call = "dall-e-2" # <<< IMPORTANT: Set this to "dall-e-2" or "dall-e-3" | |
_processed_prompt_text = dalle_prompt_text # Start with the original prompt | |
_prompt_was_trimmed_or_issue_found = False | |
_warning_or_error_message_for_updates = None | |
max_prompt_lengths = { | |
"dall-e-2": 1000, | |
"dall-e-3": 4000, | |
"gpt-image-1": 32000 # Included for completeness, though payload is for DALL-E | |
} | |
if not _processed_prompt_text: # Check for empty prompt | |
_message = f"Error: The DALL-E prompt for model '{_model_to_use_for_dalle_call}' cannot be empty. API call will likely fail." | |
print(f"\n🛑🛑🛑🛑 {_message}") | |
_warning_or_error_message_for_updates = _message | |
_prompt_was_trimmed_or_issue_found = True | |
# NOTE: OpenAI API will return an error for an empty prompt. | |
# If you want to prevent the call entirely here, you could add: | |
# updates["messages"].append(AIMessage(content=_message)) | |
# return # or raise an exception | |
elif _model_to_use_for_dalle_call in max_prompt_lengths: | |
_max_len = max_prompt_lengths[_model_to_use_for_dalle_call] | |
_original_len = len(_processed_prompt_text) | |
if _original_len > _max_len: | |
_processed_prompt_text = _processed_prompt_text[:_max_len] | |
_message = ( | |
f"Warning: Prompt for DALL-E ({_model_to_use_for_dalle_call}) was {_original_len} characters. " | |
f"It has been TRUNCATED to the maximum of {_max_len} characters." | |
) | |
print(f"\n⚠️⚠️⚠️⚠️ {_message}") | |
_warning_or_error_message_for_updates = _message | |
_prompt_was_trimmed_or_issue_found = True | |
else: | |
# Model specified in _model_to_use_for_dalle_call is not in our length check dictionary | |
_message = ( | |
f"Notice: Model '{_model_to_use_for_dalle_call}' not found in pre-defined prompt length limits. " | |
"Proceeding with the original prompt. API may reject if prompt is too long for this model." | |
) | |
print(f"\nℹ️ℹ️ℹ️ℹ️ {_message}") | |
# You might not want to add this specific notice to 'updates["messages"]' unless it's critical | |
# _warning_or_error_message_for_updates = _message | |
# _prompt_was_trimmed_or_issue_found = True # Or not, depending on how you view this | |
# Add warning/error to updates if one was generated | |
if _warning_or_error_message_for_updates: | |
# Check if 'updates' and 'AIMessage' are available in the current scope to avoid errors | |
if 'updates' in locals() and isinstance(updates, dict) and 'messages' in updates and 'AIMessage' in globals(): | |
updates["messages"].append(AIMessage(content=_warning_or_error_message_for_updates)) | |
elif 'updates' in globals() and isinstance(updates, dict) and 'messages' in updates: # If AIMessage isn't defined, just append string | |
updates["messages"].append(_warning_or_error_message_for_updates) | |
# --- Prompt Trimming Logic END --- | |
dalle_payload = { | |
"model": _model_to_use_for_dalle_call, # Use the model determined above | |
"prompt": _processed_prompt_text, # Use the processed (potentially trimmed) prompt | |
"n": 1, | |
"size": "1024x1024" | |
# You can add other DALL-E 3 specific params if _model_to_use_for_dalle_call is "dall-e-3" | |
# e.g., "quality": "hd", "style": "vivid" | |
} | |
print(f"\n🤖🤖🤖🤖Calling DALL-E with prompt: {dalle_prompt_text}") | |
async with aiohttp.ClientSession() as session: | |
try: | |
async with session.post(dalle_api_url, headers=dalle_headers, json=dalle_payload) as dalle_response: | |
dalle_response.raise_for_status() # Raise an exception for HTTP errors | |
dalle_data = await dalle_response.json() | |
if dalle_data.get("data") and len(dalle_data["data"]) > 0: | |
generated_image_url = dalle_data["data"][0].get("url") | |
print(f"DALL-E generated image URL: {generated_image_url}") | |
updates["messages"].append(AIMessage(content=f"Image generated by DALL-E: {generated_image_url}")) | |
else: | |
print("Error: DALL-E API did not return image data.") | |
updates["messages"].append(AIMessage(content="Failed to get image from DALL-E.")) | |
except aiohttp.ClientError as e: | |
print(f"DALL-E API call error: {e}") | |
updates["messages"].append(AIMessage(content=f"Error calling DALL-E: {e}")) | |
except json.JSONDecodeError as e: | |
print(f"DALL-E API JSON decode error: {e}. Response: {await dalle_response.text()}") | |
updates["messages"].append(AIMessage(content=f"Error decoding DALL-E response: {e}")) | |
except Exception as e: | |
print(f"Unexpected error during DALL-E processing: {e}") | |
updates["messages"].append(AIMessage(content=f"Unexpected error with DALL-E: {e}")) | |
updates.update({ | |
"generated_image_url_from_dalle": generated_image_url, | |
"planning_complete": True, | |
"tool_call_required": False, | |
"loop_planning": False, | |
}) | |
else: | |
# Check if a tool call was requested | |
if getattr(response, "tool_calls", None): | |
updates.update({ | |
"tool_call_required": True, | |
"loop_planning": False, | |
}) | |
else: | |
updates.update({ | |
"tool_call_required": False, | |
"loop_planning": True, | |
}) | |
print("\n🚩🚩 | end | prompt planing Node \n") | |
return updates | |
except Exception as e: | |
print(f"Error in prompt_planning node: {e}") | |
return { | |
"messages": [AIMessage(content="Error in prompt_planning node.")], | |
"next_stage": state.next_stage or "planning" | |
} | |
async def generate_3d_node(state: GraphProcessingState, config=None): | |
print("\n🚀🚀🚀 | start | Generate 3D Node 🚀🚀🚀\n") | |
# 1. Get the image URL | |
# For now, using a hardcoded URL as requested for testing. | |
# In a real scenario, you might get this from the state: | |
# image_url = state.get("image_url_for_3d") | |
# if not image_url: | |
# print("No image_url_for_3d found in state.") | |
# return {"messages": [AIMessage(content="No image URL found for 3D generation.")]} | |
hardcoded_image_url = state.generated_image_url_from_dalle | |
print(f"Using hardcoded image_url: {hardcoded_image_url}") | |
# 2. Define API endpoint and parameters | |
api_base_url = "https://wishwa-code--trellis-3d-model-generate-dev.modal.run/" | |
params = { | |
"image_url": hardcoded_image_url, | |
"simplify": "0.95", | |
"texture_size": "1024", | |
"sparse_sampling_steps": "12", | |
"sparse_sampling_cfg": "7.5", | |
"slat_sampling_steps": "12", | |
"slat_sampling_cfg": "3", | |
"seed": "42", | |
"output_format": "glb" | |
} | |
# Create a directory to store generated models if it doesn't exist | |
output_dir = "generated_3d_models" | |
os.makedirs(output_dir, exist_ok=True) | |
# 3. Attempt generation with retries | |
for attempt in range(1, 2): | |
print(f"Attempt {attempt} to call 3D generation API...") | |
try: | |
# Note: The API call can take a long time (1.5 mins in your curl example) | |
# Ensure your HTTP client timeout is sufficient. | |
# httpx default timeout is 5 seconds, which is too short. | |
async with httpx.AsyncClient(timeout=120.0) as client: # Timeout set to 120 seconds | |
response = await client.get(api_base_url, params=params) | |
response.raise_for_status() # Raises an HTTPStatusError for 4XX/5XX responses | |
# Successfully got a response | |
if response.status_code == 200: | |
# Assuming the response body is the .glb file content | |
file_name = f"model_{uuid.uuid4()}.glb" | |
file_path = os.path.join(output_dir, file_name) | |
with open(file_path, "wb") as f: | |
f.write(response.content) | |
print(f"Success: 3D model saved to {file_path}") | |
return { | |
"messages": [AIMessage(content=f"3D object generation successful: {file_path}")], | |
"generate_3d_complete": True, | |
"three_d_model_path": file_path, | |
"next_stage": state.get("next_stage") or 'end' # Use .get for safer access | |
} | |
else: | |
# This case might not be reached if raise_for_status() is used effectively, | |
# but good for explicit handling. | |
error_message = f"API returned status {response.status_code}: {response.text}" | |
print(error_message) | |
if attempt == 3: # Last attempt | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object. Last error: {error_message}")]} | |
except httpx.HTTPStatusError as e: | |
error_message = f"HTTP error occurred: {e.response.status_code} - {e.response.text}" | |
print(error_message) | |
if attempt == 3: | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last HTTP error: {error_message}")]} | |
except httpx.RequestError as e: # Catches network errors, timeout errors etc. | |
error_message = f"Request error occurred: {str(e)}" | |
print(error_message) | |
if attempt == 3: | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last request error: {error_message}")]} | |
except Exception as e: | |
error_message = f"An unexpected error occurred: {str(e)}" | |
print(error_message) | |
if attempt == 3: | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last unexpected error: {error_message}")]} | |
if attempt < 2: | |
print("Retrying...") | |
else: | |
print("Max retries reached.") | |
# Failed after retries (this path should ideally be covered by returns in the loop) | |
return {"messages": [AIMessage(content="Failed to generate a valid 3D object after 3 attempts.")]} | |
def define_workflow() -> CompiledStateGraph: | |
"""Defines the workflow graph""" | |
# Initialize the graph | |
workflow = StateGraph(GraphProcessingState) | |
# Add nodes | |
workflow.add_node("tools", DebugToolNode(tools)) | |
workflow.add_node("guidance_node", guidance_node) | |
workflow.add_node("brainstorming_node", brainstorming_node) | |
workflow.add_node("prompt_planning_node", prompt_planning_node) | |
workflow.add_node("generate_3d_node", generate_3d_node) | |
# workflow.add_node("planning_node", planning_node) | |
# Edges | |
workflow.add_conditional_edges( | |
"guidance_node", | |
guidance_routing, | |
{ | |
"brainstorming_node" : "brainstorming_node", | |
"prompt_planning_node" : "prompt_planning_node", | |
"generate_3d_node" : "generate_3d_node" | |
} | |
) | |
workflow.add_conditional_edges( | |
"brainstorming_node", | |
tools_condition, | |
) | |
workflow.add_conditional_edges( | |
"prompt_planning_node", | |
tools_condition, | |
) | |
workflow.add_edge("tools", "guidance_node") | |
workflow.add_edge("brainstorming_node", "guidance_node") | |
workflow.add_edge("prompt_planning_node", "guidance_node") | |
workflow.add_edge("generate_3d_node", "guidance_node") | |
# workflow.add_conditional_edges( | |
# "guidance_node", # The source node | |
# custom_route_after_guidance, # Your custom condition function | |
# { | |
# # "Path name": "Destination node name" | |
# "execute_tools": "tools", # If function returns "execute_tools" | |
# "proceed_to_next_stage": "planning_node" # If function returns "proceed_to_next_stage" | |
# # Or this could be another router, or END | |
# } | |
# ) | |
# workflow.add_conditional_edges("guidance_node", guidance_routing) | |
# workflow.add_conditional_edges("brainstorming_node", brainstorming_routing) | |
# # Set end nodes | |
workflow.set_entry_point("guidance_node") | |
# workflow.set_finish_point("assistant_node") | |
compiled_graph = workflow.compile(checkpointer=memory) | |
try: | |
img_bytes = compiled_graph.get_graph().draw_mermaid_png() | |
with open("graph.png", "wb") as f: | |
f.write(img_bytes) | |
print("Graph image saved as graph.png") | |
except Exception as e: | |
print("Can't print the graph:") | |
print(e) | |
return compiled_graph | |
graph = define_workflow() | |
# async def assistant_node(state: GraphProcessingState, config=None): | |
# print("\n--- Assistance Node (Debug via print) ---") # Added a newline for clarity | |
# print(f"Prompt: {state.prompt}") | |
# print(f"Tools Enabled: {state.tools_enabled}") | |
# print(f"Search Enabled: {state.search_enabled}") | |
# print(f"Next Stage: {state.next_stage}") | |
# # Log boolean completion flags | |
# print(f"Idea Complete: {state.idea_complete}") | |
# print(f"Brainstorming Complete: {state.brainstorming_complete}") | |
# print(f"Planning Complete: {state.planning_complete}") | |
# print(f"Drawing Complete: {state.drawing_complete}") | |
# print(f"Product Searching Complete: {state.product_searching_complete}") | |
# print(f"Purchasing Complete: {state.purchasing_complete}") | |
# print("--- End Guidance Node Debug ---") # Added for clarity | |
# print(f"\nMessage: {state.messages}") | |
# assistant_tools = [] | |
# if state.tools_enabled.get("download_website_text", True): | |
# assistant_tools.append(download_website_text) | |
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): | |
# assistant_tools.append(tavily_search_tool) | |
# assistant_model = model.bind_tools(assistant_tools) | |
# if state.prompt: | |
# final_prompt = "\n".join([state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# else: | |
# final_prompt = ASSISTANT_SYSTEM_PROMPT_BASE | |
# prompt = ChatPromptTemplate.from_messages( | |
# [ | |
# ("system", final_prompt), | |
# MessagesPlaceholder(variable_name="messages"), | |
# ] | |
# ) | |
# chain = prompt | assistant_model | |
# response = await chain.ainvoke({"messages": state.messages}, config=config) | |
# for msg in response: | |
# if isinstance(msg, HumanMessage): | |
# print("Human:", msg.content) | |
# elif isinstance(msg, AIMessage): | |
# if isinstance(msg.content, list): | |
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] | |
# print("AI:", " ".join(ai_texts)) | |
# else: | |
# print("AI:", msg.content) | |
# idea_complete = evaluate_idea_completion(response) | |
# return { | |
# "messages": response, | |
# "idea_complete": idea_complete | |
# } | |
# # message = llm_with_tools.invoke(state["messages"]) | |
# # Because we will be interrupting during tool execution, | |
# # we disable parallel tool calling to avoid repeating any | |
# # tool invocations when we resume. | |
# assert len(response.tool_calls) <= 1 | |
# idea_complete = evaluate_idea_completion(response) | |
# return { | |
# "messages": response, | |
# "idea_complete": idea_complete | |
# } | |
# | |
# async def planning_node(state: GraphProcessingState, config=None): | |
# # Define the system prompt for planning | |
# planning_prompt = "Based on the user's idea, create a detailed step-by-step plan to build the DIY product." | |
# # Combine the planning prompt with any existing prompts | |
# if state.prompt: | |
# final_prompt = "\n".join([planning_prompt, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# else: | |
# final_prompt = "\n".join([planning_prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# # Create the prompt template | |
# prompt = ChatPromptTemplate.from_messages( | |
# [ | |
# ("system", final_prompt), | |
# MessagesPlaceholder(variable_name="messages"), | |
# ] | |
# ) | |
# # Bind tools if necessary | |
# assistant_tools = [] | |
# if state.tools_enabled.get("download_website_text", True): | |
# assistant_tools.append(download_website_text) | |
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): | |
# assistant_tools.append(tavily_search_tool) | |
# assistant_model = model.bind_tools(assistant_tools) | |
# # Create the chain and invoke it | |
# chain = prompt | assistant_model | |
# response = await chain.ainvoke({"messages": state.messages}, config=config) | |
# return { | |
# "messages": response | |
# } | |
# async def guidance_node(state: GraphProcessingState, config=None): | |
# print("\n--- Guidance Node (Debug via print) ---") | |
# print(f"Prompt: {state.prompt}") | |
# for message in state.messages: | |
# if isinstance(message, HumanMessage): | |
# print(f"Human: {message.content}") | |
# elif isinstance(message, AIMessage): | |
# if message.content: | |
# if isinstance(message.content, list): | |
# texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
# if texts: | |
# print(f"AI: {' '.join(texts)}") | |
# elif isinstance(message.content, str): | |
# print(f"AI: {message.content}") | |
# elif isinstance(message, SystemMessage): | |
# print(f"System: {message.content}") | |
# elif isinstance(message, ToolMessage): | |
# print(f"Tool: {message.content}") | |
# print(f"Tools Enabled: {state.tools_enabled}") | |
# print(f"Search Enabled: {state.search_enabled}") | |
# print(f"Next Stage: {state.next_stage}") | |
# print(f"Brainstorming Complete: {state.brainstorming_complete}") | |
# guidance_node.count = getattr(guidance_node, 'count', 0) + 1 | |
# print('\nGuidance Node called count', guidance_node.count) | |
# print("\n--- End Guidance Node Debug ---") | |
# stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
# completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
# incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
# if not incomplete: | |
# print("All stages complete!") | |
# # Handle case where all stages are complete | |
# # You might want to return a message and end, or set proposed_next_stage to a special value | |
# ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") | |
# return { | |
# "messages": current_messages + [ai_all_complete_msg], | |
# "next_stage": "end_project", # Or None, or a final summary node | |
# "pending_approval_stage": None, | |
# } | |
# else: | |
# # THIS LINE DEFINES THE VARIABLE | |
# proposed_next_stage = incomplete[0] | |
# print(f"Proposed next stage: {proposed_next_stage}") | |
# status_summary = f"Completed stages: {completed}\nIncomplete stages: {incomplete}" | |
# guidance_prompt_text = ( | |
# "You are the Guiding Assistant for a DIY project. Your primary responsibility is to determine the next logical step " | |
# "and then **obtain the user's explicit approval** before proceeding.\n\n" | |
# f"CURRENT PROJECT STATUS:\n{status_summary}\n\n" | |
# f"Based on the status, the most logical next stage appears to be: **'{proposed_next_stage}'**.\n\n" | |
# "YOUR TASK:\n" | |
# f"1. Formulate a clear and concise question for the user, asking if they agree to proceed to the **'{proposed_next_stage}'** stage. For example: 'It looks like '{proposed_next_stage}' is next. Shall we proceed with that?' or 'Are you ready to move on to {proposed_next_stage}?'\n" | |
# "2. **You MUST use the 'human_assistance' tool to ask this question.** Do not answer directly. Invoke the tool with your question.\n" | |
# "Example of tool usage (though you don't write this, you *call* the tool):\n" | |
# "Tool Call: human_assistance(query='The next stage is planning. Do you want to proceed with planning?')\n\n" | |
# "Consider the user's most recent message if it provides any preference." | |
# ) | |
# if state.prompt: | |
# final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# else: | |
# final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# prompt = ChatPromptTemplate.from_messages( | |
# [ | |
# ("system", final_prompt), | |
# MessagesPlaceholder(variable_name="messages"), | |
# ] | |
# ) | |
# assistant_model = model.bind_tools([human_assistance]) | |
# chain = prompt | assistant_model | |
# try: | |
# response = await chain.ainvoke({"messages": state.messages}, config=config) | |
# for msg in response: | |
# if isinstance(msg, HumanMessage): | |
# print("Human:", msg.content) | |
# elif isinstance(msg, AIMessage): | |
# if isinstance(msg.content, list): | |
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] | |
# print("AI:", " ".join(ai_texts)) | |
# else: | |
# print("AI:", msg.content) | |
# # Check for tool calls in the response | |
# if hasattr(response, "tool_calls"): | |
# for tool_call in response.tool_calls: | |
# tool_name = tool_call['name'] | |
# if tool_name == "human_assistance": | |
# query = tool_call['args']['query'] | |
# print(f"Human input needed: {query}") | |
# # Handle human assistance tool call | |
# # You can pause execution and wait for user input here | |
# return { | |
# "messages": [response], | |
# "next_stage": incomplete[0] if incomplete else "brainstorming" | |
# } | |
# except Exception as e: | |
# print(f"Error in guidance node: {e}") | |
# return { | |
# "messages": [AIMessage(content="Error in guidance node.")], | |
# "next_stage": "brainstorming" | |
# } |