File size: 9,630 Bytes
0a40afa
 
 
 
 
 
 
 
 
 
 
 
 
966ffcd
0a40afa
 
 
 
 
 
 
 
 
 
 
 
966ffcd
0a40afa
 
966ffcd
0a40afa
 
 
 
 
 
 
 
 
924cb7d
 
f98e92f
0a40afa
 
f98e92f
 
 
 
0a40afa
f98e92f
 
 
 
 
 
0a40afa
 
 
 
 
924cb7d
 
f98e92f
0a40afa
 
 
924cb7d
 
 
f98e92f
0a40afa
 
 
 
 
 
 
966ffcd
 
 
 
 
 
0a40afa
 
 
 
 
 
 
 
 
 
924cb7d
0a40afa
924cb7d
 
 
f98e92f
 
0a40afa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f98e92f
0a40afa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
924cb7d
 
f98e92f
0a40afa
 
 
 
 
 
 
 
 
 
 
 
 
924cb7d
 
 
 
f98e92f
 
0a40afa
 
 
 
 
 
 
f98e92f
0a40afa
 
f98e92f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
924cb7d
 
 
 
 
 
 
 
f98e92f
 
 
 
924cb7d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""Planner: turns a user request into a JSON tool-plan via Azure OpenAI *Responses*."""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Dict, List, Any

import yaml

from services.llm_client import LLMClient
from config.settings import settings
from services.cost_tracker import CostTracker


# Prompt templates are read from "config/prompts.yaml", located two directory
# levels above this module file (i.e. next to this module's parent folder).
_PROMPTS_FILE = Path(__file__).parent.parent.joinpath("config", "prompts.yaml")

# Module-level logger used by everything in this file.
# NOTE(review): basicConfig() configures the *root* logger as a side effect of
# importing this module (it is a no-op when the root logger already has
# handlers). Presumably meant for standalone runs -- confirm the host
# application does not expect to own logging configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Planner:
    """Generate a plan with the Responses API; fall back to a static template on failure.

    ``build_plan`` is the entry point. For the "Unique Indices Strategy" it
    returns the hard-coded plan directly. For every other strategy it formats
    the planner prompt, asks the LLM for a JSON plan, and returns the static
    plan whenever prompt formatting, the LLM call, JSON parsing, or structure
    validation fails.
    """

    def __init__(self, cost_tracker=None) -> None:
        """Load the "planner" prompt and create the LLM client.

        Args:
            cost_tracker: Object passed to the LLM client via ``ctx``. A new
                ``CostTracker`` is created only when this is ``None``.
        """
        self.prompt_template = self._load_prompt("planner")
        self.llm = LLMClient(settings)
        # Compare against None explicitly so that a caller-supplied tracker is
        # never discarded just because it happens to evaluate as falsy.
        self.cost_tracker = cost_tracker if cost_tracker is not None else CostTracker()
        logger.info("Planner initialized with prompt template")

    # --------------------------------------------------
    def build_plan(
        self,
        pdf_meta: Dict[str, Any],
        fields: List[str],
        doc_preview: str | None = None,
        field_descs: Dict | None = None,
        strategy: str = "Original Strategy",
        unique_indices: List[str] | None = None,
        unique_indices_descriptions: Dict[str, str] | None = None,
    ) -> Dict[str, Any]:
        """Return a JSON-style dict representing the execution plan.

        Args:
            pdf_meta: Document metadata; attached to the returned plan under
                ``"pdf_meta"`` on both the LLM path and the static path.
            fields: Field names to plan for.
            doc_preview: Optional raw text excerpt inserted into the prompt.
            field_descs: Optional field descriptions inserted into the prompt.
            strategy: ``"Unique Indices Strategy"`` selects the static plan
                without calling the LLM; any other value tries the LLM first.
            unique_indices: Optional index names; copied into the plan when
                non-empty.
            unique_indices_descriptions: Optional descriptions for the indices;
                copied into the plan when non-empty.

        Returns:
            A dict with at least ``"steps"``, ``"fields"``, ``"pdf_meta"`` and
            ``"strategy"``. This method does not raise for LLM or parsing
            failures; it returns the static plan instead.
        """
        logger.info("Building plan for strategy: %s", strategy)
        logger.info("Fields: %s", fields)
        logger.info("Unique indices: %s", unique_indices)
        logger.info("Unique indices descriptions: %s", unique_indices_descriptions)

        # For Unique Indices Strategy, use the static plan directly.
        if strategy == "Unique Indices Strategy":
            logger.info("Using static plan for Unique Indices Strategy")
            return self._static_plan(
                fields,
                strategy,
                unique_indices,
                unique_indices_descriptions,
                pdf_meta=pdf_meta,
            )

        # For every other strategy, try the LLM first.
        user_context = {
            "pdf_meta": pdf_meta,
            "doc_preview": doc_preview or "",
            "fields": fields,
            "field_descriptions": field_descs or {},
            "strategy": strategy,
            "unique_indices": unique_indices or [],
            "unique_indices_descriptions": unique_indices_descriptions or {},
        }
        logger.debug("User context: %s", user_context)

        plan = self._request_plan(user_context)
        if plan is not None:
            logger.info("Plan successfully generated with required structure")
            # Add pdf_meta and strategy info to the plan.
            plan["pdf_meta"] = pdf_meta
            plan["strategy"] = strategy
            if unique_indices:
                plan["unique_indices"] = unique_indices
            if unique_indices_descriptions:
                plan["unique_indices_descriptions"] = unique_indices_descriptions
            return plan

        # ---------- fallback static plan ----------
        logger.info("Falling back to static plan")
        return self._static_plan(
            fields,
            strategy,
            unique_indices,
            unique_indices_descriptions,
            pdf_meta=pdf_meta,
        )

    # --------------------------------------------------
    def _request_plan(self, user_context: Dict[str, Any]) -> Dict[str, Any] | None:
        """Ask the LLM for a plan; return it only if it is a usable JSON object.

        Returns ``None`` (after logging the reason) when the prompt cannot be
        formatted, the LLM call raises, the response is not valid JSON, the
        JSON is not an object, or it lacks the ``"steps"``/``"fields"`` keys.
        """
        try:
            # Formatting lives inside the try on purpose: json.dumps() on
            # non-serialisable metadata must lead to the static fallback
            # rather than escaping from build_plan().
            prompt = self.prompt_template.format_json(**user_context)
            logger.debug("Generated prompt: %s", prompt)

            logger.info("Calling LLM to generate plan")
            raw = self.llm.responses(
                prompt,
                temperature=0.0,
                ctx={"cost_tracker": self.cost_tracker},
                description="Execution Plan Generation",
            )
        except Exception as llm_exc:  # best-effort boundary: any failure means fallback
            logger.error("Planner: prompt formatting or LLM call failed: %s", llm_exc)
            logger.exception("Full traceback:")
            return None
        logger.debug("Raw LLM response: %s", raw)

        try:
            logger.info("Parsing LLM response as JSON")
            plan = json.loads(raw)
        except json.JSONDecodeError as parse_exc:
            logger.error("Planner: Failed to parse LLM output as JSON. Output: %s", raw)
            logger.error("JSON parsing error: %s", parse_exc)
            return None
        except Exception as parse_exc:  # e.g. TypeError when raw is not str/bytes
            logger.error("Planner: Unexpected error parsing LLM output: %s", parse_exc)
            logger.error("LLM output: %s", raw)
            return None
        logger.debug("Parsed plan: %s", plan)

        # A JSON list or string can also "contain" the key names, so check the
        # container type before checking for the keys.
        if not isinstance(plan, dict):
            logger.error(
                "Planner: LLM output is not a JSON object (got %s). Output: %s",
                type(plan).__name__,
                raw,
            )
            return None

        # Ensure the minimal structure exists.
        missing_keys = [key for key in ("steps", "fields") if key not in plan]
        if missing_keys:
            logger.error(
                "Planner: LLM output missing required keys: %s. Output: %s",
                missing_keys,
                raw,
            )
            return None
        return plan

    # --------------------------------------------------
    @staticmethod
    def _load_prompt(name: str):
        """Load the prompt called *name* from the prompts YAML file.

        Returns an object exposing ``format_json(**kwargs)``. If the file
        cannot be read or parsed, is not a mapping, or holds a non-string
        value under *name*, a built-in default prompt is used instead.
        """
        default_prompt = "You are a planning agent. Produce a JSON tool plan."
        try:
            data = yaml.safe_load(_PROMPTS_FILE.read_text(encoding="utf-8"))
            logger.debug("Loaded prompt template for '%s'", name)
        except Exception as e:  # deliberate best-effort: fall back to the default prompt
            logger.error("Failed to load prompt template: %s", e)
            data = {}

        # safe_load() returns None for an empty document and may return a
        # list or scalar; only a mapping can be looked up by prompt name.
        if not isinstance(data, dict):
            data = {}
        template = data.get(name, default_prompt)
        if not isinstance(template, str):
            logger.error("Prompt '%s' is not a string; using default prompt", name)
            template = default_prompt

        class _Fmt:
            """Prompt template that substitutes known placeholder phrases."""

            def __init__(self, s: str):
                self.s = s

            def format_json(self, **kwargs):
                """Return the template with placeholder phrases replaced.

                Each placeholder is replaced only when the matching keyword
                argument is truthy; placeholders missing from the template
                are ignored.
                """
                fields = kwargs.get("fields", [])
                field_descriptions = kwargs.get("field_descriptions", {})
                doc_preview = kwargs.get("doc_preview", "")
                pdf_meta = kwargs.get("pdf_meta", {})
                strategy = kwargs.get("strategy", "Original Strategy")
                unique_indices = kwargs.get("unique_indices", [])
                unique_indices_descriptions = kwargs.get("unique_indices_descriptions", {})

                formatted = self.s
                if fields:
                    # Ensure fields is a flat list of strings.
                    fields_json = json.dumps([str(f) for f in fields])
                    formatted = formatted.replace("<same list you received>", fields_json)

                descs_text = (
                    f"field descriptions: {json.dumps(field_descriptions)}"
                    if field_descriptions
                    else ""
                )
                if pdf_meta:
                    # The combined placeholder contains the field-descriptions
                    # placeholder as a substring, so it must be handled first;
                    # otherwise the metadata is lost whenever descriptions are
                    # supplied.
                    meta_text = f"document metadata: {json.dumps(pdf_meta)}"
                    combined = f"{meta_text} / {descs_text}" if descs_text else meta_text
                    formatted = formatted.replace(
                        "pdf_meta / field_descriptions for extra context", combined
                    )
                if descs_text:
                    formatted = formatted.replace(
                        "field_descriptions for extra context", descs_text
                    )
                if doc_preview:
                    # Only the first 1000 characters of the preview are sent.
                    formatted = formatted.replace(
                        "a few kB of raw text from the uploaded document",
                        f"document preview: {doc_preview[:1000]}...",
                    )
                if strategy:
                    formatted = formatted.replace(
                        "strategy for extraction", f"extraction strategy: {strategy}"
                    )
                if unique_indices:
                    formatted = formatted.replace(
                        "unique indices for extraction",
                        f"unique indices: {json.dumps(unique_indices)}",
                    )
                if unique_indices_descriptions:
                    formatted = formatted.replace(
                        "unique indices descriptions for extra context",
                        f"unique indices descriptions: {json.dumps(unique_indices_descriptions)}",
                    )

                return formatted

        return _Fmt(template)

    # --------------------------------------------------
    @staticmethod
    def _static_plan(
        fields: List[str],
        strategy: str = "Original Strategy",
        unique_indices: List[str] | None = None,
        unique_indices_descriptions: Dict[str, str] | None = None,
        *,
        pdf_meta: Dict[str, Any] | None = None,
    ) -> Dict[str, Any]:
        """Return a hard-coded plan to guarantee offline functionality.

        Args:
            fields: Field names, stored under ``"fields"``.
            strategy: ``"Unique Indices Strategy"`` yields the
                PDFAgent -> TableAgent -> UniqueIndicesCombinator steps; any
                other value yields PDFAgent -> TableAgent -> ForEachField.
            unique_indices: Added to the plan when non-empty.
            unique_indices_descriptions: Added to the plan when non-empty.
            pdf_meta: Keyword-only; stored under ``"pdf_meta"``. Defaults to
                an empty dict when omitted.

        Returns:
            The plan dict.
        """
        logger.info("Generating static fallback plan")
        logger.info("Strategy: %s", strategy)
        logger.info("Fields: %s", fields)
        logger.info("Unique indices: %s", unique_indices)
        logger.info("Unique indices descriptions: %s", unique_indices_descriptions)

        if strategy == "Unique Indices Strategy":
            steps = [
                {"tool": "PDFAgent", "args": {}},
                {"tool": "TableAgent", "args": {}},
                {"tool": "UniqueIndicesCombinator", "args": {}},
            ]
            logger.info("Generated plan for Unique Indices Strategy")
        else:
            steps = [
                {"tool": "PDFAgent", "args": {}},
                {"tool": "TableAgent", "args": {}},
                {
                    "tool": "ForEachField",
                    "loop": [
                        {"tool": "FieldMapper", "args": {"field": "$field"}},
                    ],
                },
            ]
            logger.info("Generated plan for Original Strategy")
        logger.info("Steps: %s", steps)

        plan = {
            "steps": steps,
            "fields": fields,
            "pdf_meta": pdf_meta if pdf_meta is not None else {},
            "strategy": strategy,
        }
        if unique_indices:
            plan["unique_indices"] = unique_indices
        if unique_indices_descriptions:
            plan["unique_indices_descriptions"] = unique_indices_descriptions

        # default=str keeps this log line from raising when the metadata holds
        # values json cannot serialise; the returned plan is not altered.
        logger.info("Final plan: %s", json.dumps(plan, indent=2, default=str))
        return plan