DIY_assistant / graph.py
wishwakankanamg's picture
going to fix loading issue
a9bcab4
raw
history blame
47.1 kB
import logging
import os
import uuid
import aiohttp
import json
import httpx
from typing import Annotated
from typing import TypedDict, List, Optional, Literal
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from trafilatura import extract
from langchain_core.messages import AIMessage, HumanMessage, AnyMessage, ToolCall, SystemMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_community.tools import TavilySearchResults
from langgraph.graph.state import CompiledStateGraph
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command, interrupt
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from mistralai import Mistral
from langchain.chat_models import init_chat_model
from langchain_core.messages.utils import convert_to_openai_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
class DebugToolNode(ToolNode):
async def invoke(self, state, config=None):
print("🛠️ ToolNode activated")
print(f"Available tools: {[tool.name for tool in self.tool_map.values()]}")
print(f"Tool calls in last message: {state.messages[-1].tool_calls}")
return await super().invoke(state, config)
logger = logging.getLogger(__name__)
ASSISTANT_SYSTEM_PROMPT_BASE = """"""
search_enabled = bool(os.environ.get("TAVILY_API_KEY"))
try:
with open('brainstorming_system_prompt.txt', 'r') as file:
brainstorming_system_prompt = file.read()
except FileNotFoundError:
print("File 'system_prompt.txt' not found!")
except Exception as e:
print(f"Error reading file: {e}")
def evaluate_idea_completion(response) -> bool:
"""
Evaluates whether the assistant's response indicates a complete DIY project idea.
You can customize the logic based on your specific criteria.
"""
# Example logic: Check if the response contains certain keywords
required_keywords = ["materials", "dimensions", "tools", "steps"]
# Determine the type of response and extract text accordingly
if isinstance(response, dict):
# If response is a dictionary, extract values and join them into a single string
response_text = ' '.join(str(value).lower() for value in response.values())
elif isinstance(response, str):
# If response is a string, convert it to lowercase
response_text = response.lower()
else:
# If response is of an unexpected type, convert it to string and lowercase
response_text = str(response).lower()
return all(keyword in response_text for keyword in required_keywords)
@tool
async def human_assistance(query: str) -> str:
"""Request assistance from a human."""
human_response = await interrupt({"query": query}) # async wait
return human_response["data"]
@tool
async def download_website_text(url: str) -> str:
"""Download the text from a website"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
downloaded = await response.text()
result = extract(downloaded, include_formatting=True, include_links=True, output_format='json', with_metadata=True)
return result or "No text found on the website"
except Exception as e:
logger.error(f"Failed to download {url}: {str(e)}")
return f"Error retrieving website content: {str(e)}"
@tool
async def finalize_idea() -> str:
"""Marks the brainstorming phase as complete. This function does nothing else."""
return "Brainstorming finalized."
tools = [download_website_text, human_assistance,finalize_idea]
memory = MemorySaver()
if search_enabled:
tavily_search_tool = TavilySearchResults(
max_results=5,
search_depth="advanced",
include_answer=True,
include_raw_content=True,
)
tools.append(tavily_search_tool)
else:
print("TAVILY_API_KEY environment variable not found. Websearch disabled")
weak_model = ChatOpenAI(
model="gpt-4o",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars
# base_url="...",
# organization="...",
# other params...
)
api_key = os.environ["MISTRAL_API_KEY"]
model = "mistral-large-latest"
client = Mistral(api_key=api_key)
# ChatAnthropic(
# model="claude-3-5-sonnet-20240620",
# temperature=0,
# max_tokens=1024,
# timeout=None,
# max_retries=2,
# # other params...
# )
search_enabled = bool(os.environ.get("TAVILY_API_KEY"))
if not os.environ.get("OPENAI_API_KEY"):
print('Open API key not found')
prompt_planning_model = ChatOpenAI(
model="gpt-4o",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars
# base_url="...",
# organization="...",
# other params...
)
threed_object_gen_model = ChatOpenAI(
model="gpt-4o",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars
# base_url="...",
# organization="...",
# other params...
)
model = weak_model
assistant_model = weak_model
class GraphProcessingState(BaseModel):
# user_input: str = Field(default_factory=str, description="The original user input")
messages: Annotated[list[AnyMessage], add_messages] = Field(default_factory=list)
prompt: str = Field(default_factory=str, description="The prompt to be used for the model")
tools_enabled: dict = Field(default_factory=dict, description="The tools enabled for the assistant")
search_enabled: bool = Field(default=True, description="Whether to enable search tools")
next_stage: str = Field(default="", description="The next stage to execute, decided by the guidance node.")
tool_call_required: bool = Field(default=False, description="Whether a tool should be called from brainstorming.")
loop_brainstorming: bool = Field(default=False, description="Whether to loop back to brainstorming for further iteration.")
# Completion flags for each stage
idea_complete: bool = Field(default=False)
brainstorming_complete: bool = Field(default=False)
planning_complete: bool = Field(default=False)
drawing_complete: bool = Field(default=False)
product_searching_complete: bool = Field(default=False)
purchasing_complete: bool = Field(default=False)
async def guidance_node(state: GraphProcessingState, config=None):
print("\n--- Guidance Node (Debug via print) ---\n") # Added a newline for clarity
print(f"Prompt: {state.prompt}")
print(f"Prompt: {state.prompt}")
# print(f"Message: {state.messages}")
print(f"Tools Enabled: {state.tools_enabled}")
print(f"Search Enabled: {state.search_enabled}")
for message in state.messages:
print(f'\ncomplete message', message)
if isinstance(message, HumanMessage):
print(f"Human: {message.content}\n")
elif isinstance(message, AIMessage):
# Check if content is non-empty
if message.content:
# If content is a list (e.g., list of dicts), extract text
if isinstance(message.content, list):
texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item]
if texts:
print(f"AI: {' '.join(texts)}\n")
elif isinstance(message.content, str):
print(f"AI: {message.content}")
elif isinstance(message, SystemMessage):
print(f"System: {message.content}\n")
elif isinstance(message, ToolMessage):
print(f"Tool: {message.content}\n")
# Log boolean completion flags
# Define the order of stages
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
# Identify completed and incomplete stages
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)]
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]
# Determine the next stage
if not incomplete:
# All stages are complete
return {
"messages": [AIMessage(content="All DIY project stages are complete!")],
"next_stage": "end_project",
"pending_approval_stage": None,
}
else:
# Set the next stage to the first incomplete stage
next_stage = incomplete[0]
print(f"Next Stage: {state.next_stage}")
print("\n--- End of Guidance Node Debug ---\n")
return {
"messages": [],
"next_stage": next_stage,
"pending_approval_stage": None,
}
def guidance_routing(state: GraphProcessingState) -> str:
print("\n--- Guidance Routing (Debug via print) ---\n") # Added a newline for clarity
print(f"Next Stage: {state.next_stage}\n")
print(f"Brainstorming complete: {state.brainstorming_complete}")
print(f"3D prompt: {state.planning_complete}")
print(f"Drwaing 3d model: {state.drawing_complete}")
print(f"Finding products: {state.product_searching_complete}\n")
next_stage = state.next_stage
if next_stage == "brainstorming":
return "brainstorming_node"
elif next_stage == "planning":
return "generate_3d_node"
# return "prompt_planning_node"
elif next_stage == "drawing":
print('\n may day may day may day may day may day')
# return "generate_3d_node"
elif next_stage == "product_searching":
print('\n may day may day may day may day may day')
# return "drawing_node"
# elif next_stage == "product_searching":
# return "product_searching"
# elif next_stage == "purchasing":
# return "purchasing_node"
return END
async def brainstorming_node(state: GraphProcessingState, config=None):
print("\n--- brainstorming Node (Debug via print) ---\n") # Added a newline for clarity
print(f"Tools Enabled: {state.tools_enabled}")
print(f"Search Enabled: {state.search_enabled}")
print(f"Next Stage: {state.next_stage}")
# Log boolean completion flags
print(f"is Brainstorming Complete: {state.brainstorming_complete}")
# Check if model is available
if not model:
return {"messages": [AIMessage(content="Model not available for brainstorming.")]}
# Filter out messages with empty content
filtered_messages = [
message for message in state.messages
if isinstance(message, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and message.content
]
# Ensure there is at least one message with content
if not filtered_messages:
filtered_messages.append(AIMessage(content="No valid messages provided."))
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)]
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]
if not incomplete:
print("All stages complete!")
# Handle case where all stages are complete
# You might want to return a message and end, or set proposed_next_stage to a special value
ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!")
return {
"messages": current_messages + [ai_all_complete_msg],
"next_stage": "end_project", # Or None, or a final summary node
"pending_approval_stage": None,
}
else:
# THIS LINE DEFINES THE VARIABLE
proposed_next_stage = incomplete[0]
guidance_prompt_text = (
"""
You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming Facilitator**. Your primary goal is to collaborate with the user to finalize **ONE specific, viable DIY project idea** as efficiently and quickly as possible.
⚠️ If you identify any idea that clearly meets ALL **Critical Criteria** and the user appears positive or neutral, you must finalize it **immediately** — even if it is the very first idea proposed. Do NOT delay finalization by over-analyzing or seeking excessive confirmation.
> Your goal is NOT to perfect the idea or generate many options. Instead, **converge rapidly on a “good enough” final idea** that the user can confidently pursue.
**Critical Criteria for the Final DIY Project Idea (MUST be met):**
1. **Buildable:** Achievable by an average person with basic DIY skills.
2. **Common Materials/Tools:** Uses only materials (e.g., wood, screws, glue, paint, fabric, cardboard) and basic hand tools (e.g., screwdrivers, hammers, saws, drills) commonly available in general hardware stores, craft stores, or supermarkets worldwide.
3. **Avoid Specializations:** Explicitly AVOID projects requiring specialized electronic components, 3D printing, specific brand items not universally available, or complex machinery.
4. **Tangible Product:** The final result must be a physical, tangible item.
**Your Process for Each Brainstorming Interaction Cycle:**
1. **THOUGHT:**
* Clearly state your understanding of the user’s current input or the brainstorming state (e.g., "User is seeking initial ideas," "User proposed an idea needing refinement," "We are close to finalizing an idea.").
* Outline your plan for this turn:
* Engage with the user's latest input.
* Propose or refine an idea to meet **Critical Criteria**.
* Decide if you need to ask the user a clarifying question.
* **Tool Identification (`human_assistance`):** If a question is needed for:
* Understanding user interests or skill level.
* Clarifying preferences.
* Getting feedback on a proposed idea.
* Refining the idea to meet criteria.
Clearly state your intention to use the `human_assistance` tool with the exact question as the `query`.
* **Idea Finalization Check:**
* Immediately check if the current idea satisfies ALL **Critical Criteria**.
* If yes, and the user shows no objection, **finalize immediately without waiting for multiple iterations**.
* Prioritize minimal iterations: finalize an idea at the earliest confident point.
* Remember: **good enough is final enough** — do not delay finalization.
2. **TOOL USE (`human_assistance` - If Needed):**
* If a question is necessary, invoke `human_assistance` with your formulated query.
* (Note: The system will execute your tool call.)
3. **RESPONSE SYNTHESIS / IDEA FINALIZATION:**
* After any tool use (or if no tool was needed), synthesize your response.
* **If an idea is finalized:** respond *only* with the exact phrase:
`IDEA FINALIZED: [Name of the Idea]`
(e.g., `IDEA FINALIZED: Simple Wooden Spice Rack`)
Do NOT add extra text.
This signals the end of brainstorming.
* **If brainstorming continues:**
* Provide engaging suggestions or refinements.
* If you just called `human_assistance`, your main output may be the tool call or a brief lead-in.
* Await user response before proceeding.
**General Guidelines:**
* **Collaborative & Iterative:** Work *with* the user; this is a conversation.
* **Criteria Focused:** Gently guide ideas toward ALL **Critical Criteria**.
* **One Main Idea at a Time:** Avoid confusion by focusing discussion on a single project idea or comparable alternatives.
* **User-Centric:** Help the user find a project *they* will be happy with.
* **Clarity:** Be clear and direct.
* **Tool Protocol:** Use `human_assistance` correctly; do not answer your own questions.
* **Rapid Convergence:** Prioritize **finalizing quickly** once criteria are met; avoid endless brainstorming or perfectionism.
"""
)
if state.prompt:
final_prompt = "\n".join([ guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
else:
final_prompt = "\n".join([ guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE])
prompt = ChatPromptTemplate.from_messages(
[
("system", final_prompt),
MessagesPlaceholder(variable_name="messages"),
]
)
# Tools allowed for brainstorming
node_tools = [human_assistance]
if state.search_enabled and tavily_search_tool: # only add search tool if enabled and initialized
node_tools.append(tavily_search_tool)
mistraltools = [
{
"type": "function",
"function": {
"name": "human_assistance",
"description": "Ask a question from the user",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"query": "The transaction id.",
}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "finalize_idea",
"description": "Handles finalized ideas. Saves or dispatches the confirmed idea for the next steps. but make sure you give your response with key word IDEA FINALIZED",
"parameters": {
"type": "object",
"properties": {
"idea_name": {
"type": "string",
"description": "The name of the finalized DIY idea.",
}
},
"required": ["idea_name"]
}
}
}
]
llm = init_chat_model("mistral-large-latest", model_provider="mistralai")
llm_with_tools = llm.bind_tools(mistraltools)
chain = prompt | llm_with_tools
openai_messages = convert_to_openai_messages(state.messages)
print('open ai format messaage', openai_messages)
openai_messages_with_prompt = [
{"role": "system", "content": final_prompt}, # your guidance prompt
*openai_messages # history you’ve already converted
]
print('open ai format messaage', openai_messages_with_prompt)
mistralmodel = "mistral-large-latest"
# Pass filtered messages to the chain
try:
# response = await chain.ainvoke({"messages": filtered_messages}, config=config)
response = client.chat.complete(
model = mistralmodel,
messages = openai_messages_with_prompt,
tools = mistraltools,
tool_choice = "any",
parallel_tool_calls = False,
)
mistral_message = response.choices[0].message
tool_call = response.choices[0].message.tool_calls[0]
function_name = tool_call.function.name
function_params = json.loads(tool_call.function.arguments)
ai_message = AIMessage(
content=mistral_message.content or "", # Use empty string if blank
additional_kwargs={
"tool_calls": [
{
"id": tool_call.id,
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
},
"type": "function", # Add this if your chain expects it
}
]
}
)
updates = {
"messages": [ai_message],
"tool_calls": [
{
"name": function_name,
"arguments": function_params,
}
],
"next": function_name,
}
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params)
print(' 🔍 response from brainstorm', updates)
if function_name == "finalize_idea":
print('came in 🌴🌴🌴🌴🌴')
state.brainstorming_complete = True
updates["brainstorming_complete"] = True
if isinstance(response, AIMessage) and response.content:
print(' 💀💀 came inside the loop', response)
if isinstance(response.content, str):
content = response.content.strip()
elif isinstance(response.content, list):
texts = [item.get("text", "") for item in response.content if isinstance(item, dict)]
content = " ".join(texts).strip()
else:
content = str(response.content).strip()
print('content for idea finalizing:', content)
if "finalize_idea:" in content: # Use 'in' instead of 'startswith'
print('✅ final idea')
updates.update({
"brainstorming_complete": True,
"tool_call_required": False,
"loop_brainstorming": False,
})
return updates
else:
# tool_calls = getattr(response, "tool_calls", None)
if tool_call:
print('🛠️ tool call requested')
updates.update({
"tool_call_required": True,
"loop_brainstorming": False,
})
if tool_call:
tool_call = response.choices[0].message.tool_calls[0]
function_name = tool_call.function.name
function_params = json.loads(tool_call.function.arguments)
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params)
# for tool_call in response.tool_calls:
# tool_name = tool_call['name']
# if tool_name == "human_assistance":
# query = tool_call['args']['query']
# print(f"Human input needed: {query}")
# for tool_call in tool_calls:
# if isinstance(tool_call, dict) and 'name' in tool_call and 'args' in tool_call:
# print(f"🔧 Tool Call (Dict): {tool_call.get('name')}, Args: {tool_call.get('args')}")
# else:
# print(f"🔧 Unknown tool_call format: {tool_call}")
else:
print('💬 keep brainstorming')
updates.update({
"tool_call_required": False,
"loop_brainstorming": True,
})
print(f"Brainstorming continues: {content}")
else:
# If no proper response, keep looping brainstorming
updates["tool_call_required"] = False
updates["loop_brainstorming"] = True
print("\n--- End Brainstorming Node Debug ---\n")
return updates
except Exception as e:
print(f"Error in guidance node: {e}")
return {
"messages": [AIMessage(content="Error in guidance node.")],
"next_stage": "brainstorming"
}
def brainstorming_routing(state: GraphProcessingState) -> str:
print("\n--- brainstorming_routing Edge (Debug via print) ---") # Added a newline for clarity
print(f"Prompt: {state.prompt}")
# print(f"Message: {state.messages}")
print(f"Tools called: {state.tool_call_required}")
print(f"Search Enabled: {state.search_enabled}")
print(f"Next Stage: {state.next_stage}")
# Log boolean completion flags
print(f"Idea Complete: {state.idea_complete}")
print(f"Brainstorming Complete: {state.brainstorming_complete}")
print(f"Planning Complete: {state.planning_complete}")
print(f"Drawing Complete: {state.drawing_complete}")
print(f"Product Searching Complete: {state.product_searching_complete}")
print(f"Purchasing Complete: {state.purchasing_complete}")
print("--- End Guidance Node Debug ---") # Added for clarity
if state.tool_call_required:
print('calling tools for brainstorming')
return "tools"
elif state.loop_brainstorming:
print('returning back to brainstorming at the route')
return "brainstorming_node"
else:
print('all good in brainstorming route going back to guidance')
return "guidance_node"
# def route_brainstorming(state):
# if state.get("tool_call_required"):
# return "tools"
# else:
# return "guidance_node"
async def prompt_planning_node(state: GraphProcessingState, config=None):
print("\n--- prompt_planning Node (Debug) ---\n")
print(f"Tools Enabled: {state.tools_enabled}")
print(f"Search Enabled: {state.search_enabled}")
print(f"Planning Complete: {state.planning_complete}")
# Ensure we have a model
if not model:
return {"messages": [AIMessage(content="Model not available for planning.")]}
# Filter out empty messages
filtered_messages = [
msg for msg in state.messages
if isinstance(msg, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and msg.content
]
if not filtered_messages:
filtered_messages.append(AIMessage(content="No valid messages provided."))
# Define the system prompt for planning
guidance_prompt_text = """
You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming & 3D-Prompt Generator**. Your mission is to collaborate with the user to:
1. Brainstorm and refine one specific, viable DIY project idea.
2. Identify the single key component from that idea that should be 3D-modeled.
3. Produce a final, precise text prompt for an OpenAI 3D-generation endpoint.
---
**Critical Criteria for the DIY Project** (must be met):
• Buildable by an average person with only basic DIY skills.
• Uses common materials/tools (e.g., wood, screws, glue, paint; hammer, saw, drill).
• No specialized electronics, 3D printers, or proprietary parts.
• Results in a tangible, physical item.
---
**Available Tools**
• human_assistance – ask the user clarifying questions.
• (optional) your project-specific search tool – look up inspiration or standard dimensions if needed.
---
**When the DIY idea is fully detailed and meets all criteria, output exactly and only:**
ACCURATE PROMPT FOR MODEL GENERATING: [Your final single-paragraph prompt here]
"""
# Build final prompt
if state.prompt:
final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
else:
final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE])
prompt = ChatPromptTemplate.from_messages([
("system", final_prompt),
MessagesPlaceholder(variable_name="messages"),
])
# Bind tools
node_tools = [human_assistance]
if state.search_enabled and tavily_search_tool:
node_tools.append(tavily_search_tool)
llm_with_tools = prompt_planning_model.bind_tools(node_tools)
chain = prompt | llm_with_tools
try:
response = await chain.ainvoke({"messages": filtered_messages}, config=config)
# Log any required human assistance query
if hasattr(response, "tool_calls"):
for call in response.tool_calls:
if call.get("name") == "human_assistance":
print(f"Human input needed: {call['args']['query']}")
updates = {"messages": [response]}
# Extract response text
content = ""
if isinstance(response.content, str):
content = response.content.strip()
elif isinstance(response.content, list):
content = " ".join(item.get("text","") for item in response.content if isinstance(item, dict)).strip()
# Check for finalization signal
if content.startswith("ACCURATE PROMPT FOR MODEL GENERATING:"):
updates.update({
"planning_complete": True,
"tool_call_required": False,
"loop_planning": False,
})
else:
# Check if a tool call was requested
if getattr(response, "tool_calls", None):
updates.update({
"tool_call_required": True,
"loop_planning": False,
})
else:
updates.update({
"tool_call_required": False,
"loop_planning": True,
})
print("\n--- End prompt_planning Node Debug ---\n")
return updates
except Exception as e:
print(f"Error in prompt_planning node: {e}")
return {
"messages": [AIMessage(content="Error in prompt_planning node.")],
"next_stage": state.next_stage or "planning"
}
async def generate_3d_node(state: GraphProcessingState, config=None):
print("\n-🚀🚀🚀-- Generate 3D via API Node ---🚀🚀🚀\n")
# 1. Get the image URL
# For now, using a hardcoded URL as requested for testing.
# In a real scenario, you might get this from the state:
# image_url = state.get("image_url_for_3d")
# if not image_url:
# print("No image_url_for_3d found in state.")
# return {"messages": [AIMessage(content="No image URL found for 3D generation.")]}
hardcoded_image_url = "https://images.unsplash.com/photo-1748973750733-d037dded16dd?q=80&w=1974&auto=format&fit=crop&ixlib=rb-4.1.0&ixid=M3wxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8fA%3D%3D"
print(f"Using hardcoded image_url: {hardcoded_image_url}")
# 2. Define API endpoint and parameters
api_base_url = "https://wishwa-code--trellis-3d-model-generate-dev.modal.run/"
params = {
"image_url": hardcoded_image_url,
"simplify": "0.95",
"texture_size": "1024",
"sparse_sampling_steps": "12",
"sparse_sampling_cfg": "7.5",
"slat_sampling_steps": "12",
"slat_sampling_cfg": "3",
"seed": "42",
"output_format": "glb"
}
# Create a directory to store generated models if it doesn't exist
output_dir = "generated_3d_models"
os.makedirs(output_dir, exist_ok=True)
# 3. Attempt generation with retries
for attempt in range(1, 2):
print(f"Attempt {attempt} to call 3D generation API...")
try:
# Note: The API call can take a long time (1.5 mins in your curl example)
# Ensure your HTTP client timeout is sufficient.
# httpx default timeout is 5 seconds, which is too short.
async with httpx.AsyncClient(timeout=120.0) as client: # Timeout set to 120 seconds
response = await client.get(api_base_url, params=params)
response.raise_for_status() # Raises an HTTPStatusError for 4XX/5XX responses
# Successfully got a response
if response.status_code == 200:
# Assuming the response body is the .glb file content
file_name = f"model_{uuid.uuid4()}.glb"
file_path = os.path.join(output_dir, file_name)
with open(file_path, "wb") as f:
f.write(response.content)
print(f"Success: 3D model saved to {file_path}")
return {
"messages": [AIMessage(content=f"3D object generation successful: {file_path}")],
"generate_3d_complete": True,
"three_d_model_path": file_path,
"next_stage": state.get("next_stage") or 'end' # Use .get for safer access
}
else:
# This case might not be reached if raise_for_status() is used effectively,
# but good for explicit handling.
error_message = f"API returned status {response.status_code}: {response.text}"
print(error_message)
if attempt == 3: # Last attempt
return {"messages": [AIMessage(content=f"Failed to generate 3D object. Last error: {error_message}")]}
except httpx.HTTPStatusError as e:
error_message = f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
print(error_message)
if attempt == 3:
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last HTTP error: {error_message}")]}
except httpx.RequestError as e: # Catches network errors, timeout errors etc.
error_message = f"Request error occurred: {str(e)}"
print(error_message)
if attempt == 3:
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last request error: {error_message}")]}
except Exception as e:
error_message = f"An unexpected error occurred: {str(e)}"
print(error_message)
if attempt == 3:
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last unexpected error: {error_message}")]}
if attempt < 2:
print("Retrying...")
else:
print("Max retries reached.")
# Failed after retries (this path should ideally be covered by returns in the loop)
return {"messages": [AIMessage(content="Failed to generate a valid 3D object after 3 attempts.")]}
def define_workflow() -> CompiledStateGraph:
"""Defines the workflow graph"""
# Initialize the graph
workflow = StateGraph(GraphProcessingState)
# Add nodes
workflow.add_node("tools", DebugToolNode(tools))
workflow.add_node("guidance_node", guidance_node)
workflow.add_node("brainstorming_node", brainstorming_node)
workflow.add_node("prompt_planning_node", prompt_planning_node)
workflow.add_node("generate_3d_node", generate_3d_node)
# workflow.add_node("planning_node", planning_node)
# Edges
workflow.add_conditional_edges(
"guidance_node",
guidance_routing,
{
"brainstorming_node" : "brainstorming_node",
"prompt_planning_node" : "prompt_planning_node",
"generate_3d_node" : "generate_3d_node"
}
)
workflow.add_conditional_edges(
"brainstorming_node",
tools_condition,
)
workflow.add_conditional_edges(
"prompt_planning_node",
tools_condition,
)
workflow.add_edge("tools", "guidance_node")
workflow.add_edge("brainstorming_node", "guidance_node")
workflow.add_edge("prompt_planning_node", "guidance_node")
workflow.add_edge("generate_3d_node", "guidance_node")
# workflow.add_conditional_edges(
# "guidance_node", # The source node
# custom_route_after_guidance, # Your custom condition function
# {
# # "Path name": "Destination node name"
# "execute_tools": "tools", # If function returns "execute_tools"
# "proceed_to_next_stage": "planning_node" # If function returns "proceed_to_next_stage"
# # Or this could be another router, or END
# }
# )
# workflow.add_conditional_edges("guidance_node", guidance_routing)
# workflow.add_conditional_edges("brainstorming_node", brainstorming_routing)
# # Set end nodes
workflow.set_entry_point("guidance_node")
# workflow.set_finish_point("assistant_node")
compiled_graph = workflow.compile(checkpointer=memory)
try:
img_bytes = compiled_graph.get_graph().draw_mermaid_png()
with open("graph.png", "wb") as f:
f.write(img_bytes)
print("Graph image saved as graph.png")
except Exception as e:
print("Can't print the graph:")
print(e)
return compiled_graph
graph = define_workflow()
# async def assistant_node(state: GraphProcessingState, config=None):
# print("\n--- Assistance Node (Debug via print) ---") # Added a newline for clarity
# print(f"Prompt: {state.prompt}")
# print(f"Tools Enabled: {state.tools_enabled}")
# print(f"Search Enabled: {state.search_enabled}")
# print(f"Next Stage: {state.next_stage}")
# # Log boolean completion flags
# print(f"Idea Complete: {state.idea_complete}")
# print(f"Brainstorming Complete: {state.brainstorming_complete}")
# print(f"Planning Complete: {state.planning_complete}")
# print(f"Drawing Complete: {state.drawing_complete}")
# print(f"Product Searching Complete: {state.product_searching_complete}")
# print(f"Purchasing Complete: {state.purchasing_complete}")
# print("--- End Guidance Node Debug ---") # Added for clarity
# print(f"\nMessage: {state.messages}")
# assistant_tools = []
# if state.tools_enabled.get("download_website_text", True):
# assistant_tools.append(download_website_text)
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True):
# assistant_tools.append(tavily_search_tool)
# assistant_model = model.bind_tools(assistant_tools)
# if state.prompt:
# final_prompt = "\n".join([state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = ASSISTANT_SYSTEM_PROMPT_BASE
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# chain = prompt | assistant_model
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# for msg in response:
# if isinstance(msg, HumanMessage):
# print("Human:", msg.content)
# elif isinstance(msg, AIMessage):
# if isinstance(msg.content, list):
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)]
# print("AI:", " ".join(ai_texts))
# else:
# print("AI:", msg.content)
# idea_complete = evaluate_idea_completion(response)
# return {
# "messages": response,
# "idea_complete": idea_complete
# }
# # message = llm_with_tools.invoke(state["messages"])
# # Because we will be interrupting during tool execution,
# # we disable parallel tool calling to avoid repeating any
# # tool invocations when we resume.
# assert len(response.tool_calls) <= 1
# idea_complete = evaluate_idea_completion(response)
# return {
# "messages": response,
# "idea_complete": idea_complete
# }
#
# async def planning_node(state: GraphProcessingState, config=None):
# # Define the system prompt for planning
# planning_prompt = "Based on the user's idea, create a detailed step-by-step plan to build the DIY product."
# # Combine the planning prompt with any existing prompts
# if state.prompt:
# final_prompt = "\n".join([planning_prompt, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = "\n".join([planning_prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# # Create the prompt template
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# # Bind tools if necessary
# assistant_tools = []
# if state.tools_enabled.get("download_website_text", True):
# assistant_tools.append(download_website_text)
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True):
# assistant_tools.append(tavily_search_tool)
# assistant_model = model.bind_tools(assistant_tools)
# # Create the chain and invoke it
# chain = prompt | assistant_model
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# return {
# "messages": response
# }
# async def guidance_node(state: GraphProcessingState, config=None):
# print("\n--- Guidance Node (Debug via print) ---")
# print(f"Prompt: {state.prompt}")
# for message in state.messages:
# if isinstance(message, HumanMessage):
# print(f"Human: {message.content}")
# elif isinstance(message, AIMessage):
# if message.content:
# if isinstance(message.content, list):
# texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item]
# if texts:
# print(f"AI: {' '.join(texts)}")
# elif isinstance(message.content, str):
# print(f"AI: {message.content}")
# elif isinstance(message, SystemMessage):
# print(f"System: {message.content}")
# elif isinstance(message, ToolMessage):
# print(f"Tool: {message.content}")
# print(f"Tools Enabled: {state.tools_enabled}")
# print(f"Search Enabled: {state.search_enabled}")
# print(f"Next Stage: {state.next_stage}")
# print(f"Brainstorming Complete: {state.brainstorming_complete}")
# guidance_node.count = getattr(guidance_node, 'count', 0) + 1
# print('\nGuidance Node called count', guidance_node.count)
# print("\n--- End Guidance Node Debug ---")
# stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
# completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)]
# incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]
# if not incomplete:
# print("All stages complete!")
# # Handle case where all stages are complete
# # You might want to return a message and end, or set proposed_next_stage to a special value
# ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!")
# return {
# "messages": current_messages + [ai_all_complete_msg],
# "next_stage": "end_project", # Or None, or a final summary node
# "pending_approval_stage": None,
# }
# else:
# # THIS LINE DEFINES THE VARIABLE
# proposed_next_stage = incomplete[0]
# print(f"Proposed next stage: {proposed_next_stage}")
# status_summary = f"Completed stages: {completed}\nIncomplete stages: {incomplete}"
# guidance_prompt_text = (
# "You are the Guiding Assistant for a DIY project. Your primary responsibility is to determine the next logical step "
# "and then **obtain the user's explicit approval** before proceeding.\n\n"
# f"CURRENT PROJECT STATUS:\n{status_summary}\n\n"
# f"Based on the status, the most logical next stage appears to be: **'{proposed_next_stage}'**.\n\n"
# "YOUR TASK:\n"
# f"1. Formulate a clear and concise question for the user, asking if they agree to proceed to the **'{proposed_next_stage}'** stage. For example: 'It looks like '{proposed_next_stage}' is next. Shall we proceed with that?' or 'Are you ready to move on to {proposed_next_stage}?'\n"
# "2. **You MUST use the 'human_assistance' tool to ask this question.** Do not answer directly. Invoke the tool with your question.\n"
# "Example of tool usage (though you don't write this, you *call* the tool):\n"
# "Tool Call: human_assistance(query='The next stage is planning. Do you want to proceed with planning?')\n\n"
# "Consider the user's most recent message if it provides any preference."
# )
# if state.prompt:
# final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE])
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# assistant_model = model.bind_tools([human_assistance])
# chain = prompt | assistant_model
# try:
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# for msg in response:
# if isinstance(msg, HumanMessage):
# print("Human:", msg.content)
# elif isinstance(msg, AIMessage):
# if isinstance(msg.content, list):
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)]
# print("AI:", " ".join(ai_texts))
# else:
# print("AI:", msg.content)
# # Check for tool calls in the response
# if hasattr(response, "tool_calls"):
# for tool_call in response.tool_calls:
# tool_name = tool_call['name']
# if tool_name == "human_assistance":
# query = tool_call['args']['query']
# print(f"Human input needed: {query}")
# # Handle human assistance tool call
# # You can pause execution and wait for user input here
# return {
# "messages": [response],
# "next_stage": incomplete[0] if incomplete else "brainstorming"
# }
# except Exception as e:
# print(f"Error in guidance node: {e}")
# return {
# "messages": [AIMessage(content="Error in guidance node.")],
# "next_stage": "brainstorming"
# }