Spaces:
No application file
No application file
from flask import Flask, request, jsonify, render_template, send_from_directory | |
from langgraph.prebuilt import create_react_agent | |
from langchain_groq import ChatGroq | |
#from langgraph.graph import draw | |
from langchain.chat_models import ChatOpenAI | |
from PIL import Image | |
import os, json, re | |
import shutil | |
import uuid | |
from langgraph.graph import StateGraph, END | |
import logging | |
from typing import Dict, TypedDict, Optional, Any, List | |
# --- Configure logging --- | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
app = Flask(__name__) | |
# --- LLM / Vision model setup --- | |
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder") | |
os.environ["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "default_key_or_placeholder") | |
os.environ["OPENROUTER_BASE_URL"] = os.getenv("OPENROUTER_BASE_URL", "default_key_or_placeholder") | |
groq_key = os.environ["GROQ_API_KEY"] | |
if not groq_key or groq_key == "default_key_or_placeholder": | |
logger.critical("GROQ_API_KEY environment variable is not set or invalid. Please set it to proceed.") | |
raise ValueError("GROQ_API_KEY environment variable is not set or invalid.") | |
#Main LLM for the SCRATCH 3.0 Agent | |
llm = ChatGroq( | |
#model="deepseek-r1-distill-llama-70b", | |
#model="llama-3.3-70b-versatile", | |
#model="meta-llama/llama-4-maverick-17b-128e-instruct", | |
model="meta-llama/llama-4-scout-17b-16e-instruct", | |
temperature=0.0, | |
) | |
# Json debugger [temporary] | |
llm2 = ChatGroq( | |
model="deepseek-r1-distill-llama-70b", | |
#model="llama-3.3-70b-versatile", | |
#model="meta-llama/llama-4-maverick-17b-128e-instruct", | |
#model="meta-llama/llama-4-scout-17b-16e-instruct", | |
temperature=0.0, | |
) | |
# llm = ChatOpenAI( | |
# openai_api_key=os.environ["OPENROUTER_API_KEY"], | |
# openai_api_base=os.environ["OPENROUTER_BASE_URL"], | |
# model_name="deepseek/deepseek-r1-0528:free", | |
# model_kwargs={ | |
# "headers": { | |
# "HTTP-Referer": "https://localhost:5000/", | |
# "X-Title": "agent_scratch", | |
# } | |
# }, | |
#) | |
# Refined SYSTEM_PROMPT with more explicit Scratch JSON rules, especially for variables | |
SYSTEM_PROMPT = """ | |
You are an expert AI assistant named GameScratchAgent, specialized in generating and modifying Scratch-VM 3.x game project JSON. | |
Your core task is to process game descriptions and existing Scratch JSON structures, then produce or update JSON segments accurately. | |
You possess deep knowledge of Scratch 3.0 project schema, informed by comprehensive reference materials. When generating or modifying the `blocks` section, pay extremely close attention to the following: | |
**Scratch Target and Block Schema Rules:** | |
1. **Target Structure:** | |
* Every target (Stage or Sprite) must have an `objName` property. For the Stage, it's typically `"objName": "Stage"`. For sprites, it's their name (e.g., `"objName": "Cat"`). | |
* `variables` dictionary: This dictionary maps unique variable IDs to arrays `[variable_name, initial_value]`. | |
Example: `"variables": { "myVarId123": ["score", 0] }` | |
2. **Block Structure:** Every block must have `opcode`, `blockType`, `parent`, `next`, `inputs`, `fields`, `shadow`, `topLevel`. | |
* `opcode`: Unique internal identifier for the block's specific functionality (e.g., `"motion_movesteps"`)[cite: 439, 452]. | |
* `blockType`: Specifies the visual shape and general behavior. Common types include `"command"` (Stack block), `"reporter"` (Reporter block), `"Boolean"` (Boolean block), and `"hat"` (Hat block)[cite: 453, 454]. | |
* `parent`: ID of the block it's stacked on (or `null` for top-level). | |
* `next`: ID of the block directly below it (or `null` for end of stack). | |
* `topLevel`: `true` if it's a hat block or a standalone block, `false` otherwise. | |
* `shadow`: `true` if it's a shadow block (e.g., a default number input), `false` otherwise. | |
3. **`inputs` vs. `fields`:** This is critical. | |
* **`inputs`**: Used for blocks or values *plugged into* a block (e.g., condition for an `if` block, target for `go to`). Values are **arrays**. | |
* `[1, <blockId>]`: Represents a Reporter or Boolean block *plugged in* (e.g., `operator_equals`). | |
* `[1, [<type>, "<value>"]]`: For a shadow block with a literal value. The `type` specifies the data type and can be "num" (number), "str" (string), "bool" (Boolean), "colour", "angle", "date", "dropdown", "iconmenu", or "variable"[cite: 457, 461]. | |
Example: `[1, ["num", "10"]]` for a number input. | |
* `[2, <blockId>]`: For a C-block's substack (e.g., `SUBSTACK` in `control_if`). | |
* **`fields`**: Used for dropdown menus or direct internal values *within* a block (e.g., key selected in `when key pressed`, variable name in `set variable`). Values are always **arrays** `["<value>", null]`. | |
* Example for `event_whenkeypressed`: `"KEY": ["space", null]` | |
* Example for `data_setvariableto`: `"VARIABLE": ["score", null]` | |
4. **Unique IDs:** All block IDs, variable IDs, and list IDs must be unique strings (e.g., "myBlock123", "myVarId456"). Do NOT use placeholder strings like "block_id_here". | |
5. **No Nested `blocks` Dictionary:** The `blocks` dictionary should only appear once per `target` (sprite/stage). Do NOT nest a `blocks` dictionary inside an individual block definition. Blocks that are part of a substack (like inside an `if` or `forever` or `repeat` loop and other) are linked via the `SUBSTACK` input, where the input value is `[2, "ID_of_first_block_in_substack"]`. | |
6. **Asset Properties:** `assetId`, `md5ext`, `bitmapResolution`, `rotationCenterX`/`rotationCenterY` should be correct for costumes/sounds. | |
**General Principles and Important Considerations:** | |
* **Backward Compatibility:** Adhere strictly to existing Scratch 3.0 opcodes and schema to ensure backward compatibility with older projects[cite: 439, 440, 446]. | |
* **Forgiving Inputs:** Recognize that Scratch is designed to be "forgiving in its interpretation of inputs"[cite: 442, 459]. While generating valid JSON, understand that the Scratch VM handles potentially "invalid" inputs gracefully (e.g., type coercion, returning default values like zero or empty strings, rather than crashing)[cite: 443, 459]. This implies that precise type matching for inputs might be handled internally by Scratch, allowing for some flexibility in how values are provided, but the agent should aim for the most common and logical type. | |
**When responding, you must:** | |
1. **Output ONLY the complete, valid JSON object.** Do not include markdown, commentary, or conversational text. | |
2. Ensure every generated block ID, input, field, and variable declaration strictly follows the Scratch 3.0 schema and the rules above. | |
3. If presented with partial or problematic JSON, aim to complete or correct it based on the given instructions and your schema knowledge. | |
""" | |
SYSTEM_PROMPT_JSON_CORRECTOR =""" | |
You are an assistant that outputs JSON responses strictly following the given schema. | |
If the JSON you produce has any formatting errors, missing required fields, or invalid structure, you must identify the problems and correct them. | |
Always return only valid JSON that fully conforms to the schema below, enclosed in triple backticks (```), without any extra text or explanation. | |
If you receive an invalid or incomplete JSON response, fix it by: | |
- Adding any missing required fields with appropriate values. | |
- Correcting syntax errors such as missing commas, brackets, or quotes. | |
- Ensuring the JSON structure matches the schema exactly. | |
Remember: Your output must be valid JSON only, ready to be parsed without errors. | |
""" | |
# Main agent of the system agent for Scratch 3.0 | |
agent = create_react_agent( | |
model=llm, | |
tools=[], # No specific tools are defined here, but could be added later | |
prompt=SYSTEM_PROMPT | |
) | |
# debugger and resolver agent for Scratch 3.0 | |
agent_json_resolver = create_react_agent( | |
model=llm, | |
tools=[], # No specific tools are defined here, but could be added later | |
prompt=SYSTEM_PROMPT_JSON_CORRECTOR | |
) | |
# --- Serve the form --- | |
def index(): | |
return render_template("index4.html") | |
# --- List static assets for the front-end --- | |
def list_assets(): | |
bdir = os.path.join(app.static_folder, "assets", "backdrops") | |
sdir = os.path.join(app.static_folder, "assets", "sprites") | |
backdrops = [] | |
sprites = [] | |
try: | |
if os.path.isdir(bdir): | |
backdrops = [f for f in os.listdir(bdir) if f.lower().endswith(".svg")] | |
if os.path.isdir(sdir): | |
sprites = [f for f in os.listdir(sdir) if f.lower().endswith(".svg")] | |
logger.info("Successfully listed static assets.") | |
except Exception as e: | |
logger.error(f"Error listing static assets: {e}") | |
return jsonify({"error": "Failed to list assets"}), 500 | |
return jsonify(backdrops=backdrops, sprites=sprites) | |
# --- Helper: build costume entries --- | |
def make_costume_entry(folder, filename, name): | |
asset_id = filename.rsplit(".", 1)[0] | |
entry = { | |
"name": name, | |
"bitmapResolution": 1, | |
"dataFormat": filename.rsplit(".", 1)[1], | |
"assetId": asset_id, | |
"md5ext": filename, | |
} | |
path = os.path.join(app.static_folder, "assets", folder, filename) | |
ext = filename.lower().endswith(".svg") | |
if ext: | |
if folder == "backdrops": | |
entry["rotationCenterX"] = 240 | |
entry["rotationCenterY"] = 180 | |
else: | |
entry["rotationCenterX"] = 0 | |
entry["rotationCenterY"] = 0 | |
else: | |
try: | |
img = Image.open(path) | |
w, h = img.size | |
entry["rotationCenterX"] = w // 2 | |
entry["rotationCenterY"] = h // 2 | |
except Exception as e: | |
logger.warning(f"Could not determine image dimensions for {filename}: {e}. Setting center to 0,0.") | |
entry["rotationCenterX"] = 0 | |
entry["rotationCenterY"] = 0 | |
return entry | |
# --- New endpoint to fetch project.json --- | |
def get_project(project_id): | |
project_folder = os.path.join("generated_projects", project_id) | |
project_json_path = os.path.join(project_folder, "project.json") | |
try: | |
if os.path.exists(project_json_path): | |
logger.info(f"Serving project.json for project ID: {project_id}") | |
return send_from_directory(project_folder, "project.json", as_attachment=True, download_name=f"{project_id}.json") | |
else: | |
logger.warning(f"Project JSON not found for ID: {project_id}") | |
return jsonify({"error": "Project not found"}), 404 | |
except Exception as e: | |
logger.error(f"Error serving project.json for ID {project_id}: {e}") | |
return jsonify({"error": "Failed to retrieve project"}), 500 | |
# --- New endpoint to fetch assets --- | |
def get_asset(project_id, filename): | |
project_folder = os.path.join("generated_projects", project_id) | |
asset_path = os.path.join(project_folder, filename) | |
try: | |
if os.path.exists(asset_path): | |
logger.info(f"Serving asset '{filename}' for project ID: {project_id}") | |
return send_from_directory(project_folder, filename) | |
else: | |
logger.warning(f"Asset '{filename}' not found for project ID: {project_id}") | |
return jsonify({"error": "Asset not found"}), 404 | |
except Exception as e: | |
logger.error(f"Error serving asset '{filename}' for project ID {project_id}: {e}") | |
return jsonify({"error": "Failed to retrieve asset"}), 500 | |
### LangGraph Workflow Definition | |
# Define a state for your graph using TypedDict | |
class GameState(TypedDict): | |
project_json: dict | |
description: str | |
project_id: str | |
sprite_initial_positions: dict # Add this as well, as it's part of your state | |
action_plan: Optional[Dict] | |
#behavior_plan: Optional[Dict] | |
improvement_plan: Optional[Dict] | |
needs_improvement: bool | |
plan_validation_feedback: Optional[Dict] | |
iteration_count: int # Track the number of iterations for improvements | |
review_block_feedback: Optional[Dict] # Feedback from the agent on the blocks after verification | |
# Helper function to update project JSON with sprite positions | |
import copy | |
def update_project_with_sprite_positions(project_json: dict, sprite_positions: dict) -> dict: | |
""" | |
Update the 'x' and 'y' coordinates of sprites in the Scratch project JSON. | |
Args: | |
project_json (dict): Original Scratch project JSON. | |
sprite_positions (dict): Dict mapping sprite names to {'x': int, 'y': int}. | |
Returns: | |
dict: Updated project JSON with new sprite positions. | |
""" | |
updated_project = copy.deepcopy(project_json) | |
for target in updated_project.get("targets", []): | |
if not target.get("isStage", False): | |
sprite_name = target.get("name") | |
if sprite_name in sprite_positions: | |
pos = sprite_positions[sprite_name] | |
if "x" in pos and "y" in pos: | |
target["x"] = pos["x"] | |
target["y"] = pos["y"] | |
return updated_project | |
# Helper function to load the block catalog from a JSON file | |
def _load_block_catalog(file_path: str) -> Dict: | |
"""Loads the Scratch block catalog from a specified JSON file.""" | |
try: | |
with open(file_path, 'r') as f: | |
catalog = json.load(f) | |
logger.info(f"Successfully loaded block catalog from {file_path}") | |
return catalog | |
except FileNotFoundError: | |
logger.error(f"Error: Block catalog file not found at {file_path}") | |
# Return an empty dict or raise an error, depending on desired behavior | |
return {} | |
except json.JSONDecodeError as e: | |
logger.error(f"Error decoding JSON from {file_path}: {e}") | |
return {} | |
except Exception as e: | |
logger.error(f"An unexpected error occurred while loading {file_path}: {e}") | |
return {} | |
# --- Global variable for the block catalog --- | |
ALL_SCRATCH_BLOCKS_CATALOG = {} | |
BLOCK_CATALOG_PATH = r"blocks\blocks.json" # Define the path to your JSON file | |
HAT_BLOCKS_PATH = r"blocks\hat_blocks.json" # Path to the hat blocks JSON file | |
STACK_BLOCKS_PATH = r"blocks\stack_blocks.json" # Path to the stack blocks JSON file | |
REPORTER_BLOCKS_PATH = r"blocks\reporter_blocks.json" # Path to the reporter blocks JSON file | |
BOOLEAN_BLOCKS_PATH = r"blocks\boolean_blocks.json" # Path to the boolean blocks JSON file | |
C_BLOCKS_PATH = r"blocks\c_blocks.json" # Path to the C blocks JSON file | |
CAP_BLOCKS_PATH = r"blocks\cap_blocks.json" # Path to the cap blocks JSON file | |
# Load the block catalogs from their respective JSON files | |
hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH) | |
hat_description = hat_block_data["description"] | |
hat_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in hat_block_data["blocks"]]) | |
print("Hat blocks loaded successfully.", hat_description) | |
boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH) | |
boolean_description = boolean_block_data["description"] | |
boolean_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in boolean_block_data["blocks"]]) | |
c_block_data = _load_block_catalog(C_BLOCKS_PATH) | |
c_description = c_block_data["description"] | |
c_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in c_block_data["blocks"]]) | |
cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH) | |
cap_description = cap_block_data["description"] | |
cap_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in cap_block_data["blocks"]]) | |
reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH) | |
reporter_description = reporter_block_data["description"] | |
reporter_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in reporter_block_data["blocks"]]) | |
stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH) | |
stack_description = stack_block_data["description"] | |
stack_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in stack_block_data["blocks"]]) | |
# This makes ALL_SCRATCH_BLOCKS_CATALOG available globally | |
ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH) | |
# Helper function to generate a unique block ID | |
def generate_block_id(): | |
"""Generates a short, unique ID for a Scratch block.""" | |
return str(uuid.uuid4())[:10].replace('-', '') # Shorten for readability, ensure uniqueness | |
# Placeholder for your extract_json_from_llm_response function | |
# # Helper function to extract JSON from LLM response | |
# def extract_json_from_llm_response(raw_response: str) -> dict: | |
# """ | |
# Extracts a JSON object from an LLM response string, robustly handling | |
# literal newlines inside string values by escaping them. | |
# """ | |
# # 1) Pull out the inner JSON block (```json … ```) | |
# json_block = raw_response | |
# m = re.search(r"```json\s*(\{.*\})\s*```", raw_response, re.DOTALL) | |
# if m: | |
# json_block = m.group(1) | |
# # 2) Find the feedback field and escape its newlines | |
# # This pattern captures everything between "feedback": " … " | |
# feedback_pattern = r'("feedback"\s*:\s*")(.+?)("\s*,)' | |
# def _escape_feedback(m): | |
# prefix, val, suffix = m.groups() | |
# # replace each literal newline and carriage return with \n | |
# safe_val = val.replace("\r", "\\r").replace("\n", "\\n") | |
# return prefix + safe_val + suffix | |
# json_sanitized = re.sub(feedback_pattern, _escape_feedback, json_block, flags=re.DOTALL) | |
# # 3) Now try loading | |
# try: | |
# return json.loads(json_sanitized) | |
# except json.JSONDecodeError as e: | |
# logger.error(f"Failed to parse sanitized JSON: {e!r}\nContent was:\n{json_sanitized}") | |
# raise | |
def extract_json_from_llm_response(raw_response: str) -> dict: | |
""" | |
Extracts a JSON object from an LLM response string, robustly handling | |
various common LLM output formats and parsing errors. | |
""" | |
json_string_to_parse = raw_response | |
# --- Step 1: Try to extract JSON from markdown code block (most common and cleanest) --- | |
# This pattern is more robust: it matches "```json" or "```" (for general code blocks) | |
# and captures everything until the next "```" | |
markdown_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response) | |
if markdown_match: | |
json_string_to_parse = markdown_match.group(1).strip() | |
logger.debug("Extracted potential JSON from markdown block.") | |
else: | |
logger.debug("No markdown JSON block found. Attempting to parse raw response.") | |
# --- Step 2: Pre-process for common LLM JSON generation errors --- | |
# 2.1) Escape newlines/carriage returns in string values | |
# This is a general fix for all string values, not just 'feedback' | |
# It looks for "key": "value" patterns and replaces newlines within 'value' | |
# This might be tricky to get right for all cases without a full parser. | |
# A safer bet is to hope the LLM formats strings correctly, or to rely on direct JSON parsing. | |
# For now, let's keep your feedback-specific one if you find LLM adds newlines there, | |
# but the primary error is structural. | |
# Re-apply feedback escaping if you still suspect newlines in specific fields | |
feedback_pattern = r'("feedback"\s*:\s*")(.+?)("(?:\s*,|\s*\})?)' # Adjusted regex to handle end of object/array | |
def _escape_feedback(m): | |
prefix, val, suffix = m.groups() | |
safe_val = val.replace("\r", "\\r").replace("\n", "\\n") | |
return prefix + safe_val + suffix | |
json_string_to_parse = re.sub(feedback_pattern, _escape_feedback, json_string_to_parse, flags=re.DOTALL) | |
logger.debug("Applied feedback newline escaping.") | |
# --- Step 3: Attempt to parse the (sanitized) JSON string --- | |
try: | |
parsed_json = json.loads(json_string_to_parse) | |
logger.info("Successfully parsed JSON from LLM response.") | |
return parsed_json | |
except json.JSONDecodeError as original_error: | |
# If parsing fails, log the problematic string and the error for debugging | |
logger.error(f"Failed to parse JSON. Error: {original_error!r}") | |
logger.error(f"Problematic JSON string (start):\n{json_string_to_parse[:1000]}...") # Log first 1000 chars | |
logger.error(f"Problematic JSON string (full length: {len(json_string_to_parse)}):\n{json_string_to_parse}") # Log full for deep dive | |
# Attempt a fallback for common structural issues (e.g., extra trailing commas) | |
# This is speculative and may not always work, but worth a try before failing. | |
try: | |
# Remove trailing commas from objects and arrays | |
# This regex is simplified and might not catch all cases, but handles common ones | |
# For example, {"a": 1,} -> {"a": 1} | |
# Or [1, 2,] -> [1, 2] | |
sanitized_for_trailing_commas = re.sub(r',\s*([}\]])', r'\1', json_string_to_parse) | |
logger.warning("Attempting to parse after removing potential trailing commas.") | |
return json.loads(sanitized_for_trailing_commas) | |
except json.JSONDecodeError as e_fallback: | |
logger.error(f"Fallback parsing also failed. Error: {e_fallback!r}") | |
# The original error is more informative for 'Expecting ,' delimiter, so raise that. | |
raise original_error # Re-raise the original, more specific JSONDecodeError | |
# Node 1: Detailed description generator. | |
def game_description_node(state: GameState): | |
""" | |
Generates a detailed narrative description of the game based on the initial query. | |
""" | |
logger.info("--- Running GameDescriptionNode ---") | |
sprite_name = {} | |
initial_description = state.get("description", "A simple game.") | |
project_json = state["project_json"] | |
for target in project_json["targets"]: | |
sprite_name[target["name"]] = target["name"] | |
description_prompt = ( | |
f"You are an AI assistant tasked with generating a detailed narrative description for a Scratch 3.0 game.\n" | |
f"The initial high-level description is: '{initial_description}'.\n" | |
f"The current Scratch project JSON is:\n```json\n{json.dumps(state['project_json'], indent=2)}\n```\n" | |
f"Make sure you donot change Sprite and Stage name. Here are all the name: {sprite_name} \n" | |
f"Create a rich, engaging, and detailed description, including potential gameplay elements, objectives, and overall feel with the available resources\n" | |
f"The output should be a plain text description." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": description_prompt}]}) | |
detailed_game_description = response["messages"][-1].content | |
state["description"] = detailed_game_description | |
logger.info("Detailed game description generated by GameDescriptionNode.") | |
print(f"Detailed Game Description: {detailed_game_description}") | |
return state | |
except Exception as e: | |
logger.error(f"Error in GameDescriptionNode: {e}") | |
raise | |
# Node 2: Analysis User Query and Initial Positions | |
def parse_query_and_set_initial_positions(state: GameState): | |
logger.info("--- Running ParseQueryNode ---") | |
llm_query_prompt = ( | |
f"Based on the user's game description: '{state['description']}', " | |
f"and the current Scratch project JSON below, " | |
f"determine the most appropriate initial 'x' and 'y' coordinates for each sprite. " | |
f"Return ONLY a JSON object with a single key 'sprite_initial_positions' mapping sprite names to their {{'x': int, 'y': int}} coordinates.\n\n" | |
f"The current Scratch project JSON is:\n```json\n{json.dumps(state['project_json'], indent=2)}\n```\n" | |
f"Example Json output:\n" | |
"```json\n" | |
"{\n" | |
" \"sprite_initial_positions\": {\n" | |
" \"Sprite1\": {\"x\": -160, \"y\": -110},\n" | |
" \"Sprite2\": {\"x\": 240, \"y\": -135}\n" | |
" }\n" | |
"}" | |
"```\n" | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": llm_query_prompt}]}) | |
raw_response = response["messages"][-1].content | |
print("Raw response from LLM:", raw_response) | |
# json debugging and solving | |
try: | |
updated_data = extract_json_from_llm_response(raw_response) | |
sprite_positions = updated_data.get("sprite_initial_positions", {}) | |
except json.JSONDecodeError: | |
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") | |
# Use the JSON resolver agent to fix the response | |
correction_prompt = ( | |
"Correct the following JSON respose to ensure it follows proper schema rules:\n\n" | |
"Take care of following instruction carefully:\n" | |
"1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n" | |
"2. Do **NOT** add any additional text, comments, or explanations.\n" | |
"3. Just response correct the json where you find and put the same content as it is.\n" | |
"Here is the raw response:\n\n" | |
f"raw_response: {raw_response}\n\n" | |
) | |
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) | |
print(f"[JSON CORRECTOR RESPONSE AT PARSER]: {correction_response}") | |
corrected_data = extract_json_from_llm_response(correction_response["messages"][-1].content) | |
sprite_positions = corrected_data.get("sprite_initial_positions", {}) | |
new_project_json = update_project_with_sprite_positions(state["project_json"], sprite_positions) | |
state["project_json"]= new_project_json | |
print("Updated project JSON with sprite positions:", json.dumps(new_project_json, indent=2)) | |
return state | |
#return {"project_json": new_project_json} | |
except Exception as e: | |
logger.error(f"Error in ParseQueryNode: {e}") | |
raise | |
# Node 3: Sprite Action Plan Builder | |
def overall_planner_node(state: GameState): | |
""" | |
Generates a comprehensive action plan for sprites, including detailed Scratch block information. | |
This node acts as an overall planner, leveraging knowledge of all block shapes and categories. | |
""" | |
logger.info("--- Running OverallPlannerNode ---") | |
description = state.get("description", "") | |
project_json = state["project_json"] | |
sprite_names = [t["name"] for t in project_json["targets"] if not t["isStage"]] | |
# Get sprite positions from the project_json if available, or set defaults | |
sprite_positions = {} | |
for target in project_json["targets"]: | |
if not target["isStage"]: | |
sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)} | |
planning_prompt = ( | |
"Generate a detailed action plan for the game's sprites based on the user query and sprite details.\n\n" | |
f"**Game Description:** '{description}'\n\n" | |
f"**Sprites in Game:** {', '.join(sprite_names)}\n" | |
f"**Current Sprite Positions:** {json.dumps(sprite_positions)}\n\n" | |
"--- Scratch 3.0 Block Reference ---\n" | |
"This section provides a comprehensive reference of Scratch 3.0 blocks, categorized by shape, including their opcodes and functional descriptions. Use this to accurately identify block types and behavior.\n\n" | |
f"### Hat Blocks\nDescription: {hat_description}\nBlocks:\n{hat_opcodes_functionalities}\n\n" | |
f"### Boolean Blocks\nDescription: {boolean_description}\nBlocks:\n{boolean_opcodes_functionalities}\n\n" | |
f"### C Blocks\nDescription: {c_description}\nBlocks:\n{c_opcodes_functionalities}\n\n" | |
f"### Cap Blocks\nDescription: {cap_description}\nBlocks:\n{cap_opcodes_functionalities}\n\n" | |
f"### Reporter Blocks\nDescription: {reporter_description}\nBlocks:\n{reporter_opcodes_functionalities}\n\n" | |
f"### Stack Blocks\nDescription: {stack_description}\nBlocks:\n{stack_opcodes_functionalities}\n\n" | |
"-----------------------------------\n\n" | |
"Your task is to define the primary actions and movements for each sprite.\n" | |
"The output should be a JSON object with a single key 'action_overall_flow'. Each key inside this object should be a sprite name (e.g., 'Player', 'Enemy'), and its value must include a 'description' and a list of 'plans'.\n" | |
"Each plan must begin with a **single Scratch Hat Block** (e.g., 'event_whenflagclicked') and should contain:\n" | |
"1. **'event'**: the exact `opcode` of the hat block that initiates the logic.\n" | |
"2. **'logic'**: a natural language breakdown of each step taken after the event. Separate each step with a semicolon ';'. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence.\n" | |
" - For example: for a jump: 'change y by N; wait M seconds; change y by -N'.\n" | |
" - Use 'forever: ...' to prefix repeating logic explicitly.\n" | |
" - Use Scratch-consistent verbs: 'move', 'change', 'wait', 'hide', 'say', etc.\n" | |
"3. **Opcode Lists**: include relevant Scratch opcodes grouped under `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events`, and `data`. List only the non-empty categories. Use exact opcodes including shadow/helper blocks (e.g., 'math_number').\n\n" | |
"Use sprite names exactly as listed in `sprite_names`. Do NOT rename or invent new sprites.\n" | |
"Ensure the plan reflects accurate opcode usage derived strictly from the block reference above.\n\n" | |
"Example structure for 'action_overall_flow':\n" | |
"```json\n" | |
"{\n" | |
" \"action_overall_flow\": {\n" | |
" \"Sprite1\": {\n" | |
" \"description\": \"Main character (cat) actions\",\n" | |
" \"plans\": [\n" | |
" {\n" | |
" \"event\": \"event_whenflagclicked\",\n" | |
" \"logic\": \"go to initial position at starting point.\",\n" | |
" \"motion\": [\"motion_gotoxy\"],\n" | |
" \"control\": [],\n" | |
" \"operator\": [],\n" | |
" \"sensing\": [],\n" | |
" \"looks\": [],\n" | |
" \"sounds\": [],\n" | |
" \"data\": []\n" | |
" },\n" | |
" {\n" | |
" \"event\": \"event_whenkeypressed\",\n" | |
" \"logic\": \"repeat(10): change y by 10; wait 0.1 seconds; change y by -10;\",\n" | |
" \"motion\": [\"motion_changeyby\"],\n" | |
" \"control\": [\"control_repeat\", \"control_wait\"],\n" | |
" \"operator\": [],\n" | |
" \"sensing\": [],\n" | |
" \"looks\": [],\n" | |
" \"sounds\": [],\n" | |
" \"data\": []\n" | |
" }\n" | |
" ]\n" | |
" },\n" | |
" \"soccer ball\": {\n" | |
" \"description\": \"Obstacle movement and interaction\",\n" | |
" \"plans\": [\n" | |
" {\n" | |
" \"event\": \"event_whenflagclicked\",\n" | |
" \"logic\": \"go to x:240 y:-135; forever: glide 2 seconds to x:-240 y:-135; if x position < -235, then set x to 240; if touching Sprite1, then hide;\",\n" | |
" \"motion\": [\"motion_gotoxy\", \"motion_glidesecstoxy\", \"motion_xposition\", \"motion_setx\"],\n" | |
" \"control\": [\"control_forever\", \"control_if\"],\n" | |
" \"operator\": [\"operator_lt\"],\n" | |
" \"sensing\": [\"sensing_istouching\", \"sensing_touchingobjectmenu\"],\n" | |
" \"looks\": [\"looks_hide\"],\n" | |
" \"data\": []\n" | |
" }\n" | |
" ]\n" | |
" }\n" | |
" }\n" | |
"}\n" | |
"```\n" | |
"Based on the provided context, generate the `action_overall_flow`." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]}) | |
raw_response = response["messages"][-1].content | |
print("Raw response from LLM [OverallPlannerNode]:", raw_response) # Uncomment for debugging | |
# json debugging and solving | |
try: | |
overall_plan = extract_json_from_llm_response(raw_response) | |
except json.JSONDecodeError: | |
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") | |
# Use the JSON resolver agent to fix the response | |
correction_prompt = ( | |
"Correct the following JSON respose to ensure it follows proper schema rules:\n\n" | |
"Take care of following instruction carefully:\n" | |
"1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n" | |
"2. Do **NOT** add any additional text, comments, or explanations.\n" | |
"3. Just response correct the json where you find and put the same content as it is.\n" | |
"Here is the raw response:\n\n" | |
f"raw_response: {raw_response}\n\n" | |
) | |
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) | |
print(f"[JSON CORRECTOR RESPONSE AT OVERALLPLANNERNODE ]: {correction_response}") | |
overall_plan= extract_json_from_llm_response(correction_response["messages"][-1].content) | |
state["action_plan"] = overall_plan | |
logger.info("Overall plan generated by OverallPlannerNode.") | |
return state | |
except Exception as e: | |
logger.error(f"Error in OverallPlannerNode: {e}") | |
raise | |
# Helper function to get a block by its opcode from a single catalog | |
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None: | |
""" | |
Search a single catalog (with keys "description" and "blocks": List[dict]) | |
for a block whose 'op_code' matches the given opcode. | |
Returns the block dict or None if not found. | |
""" | |
for block in catalog_data["blocks"]: | |
if block.get("op_code") == opcode: | |
return block | |
return None | |
# Helper function to find a block in all catalogs by opcode | |
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None: | |
""" | |
Search across multiple catalogs for a given opcode. | |
Returns the first matching block dict or None. | |
""" | |
for catalog in all_catalogs: | |
blk = get_block_by_opcode(catalog, opcode) | |
if blk is not None: | |
return blk | |
return None | |
# Node 4: Sprite Plan Verification Node | |
def plan_verification_node(state: GameState): | |
""" | |
Validates the generated action plan/blocks, identifies missing logic, | |
provides feedback, and determines if further improvements are needed. | |
Also manages the iteration count for the improvement loop. | |
""" | |
logger.info(f"--- Running VerificationNode (Iteration: {state.get('iteration_count', 0)}) ---") | |
MAX_IMPROVEMENT_ITERATIONS = 1 # Set a sensible limit to prevent infinite loops | |
current_iteration = state.get("iteration_count", 0) | |
#project_json = state["project_json"] | |
action_plan = state.get("action_plan", {}) | |
print(f"[action_plan before verification] on ({current_iteration}): {json.dumps(action_plan, indent=2)}") | |
#improvement_plan = state.get("improvement_plan", {}) # May contain prior improvement guidance | |
# Corrected validation_prompt | |
validation_prompt = ( | |
f"You are an AI validator for Scratch project plans and generated blocks. " | |
f"Your task is to review the current state of the game's action plan and block structure. " | |
f"Critically analyze if there are any missing logic, structural inconsistencies, or unclear intentions. " | |
f"Provide **precise** and **constructive** feedback for improvement.\n\n" | |
f"**Game Description:**\n" | |
f"{state.get('description', '')}\n\n" | |
f"**Current Action Plan (High-Level Logic):**\n" | |
f"```json\n{json.dumps(action_plan, indent=2)}\n```\n\n" | |
# Uncomment below if needed | |
# f"**Current Project JSON (Generated Blocks):**\n" | |
# f"```json\n{json.dumps(project_json, indent=2)}\n```\n\n" | |
f"**Previous Feedback (if any):**\n" | |
f"{state.get('plan_validation_feedback', 'None')}\n\n" | |
f"Based on the above, return a response strictly in the following JSON format:\n" | |
"```json\n" | |
"{\n" | |
" \"feedback\": \"Detailed comments on any missing logic, inconsistencies, or unclear intent. Be concise but specific. If everything is perfect, state that explicitly.\",\n" | |
" \"needs_improvement\": true,\n" | |
" \"suggested_description_updates\": \"Concise revision of the game description if needed. Use an empty string if no change is required.\"\n" | |
"}\n" | |
"```\n" | |
"**Important:**\n" | |
"- The `needs_improvement` field must be strictly `true` or `false` (boolean). Do **not** include any other text or explanation inside the JSON.\n" | |
"- Be strict in evaluation. If **any** part of the plan or block logic appears incomplete, ambiguous, or incorrect, set `needs_improvement` to `true`." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": validation_prompt}]}) | |
raw_response = response["messages"][-1].content | |
logger.info(f"Raw response from LLM [VerificationNode]: {raw_response[:500]}...") | |
# json debugging and solving | |
try: | |
validation_result = extract_json_from_llm_response(raw_response) | |
except json.JSONDecodeError: | |
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") | |
# Use the JSON resolver agent to fix the response | |
correction_prompt = ( | |
"Correct the following JSON respose to ensure it follows proper schema rules:\n\n" | |
"Take care of following instruction carefully:\n" | |
"1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n" | |
"2. Do **NOT** add any additional text, comments, or explanations.\n" | |
"3. Just response correct the json where you find and put the same content as it is.\n" | |
"Here is the raw response:\n\n" | |
f"raw_response: {raw_response}\n\n" | |
) | |
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) | |
print(f"[JSON CORRECTOR RESPONSE AT PLANVERIFICATIONNODE ]: {correction_response}") | |
validation_result = extract_json_from_llm_response(correction_response["messages"][-1].content) | |
# Update state with feedback and improvement flag | |
state["plan_validation_feedback"] = validation_result.get("feedback", "No specific feedback provided.") | |
state["needs_improvement"] = validation_result.get("needs_improvement", False) | |
suggested_description_updates = validation_result.get("suggested_description_updates", "") | |
if suggested_description_updates: | |
# You might want to append or intelligently merge this with the existing detailed_game_description | |
# For simplicity, let's just append for now or update a specific field | |
#current_description = state.get("detailed_game_description", state.get("description", "")) | |
current_description = state.get(state.get("description", "")) | |
#state["detailed_game_description"] = f"{current_description}\n\nSuggested Update: {suggested_description_updates}"\ | |
state["description"] = f"{current_description}\n\nSuggested Update: {suggested_description_updates}" | |
logger.info("Updated detailed game description based on validation feedback.") | |
# Manage iteration count | |
if state["needs_improvement"]: | |
state["iteration_count"] = current_iteration + 1 | |
if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS: | |
logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached. Forcing 'needs_improvement' to False.") | |
state["needs_improvement"] = False | |
state["plan_validation_feedback"] += "\n(Note: Max iterations reached, stopping further improvements.)" | |
else: | |
state["iteration_count"] = 0 # Reset if no more improvement needed | |
logger.info(f"Verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['plan_validation_feedback'][:100]}...") | |
print(f"[updated action_plan after verification] on ({current_iteration}): {json.dumps(state.get("action_plan", {}), indent=2)}") | |
return state | |
except Exception as e: | |
logger.error(f"Error in VerificationNode: {e}") | |
state["needs_improvement"] = False # Force end loop on error | |
state["plan_validation_feedback"] = f"Validation error: {e}" | |
raise | |
# Node 5: Refined Planner Node | |
def refined_planner_node(state: GameState): | |
""" | |
Refines the action plan based on validation feedback and game description. | |
""" | |
logger.info("--- Running RefinedPlannerNode ---") | |
#detailed_game_description = state.get("detailed_game_description", state.get("description", "A game.")) | |
detailed_game_description = state.get("description","") | |
current_action_plan = state.get("action_plan", {}) | |
print(f"[current_action_plan before refinement] on ({state.get('iteration_count', 0)}): {json.dumps(current_action_plan, indent=2)}") | |
plan_validation_feedback = state.get("plan_validation_feedback", "No specific feedback provided. Assume general refinement is needed.") | |
project_json = state["project_json"] | |
sprite_names = [t["name"] for t in project_json["targets"] if not t["isStage"]] | |
# Get sprite positions from the project_json if available, or set defaults | |
sprite_positions = {} | |
for target in project_json["targets"]: | |
if not target["isStage"]: | |
sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)} | |
refinement_prompt = ( | |
"Refine the existing action plan for the game's sprites based on the detailed game description and the validation feedback provided.\n\n" | |
f"**Detailed Game Description:** '{detailed_game_description}'\n\n" | |
f"**Sprites in Game:** {', '.join(sprite_names)}\n" | |
f"**Current Sprite Positions:** {json.dumps(sprite_positions)}\n\n" | |
f"**Current Action Plan (to be refined):**\n" | |
f"```json\n{json.dumps(current_action_plan, indent=2)}\n```\n\n" | |
f"**Validation Feedback:**\n" | |
f"'{plan_validation_feedback}'\n\n" | |
"--- Scratch 3.0 Block Reference ---\n" | |
f"### Hat Blocks\nDescription: {hat_description}\nBlocks:\n{hat_opcodes_functionalities}\n\n" | |
f"### Boolean Blocks\nDescription: {boolean_description}\nBlocks:\n{boolean_opcodes_functionalities}\n\n" | |
f"### C Blocks\nDescription: {c_description}\nBlocks:\n{c_opcodes_functionalities}\n\n" | |
f"### Cap Blocks\nDescription: {cap_description}\nBlocks:\n{cap_opcodes_functionalities}\n\n" | |
f"### Reporter Blocks\nDescription: {reporter_description}\nBlocks:\n{reporter_opcodes_functionalities}\n\n" | |
f"### Stack Blocks\nDescription: {stack_description}\nBlocks:\n{stack_opcodes_functionalities}\n\n" | |
"-----------------------------------\n\n" | |
"Your task is to refine the JSON object 'action_overall_flow'.\n" | |
"Use sprite names exactly as provided in `sprite_names` (e.g., 'Sprite1', 'soccer ball'); do **NOT** rename them.\n\n" | |
"Follow this exact format for the output (example):\n" | |
"Example structure for 'action_overall_flow':\n" | |
"```json\n" | |
"{\n" | |
" \"action_overall_flow\": {\n" | |
" \"Sprite1\": {\n" | |
" \"description\": \"Main character (cat) actions\",\n" | |
" \"plans\": [\n" | |
" {\n" | |
" \"event\": \"event_whenflagclicked\",\n" | |
" \"logic\": \"go to initial position at starting point.\",\n" | |
" \"motion\": [\"motion_gotoxy\"],\n" | |
" \"control\": [],\n" | |
" \"operator\": [],\n" | |
" \"sensing\": [],\n" | |
" \"looks\": [],\n" | |
" \"sounds\": [],\n" | |
" \"data\": []\n" | |
" },\n" | |
" {\n" | |
" \"event\": \"event_whenkeypressed\",\n" | |
" \"logic\": \"repeat(10): change y by 10 → wait 0.1 sec → change y by -10\",\n" | |
" \"motion\": [\"motion_changeyby\"],\n" | |
" \"control\": [\"control_repeat\", \"control_wait\"],\n" | |
" \"operator\": [],\n" | |
" \"sensing\": [],\n" | |
" \"looks\": [],\n" | |
" \"sounds\": [],\n" | |
" \"data\": []\n" | |
" }\n" | |
" ]\n" | |
" },\n" | |
" \"soccer ball\": {\n" | |
" \"description\": \"Obstacle movement and interaction\",\n" | |
" \"plans\": [\n" | |
" {\n" | |
" \"event\": \"event_whenflagclicked\",\n" | |
" \"logic\": \"go to x:240 y:-135 → forever glide to x:-240 y:-135 → if x position < -235 then set x to 240 → if touching Sprite1 then hide\",\n" | |
" \"motion\": [\"motion_gotoxy\", \"motion_glidesecstoxy\", \"motion_xposition\", \"motion_setx\"],\n" | |
" \"control\": [\"control_forever\", \"control_if\"],\n" | |
" \"operator\": [\"operator_lt\"],\n" | |
" \"sensing\": [\"sensing_istouching\", \"sensing_touchingobjectmenu\"],\n" | |
" \"looks\": [\"looks_hide\"],\n" | |
" \"data\": []\n" | |
" }\n" | |
" ]\n" | |
" }\n" | |
" }\n" | |
"}\n" | |
"```\n" | |
"Use the validation feedback to address errors, fill in missing logic, or enhance clarity.\n" | |
"example of few possible improvements: 1.event_whenflagclicked is used to control sprite but its used for actual start scratch project and reset scratch. 2. looping like forever used where we should use iterative. 3. missing of for variable we used in the block\n" | |
"- Maintain the **exact JSON structure** shown above.\n" | |
"- All `logic` fields must be **clear and granular**.\n" | |
"- Only include opcode categories that contain relevant opcodes.\n" | |
"- Ensure that each opcode matches its intended Scratch functionality.\n" | |
"- If feedback suggests major change, **rethink the entire plan** for the affected sprite(s).\n" | |
"- If feedback is minor, make precise, minimal improvements only." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]}) | |
raw_response = response["messages"][-1].content | |
logger.info(f"Raw response from LLM [RefinedPlannerNode]: {raw_response[:500]}...") | |
# json debugging and solving | |
try: | |
refined_plan = extract_json_from_llm_response(raw_response) | |
except json.JSONDecodeError: | |
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") | |
# Use the JSON resolver agent to fix the response | |
correction_prompt = ( | |
"Correct the following JSON respose to ensure it follows proper schema rules:\n\n" | |
"Take care of following instruction carefully:\n" | |
"1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n" | |
"2. Do **NOT** add any additional text, comments, or explanations.\n" | |
"3. Just response correct the json where you find and put the same content as it is.\n" | |
"Here is the raw response:\n\n" | |
f"raw_response: {raw_response}\n\n" | |
) | |
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) | |
print(f"[JSON CORRECTOR RESPONSE AT REFINEPLANNER ]: {correction_response}") | |
refined_plan = extract_json_from_llm_response(correction_response["messages"][-1].content) | |
logger.info("Refined plan corrected by JSON resolver agent.") | |
if refined_plan: | |
#state["action_plan"] = refined_plan.get("action_overall_flow", {}) # Update to the key 'action_overall_flow' [error] | |
state["action_plan"] = refined_plan.get("action_overall_flow", {}) # Update the main the prompt includes updated only | |
logger.info("Action plan refined by RefinedPlannerNode.") | |
else: | |
logger.warning("RefinedPlannerNode did not return a valid 'action_overall_flow' structure. Keeping previous plan.") | |
print("[Refined Action Plan]:", json.dumps(state["action_plan"], indent=2)) | |
print("[current state after refinement]:", json.dumps(state, indent=2)) | |
return state | |
except Exception as e: | |
logger.error(f"Error in RefinedPlannerNode: {e}") | |
raise | |
# Node 4: Overall Block Builder Node | |
def overall_block_builder_node(state: GameState): | |
logger.info("--- Running OverallBlockBuilderNode ---") | |
project_json = state["project_json"] | |
targets = project_json["targets"] | |
sprite_map = {target["name"]: target for target in targets if not target["isStage"]} | |
# Also get the Stage target | |
stage_target = next((target for target in targets if target["isStage"]), None) | |
if stage_target: | |
sprite_map[stage_target["name"]] = stage_target | |
# Pre-load all block-catalog JSONs once | |
all_catalogs = [ | |
hat_block_data, | |
boolean_block_data, | |
c_block_data, | |
cap_block_data, | |
reporter_block_data, | |
stack_block_data | |
] | |
action_plan = state.get("action_plan", {}) | |
print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2)) | |
if not action_plan: | |
logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.") | |
return state | |
script_y_offset = {} | |
script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} | |
# This is the handler which ensure if somehow json response changed it handle it.[DONOT REMOVE BELOW LOGIC] | |
if action_plan.get("action_overall_flow", {})=={}: | |
plan_data = action_plan.items() | |
else: | |
plan_data= action_plan.get("action_overall_flow", {}).items() | |
for sprite_name, sprite_actions_data in plan_data: | |
#for sprite_name, sprite_actions_data in action_plan.items(): | |
if sprite_name in sprite_map: | |
current_sprite_target = sprite_map[sprite_name] | |
if "blocks" not in current_sprite_target: | |
current_sprite_target["blocks"] = {} | |
if sprite_name not in script_y_offset: | |
script_y_offset[sprite_name] = 0 | |
for plan_entry in sprite_actions_data.get("plans", []): | |
event_opcode = plan_entry["event"] # This is now expected to be an opcode | |
logic_sequence = plan_entry["logic"] # This is the semicolon-separated string | |
# Extract the new opcode lists from the plan_entry | |
motion_opcodes = plan_entry.get("motion", []) | |
control_opcodes = plan_entry.get("control", []) | |
operator_opcodes = plan_entry.get("operator", []) | |
sensing_opcodes = plan_entry.get("sensing", []) | |
looks_opcodes = plan_entry.get("looks", []) | |
sound_opcodes = plan_entry.get("sound", []) | |
events_opcodes = plan_entry.get("events", []) | |
data_opcodes = plan_entry.get("data", []) | |
# Create a string representation of the identified opcodes for the prompt | |
identified_opcodes_str = "" | |
if motion_opcodes: | |
identified_opcodes_str += f" Motion Blocks (opcodes): {', '.join(motion_opcodes)}\n" | |
if control_opcodes: | |
identified_opcodes_str += f" Control Blocks (opcodes): {', '.join(control_opcodes)}\n" | |
if operator_opcodes: | |
identified_opcodes_str += f" Operator Blocks (opcodes): {', '.join(operator_opcodes)}\n" | |
if sensing_opcodes: | |
identified_opcodes_str += f" Sensing Blocks (opcodes): {', '.join(sensing_opcodes)}\n" | |
if looks_opcodes: | |
identified_opcodes_str += f" Looks Blocks (opcodes): {', '.join(looks_opcodes)}\n" | |
if sound_opcodes: | |
identified_opcodes_str += f" Sound Blocks (opcodes): {', '.join(sound_opcodes)}\n" | |
if events_opcodes: | |
identified_opcodes_str += f" Event Blocks (opcodes): {', '.join(events_opcodes)}\n" | |
if data_opcodes: | |
identified_opcodes_str += f" Data Blocks (opcodes): {', '.join(data_opcodes)}\n" | |
needed_opcodes = motion_opcodes + control_opcodes + operator_opcodes + sensing_opcodes + looks_opcodes + sound_opcodes + events_opcodes + data_opcodes | |
needed_opcodes = list(set(needed_opcodes)) | |
# 2) build filtered runtime catalog (if you still need it) | |
filtered_catalog = { | |
op: ALL_SCRATCH_BLOCKS_CATALOG[op] | |
for op in needed_opcodes | |
if op in ALL_SCRATCH_BLOCKS_CATALOG | |
} | |
# 3) merge human‑written catalog + runtime entry for each opcode | |
combined_blocks = {} | |
for op in needed_opcodes: | |
catalog_def = find_block_in_all(op, all_catalogs) or {} | |
runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {}) | |
# merge: catalog fields first, then runtime overrides/adds | |
merged = {**catalog_def, **runtime_def} | |
combined_blocks[op] = merged | |
print("[Combined blocks for this script]:", json.dumps(combined_blocks, indent=2)) | |
llm_block_generation_prompt = ( | |
f"You are an AI assistant generating a **single complete Scratch 3.0 script** in JSON format.\n" | |
f"The current sprite is '{sprite_name}'.\n" | |
f"This script must start with the event block (Hat Block) for opcode '{event_opcode}'.\n" | |
f"The sequential logic to implement for this script is:\n" | |
f" Logic: {logic_sequence}\n\n" | |
f"**Required Block Opcodes & Catalog:**\n" | |
f"Based on the planning, the following specific Scratch block opcodes are expected to be used. You MUST use these opcodes where applicable.\n" | |
f"Here is the comprehensive catalog for these blocks, including their structure and required inputs/fields:\n" | |
f"```json\n{json.dumps(combined_blocks, indent=2)}\n```\n\n" | |
f"Current Scratch project JSON for this sprite (provided for context; you are generating a NEW, complete script):\n" | |
f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n" | |
f"**CRITICAL INSTRUCTIONS FOR GENERATING THE BLOCK JSON (READ CAREFULLY AND FOLLOW PRECISELY):**\n" | |
f"1. **Unique Block IDs:** Generate a **globally unique ID** for EVERY block (main and shadow blocks) within the entire JSON output for this script. Example: 'myBlockID123'.\n" | |
f"2. **Script Initiation (Hat Block - VERY IMPORTANT):**\n" | |
f" * The **first block** of the script (the Hat block, opcode '{event_opcode}') MUST have `\"topLevel\": true` and `\"parent\": null`.\n" | |
f" * ONLY this Hat block should have `\"topLevel\": true`. All other blocks in the script MUST have `\"topLevel\": false`.\n" | |
f" * Set its `x` and `y` coordinates (e.g., `x: 0, y: 0` or similar for clear placement).\n" | |
f"3. **Strict Block Chaining (`next` and `parent`):**\n" | |
f" * Use the `next` field to point to the ID of the block DIRECTLY BELOW it in the stack. If a block has a `next`, its `next` block's `parent` MUST point back to the current block's ID.\n" | |
f" * Use the `parent` field to point to the ID of the block DIRECTLY ABOVE it in the stack. If a block has a `parent`, then that `parent` block's `next` MUST point to the current block's ID.\n" | |
f" * The **last block** in a linear stack (e.g., a Stack block not containing other blocks, or a Cap block) MUST have `\"next\": null`.\n" | |
f" * Blocks plugged into inputs (like Boolean reporters or Reporter blocks) or substacks (like inside C-blocks) do NOT use `next` to connect to the block holding them. Their connection is solely via the `inputs` field of their parent.\n" | |
f"4. **`inputs` Field Structure (ABSOLUTELY CRITICAL - ADHERE TO THIS RIGIDLY):**\n" | |
f" * The value for ANY key within the `inputs` dictionary MUST be an **array of EXACTLY two elements**: `[type_code, value_or_block_id]`.\n" | |
f" * **NEVER embed direct primitive values or arrays within `inputs` without a `type_code`**: E.g., `\"INPUT_NAME\": [\"value\", null]` or `\"INPUT_NAME\": [\"nestedArray\"]` are **STRICTLY FORBIDDEN and will cause errors**.\n" | |
f" * **`type_code` (first element):**\n" | |
f" * `1`: Use when `value_or_block_id` refers to a **primitive value** (number, string, boolean) or the ID of a **shadow block** (e.g., `math_number`, `text`, menu blocks).\n" | |
f" * `2`: Use when `value_or_block_id` refers to the ID of a **non-shadow block** that is PLUGGED IN (e.g., a Boolean block in a condition, or the first block of a C-block's `SUBSTACK`).\n" | |
f" * **Correct Examples (Re-emphasized):**\n" | |
f" * **Numerical Input (`motion_movesteps`):** To input `10` steps, define a separate `math_number` shadow block and reference its ID:\n" | |
f" ```json\n" | |
f" // Main block\n" | |
f" \"moveStepsID\": {{\n" | |
f" \"opcode\": \"motion_movesteps\", \"inputs\": {{ \"STEPS\": [1, \"shadowNumID\"] }},\n" | |
f" \"parent\": \"parentBlockID\", \"next\": \"nextBlockID\", \"topLevel\": false, \"shadow\": false\n" | |
f" }},\n" | |
f" // Separate shadow block for '10'\n" | |
f" \"shadowNumID\": {{\n" | |
f" \"opcode\": \"math_number\", \"fields\": {{ \"NUM\": [\"10\", null] }},\n" | |
f" \"parent\": \"moveStepsID\", \"shadow\": true, \"topLevel\": false\n" | |
f" }}\n" | |
f" ```\n" | |
f" * **Dropdown/Menu Input (`sensing_touchingobject` with 'edge'):** Define a separate menu shadow block and reference its ID:\n" | |
f" ```json\n" | |
f" // Main block\n" | |
f" \"touchingBlockID\": {{\n" | |
f" \"opcode\": \"sensing_touchingobject\", \"inputs\": {{ \"TOUCHINGOBJECTMENU\": [1, \"shadowEdgeMenuID\"] }},\n" | |
f" \"parent\": \"parentBlockID\", \"next\": null, \"topLevel\": false, \"shadow\": false\n" | |
f" }},\n" | |
f" // Separate shadow block for '_edge_'\n" | |
f" \"shadowEdgeMenuID\": {{\n" | |
f" \"opcode\": \"sensing_touchingobjectmenu\", \"fields\": {{ \"TOUCHINGOBJECTMENU\": [\"_edge_\", null] }},\n" | |
f" \"parent\": \"touchingBlockID\", \"shadow\": true, \"topLevel\": false\n" | |
f" }}\n" | |
f" ```\n" | |
f" * **C-block (`control_forever`):** Its `SUBSTACK` input MUST point to the first block inside its loop using `type_code: 2`.\n" | |
f" ```json\n" | |
f" \"foreverBlockID\": {{\n" | |
f" \"opcode\": \"control_forever\", \"inputs\": {{ \"SUBSTACK\": [2, \"firstBlockInsideForeverID\"] }},\n" | |
f" \"next\": null, \"parent\": \"blockAboveForeverID\", \"shadow\": false, \"topLevel\": false\n" | |
f" }},\n" | |
f" \"firstBlockInsideForeverID\": {{\n" | |
f" \"opcode\": \"motion_movesteps\", \"parent\": \"foreverBlockID\", \"next\": null, \"topLevel\": false, \"shadow\": false\n" | |
f" }}\n" | |
f" ```\n" | |
f" * **C-block with Condition (`control_if`):** `CONDITION` input uses `type_code: 1` (referencing a Boolean reporter), and `SUBSTACK` uses `type_code: 2` (referencing the first block inside the if-body).\n" | |
f" ```json\n" | |
f" \"ifBlockID\": {{\n" | |
f" \"opcode\": \"control_if\",\n" | |
f" \"inputs\": {{\n" | |
f" \"CONDITION\": [1, \"conditionBlockID\"],\n" | |
f" \"SUBSTACK\": [2, \"firstBlockInsideIfID\"]\n" | |
f" }},\n" | |
f" \"next\": \"blockAfterIfID\", \"parent\": \"blockAboveIfID\", \"shadow\": false, \"topLevel\": false\n" | |
f" }},\n" | |
f" \"conditionBlockID\": {{\n" | |
f" \"opcode\": \"sensing_touchingobject\", \"parent\": \"ifBlockID\", \"shadow\": false, \"topLevel\": false // ... and its own inputs/fields\n" | |
f" }},\n" | |
f" \"firstBlockInsideIfID\": {{\n" | |
f" \"opcode\": \"looks_sayforsecs\", \"parent\": \"ifBlockID\", \"next\": null, \"topLevel\": false, \"shadow\": false\n" | |
f" }}\n" | |
f" ```\n" | |
f"5. **Separate Shadow Blocks (MANDATORY):** Every time a block's input requires a numeric value, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `\"shadow\": true` and `\"topLevel\": false`.\n" | |
f"6. **`fields` for Direct Values:** Use the `fields` dictionary ONLY if the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., `NUM` field in `math_number`, `KEY_OPTION` in `event_whenkeypressed`, `VARIABLE` field in `data_setvariableto`). Example: `\"fields\": {{\"NUM\": [\"10\", null]}}`.\n" | |
f"7. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `combined_blocks` catalog. Do NOT use unlisted or hypothetical opcodes (e.g., `motion_jump`).\n" | |
f"8. **Output Format:** Return **ONLY the JSON object** representing all the blocks for THIS SINGLE SCRIPT. Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]}) | |
raw_response = response["messages"][-1].content | |
logger.info(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...") | |
print(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response}") # Uncomment for debugging | |
try: | |
generated_blocks = extract_json_from_llm_response(raw_response) | |
except json.JSONDecodeError: | |
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") | |
# Use the JSON resolver agent to fix the response | |
correction_prompt = ( | |
"Correct the following JSON respose to ensure it follows proper schema rules:\n\n" | |
"Take care of following instruction carefully:\n" | |
"1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n" | |
"2. Do **NOT** add any additional text, comments, or explanations.\n" | |
"3. Just response correct the json where you find and put the same content as it is.\n" | |
"Here is the raw response:\n\n" | |
f"raw_response: {raw_response}\n\n" | |
) | |
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) | |
print(f"[JSON CORRECTOR RESPONSE AT OVERALLBLOCKBUILDER ]: {correction_response}") | |
generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content) | |
if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): | |
logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") | |
generated_blocks = generated_blocks["blocks"] | |
# Update block positions for top-level script | |
for block_id, block_data in generated_blocks.items(): | |
if block_data.get("topLevel"): | |
block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0) | |
block_data["y"] = script_y_offset[sprite_name] | |
script_y_offset[sprite_name] += 150 # Increment for next script | |
current_sprite_target["blocks"].update(generated_blocks) | |
# setting the iteration count for the script | |
state["iteration_count"] = 0 | |
logger.info(f"Action blocks added for sprite '{sprite_name}', script '{event_opcode}' by OverallBlockBuilderNode.") | |
except Exception as e: | |
logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}") | |
raise | |
state["project_json"] = project_json # Update the state with the modified project_json | |
logger.info("Updated project JSON with action nodes.") | |
print("Updated project JSON with action nodes:", json.dumps(project_json, indent=2)) # Print for direct visibility | |
return state # Return the state object | |
#helper function to identify the shape of block utilized by the block verifier | |
def get_block_type(opcode: str) -> str: | |
"""Determines the general type of a Scratch block based on its opcode.""" | |
if not opcode: | |
return "unknown" | |
if opcode.startswith("event_when") or opcode == "control_start_as_clone": | |
return "hat" | |
elif opcode.startswith("control_") and ("if" in opcode or "repeat" in opcode or "forever" in opcode): | |
return "c_block" | |
elif opcode in ["operator_equals", "operator_gt", "operator_lt", "operator_and", "operator_or", "operator_not"] or \ | |
(opcode.startswith("sensing_") and ("mousedown" in opcode or "keypressed" in opcode or "touching" in opcode)): | |
return "boolean" | |
elif opcode.endswith("menu"): # For dropdown shadow blocks, treat as reporter for type checking | |
return "reporter" | |
elif any(s in opcode for s in ["position", "direction", "size", "volume", "costume", "backdrop", "random", "add", "subtract", "multiply", "divide", "length", "item", "of"]): | |
# A more comprehensive check for reporters | |
return "reporter" | |
elif "stop_all" in opcode or "delete_this_clone" in opcode or "procedures_definition" in opcode: | |
return "cap" | |
# Default to stack for most command blocks if not explicitly defined | |
return ALL_SCRATCH_BLOCKS_CATALOG.get(opcode, {}).get("blockType", "stack") | |
#helper function to identify the shape of block utilized by the block verifier | |
def filter_script_blocks(all_blocks: dict, hat_block_id: str) -> dict: | |
""" | |
Filters and returns only the blocks that are part of a specific script | |
starting from the given hat_block_id, including connected reporters/shadows. | |
""" | |
script_blocks = {} | |
q = [hat_block_id] | |
visited = set() | |
while q: | |
current_block_id = q.pop(0) | |
if current_block_id in visited: | |
continue | |
visited.add(current_block_id) | |
block_data = all_blocks.get(current_block_id) | |
if not block_data: | |
continue | |
script_blocks[current_block_id] = block_data | |
# Add next block in sequence | |
next_id = block_data.get("next") | |
if next_id and next_id in all_blocks: | |
q.append(next_id) | |
# Add blocks connected via inputs (e.g., reporters, shadow blocks, substacks) | |
if "inputs" in block_data: | |
for input_name, input_value in block_data["inputs"].items(): | |
if isinstance(input_value, list) and len(input_value) >= 2: | |
value_or_block_id = input_value[1] | |
if isinstance(value_or_block_id, str) and value_or_block_id in all_blocks: | |
q.append(value_or_block_id) | |
# For type code 3 (reporter with default value), the third element might be a connected block | |
if len(input_value) >= 3 and isinstance(input_value[2], str) and input_value[2] in all_blocks: | |
q.append(input_value[2]) | |
# For C-blocks, add blocks in substacks (if present) | |
# SUBSTACKs are inputs that have type code 2 and contain the block ID of the first block in the substack | |
if get_block_type(block_data.get("opcode")) == "c_block": | |
for input_key, input_val in block_data.get("inputs", {}).items(): | |
if input_key.startswith("SUBSTACK") and isinstance(input_val, list) and len(input_val) >= 2 and input_val[0] == 2 and isinstance(input_val[1], str) and input_val[1] in all_blocks: | |
q.append(input_val[1]) | |
return script_blocks | |
def analyze_script_structure(script_blocks: dict, hat_block_id: str, sprite_name: str) -> list: | |
""" | |
Analyzes the structure of a single Scratch script for common errors. | |
Returns a list of issue strings. | |
""" | |
issues = [] | |
# 1. Validate the hat block | |
hat_block = script_blocks.get(hat_block_id) | |
if not hat_block: | |
issues.append(f"Script for sprite '{sprite_name}' (hat ID: {hat_block_id}) has no hat block data.") | |
return issues # Cannot proceed without a hat block | |
if not hat_block.get("topLevel") or hat_block.get("parent") is not None: | |
issues.append(f"Hat block '{hat_block_id}' for sprite '{sprite_name}' is not marked as topLevel or has a parent.") | |
# 2. Check all blocks within the script | |
for block_id, block_data in script_blocks.items(): | |
opcode = block_data.get("opcode") | |
if not opcode: | |
issues.append(f"Block '{block_id}' for sprite '{sprite_name}' is missing an opcode.") | |
continue | |
# Check if opcode exists in the catalog (simplified check for this example) | |
if opcode not in ALL_SCRATCH_BLOCKS_CATALOG: | |
issues.append(f"Block '{block_id}' for sprite '{sprite_name}' has unknown opcode '{opcode}'.") | |
# Parent-Child and Next-Previous Linkage | |
parent_id = block_data.get("parent") | |
next_id = block_data.get("next") | |
if parent_id: | |
parent_block = script_blocks.get(parent_id) | |
if not parent_block: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent parent '{parent_id}'.") | |
else: | |
parent_block_type = get_block_type(parent_block.get("opcode")) | |
current_block_type = get_block_type(opcode) | |
if parent_block_type in ["stack", "hat"]: | |
# For stack and hat blocks, the parent's 'next' should point to this block | |
if parent_block.get("next") != block_id: | |
issues.append(f"Block '{block_id}' (opcode: {opcode})'s parent '{parent_id}' does not link back to it via 'next'.") | |
elif parent_block_type == "c_block": | |
# For C-blocks, this block should be in a substack input | |
found_in_substack = False | |
for input_key, input_val in parent_block.get("inputs", {}).items(): | |
if isinstance(input_val, list) and len(input_val) >= 2 and input_val[1] == block_id: | |
if input_val[0] == 2 and input_key.startswith("SUBSTACK"): # Type code 2 for substacks | |
found_in_substack = True | |
break | |
# Shadow blocks and reporter blocks can also be inputs to C-blocks but not as substacks | |
if not found_in_substack and not block_data.get("shadow") and current_block_type not in ["reporter", "boolean"]: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) has parent '{parent_id}' but is not a substack, shadow, or reporter/boolean connection to a C-block.") | |
elif parent_block_type in ["reporter", "boolean"]: | |
# Reporter/Boolean blocks can be parents if they are input to another block | |
found_in_input = False | |
for input_key, input_val in parent_block.get("inputs", {}).items(): | |
if isinstance(input_val, list) and len(input_val) >= 2 and input_val[1] == block_id: | |
found_in_input = True | |
break | |
if not found_in_input: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) has reporter/boolean parent '{parent_id}' but is not linked via an input.") | |
else: # e.g., cap blocks cannot be parents to other blocks in a sequence | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) has an unexpected parent block type '{parent_block_type}' for parent '{parent_id}'.") | |
if next_id: | |
next_block = script_blocks.get(next_id) | |
if not next_block: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent next block '{next_id}'.") | |
elif next_block.get("parent") != block_id: | |
issues.append(f"Block '{block_id}' (opcode: {opcode})'s next block '{next_id}' does not link back to it via 'parent'.") | |
elif block_data.get("shadow"): | |
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.") | |
elif get_block_type(opcode) == "hat": # Hat blocks should not generally have a 'next' in a linear script sequence | |
issues.append(f"Hat block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection as a sequential block.") | |
elif get_block_type(opcode) == "cap": | |
issues.append(f"Cap block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection as it signifies script end.") | |
# Input validation | |
if "inputs" in block_data: | |
for input_name, input_value in block_data["inputs"].items(): | |
if not isinstance(input_value, list) or len(input_value) < 2: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has malformed input '{input_name}': {input_value}.") | |
continue | |
type_code = input_value[0] | |
value_or_block_id = input_value[1] | |
# Type code 1: number/string/boolean literal or block ID (reporter/boolean) | |
# Type code 2: block ID (substack or reporter/boolean plugged in) | |
# Type code 3: number/string/boolean literal with shadow, or reporter with default value if nothing plugged in | |
if type_code not in [1, 2, 3]: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has invalid type code: {type_code}. Expected 1, 2, or 3.") | |
if isinstance(value_or_block_id, str): | |
# It's a block ID, check if it exists and its parent link is correct | |
connected_block = script_blocks.get(value_or_block_id) | |
if not connected_block: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' references non-existent block ID '{value_or_block_id}'.") | |
elif connected_block.get("parent") != block_id: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' connects to '{value_or_block_id}', but '{value_or_block_id}'s parent is not '{block_id}'.") | |
# Check for type code consistency with connected block type | |
elif type_code == 2 and get_block_type(connected_block.get("opcode")) not in ["stack", "hat", "c_block", "cap", "reporter", "boolean"]: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has type code 2 but connected block '{value_or_block_id}' is not a valid block type for substack/input.") | |
elif type_code == 1 and get_block_type(connected_block.get("opcode")) not in ["reporter", "boolean"]: | |
issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has type code 1 but connected block '{value_or_block_id}' is not a reporter or boolean (expected for direct value input).") | |
# Specific checks for C-blocks' SUBSTACK | |
if block_data.get("blockType") == "c_block" and input_name.startswith("SUBSTACK"): # Changed to startswith as SUBSTACK1, SUBSTACK2 might exist | |
if not (isinstance(value_or_block_id, str) and script_blocks.get(value_or_block_id) and type_code == 2): | |
issues.append(f"C-block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has an invalid or missing SUBSTACK input configuration.") | |
# Shadow block specific checks | |
if block_data.get("shadow"): | |
if block_data.get("topLevel"): | |
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is incorrectly marked as topLevel.") | |
if not block_data.get("parent"): | |
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is missing a parent.") | |
if block_data.get("next"): | |
issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.") | |
# Check fields for math_number and menu blocks | |
if opcode == "math_number" and not (isinstance(block_data.get("fields", {}).get("NUM"), list) and len(block_data["fields"]["NUM"]) >= 1 and isinstance(block_data["fields"]["NUM"][0], (str, int, float))): | |
issues.append(f"Math_number shadow block '{block_id}' (opcode: {opcode}) has malformed 'NUM' field.") | |
# This logic for menu shadow blocks assumes the field name is the opcode in uppercase without _MENU | |
# Example: for "looks_costumemenu", it expects a field "COSTUME" | |
if opcode.endswith("menu"): | |
expected_field = opcode.upper().replace("_MENU", "") | |
if not (isinstance(block_data.get("fields", {}).get(expected_field), list) and len(block_data["fields"][expected_field]) >= 1 and isinstance(block_data["fields"][expected_field][0], str)): | |
issues.append(f"Menu shadow block '{block_id}' (opcode: {opcode}) has malformed field '{expected_field}' for its specific menu option.") | |
return issues | |
# Node 5: Verification Node | |
def block_verification_node(state: dict) -> dict: | |
""" | |
The block verifier check for the if any improvement need if any through logical if else and then add it to improvement_plan. | |
aftet improvement plan the llm reviewer node also check for other error or issues if any and at last give the review as feedback. | |
Args: | |
state (dict): _description_ | |
Returns: | |
dict: _description_ | |
""" | |
logger.info(f"--- Running BlockVerificationNode (Iteration: {state.get('iteration_count', 0)}) ---") | |
MAX_IMPROVEMENT_ITERATIONS = 1 # Set a sensible limit to prevent infinite loops | |
current_iteration = state.get("iteration_count", 0) | |
project_json = state["project_json"] | |
targets = project_json["targets"] | |
# Initialize needs_improvement for the current run | |
state["needs_improvement"] = False | |
block_validation_feedback_overall = [] | |
improvement_plan = {"sprite_issues": {}} | |
for target in targets: | |
sprite_name = target["name"] | |
all_blocks_for_sprite = target.get("blocks", {}) | |
if not all_blocks_for_sprite: | |
logger.info(f"Sprite '{sprite_name}' has no blocks. Skipping verification.") | |
continue | |
sprite_issues = [] | |
hat_block_ids = [ | |
block_id for block_id, block_data in all_blocks_for_sprite.items() | |
if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) == "hat" | |
] | |
processed_script_blocks = set() | |
if not hat_block_ids: | |
sprite_issues.append("No top-level hat blocks found for this sprite. Scripts may not run.") | |
for block_id, block_data in all_blocks_for_sprite.items(): | |
if block_data.get("topLevel") and not get_block_type(block_data.get("opcode")) == "hat": | |
sprite_issues.append(f"Top-level block '{block_id}' (opcode: {block_data.get('opcode')}) is not a hat block, so it will not run automatically.") | |
if not block_data.get("topLevel") and not block_data.get("parent") and not block_data.get("shadow"): | |
sprite_issues.append(f"Orphaned block '{block_id}' (opcode: {block_data.get('opcode')}) is not top-level, has no parent, and is not a shadow block.") | |
for hat_id in hat_block_ids: | |
logger.info(f"Verifying script starting with hat block '{hat_id}' for sprite '{sprite_name}'.") | |
current_script_blocks = filter_script_blocks(all_blocks_for_sprite, hat_id) | |
processed_script_blocks.update(current_script_blocks.keys()) | |
script_issues = analyze_script_structure(current_script_blocks, hat_id, sprite_name) | |
if script_issues: | |
sprite_issues.append(f"Issues in script starting with '{hat_id}':") | |
sprite_issues.extend([f" - {issue}" for issue in script_issues]) | |
else: | |
logger.info(f"Script starting with '{hat_id}' for sprite '{sprite_name}' passed basic verification.") | |
orphaned_blocks_overall = { | |
block_id for block_id in all_blocks_for_sprite.keys() | |
if block_id not in processed_script_blocks | |
and not all_blocks_for_sprite[block_id].get("topLevel") | |
and not all_blocks_for_sprite[block_id].get("parent") | |
} | |
if orphaned_blocks_overall: | |
sprite_issues.append(f"Found {len(orphaned_blocks_overall)} truly orphaned blocks not connected to any valid script: {', '.join(list(orphaned_blocks_overall)[:5])}{'...' if len(orphaned_blocks_overall) > 5 else ''}.") | |
if sprite_issues: | |
improvement_plan["sprite_issues"][sprite_name] = sprite_issues | |
logger.warning(f"Verification found issues for sprite '{sprite_name}'.") | |
block_validation_feedback_overall.append(f"Issues for {sprite_name}:\n" + "\n".join([f"- {issue}" for issue in sprite_issues])) | |
state["needs_improvement"] = True | |
print(f"\n--- Verification Report (Issues Found for {sprite_name}) ---") | |
print(json.dumps({sprite_name: sprite_issues}, indent=2)) | |
else: | |
logger.info(f"Sprite '{sprite_name}' passed all verification checks.") | |
# Consolidate feedback for the LLM | |
state["block_validation_feedback"] = "\n\n".join(block_validation_feedback_overall) | |
print(F"[OVERALL IMPROVEMENT PLAN ON ITERATION {current_iteration}]: {improvement_plan}") | |
if state["needs_improvement"]: | |
llm_reviewer_prompt = ( | |
"You are an expert Scratch project reviewer. Your task is to analyze the provided " | |
"structural issues found in a Scratch project's sprites and suggest improvements or " | |
"further insights. Focus on clarity, accuracy, and actionable advice.\n\n" | |
"Here are the detected structural issues:\n" | |
f"{json.dumps(improvement_plan['sprite_issues'], indent=2)}\n\n" | |
"Here are the block validation feedback:\n" | |
f"{json.dumps(state["block_validation_feedback"], indent=2)}\n\n" | |
"Please review these issues and provide a consolidated report with potential causes " | |
"and recommendations for fixing them. If an issue is minor or expected in certain " | |
"scenarios (e.g., hidden blocks for backward compatibility), please note that." | |
"Structure your response as a JSON object with 'review_summary' as the key, " | |
"containing a dictionary where keys are sprite names and values are lists of suggested improvements." | |
"Example:\n" | |
"```json\n" | |
"{\n" | |
" \"review_summary\": {\n" | |
" \"Sprite1\": [\n" | |
" \"Issue: Hat block 'abc' is not topLevel. Recommendation: Ensure all scripts start with a top-level hat block that has no parent.\",\n" | |
" \"Issue: Block 'xyz' has unknown opcode 'motion_nonexistent'. Recommendation: Verify the opcode against the Scratch 3.0 block reference. This might be a typo or a deprecated block.\"\n" | |
" ],\n" | |
" \"Sprite2\": [\n" | |
" \"Issue: Found 3 orphaned blocks. Recommendation: Reconnect these blocks to existing scripts or remove them if no longer needed.\"\n" | |
" ]\n" | |
" }\n" | |
"}\n" | |
"```" | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": llm_reviewer_prompt}]}) | |
raw_review_response = response["messages"][-1].content | |
try: | |
state["review_block_feedback"] = extract_json_from_llm_response(raw_review_response) | |
except json.JSONDecodeError: | |
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") | |
# Use the JSON resolver agent to fix the response | |
correction_prompt = ( | |
"Correct the following JSON respose to ensure it follows proper schema rules:\n\n" | |
"Take care of following instruction carefully:\n" | |
"1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n" | |
"2. Do **NOT** add any additional text, comments, or explanations.\n" | |
"3. Just response correct the json where you find and put the same content as it is.\n" | |
"Here is the raw response:\n\n" | |
f"raw_response: {raw_review_response}\n\n" | |
) | |
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) | |
print(f"[JSON CORRECTOR RESPONSE AT BLOCKVERFIER ]: {correction_response}") | |
state["review_block_feedback"] = extract_json_from_llm_response(correction_response["messages"][-1].content) | |
logger.info("Agent review feedback added to the state.") | |
print("\n--- Agent Review Feedback ---") | |
print(json.dumps(state["review_block_feedback"], indent=2)) | |
except Exception as e: | |
logger.error(f"Error invoking agent for review in BlockVerificationNode: {e}") | |
state["review_block_feedback"] = {"review_summary": {"Overall": [f"Error during LLM review: {e}"]}} | |
else: | |
logger.info("BlockVerificationNode completed: No issues found in any sprite blocks.") | |
print("\n--- Verification Report (No Issues Found) ---") | |
state["block_validation_feedback"] = "No issues found in sprite blocks." | |
state["review_block_feedback"] = {"review_summary": {"Overall": ["No issues found in sprite blocks. All good!"]}} | |
# Manage iteration count based on overall needs_improvement flag | |
if state["needs_improvement"]: | |
state["iteration_count"] = current_iteration + 1 | |
if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS: | |
logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached for block verification. Forcing 'needs_improvement' to False.") | |
state["needs_improvement"] = False | |
state["block_validation_feedback"] += "\n(Note: Max iterations reached for block verification, stopping further improvements.)" | |
state["improvement_plan"] = improvement_plan | |
state["review_block_feedback"] = {"review_summary": {"Overall": ["No issues found in sprite blocks. All good!"]}} | |
logger.info("BlockVerificationNode found issues and added an improvement plan to the state.") | |
print("\n--- Overall Verification Report (Issues Found) ---") | |
print(json.dumps(improvement_plan, indent=2)) | |
else: | |
state["iteration_count"] = 0 # Reset if no more improvement needed for blocks | |
logger.info(f"Block verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['block_validation_feedback'][:100]}...") | |
print("===========================================================================") | |
print(f"[BLOCK VERIFICATION NODE: (improvement_plan)]:{state["improvement_plan"]}") | |
print(f"[BLOCK VERIFICATION NODE: (review_block_feedback)]:{state["review_block_feedback"]}") | |
return state | |
def improvement_block_builder_node(state: GameState): | |
logger.info("--- Running ImprovementBlockBuilderNode ---") | |
project_json = state["project_json"] | |
targets = project_json["targets"] | |
sprite_map = {target["name"]: target for target in targets if not target["isStage"]} | |
# Also get the Stage target | |
stage_target = next((target for target in targets if target["isStage"]), None) | |
if stage_target: | |
sprite_map[stage_target["name"]] = stage_target | |
# Pre-load all block-catalog JSONs once | |
all_catalogs = [ | |
hat_block_data, | |
boolean_block_data, | |
c_block_data, | |
cap_block_data, | |
reporter_block_data, | |
stack_block_data | |
] | |
improvement_plan = state.get("improvement_plan", {}) | |
block_verification_feedback = state.get("block_validation_feedback", "no feedback") | |
if not improvement_plan: | |
logger.warning("No improvement plan found in state. Skipping ImprovementBlockBuilderNode.") | |
return state | |
script_y_offset = {} | |
script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} | |
# This is the handler which ensure if somehow json response changed it handle it.[DONOT REMOVE BELOW LOGIC] | |
if improvement_plan.get("improvement_overall_flow", {})=={}: | |
plan_data = improvement_plan.items() | |
else: | |
plan_data= improvement_plan.get("action_overall_flow", {}).items() | |
for sprite_name, sprite_improvements_data in plan_data: | |
if sprite_name in sprite_map: | |
current_sprite_target = sprite_map[sprite_name] | |
if "blocks" not in current_sprite_target: | |
current_sprite_target["blocks"] = {} | |
if sprite_name not in script_y_offset: | |
script_y_offset[sprite_name] = 0 | |
for plan_entry in sprite_improvements_data.get("plans", []): | |
event_opcode = plan_entry["event"] # This is now expected to be an opcode | |
logic_sequence = plan_entry["logic"] # This is the semicolon-separated string | |
# Extract the new opcode lists from the plan_entry | |
motion_opcodes = plan_entry.get("motion", []) | |
control_opcodes = plan_entry.get("control", []) | |
operator_opcodes = plan_entry.get("operator", []) | |
sensing_opcodes = plan_entry.get("sensing", []) | |
looks_opcodes = plan_entry.get("looks", []) | |
sound_opcodes = plan_entry.get("sound", []) | |
events_opcodes = plan_entry.get("events", []) | |
data_opcodes = plan_entry.get("data", []) | |
needed_opcodes = ( | |
motion_opcodes + control_opcodes + operator_opcodes + | |
sensing_opcodes + looks_opcodes + sound_opcodes + | |
events_opcodes + data_opcodes | |
) | |
needed_opcodes = list(set(needed_opcodes)) | |
# 2) build filtered runtime catalog (if you still need it) | |
filtered_catalog = { | |
op: ALL_SCRATCH_BLOCKS_CATALOG[op] | |
for op in needed_opcodes | |
if op in ALL_SCRATCH_BLOCKS_CATALOG | |
} | |
# 3) merge human-written catalog + runtime entry for each opcode | |
combined_blocks = {} | |
for op in needed_opcodes: | |
catalog_def = find_block_in_all(op, all_catalogs) or {} | |
runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {}) | |
# merge: catalog fields first, then runtime overrides/adds | |
merged = {**catalog_def, **runtime_def} | |
combined_blocks[op] = merged | |
print("Combined blocks for this script:", json.dumps(combined_blocks, indent=2)) | |
llm_block_generation_prompt = ( | |
f"You are an AI assistant generating Scratch 3.0 block JSON for a single script based on an improvement plan.\n" | |
f"The current sprite is '{sprite_name}'.\n" | |
f"The specific script to generate blocks for is for the event with opcode '{event_opcode}'.\n" | |
f"The sequential logic to implement is:\n" | |
f" Logic: {logic_sequence}\n\n" | |
f"**Based on the planning, the following Scratch block opcodes are expected to be used to implement this logic. Focus on using these specific opcodes where applicable, and refer to the ALL_SCRATCH_BLOCKS_CATALOG for their full structure and required inputs/fields:**\n" | |
f"Here is the comprehensive catalog of required Scratch 3.0 blocks:\n" | |
f"```json\n{json.dumps(combined_blocks, indent=2)}\n```\n\n" | |
f"Here is feedback ans suggetion you should take care of:\n" | |
f" suggestion:{block_verification_feedback}\n\n" | |
f"Current Scratch project JSON for this sprite (for context, its existing blocks if any, though you should generate a new script entirely if this is a hat block):\n" | |
f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n" | |
f"**Instructions for generating the block JSON (EXTREMELY IMPORTANT - FOLLOW THESE EXAMPLES PRECISELY):**\n" | |
f"1. **Start with the event block (Hat Block):** This block's `topLevel` should be `true` and `parent` should be `null`. Its `x` and `y` coordinates should be set (e.g., `x: 0, y: 0` or reasonable offsets for multiple scripts).\n" | |
f"2. **Generate a sequence of connected blocks:** For each block, generate a **unique block ID** (e.g., 'myBlockID123').\n" | |
f"3. **Link blocks correctly:**\n" | |
f" * Use the `next` field to point to the ID of the block directly below it in the stack.\n" | |
f" * Use the `parent` field to point to the ID of the block directly above it in the stack.\n" | |
f" * For the last block in a stack, `next` should be `null`.\n" | |
f"4. **Handle `inputs` for parameters and substacks (CRITICAL DETAIL - PAY CLOSE ATTENTION TO EXAMPLES):**\n" | |
f" * The value for any key within the `inputs` dictionary MUST always be an **array** of two elements: `[type_code, value_or_block_id]`.\n" | |
f" * **STRICTLY FORBIDDEN MISTAKE:** DO NOT put an array like `[\"num\", \"value\"]` or `[\"_edge_\", null]` directly as the `value_or_block_id`. This is the source of past errors.\n" | |
f" * The `type_code` (first element of the array) indicates the nature of the input:\n" | |
f" * `1`: For a primitive value or a shadow block ID (e.g., a number, string, boolean, or reference to a shadow block). This is the most common type for direct values.\n" | |
f" * `2`: For a block ID where another block is directly plugged into this input (e.g., an operator block connected to an `if` condition, or the first block of a C-block's substack).\n" | |
f" * **Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):**\n" | |
f" If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID.\n" | |
f" ```json\n" | |
f" // Main block using the number\n" | |
f" \"mainBlockID\": {{\n" | |
f" \"opcode\": \"motion_movesteps\",\n" | |
f" \"inputs\": {{\n" | |
f" \"STEPS\": [1, \"shadowNumID\"]\n" | |
f" }},\n" | |
f" // ... other fields\n" | |
f" }},\n" | |
f"\n" | |
f" // The separate shadow block for the number '10'\n" | |
f" \"shadowNumID\": {{\n" | |
f" \"opcode\": \"math_number\",\n" | |
f" \"fields\": {{\n" | |
f" \"NUM\": [\"10\", null]\n" | |
f" }},\n" | |
f" \"shadow\": true,\n" | |
f" \"parent\": \"mainBlockID\",\n" | |
f" \"topLevel\": false\n" | |
f" }}\n" | |
f" ```\n" | |
f" * **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):**\n" | |
f" If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID.\n" | |
f" ```json\n" | |
f" // Main block using the dropdown selection\n" | |
f" \"touchingBlockID\": {{\n" | |
f" \"opcode\": \"sensing_touchingobject\",\n" | |
f" \"inputs\": {{\n" | |
f" \"TOUCHINGOBJECTMENU\": [1, \"shadowEdgeMenuID\"]\n" | |
f" }},\n" | |
f" // ... other fields\n" | |
f" }},\n" | |
f"\n" | |
f" // The separate shadow block for the 'edge' menu option\n" | |
f" \"shadowEdgeMenuID\": {{\n" | |
f" \"opcode\": \"sensing_touchingobjectmenu\",\n" | |
f" \"fields\": {{\n" | |
f" \"TOUCHINGOBJECTMENU\": [\"_edge_\", null]\n" | |
f" }},\n" | |
f" \"shadow\": true,\n" | |
f" \"parent\": \"touchingBlockID\",\n" | |
f" \"topLevel\": false\n" | |
f" }}\n" | |
f" ```\n" | |
f" * **Correct Example for C-block (e.g., `control_forever`):**\n" | |
f" The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, \"FIRST_BLOCK_IN_FOREVER_LOOP_ID\"]`.\n" | |
f" ```json\n" | |
f" \"foreverBlockID\": {{\n" | |
f" \"opcode\": \"control_forever\",\n" | |
f" \"inputs\": {{\n" | |
f" \"SUBSTACK\": [2, \"firstBlockInsideForeverID\"]\n" | |
f" }},\n" | |
f" \"next\": null,\n" | |
f" \"parent\": \"blockAboveForeverID\",\n" | |
f" \"shadow\": false,\n" | |
f" \"topLevel\": false\n" | |
f" }},\n" | |
f" \"firstBlockInsideForeverID\": {{\n" | |
f" // ... definition of the first block inside the forever loop\n" | |
f" \"parent\": \"foreverBlockID\",\n" | |
f" \"next\": \"secondBlockInsideForeverID\" // if there's another block\n" | |
f" }}\n" | |
f" ```\n" | |
f" * **Correct Example for C-block with condition (e.g., `control_if`):**\n" | |
f" The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body).\n" | |
f" ```json\n" | |
f" \"ifBlockID\": {{\n" | |
f" \"opcode\": \"control_if\",\n" | |
f" \"inputs\": {{\n" | |
f" \"CONDITION\": [1, \"conditionBlockID\"],\n" | |
f" \"SUBSTACK\": [2, \"firstBlockInsideIfID\"]\n" | |
f" }},\n" | |
f" \"next\": \"blockAfterIfID\",\n" | |
f" \"parent\": \"blockAboveIfID\",\n" | |
f" \"shadow\": false,\n" | |
f" \"topLevel\": false\n" | |
f" }},\n" | |
f" \"conditionBlockID\": {{\n" | |
f" \"opcode\": \"sensing_touchingobject\", // Example condition block\n" | |
f" // ... definition for condition block, parent should be \"ifBlockID\"\n" | |
f" }},\n" | |
f" \"firstBlockInsideIfID\": {{\n" | |
f" // ... definition of the first block inside the if body\n" | |
f" \"parent\": \"ifBlockID\",\n" | |
f" \"next\": null // or next block if more\n" | |
f" }}\n" | |
f" ```\n" | |
f"5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `\"shadow\": true` and `\"topLevel\": false`.\n" | |
f"6. **`fields` for direct dropdown values/text:** Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., the `KEY_OPTION` field within the `event_whenkeypressed` shadow block itself, or the `variable` field on a `data_setvariableto` block). Example: `\"fields\": {{\"KEY_OPTION\": [\"space\", null]}}`.\n" | |
f"7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `\"topLevel\": true`.\n" | |
f"8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary.\n" | |
f"9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`.\n" | |
f"10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]}) | |
raw_response = response["messages"][-1].content | |
logger.info(f"Raw response from LLM [ImprovementBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...") | |
try: | |
generated_blocks = extract_json_from_llm_response(raw_response) | |
except json.JSONDecodeError: | |
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") | |
# Use the JSON resolver agent to fix the response | |
correction_prompt = ( | |
"Correct the following JSON respose to ensure it follows proper schema rules:\n\n" | |
"Take care of following instruction carefully:\n" | |
"1. Do **NOT** put anything inside the json just correct and resolve the issues inside the json.\n" | |
"2. Do **NOT** add any additional text, comments, or explanations.\n" | |
"3. Just response correct the json where you find and put the same content as it is.\n" | |
"Here is the raw response:\n\n" | |
f"raw_response: {raw_response}\n\n" | |
) | |
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) | |
print(f"[JSON CORRECTOR RESPONSE AT IMPROVEMENTBLOCKBUILDER ]: {correction_response}") | |
generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content) | |
if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): | |
logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") | |
generated_blocks = generated_blocks["blocks"] | |
# Update block positions for top-level script | |
for block_id, block_data in generated_blocks.items(): | |
if block_data.get("topLevel"): | |
block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0) | |
block_data["y"] = script_y_offset[sprite_name] | |
script_y_offset[sprite_name] += 150 # Increment for next script | |
current_sprite_target["blocks"].update(generated_blocks) | |
logger.info(f"Improvement blocks added for sprite '{sprite_name}', script '{event_opcode}' by ImprovementBlockBuilderNode.") | |
except Exception as e: | |
logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}") | |
raise | |
state["project_json"] = project_json # Update the state with the modified project_json | |
logger.info("Updated project JSON with improvement nodes.") | |
print("Updated project JSON with improvement nodes:", json.dumps(project_json, indent=2)) # Print for direct visibility | |
return state # Return the state object | |
#temporarry time delay for handling TPM issue | |
import time | |
def delay_for_tpm_node(state: GameState): | |
logger.info("--- Running DelayForTPMNode ---") | |
time.sleep(80) # Adjust the delay as needed | |
logger.info("Delay completed.") | |
return state | |
# Build the LangGraph workflow | |
workflow = StateGraph(GameState) | |
# Add all nodes to the workflow | |
workflow.add_node("parse_query", parse_query_and_set_initial_positions) | |
workflow.add_node("game_description", game_description_node) | |
workflow.add_node("time_delay_1", delay_for_tpm_node) # this is a temporary node to handle TPM issues | |
workflow.add_node("time_delay_2", delay_for_tpm_node) # this is a temporary node to handle TPM issues | |
workflow.add_node("time_delay_3", delay_for_tpm_node) # this is a temporary node to handle TPM issues | |
workflow.add_node("time_delay_4", delay_for_tpm_node) # this is a temporary node to handle TPM issues | |
workflow.add_node("time_delay_5", delay_for_tpm_node) # this is a temporary node to handle TPM issues | |
workflow.add_node("initial_plan_build", overall_planner_node) # High-level planning node | |
workflow.add_node("plan_verifier", plan_verification_node) # Verifies the high-level plan | |
workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan | |
workflow.add_node("block_builder", overall_block_builder_node) # Builds blocks from a plan | |
workflow.add_node("block_verifier", block_verification_node) # Verifies the generated blocks | |
workflow.add_node("improved_block_builder", improvement_block_builder_node) # For specific block-level improvements | |
# Set the entry point | |
workflow.set_entry_point("game_description") | |
# Define the standard initial flow | |
workflow.add_edge("game_description", "parse_query") | |
workflow.add_edge("parse_query", "time_delay_1") | |
workflow.add_edge("time_delay_1", "initial_plan_build") | |
workflow.add_edge("initial_plan_build", "time_delay_5") | |
workflow.add_edge("time_delay_5", "plan_verifier") | |
# Define the conditional logic after plan_verifier (for high-level plan issues) | |
def decide_next_step_after_plan_verification(state: GameState): | |
if state.get("needs_improvement", False): | |
# If the plan needs refinement, go to the refined_planner | |
return "refined_planner" | |
else: | |
# If the plan is good, proceed to building blocks from this plan | |
return "block_builder" | |
workflow.add_conditional_edges( | |
"plan_verifier", | |
decide_next_step_after_plan_verification, | |
{ | |
"refined_planner": "refined_planner", # Path if plan needs refinement | |
"block_builder": "block_builder" # Path if plan is approved, proceeds to block building | |
} | |
) | |
# --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP --- | |
# After refining the plan, it should go back to plan_verifier for re-verification. | |
workflow.add_edge("refined_planner", "time_delay_2") | |
workflow.add_edge("time_delay_2", "plan_verifier") # This closes the loop for plan refinement and re-verification. | |
# Note: The original code had workflow.add_edge("time_delay", "block_builder") here, | |
# but after refined_planner -> time_delay -> plan_verifier, the decision is made by plan_verifier. | |
# So, this edge might be redundant or incorrect depending on the desired flow. | |
# Assuming the intent is for plan_verifier to always decide the next step. | |
# After blocks are built, they need to be verified | |
#workflow.add_edge("time_delay_3", "block_builder") | |
workflow.add_edge("block_builder", "time_delay_3") | |
workflow.add_edge("time_delay_3", "block_verifier") | |
# Define the conditional logic after block_verifier (for generated blocks issues) | |
def decide_after_block_verification(state: GameState): | |
if state.get("needs_improvement", False): | |
# If blocks need improvement, go to improved_block_builder. | |
# This assumes improved_block_builder handles specific block-level fixes. | |
return "improved_block_builder" | |
else: | |
# If blocks are good, end the workflow | |
return "END" | |
workflow.add_conditional_edges( | |
"block_verifier", | |
decide_after_block_verification, | |
{ | |
"improved_block_builder": "improved_block_builder", # Path if blocks need improvement | |
"END": END # Path if blocks are good | |
} | |
) | |
# Create the loop: If blocks improved, re-verify them | |
#workflow.add_edge("time_delay_4", "improved_block_builder") | |
workflow.add_edge("improved_block_builder", "time_delay_4") | |
workflow.add_edge("time_delay_4", "block_verifier") | |
# Compile the workflow graph | |
app_graph = workflow.compile() | |
# # Build the LangGraph workflow | |
# workflow = StateGraph(GameState) | |
# # Add all nodes to the workflow | |
# workflow.add_node("parse_query", parse_query_and_set_initial_positions) | |
# workflow.add_node("game_description", game_description_node) | |
# workflow.add_node("initial_plan_build", overall_planner_node) # High-level planning node | |
# workflow.add_node("plan_verifier", plan_verification_node) # Verifies the high-level plan | |
# workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan | |
# workflow.add_node("block_builder", overall_block_builder_node) # Builds blocks from a plan | |
# workflow.add_node("block_verifier", block_verification_node) # Verifies the generated blocks | |
# workflow.add_node("improved_block_builder", improvement_block_builder_node) # For specific block-level improvements | |
# # Set the entry point | |
# workflow.set_entry_point("game_description") | |
# # Define the standard initial flow | |
# workflow.add_edge("game_description", "parse_query") | |
# workflow.add_edge("parse_query", "initial_plan_build") | |
# workflow.add_edge("initial_plan_build", "plan_verifier") | |
# # Define the conditional logic after plan_verifier (for high-level plan issues) | |
# def decide_next_step_after_plan_verification(state: GameState): | |
# if state.get("needs_improvement", False): | |
# # If the plan needs refinement, go to the refined_planner | |
# return "refined_planner" | |
# else: | |
# # If the plan is good, proceed to building blocks from this plan | |
# return "block_builder" | |
# workflow.add_conditional_edges( | |
# "plan_verifier", | |
# decide_next_step_after_plan_verification, | |
# { | |
# "refined_planner": "refined_planner", # Path if plan needs refinement | |
# "block_builder": "block_builder" # Path if plan is approved, proceeds to block building | |
# } | |
# ) | |
# # --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP --- | |
# # After refining the plan, it should go back to plan_verifier for re-verification. | |
# workflow.add_edge("refined_planner", "plan_verifier") # This closes the loop for plan refinement and re-verification. | |
# # After blocks are built, they need to be verified | |
# workflow.add_edge("block_builder", "block_verifier") | |
# # Define the conditional logic after block_verifier (for generated blocks issues) | |
# def decide_after_block_verification(state: GameState): | |
# if state.get("needs_improvement", False): | |
# # If blocks need improvement, go to improved_block_builder. | |
# # This assumes improved_block_builder handles specific block-level fixes. | |
# return "improved_block_builder" | |
# else: | |
# # If blocks are good, end the workflow | |
# return "END" | |
# workflow.add_conditional_edges( | |
# "block_verifier", | |
# decide_after_block_verification, | |
# { | |
# "improved_block_builder": "improved_block_builder", # Path if blocks need improvement | |
# "END": END # Path if blocks are good | |
# } | |
# ) | |
# # Create the loop: If blocks improved, re-verify them | |
# workflow.add_edge("improved_block_builder", "block_verifier") | |
# # Compile the workflow graph | |
# app_graph = workflow.compile() | |
from IPython.display import Image, display | |
png_bytes = app_graph.get_graph().draw_mermaid_png() | |
with open("langgraph_workflow.png", "wb") as f: | |
f.write(png_bytes) | |
### Modified `generate_game` Endpoint | |
def generate_game(): | |
payload = request.json | |
desc = payload.get("description", "") | |
backdrops = payload.get("backdrops", []) | |
sprites = payload.get("sprites", []) | |
logger.info(f"Starting game generation for description: '{desc}'") | |
# 1) Initial skeleton generation | |
project_skeleton = { | |
"targets": [ | |
{ | |
"isStage": True, | |
"name":"Stage", | |
"objName": "Stage", # ADDED THIS FOR THE STAGE | |
"variables":{}, # This must be a dict: { "var_id": ["var_name", value] } | |
"lists":{}, "broadcasts":{}, | |
"blocks":{}, "comments":{}, | |
"currentCostume": len(backdrops)-1 if backdrops else 0, | |
"costumes": [make_costume_entry("backdrops",b["filename"],b["name"]) | |
for b in backdrops], | |
"sounds": [], "volume":100,"layerOrder":0, | |
"tempo":60,"videoTransparency":50,"videoState":"on", | |
"textToSpeechLanguage": None | |
} | |
], | |
"monitors": [], "extensions": [], "meta":{ | |
"semver":"3.0.0","vm":"11.1.0", | |
"agent": request.headers.get("User-Agent","") | |
} | |
} | |
for idx, s in enumerate(sprites, start=1): | |
costume = make_costume_entry("sprites", s["filename"], s["name"]) | |
project_skeleton["targets"].append({ | |
"isStage": False, | |
"name": s["name"], | |
"objName": s["name"], # objName is also important for sprites | |
"variables":{}, "lists":{}, "broadcasts":{}, | |
"blocks":{}, | |
"comments":{}, | |
"currentCostume":0, | |
"costumes":[costume], | |
"sounds":[], "volume":100, | |
"layerOrder": idx+1, | |
"visible":True, "x":0,"y":0,"size":100,"direction":90, | |
"draggable":False, "rotationStyle":"all around" | |
}) | |
logger.info("Initial project skeleton created.") | |
project_id = str(uuid.uuid4()) | |
project_folder = os.path.join("generated_projects", project_id) | |
try: | |
os.makedirs(project_folder, exist_ok=True) | |
# Save initial skeleton and copy assets | |
project_json_path = os.path.join(project_folder, "project.json") | |
with open(project_json_path, "w") as f: | |
json.dump(project_skeleton, f, indent=2) | |
logger.info(f"Initial project skeleton saved to {project_json_path}") | |
for b in backdrops: | |
src_path = os.path.join(app.static_folder, "assets", "backdrops", b["filename"]) | |
dst_path = os.path.join(project_folder, b["filename"]) | |
if os.path.exists(src_path): | |
shutil.copy(src_path, dst_path) | |
else: | |
logger.warning(f"Source backdrop asset not found: {src_path}") | |
for s in sprites: | |
src_path = os.path.join(app.static_folder, "assets", "sprites", s["filename"]) | |
dst_path = os.path.join(project_folder, s["filename"]) | |
if os.path.exists(src_path): | |
shutil.copy(src_path, dst_path) | |
else: | |
logger.warning(f"Source sprite asset not found: {src_path}") | |
logger.info("Assets copied to project folder.") | |
# Initialize the state for LangGraph as a dictionary matching the TypedDict structure | |
initial_state_dict: GameState = { # Type hint for clarity | |
"project_json": project_skeleton, | |
"description": desc, | |
"project_id": project_id, | |
"sprite_initial_positions": {}, | |
"action_plan": {}, | |
#"behavior_plan": {}, | |
"improvement_plan": {}, | |
"needs_improvement": False, | |
"plan_validation_feedback": {}, | |
"iteration_count": 0, | |
"review_block_feedback": {} | |
} | |
# Run the LangGraph workflow with the dictionary input. | |
final_state_dict: GameState = app_graph.invoke(initial_state_dict) | |
# Access elements from the final_state_dict | |
final_project_json = final_state_dict['project_json'] | |
# Save the *final* filled project JSON, overwriting the skeleton | |
with open(project_json_path, "w") as f: | |
json.dump(final_project_json, f, indent=2) | |
logger.info(f"Final project JSON saved to {project_json_path}") | |
return jsonify({"message": "Game generated successfully", "project_id": project_id}) | |
except Exception as e: | |
logger.error(f"Error during game generation workflow for project ID {project_id}: {e}", exc_info=True) | |
return jsonify(error=f"Error generating game: {e}"), 500 | |
if __name__=="__main__": | |
logger.info("Starting Flask application...") | |
app.run(debug=True,port=5000) |