Spaces:
Runtime error
Runtime error
import logging | |
import os | |
import uuid | |
import aiohttp | |
import json | |
import httpx | |
from typing import Annotated | |
from typing import TypedDict, List, Optional, Literal | |
from typing_extensions import TypedDict | |
from pydantic import BaseModel, Field | |
from trafilatura import extract | |
from langchain_core.messages import AIMessage, HumanMessage, AnyMessage, ToolCall, SystemMessage, ToolMessage | |
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain_core.tools import tool | |
from langchain_community.tools import TavilySearchResults | |
from langgraph.graph.state import CompiledStateGraph | |
from langgraph.graph import StateGraph, START, END, add_messages | |
from langgraph.prebuilt import ToolNode | |
from langgraph.prebuilt import ToolNode, tools_condition | |
from langgraph.checkpoint.memory import MemorySaver | |
from langgraph.types import Command, interrupt | |
from langchain_anthropic import ChatAnthropic | |
from langchain_openai import ChatOpenAI | |
from mistralai import Mistral | |
from langchain.chat_models import init_chat_model | |
from langchain_core.messages.utils import convert_to_openai_messages | |
class State(TypedDict): | |
messages: Annotated[list, add_messages] | |
class DebugToolNode(ToolNode): | |
async def invoke(self, state, config=None): | |
print("🛠️ ToolNode activated") | |
print(f"Available tools: {[tool.name for tool in self.tool_map.values()]}") | |
print(f"Tool calls in last message: {state.messages[-1].tool_calls}") | |
return await super().invoke(state, config) | |
logger = logging.getLogger(__name__) | |
ASSISTANT_SYSTEM_PROMPT_BASE = """""" | |
search_enabled = bool(os.environ.get("TAVILY_API_KEY")) | |
try: | |
with open('brainstorming_system_prompt.txt', 'r') as file: | |
brainstorming_system_prompt = file.read() | |
except FileNotFoundError: | |
print("File 'system_prompt.txt' not found!") | |
except Exception as e: | |
print(f"Error reading file: {e}") | |
def evaluate_idea_completion(response) -> bool: | |
""" | |
Evaluates whether the assistant's response indicates a complete DIY project idea. | |
You can customize the logic based on your specific criteria. | |
""" | |
# Example logic: Check if the response contains certain keywords | |
required_keywords = ["materials", "dimensions", "tools", "steps"] | |
# Determine the type of response and extract text accordingly | |
if isinstance(response, dict): | |
# If response is a dictionary, extract values and join them into a single string | |
response_text = ' '.join(str(value).lower() for value in response.values()) | |
elif isinstance(response, str): | |
# If response is a string, convert it to lowercase | |
response_text = response.lower() | |
else: | |
# If response is of an unexpected type, convert it to string and lowercase | |
response_text = str(response).lower() | |
return all(keyword in response_text for keyword in required_keywords) | |
async def human_assistance(query: str) -> str: | |
"""Request assistance from a human.""" | |
human_response = await interrupt({"query": query}) # async wait | |
return human_response["data"] | |
async def download_website_text(url: str) -> str: | |
"""Download the text from a website""" | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url) as response: | |
response.raise_for_status() | |
downloaded = await response.text() | |
result = extract(downloaded, include_formatting=True, include_links=True, output_format='json', with_metadata=True) | |
return result or "No text found on the website" | |
except Exception as e: | |
logger.error(f"Failed to download {url}: {str(e)}") | |
return f"Error retrieving website content: {str(e)}" | |
async def finalize_idea() -> str: | |
"""Marks the brainstorming phase as complete. This function does nothing else.""" | |
return "Brainstorming finalized." | |
tools = [download_website_text, human_assistance,finalize_idea] | |
memory = MemorySaver() | |
if search_enabled: | |
tavily_search_tool = TavilySearchResults( | |
max_results=5, | |
search_depth="advanced", | |
include_answer=True, | |
include_raw_content=True, | |
) | |
tools.append(tavily_search_tool) | |
else: | |
print("TAVILY_API_KEY environment variable not found. Websearch disabled") | |
weak_model = ChatOpenAI( | |
model="gpt-4o", | |
temperature=0, | |
max_tokens=None, | |
timeout=None, | |
max_retries=2, | |
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
# base_url="...", | |
# organization="...", | |
# other params... | |
) | |
api_key = os.environ["MISTRAL_API_KEY"] | |
model = "mistral-large-latest" | |
client = Mistral(api_key=api_key) | |
# ChatAnthropic( | |
# model="claude-3-5-sonnet-20240620", | |
# temperature=0, | |
# max_tokens=1024, | |
# timeout=None, | |
# max_retries=2, | |
# # other params... | |
# ) | |
search_enabled = bool(os.environ.get("TAVILY_API_KEY")) | |
if not os.environ.get("OPENAI_API_KEY"): | |
print('Open API key not found') | |
prompt_planning_model = ChatOpenAI( | |
model="gpt-4o", | |
temperature=0, | |
max_tokens=None, | |
timeout=None, | |
max_retries=2, | |
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
# base_url="...", | |
# organization="...", | |
# other params... | |
) | |
threed_object_gen_model = ChatOpenAI( | |
model="gpt-4o", | |
temperature=0, | |
max_tokens=None, | |
timeout=None, | |
max_retries=2, | |
# api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
# base_url="...", | |
# organization="...", | |
# other params... | |
) | |
model = weak_model | |
assistant_model = weak_model | |
class GraphProcessingState(BaseModel): | |
# user_input: str = Field(default_factory=str, description="The original user input") | |
messages: Annotated[list[AnyMessage], add_messages] = Field(default_factory=list) | |
prompt: str = Field(default_factory=str, description="The prompt to be used for the model") | |
tools_enabled: dict = Field(default_factory=dict, description="The tools enabled for the assistant") | |
search_enabled: bool = Field(default=True, description="Whether to enable search tools") | |
next_stage: str = Field(default="", description="The next stage to execute, decided by the guidance node.") | |
tool_call_required: bool = Field(default=False, description="Whether a tool should be called from brainstorming.") | |
loop_brainstorming: bool = Field(default=False, description="Whether to loop back to brainstorming for further iteration.") | |
# Completion flags for each stage | |
idea_complete: bool = Field(default=False) | |
brainstorming_complete: bool = Field(default=False) | |
planning_complete: bool = Field(default=False) | |
drawing_complete: bool = Field(default=False) | |
product_searching_complete: bool = Field(default=False) | |
purchasing_complete: bool = Field(default=False) | |
async def guidance_node(state: GraphProcessingState, config=None): | |
print("\n--- Guidance Node (Debug via print) ---\n") # Added a newline for clarity | |
print(f"Prompt: {state.prompt}") | |
print(f"Prompt: {state.prompt}") | |
# print(f"Message: {state.messages}") | |
print(f"Tools Enabled: {state.tools_enabled}") | |
print(f"Search Enabled: {state.search_enabled}") | |
for message in state.messages: | |
print(f'\ncomplete message', message) | |
if isinstance(message, HumanMessage): | |
print(f"Human: {message.content}\n") | |
elif isinstance(message, AIMessage): | |
# Check if content is non-empty | |
if message.content: | |
# If content is a list (e.g., list of dicts), extract text | |
if isinstance(message.content, list): | |
texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
if texts: | |
print(f"AI: {' '.join(texts)}\n") | |
elif isinstance(message.content, str): | |
print(f"AI: {message.content}") | |
elif isinstance(message, SystemMessage): | |
print(f"System: {message.content}\n") | |
elif isinstance(message, ToolMessage): | |
print(f"Tool: {message.content}\n") | |
# Log boolean completion flags | |
# Define the order of stages | |
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
# Identify completed and incomplete stages | |
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
# Determine the next stage | |
if not incomplete: | |
# All stages are complete | |
return { | |
"messages": [AIMessage(content="All DIY project stages are complete!")], | |
"next_stage": "end_project", | |
"pending_approval_stage": None, | |
} | |
else: | |
# Set the next stage to the first incomplete stage | |
next_stage = incomplete[0] | |
print(f"Next Stage: {state.next_stage}") | |
print("\n--- End of Guidance Node Debug ---\n") | |
return { | |
"messages": [], | |
"next_stage": next_stage, | |
"pending_approval_stage": None, | |
} | |
def guidance_routing(state: GraphProcessingState) -> str: | |
print("\n--- Guidance Routing (Debug via print) ---\n") # Added a newline for clarity | |
print(f"Next Stage: {state.next_stage}\n") | |
print(f"Brainstorming complete: {state.brainstorming_complete}") | |
print(f"3D prompt: {state.planning_complete}") | |
print(f"Drwaing 3d model: {state.drawing_complete}") | |
print(f"Finding products: {state.product_searching_complete}\n") | |
next_stage = state.next_stage | |
if next_stage == "brainstorming": | |
return "brainstorming_node" | |
elif next_stage == "planning": | |
return "generate_3d_node" | |
# return "prompt_planning_node" | |
elif next_stage == "drawing": | |
print('\n may day may day may day may day may day') | |
# return "generate_3d_node" | |
elif next_stage == "product_searching": | |
print('\n may day may day may day may day may day') | |
# return "drawing_node" | |
# elif next_stage == "product_searching": | |
# return "product_searching" | |
# elif next_stage == "purchasing": | |
# return "purchasing_node" | |
return END | |
async def brainstorming_node(state: GraphProcessingState, config=None): | |
print("\n--- brainstorming Node (Debug via print) ---\n") # Added a newline for clarity | |
print(f"Tools Enabled: {state.tools_enabled}") | |
print(f"Search Enabled: {state.search_enabled}") | |
print(f"Next Stage: {state.next_stage}") | |
# Log boolean completion flags | |
print(f"is Brainstorming Complete: {state.brainstorming_complete}") | |
# Check if model is available | |
if not model: | |
return {"messages": [AIMessage(content="Model not available for brainstorming.")]} | |
# Filter out messages with empty content | |
filtered_messages = [ | |
message for message in state.messages | |
if isinstance(message, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and message.content | |
] | |
# Ensure there is at least one message with content | |
if not filtered_messages: | |
filtered_messages.append(AIMessage(content="No valid messages provided.")) | |
stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
if not incomplete: | |
print("All stages complete!") | |
# Handle case where all stages are complete | |
# You might want to return a message and end, or set proposed_next_stage to a special value | |
ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") | |
return { | |
"messages": current_messages + [ai_all_complete_msg], | |
"next_stage": "end_project", # Or None, or a final summary node | |
"pending_approval_stage": None, | |
} | |
else: | |
# THIS LINE DEFINES THE VARIABLE | |
proposed_next_stage = incomplete[0] | |
guidance_prompt_text = ( | |
""" | |
You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming Facilitator**. Your primary goal is to collaborate with the user to finalize **ONE specific, viable DIY project idea** as efficiently and quickly as possible. | |
⚠️ If you identify any idea that clearly meets ALL **Critical Criteria** and the user appears positive or neutral, you must finalize it **immediately** — even if it is the very first idea proposed. Do NOT delay finalization by over-analyzing or seeking excessive confirmation. | |
> Your goal is NOT to perfect the idea or generate many options. Instead, **converge rapidly on a “good enough” final idea** that the user can confidently pursue. | |
**Critical Criteria for the Final DIY Project Idea (MUST be met):** | |
1. **Buildable:** Achievable by an average person with basic DIY skills. | |
2. **Common Materials/Tools:** Uses only materials (e.g., wood, screws, glue, paint, fabric, cardboard) and basic hand tools (e.g., screwdrivers, hammers, saws, drills) commonly available in general hardware stores, craft stores, or supermarkets worldwide. | |
3. **Avoid Specializations:** Explicitly AVOID projects requiring specialized electronic components, 3D printing, specific brand items not universally available, or complex machinery. | |
4. **Tangible Product:** The final result must be a physical, tangible item. | |
**Your Process for Each Brainstorming Interaction Cycle:** | |
1. **THOUGHT:** | |
* Clearly state your understanding of the user’s current input or the brainstorming state (e.g., "User is seeking initial ideas," "User proposed an idea needing refinement," "We are close to finalizing an idea."). | |
* Outline your plan for this turn: | |
* Engage with the user's latest input. | |
* Propose or refine an idea to meet **Critical Criteria**. | |
* Decide if you need to ask the user a clarifying question. | |
* **Tool Identification (`human_assistance`):** If a question is needed for: | |
* Understanding user interests or skill level. | |
* Clarifying preferences. | |
* Getting feedback on a proposed idea. | |
* Refining the idea to meet criteria. | |
Clearly state your intention to use the `human_assistance` tool with the exact question as the `query`. | |
* **Idea Finalization Check:** | |
* Immediately check if the current idea satisfies ALL **Critical Criteria**. | |
* If yes, and the user shows no objection, **finalize immediately without waiting for multiple iterations**. | |
* Prioritize minimal iterations: finalize an idea at the earliest confident point. | |
* Remember: **good enough is final enough** — do not delay finalization. | |
2. **TOOL USE (`human_assistance` - If Needed):** | |
* If a question is necessary, invoke `human_assistance` with your formulated query. | |
* (Note: The system will execute your tool call.) | |
3. **RESPONSE SYNTHESIS / IDEA FINALIZATION:** | |
* After any tool use (or if no tool was needed), synthesize your response. | |
* **If an idea is finalized:** respond *only* with the exact phrase: | |
`IDEA FINALIZED: [Name of the Idea]` | |
(e.g., `IDEA FINALIZED: Simple Wooden Spice Rack`) | |
Do NOT add extra text. | |
This signals the end of brainstorming. | |
* **If brainstorming continues:** | |
* Provide engaging suggestions or refinements. | |
* If you just called `human_assistance`, your main output may be the tool call or a brief lead-in. | |
* Await user response before proceeding. | |
**General Guidelines:** | |
* **Collaborative & Iterative:** Work *with* the user; this is a conversation. | |
* **Criteria Focused:** Gently guide ideas toward ALL **Critical Criteria**. | |
* **One Main Idea at a Time:** Avoid confusion by focusing discussion on a single project idea or comparable alternatives. | |
* **User-Centric:** Help the user find a project *they* will be happy with. | |
* **Clarity:** Be clear and direct. | |
* **Tool Protocol:** Use `human_assistance` correctly; do not answer your own questions. | |
* **Rapid Convergence:** Prioritize **finalizing quickly** once criteria are met; avoid endless brainstorming or perfectionism. | |
""" | |
) | |
if state.prompt: | |
final_prompt = "\n".join([ guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
else: | |
final_prompt = "\n".join([ guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
prompt = ChatPromptTemplate.from_messages( | |
[ | |
("system", final_prompt), | |
MessagesPlaceholder(variable_name="messages"), | |
] | |
) | |
# Tools allowed for brainstorming | |
node_tools = [human_assistance] | |
if state.search_enabled and tavily_search_tool: # only add search tool if enabled and initialized | |
node_tools.append(tavily_search_tool) | |
mistraltools = [ | |
{ | |
"type": "function", | |
"function": { | |
"name": "human_assistance", | |
"description": "Ask a question from the user", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"query": { | |
"type": "string", | |
"query": "The transaction id.", | |
} | |
}, | |
"required": ["query"], | |
}, | |
}, | |
}, | |
{ | |
"type": "function", | |
"function": { | |
"name": "finalize_idea", | |
"description": "Handles finalized ideas. Saves or dispatches the confirmed idea for the next steps. but make sure you give your response with key word IDEA FINALIZED", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"idea_name": { | |
"type": "string", | |
"description": "The name of the finalized DIY idea.", | |
} | |
}, | |
"required": ["idea_name"] | |
} | |
} | |
} | |
] | |
llm = init_chat_model("mistral-large-latest", model_provider="mistralai") | |
llm_with_tools = llm.bind_tools(mistraltools) | |
chain = prompt | llm_with_tools | |
openai_messages = convert_to_openai_messages(state.messages) | |
print('open ai format messaage', openai_messages) | |
openai_messages_with_prompt = [ | |
{"role": "system", "content": final_prompt}, # your guidance prompt | |
*openai_messages # history you’ve already converted | |
] | |
print('open ai format messaage', openai_messages_with_prompt) | |
mistralmodel = "mistral-large-latest" | |
# Pass filtered messages to the chain | |
try: | |
# response = await chain.ainvoke({"messages": filtered_messages}, config=config) | |
response = client.chat.complete( | |
model = mistralmodel, | |
messages = openai_messages_with_prompt, | |
tools = mistraltools, | |
tool_choice = "any", | |
parallel_tool_calls = False, | |
) | |
mistral_message = response.choices[0].message | |
tool_call = response.choices[0].message.tool_calls[0] | |
function_name = tool_call.function.name | |
function_params = json.loads(tool_call.function.arguments) | |
ai_message = AIMessage( | |
content=mistral_message.content or "", # Use empty string if blank | |
additional_kwargs={ | |
"tool_calls": [ | |
{ | |
"id": tool_call.id, | |
"function": { | |
"name": tool_call.function.name, | |
"arguments": tool_call.function.arguments, | |
}, | |
"type": "function", # Add this if your chain expects it | |
} | |
] | |
} | |
) | |
updates = { | |
"messages": [ai_message], | |
"tool_calls": [ | |
{ | |
"name": function_name, | |
"arguments": function_params, | |
} | |
], | |
"next": function_name, | |
} | |
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) | |
print(' 🔍 response from brainstorm', updates) | |
if function_name == "finalize_idea": | |
print('came in 🌴🌴🌴🌴🌴') | |
state.brainstorming_complete = True | |
updates["brainstorming_complete"] = True | |
if isinstance(response, AIMessage) and response.content: | |
print(' 💀💀 came inside the loop', response) | |
if isinstance(response.content, str): | |
content = response.content.strip() | |
elif isinstance(response.content, list): | |
texts = [item.get("text", "") for item in response.content if isinstance(item, dict)] | |
content = " ".join(texts).strip() | |
else: | |
content = str(response.content).strip() | |
print('content for idea finalizing:', content) | |
if "finalize_idea:" in content: # Use 'in' instead of 'startswith' | |
print('✅ final idea') | |
updates.update({ | |
"brainstorming_complete": True, | |
"tool_call_required": False, | |
"loop_brainstorming": False, | |
}) | |
return updates | |
else: | |
# tool_calls = getattr(response, "tool_calls", None) | |
if tool_call: | |
print('🛠️ tool call requested') | |
updates.update({ | |
"tool_call_required": True, | |
"loop_brainstorming": False, | |
}) | |
if tool_call: | |
tool_call = response.choices[0].message.tool_calls[0] | |
function_name = tool_call.function.name | |
function_params = json.loads(tool_call.function.arguments) | |
print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) | |
# for tool_call in response.tool_calls: | |
# tool_name = tool_call['name'] | |
# if tool_name == "human_assistance": | |
# query = tool_call['args']['query'] | |
# print(f"Human input needed: {query}") | |
# for tool_call in tool_calls: | |
# if isinstance(tool_call, dict) and 'name' in tool_call and 'args' in tool_call: | |
# print(f"🔧 Tool Call (Dict): {tool_call.get('name')}, Args: {tool_call.get('args')}") | |
# else: | |
# print(f"🔧 Unknown tool_call format: {tool_call}") | |
else: | |
print('💬 keep brainstorming') | |
updates.update({ | |
"tool_call_required": False, | |
"loop_brainstorming": True, | |
}) | |
print(f"Brainstorming continues: {content}") | |
else: | |
# If no proper response, keep looping brainstorming | |
updates["tool_call_required"] = False | |
updates["loop_brainstorming"] = True | |
print("\n--- End Brainstorming Node Debug ---\n") | |
return updates | |
except Exception as e: | |
print(f"Error in guidance node: {e}") | |
return { | |
"messages": [AIMessage(content="Error in guidance node.")], | |
"next_stage": "brainstorming" | |
} | |
def brainstorming_routing(state: GraphProcessingState) -> str: | |
print("\n--- brainstorming_routing Edge (Debug via print) ---") # Added a newline for clarity | |
print(f"Prompt: {state.prompt}") | |
# print(f"Message: {state.messages}") | |
print(f"Tools called: {state.tool_call_required}") | |
print(f"Search Enabled: {state.search_enabled}") | |
print(f"Next Stage: {state.next_stage}") | |
# Log boolean completion flags | |
print(f"Idea Complete: {state.idea_complete}") | |
print(f"Brainstorming Complete: {state.brainstorming_complete}") | |
print(f"Planning Complete: {state.planning_complete}") | |
print(f"Drawing Complete: {state.drawing_complete}") | |
print(f"Product Searching Complete: {state.product_searching_complete}") | |
print(f"Purchasing Complete: {state.purchasing_complete}") | |
print("--- End Guidance Node Debug ---") # Added for clarity | |
if state.tool_call_required: | |
print('calling tools for brainstorming') | |
return "tools" | |
elif state.loop_brainstorming: | |
print('returning back to brainstorming at the route') | |
return "brainstorming_node" | |
else: | |
print('all good in brainstorming route going back to guidance') | |
return "guidance_node" | |
# def route_brainstorming(state): | |
# if state.get("tool_call_required"): | |
# return "tools" | |
# else: | |
# return "guidance_node" | |
async def prompt_planning_node(state: GraphProcessingState, config=None): | |
print("\n--- prompt_planning Node (Debug) ---\n") | |
print(f"Tools Enabled: {state.tools_enabled}") | |
print(f"Search Enabled: {state.search_enabled}") | |
print(f"Planning Complete: {state.planning_complete}") | |
# Ensure we have a model | |
if not model: | |
return {"messages": [AIMessage(content="Model not available for planning.")]} | |
# Filter out empty messages | |
filtered_messages = [ | |
msg for msg in state.messages | |
if isinstance(msg, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and msg.content | |
] | |
if not filtered_messages: | |
filtered_messages.append(AIMessage(content="No valid messages provided.")) | |
# Define the system prompt for planning | |
guidance_prompt_text = """ | |
You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming & 3D-Prompt Generator**. Your mission is to collaborate with the user to: | |
1. Brainstorm and refine one specific, viable DIY project idea. | |
2. Identify the single key component from that idea that should be 3D-modeled. | |
3. Produce a final, precise text prompt for an OpenAI 3D-generation endpoint. | |
--- | |
**Critical Criteria for the DIY Project** (must be met): | |
• Buildable by an average person with only basic DIY skills. | |
• Uses common materials/tools (e.g., wood, screws, glue, paint; hammer, saw, drill). | |
• No specialized electronics, 3D printers, or proprietary parts. | |
• Results in a tangible, physical item. | |
--- | |
**Available Tools** | |
• human_assistance – ask the user clarifying questions. | |
• (optional) your project-specific search tool – look up inspiration or standard dimensions if needed. | |
--- | |
**When the DIY idea is fully detailed and meets all criteria, output exactly and only:** | |
ACCURATE PROMPT FOR MODEL GENERATING: [Your final single-paragraph prompt here] | |
""" | |
# Build final prompt | |
if state.prompt: | |
final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
else: | |
final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
prompt = ChatPromptTemplate.from_messages([ | |
("system", final_prompt), | |
MessagesPlaceholder(variable_name="messages"), | |
]) | |
# Bind tools | |
node_tools = [human_assistance] | |
if state.search_enabled and tavily_search_tool: | |
node_tools.append(tavily_search_tool) | |
llm_with_tools = prompt_planning_model.bind_tools(node_tools) | |
chain = prompt | llm_with_tools | |
try: | |
response = await chain.ainvoke({"messages": filtered_messages}, config=config) | |
# Log any required human assistance query | |
if hasattr(response, "tool_calls"): | |
for call in response.tool_calls: | |
if call.get("name") == "human_assistance": | |
print(f"Human input needed: {call['args']['query']}") | |
updates = {"messages": [response]} | |
# Extract response text | |
content = "" | |
if isinstance(response.content, str): | |
content = response.content.strip() | |
elif isinstance(response.content, list): | |
content = " ".join(item.get("text","") for item in response.content if isinstance(item, dict)).strip() | |
# Check for finalization signal | |
if content.startswith("ACCURATE PROMPT FOR MODEL GENERATING:"): | |
updates.update({ | |
"planning_complete": True, | |
"tool_call_required": False, | |
"loop_planning": False, | |
}) | |
else: | |
# Check if a tool call was requested | |
if getattr(response, "tool_calls", None): | |
updates.update({ | |
"tool_call_required": True, | |
"loop_planning": False, | |
}) | |
else: | |
updates.update({ | |
"tool_call_required": False, | |
"loop_planning": True, | |
}) | |
print("\n--- End prompt_planning Node Debug ---\n") | |
return updates | |
except Exception as e: | |
print(f"Error in prompt_planning node: {e}") | |
return { | |
"messages": [AIMessage(content="Error in prompt_planning node.")], | |
"next_stage": state.next_stage or "planning" | |
} | |
async def generate_3d_node(state: GraphProcessingState, config=None): | |
print("\n-🚀🚀🚀-- Generate 3D via API Node ---🚀🚀🚀\n") | |
# 1. Get the image URL | |
# For now, using a hardcoded URL as requested for testing. | |
# In a real scenario, you might get this from the state: | |
# image_url = state.get("image_url_for_3d") | |
# if not image_url: | |
# print("No image_url_for_3d found in state.") | |
# return {"messages": [AIMessage(content="No image URL found for 3D generation.")]} | |
hardcoded_image_url = "https://images.unsplash.com/photo-1748973750733-d037dded16dd?q=80&w=1974&auto=format&fit=crop&ixlib=rb-4.1.0&ixid=M3wxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8fA%3D%3D" | |
print(f"Using hardcoded image_url: {hardcoded_image_url}") | |
# 2. Define API endpoint and parameters | |
api_base_url = "https://wishwa-code--trellis-3d-model-generate-dev.modal.run/" | |
params = { | |
"image_url": hardcoded_image_url, | |
"simplify": "0.95", | |
"texture_size": "1024", | |
"sparse_sampling_steps": "12", | |
"sparse_sampling_cfg": "7.5", | |
"slat_sampling_steps": "12", | |
"slat_sampling_cfg": "3", | |
"seed": "42", | |
"output_format": "glb" | |
} | |
# Create a directory to store generated models if it doesn't exist | |
output_dir = "generated_3d_models" | |
os.makedirs(output_dir, exist_ok=True) | |
# 3. Attempt generation with retries | |
for attempt in range(1, 2): | |
print(f"Attempt {attempt} to call 3D generation API...") | |
try: | |
# Note: The API call can take a long time (1.5 mins in your curl example) | |
# Ensure your HTTP client timeout is sufficient. | |
# httpx default timeout is 5 seconds, which is too short. | |
async with httpx.AsyncClient(timeout=120.0) as client: # Timeout set to 120 seconds | |
response = await client.get(api_base_url, params=params) | |
response.raise_for_status() # Raises an HTTPStatusError for 4XX/5XX responses | |
# Successfully got a response | |
if response.status_code == 200: | |
# Assuming the response body is the .glb file content | |
file_name = f"model_{uuid.uuid4()}.glb" | |
file_path = os.path.join(output_dir, file_name) | |
with open(file_path, "wb") as f: | |
f.write(response.content) | |
print(f"Success: 3D model saved to {file_path}") | |
return { | |
"messages": [AIMessage(content=f"3D object generation successful: {file_path}")], | |
"generate_3d_complete": True, | |
"three_d_model_path": file_path, | |
"next_stage": state.get("next_stage") or 'end' # Use .get for safer access | |
} | |
else: | |
# This case might not be reached if raise_for_status() is used effectively, | |
# but good for explicit handling. | |
error_message = f"API returned status {response.status_code}: {response.text}" | |
print(error_message) | |
if attempt == 3: # Last attempt | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object. Last error: {error_message}")]} | |
except httpx.HTTPStatusError as e: | |
error_message = f"HTTP error occurred: {e.response.status_code} - {e.response.text}" | |
print(error_message) | |
if attempt == 3: | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last HTTP error: {error_message}")]} | |
except httpx.RequestError as e: # Catches network errors, timeout errors etc. | |
error_message = f"Request error occurred: {str(e)}" | |
print(error_message) | |
if attempt == 3: | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last request error: {error_message}")]} | |
except Exception as e: | |
error_message = f"An unexpected error occurred: {str(e)}" | |
print(error_message) | |
if attempt == 3: | |
return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last unexpected error: {error_message}")]} | |
if attempt < 2: | |
print("Retrying...") | |
else: | |
print("Max retries reached.") | |
# Failed after retries (this path should ideally be covered by returns in the loop) | |
return {"messages": [AIMessage(content="Failed to generate a valid 3D object after 3 attempts.")]} | |
def define_workflow() -> CompiledStateGraph: | |
"""Defines the workflow graph""" | |
# Initialize the graph | |
workflow = StateGraph(GraphProcessingState) | |
# Add nodes | |
workflow.add_node("tools", DebugToolNode(tools)) | |
workflow.add_node("guidance_node", guidance_node) | |
workflow.add_node("brainstorming_node", brainstorming_node) | |
workflow.add_node("prompt_planning_node", prompt_planning_node) | |
workflow.add_node("generate_3d_node", generate_3d_node) | |
# workflow.add_node("planning_node", planning_node) | |
# Edges | |
workflow.add_conditional_edges( | |
"guidance_node", | |
guidance_routing, | |
{ | |
"brainstorming_node" : "brainstorming_node", | |
"prompt_planning_node" : "prompt_planning_node", | |
"generate_3d_node" : "generate_3d_node" | |
} | |
) | |
workflow.add_conditional_edges( | |
"brainstorming_node", | |
tools_condition, | |
) | |
workflow.add_conditional_edges( | |
"prompt_planning_node", | |
tools_condition, | |
) | |
workflow.add_edge("tools", "guidance_node") | |
workflow.add_edge("brainstorming_node", "guidance_node") | |
workflow.add_edge("prompt_planning_node", "guidance_node") | |
workflow.add_edge("generate_3d_node", "guidance_node") | |
# workflow.add_conditional_edges( | |
# "guidance_node", # The source node | |
# custom_route_after_guidance, # Your custom condition function | |
# { | |
# # "Path name": "Destination node name" | |
# "execute_tools": "tools", # If function returns "execute_tools" | |
# "proceed_to_next_stage": "planning_node" # If function returns "proceed_to_next_stage" | |
# # Or this could be another router, or END | |
# } | |
# ) | |
# workflow.add_conditional_edges("guidance_node", guidance_routing) | |
# workflow.add_conditional_edges("brainstorming_node", brainstorming_routing) | |
# # Set end nodes | |
workflow.set_entry_point("guidance_node") | |
# workflow.set_finish_point("assistant_node") | |
compiled_graph = workflow.compile(checkpointer=memory) | |
try: | |
img_bytes = compiled_graph.get_graph().draw_mermaid_png() | |
with open("graph.png", "wb") as f: | |
f.write(img_bytes) | |
print("Graph image saved as graph.png") | |
except Exception as e: | |
print("Can't print the graph:") | |
print(e) | |
return compiled_graph | |
graph = define_workflow() | |
# async def assistant_node(state: GraphProcessingState, config=None): | |
# print("\n--- Assistance Node (Debug via print) ---") # Added a newline for clarity | |
# print(f"Prompt: {state.prompt}") | |
# print(f"Tools Enabled: {state.tools_enabled}") | |
# print(f"Search Enabled: {state.search_enabled}") | |
# print(f"Next Stage: {state.next_stage}") | |
# # Log boolean completion flags | |
# print(f"Idea Complete: {state.idea_complete}") | |
# print(f"Brainstorming Complete: {state.brainstorming_complete}") | |
# print(f"Planning Complete: {state.planning_complete}") | |
# print(f"Drawing Complete: {state.drawing_complete}") | |
# print(f"Product Searching Complete: {state.product_searching_complete}") | |
# print(f"Purchasing Complete: {state.purchasing_complete}") | |
# print("--- End Guidance Node Debug ---") # Added for clarity | |
# print(f"\nMessage: {state.messages}") | |
# assistant_tools = [] | |
# if state.tools_enabled.get("download_website_text", True): | |
# assistant_tools.append(download_website_text) | |
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): | |
# assistant_tools.append(tavily_search_tool) | |
# assistant_model = model.bind_tools(assistant_tools) | |
# if state.prompt: | |
# final_prompt = "\n".join([state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# else: | |
# final_prompt = ASSISTANT_SYSTEM_PROMPT_BASE | |
# prompt = ChatPromptTemplate.from_messages( | |
# [ | |
# ("system", final_prompt), | |
# MessagesPlaceholder(variable_name="messages"), | |
# ] | |
# ) | |
# chain = prompt | assistant_model | |
# response = await chain.ainvoke({"messages": state.messages}, config=config) | |
# for msg in response: | |
# if isinstance(msg, HumanMessage): | |
# print("Human:", msg.content) | |
# elif isinstance(msg, AIMessage): | |
# if isinstance(msg.content, list): | |
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] | |
# print("AI:", " ".join(ai_texts)) | |
# else: | |
# print("AI:", msg.content) | |
# idea_complete = evaluate_idea_completion(response) | |
# return { | |
# "messages": response, | |
# "idea_complete": idea_complete | |
# } | |
# # message = llm_with_tools.invoke(state["messages"]) | |
# # Because we will be interrupting during tool execution, | |
# # we disable parallel tool calling to avoid repeating any | |
# # tool invocations when we resume. | |
# assert len(response.tool_calls) <= 1 | |
# idea_complete = evaluate_idea_completion(response) | |
# return { | |
# "messages": response, | |
# "idea_complete": idea_complete | |
# } | |
# | |
# async def planning_node(state: GraphProcessingState, config=None): | |
# # Define the system prompt for planning | |
# planning_prompt = "Based on the user's idea, create a detailed step-by-step plan to build the DIY product." | |
# # Combine the planning prompt with any existing prompts | |
# if state.prompt: | |
# final_prompt = "\n".join([planning_prompt, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# else: | |
# final_prompt = "\n".join([planning_prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# # Create the prompt template | |
# prompt = ChatPromptTemplate.from_messages( | |
# [ | |
# ("system", final_prompt), | |
# MessagesPlaceholder(variable_name="messages"), | |
# ] | |
# ) | |
# # Bind tools if necessary | |
# assistant_tools = [] | |
# if state.tools_enabled.get("download_website_text", True): | |
# assistant_tools.append(download_website_text) | |
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): | |
# assistant_tools.append(tavily_search_tool) | |
# assistant_model = model.bind_tools(assistant_tools) | |
# # Create the chain and invoke it | |
# chain = prompt | assistant_model | |
# response = await chain.ainvoke({"messages": state.messages}, config=config) | |
# return { | |
# "messages": response | |
# } | |
# async def guidance_node(state: GraphProcessingState, config=None): | |
# print("\n--- Guidance Node (Debug via print) ---") | |
# print(f"Prompt: {state.prompt}") | |
# for message in state.messages: | |
# if isinstance(message, HumanMessage): | |
# print(f"Human: {message.content}") | |
# elif isinstance(message, AIMessage): | |
# if message.content: | |
# if isinstance(message.content, list): | |
# texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
# if texts: | |
# print(f"AI: {' '.join(texts)}") | |
# elif isinstance(message.content, str): | |
# print(f"AI: {message.content}") | |
# elif isinstance(message, SystemMessage): | |
# print(f"System: {message.content}") | |
# elif isinstance(message, ToolMessage): | |
# print(f"Tool: {message.content}") | |
# print(f"Tools Enabled: {state.tools_enabled}") | |
# print(f"Search Enabled: {state.search_enabled}") | |
# print(f"Next Stage: {state.next_stage}") | |
# print(f"Brainstorming Complete: {state.brainstorming_complete}") | |
# guidance_node.count = getattr(guidance_node, 'count', 0) + 1 | |
# print('\nGuidance Node called count', guidance_node.count) | |
# print("\n--- End Guidance Node Debug ---") | |
# stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
# completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
# incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
# if not incomplete: | |
# print("All stages complete!") | |
# # Handle case where all stages are complete | |
# # You might want to return a message and end, or set proposed_next_stage to a special value | |
# ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") | |
# return { | |
# "messages": current_messages + [ai_all_complete_msg], | |
# "next_stage": "end_project", # Or None, or a final summary node | |
# "pending_approval_stage": None, | |
# } | |
# else: | |
# # THIS LINE DEFINES THE VARIABLE | |
# proposed_next_stage = incomplete[0] | |
# print(f"Proposed next stage: {proposed_next_stage}") | |
# status_summary = f"Completed stages: {completed}\nIncomplete stages: {incomplete}" | |
# guidance_prompt_text = ( | |
# "You are the Guiding Assistant for a DIY project. Your primary responsibility is to determine the next logical step " | |
# "and then **obtain the user's explicit approval** before proceeding.\n\n" | |
# f"CURRENT PROJECT STATUS:\n{status_summary}\n\n" | |
# f"Based on the status, the most logical next stage appears to be: **'{proposed_next_stage}'**.\n\n" | |
# "YOUR TASK:\n" | |
# f"1. Formulate a clear and concise question for the user, asking if they agree to proceed to the **'{proposed_next_stage}'** stage. For example: 'It looks like '{proposed_next_stage}' is next. Shall we proceed with that?' or 'Are you ready to move on to {proposed_next_stage}?'\n" | |
# "2. **You MUST use the 'human_assistance' tool to ask this question.** Do not answer directly. Invoke the tool with your question.\n" | |
# "Example of tool usage (though you don't write this, you *call* the tool):\n" | |
# "Tool Call: human_assistance(query='The next stage is planning. Do you want to proceed with planning?')\n\n" | |
# "Consider the user's most recent message if it provides any preference." | |
# ) | |
# if state.prompt: | |
# final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# else: | |
# final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
# prompt = ChatPromptTemplate.from_messages( | |
# [ | |
# ("system", final_prompt), | |
# MessagesPlaceholder(variable_name="messages"), | |
# ] | |
# ) | |
# assistant_model = model.bind_tools([human_assistance]) | |
# chain = prompt | assistant_model | |
# try: | |
# response = await chain.ainvoke({"messages": state.messages}, config=config) | |
# for msg in response: | |
# if isinstance(msg, HumanMessage): | |
# print("Human:", msg.content) | |
# elif isinstance(msg, AIMessage): | |
# if isinstance(msg.content, list): | |
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] | |
# print("AI:", " ".join(ai_texts)) | |
# else: | |
# print("AI:", msg.content) | |
# # Check for tool calls in the response | |
# if hasattr(response, "tool_calls"): | |
# for tool_call in response.tool_calls: | |
# tool_name = tool_call['name'] | |
# if tool_name == "human_assistance": | |
# query = tool_call['args']['query'] | |
# print(f"Human input needed: {query}") | |
# # Handle human assistance tool call | |
# # You can pause execution and wait for user input here | |
# return { | |
# "messages": [response], | |
# "next_stage": incomplete[0] if incomplete else "brainstorming" | |
# } | |
# except Exception as e: | |
# print(f"Error in guidance node: {e}") | |
# return { | |
# "messages": [AIMessage(content="Error in guidance node.")], | |
# "next_stage": "brainstorming" | |
# } |