WebashalarForML's picture
Upload 111 files
adadaa5 verified
raw
history blame
124 kB
from flask import Flask, request, jsonify, render_template, send_from_directory
from langgraph.prebuilt import create_react_agent
from langchain_groq import ChatGroq
#from langgraph.graph import draw
from langchain.chat_models import ChatOpenAI
from PIL import Image
import os, json, re
import shutil
import uuid
from langgraph.graph import StateGraph, END
import logging
from typing import Dict, TypedDict, Optional, Any, List
# --- Configure logging ---
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
app = Flask(__name__)

# --- LLM / Vision model setup ---
# Guarantee that each credential variable exists in the process environment:
# a value that is already set is left alone, a missing one receives the
# placeholder string.
os.environ.setdefault("GROQ_API_KEY", "default_key_or_placeholder")
os.environ.setdefault("OPENROUTER_API_KEY", "default_key_or_placeholder")
os.environ.setdefault("OPENROUTER_BASE_URL", "default_key_or_placeholder")

groq_key = os.environ["GROQ_API_KEY"]
# An empty value or the placeholder means no real Groq key was supplied;
# refuse to start in that case.
if groq_key in ("", "default_key_or_placeholder"):
    logger.critical("GROQ_API_KEY environment variable is not set or invalid. Please set it to proceed.")
    raise ValueError("GROQ_API_KEY environment variable is not set or invalid.")
# Main LLM for the SCRATCH 3.0 agent. Temperature 0.0 is requested for both
# models below. The commented-out `model=` lines are alternatives that were
# tried previously.
llm = ChatGroq(
    #model="deepseek-r1-distill-llama-70b",
    #model="llama-3.3-70b-versatile",
    #model="meta-llama/llama-4-maverick-17b-128e-instruct",
    model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0.0,
)
# JSON debugger model [temporary].
# NOTE(review): nothing in the visible part of this file references `llm2`;
# confirm whether the JSON resolver agent was meant to use it.
llm2 = ChatGroq(
    model="deepseek-r1-distill-llama-70b",
    #model="llama-3.3-70b-versatile",
    #model="meta-llama/llama-4-maverick-17b-128e-instruct",
    #model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0.0,
)
# llm = ChatOpenAI(
# openai_api_key=os.environ["OPENROUTER_API_KEY"],
# openai_api_base=os.environ["OPENROUTER_BASE_URL"],
# model_name="deepseek/deepseek-r1-0528:free",
# model_kwargs={
# "headers": {
# "HTTP-Referer": "https://localhost:5000/",
# "X-Title": "agent_scratch",
# }
# },
#)
# Refined SYSTEM_PROMPT with more explicit Scratch JSON rules, especially for variables.
# It is passed as the system prompt of the main agent. Stray "[cite: ...]"
# markers left over from the reference material were removed: they point at
# sources the model cannot see and only add noise to the instructions.
SYSTEM_PROMPT = """
You are an expert AI assistant named GameScratchAgent, specialized in generating and modifying Scratch-VM 3.x game project JSON.
Your core task is to process game descriptions and existing Scratch JSON structures, then produce or update JSON segments accurately.
You possess deep knowledge of Scratch 3.0 project schema, informed by comprehensive reference materials. When generating or modifying the `blocks` section, pay extremely close attention to the following:
**Scratch Target and Block Schema Rules:**
1. **Target Structure:**
* Every target (Stage or Sprite) must have an `objName` property. For the Stage, it's typically `"objName": "Stage"`. For sprites, it's their name (e.g., `"objName": "Cat"`).
* `variables` dictionary: This dictionary maps unique variable IDs to arrays `[variable_name, initial_value]`.
Example: `"variables": { "myVarId123": ["score", 0] }`
2. **Block Structure:** Every block must have `opcode`, `blockType`, `parent`, `next`, `inputs`, `fields`, `shadow`, `topLevel`.
* `opcode`: Unique internal identifier for the block's specific functionality (e.g., `"motion_movesteps"`).
* `blockType`: Specifies the visual shape and general behavior. Common types include `"command"` (Stack block), `"reporter"` (Reporter block), `"Boolean"` (Boolean block), and `"hat"` (Hat block).
* `parent`: ID of the block it's stacked on (or `null` for top-level).
* `next`: ID of the block directly below it (or `null` for end of stack).
* `topLevel`: `true` if it's a hat block or a standalone block, `false` otherwise.
* `shadow`: `true` if it's a shadow block (e.g., a default number input), `false` otherwise.
3. **`inputs` vs. `fields`:** This is critical.
* **`inputs`**: Used for blocks or values *plugged into* a block (e.g., condition for an `if` block, target for `go to`). Values are **arrays**.
* `[1, <blockId>]`: Represents a Reporter or Boolean block *plugged in* (e.g., `operator_equals`).
* `[1, [<type>, "<value>"]]`: For a shadow block with a literal value. The `type` specifies the data type and can be "num" (number), "str" (string), "bool" (Boolean), "colour", "angle", "date", "dropdown", "iconmenu", or "variable".
Example: `[1, ["num", "10"]]` for a number input.
* `[2, <blockId>]`: For a C-block's substack (e.g., `SUBSTACK` in `control_if`).
* **`fields`**: Used for dropdown menus or direct internal values *within* a block (e.g., key selected in `when key pressed`, variable name in `set variable`). Values are always **arrays** `["<value>", null]`.
* Example for `event_whenkeypressed`: `"KEY": ["space", null]`
* Example for `data_setvariableto`: `"VARIABLE": ["score", null]`
4. **Unique IDs:** All block IDs, variable IDs, and list IDs must be unique strings (e.g., "myBlock123", "myVarId456"). Do NOT use placeholder strings like "block_id_here".
5. **No Nested `blocks` Dictionary:** The `blocks` dictionary should only appear once per `target` (sprite/stage). Do NOT nest a `blocks` dictionary inside an individual block definition. Blocks that are part of a substack (like inside an `if` or `forever` or `repeat` loop and other) are linked via the `SUBSTACK` input, where the input value is `[2, "ID_of_first_block_in_substack"]`.
6. **Asset Properties:** `assetId`, `md5ext`, `bitmapResolution`, `rotationCenterX`/`rotationCenterY` should be correct for costumes/sounds.
**General Principles and Important Considerations:**
* **Backward Compatibility:** Adhere strictly to existing Scratch 3.0 opcodes and schema to ensure backward compatibility with older projects.
* **Forgiving Inputs:** Recognize that Scratch is designed to be "forgiving in its interpretation of inputs". While generating valid JSON, understand that the Scratch VM handles potentially "invalid" inputs gracefully (e.g., type coercion, returning default values like zero or empty strings, rather than crashing). This implies that precise type matching for inputs might be handled internally by Scratch, allowing for some flexibility in how values are provided, but the agent should aim for the most common and logical type.
**When responding, you must:**
1. **Output ONLY the complete, valid JSON object.** Do not include markdown, commentary, or conversational text.
2. Ensure every generated block ID, input, field, and variable declaration strictly follows the Scratch 3.0 schema and the rules above.
3. If presented with partial or problematic JSON, aim to complete or correct it based on the given instructions and your schema knowledge.
"""
# System prompt of the JSON-repair agent (`agent_json_resolver`), which the
# workflow nodes call when an LLM reply cannot be parsed as JSON.
# NOTE(review): the text mentions "the schema below", but no schema follows
# inside this string - confirm whether one was meant to be appended.
SYSTEM_PROMPT_JSON_CORRECTOR ="""
You are an assistant that outputs JSON responses strictly following the given schema.
If the JSON you produce has any formatting errors, missing required fields, or invalid structure, you must identify the problems and correct them.
Always return only valid JSON that fully conforms to the schema below, enclosed in triple backticks (```), without any extra text or explanation.
If you receive an invalid or incomplete JSON response, fix it by:
- Adding any missing required fields with appropriate values.
- Correcting syntax errors such as missing commas, brackets, or quotes.
- Ensuring the JSON structure matches the schema exactly.
Remember: Your output must be valid JSON only, ready to be parsed without errors.
"""
# Main agent of the system for Scratch 3.0: built with create_react_agent
# from `llm` and SYSTEM_PROMPT, with an empty tool list.
agent = create_react_agent(
    model=llm,
    tools=[], # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT
)
# Debugger and resolver agent for Scratch 3.0: same construction, but driven
# by the JSON-corrector prompt.
# NOTE(review): this agent is built on `llm`, although `llm2` is labelled as
# the JSON debugger model - confirm which one is intended.
agent_json_resolver = create_react_agent(
    model=llm,
    tools=[], # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT_JSON_CORRECTOR
)
# --- Serve the form ---
@app.route("/", methods=["GET"])
def index():
    """Render the `index4.html` template for GET requests to the site root."""
    return render_template("index4.html")
# --- List static assets for the front-end ---
@app.route("/list_assets", methods=["GET"])
def list_assets():
    """
    Report the SVG files available under static/assets.

    Returns a JSON object with the keys `backdrops` and `sprites`, each a list
    of file names ending in ".svg" (case-insensitive). A folder that does not
    exist contributes an empty list. If listing fails, a JSON error body is
    returned with HTTP status 500.
    """
    backdrop_dir = os.path.join(app.static_folder, "assets", "backdrops")
    sprite_dir = os.path.join(app.static_folder, "assets", "sprites")

    def _svg_names(directory):
        # A missing folder simply yields no assets.
        if not os.path.isdir(directory):
            return []
        return [entry for entry in os.listdir(directory) if entry.lower().endswith(".svg")]

    try:
        backdrops = _svg_names(backdrop_dir)
        sprites = _svg_names(sprite_dir)
        logger.info("Successfully listed static assets.")
    except Exception as e:
        logger.error(f"Error listing static assets: {e}")
        return jsonify({"error": "Failed to list assets"}), 500
    return jsonify(backdrops=backdrops, sprites=sprites)
# --- Helper: build costume entries ---
def make_costume_entry(folder, filename, name):
    """
    Build the costume dictionary for one asset file.

    Args:
        folder: Sub-folder of static/assets holding the file; the value
            "backdrops" selects the backdrop rotation centre for SVG files.
        filename: Asset file name. The text before the last "." becomes
            `assetId`, the text after it becomes `dataFormat`.
        name: Costume name stored in the entry.

    Returns:
        dict with `name`, `bitmapResolution`, `dataFormat`, `assetId`,
        `md5ext`, `rotationCenterX` and `rotationCenterY`.

    Raises:
        IndexError: if `filename` contains no "." at all.
    """
    name_parts = filename.rsplit(".", 1)
    entry = {
        "name": name,
        "bitmapResolution": 1,
        "dataFormat": name_parts[1],
        "assetId": name_parts[0],
        "md5ext": filename,
    }
    if filename.lower().endswith(".svg"):
        # SVG files are not opened: backdrops get the fixed centre (240, 180),
        # every other SVG gets (0, 0).
        center = (240, 180) if folder == "backdrops" else (0, 0)
    else:
        # The file path is only needed for bitmap assets, whose centre is
        # derived from the real image size.
        path = os.path.join(app.static_folder, "assets", folder, filename)
        try:
            # The context manager closes the underlying file handle, which
            # Image.open would otherwise keep open.
            with Image.open(path) as img:
                width, height = img.size
            center = (width // 2, height // 2)
        except Exception as e:
            logger.warning(f"Could not determine image dimensions for {filename}: {e}. Setting center to 0,0.")
            center = (0, 0)
    entry["rotationCenterX"], entry["rotationCenterY"] = center
    return entry
# --- New endpoint to fetch project.json ---
@app.route("/get_project/<project_id>", methods=["GET"])
def get_project(project_id):
    """
    Send the `project.json` of a generated project as a download.

    Args:
        project_id: Name of the project's folder inside `generated_projects`.

    Returns:
        The file as an attachment named "<project_id>.json"; a JSON error
        with status 404 when the project does not exist or the ID is not a
        plain folder name; a JSON error with status 500 when sending fails.
    """
    projects_root = os.path.abspath("generated_projects")
    project_folder = os.path.abspath(os.path.join(projects_root, project_id))
    # The ID comes straight from the URL. Only a direct child of the projects
    # root is accepted, so values such as ".." or "." cannot point the lookup
    # at a directory outside generated_projects.
    if os.path.dirname(project_folder) != projects_root:
        logger.warning(f"Rejected invalid project ID: {project_id!r}")
        return jsonify({"error": "Project not found"}), 404
    project_json_path = os.path.join(project_folder, "project.json")
    try:
        if os.path.exists(project_json_path):
            logger.info(f"Serving project.json for project ID: {project_id}")
            # An absolute folder keeps the existence check above and the file
            # actually sent in agreement, whatever the app's root path is.
            return send_from_directory(project_folder, "project.json", as_attachment=True, download_name=f"{project_id}.json")
        else:
            logger.warning(f"Project JSON not found for ID: {project_id}")
            return jsonify({"error": "Project not found"}), 404
    except Exception as e:
        logger.error(f"Error serving project.json for ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve project"}), 500
# --- New endpoint to fetch assets ---
@app.route("/get_asset/<project_id>/<filename>", methods=["GET"])
def get_asset(project_id, filename):
    """
    Send one asset file that belongs to a generated project.

    Args:
        project_id: Name of the project's folder inside `generated_projects`.
        filename: Name of the asset file inside that folder.

    Returns:
        The file; a JSON error with status 404 when the asset does not exist
        or the project ID is not a plain folder name; a JSON error with
        status 500 when sending fails.
    """
    projects_root = os.path.abspath("generated_projects")
    project_folder = os.path.abspath(os.path.join(projects_root, project_id))
    # The ID comes straight from the URL. Without this check a project ID of
    # ".." would make the application's own directory the download folder.
    if os.path.dirname(project_folder) != projects_root:
        logger.warning(f"Rejected invalid project ID: {project_id!r}")
        return jsonify({"error": "Asset not found"}), 404
    asset_path = os.path.join(project_folder, filename)
    try:
        if os.path.exists(asset_path):
            logger.info(f"Serving asset '{filename}' for project ID: {project_id}")
            # An absolute folder keeps the existence check above and the file
            # actually sent in agreement, whatever the app's root path is.
            return send_from_directory(project_folder, filename)
        else:
            logger.warning(f"Asset '{filename}' not found for project ID: {project_id}")
            return jsonify({"error": "Asset not found"}), 404
    except Exception as e:
        logger.error(f"Error serving asset '{filename}' for project ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve asset"}), 500
### LangGraph Workflow Definition
# State object that is handed from node to node in the graph.
class GameState(TypedDict):
    """Shared state read and updated by the workflow nodes."""
    project_json: dict  # Scratch project JSON; nodes read its "targets" list
    description: str  # Game description; replaced by the detailed text from the description node
    project_id: str
    sprite_initial_positions: dict # Add this as well, as it's part of your state
    action_plan: Optional[Dict]  # Parsed plan stored by the overall planner node
    #behavior_plan: Optional[Dict]
    improvement_plan: Optional[Dict]
    needs_improvement: bool
    plan_validation_feedback: Optional[Dict]  # Earlier validator feedback, quoted in the validation prompt
    iteration_count: int # Track the number of iterations for improvements
    review_block_feedback: Optional[Dict] # Feedback from the agent on the blocks after verification
# Helper function to update project JSON with sprite positions
import copy
def update_project_with_sprite_positions(project_json: dict, sprite_positions: dict) -> dict:
    """
    Update the 'x' and 'y' coordinates of sprites in the Scratch project JSON.

    The input project is never modified; a deep copy is changed and returned.
    Targets flagged with `isStage` are skipped. The positions usually come
    from parsed LLM output, so malformed data is tolerated: when
    `sprite_positions` is not a dict, or an entry is not a dict containing
    both 'x' and 'y', the affected sprites keep their current coordinates.

    Args:
        project_json (dict): Original Scratch project JSON.
        sprite_positions (dict): Dict mapping sprite names to {'x': int, 'y': int}.
    Returns:
        dict: Updated project JSON with new sprite positions.
    """
    updated_project = copy.deepcopy(project_json)
    if not isinstance(sprite_positions, dict):
        return updated_project
    for target in updated_project.get("targets", []):
        if target.get("isStage", False):
            continue
        pos = sprite_positions.get(target.get("name"))
        # Both coordinates must be present; a partial entry is ignored.
        if isinstance(pos, dict) and "x" in pos and "y" in pos:
            target["x"] = pos["x"]
            target["y"] = pos["y"]
    return updated_project
# Helper function to load the block catalog from a JSON file
def _load_block_catalog(file_path: str) -> Dict:
    """
    Loads the Scratch block catalog from a specified JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The parsed JSON content, or an empty dict when the file is missing,
        is not valid JSON, or cannot be read for any other reason (the
        failure is logged, not raised).
    """
    try:
        # Read as UTF-8 explicitly; the platform default encoding can
        # mis-decode non-ASCII text in the catalog.
        with open(file_path, 'r', encoding='utf-8') as f:
            catalog = json.load(f)
        logger.info(f"Successfully loaded block catalog from {file_path}")
        return catalog
    except FileNotFoundError:
        logger.error(f"Error: Block catalog file not found at {file_path}")
        return {}
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {file_path}: {e}")
        return {}
    except Exception as e:
        logger.error(f"An unexpected error occurred while loading {file_path}: {e}")
        return {}
# --- Global variable for the block catalog ---
ALL_SCRATCH_BLOCKS_CATALOG = {}
# Paths are assembled with os.path.join so that they resolve on every
# platform; a hard-coded backslash is only a path separator on Windows.
BLOCK_CATALOG_PATH = os.path.join("blocks", "blocks.json")  # Full block catalog
HAT_BLOCKS_PATH = os.path.join("blocks", "hat_blocks.json")  # Hat blocks
STACK_BLOCKS_PATH = os.path.join("blocks", "stack_blocks.json")  # Stack blocks
REPORTER_BLOCKS_PATH = os.path.join("blocks", "reporter_blocks.json")  # Reporter blocks
BOOLEAN_BLOCKS_PATH = os.path.join("blocks", "boolean_blocks.json")  # Boolean blocks
C_BLOCKS_PATH = os.path.join("blocks", "c_blocks.json")  # C blocks
CAP_BLOCKS_PATH = os.path.join("blocks", "cap_blocks.json")  # Cap blocks


def _format_block_reference(block_data: dict) -> str:
    """Return one ' - Opcode: ..., functionality: ...' line per block of a loaded catalog."""
    return "\n".join(
        f" - Opcode: {block['op_code']}, functionality: {block['functionality']}"
        for block in block_data["blocks"]
    )


# Load the block catalogs from their respective JSON files. Each catalog must
# provide "description" and "blocks"; a catalog that failed to load is an
# empty dict and raises KeyError here, at import time.
hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH)
hat_description = hat_block_data["description"]
hat_opcodes_functionalities = _format_block_reference(hat_block_data)
print("Hat blocks loaded successfully.", hat_description)
boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH)
boolean_description = boolean_block_data["description"]
boolean_opcodes_functionalities = _format_block_reference(boolean_block_data)
c_block_data = _load_block_catalog(C_BLOCKS_PATH)
c_description = c_block_data["description"]
c_opcodes_functionalities = _format_block_reference(c_block_data)
cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH)
cap_description = cap_block_data["description"]
cap_opcodes_functionalities = _format_block_reference(cap_block_data)
reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH)
reporter_description = reporter_block_data["description"]
reporter_opcodes_functionalities = _format_block_reference(reporter_block_data)
stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH)
stack_description = stack_block_data["description"]
stack_opcodes_functionalities = _format_block_reference(stack_block_data)
# This makes ALL_SCRATCH_BLOCKS_CATALOG available globally
ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH)
# Helper function to generate a unique block ID
def generate_block_id():
    """Return a short random ID for a Scratch block: the first nine hex digits of a UUID4."""
    # The first ten characters of the dashed UUID text are eight hex digits,
    # a dash and one more hex digit, i.e. exactly the first nine characters
    # of the undashed hex form.
    return uuid.uuid4().hex[:9]
# Placeholder for your extract_json_from_llm_response function
# # Helper function to extract JSON from LLM response
# def extract_json_from_llm_response(raw_response: str) -> dict:
# """
# Extracts a JSON object from an LLM response string, robustly handling
# literal newlines inside string values by escaping them.
# """
# # 1) Pull out the inner JSON block (```json … ```)
# json_block = raw_response
# m = re.search(r"```json\s*(\{.*\})\s*```", raw_response, re.DOTALL)
# if m:
# json_block = m.group(1)
# # 2) Find the feedback field and escape its newlines
# # This pattern captures everything between "feedback": " … "
# feedback_pattern = r'("feedback"\s*:\s*")(.+?)("\s*,)'
# def _escape_feedback(m):
# prefix, val, suffix = m.groups()
# # replace each literal newline and carriage return with \n
# safe_val = val.replace("\r", "\\r").replace("\n", "\\n")
# return prefix + safe_val + suffix
# json_sanitized = re.sub(feedback_pattern, _escape_feedback, json_block, flags=re.DOTALL)
# # 3) Now try loading
# try:
# return json.loads(json_sanitized)
# except json.JSONDecodeError as e:
# logger.error(f"Failed to parse sanitized JSON: {e!r}\nContent was:\n{json_sanitized}")
# raise
def extract_json_from_llm_response(raw_response: str) -> dict:
    """
    Extracts a JSON value from an LLM response string, tolerating the most
    common formatting problems of LLM output.

    Processing order:
      1. If the response contains a markdown code fence (``` or ```json),
         only the content of the first fence is considered.
      2. The text is parsed with `strict=False`, so raw newlines, carriage
         returns and other control characters inside *any* string value are
         accepted (they are the usual reason a reply fails to parse).
      3. If that fails, repaired copies are tried in turn: trailing commas
         removed, then the span from the first "{" to the last "}" (for
         replies that wrap the JSON in prose), then that span with trailing
         commas removed.

    Args:
        raw_response: Text returned by the model.

    Returns:
        The parsed JSON value.

    Raises:
        json.JSONDecodeError: the error of the first parse attempt, when no
            attempt succeeds.
    """
    candidate = raw_response
    # --- Step 1: prefer the content of a markdown code block ---
    fenced = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response)
    if fenced:
        candidate = fenced.group(1).strip()
        logger.debug("Extracted potential JSON from markdown block.")
    else:
        logger.debug("No markdown JSON block found. Attempting to parse raw response.")

    # --- Step 2: parse; strict=False admits control characters in strings ---
    try:
        parsed_json = json.loads(candidate, strict=False)
        logger.info("Successfully parsed JSON from LLM response.")
        return parsed_json
    except json.JSONDecodeError as original_error:
        logger.error(f"Failed to parse JSON. Error: {original_error!r}")
        logger.error(f"Problematic JSON string (start):\n{candidate[:1000]}...")
        # The complete text can be very large, so it is kept at debug level.
        logger.debug(f"Problematic JSON string (full length: {len(candidate)}):\n{candidate}")

        # --- Step 3: speculative repairs, cheapest first ---
        trailing_comma = r',\s*([}\]])'  # e.g. {"a": 1,} -> {"a": 1}
        attempts = [re.sub(trailing_comma, r'\1', candidate)]
        start, end = candidate.find("{"), candidate.rfind("}")
        if 0 <= start < end:
            embedded = candidate[start:end + 1]
            if embedded != candidate:
                attempts.append(embedded)
                attempts.append(re.sub(trailing_comma, r'\1', embedded))
        for attempt in attempts:
            try:
                logger.warning("Attempting to parse a repaired copy of the response.")
                return json.loads(attempt, strict=False)
            except json.JSONDecodeError as e_fallback:
                logger.error(f"Fallback parsing also failed. Error: {e_fallback!r}")
        # The first error describes the text the model actually produced, so
        # it is the one reported to the caller.
        raise original_error
# Node 1: Detailed description generator.
def game_description_node(state: GameState):
    """
    Ask the main agent for a detailed narrative description of the game.

    The prompt combines the current description (or "A simple game." when
    none is set), the project JSON and the names of all targets. The agent's
    reply replaces `state["description"]`, and the same state object is
    returned. Any exception raised while calling the agent is logged and
    re-raised.
    """
    logger.info("--- Running GameDescriptionNode ---")
    seed_description = state.get("description", "A simple game.")
    # Name -> name mapping of every target (stage included); it is inserted
    # into the prompt exactly as printed.
    target_names = {entry["name"]: entry["name"] for entry in state["project_json"]["targets"]}
    project_dump = json.dumps(state['project_json'], indent=2)
    description_prompt = (
        "You are an AI assistant tasked with generating a detailed narrative description for a Scratch 3.0 game.\n"
        f"The initial high-level description is: '{seed_description}'.\n"
        f"The current Scratch project JSON is:\n```json\n{project_dump}\n```\n"
        f"Make sure you donot change Sprite and Stage name. Here are all the name: {target_names} \n"
        "Create a rich, engaging, and detailed description, including potential gameplay elements, objectives, and overall feel with the available resources\n"
        "The output should be a plain text description."
    )
    request = {"messages": [{"role": "user", "content": description_prompt}]}
    try:
        reply = agent.invoke(request)
        detailed_game_description = reply["messages"][-1].content
        state["description"] = detailed_game_description
        logger.info("Detailed game description generated by GameDescriptionNode.")
        print(f"Detailed Game Description: {detailed_game_description}")
        return state
    except Exception as e:
        logger.error(f"Error in GameDescriptionNode: {e}")
        raise
# Node 2: Analysis User Query and Initial Positions
def parse_query_and_set_initial_positions(state: GameState):
    """
    Ask the main agent for initial sprite coordinates and apply them.

    The reply is expected to be JSON with the single key
    'sprite_initial_positions'. When it cannot be parsed, the raw reply is
    sent to the JSON resolver agent once and the corrected reply is parsed
    instead. The positions are written into a copy of the project JSON, which
    replaces `state["project_json"]`; the same state object is returned. Any
    other exception is logged and re-raised.
    """
    logger.info("--- Running ParseQueryNode ---")
    example_output = (
        "```json\n"
        "{\n"
        " \"sprite_initial_positions\": {\n"
        " \"Sprite1\": {\"x\": -160, \"y\": -110},\n"
        " \"Sprite2\": {\"x\": 240, \"y\": -135}\n"
        " }\n"
        "}"
        "```\n"
    )
    llm_query_prompt = (
        f"Based on the user's game description: '{state['description']}', "
        "and the current Scratch project JSON below, "
        "determine the most appropriate initial 'x' and 'y' coordinates for each sprite. "
        f"Return ONLY a JSON object with a single key 'sprite_initial_positions' mapping sprite names to their {{'x': int, 'y': int}} coordinates.\n\n"
        f"The current Scratch project JSON is:\n```json\n{json.dumps(state['project_json'], indent=2)}\n```\n"
        "Example Json output:\n"
        + example_output
    )
    try:
        reply = agent.invoke({"messages": [{"role": "user", "content": llm_query_prompt}]})
        raw_response = reply["messages"][-1].content
        print("Raw response from LLM:", raw_response)
        # First try the reply as it is; on a decode error let the resolver
        # agent repair it and parse the repaired text.
        try:
            payload = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            correction_prompt = (
                "Correct the following JSON respose to ensure it follows proper schema rules:\n\n"
                "Take care of following instruction carefully:\n"
                "1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n"
                "2. Do **NOT** add any additional text, comments, or explanations.\n"
                "3. Just response correct the json where you find and put the same content as it is.\n"
                "Here is the raw response:\n\n"
                f"raw_response: {raw_response}\n\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            print(f"[JSON CORRECTOR RESPONSE AT PARSER]: {correction_response}")
            payload = extract_json_from_llm_response(correction_response["messages"][-1].content)
        sprite_positions = payload.get("sprite_initial_positions", {})
        new_project_json = update_project_with_sprite_positions(state["project_json"], sprite_positions)
        state["project_json"] = new_project_json
        print("Updated project JSON with sprite positions:", json.dumps(new_project_json, indent=2))
        return state
    except Exception as e:
        logger.error(f"Error in ParseQueryNode: {e}")
        raise
# Node 3: Sprite Action Plan Builder
def overall_planner_node(state: GameState):
    """
    Generates a comprehensive action plan for sprites, including detailed Scratch block information.
    This node acts as an overall planner, leveraging knowledge of all block shapes and categories.

    The prompt is assembled from the game description, the names and current
    positions of all non-stage targets, and the per-shape block references
    loaded at module level. The agent's reply is parsed as JSON; when parsing
    fails, the raw reply is sent once to the JSON resolver agent and the
    corrected reply is parsed instead. The parsed plan is stored in
    `state["action_plan"]` and the same state object is returned. Any other
    exception is logged and re-raised.
    """
    logger.info("--- Running OverallPlannerNode ---")
    description = state.get("description", "")
    project_json = state["project_json"]
    # Only sprites are planned for; the stage target is left out.
    sprite_names = [t["name"] for t in project_json["targets"] if not t["isStage"]]
    # Get sprite positions from the project_json if available, or set defaults
    sprite_positions = {}
    for target in project_json["targets"]:
        if not target["isStage"]:
            sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)}
    # Prompt layout: context, block reference by shape, task rules, and a
    # worked example of the expected 'action_overall_flow' JSON.
    planning_prompt = (
        "Generate a detailed action plan for the game's sprites based on the user query and sprite details.\n\n"
        f"**Game Description:** '{description}'\n\n"
        f"**Sprites in Game:** {', '.join(sprite_names)}\n"
        f"**Current Sprite Positions:** {json.dumps(sprite_positions)}\n\n"
        "--- Scratch 3.0 Block Reference ---\n"
        "This section provides a comprehensive reference of Scratch 3.0 blocks, categorized by shape, including their opcodes and functional descriptions. Use this to accurately identify block types and behavior.\n\n"
        f"### Hat Blocks\nDescription: {hat_description}\nBlocks:\n{hat_opcodes_functionalities}\n\n"
        f"### Boolean Blocks\nDescription: {boolean_description}\nBlocks:\n{boolean_opcodes_functionalities}\n\n"
        f"### C Blocks\nDescription: {c_description}\nBlocks:\n{c_opcodes_functionalities}\n\n"
        f"### Cap Blocks\nDescription: {cap_description}\nBlocks:\n{cap_opcodes_functionalities}\n\n"
        f"### Reporter Blocks\nDescription: {reporter_description}\nBlocks:\n{reporter_opcodes_functionalities}\n\n"
        f"### Stack Blocks\nDescription: {stack_description}\nBlocks:\n{stack_opcodes_functionalities}\n\n"
        "-----------------------------------\n\n"
        "Your task is to define the primary actions and movements for each sprite.\n"
        "The output should be a JSON object with a single key 'action_overall_flow'. Each key inside this object should be a sprite name (e.g., 'Player', 'Enemy'), and its value must include a 'description' and a list of 'plans'.\n"
        "Each plan must begin with a **single Scratch Hat Block** (e.g., 'event_whenflagclicked') and should contain:\n"
        "1. **'event'**: the exact `opcode` of the hat block that initiates the logic.\n"
        "2. **'logic'**: a natural language breakdown of each step taken after the event. Separate each step with a semicolon ';'. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence.\n"
        " - For example: for a jump: 'change y by N; wait M seconds; change y by -N'.\n"
        " - Use 'forever: ...' to prefix repeating logic explicitly.\n"
        " - Use Scratch-consistent verbs: 'move', 'change', 'wait', 'hide', 'say', etc.\n"
        "3. **Opcode Lists**: include relevant Scratch opcodes grouped under `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events`, and `data`. List only the non-empty categories. Use exact opcodes including shadow/helper blocks (e.g., 'math_number').\n\n"
        "Use sprite names exactly as listed in `sprite_names`. Do NOT rename or invent new sprites.\n"
        "Ensure the plan reflects accurate opcode usage derived strictly from the block reference above.\n\n"
        "Example structure for 'action_overall_flow':\n"
        "```json\n"
        "{\n"
        " \"action_overall_flow\": {\n"
        " \"Sprite1\": {\n"
        " \"description\": \"Main character (cat) actions\",\n"
        " \"plans\": [\n"
        " {\n"
        " \"event\": \"event_whenflagclicked\",\n"
        " \"logic\": \"go to initial position at starting point.\",\n"
        " \"motion\": [\"motion_gotoxy\"],\n"
        " \"control\": [],\n"
        " \"operator\": [],\n"
        " \"sensing\": [],\n"
        " \"looks\": [],\n"
        " \"sounds\": [],\n"
        " \"data\": []\n"
        " },\n"
        " {\n"
        " \"event\": \"event_whenkeypressed\",\n"
        " \"logic\": \"repeat(10): change y by 10; wait 0.1 seconds; change y by -10;\",\n"
        " \"motion\": [\"motion_changeyby\"],\n"
        " \"control\": [\"control_repeat\", \"control_wait\"],\n"
        " \"operator\": [],\n"
        " \"sensing\": [],\n"
        " \"looks\": [],\n"
        " \"sounds\": [],\n"
        " \"data\": []\n"
        " }\n"
        " ]\n"
        " },\n"
        " \"soccer ball\": {\n"
        " \"description\": \"Obstacle movement and interaction\",\n"
        " \"plans\": [\n"
        " {\n"
        " \"event\": \"event_whenflagclicked\",\n"
        " \"logic\": \"go to x:240 y:-135; forever: glide 2 seconds to x:-240 y:-135; if x position < -235, then set x to 240; if touching Sprite1, then hide;\",\n"
        " \"motion\": [\"motion_gotoxy\", \"motion_glidesecstoxy\", \"motion_xposition\", \"motion_setx\"],\n"
        " \"control\": [\"control_forever\", \"control_if\"],\n"
        " \"operator\": [\"operator_lt\"],\n"
        " \"sensing\": [\"sensing_istouching\", \"sensing_touchingobjectmenu\"],\n"
        " \"looks\": [\"looks_hide\"],\n"
        " \"data\": []\n"
        " }\n"
        " ]\n"
        " }\n"
        " }\n"
        "}\n"
        "```\n"
        "Based on the provided context, generate the `action_overall_flow`."
    )
    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        # The last message of the agent's result carries the model's answer.
        raw_response = response["messages"][-1].content
        print("Raw response from LLM [OverallPlannerNode]:", raw_response)  # Debug output of the raw reply
        # json debugging and solving
        try:
            overall_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            # Use the JSON resolver agent to fix the response
            correction_prompt = (
                "Correct the following JSON respose to ensure it follows proper schema rules:\n\n"
                "Take care of following instruction carefully:\n"
                "1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n"
                "2. Do **NOT** add any additional text, comments, or explanations.\n"
                "3. Just response correct the json where you find and put the same content as it is.\n"
                "Here is the raw response:\n\n"
                f"raw_response: {raw_response}\n\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            print(f"[JSON CORRECTOR RESPONSE AT OVERALLPLANNERNODE ]: {correction_response}")
            # A second decode failure is not retried; it reaches the outer handler.
            overall_plan= extract_json_from_llm_response(correction_response["messages"][-1].content)
        state["action_plan"] = overall_plan
        logger.info("Overall plan generated by OverallPlannerNode.")
        return state
    except Exception as e:
        logger.error(f"Error in OverallPlannerNode: {e}")
        raise
# Helper function to get a block by its opcode from a single catalog
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None:
"""
Search a single catalog (with keys "description" and "blocks": List[dict])
for a block whose 'op_code' matches the given opcode.
Returns the block dict or None if not found.
"""
for block in catalog_data["blocks"]:
if block.get("op_code") == opcode:
return block
return None
# Helper function to find a block in all catalogs by opcode
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None:
"""
Search across multiple catalogs for a given opcode.
Returns the first matching block dict or None.
"""
for catalog in all_catalogs:
blk = get_block_by_opcode(catalog, opcode)
if blk is not None:
return blk
return None
# Node 4: Sprite Plan Verification Node
def plan_verification_node(state: GameState):
    """
    Validate the current action plan with the LLM and record its verdict.

    Reads ``description``, ``action_plan``, ``plan_validation_feedback`` and
    ``iteration_count`` from ``state``; asks the agent for a JSON verdict; and
    writes back ``plan_validation_feedback``, ``needs_improvement``,
    ``iteration_count`` and (when the validator suggests one) an extended
    ``description``.

    If the reply cannot be parsed as JSON, it is sent once to the JSON
    resolver agent and the corrected reply is parsed instead.

    Returns:
        The updated ``state``.

    Raises:
        Exception: any error from the agents or from parsing is logged,
            recorded in ``state`` and re-raised.
    """
    logger.info(f"--- Running VerificationNode (Iteration: {state.get('iteration_count', 0)}) ---")
    # NOTE(review): with a limit of 1 and the `>=` check below, the first
    # "needs improvement" verdict already reaches the cap, so this node always
    # leaves needs_improvement False. Raise the limit to enable refinement.
    MAX_IMPROVEMENT_ITERATIONS = 1  # Set a sensible limit to prevent infinite loops
    current_iteration = state.get("iteration_count", 0)
    action_plan = state.get("action_plan", {})
    print(f"[action_plan before verification] on ({current_iteration}): {json.dumps(action_plan, indent=2)}")

    validation_prompt = (
        f"You are an AI validator for Scratch project plans and generated blocks. "
        f"Your task is to review the current state of the game's action plan and block structure. "
        f"Critically analyze if there are any missing logic, structural inconsistencies, or unclear intentions. "
        f"Provide **precise** and **constructive** feedback for improvement.\n\n"
        f"**Game Description:**\n"
        f"{state.get('description', '')}\n\n"
        f"**Current Action Plan (High-Level Logic):**\n"
        f"```json\n{json.dumps(action_plan, indent=2)}\n```\n\n"
        f"**Previous Feedback (if any):**\n"
        f"{state.get('plan_validation_feedback', 'None')}\n\n"
        f"Based on the above, return a response strictly in the following JSON format:\n"
        "```json\n"
        "{\n"
        " \"feedback\": \"Detailed comments on any missing logic, inconsistencies, or unclear intent. Be concise but specific. If everything is perfect, state that explicitly.\",\n"
        " \"needs_improvement\": true,\n"
        " \"suggested_description_updates\": \"Concise revision of the game description if needed. Use an empty string if no change is required.\"\n"
        "}\n"
        "```\n"
        "**Important:**\n"
        "- The `needs_improvement` field must be strictly `true` or `false` (boolean). Do **not** include any other text or explanation inside the JSON.\n"
        "- Be strict in evaluation. If **any** part of the plan or block logic appears incomplete, ambiguous, or incorrect, set `needs_improvement` to `true`."
    )

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": validation_prompt}]})
        raw_response = response["messages"][-1].content
        logger.info(f"Raw response from LLM [VerificationNode]: {raw_response[:500]}...")

        # Parse the verdict; on malformed JSON, let the resolver agent repair it once.
        try:
            validation_result = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            correction_prompt = (
                "Correct the following JSON respose to ensure it follows proper schema rules:\n\n"
                "Take care of following instruction carefully:\n"
                "1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n"
                "2. Do **NOT** add any additional text, comments, or explanations.\n"
                "3. Just response correct the json where you find and put the same content as it is.\n"
                "Here is the raw response:\n\n"
                f"raw_response: {raw_response}\n\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            print(f"[JSON CORRECTOR RESPONSE AT PLANVERIFICATIONNODE ]: {correction_response}")
            validation_result = extract_json_from_llm_response(correction_response["messages"][-1].content)

        # Update state with feedback and improvement flag.
        state["plan_validation_feedback"] = validation_result.get("feedback", "No specific feedback provided.")

        # The model sometimes answers with the strings "true"/"false"; a
        # non-empty string is always truthy, so normalise to a real boolean.
        needs_improvement = validation_result.get("needs_improvement", False)
        if isinstance(needs_improvement, str):
            needs_improvement = needs_improvement.strip().lower() == "true"
        state["needs_improvement"] = bool(needs_improvement)

        suggested_description_updates = validation_result.get("suggested_description_updates", "")
        if suggested_description_updates:
            # Append the suggestion to the existing description (looked up by
            # its key, not by its own text).
            current_description = state.get("description", "")
            state["description"] = f"{current_description}\n\nSuggested Update: {suggested_description_updates}"
            logger.info("Updated detailed game description based on validation feedback.")

        # Manage iteration count.
        if state["needs_improvement"]:
            state["iteration_count"] = current_iteration + 1
            if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS:
                logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached. Forcing 'needs_improvement' to False.")
                state["needs_improvement"] = False
                state["plan_validation_feedback"] += "\n(Note: Max iterations reached, stopping further improvements.)"
        else:
            state["iteration_count"] = 0  # Reset if no more improvement needed

        logger.info(f"Verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['plan_validation_feedback'][:100]}...")
        # Single quotes inside the replacement field keep this f-string valid
        # on Python versions before 3.12.
        print(f"[updated action_plan after verification] on ({current_iteration}): {json.dumps(state.get('action_plan', {}), indent=2)}")
        return state
    except Exception as e:
        logger.error(f"Error in VerificationNode: {e}")
        state["needs_improvement"] = False  # Force end loop on error
        state["plan_validation_feedback"] = f"Validation error: {e}"
        raise
# Node 5: Refined Planner Node
def refined_planner_node(state: GameState):
    """
    Refine the action plan using the validation feedback and game description.

    Reads ``description``, ``action_plan``, ``plan_validation_feedback`` and
    ``project_json`` from ``state``, asks the agent for a refined
    'action_overall_flow' object, and stores that object in
    ``state["action_plan"]``. When the reply contains no usable
    'action_overall_flow', the previous plan is left untouched.

    If the reply cannot be parsed as JSON, it is sent once to the JSON
    resolver agent and the corrected reply is parsed instead.

    Returns:
        The updated ``state``.

    Raises:
        Exception: any error from the agents or from parsing is logged and
            re-raised.
    """
    logger.info("--- Running RefinedPlannerNode ---")
    detailed_game_description = state.get("description", "")
    current_action_plan = state.get("action_plan", {})
    print(f"[current_action_plan before refinement] on ({state.get('iteration_count', 0)}): {json.dumps(current_action_plan, indent=2)}")
    plan_validation_feedback = state.get("plan_validation_feedback", "No specific feedback provided. Assume general refinement is needed.")

    project_json = state["project_json"]
    sprite_targets = [t for t in project_json["targets"] if not t["isStage"]]
    sprite_names = [t["name"] for t in sprite_targets]
    # Sprite positions come from project_json, defaulting to the origin.
    sprite_positions = {
        t["name"]: {"x": t.get("x", 0), "y": t.get("y", 0)}
        for t in sprite_targets
    }

    refinement_prompt = (
        "Refine the existing action plan for the game's sprites based on the detailed game description and the validation feedback provided.\n\n"
        f"**Detailed Game Description:** '{detailed_game_description}'\n\n"
        f"**Sprites in Game:** {', '.join(sprite_names)}\n"
        f"**Current Sprite Positions:** {json.dumps(sprite_positions)}\n\n"
        f"**Current Action Plan (to be refined):**\n"
        f"```json\n{json.dumps(current_action_plan, indent=2)}\n```\n\n"
        f"**Validation Feedback:**\n"
        f"'{plan_validation_feedback}'\n\n"
        "--- Scratch 3.0 Block Reference ---\n"
        f"### Hat Blocks\nDescription: {hat_description}\nBlocks:\n{hat_opcodes_functionalities}\n\n"
        f"### Boolean Blocks\nDescription: {boolean_description}\nBlocks:\n{boolean_opcodes_functionalities}\n\n"
        f"### C Blocks\nDescription: {c_description}\nBlocks:\n{c_opcodes_functionalities}\n\n"
        f"### Cap Blocks\nDescription: {cap_description}\nBlocks:\n{cap_opcodes_functionalities}\n\n"
        f"### Reporter Blocks\nDescription: {reporter_description}\nBlocks:\n{reporter_opcodes_functionalities}\n\n"
        f"### Stack Blocks\nDescription: {stack_description}\nBlocks:\n{stack_opcodes_functionalities}\n\n"
        "-----------------------------------\n\n"
        "Your task is to refine the JSON object 'action_overall_flow'.\n"
        "Use sprite names exactly as provided in `sprite_names` (e.g., 'Sprite1', 'soccer ball'); do **NOT** rename them.\n\n"
        "Follow this exact format for the output (example):\n"
        "Example structure for 'action_overall_flow':\n"
        "```json\n"
        "{\n"
        " \"action_overall_flow\": {\n"
        " \"Sprite1\": {\n"
        " \"description\": \"Main character (cat) actions\",\n"
        " \"plans\": [\n"
        " {\n"
        " \"event\": \"event_whenflagclicked\",\n"
        " \"logic\": \"go to initial position at starting point.\",\n"
        " \"motion\": [\"motion_gotoxy\"],\n"
        " \"control\": [],\n"
        " \"operator\": [],\n"
        " \"sensing\": [],\n"
        " \"looks\": [],\n"
        " \"sounds\": [],\n"
        " \"data\": []\n"
        " },\n"
        " {\n"
        " \"event\": \"event_whenkeypressed\",\n"
        " \"logic\": \"repeat(10): change y by 10 → wait 0.1 sec → change y by -10\",\n"
        " \"motion\": [\"motion_changeyby\"],\n"
        " \"control\": [\"control_repeat\", \"control_wait\"],\n"
        " \"operator\": [],\n"
        " \"sensing\": [],\n"
        " \"looks\": [],\n"
        " \"sounds\": [],\n"
        " \"data\": []\n"
        " }\n"
        " ]\n"
        " },\n"
        " \"soccer ball\": {\n"
        " \"description\": \"Obstacle movement and interaction\",\n"
        " \"plans\": [\n"
        " {\n"
        " \"event\": \"event_whenflagclicked\",\n"
        " \"logic\": \"go to x:240 y:-135 → forever glide to x:-240 y:-135 → if x position < -235 then set x to 240 → if touching Sprite1 then hide\",\n"
        " \"motion\": [\"motion_gotoxy\", \"motion_glidesecstoxy\", \"motion_xposition\", \"motion_setx\"],\n"
        " \"control\": [\"control_forever\", \"control_if\"],\n"
        " \"operator\": [\"operator_lt\"],\n"
        " \"sensing\": [\"sensing_istouching\", \"sensing_touchingobjectmenu\"],\n"
        " \"looks\": [\"looks_hide\"],\n"
        " \"data\": []\n"
        " }\n"
        " ]\n"
        " }\n"
        " }\n"
        "}\n"
        "```\n"
        "Use the validation feedback to address errors, fill in missing logic, or enhance clarity.\n"
        "example of few possible improvements: 1.event_whenflagclicked is used to control sprite but its used for actual start scratch project and reset scratch. 2. looping like forever used where we should use iterative. 3. missing of for variable we used in the block\n"
        "- Maintain the **exact JSON structure** shown above.\n"
        "- All `logic` fields must be **clear and granular**.\n"
        "- Only include opcode categories that contain relevant opcodes.\n"
        "- Ensure that each opcode matches its intended Scratch functionality.\n"
        "- If feedback suggests major change, **rethink the entire plan** for the affected sprite(s).\n"
        "- If feedback is minor, make precise, minimal improvements only."
    )

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]})
        raw_response = response["messages"][-1].content
        logger.info(f"Raw response from LLM [RefinedPlannerNode]: {raw_response[:500]}...")

        # Parse the refined plan; on malformed JSON, let the resolver agent repair it once.
        try:
            refined_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            correction_prompt = (
                "Correct the following JSON respose to ensure it follows proper schema rules:\n\n"
                "Take care of following instruction carefully:\n"
                "1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n"
                "2. Do **NOT** add any additional text, comments, or explanations.\n"
                "3. Just response correct the json where you find and put the same content as it is.\n"
                "Here is the raw response:\n\n"
                f"raw_response: {raw_response}\n\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            print(f"[JSON CORRECTOR RESPONSE AT REFINEPLANNER ]: {correction_response}")
            refined_plan = extract_json_from_llm_response(correction_response["messages"][-1].content)
            logger.info("Refined plan corrected by JSON resolver agent.")

        # Only replace the plan when the reply really carries a non-empty
        # 'action_overall_flow'; otherwise keep the previous plan, as the
        # warning below promises, instead of overwriting it with {}.
        refined_flow = refined_plan.get("action_overall_flow") if isinstance(refined_plan, dict) else None
        if refined_flow:
            state["action_plan"] = refined_flow
            logger.info("Action plan refined by RefinedPlannerNode.")
        else:
            logger.warning("RefinedPlannerNode did not return a valid 'action_overall_flow' structure. Keeping previous plan.")

        print("[Refined Action Plan]:", json.dumps(state.get("action_plan", {}), indent=2))
        # default=str keeps this debug dump from aborting the node when the
        # state holds a value json cannot serialise.
        print("[current state after refinement]:", json.dumps(state, indent=2, default=str))
        return state
    except Exception as e:
        logger.error(f"Error in RefinedPlannerNode: {e}")
        raise
# Node 4: Overall Block Builder Node
def overall_block_builder_node(state: GameState):
    """
    Generate Scratch block JSON for every planned script and add it to the project.

    For each sprite (or the Stage) named in ``state["action_plan"]`` and each
    of its ``plans`` entries, this node:

    1. collects the opcodes listed for the entry and merges their catalog
       definitions with the runtime definitions from
       ``ALL_SCRATCH_BLOCKS_CATALOG``;
    2. asks the agent to emit one complete script (a dict of block id ->
       block definition) starting with the entry's hat opcode;
    3. positions the script's top-level block and merges the blocks into the
       target's ``blocks`` dict inside ``project_json``.

    The plan may be either the sprite mapping itself or that mapping wrapped
    in an 'action_overall_flow' key; both shapes are accepted. Sprites named
    in the plan but absent from ``project_json`` are ignored.

    Returns:
        The updated ``state`` (unchanged when there is no action plan).

    Raises:
        Exception: any error while generating or parsing a script is logged
            and re-raised.
    """
    logger.info("--- Running OverallBlockBuilderNode ---")
    project_json = state["project_json"]
    targets = project_json["targets"]
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}
    # Also get the Stage target
    stage_target = next((target for target in targets if target["isStage"]), None)
    if stage_target:
        sprite_map[stage_target["name"]] = stage_target

    # Pre-load all block-catalog JSONs once
    all_catalogs = [
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data,
    ]

    action_plan = state.get("action_plan", {})
    print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2))
    if not action_plan:
        logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.")
        return state

    script_y_offset = {}
    script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()}

    # This is the handler which ensure if somehow json response changed it handle it.[DONOT REMOVE BELOW LOGIC]
    if action_plan.get("action_overall_flow", {}) == {}:
        plan_data = action_plan.items()
    else:
        plan_data = action_plan.get("action_overall_flow", {}).items()

    # Plan-entry keys that may carry opcode lists. The planner prompts emit
    # "sounds", so it is read alongside the older "sound" spelling.
    opcode_list_keys = ("motion", "control", "operator", "sensing", "looks", "sounds", "sound", "events", "data")

    for sprite_name, sprite_actions_data in plan_data:
        if sprite_name not in sprite_map:
            continue
        current_sprite_target = sprite_map[sprite_name]
        if "blocks" not in current_sprite_target:
            current_sprite_target["blocks"] = {}
        if sprite_name not in script_y_offset:
            script_y_offset[sprite_name] = 0

        for plan_entry in sprite_actions_data.get("plans", []):
            event_opcode = plan_entry["event"]  # This is now expected to be an opcode
            logic_sequence = plan_entry["logic"]  # Free-text description of the script's steps

            # Gather every opcode named for this script, de-duplicated while
            # preserving first-seen order so the prompt is reproducible.
            collected_opcodes = []
            for key in opcode_list_keys:
                value = plan_entry.get(key)
                if isinstance(value, list):
                    collected_opcodes.extend(value)
            needed_opcodes = list(dict.fromkeys(collected_opcodes))

            # Merge human-written catalog + runtime entry for each opcode
            # (catalog fields first, then runtime overrides/adds).
            combined_blocks = {}
            for op in needed_opcodes:
                catalog_def = find_block_in_all(op, all_catalogs) or {}
                runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {})
                combined_blocks[op] = {**catalog_def, **runtime_def}
            print("[Combined blocks for this script]:", json.dumps(combined_blocks, indent=2))

            llm_block_generation_prompt = (
                f"You are an AI assistant generating a **single complete Scratch 3.0 script** in JSON format.\n"
                f"The current sprite is '{sprite_name}'.\n"
                f"This script must start with the event block (Hat Block) for opcode '{event_opcode}'.\n"
                f"The sequential logic to implement for this script is:\n"
                f" Logic: {logic_sequence}\n\n"
                f"**Required Block Opcodes & Catalog:**\n"
                f"Based on the planning, the following specific Scratch block opcodes are expected to be used. You MUST use these opcodes where applicable.\n"
                f"Here is the comprehensive catalog for these blocks, including their structure and required inputs/fields:\n"
                f"```json\n{json.dumps(combined_blocks, indent=2)}\n```\n\n"
                f"Current Scratch project JSON for this sprite (provided for context; you are generating a NEW, complete script):\n"
                f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n"
                f"**CRITICAL INSTRUCTIONS FOR GENERATING THE BLOCK JSON (READ CAREFULLY AND FOLLOW PRECISELY):**\n"
                f"1. **Unique Block IDs:** Generate a **globally unique ID** for EVERY block (main and shadow blocks) within the entire JSON output for this script. Example: 'myBlockID123'.\n"
                f"2. **Script Initiation (Hat Block - VERY IMPORTANT):**\n"
                f" * The **first block** of the script (the Hat block, opcode '{event_opcode}') MUST have `\"topLevel\": true` and `\"parent\": null`.\n"
                f" * ONLY this Hat block should have `\"topLevel\": true`. All other blocks in the script MUST have `\"topLevel\": false`.\n"
                f" * Set its `x` and `y` coordinates (e.g., `x: 0, y: 0` or similar for clear placement).\n"
                f"3. **Strict Block Chaining (`next` and `parent`):**\n"
                f" * Use the `next` field to point to the ID of the block DIRECTLY BELOW it in the stack. If a block has a `next`, its `next` block's `parent` MUST point back to the current block's ID.\n"
                f" * Use the `parent` field to point to the ID of the block DIRECTLY ABOVE it in the stack. If a block has a `parent`, then that `parent` block's `next` MUST point to the current block's ID.\n"
                f" * The **last block** in a linear stack (e.g., a Stack block not containing other blocks, or a Cap block) MUST have `\"next\": null`.\n"
                f" * Blocks plugged into inputs (like Boolean reporters or Reporter blocks) or substacks (like inside C-blocks) do NOT use `next` to connect to the block holding them. Their connection is solely via the `inputs` field of their parent.\n"
                f"4. **`inputs` Field Structure (ABSOLUTELY CRITICAL - ADHERE TO THIS RIGIDLY):**\n"
                f" * The value for ANY key within the `inputs` dictionary MUST be an **array of EXACTLY two elements**: `[type_code, value_or_block_id]`.\n"
                f" * **NEVER embed direct primitive values or arrays within `inputs` without a `type_code`**: E.g., `\"INPUT_NAME\": [\"value\", null]` or `\"INPUT_NAME\": [\"nestedArray\"]` are **STRICTLY FORBIDDEN and will cause errors**.\n"
                f" * **`type_code` (first element):**\n"
                f" * `1`: Use when `value_or_block_id` refers to a **primitive value** (number, string, boolean) or the ID of a **shadow block** (e.g., `math_number`, `text`, menu blocks).\n"
                f" * `2`: Use when `value_or_block_id` refers to the ID of a **non-shadow block** that is PLUGGED IN (e.g., a Boolean block in a condition, or the first block of a C-block's `SUBSTACK`).\n"
                f" * **Correct Examples (Re-emphasized):**\n"
                f" * **Numerical Input (`motion_movesteps`):** To input `10` steps, define a separate `math_number` shadow block and reference its ID:\n"
                f" ```json\n"
                f" // Main block\n"
                f" \"moveStepsID\": {{\n"
                f" \"opcode\": \"motion_movesteps\", \"inputs\": {{ \"STEPS\": [1, \"shadowNumID\"] }},\n"
                f" \"parent\": \"parentBlockID\", \"next\": \"nextBlockID\", \"topLevel\": false, \"shadow\": false\n"
                f" }},\n"
                f" // Separate shadow block for '10'\n"
                f" \"shadowNumID\": {{\n"
                f" \"opcode\": \"math_number\", \"fields\": {{ \"NUM\": [\"10\", null] }},\n"
                f" \"parent\": \"moveStepsID\", \"shadow\": true, \"topLevel\": false\n"
                f" }}\n"
                f" ```\n"
                f" * **Dropdown/Menu Input (`sensing_touchingobject` with 'edge'):** Define a separate menu shadow block and reference its ID:\n"
                f" ```json\n"
                f" // Main block\n"
                f" \"touchingBlockID\": {{\n"
                f" \"opcode\": \"sensing_touchingobject\", \"inputs\": {{ \"TOUCHINGOBJECTMENU\": [1, \"shadowEdgeMenuID\"] }},\n"
                f" \"parent\": \"parentBlockID\", \"next\": null, \"topLevel\": false, \"shadow\": false\n"
                f" }},\n"
                f" // Separate shadow block for '_edge_'\n"
                f" \"shadowEdgeMenuID\": {{\n"
                f" \"opcode\": \"sensing_touchingobjectmenu\", \"fields\": {{ \"TOUCHINGOBJECTMENU\": [\"_edge_\", null] }},\n"
                f" \"parent\": \"touchingBlockID\", \"shadow\": true, \"topLevel\": false\n"
                f" }}\n"
                f" ```\n"
                f" * **C-block (`control_forever`):** Its `SUBSTACK` input MUST point to the first block inside its loop using `type_code: 2`.\n"
                f" ```json\n"
                f" \"foreverBlockID\": {{\n"
                f" \"opcode\": \"control_forever\", \"inputs\": {{ \"SUBSTACK\": [2, \"firstBlockInsideForeverID\"] }},\n"
                f" \"next\": null, \"parent\": \"blockAboveForeverID\", \"shadow\": false, \"topLevel\": false\n"
                f" }},\n"
                f" \"firstBlockInsideForeverID\": {{\n"
                f" \"opcode\": \"motion_movesteps\", \"parent\": \"foreverBlockID\", \"next\": null, \"topLevel\": false, \"shadow\": false\n"
                f" }}\n"
                f" ```\n"
                # The plugged-in Boolean is a non-shadow block, so CONDITION uses
                # type code 2, consistent with the type_code rules stated above.
                f" * **C-block with Condition (`control_if`):** Both `CONDITION` (referencing the plugged-in Boolean block) and `SUBSTACK` (referencing the first block inside the if-body) use `type_code: 2`.\n"
                f" ```json\n"
                f" \"ifBlockID\": {{\n"
                f" \"opcode\": \"control_if\",\n"
                f" \"inputs\": {{\n"
                f" \"CONDITION\": [2, \"conditionBlockID\"],\n"
                f" \"SUBSTACK\": [2, \"firstBlockInsideIfID\"]\n"
                f" }},\n"
                f" \"next\": \"blockAfterIfID\", \"parent\": \"blockAboveIfID\", \"shadow\": false, \"topLevel\": false\n"
                f" }},\n"
                f" \"conditionBlockID\": {{\n"
                f" \"opcode\": \"sensing_touchingobject\", \"parent\": \"ifBlockID\", \"shadow\": false, \"topLevel\": false // ... and its own inputs/fields\n"
                f" }},\n"
                f" \"firstBlockInsideIfID\": {{\n"
                f" \"opcode\": \"looks_sayforsecs\", \"parent\": \"ifBlockID\", \"next\": null, \"topLevel\": false, \"shadow\": false\n"
                f" }}\n"
                f" ```\n"
                f"5. **Separate Shadow Blocks (MANDATORY):** Every time a block's input requires a numeric value, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `\"shadow\": true` and `\"topLevel\": false`.\n"
                f"6. **`fields` for Direct Values:** Use the `fields` dictionary ONLY if the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., `NUM` field in `math_number`, `KEY_OPTION` in `event_whenkeypressed`, `VARIABLE` field in `data_setvariableto`). Example: `\"fields\": {{\"NUM\": [\"10\", null]}}`.\n"
                f"7. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `combined_blocks` catalog. Do NOT use unlisted or hypothetical opcodes (e.g., `motion_jump`).\n"
                f"8. **Output Format:** Return **ONLY the JSON object** representing all the blocks for THIS SINGLE SCRIPT. Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions."
            )

            try:
                response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
                raw_response = response["messages"][-1].content
                logger.info(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...")
                print(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response}")  # Uncomment for debugging

                # Parse the script; on malformed JSON, let the resolver agent repair it once.
                try:
                    generated_blocks = extract_json_from_llm_response(raw_response)
                except json.JSONDecodeError:
                    logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                    correction_prompt = (
                        "Correct the following JSON respose to ensure it follows proper schema rules:\n\n"
                        "Take care of following instruction carefully:\n"
                        "1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n"
                        "2. Do **NOT** add any additional text, comments, or explanations.\n"
                        "3. Just response correct the json where you find and put the same content as it is.\n"
                        "Here is the raw response:\n\n"
                        f"raw_response: {raw_response}\n\n"
                    )
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    print(f"[JSON CORRECTOR RESPONSE AT OVERALLBLOCKBUILDER ]: {correction_response}")
                    generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content)

                # Tolerate a reply that wraps the script in a 'blocks' key.
                if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
                    logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
                    generated_blocks = generated_blocks["blocks"]

                # Update block positions for top-level script
                for block_id, block_data in generated_blocks.items():
                    if block_data.get("topLevel"):
                        block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
                        block_data["y"] = script_y_offset[sprite_name]
                script_y_offset[sprite_name] += 150  # Increment for next script

                current_sprite_target["blocks"].update(generated_blocks)
                # setting the iteration count for the script
                state["iteration_count"] = 0
                logger.info(f"Action blocks added for sprite '{sprite_name}', script '{event_opcode}' by OverallBlockBuilderNode.")
            except Exception as e:
                logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}")
                raise

    state["project_json"] = project_json  # Update the state with the modified project_json
    logger.info("Updated project JSON with action nodes.")
    print("Updated project JSON with action nodes:", json.dumps(project_json, indent=2))  # Print for direct visibility
    return state  # Return the state object
#helper function to identify the shape of block utilized by the block verifier
def get_block_type(opcode: str) -> str:
    """
    Classify a Scratch opcode into a general block shape.

    The classification is heuristic and based on the opcode text. Rules are
    applied in this order: hat, C-block, menu (treated as reporter), boolean,
    reporter, cap. Anything left over falls back to the 'blockType' recorded
    in ``ALL_SCRATCH_BLOCKS_CATALOG``, defaulting to "stack".

    Args:
        opcode: The block's opcode; an empty or None value yields "unknown".

    Returns:
        One of "unknown", "hat", "c_block", "boolean", "reporter", "cap", or
        the catalog's 'blockType' value / "stack".
    """
    if not opcode:
        return "unknown"

    if opcode.startswith("event_when") or opcode == "control_start_as_clone":
        return "hat"

    if opcode.startswith("control_") and any(word in opcode for word in ("if", "repeat", "forever")):
        return "c_block"

    # Dropdown shadow blocks are treated as reporters for type checking. This
    # must run before the boolean rule, otherwise 'sensing_touchingobjectmenu'
    # is caught by the "touching" substring and reported as a boolean.
    if opcode.endswith("menu"):
        return "reporter"

    boolean_operators = ("operator_equals", "operator_gt", "operator_lt", "operator_and", "operator_or", "operator_not")
    if opcode in boolean_operators or (
        opcode.startswith("sensing_")
        and any(word in opcode for word in ("mousedown", "keypressed", "touching"))
    ):
        return "boolean"

    # Command blocks such as 'looks_switchcostumeto', 'looks_setsizeto' or
    # 'data_addtolist' contain reporter-looking substrings ("costume", "size",
    # "add", "of", ...). Their leading verb marks them as commands, so the
    # reporter heuristic is skipped for them.
    category, _, action = opcode.partition("_")
    command_verbs = ("set", "change", "switch", "next", "point", "go", "add", "delete", "insert", "replace", "create")
    is_command = category in ("motion", "looks", "sound", "data", "control") and action.startswith(command_verbs)

    reporter_hints = ("position", "direction", "size", "volume", "costume", "backdrop", "random", "add", "subtract", "multiply", "divide", "length", "item", "of")
    if not is_command and any(hint in opcode for hint in reporter_hints):
        return "reporter"

    if "stop_all" in opcode or "delete_this_clone" in opcode or "procedures_definition" in opcode:
        return "cap"

    # Default to stack for most command blocks if not explicitly defined
    return ALL_SCRATCH_BLOCKS_CATALOG.get(opcode, {}).get("blockType", "stack")
# Helper function to collect the blocks that belong to one script (starting at its hat block), used by the block verifier
def filter_script_blocks(all_blocks: dict, hat_block_id: str) -> dict:
    """
    Collect the blocks that belong to the script rooted at ``hat_block_id``.

    Performs a breadth-first walk from the hat block, following each block's
    'next' link and every block id referenced from its 'inputs' (second
    element, plus the third element when present). Substacks are reached
    through the same 'inputs' traversal.

    Args:
        all_blocks: Mapping of block id -> block definition dict.
        hat_block_id: Id of the script's starting block.

    Returns:
        A dict of block id -> block definition for every reachable block, in
        visiting order. Ids missing from ``all_blocks`` (or mapped to an empty
        value) are skipped, so an unknown ``hat_block_id`` yields ``{}``.
    """
    from collections import deque  # local import: O(1) pops from the queue head

    script_blocks = {}
    visited = set()
    pending = deque([hat_block_id])

    while pending:
        current_block_id = pending.popleft()
        if current_block_id in visited:
            continue
        visited.add(current_block_id)

        block_data = all_blocks.get(current_block_id)
        if not block_data:
            continue
        script_blocks[current_block_id] = block_data

        # Add next block in sequence
        next_id = block_data.get("next")
        if next_id and next_id in all_blocks:
            pending.append(next_id)

        # Add blocks connected via inputs (reporters, shadow blocks, substacks).
        for input_value in (block_data.get("inputs") or {}).values():
            if not isinstance(input_value, list) or len(input_value) < 2:
                continue
            # Element 1 is the connected value/block; element 2, when present,
            # may name a further connected block. Literal (non-string) values
            # are ignored.
            for candidate in input_value[1:3]:
                if isinstance(candidate, str) and candidate in all_blocks:
                    pending.append(candidate)

    return script_blocks
def analyze_script_structure(script_blocks: dict, hat_block_id: str, sprite_name: str) -> list:
"""
Analyzes the structure of a single Scratch script for common errors.
Returns a list of issue strings.
"""
issues = []
# 1. Validate the hat block
hat_block = script_blocks.get(hat_block_id)
if not hat_block:
issues.append(f"Script for sprite '{sprite_name}' (hat ID: {hat_block_id}) has no hat block data.")
return issues # Cannot proceed without a hat block
if not hat_block.get("topLevel") or hat_block.get("parent") is not None:
issues.append(f"Hat block '{hat_block_id}' for sprite '{sprite_name}' is not marked as topLevel or has a parent.")
# 2. Check all blocks within the script
for block_id, block_data in script_blocks.items():
opcode = block_data.get("opcode")
if not opcode:
issues.append(f"Block '{block_id}' for sprite '{sprite_name}' is missing an opcode.")
continue
# Check if opcode exists in the catalog (simplified check for this example)
if opcode not in ALL_SCRATCH_BLOCKS_CATALOG:
issues.append(f"Block '{block_id}' for sprite '{sprite_name}' has unknown opcode '{opcode}'.")
# Parent-Child and Next-Previous Linkage
parent_id = block_data.get("parent")
next_id = block_data.get("next")
if parent_id:
parent_block = script_blocks.get(parent_id)
if not parent_block:
issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent parent '{parent_id}'.")
else:
parent_block_type = get_block_type(parent_block.get("opcode"))
current_block_type = get_block_type(opcode)
if parent_block_type in ["stack", "hat"]:
# For stack and hat blocks, the parent's 'next' should point to this block
if parent_block.get("next") != block_id:
issues.append(f"Block '{block_id}' (opcode: {opcode})'s parent '{parent_id}' does not link back to it via 'next'.")
elif parent_block_type == "c_block":
# For C-blocks, this block should be in a substack input
found_in_substack = False
for input_key, input_val in parent_block.get("inputs", {}).items():
if isinstance(input_val, list) and len(input_val) >= 2 and input_val[1] == block_id:
if input_val[0] == 2 and input_key.startswith("SUBSTACK"): # Type code 2 for substacks
found_in_substack = True
break
# Shadow blocks and reporter blocks can also be inputs to C-blocks but not as substacks
if not found_in_substack and not block_data.get("shadow") and current_block_type not in ["reporter", "boolean"]:
issues.append(f"Block '{block_id}' (opcode: {opcode}) has parent '{parent_id}' but is not a substack, shadow, or reporter/boolean connection to a C-block.")
elif parent_block_type in ["reporter", "boolean"]:
# Reporter/Boolean blocks can be parents if they are input to another block
found_in_input = False
for input_key, input_val in parent_block.get("inputs", {}).items():
if isinstance(input_val, list) and len(input_val) >= 2 and input_val[1] == block_id:
found_in_input = True
break
if not found_in_input:
issues.append(f"Block '{block_id}' (opcode: {opcode}) has reporter/boolean parent '{parent_id}' but is not linked via an input.")
else: # e.g., cap blocks cannot be parents to other blocks in a sequence
issues.append(f"Block '{block_id}' (opcode: {opcode}) has an unexpected parent block type '{parent_block_type}' for parent '{parent_id}'.")
if next_id:
next_block = script_blocks.get(next_id)
if not next_block:
issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent next block '{next_id}'.")
elif next_block.get("parent") != block_id:
issues.append(f"Block '{block_id}' (opcode: {opcode})'s next block '{next_id}' does not link back to it via 'parent'.")
elif block_data.get("shadow"):
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.")
elif get_block_type(opcode) == "hat": # Hat blocks should not generally have a 'next' in a linear script sequence
issues.append(f"Hat block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection as a sequential block.")
elif get_block_type(opcode) == "cap":
issues.append(f"Cap block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection as it signifies script end.")
# Input validation
if "inputs" in block_data:
for input_name, input_value in block_data["inputs"].items():
if not isinstance(input_value, list) or len(input_value) < 2:
issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has malformed input '{input_name}': {input_value}.")
continue
type_code = input_value[0]
value_or_block_id = input_value[1]
# Type code 1: number/string/boolean literal or block ID (reporter/boolean)
# Type code 2: block ID (substack or reporter/boolean plugged in)
# Type code 3: number/string/boolean literal with shadow, or reporter with default value if nothing plugged in
if type_code not in [1, 2, 3]:
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has invalid type code: {type_code}. Expected 1, 2, or 3.")
if isinstance(value_or_block_id, str):
# It's a block ID, check if it exists and its parent link is correct
connected_block = script_blocks.get(value_or_block_id)
if not connected_block:
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' references non-existent block ID '{value_or_block_id}'.")
elif connected_block.get("parent") != block_id:
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' connects to '{value_or_block_id}', but '{value_or_block_id}'s parent is not '{block_id}'.")
# Check for type code consistency with connected block type
elif type_code == 2 and get_block_type(connected_block.get("opcode")) not in ["stack", "hat", "c_block", "cap", "reporter", "boolean"]:
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has type code 2 but connected block '{value_or_block_id}' is not a valid block type for substack/input.")
elif type_code == 1 and get_block_type(connected_block.get("opcode")) not in ["reporter", "boolean"]:
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has type code 1 but connected block '{value_or_block_id}' is not a reporter or boolean (expected for direct value input).")
# Specific checks for C-blocks' SUBSTACK
if block_data.get("blockType") == "c_block" and input_name.startswith("SUBSTACK"): # Changed to startswith as SUBSTACK1, SUBSTACK2 might exist
if not (isinstance(value_or_block_id, str) and script_blocks.get(value_or_block_id) and type_code == 2):
issues.append(f"C-block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has an invalid or missing SUBSTACK input configuration.")
# Shadow block specific checks
if block_data.get("shadow"):
if block_data.get("topLevel"):
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is incorrectly marked as topLevel.")
if not block_data.get("parent"):
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is missing a parent.")
if block_data.get("next"):
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.")
# Check fields for math_number and menu blocks
if opcode == "math_number" and not (isinstance(block_data.get("fields", {}).get("NUM"), list) and len(block_data["fields"]["NUM"]) >= 1 and isinstance(block_data["fields"]["NUM"][0], (str, int, float))):
issues.append(f"Math_number shadow block '{block_id}' (opcode: {opcode}) has malformed 'NUM' field.")
# This logic for menu shadow blocks assumes the field name is the opcode in uppercase without _MENU
# Example: for "looks_costumemenu", it expects a field "COSTUME"
if opcode.endswith("menu"):
expected_field = opcode.upper().replace("_MENU", "")
if not (isinstance(block_data.get("fields", {}).get(expected_field), list) and len(block_data["fields"][expected_field]) >= 1 and isinstance(block_data["fields"][expected_field][0], str)):
issues.append(f"Menu shadow block '{block_id}' (opcode: {opcode}) has malformed field '{expected_field}' for its specific menu option.")
return issues
# Node 5: Verification Node
def block_verification_node(state: dict) -> dict:
    """Run structural checks on every sprite's blocks and record the findings.

    For each target in ``state["project_json"]["targets"]`` that has blocks,
    every script rooted at a top-level hat block is extracted with
    ``filter_script_blocks`` and checked with ``analyze_script_structure``;
    top-level non-hat blocks and blocks with neither a parent nor the
    ``topLevel`` flag are reported as well. When any sprite has issues the
    findings are sent to the reviewer agent and its JSON answer is parsed,
    with one correction pass through ``agent_json_resolver`` if parsing
    raises ``json.JSONDecodeError``. Any other failure during the review is
    logged and stored as an error entry instead of being raised.

    Args:
        state: Workflow state. Reads ``project_json`` and ``iteration_count``.

    Returns:
        The same ``state`` object, mutated in place. Writes
        ``needs_improvement``, ``block_validation_feedback``,
        ``review_block_feedback``, ``iteration_count`` and, when issues were
        found, ``improvement_plan``.
    """
    logger.info(f"--- Running BlockVerificationNode (Iteration: {state.get('iteration_count', 0)}) ---")

    MAX_IMPROVEMENT_ITERATIONS = 1  # Set a sensible limit to prevent infinite loops
    current_iteration = state.get("iteration_count", 0)
    project_json = state["project_json"]
    targets = project_json["targets"]

    # Initialize needs_improvement for the current run
    state["needs_improvement"] = False
    block_validation_feedback_overall = []
    improvement_plan = {"sprite_issues": {}}

    for target in targets:
        sprite_name = target["name"]
        all_blocks_for_sprite = target.get("blocks", {})
        if not all_blocks_for_sprite:
            logger.info(f"Sprite '{sprite_name}' has no blocks. Skipping verification.")
            continue

        sprite_issues = []
        hat_block_ids = [
            block_id for block_id, block_data in all_blocks_for_sprite.items()
            if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) == "hat"
        ]
        processed_script_blocks = set()

        if not hat_block_ids:
            sprite_issues.append("No top-level hat blocks found for this sprite. Scripts may not run.")

        # NOTE(review): this sweep is treated as running for every sprite, not
        # only for sprites without hat blocks -- confirm the intended nesting.
        for block_id, block_data in all_blocks_for_sprite.items():
            if block_data.get("topLevel") and not get_block_type(block_data.get("opcode")) == "hat":
                sprite_issues.append(f"Top-level block '{block_id}' (opcode: {block_data.get('opcode')}) is not a hat block, so it will not run automatically.")
            if not block_data.get("topLevel") and not block_data.get("parent") and not block_data.get("shadow"):
                sprite_issues.append(f"Orphaned block '{block_id}' (opcode: {block_data.get('opcode')}) is not top-level, has no parent, and is not a shadow block.")

        for hat_id in hat_block_ids:
            logger.info(f"Verifying script starting with hat block '{hat_id}' for sprite '{sprite_name}'.")
            current_script_blocks = filter_script_blocks(all_blocks_for_sprite, hat_id)
            processed_script_blocks.update(current_script_blocks.keys())
            script_issues = analyze_script_structure(current_script_blocks, hat_id, sprite_name)
            if script_issues:
                sprite_issues.append(f"Issues in script starting with '{hat_id}':")
                sprite_issues.extend([f" - {issue}" for issue in script_issues])
            else:
                logger.info(f"Script starting with '{hat_id}' for sprite '{sprite_name}' passed basic verification.")

        # Blocks reachable from no verified script that are neither top-level
        # nor parented. A list in dict order (rather than a set) keeps the
        # reported sample of ids stable from run to run.
        orphaned_blocks_overall = [
            block_id for block_id, block_data in all_blocks_for_sprite.items()
            if block_id not in processed_script_blocks
            and not block_data.get("topLevel")
            and not block_data.get("parent")
        ]
        if orphaned_blocks_overall:
            sprite_issues.append(f"Found {len(orphaned_blocks_overall)} truly orphaned blocks not connected to any valid script: {', '.join(orphaned_blocks_overall[:5])}{'...' if len(orphaned_blocks_overall) > 5 else ''}.")

        if sprite_issues:
            improvement_plan["sprite_issues"][sprite_name] = sprite_issues
            logger.warning(f"Verification found issues for sprite '{sprite_name}'.")
            block_validation_feedback_overall.append(f"Issues for {sprite_name}:\n" + "\n".join([f"- {issue}" for issue in sprite_issues]))
            state["needs_improvement"] = True
            print(f"\n--- Verification Report (Issues Found for {sprite_name}) ---")
            print(json.dumps({sprite_name: sprite_issues}, indent=2))
        else:
            logger.info(f"Sprite '{sprite_name}' passed all verification checks.")

    # Consolidate feedback for the LLM
    state["block_validation_feedback"] = "\n\n".join(block_validation_feedback_overall)
    print(F"[OVERALL IMPROVEMENT PLAN ON ITERATION {current_iteration}]: {improvement_plan}")

    if state["needs_improvement"]:
        # Single quotes inside the replacement fields keep these f-strings
        # valid on interpreters older than Python 3.12.
        llm_reviewer_prompt = (
            "You are an expert Scratch project reviewer. Your task is to analyze the provided "
            "structural issues found in a Scratch project's sprites and suggest improvements or "
            "further insights. Focus on clarity, accuracy, and actionable advice.\n\n"
            "Here are the detected structural issues:\n"
            f"{json.dumps(improvement_plan['sprite_issues'], indent=2)}\n\n"
            "Here are the block validation feedback:\n"
            f"{json.dumps(state['block_validation_feedback'], indent=2)}\n\n"
            "Please review these issues and provide a consolidated report with potential causes "
            "and recommendations for fixing them. If an issue is minor or expected in certain "
            "scenarios (e.g., hidden blocks for backward compatibility), please note that."
            "Structure your response as a JSON object with 'review_summary' as the key, "
            "containing a dictionary where keys are sprite names and values are lists of suggested improvements."
            "Example:\n"
            "```json\n"
            "{\n"
            " \"review_summary\": {\n"
            " \"Sprite1\": [\n"
            " \"Issue: Hat block 'abc' is not topLevel. Recommendation: Ensure all scripts start with a top-level hat block that has no parent.\",\n"
            " \"Issue: Block 'xyz' has unknown opcode 'motion_nonexistent'. Recommendation: Verify the opcode against the Scratch 3.0 block reference. This might be a typo or a deprecated block.\"\n"
            " ],\n"
            " \"Sprite2\": [\n"
            " \"Issue: Found 3 orphaned blocks. Recommendation: Reconnect these blocks to existing scripts or remove them if no longer needed.\"\n"
            " ]\n"
            " }\n"
            "}\n"
            "```"
        )
        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_reviewer_prompt}]})
            raw_review_response = response["messages"][-1].content
            try:
                state["review_block_feedback"] = extract_json_from_llm_response(raw_review_response)
            except json.JSONDecodeError:
                logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                # Use the JSON resolver agent to fix the response
                correction_prompt = (
                    "Correct the following JSON respose to ensure it follows proper schema rules:\n\n"
                    "Take care of following instruction carefully:\n"
                    "1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n"
                    "2. Do **NOT** add any additional text, comments, or explanations.\n"
                    "3. Just response correct the json where you find and put the same content as it is.\n"
                    "Here is the raw response:\n\n"
                    f"raw_response: {raw_review_response}\n\n"
                )
                correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                print(f"[JSON CORRECTOR RESPONSE AT BLOCKVERFIER ]: {correction_response}")
                state["review_block_feedback"] = extract_json_from_llm_response(correction_response["messages"][-1].content)
            logger.info("Agent review feedback added to the state.")
            print("\n--- Agent Review Feedback ---")
            print(json.dumps(state["review_block_feedback"], indent=2))
        except Exception as e:
            # Best effort: a failed review must not abort verification.
            logger.error(f"Error invoking agent for review in BlockVerificationNode: {e}")
            state["review_block_feedback"] = {"review_summary": {"Overall": [f"Error during LLM review: {e}"]}}
    else:
        logger.info("BlockVerificationNode completed: No issues found in any sprite blocks.")
        print("\n--- Verification Report (No Issues Found) ---")
        state["block_validation_feedback"] = "No issues found in sprite blocks."
        state["review_block_feedback"] = {"review_summary": {"Overall": ["No issues found in sprite blocks. All good!"]}}

    # Manage iteration count based on overall needs_improvement flag
    if state["needs_improvement"]:
        state["iteration_count"] = current_iteration + 1
        if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS:
            logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached for block verification. Forcing 'needs_improvement' to False.")
            state["needs_improvement"] = False
            state["block_validation_feedback"] += "\n(Note: Max iterations reached for block verification, stopping further improvements.)"
        state["improvement_plan"] = improvement_plan
        # NOTE(review): this replaces the reviewer output stored above with a
        # "no issues" message even though issues were found. Kept to preserve
        # existing behavior -- confirm whether the overwrite is intentional.
        state["review_block_feedback"] = {"review_summary": {"Overall": ["No issues found in sprite blocks. All good!"]}}
        logger.info("BlockVerificationNode found issues and added an improvement plan to the state.")
        print("\n--- Overall Verification Report (Issues Found) ---")
        print(json.dumps(improvement_plan, indent=2))
    else:
        state["iteration_count"] = 0  # Reset if no more improvement needed for blocks

    logger.info(f"Block verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['block_validation_feedback'][:100]}...")
    print("===========================================================================")
    # .get(): this node only writes "improvement_plan" when issues were found,
    # so plain indexing could raise KeyError on a clean run.
    print(f"[BLOCK VERIFICATION NODE: (improvement_plan)]:{state.get('improvement_plan')}")
    print(f"[BLOCK VERIFICATION NODE: (review_block_feedback)]:{state.get('review_block_feedback')}")
    return state
def improvement_block_builder_node(state: GameState):
    """Generate replacement/additional script blocks from the improvement plan.

    For every sprite named in ``state["improvement_plan"]`` that exists in the
    project, each entry under ``"plans"`` is turned into an LLM prompt (event
    opcode, logic sequence, catalog definitions of the opcodes it lists, the
    verification feedback and the sprite's current JSON). The returned block
    dictionary is positioned and merged into the sprite's ``"blocks"``.

    Args:
        state: Workflow state. Reads ``project_json``, ``improvement_plan`` and
            ``block_validation_feedback``.

    Returns:
        The same ``state`` object with ``project_json`` updated. Returned
        unchanged when there is no improvement plan.

    Raises:
        Exception: any error raised while generating or merging the blocks of
            a script is logged and re-raised.
    """
    logger.info("--- Running ImprovementBlockBuilderNode ---")
    project_json = state["project_json"]
    targets = project_json["targets"]
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}
    # Also get the Stage target
    stage_target = next((target for target in targets if target["isStage"]), None)
    if stage_target:
        sprite_map[stage_target["name"]] = stage_target

    # Pre-load all block-catalog JSONs once
    all_catalogs = [
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data
    ]

    improvement_plan = state.get("improvement_plan", {})
    block_verification_feedback = state.get("block_validation_feedback", "no feedback")
    if not improvement_plan:
        logger.warning("No improvement plan found in state. Skipping ImprovementBlockBuilderNode.")
        return state

    script_y_offset = {}
    script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()}

    # This is the handler which ensure if somehow json response changed it handle it.[DONOT REMOVE BELOW LOGIC]
    if improvement_plan.get("improvement_overall_flow", {}) == {}:
        plan_data = improvement_plan.items()
    else:
        # The wrapper is detected via "improvement_overall_flow" but was read
        # only from "action_overall_flow". Keep that key's precedence and fall
        # back to the key that was actually detected so the plan is not lost.
        wrapped_plan = (
            improvement_plan.get("action_overall_flow")
            or improvement_plan.get("improvement_overall_flow")
        )
        if isinstance(wrapped_plan, dict):
            plan_data = wrapped_plan.items()
        else:
            logger.warning("Improvement plan wrapper is not a mapping of sprite names; nothing to build.")
            plan_data = ()

    # Plan-entry keys whose values are lists of opcodes, in prompt order.
    opcode_categories = ("motion", "control", "operator", "sensing", "looks", "sound", "events", "data")

    for sprite_name, sprite_improvements_data in plan_data:
        if sprite_name not in sprite_map:
            continue
        if not isinstance(sprite_improvements_data, dict):
            # Calling .get("plans") on e.g. a list of issue strings would crash.
            logger.warning(f"Improvement data for sprite '{sprite_name}' is not a mapping; skipping.")
            continue

        current_sprite_target = sprite_map[sprite_name]
        if "blocks" not in current_sprite_target:
            current_sprite_target["blocks"] = {}
        if sprite_name not in script_y_offset:
            script_y_offset[sprite_name] = 0

        for plan_entry in sprite_improvements_data.get("plans", []):
            event_opcode = plan_entry["event"]  # This is now expected to be an opcode
            logic_sequence = plan_entry["logic"]  # This is the semicolon-separated string

            # De-duplicate while keeping first-seen order so the generated
            # prompt is identical for identical plans.
            needed_opcodes = list(dict.fromkeys(
                opcode
                for category in opcode_categories
                for opcode in plan_entry.get(category, [])
            ))

            # Merge human-written catalog + runtime entry for each opcode:
            # catalog fields first, then runtime overrides/adds.
            combined_blocks = {
                op: {
                    **(find_block_in_all(op, all_catalogs) or {}),
                    **ALL_SCRATCH_BLOCKS_CATALOG.get(op, {}),
                }
                for op in needed_opcodes
            }
            print("Combined blocks for this script:", json.dumps(combined_blocks, indent=2))

            llm_block_generation_prompt = (
                f"You are an AI assistant generating Scratch 3.0 block JSON for a single script based on an improvement plan.\n"
                f"The current sprite is '{sprite_name}'.\n"
                f"The specific script to generate blocks for is for the event with opcode '{event_opcode}'.\n"
                f"The sequential logic to implement is:\n"
                f" Logic: {logic_sequence}\n\n"
                f"**Based on the planning, the following Scratch block opcodes are expected to be used to implement this logic. Focus on using these specific opcodes where applicable, and refer to the ALL_SCRATCH_BLOCKS_CATALOG for their full structure and required inputs/fields:**\n"
                f"Here is the comprehensive catalog of required Scratch 3.0 blocks:\n"
                f"```json\n{json.dumps(combined_blocks, indent=2)}\n```\n\n"
                f"Here is feedback ans suggetion you should take care of:\n"
                f" suggestion:{block_verification_feedback}\n\n"
                f"Current Scratch project JSON for this sprite (for context, its existing blocks if any, though you should generate a new script entirely if this is a hat block):\n"
                f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n"
                f"**Instructions for generating the block JSON (EXTREMELY IMPORTANT - FOLLOW THESE EXAMPLES PRECISELY):**\n"
                f"1. **Start with the event block (Hat Block):** This block's `topLevel` should be `true` and `parent` should be `null`. Its `x` and `y` coordinates should be set (e.g., `x: 0, y: 0` or reasonable offsets for multiple scripts).\n"
                f"2. **Generate a sequence of connected blocks:** For each block, generate a **unique block ID** (e.g., 'myBlockID123').\n"
                f"3. **Link blocks correctly:**\n"
                f" * Use the `next` field to point to the ID of the block directly below it in the stack.\n"
                f" * Use the `parent` field to point to the ID of the block directly above it in the stack.\n"
                f" * For the last block in a stack, `next` should be `null`.\n"
                f"4. **Handle `inputs` for parameters and substacks (CRITICAL DETAIL - PAY CLOSE ATTENTION TO EXAMPLES):**\n"
                f" * The value for any key within the `inputs` dictionary MUST always be an **array** of two elements: `[type_code, value_or_block_id]`.\n"
                f" * **STRICTLY FORBIDDEN MISTAKE:** DO NOT put an array like `[\"num\", \"value\"]` or `[\"_edge_\", null]` directly as the `value_or_block_id`. This is the source of past errors.\n"
                f" * The `type_code` (first element of the array) indicates the nature of the input:\n"
                f" * `1`: For a primitive value or a shadow block ID (e.g., a number, string, boolean, or reference to a shadow block). This is the most common type for direct values.\n"
                f" * `2`: For a block ID where another block is directly plugged into this input (e.g., an operator block connected to an `if` condition, or the first block of a C-block's substack).\n"
                f" * **Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):**\n"
                f" If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID.\n"
                f" ```json\n"
                f" // Main block using the number\n"
                f" \"mainBlockID\": {{\n"
                f" \"opcode\": \"motion_movesteps\",\n"
                f" \"inputs\": {{\n"
                f" \"STEPS\": [1, \"shadowNumID\"]\n"
                f" }},\n"
                f" // ... other fields\n"
                f" }},\n"
                f"\n"
                f" // The separate shadow block for the number '10'\n"
                f" \"shadowNumID\": {{\n"
                f" \"opcode\": \"math_number\",\n"
                f" \"fields\": {{\n"
                f" \"NUM\": [\"10\", null]\n"
                f" }},\n"
                f" \"shadow\": true,\n"
                f" \"parent\": \"mainBlockID\",\n"
                f" \"topLevel\": false\n"
                f" }}\n"
                f" ```\n"
                f" * **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):**\n"
                f" If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID.\n"
                f" ```json\n"
                f" // Main block using the dropdown selection\n"
                f" \"touchingBlockID\": {{\n"
                f" \"opcode\": \"sensing_touchingobject\",\n"
                f" \"inputs\": {{\n"
                f" \"TOUCHINGOBJECTMENU\": [1, \"shadowEdgeMenuID\"]\n"
                f" }},\n"
                f" // ... other fields\n"
                f" }},\n"
                f"\n"
                f" // The separate shadow block for the 'edge' menu option\n"
                f" \"shadowEdgeMenuID\": {{\n"
                f" \"opcode\": \"sensing_touchingobjectmenu\",\n"
                f" \"fields\": {{\n"
                f" \"TOUCHINGOBJECTMENU\": [\"_edge_\", null]\n"
                f" }},\n"
                f" \"shadow\": true,\n"
                f" \"parent\": \"touchingBlockID\",\n"
                f" \"topLevel\": false\n"
                f" }}\n"
                f" ```\n"
                f" * **Correct Example for C-block (e.g., `control_forever`):**\n"
                f" The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, \"FIRST_BLOCK_IN_FOREVER_LOOP_ID\"]`.\n"
                f" ```json\n"
                f" \"foreverBlockID\": {{\n"
                f" \"opcode\": \"control_forever\",\n"
                f" \"inputs\": {{\n"
                f" \"SUBSTACK\": [2, \"firstBlockInsideForeverID\"]\n"
                f" }},\n"
                f" \"next\": null,\n"
                f" \"parent\": \"blockAboveForeverID\",\n"
                f" \"shadow\": false,\n"
                f" \"topLevel\": false\n"
                f" }},\n"
                f" \"firstBlockInsideForeverID\": {{\n"
                f" // ... definition of the first block inside the forever loop\n"
                f" \"parent\": \"foreverBlockID\",\n"
                f" \"next\": \"secondBlockInsideForeverID\" // if there's another block\n"
                f" }}\n"
                f" ```\n"
                f" * **Correct Example for C-block with condition (e.g., `control_if`):**\n"
                f" The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body).\n"
                f" ```json\n"
                f" \"ifBlockID\": {{\n"
                f" \"opcode\": \"control_if\",\n"
                f" \"inputs\": {{\n"
                f" \"CONDITION\": [1, \"conditionBlockID\"],\n"
                f" \"SUBSTACK\": [2, \"firstBlockInsideIfID\"]\n"
                f" }},\n"
                f" \"next\": \"blockAfterIfID\",\n"
                f" \"parent\": \"blockAboveIfID\",\n"
                f" \"shadow\": false,\n"
                f" \"topLevel\": false\n"
                f" }},\n"
                f" \"conditionBlockID\": {{\n"
                f" \"opcode\": \"sensing_touchingobject\", // Example condition block\n"
                f" // ... definition for condition block, parent should be \"ifBlockID\"\n"
                f" }},\n"
                f" \"firstBlockInsideIfID\": {{\n"
                f" // ... definition of the first block inside the if body\n"
                f" \"parent\": \"ifBlockID\",\n"
                f" \"next\": null // or next block if more\n"
                f" }}\n"
                f" ```\n"
                f"5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `\"shadow\": true` and `\"topLevel\": false`.\n"
                f"6. **`fields` for direct dropdown values/text:** Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., the `KEY_OPTION` field within the `event_whenkeypressed` shadow block itself, or the `variable` field on a `data_setvariableto` block). Example: `\"fields\": {{\"KEY_OPTION\": [\"space\", null]}}`.\n"
                f"7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `\"topLevel\": true`.\n"
                f"8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary.\n"
                f"9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`.\n"
                f"10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions."
            )

            try:
                response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
                raw_response = response["messages"][-1].content
                logger.info(f"Raw response from LLM [ImprovementBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...")
                try:
                    generated_blocks = extract_json_from_llm_response(raw_response)
                except json.JSONDecodeError:
                    logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                    # Use the JSON resolver agent to fix the response
                    correction_prompt = (
                        "Correct the following JSON respose to ensure it follows proper schema rules:\n\n"
                        "Take care of following instruction carefully:\n"
                        "1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n"
                        "2. Do **NOT** add any additional text, comments, or explanations.\n"
                        "3. Just response correct the json where you find and put the same content as it is.\n"
                        "Here is the raw response:\n\n"
                        f"raw_response: {raw_response}\n\n"
                    )
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    print(f"[JSON CORRECTOR RESPONSE AT IMPROVEMENTBLOCKBUILDER ]: {correction_response}")
                    generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content)

                if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
                    logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
                    generated_blocks = generated_blocks["blocks"]

                # Update block positions for top-level script
                for block_id, block_data in generated_blocks.items():
                    if block_data.get("topLevel"):
                        block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
                        block_data["y"] = script_y_offset[sprite_name]
                        script_y_offset[sprite_name] += 150  # Increment for next script

                current_sprite_target["blocks"].update(generated_blocks)
                logger.info(f"Improvement blocks added for sprite '{sprite_name}', script '{event_opcode}' by ImprovementBlockBuilderNode.")
            except Exception as e:
                logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}")
                raise

    state["project_json"] = project_json  # Update the state with the modified project_json
    logger.info("Updated project JSON with improvement nodes.")
    print("Updated project JSON with improvement nodes:", json.dumps(project_json, indent=2))  # Print for direct visibility
    return state  # Return the state object
# Temporary time delay for handling TPM issues
import time
def delay_for_tpm_node(state: GameState, *, delay_seconds: float = 80):
    """Block for a fixed time between LLM-heavy nodes, then pass the state on.

    Temporary workaround for the "TPM issue" (presumably the provider's
    tokens-per-minute rate limit -- confirm).

    Args:
        state: Workflow state; returned untouched.
        delay_seconds: How long to sleep, in seconds. Defaults to the
            previously hard-coded 80. Keyword-only, so positional calls with
            just the state behave exactly as before.

    Returns:
        The ``state`` object that was passed in.

    Raises:
        ValueError: if ``delay_seconds`` is negative (raised by ``time.sleep``).
    """
    logger.info("--- Running DelayForTPMNode ---")
    time.sleep(delay_seconds)  # Adjust the delay as needed
    logger.info("Delay completed.")
    return state
# Assemble the LangGraph state machine that drives the generation pipeline.
workflow = StateGraph(GameState)

# Node registry as (graph name, callable) pairs, registered in listed order.
# The five "time_delay_*" entries share one callable; they are temporary
# pauses inserted to handle TPM issues.
for _node_name, _node_callable in (
    ("parse_query", parse_query_and_set_initial_positions),
    ("game_description", game_description_node),
    ("time_delay_1", delay_for_tpm_node),
    ("time_delay_2", delay_for_tpm_node),
    ("time_delay_3", delay_for_tpm_node),
    ("time_delay_4", delay_for_tpm_node),
    ("time_delay_5", delay_for_tpm_node),
    ("initial_plan_build", overall_planner_node),  # high-level planning
    ("plan_verifier", plan_verification_node),  # verifies the high-level plan
    ("refined_planner", refined_planner_node),  # refines the action plan
    ("block_builder", overall_block_builder_node),  # builds blocks from a plan
    ("block_verifier", block_verification_node),  # verifies the generated blocks
    ("improved_block_builder", improvement_block_builder_node),  # block-level fixes
):
    workflow.add_node(_node_name, _node_callable)

# Execution starts at the game description step.
workflow.set_entry_point("game_description")

# Unconditional opening path, as (from, to) pairs:
# description -> query parsing -> pause -> planning -> pause -> plan check.
for _edge_from, _edge_to in (
    ("game_description", "parse_query"),
    ("parse_query", "time_delay_1"),
    ("time_delay_1", "initial_plan_build"),
    ("initial_plan_build", "time_delay_5"),
    ("time_delay_5", "plan_verifier"),
):
    workflow.add_edge(_edge_from, _edge_to)

# Drop the loop temporaries so the module namespace is unchanged.
del _node_name, _node_callable, _edge_from, _edge_to
# Define the conditional logic after plan_verifier (for high-level plan issues)
def decide_next_step_after_plan_verification(state: GameState):
    """Choose the node that follows ``plan_verifier``.

    Returns ``"refined_planner"`` when ``state["needs_improvement"]`` is
    truthy (the plan needs refinement) and ``"block_builder"`` otherwise,
    including when the key is missing.
    """
    plan_needs_rework = state.get("needs_improvement", False)
    return "refined_planner" if plan_needs_rework else "block_builder"
# Branch out of the plan verifier: each key is a value the router can return,
# mapped to the node of the same name.
workflow.add_conditional_edges(
    "plan_verifier",
    decide_next_step_after_plan_verification,
    dict(
        refined_planner="refined_planner",  # plan needs refinement
        block_builder="block_builder",  # plan approved, start building blocks
    ),
)

# Fixed edges, as (from, to) pairs:
#  - a refined plan pauses, then returns to plan_verifier for re-verification,
#    so plan_verifier always decides the next step;
#  - freshly built blocks pause, then go to block_verifier.
for _edge_from, _edge_to in (
    ("refined_planner", "time_delay_2"),
    ("time_delay_2", "plan_verifier"),
    ("block_builder", "time_delay_3"),
    ("time_delay_3", "block_verifier"),
):
    workflow.add_edge(_edge_from, _edge_to)

# Drop the loop temporaries so the module namespace is unchanged.
del _edge_from, _edge_to
# Define the conditional logic after block_verifier (for generated blocks issues)
def decide_after_block_verification(state: GameState):
    """Choose the node that follows ``block_verifier``.

    Returns ``"improved_block_builder"`` when ``state["needs_improvement"]``
    is truthy and the string ``"END"`` otherwise, including when the key is
    missing.
    """
    blocks_need_rework = state.get("needs_improvement", False)
    return "improved_block_builder" if blocks_need_rework else "END"
# Branch out of the block verifier: either fix the blocks or finish. The
# router returns the string "END", which is mapped to LangGraph's END sentinel.
workflow.add_conditional_edges(
    "block_verifier",
    decide_after_block_verification,
    dict(
        improved_block_builder="improved_block_builder",  # blocks need improvement
        END=END,  # blocks are good
    ),
)

# Improvement loop, as (from, to) pairs: improved blocks pause, then are
# verified again.
for _edge_from, _edge_to in (
    ("improved_block_builder", "time_delay_4"),
    ("time_delay_4", "block_verifier"),
):
    workflow.add_edge(_edge_from, _edge_to)

# Drop the loop temporaries so the module namespace is unchanged.
del _edge_from, _edge_to

# Compile the workflow graph
app_graph = workflow.compile()
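# NOTE: everything from here down to the commented-out `app_graph = workflow.compile()`
# is a disabled copy of the workflow wiring that does not use the time_delay_* nodes.
# It is not executed; the active edge wiring is the code above.
# # Build the LangGraph workflow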
# workflow = StateGraph(GameState)
# # Add all nodes to the workflow
# workflow.add_node("parse_query", parse_query_and_set_initial_positions)
# workflow.add_node("game_description", game_description_node)
# workflow.add_node("initial_plan_build", overall_planner_node) # High-level planning node
# workflow.add_node("plan_verifier", plan_verification_node) # Verifies the high-level plan
# workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan
# workflow.add_node("block_builder", overall_block_builder_node) # Builds blocks from a plan
# workflow.add_node("block_verifier", block_verification_node) # Verifies the generated blocks
# workflow.add_node("improved_block_builder", improvement_block_builder_node) # For specific block-level improvements
# # Set the entry point
# workflow.set_entry_point("game_description")
# # Define the standard initial flow
# workflow.add_edge("game_description", "parse_query")
# workflow.add_edge("parse_query", "initial_plan_build")
# workflow.add_edge("initial_plan_build", "plan_verifier")
# # Define the conditional logic after plan_verifier (for high-level plan issues)
# def decide_next_step_after_plan_verification(state: GameState):
# if state.get("needs_improvement", False):
# # If the plan needs refinement, go to the refined_planner
# return "refined_planner"
# else:
# # If the plan is good, proceed to building blocks from this plan
# return "block_builder"
# workflow.add_conditional_edges(
# "plan_verifier",
# decide_next_step_after_plan_verification,
# {
# "refined_planner": "refined_planner", # Path if plan needs refinement
# "block_builder": "block_builder" # Path if plan is approved, proceeds to block building
# }
# )
# # --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP ---
# # After refining the plan, it should go back to plan_verifier for re-verification.
# workflow.add_edge("refined_planner", "plan_verifier") # This closes the loop for plan refinement and re-verification.
# # After blocks are built, they need to be verified
# workflow.add_edge("block_builder", "block_verifier")
# # Define the conditional logic after block_verifier (for generated blocks issues)
# def decide_after_block_verification(state: GameState):
# if state.get("needs_improvement", False):
# # If blocks need improvement, go to improved_block_builder.
# # This assumes improved_block_builder handles specific block-level fixes.
# return "improved_block_builder"
# else:
# # If blocks are good, end the workflow
# return "END"
# workflow.add_conditional_edges(
# "block_verifier",
# decide_after_block_verification,
# {
# "improved_block_builder": "improved_block_builder", # Path if blocks need improvement
# "END": END # Path if blocks are good
# }
# )
# # Create the loop: If blocks improved, re-verify them
# workflow.add_edge("improved_block_builder", "block_verifier")
# # Compile the workflow graph
# app_graph = workflow.compile()
# Render the compiled graph to langgraph_workflow.png as a debugging aid.
# NOTE(review): this import rebinds the module-level name `Image`, which the
# top of the file imports from PIL; neither `Image` nor `display` is used in
# this section — confirm no other code relies on either binding.
from IPython.display import Image, display

# This runs at import time. The diagram is not needed to serve requests, so a
# rendering or write failure is logged instead of aborting application start-up.
# (Per the LangChain docs, draw_mermaid_png() renders through a remote API by
# default, so it can fail when the host has no network access.)
try:
    png_bytes = app_graph.get_graph().draw_mermaid_png()
    with open("langgraph_workflow.png", "wb") as f:
        f.write(png_bytes)
except Exception as e:
    logger.warning(f"Could not render workflow diagram to langgraph_workflow.png: {e}")
### `generate_game` endpoint and its private helpers


def _validate_asset_entries(kind, entries):
    """Validate one asset list taken from the request payload.

    Every entry must be a JSON object with a "name" key and a string
    "filename" that is a bare file name. The filename is later joined onto
    both the static asset directory and the per-project output folder, so
    path separators, "." / ".." and drive prefixes are rejected to keep the
    reads and writes inside those directories.

    Args:
        kind: Payload key being validated ("backdrops" or "sprites"); used
            only in error messages.
        entries: The value found under that key.

    Raises:
        ValueError: If ``entries`` is not a list or an entry is malformed.
    """
    if not isinstance(entries, list):
        raise ValueError(f"'{kind}' must be a list")
    for entry in entries:
        if not isinstance(entry, dict) or "name" not in entry:
            raise ValueError(f"Each entry in '{kind}' must be an object with 'name' and 'filename'")
        filename = entry.get("filename")
        if not isinstance(filename, str):
            raise ValueError(f"Each entry in '{kind}' must have a string 'filename'")
        is_bare_name = (
            filename not in ("", ".", "..")
            and "/" not in filename
            and "\\" not in filename
            and os.path.basename(filename) == filename
        )
        if not is_bare_name:
            raise ValueError(f"Invalid filename in '{kind}': {filename!r}")


def _build_project_skeleton(backdrops, sprites, user_agent):
    """Build the initial project.json structure: a stage target plus one target per sprite.

    Args:
        backdrops: Validated backdrop entries; each becomes a stage costume.
        sprites: Validated sprite entries; each becomes its own target.
        user_agent: Value stored under ``meta.agent``.

    Returns:
        The project dict with empty variables/lists/broadcasts/blocks/comments.
    """
    stage = {
        "isStage": True,
        "name": "Stage",
        "objName": "Stage",
        "variables": {},  # This must be a dict: { "var_id": ["var_name", value] }
        "lists": {},
        "broadcasts": {},
        "blocks": {},
        "comments": {},
        # The last backdrop in the request is the one initially selected.
        "currentCostume": len(backdrops) - 1 if backdrops else 0,
        "costumes": [
            make_costume_entry("backdrops", b["filename"], b["name"])
            for b in backdrops
        ],
        "sounds": [],
        "volume": 100,
        "layerOrder": 0,
        "tempo": 60,
        "videoTransparency": 50,
        "videoState": "on",
        "textToSpeechLanguage": None,
    }
    skeleton = {
        "targets": [stage],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.1.0",
            "agent": user_agent,
        },
    }
    for idx, s in enumerate(sprites, start=1):
        costume = make_costume_entry("sprites", s["filename"], s["name"])
        skeleton["targets"].append({
            "isStage": False,
            "name": s["name"],
            "objName": s["name"],
            "variables": {},
            "lists": {},
            "broadcasts": {},
            "blocks": {},
            "comments": {},
            "currentCostume": 0,
            "costumes": [costume],
            "sounds": [],
            "volume": 100,
            # NOTE(review): with start=1 the first sprite gets layerOrder 2,
            # leaving a gap after the stage's 0 — confirm this is intended.
            "layerOrder": idx + 1,
            "visible": True,
            "x": 0,
            "y": 0,
            "size": 100,
            "direction": 90,
            "draggable": False,
            "rotationStyle": "all around",
        })
    return skeleton


def _copy_assets(kind, label, entries, project_folder):
    """Copy each entry's file from ``<static>/assets/<kind>/`` into ``project_folder``.

    A missing source file is logged as a warning and skipped.

    Args:
        kind: Asset sub-directory name ("backdrops" or "sprites").
        label: Singular word used in the warning message.
        entries: Validated asset entries (bare "filename" values).
        project_folder: Destination directory; must already exist.
    """
    for entry in entries:
        src_path = os.path.join(app.static_folder, "assets", kind, entry["filename"])
        dst_path = os.path.join(project_folder, entry["filename"])
        if os.path.exists(src_path):
            shutil.copy(src_path, dst_path)
        else:
            logger.warning(f"Source {label} asset not found: {src_path}")


@app.route("/generate_game", methods=["POST"])
def generate_game():
    """Create a project folder, run the LangGraph workflow, and save the resulting project.json.

    Expects a JSON object with optional keys:
        description: Free-text game description (default "").
        backdrops: List of {"filename", "name"} objects (default []).
        sprites: List of {"filename", "name"} objects (default []).

    Returns:
        200 with {"message", "project_id"} on success;
        400 with {"error"} when the body is not a JSON object or an asset
        entry is malformed;
        500 with {"error"} when skeleton creation, asset copying, the
        workflow, or saving fails.
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so every malformed request gets the same JSON 400 response.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify(error="Request body must be a JSON object"), 400

    desc = payload.get("description", "")
    # `or []` also accepts an explicit null for either list.
    backdrops = payload.get("backdrops") or []
    sprites = payload.get("sprites") or []
    try:
        _validate_asset_entries("backdrops", backdrops)
        _validate_asset_entries("sprites", sprites)
    except ValueError as e:
        logger.warning(f"Rejected generate_game request: {e}")
        return jsonify(error=str(e)), 400

    logger.info(f"Starting game generation for description: '{desc}'")
    project_id = str(uuid.uuid4())
    project_folder = os.path.join("generated_projects", project_id)
    try:
        # 1) Initial skeleton generation
        project_skeleton = _build_project_skeleton(
            backdrops, sprites, request.headers.get("User-Agent", "")
        )
        logger.info("Initial project skeleton created.")

        # 2) Save initial skeleton and copy assets
        os.makedirs(project_folder, exist_ok=True)
        project_json_path = os.path.join(project_folder, "project.json")
        with open(project_json_path, "w", encoding="utf-8") as f:
            json.dump(project_skeleton, f, indent=2)
        logger.info(f"Initial project skeleton saved to {project_json_path}")
        _copy_assets("backdrops", "backdrop", backdrops, project_folder)
        _copy_assets("sprites", "sprite", sprites, project_folder)
        logger.info("Assets copied to project folder.")

        # 3) Initial workflow state, as a dict matching the GameState TypedDict
        initial_state_dict: GameState = {
            "project_json": project_skeleton,
            "description": desc,
            "project_id": project_id,
            "sprite_initial_positions": {},
            "action_plan": {},
            "improvement_plan": {},
            "needs_improvement": False,
            "plan_validation_feedback": {},
            "iteration_count": 0,
            "review_block_feedback": {},
        }

        # 4) Run the workflow.
        # NOTE(review): invoke() is called without a config, so LangGraph's
        # default recursion limit applies to the verify/refine loops; hitting
        # it raises and is reported as a 500 below — confirm that is acceptable.
        final_state_dict: GameState = app_graph.invoke(initial_state_dict)
        final_project_json = final_state_dict["project_json"]

        # 5) Overwrite the skeleton with the final, filled-in project JSON
        with open(project_json_path, "w", encoding="utf-8") as f:
            json.dump(final_project_json, f, indent=2)
        logger.info(f"Final project JSON saved to {project_json_path}")
        return jsonify({"message": "Game generated successfully", "project_id": project_id})
    except Exception as e:
        # Request boundary: log the traceback and answer with a JSON 500.
        logger.exception(f"Error during game generation workflow for project ID {project_id}: {e}")
        return jsonify(error=f"Error generating game: {e}"), 500
# Script entry point: start the Flask server on port 5000.
if __name__ == "__main__":
    logger.info("Starting Flask application...")
    # debug=True enables Flask's reloader and interactive debugger; this is a
    # development setting and should not be exposed on a public host.
    app.run(debug=True, port=5000)