Spaces:
Runtime error
Runtime error
File size: 56,483 Bytes
807e22d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 |
import logging
import os
import uuid
import aiohttp
import json
import httpx
from typing import Annotated
from typing import TypedDict, List, Optional, Literal
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from trafilatura import extract
from langchain_core.messages import AIMessage, HumanMessage, AnyMessage, ToolCall, SystemMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_community.tools import TavilySearchResults
from langgraph.graph.state import CompiledStateGraph
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command, interrupt
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from mistralai import Mistral
from langchain.chat_models import init_chat_model
from langchain_core.messages.utils import convert_to_openai_messages
class State(TypedDict):
    # Minimal state schema: a single `messages` list whose annotation attaches
    # `add_messages` as metadata for the graph framework.
    messages: Annotated[list, add_messages]
class DebugToolNode(ToolNode):
    """ToolNode variant that prints diagnostics before delegating to the base class."""

    async def invoke(self, state, config=None):
        """Print the registered tool names and pending tool calls, then delegate.

        NOTE(review): this coroutine awaits ``super().invoke(...)`` and reads
        ``self.tool_map`` / ``state.messages``; confirm that the installed
        ``ToolNode`` exposes an awaitable ``invoke`` and those attributes.
        """
        print("🛠️ ToolNode activated")
        registered_names = [entry.name for entry in self.tool_map.values()]
        print(f"Available tools: {registered_names}")
        newest_message = state.messages[-1]
        print(f"Tool calls in last message: {newest_message.tool_calls}")
        outcome = await super().invoke(state, config)
        return outcome
logger = logging.getLogger(__name__)

# Extra system-prompt text appended to every node prompt; currently empty.
ASSISTANT_SYSTEM_PROMPT_BASE = ""

# Web search is only enabled when a Tavily API key is present in the environment.
search_enabled = bool(os.environ.get("TAVILY_API_KEY"))

# Bind the name up front so it is always defined, even when the prompt file
# is missing or unreadable (previously the name was left unbound in that case).
brainstorming_system_prompt = ""
try:
    with open('brainstorming_system_prompt.txt', 'r', encoding='utf-8') as prompt_file:
        brainstorming_system_prompt = prompt_file.read()
except FileNotFoundError:
    # Report the file that was actually opened.
    print("File 'brainstorming_system_prompt.txt' not found!")
except (OSError, UnicodeDecodeError) as e:
    print(f"Error reading file: {e}")
def evaluate_idea_completion(response) -> bool:
    """Report whether *response* mentions every keyword of a complete DIY idea.

    The response is flattened to one lowercase string: for a dict only the
    values are used (joined with spaces), a str is used directly, and anything
    else goes through ``str()``. The idea counts as complete only when all of
    "materials", "dimensions", "tools" and "steps" occur in that text.
    """
    if isinstance(response, dict):
        lowered_values = [str(item).lower() for item in response.values()]
        haystack = ' '.join(lowered_values)
    elif isinstance(response, str):
        haystack = response.lower()
    else:
        haystack = str(response).lower()

    # Substring match: a single missing keyword makes the idea incomplete.
    for needle in ("materials", "dimensions", "tools", "steps"):
        if needle not in haystack:
            return False
    return True
@tool
async def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    # The docstring above is the tool description; keep it unchanged.
    # `interrupt` is a regular (non-async) function: it pauses the graph by
    # raising internally and, once the run is resumed, returns the resume
    # payload directly. Awaiting that payload raised TypeError on resume.
    human_response = interrupt({"query": query})
    # The resume payload is read as a mapping with a "data" entry.
    return human_response["data"]
@tool
async def download_website_text(url: str) -> str:
    """Download the text from a website"""
    # The docstring above is the tool description; keep it unchanged.
    try:
        # Fetch the raw page body; a non-2xx status raises and is handled below.
        async with aiohttp.ClientSession() as http_session, http_session.get(url) as reply:
            reply.raise_for_status()
            page_source = await reply.text()
        # Extract the page content as JSON, keeping formatting, links and metadata.
        extracted = extract(
            page_source,
            include_formatting=True,
            include_links=True,
            output_format='json',
            with_metadata=True,
        )
        if extracted:
            return extracted
        return "No text found on the website"
    except Exception as failure:
        # Best effort: log the failure and hand the error text back to the caller.
        logger.error(f"Failed to download {url}: {str(failure)}")
        return f"Error retrieving website content: {str(failure)}"
@tool
async def finalize_idea(idea_name: str = "") -> str:
    """Marks the brainstorming phase as complete. This function does nothing else."""
    # The docstring above is the tool description; keep it unchanged.
    # `idea_name` is accepted (and ignored) so that calls built from the
    # function-calling schema in brainstorming_node, which declares a required
    # `idea_name` argument, match this signature. Calling with no argument
    # still works.
    return "Brainstorming finalized."
# Tools made available to the graph.
tools = [download_website_text, human_assistance, finalize_idea]
memory = MemorySaver()

# Always bind `tavily_search_tool` so later checks of the form
# `state.search_enabled and tavily_search_tool` cannot raise NameError when
# web search is disabled.
tavily_search_tool = None
if search_enabled:
    tavily_search_tool = TavilySearchResults(
        max_results=5,
        search_depth="advanced",
        include_answer=True,
        include_raw_content=True,
    )
    tools.append(tavily_search_tool)
else:
    print("TAVILY_API_KEY environment variable not found. Websearch disabled")

# Warn about a missing OpenAI key before the first OpenAI-backed model is built.
if not os.environ.get("OPENAI_API_KEY"):
    print('Open API key not found')

weak_model = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)

# Mistral client used directly by brainstorming_node.
# NOTE(review): this raises KeyError at import time when MISTRAL_API_KEY is unset.
api_key = os.environ["MISTRAL_API_KEY"]
client = Mistral(api_key=api_key)

search_enabled = bool(os.environ.get("TAVILY_API_KEY"))

prompt_planning_model = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)

threed_object_gen_model = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)

# `model` and `assistant_model` are aliases of the same gpt-4o instance. The
# earlier intermediate assignment of a Mistral model-name string to `model`
# was overwritten before any use and has been dropped.
model = weak_model
assistant_model = weak_model
class GraphProcessingState(BaseModel):
    # State passed between the graph nodes in this file.
    #
    # NOTE(review): nodes in this file also return keys that are not declared
    # here ("pending_approval_stage", "loop_planning", "tool_calls", "next").
    # Confirm how the graph treats updates to undeclared keys.

    # user_input: str = Field(default_factory=str, description="The original user input")
    # Conversation history; the annotation attaches `add_messages` as metadata.
    messages: Annotated[list[AnyMessage], add_messages] = Field(default_factory=list)
    prompt: str = Field(default_factory=str, description="The prompt to be used for the model")
    tools_enabled: dict = Field(default_factory=dict, description="The tools enabled for the assistant")
    search_enabled: bool = Field(default=True, description="Whether to enable search tools")
    next_stage: str = Field(default="", description="The next stage to execute, decided by the guidance node.")
    tool_call_required: bool = Field(default=False, description="Whether a tool should be called from brainstorming.")
    loop_brainstorming: bool = Field(default=False, description="Whether to loop back to brainstorming for further iteration.")
    # Completion flags for each stage; guidance_node reads them as f"{stage}_complete".
    idea_complete: bool = Field(default=False)
    brainstorming_complete: bool = Field(default=False)
    planning_complete: bool = Field(default=False)
    drawing_complete: bool = Field(default=False)
    product_searching_complete: bool = Field(default=False)
    purchasing_complete: bool = Field(default=False)
    # Written by prompt_planning_node after its image-generation request.
    generated_image_url_from_dalle: str = Field(default="", description="The generated_image_url_from_dalle.")
async def guidance_node(state: GraphProcessingState, config=None):
    """Pick the next project stage from the per-stage completion flags.

    Prints the most recent message for debugging, then selects the first stage
    in the fixed order brainstorming -> planning -> drawing ->
    product_searching -> purchasing whose ``<stage>_complete`` flag is falsy.

    Args:
        state: Current graph state; ``messages`` and the ``*_complete`` flags are read.
        config: Unused; accepted for the node-callable signature.

    Returns:
        A state-update dict with ``messages``, ``next_stage`` and
        ``pending_approval_stage``. When every stage is complete,
        ``next_stage`` is ``"end_project"`` and a closing AI message is added.
    """
    print("\n🕵️♀️🕵️♀️ | start | progress checking nodee \n")

    if state.messages:
        last_message = state.messages[-1]
        if isinstance(last_message, HumanMessage):
            print(f"🧑 Human: {last_message.content}\n")
        elif isinstance(last_message, AIMessage):
            content = last_message.content
            if isinstance(content, list):
                # Structured content: only dict items carrying a 'text' key are shown.
                texts = [item.get('text', '') for item in content if isinstance(item, dict) and 'text' in item]
                if texts:
                    print(f"🤖 AI: {' '.join(texts)}\n")
            elif isinstance(content, str) and content:
                print(f"🤖 AI: {content}\n")
        elif isinstance(last_message, SystemMessage):
            print(f"⚙️ System: {last_message.content}\n")
        elif isinstance(last_message, ToolMessage):
            print(f"🛠️ Tool: {last_message.content}\n")
    else:
        print("\n(No messages found.)")

    stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
    incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]

    if not incomplete:
        # Every stage is done: signal the end of the project.
        return {
            "messages": [AIMessage(content="All DIY project stages are complete!")],
            "next_stage": "end_project",
            "pending_approval_stage": None,
        }

    next_stage = incomplete[0]
    # Log the stage selected just now; the previous code printed the stale
    # value still stored on the incoming state.
    print(f"Next Stage: {next_stage}")
    print("\n🕵️♀️🕵️♀️ | end | progress checking nodee \n")
    return {
        "messages": [],
        "next_stage": next_stage,
        "pending_approval_stage": None,
    }
def guidance_routing(state: GraphProcessingState) -> str:
    """Map ``state.next_stage`` to the name of the node that should run next.

    "brainstorming", "planning" and "drawing" route to their nodes. For
    "product_searching" the whole state is dumped to stdout and the graph
    ends. Any other value also ends the graph.
    """
    print("\n🔀🔀 Routing checkpoint 🔀🔀\n")
    print(f"Next Stage: {state.next_stage}\n")
    print(f"Brainstorming complete: {state.brainstorming_complete}")
    print(f"Prompt planing: {state.planning_complete}")
    print(f"Drwaing 3d model: {state.drawing_complete}")
    print(f"Finding products: {state.product_searching_complete}\n")

    requested_stage = state.next_stage

    # Stages that currently have a node wired up.
    routes = (
        ("brainstorming", "brainstorming_node"),
        ("planning", "prompt_planning_node"),
        ("drawing", "generate_3d_node"),
    )
    for stage_name, node_name in routes:
        if requested_stage == stage_name:
            return node_name

    if requested_stage == "product_searching":
        # No node is routed to for this stage: dump the state, then fall through to END.
        print('\n may day may day may day may day may day')
        print(f"Prompt: {state.prompt}")
        print(f"Prompt: {state.prompt}")
        print(f"Tools Enabled: {state.tools_enabled}")
        print(f"Search Enabled: {state.search_enabled}")
        for entry in state.messages:
            print('\ncomplete message', entry)
            if isinstance(entry, HumanMessage):
                print(f"Human: {entry.content}\n")
            elif isinstance(entry, AIMessage):
                body = entry.content
                if not body:
                    continue
                if isinstance(body, list):
                    # Structured content: show only dict items that carry a 'text' key.
                    fragments = [part.get('text', '') for part in body if isinstance(part, dict) and 'text' in part]
                    if fragments:
                        print(f"AI: {' '.join(fragments)}\n")
                elif isinstance(body, str):
                    print(f"AI: {body}")
            elif isinstance(entry, SystemMessage):
                print(f"System: {entry.content}\n")
            elif isinstance(entry, ToolMessage):
                print(f"Tool: {entry.content}\n")

    return END
# System prompt that drives the brainstorming conversation.
_BRAINSTORMING_GUIDANCE_PROMPT = """
You are a warm, encouraging, and knowledgeable AI assistant, acting as a **Creative DIY Collaborator**. Your primary goal is to guide the user through a friendly and inspiring conversation to finalize **ONE specific, viable DIY project idea**. While we want to be efficient, the top priority is making the user feel heard, understood, and confident in their final choice.
⚠️ Your core directive remains speed and convergence: If you identify an idea that clearly meets ALL **Critical Criteria** and the user seems positive or neutral, you must suggest finalizing it **immediately**. Do NOT delay by offering too many alternatives once a solid candidate emerges. Your goal is to converge on a "good enough" idea the user is happy with, not to explore every possibility.
**Your Conversational Style & Strategy:**
1. **Be an Active Listener:** Start by acknowledging and validating the user's input. Show you understand their core desire (e.g., "That sounds like a fun goal! Creating a custom piece for your living room is always rewarding.").
2. **Ask Inspiring, Open-Ended Questions:** Instead of generic questions, make them feel personal and insightful.
* *Instead of:* "What do you want to build?"
* *Try:* "What part of your home are you dreaming of improving?" or "Are you thinking of a gift for someone special, or a project just for you?"
3. **Act as a Knowledgeable Guide:** When a user is unsure, proactively suggest appealing ideas based on their subtle clues. Connect their interests to tangible projects.
* *Example:* If the user mentions liking plants and having a small balcony, you could suggest: "That's great! We could think about a vertical herb garden to save space, or maybe some simple, stylish hanging macrame planters. Does either of those spark your interest?"
4. **Guide, Don't Just Gatekeep:** When an idea *almost* meets the criteria, don't just reject it. Gently guide it towards feasibility.
* *Example:* "A full-sized dining table might require some specialized tools. How about we adapt that idea into a beautiful, buildable coffee table or a set of side tables using similar techniques?"
**Critical Criteria for the Final DIY Project Idea (Your non-negotiable checklist):**
1. **Buildable:** Achievable by an average person with basic DIY skills.
2. **Common Materials/Tools:** Uses only materials (e.g., wood, screws, glue, paint, fabric, cardboard) and basic hand tools (e.g., screwdrivers, hammers, saws, drills) commonly available in general hardware stores, craft stores, or supermarkets worldwide.
3. **Avoid Specializations:** Explicitly AVOID projects requiring specialized electronic components, 3D printing, specific brand items not universally available, or complex machinery.
4. **Tangible Product:** The final result must be a physical, tangible item.
**Your Internal Process (How you think on each turn):**
1. **THOUGHT:**
* Clearly state your understanding of the user’s current input and conversational state.
* Outline your plan: Engage with their latest input using your **Conversational Style**. Propose or refine an idea to meet the **Critical Criteria**.
* **Tool Identification (`human_assistance`):** Decide if you need to ask a question. The question should be formulated according to the "Inspiring, Open-Ended Questions" principle. Clearly state your intention to use the `human_assistance` tool with the exact friendly and natural-sounding question as the `query`.
* **Idea Finalization Check:** Check if the current idea satisfies ALL **Critical Criteria**. If yes, and the user shows no objection, move to finalize immediately. Remember: **good enough is final enough**.
2. **TOOL USE (`human_assistance` - If Needed):**
* Invoke `human_assistance` with your well-formulated, friendly query.
3. **RESPONSE SYNTHESIS / IDEA FINALIZATION:**
* **If an idea is finalized:** Respond *only* with the exact phrase:
`IDEA FINALIZED: [Name of the Idea]`
(e.g., `IDEA FINALIZED: Simple Wooden Spice Rack`)
* **If brainstorming continues:**
* Provide your engaging suggestions or refinements based on your **Conversational Style**.
* Await the user response.
**General Guidelines (Your core principles):**
* **Empathy Over Pure Efficiency:** A positive, collaborative experience is the primary goal. Don't rush the user if they are still exploring.
* **Criteria Focused:** Always gently guide ideas toward the **Critical Criteria**.
* **One Main Idea at a Time:** Focus the conversation on a single project idea to avoid confusion.
* **Rapid Convergence:** Despite the friendly tone, always be looking for the fastest path to a final, viable idea.
"""

# Function-calling schemas sent to the Mistral chat API.
_BRAINSTORMING_MISTRAL_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "human_assistance",
            "description": "Ask a question from the user",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        # Was a stray `"query": "The transaction id."` entry,
                        # which is not a schema keyword.
                        "description": "The question to ask the user.",
                    }
                },
                "required": ["query"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "finalize_idea",
            "description": "Handles finalized ideas. Saves or dispatches the confirmed idea for the next steps. but make sure you give your response with key word IDEA FINALIZED",
            "parameters": {
                "type": "object",
                "properties": {
                    "idea_name": {
                        "type": "string",
                        "description": "The name of the finalized DIY idea.",
                    }
                },
                "required": ["idea_name"],
            },
        },
    },
]

_BRAINSTORMING_MISTRAL_MODEL = "mistral-saba-2502"


async def brainstorming_node(state: GraphProcessingState, config=None):
    """Run one brainstorming turn against the Mistral chat API.

    The conversation history is converted to OpenAI-style dicts, prefixed with
    the brainstorming system prompt, and sent with ``tool_choice="any"``. The
    first tool call in the reply is wrapped in an ``AIMessage``.

    Args:
        state: Current graph state; ``messages``, ``prompt`` and the
            ``*_complete`` flags are read. The state object is not mutated.
        config: Unused; accepted for the node-callable signature.

    Returns:
        A state-update dict. With a tool call it carries ``messages``,
        ``tool_calls``, ``next``, ``tool_call_required=True`` and
        ``loop_brainstorming=False``, plus ``brainstorming_complete=True``
        when the call is ``finalize_idea``. Without a tool call it carries the
        text reply and asks to loop, unless the text holds a finalization
        marker. On any exception a generic error message is returned with
        ``next_stage="brainstorming"``.
    """
    print("\n🧠🧠 | start | brainstorming Node \n")

    if not model:
        return {"messages": [AIMessage(content="Model not available for brainstorming.")]}

    stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
    if all(getattr(state, f"{stage}_complete", False) for stage in stage_order):
        print("All stages complete!")
        # Return only the new message. The previous code concatenated with an
        # undefined `current_messages` and raised NameError here.
        return {
            "messages": [AIMessage(content="All DIY project stages are complete!")],
            "next_stage": "end_project",
            "pending_approval_stage": None,
        }

    prompt_sections = [_BRAINSTORMING_GUIDANCE_PROMPT]
    if state.prompt:
        prompt_sections.append(state.prompt)
    prompt_sections.append(ASSISTANT_SYSTEM_PROMPT_BASE)
    final_prompt = "\n".join(prompt_sections)

    openai_messages_with_prompt = [
        {"role": "system", "content": final_prompt},
        *convert_to_openai_messages(state.messages),
    ]
    print('open ai formatted', openai_messages_with_prompt[-1])
    for msg in openai_messages_with_prompt:
        print(msg)

    try:
        # Use the SDK's async variant so the request does not block the event
        # loop this coroutine runs on.
        response = await client.chat.complete_async(
            model=_BRAINSTORMING_MISTRAL_MODEL,
            messages=openai_messages_with_prompt,
            tools=_BRAINSTORMING_MISTRAL_TOOLS,
            tool_choice="any",
            parallel_tool_calls=False,
        )
        mistral_message = response.choices[0].message
        content = mistral_message.content or ""
        tool_calls = mistral_message.tool_calls or []

        if not tool_calls:
            # Defensive path: with tool_choice="any" a tool call is expected,
            # but do not crash on a plain-text reply.
            text = content.strip() if isinstance(content, str) else str(content).strip()
            updates = {"messages": [AIMessage(content=content)]}
            if "finalize_idea:" in text or "IDEA FINALIZED" in text:
                print('✅ final idea')
                updates.update({
                    "brainstorming_complete": True,
                    "tool_call_required": False,
                    "loop_brainstorming": False,
                })
            else:
                print('💬 decided tp keep brainstorming')
                print(f"Brainstorming continues: {text}")
                updates.update({
                    "tool_call_required": False,
                    "loop_brainstorming": True,
                })
            print("\n🧠🧠 | end | brainstorming Node \n")
            return updates

        tool_call = tool_calls[0]
        function_name = tool_call.function.name
        raw_arguments = tool_call.function.arguments
        # Handle arguments delivered either as a JSON string or as a parsed mapping.
        if isinstance(raw_arguments, str):
            function_params = json.loads(raw_arguments)
            arguments_json = raw_arguments
        else:
            function_params = raw_arguments
            arguments_json = json.dumps(raw_arguments)

        ai_message = AIMessage(
            content=content,
            additional_kwargs={
                "tool_calls": [
                    {
                        "id": tool_call.id,
                        "function": {
                            "name": function_name,
                            "arguments": arguments_json,
                        },
                        "type": "function",
                    }
                ]
            },
        )
        updates = {
            "messages": [ai_message],
            "tool_calls": [
                {
                    "name": function_name,
                    "arguments": function_params,
                }
            ],
            "next": function_name,
            # The previous code gated these flags on
            # `isinstance(response, AIMessage)`, which is never true for a
            # Mistral response, so they were always (False, True).
            "tool_call_required": True,
            "loop_brainstorming": False,
        }
        print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params)
        print('🛠️ tool call requested at brainstorming node')

        if function_name == "finalize_idea":
            print('finalazing idea')
            updates["brainstorming_complete"] = True

        print('\n🔍 response from brainstorm\n', updates)
        print("\n🧠🧠 | end | brainstorming Node \n")
        return updates
    except Exception as e:
        # Best effort: report the failure and ask the graph to retry brainstorming.
        logger.exception("brainstorming_node failed")
        print(f"Error: {e}")
        return {
            "messages": [AIMessage(content="Error.")],
            "next_stage": "brainstorming"
        }
# System prompt for the planning stage.
_PLANNING_GUIDANCE_PROMPT = """
You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming & 3D-Prompt Generator**. Your mission is to collaborate with the user to:
1. Brainstorm and refine one specific, viable DIY project idea.
2. Identify the single key component from that idea that should be 3D-modeled.
3. Produce a final, precise text prompt for an OpenAI 3D-generation endpoint.
---
**Critical Criteria for the DIY Project** (must be met):
• Buildable by an average person with only basic DIY skills.
• Uses common materials/tools (e.g., wood, screws, glue, paint; hammer, saw, drill).
• No specialized electronics, 3D printers, or proprietary parts.
• Results in a tangible, physical item.
---
**Available Tools**
• human_assistance – ask the user clarifying questions.
• (optional) your project-specific search tool – look up inspiration or standard dimensions if needed.
---
**When the DIY idea is fully detailed and meets all criteria, output exactly and only:**
ACCURATE PROMPT FOR MODEL GENERATING: [Your final single-paragraph prompt here]
"""

_PLANNING_FINAL_MARKER = "ACCURATE PROMPT FOR MODEL GENERATING"
_DALLE_API_URL = "https://api.openai.com/v1/images/generations"
_DALLE_MODEL = "dall-e-2"  # set to "dall-e-2" or "dall-e-3"
# Prompt-length limits (in characters) applied before the request is sent.
_DALLE_MAX_PROMPT_LENGTHS = {
    "dall-e-2": 1000,
    "dall-e-3": 4000,
    "gpt-image-1": 32000,
}


def _fit_dalle_prompt(prompt_text, model_name):
    """Return ``(prompt, notice)`` with the prompt truncated to the model's limit.

    ``notice`` is a user-facing message, or ``None`` when nothing had to be reported.
    """
    if not prompt_text:
        notice = (
            f"Error: The DALL-E prompt for model '{model_name}' cannot be empty. "
            "Image generation skipped."
        )
        print(f"\n🛑🛑🛑🛑 {notice}")
        return prompt_text, notice

    max_len = _DALLE_MAX_PROMPT_LENGTHS.get(model_name)
    if max_len is None:
        # Unknown model: proceed unchanged and only log, as before.
        print(
            f"\nℹ️ℹ️ℹ️ℹ️ Notice: Model '{model_name}' not found in pre-defined prompt length limits. "
            "Proceeding with the original prompt. API may reject if prompt is too long for this model."
        )
        return prompt_text, None

    original_len = len(prompt_text)
    if original_len <= max_len:
        return prompt_text, None

    notice = (
        f"Warning: Prompt for DALL-E ({model_name}) was {original_len} characters. "
        f"It has been TRUNCATED to the maximum of {max_len} characters."
    )
    print(f"\n⚠️⚠️⚠️⚠️ {notice}")
    return prompt_text[:max_len], notice


async def _request_dalle_image(prompt_text, api_key, model_name):
    """POST one image-generation request; return ``(image_url_or_None, status_message)``."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {
        "model": model_name,
        "prompt": prompt_text,
        "n": 1,
        "size": "1024x1024",
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(_DALLE_API_URL, headers=headers, json=payload) as dalle_response:
                dalle_response.raise_for_status()
                dalle_data = await dalle_response.json()
    except aiohttp.ClientError as e:
        print(f"DALL-E API call error: {e}")
        return None, f"Error calling DALL-E: {e}"
    except json.JSONDecodeError as e:
        # Do not re-read the response here: it is already released at this point.
        print(f"DALL-E API JSON decode error: {e}")
        return None, f"Error decoding DALL-E response: {e}"
    except Exception as e:
        print(f"Unexpected error during DALL-E processing: {e}")
        return None, f"Unexpected error with DALL-E: {e}"

    images = dalle_data.get("data") or []
    image_url = images[0].get("url") if images else None
    if not image_url:
        print("Error: DALL-E API did not return image data.")
        return None, "Failed to get image from DALL-E."
    print(f"DALL-E generated image URL: {image_url}")
    return image_url, f"Image generated by DALL-E: {image_url}"


async def prompt_planning_node(state: GraphProcessingState, config=None):
    """Run one planning turn and, once the model emits the final prompt, request an image.

    The planning model is invoked with the planning system prompt and the
    conversation history. When the reply contains
    ``ACCURATE PROMPT FOR MODEL GENERATING``, the text after removing that
    marker is sent to the image-generation endpoint and the stage is marked
    complete.

    Args:
        state: Current graph state; ``messages``, ``prompt``,
            ``search_enabled`` and ``next_stage`` are read. The state's
            message list is not mutated.
        config: Forwarded to the chain invocation.

    Returns:
        A state-update dict containing ``messages`` plus either the completion
        fields (``generated_image_url_from_dalle``, ``planning_complete``,
        ``tool_call_required``, ``loop_planning``) or the
        tool-call / loop flags. On any exception a generic error message is
        returned together with ``next_stage``.
    """
    print("\n🚩🚩 | start | prompt planing Node \n")

    if not model:
        return {"messages": [AIMessage(content="Model not available for planning.")]}

    # Copy, so the fallback append below never mutates the list held by the state.
    filtered_messages = list(state.messages)
    if not filtered_messages:
        filtered_messages.append(AIMessage(content="No valid messages provided."))

    prompt_sections = [_PLANNING_GUIDANCE_PROMPT]
    if state.prompt:
        prompt_sections.append(state.prompt)
    prompt_sections.append(ASSISTANT_SYSTEM_PROMPT_BASE)
    final_prompt = "\n".join(prompt_sections)

    # Pass the system prompt as a message instance, not a ("system", text)
    # template, so braces inside state.prompt are not parsed as template variables.
    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=final_prompt),
        MessagesPlaceholder(variable_name="messages"),
    ])

    node_tools = [human_assistance]
    # Look the search tool up defensively: the module-level name is only bound
    # when web search was configured.
    search_tool = globals().get("tavily_search_tool")
    if state.search_enabled and search_tool:
        node_tools.append(search_tool)

    chain = prompt | prompt_planning_model.bind_tools(node_tools)

    for msg in filtered_messages:
        print('✨msg : ', msg)
    print('\n')

    try:
        response = await chain.ainvoke({"messages": filtered_messages}, config=config)
        print('\nresponse ->: ', response)

        tool_calls = getattr(response, "tool_calls", None) or []
        for call in tool_calls:
            if call.get("name") == "human_assistance":
                print(f"Human input needed: {(call.get('args') or {}).get('query')}")

        updates = {"messages": [response]}

        # Flatten the reply to plain text (str content, or the 'text' of dict items).
        content = ""
        if isinstance(response.content, str):
            content = response.content.strip()
        elif isinstance(response.content, list):
            content = " ".join(
                item.get("text", "") for item in response.content if isinstance(item, dict)
            ).strip()

        if _PLANNING_FINAL_MARKER in content:
            dalle_prompt_text = content.replace("ACCURATE PROMPT FOR MODEL GENERATING:", "").strip()
            print(f"\n🤖🤖🤖🤖Extracted DALL-E prompt: {dalle_prompt_text}")

            generated_image_url = None
            openai_api_key = os.getenv("OPENAI_API_KEY")
            if not openai_api_key:
                print("Error: OPENAI_API_KEY environment variable not set.")
                updates["messages"].append(
                    AIMessage(content="OpenAI API key not configured. Cannot generate image.")
                )
            else:
                fitted_prompt, notice = _fit_dalle_prompt(dalle_prompt_text, _DALLE_MODEL)
                if notice:
                    updates["messages"].append(AIMessage(content=notice))
                if fitted_prompt:
                    print(f"\n🤖🤖🤖🤖Calling DALL-E with prompt: {fitted_prompt}")
                    generated_image_url, status = await _request_dalle_image(
                        fitted_prompt, openai_api_key, _DALLE_MODEL
                    )
                    updates["messages"].append(AIMessage(content=status))

            updates.update({
                # The state field is declared as `str`; never store None in it.
                "generated_image_url_from_dalle": generated_image_url or "",
                "planning_complete": True,
                "tool_call_required": False,
                "loop_planning": False,
            })
        elif tool_calls:
            updates.update({
                "tool_call_required": True,
                "loop_planning": False,
            })
        else:
            updates.update({
                "tool_call_required": False,
                "loop_planning": True,
            })

        print("\n🚩🚩 | end | prompt planing Node \n")
        return updates
    except Exception as e:
        # Best effort: report the failure and stay on the planning stage.
        logger.exception("prompt_planning_node failed")
        print(f"Error in prompt_planning node: {e}")
        return {
            "messages": [AIMessage(content="Error in prompt_planning node.")],
            "next_stage": state.next_stage or "planning"
        }
async def generate_3d_node(state: GraphProcessingState, config=None):
    """Generate a 3D model (.glb) from the DALL-E image URL stored in the state.

    Reads ``state.generated_image_url_from_dalle``, sends it to the remote 3D
    generation endpoint and, on HTTP 200, writes the response body to a
    uniquely named ``.glb`` file under ``generated_3d_models/``.

    Args:
        state: Graph state; ``generated_image_url_from_dalle`` and
            ``next_stage`` are read by attribute.
        config: Unused by this node.

    Returns:
        On success, a dict with ``messages``, ``generate_3d_complete``,
        ``three_d_model_path`` and ``next_stage``. On failure, a dict holding
        only ``messages`` with an AIMessage describing the last error.
    """
    print("\n🚀🚀🚀 | start | Generate 3D Node 🚀🚀🚀\n")

    # 1. Get the image URL produced by the DALL-E step.
    image_url = getattr(state, "generated_image_url_from_dalle", None)
    if not image_url:
        # Without a source image the API call cannot succeed; fail fast
        # instead of spending a long request on an empty parameter.
        print("No generated_image_url_from_dalle found in state.")
        return {"messages": [AIMessage(content="No image URL found for 3D generation.")]}
    print(f"Using image_url from state: {image_url}")

    # 2. Define API endpoint and parameters
    api_base_url = "https://wishwa-code--trellis-3d-model-generate-dev.modal.run/"
    params = {
        "image_url": image_url,
        "simplify": "0.95",
        "texture_size": "1024",
        "sparse_sampling_steps": "12",
        "sparse_sampling_cfg": "7.5",
        "slat_sampling_steps": "12",
        "slat_sampling_cfg": "3",
        "seed": "42",
        "output_format": "glb",
    }

    # Create a directory to store generated models if it doesn't exist
    output_dir = "generated_3d_models"
    os.makedirs(output_dir, exist_ok=True)

    # 3. Attempt generation.
    # The original loop effectively ran a single attempt; keep that count but
    # drive the loop, the log lines and the failure message from one value so
    # they cannot drift apart again.
    max_attempts = 1
    last_error = "no attempt was made"

    for attempt in range(1, max_attempts + 1):
        print(f"Attempt {attempt} to call 3D generation API...")
        try:
            # The API call can take a long time (around 1.5 minutes), and the
            # httpx default timeout of 5 seconds is too short for it.
            async with httpx.AsyncClient(timeout=120.0) as client:
                response = await client.get(api_base_url, params=params)
                response.raise_for_status()  # Raises HTTPStatusError for error responses

                if response.status_code == 200:
                    # Assumes the response body is the .glb file content.
                    file_name = f"model_{uuid.uuid4()}.glb"
                    file_path = os.path.join(output_dir, file_name)
                    with open(file_path, "wb") as f:
                        f.write(response.content)
                    print(f"Success: 3D model saved to {file_path}")
                    return {
                        "messages": [AIMessage(content=f"3D object generation successful: {file_path}")],
                        "generate_3d_complete": True,
                        "three_d_model_path": file_path,
                        # The state is read by attribute above, so use getattr
                        # rather than a dict-style .get(): an AttributeError here
                        # would be swallowed by the broad handler below and a
                        # successful generation would be reported as a failure.
                        "next_stage": getattr(state, "next_stage", None) or "end",
                    }

                # Non-200 status that raise_for_status() did not reject.
                last_error = f"API returned status {response.status_code}: {response.text}"
        except httpx.HTTPStatusError as e:
            last_error = f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
        except httpx.RequestError as e:  # Network errors, timeouts, etc.
            last_error = f"Request error occurred: {str(e)}"
        except Exception as e:
            last_error = f"An unexpected error occurred: {str(e)}"

        print(last_error)
        if attempt < max_attempts:
            print("Retrying...")
        else:
            print("Max retries reached.")

    # Every attempt failed: surface the last concrete error to the caller.
    return {
        "messages": [
            AIMessage(
                content=(
                    f"Failed to generate 3D object after {max_attempts} attempt(s). "
                    f"Last error: {last_error}"
                )
            )
        ]
    }
def define_workflow() -> CompiledStateGraph:
    """Build, wire and compile the state graph, then try to render it to graph.png."""
    builder = StateGraph(GraphProcessingState)

    # Register nodes (same order as before: tools first, then the stage nodes).
    node_table = (
        ("tools", DebugToolNode(tools)),
        ("guidance_node", guidance_node),
        ("brainstorming_node", brainstorming_node),
        ("prompt_planning_node", prompt_planning_node),
        ("generate_3d_node", generate_3d_node),
    )
    for node_name, node_impl in node_table:
        builder.add_node(node_name, node_impl)

    # guidance_node routes to one of the stage nodes; each route key maps to
    # the node of the same name.
    stage_targets = ("brainstorming_node", "prompt_planning_node", "generate_3d_node")
    builder.add_conditional_edges(
        "guidance_node",
        guidance_routing,
        {target: target for target in stage_targets},
    )

    # Nodes whose outgoing route is decided by tools_condition.
    for tool_capable in ("brainstorming_node", "prompt_planning_node"):
        builder.add_conditional_edges(tool_capable, tools_condition)

    # Every other node hands control back to guidance_node.
    for origin in ("tools", "brainstorming_node", "prompt_planning_node", "generate_3d_node"):
        builder.add_edge(origin, "guidance_node")

    # Only an entry point is configured; no finish point is set here.
    builder.set_entry_point("guidance_node")

    compiled = builder.compile(checkpointer=memory)

    # Rendering the diagram is best-effort: any failure is printed and ignored.
    try:
        png_data = compiled.get_graph().draw_mermaid_png()
        with open("graph.png", "wb") as image_file:
            image_file.write(png_data)
        print("Graph image saved as graph.png")
    except Exception as render_error:
        print("Can't print the graph:")
        print(render_error)

    return compiled
# Module-level compiled graph: define_workflow() runs when this module is
# executed, which also attempts to write graph.png as a side effect.
graph = define_workflow()
# async def assistant_node(state: GraphProcessingState, config=None):
# print("\n--- Assistance Node (Debug via print) ---") # Added a newline for clarity
# print(f"Prompt: {state.prompt}")
# print(f"Tools Enabled: {state.tools_enabled}")
# print(f"Search Enabled: {state.search_enabled}")
# print(f"Next Stage: {state.next_stage}")
# # Log boolean completion flags
# print(f"Idea Complete: {state.idea_complete}")
# print(f"Brainstorming Complete: {state.brainstorming_complete}")
# print(f"Planning Complete: {state.planning_complete}")
# print(f"Drawing Complete: {state.drawing_complete}")
# print(f"Product Searching Complete: {state.product_searching_complete}")
# print(f"Purchasing Complete: {state.purchasing_complete}")
# print("--- End Guidance Node Debug ---") # Added for clarity
# print(f"\nMessage: {state.messages}")
# assistant_tools = []
# if state.tools_enabled.get("download_website_text", True):
# assistant_tools.append(download_website_text)
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True):
# assistant_tools.append(tavily_search_tool)
# assistant_model = model.bind_tools(assistant_tools)
# if state.prompt:
# final_prompt = "\n".join([state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = ASSISTANT_SYSTEM_PROMPT_BASE
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# chain = prompt | assistant_model
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# for msg in response:
# if isinstance(msg, HumanMessage):
# print("Human:", msg.content)
# elif isinstance(msg, AIMessage):
# if isinstance(msg.content, list):
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)]
# print("AI:", " ".join(ai_texts))
# else:
# print("AI:", msg.content)
# idea_complete = evaluate_idea_completion(response)
# return {
# "messages": response,
# "idea_complete": idea_complete
# }
# # message = llm_with_tools.invoke(state["messages"])
# # Because we will be interrupting during tool execution,
# # we disable parallel tool calling to avoid repeating any
# # tool invocations when we resume.
# assert len(response.tool_calls) <= 1
# idea_complete = evaluate_idea_completion(response)
# return {
# "messages": response,
# "idea_complete": idea_complete
# }
#
# async def planning_node(state: GraphProcessingState, config=None):
# # Define the system prompt for planning
# planning_prompt = "Based on the user's idea, create a detailed step-by-step plan to build the DIY product."
# # Combine the planning prompt with any existing prompts
# if state.prompt:
# final_prompt = "\n".join([planning_prompt, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = "\n".join([planning_prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# # Create the prompt template
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# # Bind tools if necessary
# assistant_tools = []
# if state.tools_enabled.get("download_website_text", True):
# assistant_tools.append(download_website_text)
# if search_enabled and state.tools_enabled.get("tavily_search_results_json", True):
# assistant_tools.append(tavily_search_tool)
# assistant_model = model.bind_tools(assistant_tools)
# # Create the chain and invoke it
# chain = prompt | assistant_model
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# return {
# "messages": response
# }
# async def guidance_node(state: GraphProcessingState, config=None):
# print("\n--- Guidance Node (Debug via print) ---")
# print(f"Prompt: {state.prompt}")
# for message in state.messages:
# if isinstance(message, HumanMessage):
# print(f"Human: {message.content}")
# elif isinstance(message, AIMessage):
# if message.content:
# if isinstance(message.content, list):
# texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item]
# if texts:
# print(f"AI: {' '.join(texts)}")
# elif isinstance(message.content, str):
# print(f"AI: {message.content}")
# elif isinstance(message, SystemMessage):
# print(f"System: {message.content}")
# elif isinstance(message, ToolMessage):
# print(f"Tool: {message.content}")
# print(f"Tools Enabled: {state.tools_enabled}")
# print(f"Search Enabled: {state.search_enabled}")
# print(f"Next Stage: {state.next_stage}")
# print(f"Brainstorming Complete: {state.brainstorming_complete}")
# guidance_node.count = getattr(guidance_node, 'count', 0) + 1
# print('\nGuidance Node called count', guidance_node.count)
# print("\n--- End Guidance Node Debug ---")
# stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"]
# completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)]
# incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)]
# if not incomplete:
# print("All stages complete!")
# # Handle case where all stages are complete
# # You might want to return a message and end, or set proposed_next_stage to a special value
# ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!")
# return {
# "messages": current_messages + [ai_all_complete_msg],
# "next_stage": "end_project", # Or None, or a final summary node
# "pending_approval_stage": None,
# }
# else:
# # THIS LINE DEFINES THE VARIABLE
# proposed_next_stage = incomplete[0]
# print(f"Proposed next stage: {proposed_next_stage}")
# status_summary = f"Completed stages: {completed}\nIncomplete stages: {incomplete}"
# guidance_prompt_text = (
# "You are the Guiding Assistant for a DIY project. Your primary responsibility is to determine the next logical step "
# "and then **obtain the user's explicit approval** before proceeding.\n\n"
# f"CURRENT PROJECT STATUS:\n{status_summary}\n\n"
# f"Based on the status, the most logical next stage appears to be: **'{proposed_next_stage}'**.\n\n"
# "YOUR TASK:\n"
# f"1. Formulate a clear and concise question for the user, asking if they agree to proceed to the **'{proposed_next_stage}'** stage. For example: 'It looks like '{proposed_next_stage}' is next. Shall we proceed with that?' or 'Are you ready to move on to {proposed_next_stage}?'\n"
# "2. **You MUST use the 'human_assistance' tool to ask this question.** Do not answer directly. Invoke the tool with your question.\n"
# "Example of tool usage (though you don't write this, you *call* the tool):\n"
# "Tool Call: human_assistance(query='The next stage is planning. Do you want to proceed with planning?')\n\n"
# "Consider the user's most recent message if it provides any preference."
# )
# if state.prompt:
# final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE])
# else:
# final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE])
# prompt = ChatPromptTemplate.from_messages(
# [
# ("system", final_prompt),
# MessagesPlaceholder(variable_name="messages"),
# ]
# )
# assistant_model = model.bind_tools([human_assistance])
# chain = prompt | assistant_model
# try:
# response = await chain.ainvoke({"messages": state.messages}, config=config)
# for msg in response:
# if isinstance(msg, HumanMessage):
# print("Human:", msg.content)
# elif isinstance(msg, AIMessage):
# if isinstance(msg.content, list):
# ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)]
# print("AI:", " ".join(ai_texts))
# else:
# print("AI:", msg.content)
# # Check for tool calls in the response
# if hasattr(response, "tool_calls"):
# for tool_call in response.tool_calls:
# tool_name = tool_call['name']
# if tool_name == "human_assistance":
# query = tool_call['args']['query']
# print(f"Human input needed: {query}")
# # Handle human assistance tool call
# # You can pause execution and wait for user input here
# return {
# "messages": [response],
# "next_stage": incomplete[0] if incomplete else "brainstorming"
# }
# except Exception as e:
# print(f"Error in guidance node: {e}")
# return {
# "messages": [AIMessage(content="Error in guidance node.")],
# "next_stage": "brainstorming"
# }