"""Planner: turns a user request into a JSON tool-plan via Azure OpenAI *Responses*.""" from __future__ import annotations import json import logging from pathlib import Path from typing import Dict, List, Any import yaml from services.llm_client import LLMClient from config.settings import settings from services.cost_tracker import CostTracker _PROMPTS_FILE = Path(__file__).parent.parent / "config" / "prompts.yaml" # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class Planner: """Generate a plan with the Responses API; fall back to a static template if parsing fails.""" def __init__(self, cost_tracker=None) -> None: self.prompt_template = self._load_prompt("planner") self.llm = LLMClient(settings) self.cost_tracker = cost_tracker or CostTracker() logger.info("Planner initialized with prompt template") # -------------------------------------------------- def build_plan( self, pdf_meta: Dict[str, Any], fields: List[str], doc_preview: str | None = None, field_descs: Dict | None = None, strategy: str = "Original Strategy", unique_indices: List[str] | None = None, unique_indices_descriptions: Dict[str, str] | None = None, ) -> Dict[str, Any]: """Return a JSON dict representing the execution plan.""" logger.info(f"Building plan for strategy: {strategy}") logger.info(f"Fields: {fields}") logger.info(f"Unique indices: {unique_indices}") logger.info(f"Unique indices descriptions: {unique_indices_descriptions}") logger.info(f"Field descriptions: {field_descs}") # For Unique Indices Strategy, use static plan directly if strategy == "Unique Indices Strategy": logger.info("Using static plan for Unique Indices Strategy") return self._static_plan(fields, strategy, unique_indices, unique_indices_descriptions, field_descs) # For Original Strategy, try LLM first user_context = { "pdf_meta": pdf_meta, "doc_preview": doc_preview or "", "fields": fields, "field_descriptions": field_descs or {}, "strategy": strategy, "unique_indices": unique_indices or [], "unique_indices_descriptions": unique_indices_descriptions or {}, } logger.info(f"Building plan for fields: {fields}") logger.info(f"Using strategy: {strategy}") if unique_indices: logger.info(f"Unique indices: {unique_indices}") logger.info(f"Unique indices descriptions: {unique_indices_descriptions}") logger.debug(f"User context: {user_context}") prompt = self.prompt_template.format_json(**user_context) logger.debug(f"Generated prompt: {prompt}") try: logger.info("Calling LLM to generate plan") raw = self.llm.responses( prompt, temperature=0.0, ctx={"cost_tracker": self.cost_tracker}, description="Execution Plan Generation" ) logger.debug(f"Raw LLM response: {raw}") try: logger.info("Parsing LLM response as JSON") plan = json.loads(raw) logger.debug(f"Parsed plan: {plan}") # ensure minimal structure exists if "steps" in plan and "fields" in plan: logger.info("Plan successfully generated with required structure") # Add pdf_meta and strategy info to the plan plan["pdf_meta"] = pdf_meta plan["strategy"] = strategy if unique_indices: plan["unique_indices"] = unique_indices if unique_indices_descriptions: plan["unique_indices_descriptions"] = unique_indices_descriptions if field_descs: plan["field_descriptions"] = field_descs return plan else: missing_keys = [] if "steps" not in plan: missing_keys.append("steps") if "fields" not in plan: missing_keys.append("fields") logger.error(f"Planner: LLM output missing required keys: {missing_keys}. Output: {raw}") except json.JSONDecodeError as parse_exc: logger.error(f"Planner: Failed to parse LLM output as JSON. Output: {raw}") logger.error(f"JSON parsing error: {parse_exc}") except Exception as parse_exc: logger.error(f"Planner: Unexpected error parsing LLM output: {parse_exc}") logger.error(f"LLM output: {raw}") except Exception as llm_exc: logger.error(f"Planner: LLM call failed: {llm_exc}") logger.exception("Full traceback:") # ---------- fallback static plan ---------- logger.info("Falling back to static plan") return self._static_plan(fields, strategy, unique_indices, unique_indices_descriptions, field_descs) # -------------------------------------------------- @staticmethod def _load_prompt(name: str): try: data = yaml.safe_load(_PROMPTS_FILE.read_text()) logger.debug(f"Loaded prompt template for '{name}'") except Exception as e: logger.error(f"Failed to load prompt template: {e}") data = {} class _Fmt: def __init__(self, s: str): self.s = s def format_json(self, **kwargs): # Format the template with the provided fields fields = kwargs.get("fields", []) field_descriptions = kwargs.get("field_descriptions", {}) doc_preview = kwargs.get("doc_preview", "") pdf_meta = kwargs.get("pdf_meta", {}) strategy = kwargs.get("strategy", "Original Strategy") unique_indices = kwargs.get("unique_indices", []) unique_indices_descriptions = kwargs.get("unique_indices_descriptions", {}) # Create a formatted string with the actual values formatted = self.s if fields: # Ensure fields is a flat list of strings fields_json = json.dumps([str(f) for f in fields]) formatted = formatted.replace("", fields_json) if field_descriptions: formatted = formatted.replace("field_descriptions for extra context", f"field descriptions: {json.dumps(field_descriptions)}") if doc_preview: formatted = formatted.replace("a few kB of raw text from the uploaded document", f"document preview: {doc_preview[:1000]}...") if pdf_meta: formatted = formatted.replace("pdf_meta / field_descriptions for extra context", f"document metadata: {json.dumps(pdf_meta)}") if strategy: formatted = formatted.replace("strategy for extraction", f"extraction strategy: {strategy}") if unique_indices: formatted = formatted.replace("unique indices for extraction", f"unique indices: {json.dumps(unique_indices)}") if unique_indices_descriptions: formatted = formatted.replace("unique indices descriptions for extra context", f"unique indices descriptions: {json.dumps(unique_indices_descriptions)}") return formatted return _Fmt(data.get(name, "You are a planning agent. Produce a JSON tool plan.")) # -------------------------------------------------- @staticmethod def _static_plan(fields: List[str], strategy: str = "Original Strategy", unique_indices: List[str] | None = None, unique_indices_descriptions: Dict[str, str] | None = None, field_descs: Dict | None = None) -> Dict[str, Any]: """Return a hard-coded plan to guarantee offline functionality.""" logger.info("Generating static fallback plan") logger.info(f"Strategy: {strategy}") logger.info(f"Fields: {fields}") logger.info(f"Unique indices: {unique_indices}") logger.info(f"Unique indices descriptions: {unique_indices_descriptions}") logger.info(f"Field descriptions: {field_descs}") if strategy == "Unique Indices Strategy": steps = [ {"tool": "PDFAgent", "args": {}}, {"tool": "TableAgent", "args": {}}, {"tool": "UniqueIndicesCombinator", "args": {}}, {"tool": "UniqueIndicesLoopAgent", "args": {}}, ] logger.info("Generated plan for Unique Indices Strategy") logger.info(f"Steps: {steps}") else: steps = [ {"tool": "PDFAgent", "args": {}}, {"tool": "TableAgent", "args": {}}, { "tool": "ForEachField", "loop": [ {"tool": "FieldMapper", "args": {"field": "$field"}}, ], }, ] logger.info("Generated plan for Original Strategy") logger.info(f"Steps: {steps}") plan = { "steps": steps, "fields": fields, "pdf_meta": {}, "strategy": strategy } if unique_indices: plan["unique_indices"] = unique_indices if unique_indices_descriptions: plan["unique_indices_descriptions"] = unique_indices_descriptions if field_descs: plan["field_descriptions"] = field_descs logger.info(f"Final plan: {json.dumps(plan, indent=2)}") return plan