# levalencia's picture
# feat: update unique indices combinator to return array of objects
# f98e92f
"""Planner: turns a user request into a JSON tool-plan via Azure OpenAI *Responses*."""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Dict, List, Any
import yaml
from services.llm_client import LLMClient
from config.settings import settings
from services.cost_tracker import CostTracker
# YAML prompt catalogue: "config/prompts.yaml", resolved two directory levels
# above this file.
_PROMPTS_FILE = Path(__file__).parent.parent.joinpath("config", "prompts.yaml")

# Logging setup: configure the root logger at INFO and create the logger
# (named after this module) that the rest of the file writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Planner:
"""Generate a plan with the Responses API; fall back to a static template if parsing fails."""
def __init__(self, cost_tracker=None) -> None:
self.prompt_template = self._load_prompt("planner")
self.llm = LLMClient(settings)
self.cost_tracker = cost_tracker or CostTracker()
logger.info("Planner initialized with prompt template")
# --------------------------------------------------
def build_plan(
self,
pdf_meta: Dict[str, Any],
fields: List[str],
doc_preview: str | None = None,
field_descs: Dict | None = None,
strategy: str = "Original Strategy",
unique_indices: List[str] | None = None,
unique_indices_descriptions: Dict[str, str] | None = None,
) -> Dict[str, Any]:
"""Return a JSON dict representing the execution plan."""
logger.info(f"Building plan for strategy: {strategy}")
logger.info(f"Fields: {fields}")
logger.info(f"Unique indices: {unique_indices}")
logger.info(f"Unique indices descriptions: {unique_indices_descriptions}")
# For Unique Indices Strategy, use static plan directly
if strategy == "Unique Indices Strategy":
logger.info("Using static plan for Unique Indices Strategy")
return self._static_plan(fields, strategy, unique_indices, unique_indices_descriptions)
# For Original Strategy, try LLM first
user_context = {
"pdf_meta": pdf_meta,
"doc_preview": doc_preview or "",
"fields": fields,
"field_descriptions": field_descs or {},
"strategy": strategy,
"unique_indices": unique_indices or [],
"unique_indices_descriptions": unique_indices_descriptions or {},
}
logger.info(f"Building plan for fields: {fields}")
logger.info(f"Using strategy: {strategy}")
if unique_indices:
logger.info(f"Unique indices: {unique_indices}")
logger.info(f"Unique indices descriptions: {unique_indices_descriptions}")
logger.debug(f"User context: {user_context}")
prompt = self.prompt_template.format_json(**user_context)
logger.debug(f"Generated prompt: {prompt}")
try:
logger.info("Calling LLM to generate plan")
raw = self.llm.responses(
prompt,
temperature=0.0,
ctx={"cost_tracker": self.cost_tracker},
description="Execution Plan Generation"
)
logger.debug(f"Raw LLM response: {raw}")
try:
logger.info("Parsing LLM response as JSON")
plan = json.loads(raw)
logger.debug(f"Parsed plan: {plan}")
# ensure minimal structure exists
if "steps" in plan and "fields" in plan:
logger.info("Plan successfully generated with required structure")
# Add pdf_meta and strategy info to the plan
plan["pdf_meta"] = pdf_meta
plan["strategy"] = strategy
if unique_indices:
plan["unique_indices"] = unique_indices
if unique_indices_descriptions:
plan["unique_indices_descriptions"] = unique_indices_descriptions
return plan
else:
missing_keys = []
if "steps" not in plan:
missing_keys.append("steps")
if "fields" not in plan:
missing_keys.append("fields")
logger.error(f"Planner: LLM output missing required keys: {missing_keys}. Output: {raw}")
except json.JSONDecodeError as parse_exc:
logger.error(f"Planner: Failed to parse LLM output as JSON. Output: {raw}")
logger.error(f"JSON parsing error: {parse_exc}")
except Exception as parse_exc:
logger.error(f"Planner: Unexpected error parsing LLM output: {parse_exc}")
logger.error(f"LLM output: {raw}")
except Exception as llm_exc:
logger.error(f"Planner: LLM call failed: {llm_exc}")
logger.exception("Full traceback:")
# ---------- fallback static plan ----------
logger.info("Falling back to static plan")
return self._static_plan(fields, strategy, unique_indices, unique_indices_descriptions)
# --------------------------------------------------
@staticmethod
def _load_prompt(name: str):
try:
data = yaml.safe_load(_PROMPTS_FILE.read_text())
logger.debug(f"Loaded prompt template for '{name}'")
except Exception as e:
logger.error(f"Failed to load prompt template: {e}")
data = {}
class _Fmt:
def __init__(self, s: str):
self.s = s
def format_json(self, **kwargs):
# Format the template with the provided fields
fields = kwargs.get("fields", [])
field_descriptions = kwargs.get("field_descriptions", {})
doc_preview = kwargs.get("doc_preview", "")
pdf_meta = kwargs.get("pdf_meta", {})
strategy = kwargs.get("strategy", "Original Strategy")
unique_indices = kwargs.get("unique_indices", [])
unique_indices_descriptions = kwargs.get("unique_indices_descriptions", {})
# Create a formatted string with the actual values
formatted = self.s
if fields:
# Ensure fields is a flat list of strings
fields_json = json.dumps([str(f) for f in fields])
formatted = formatted.replace("<same list you received>", fields_json)
if field_descriptions:
formatted = formatted.replace("field_descriptions for extra context", f"field descriptions: {json.dumps(field_descriptions)}")
if doc_preview:
formatted = formatted.replace("a few kB of raw text from the uploaded document", f"document preview: {doc_preview[:1000]}...")
if pdf_meta:
formatted = formatted.replace("pdf_meta / field_descriptions for extra context", f"document metadata: {json.dumps(pdf_meta)}")
if strategy:
formatted = formatted.replace("strategy for extraction", f"extraction strategy: {strategy}")
if unique_indices:
formatted = formatted.replace("unique indices for extraction", f"unique indices: {json.dumps(unique_indices)}")
if unique_indices_descriptions:
formatted = formatted.replace("unique indices descriptions for extra context", f"unique indices descriptions: {json.dumps(unique_indices_descriptions)}")
return formatted
return _Fmt(data.get(name, "You are a planning agent. Produce a JSON tool plan."))
# --------------------------------------------------
@staticmethod
def _static_plan(fields: List[str], strategy: str = "Original Strategy", unique_indices: List[str] | None = None, unique_indices_descriptions: Dict[str, str] | None = None) -> Dict[str, Any]:
"""Return a hard-coded plan to guarantee offline functionality."""
logger.info("Generating static fallback plan")
logger.info(f"Strategy: {strategy}")
logger.info(f"Fields: {fields}")
logger.info(f"Unique indices: {unique_indices}")
logger.info(f"Unique indices descriptions: {unique_indices_descriptions}")
if strategy == "Unique Indices Strategy":
steps = [
{"tool": "PDFAgent", "args": {}},
{"tool": "TableAgent", "args": {}},
{"tool": "UniqueIndicesCombinator", "args": {}},
]
logger.info("Generated plan for Unique Indices Strategy")
logger.info(f"Steps: {steps}")
else:
steps = [
{"tool": "PDFAgent", "args": {}},
{"tool": "TableAgent", "args": {}},
{
"tool": "ForEachField",
"loop": [
{"tool": "FieldMapper", "args": {"field": "$field"}},
],
},
]
logger.info("Generated plan for Original Strategy")
logger.info(f"Steps: {steps}")
plan = {
"steps": steps,
"fields": fields,
"pdf_meta": {},
"strategy": strategy
}
if unique_indices:
plan["unique_indices"] = unique_indices
if unique_indices_descriptions:
plan["unique_indices_descriptions"] = unique_indices_descriptions
logger.info(f"Final plan: {json.dumps(plan, indent=2)}")
return plan