Spaces:
Running
Running
"""Planner: turns a user request into a JSON tool-plan via Azure OpenAI *Responses*.""" | |
from __future__ import annotations | |
import json | |
import logging | |
from pathlib import Path | |
from typing import Dict, List, Any | |
import yaml | |
from services.llm_client import LLMClient | |
from config.settings import settings | |
from services.cost_tracker import CostTracker | |
# Prompt catalogue: "config/prompts.yaml" under the directory two levels above
# this module file.
_PROMPTS_FILE = Path(__file__).parent.parent.joinpath("config", "prompts.yaml")

# Module-level logger; basicConfig sets the root logger threshold to INFO.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class Planner: | |
"""Generate a plan with the Responses API; fall back to a static template if parsing fails.""" | |
def __init__(self, cost_tracker=None) -> None: | |
self.prompt_template = self._load_prompt("planner") | |
self.llm = LLMClient(settings) | |
self.cost_tracker = cost_tracker or CostTracker() | |
logger.info("Planner initialized with prompt template") | |
# -------------------------------------------------- | |
def build_plan( | |
self, | |
pdf_meta: Dict[str, Any], | |
fields: List[str], | |
doc_preview: str | None = None, | |
field_descs: Dict | None = None, | |
strategy: str = "Original Strategy", | |
unique_indices: List[str] | None = None, | |
unique_indices_descriptions: Dict[str, str] | None = None, | |
) -> Dict[str, Any]: | |
"""Return a JSON dict representing the execution plan.""" | |
logger.info(f"Building plan for strategy: {strategy}") | |
logger.info(f"Fields: {fields}") | |
logger.info(f"Unique indices: {unique_indices}") | |
logger.info(f"Unique indices descriptions: {unique_indices_descriptions}") | |
# For Unique Indices Strategy, use static plan directly | |
if strategy == "Unique Indices Strategy": | |
logger.info("Using static plan for Unique Indices Strategy") | |
return self._static_plan(fields, strategy, unique_indices, unique_indices_descriptions) | |
# For Original Strategy, try LLM first | |
user_context = { | |
"pdf_meta": pdf_meta, | |
"doc_preview": doc_preview or "", | |
"fields": fields, | |
"field_descriptions": field_descs or {}, | |
"strategy": strategy, | |
"unique_indices": unique_indices or [], | |
"unique_indices_descriptions": unique_indices_descriptions or {}, | |
} | |
logger.info(f"Building plan for fields: {fields}") | |
logger.info(f"Using strategy: {strategy}") | |
if unique_indices: | |
logger.info(f"Unique indices: {unique_indices}") | |
logger.info(f"Unique indices descriptions: {unique_indices_descriptions}") | |
logger.debug(f"User context: {user_context}") | |
prompt = self.prompt_template.format_json(**user_context) | |
logger.debug(f"Generated prompt: {prompt}") | |
try: | |
logger.info("Calling LLM to generate plan") | |
raw = self.llm.responses( | |
prompt, | |
temperature=0.0, | |
ctx={"cost_tracker": self.cost_tracker}, | |
description="Execution Plan Generation" | |
) | |
logger.debug(f"Raw LLM response: {raw}") | |
try: | |
logger.info("Parsing LLM response as JSON") | |
plan = json.loads(raw) | |
logger.debug(f"Parsed plan: {plan}") | |
# ensure minimal structure exists | |
if "steps" in plan and "fields" in plan: | |
logger.info("Plan successfully generated with required structure") | |
# Add pdf_meta and strategy info to the plan | |
plan["pdf_meta"] = pdf_meta | |
plan["strategy"] = strategy | |
if unique_indices: | |
plan["unique_indices"] = unique_indices | |
if unique_indices_descriptions: | |
plan["unique_indices_descriptions"] = unique_indices_descriptions | |
return plan | |
else: | |
missing_keys = [] | |
if "steps" not in plan: | |
missing_keys.append("steps") | |
if "fields" not in plan: | |
missing_keys.append("fields") | |
logger.error(f"Planner: LLM output missing required keys: {missing_keys}. Output: {raw}") | |
except json.JSONDecodeError as parse_exc: | |
logger.error(f"Planner: Failed to parse LLM output as JSON. Output: {raw}") | |
logger.error(f"JSON parsing error: {parse_exc}") | |
except Exception as parse_exc: | |
logger.error(f"Planner: Unexpected error parsing LLM output: {parse_exc}") | |
logger.error(f"LLM output: {raw}") | |
except Exception as llm_exc: | |
logger.error(f"Planner: LLM call failed: {llm_exc}") | |
logger.exception("Full traceback:") | |
# ---------- fallback static plan ---------- | |
logger.info("Falling back to static plan") | |
return self._static_plan(fields, strategy, unique_indices, unique_indices_descriptions) | |
# -------------------------------------------------- | |
def _load_prompt(name: str): | |
try: | |
data = yaml.safe_load(_PROMPTS_FILE.read_text()) | |
logger.debug(f"Loaded prompt template for '{name}'") | |
except Exception as e: | |
logger.error(f"Failed to load prompt template: {e}") | |
data = {} | |
class _Fmt: | |
def __init__(self, s: str): | |
self.s = s | |
def format_json(self, **kwargs): | |
# Format the template with the provided fields | |
fields = kwargs.get("fields", []) | |
field_descriptions = kwargs.get("field_descriptions", {}) | |
doc_preview = kwargs.get("doc_preview", "") | |
pdf_meta = kwargs.get("pdf_meta", {}) | |
strategy = kwargs.get("strategy", "Original Strategy") | |
unique_indices = kwargs.get("unique_indices", []) | |
unique_indices_descriptions = kwargs.get("unique_indices_descriptions", {}) | |
# Create a formatted string with the actual values | |
formatted = self.s | |
if fields: | |
# Ensure fields is a flat list of strings | |
fields_json = json.dumps([str(f) for f in fields]) | |
formatted = formatted.replace("<same list you received>", fields_json) | |
if field_descriptions: | |
formatted = formatted.replace("field_descriptions for extra context", f"field descriptions: {json.dumps(field_descriptions)}") | |
if doc_preview: | |
formatted = formatted.replace("a few kB of raw text from the uploaded document", f"document preview: {doc_preview[:1000]}...") | |
if pdf_meta: | |
formatted = formatted.replace("pdf_meta / field_descriptions for extra context", f"document metadata: {json.dumps(pdf_meta)}") | |
if strategy: | |
formatted = formatted.replace("strategy for extraction", f"extraction strategy: {strategy}") | |
if unique_indices: | |
formatted = formatted.replace("unique indices for extraction", f"unique indices: {json.dumps(unique_indices)}") | |
if unique_indices_descriptions: | |
formatted = formatted.replace("unique indices descriptions for extra context", f"unique indices descriptions: {json.dumps(unique_indices_descriptions)}") | |
return formatted | |
return _Fmt(data.get(name, "You are a planning agent. Produce a JSON tool plan.")) | |
# -------------------------------------------------- | |
def _static_plan(fields: List[str], strategy: str = "Original Strategy", unique_indices: List[str] | None = None, unique_indices_descriptions: Dict[str, str] | None = None) -> Dict[str, Any]: | |
"""Return a hard-coded plan to guarantee offline functionality.""" | |
logger.info("Generating static fallback plan") | |
logger.info(f"Strategy: {strategy}") | |
logger.info(f"Fields: {fields}") | |
logger.info(f"Unique indices: {unique_indices}") | |
logger.info(f"Unique indices descriptions: {unique_indices_descriptions}") | |
if strategy == "Unique Indices Strategy": | |
steps = [ | |
{"tool": "PDFAgent", "args": {}}, | |
{"tool": "TableAgent", "args": {}}, | |
{"tool": "UniqueIndicesCombinator", "args": {}}, | |
] | |
logger.info("Generated plan for Unique Indices Strategy") | |
logger.info(f"Steps: {steps}") | |
else: | |
steps = [ | |
{"tool": "PDFAgent", "args": {}}, | |
{"tool": "TableAgent", "args": {}}, | |
{ | |
"tool": "ForEachField", | |
"loop": [ | |
{"tool": "FieldMapper", "args": {"field": "$field"}}, | |
], | |
}, | |
] | |
logger.info("Generated plan for Original Strategy") | |
logger.info(f"Steps: {steps}") | |
plan = { | |
"steps": steps, | |
"fields": fields, | |
"pdf_meta": {}, | |
"strategy": strategy | |
} | |
if unique_indices: | |
plan["unique_indices"] = unique_indices | |
if unique_indices_descriptions: | |
plan["unique_indices_descriptions"] = unique_indices_descriptions | |
logger.info(f"Final plan: {json.dumps(plan, indent=2)}") | |
return plan |