"""Map a single field to a candidate value using page-by-page analysis and LLM-based extraction.""" | |
from typing import Dict, Any, Optional, List | |
import logging | |
import re | |
import json | |
from .base_agent import BaseAgent | |
from services.llm_client import LLMClient | |
from services.embedding_client import EmbeddingClient | |
from config.settings import settings | |
# Configure logging to disable verbose Azure HTTP logs | |
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.WARNING) | |
logging.getLogger('azure.core.pipeline').setLevel(logging.WARNING) | |
logging.getLogger('azure').setLevel(logging.WARNING) | |
class FieldMapperAgent(BaseAgent): | |
def __init__(self): | |
self.logger = logging.getLogger(__name__) | |
self.llm = LLMClient(settings) | |
self.embedding_client = EmbeddingClient() | |
    def _infer_document_context(self, text: str) -> str:
        """Use LLM to infer document context and user profile."""
        # Only the first 2000 chars are passed, enough for context inference.
        prompt = f"""Given this document text, describe the document type and typical user profile in 1-2 sentences.
Focus on the domain, purpose, and who would use this document.

Document text:
{text[:2000]}

Response format:
Document type: [type]
User profile: [profile]
"""
        try:
            self.logger.info("Inferring document context...")
            self.logger.debug(f"Using text preview: {text[:500]}...")

            # Get cost tracker from context
            cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None
            if cost_tracker:
                self.logger.info("Cost tracker found in context")
            else:
                self.logger.warning("No cost tracker found in context")

            context = self.llm.responses(
                prompt, temperature=0.0,
                ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
                description="Document Context Inference"
            )

            # Log cost tracking results if available
            if cost_tracker:
                self.logger.info(f"Context inference costs - Input tokens: {cost_tracker.llm_input_tokens}, Output tokens: {cost_tracker.llm_output_tokens}")
                self.logger.info(f"Context inference cost: ${cost_tracker.calculate_current_file_costs()['openai']['total_cost']:.4f}")

            self.logger.info(f"Inferred context: {context}")
            return context
        except Exception as e:
            self.logger.error(f"Error inferring context: {str(e)}")
            return "Generic document user"
    def _find_similar_chunks_search(self, query: str, index: Dict[str, Any], top_k: int = 3) -> List[Dict[str, Any]]:
        """Find chunks semantically similar to the query using cosine similarity."""
        try:
            self.logger.info(f"Finding similar chunks for query: {query}")
            self.logger.debug(f"Index contains {len(index['chunks'])} chunks and {len(index['embeddings'])} embeddings")

            # Get query embedding
            self.logger.debug("Generating embedding for query...")
            query_embedding = self.embedding_client.embed([query])[0]
            self.logger.debug(f"Query embedding generated, length: {len(query_embedding)}")

            # Calculate similarities
            similarities = []
            for i, (chunk, embedding) in enumerate(zip(index["chunks"], index["embeddings"])):
                similarity = self._cosine_similarity(query_embedding, embedding)
                similarities.append((similarity, chunk))
                self.logger.debug(f"Chunk {i} similarity: {similarity:.3f}")
                self.logger.debug(f"Chunk {i} preview: {chunk['text'][:100]}...")

            # Sort by similarity (descending) and return top k. Sort on the score only;
            # plain tuple comparison would fall back to comparing the chunk dicts on
            # score ties and raise a TypeError.
            similarities.sort(key=lambda item: item[0], reverse=True)
            results = [chunk for _, chunk in similarities[:top_k]]

            # Log top results
            self.logger.info(f"Found {len(results)} similar chunks")
            for i, (sim, chunk) in enumerate(similarities[:top_k]):
                self.logger.info(f"Top {i+1} match (similarity: {sim:.3f}): {chunk['text'][:200]}...")

            return results
        except Exception as e:
            self.logger.error(f"Error finding similar chunks: {str(e)}", exc_info=True)
            return []
    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Calculate cosine similarity between two vectors."""
        import numpy as np
        try:
            # Check for zero vectors
            if not a or not b or all(x == 0 for x in a) or all(x == 0 for x in b):
                self.logger.warning("Zero vector detected in cosine similarity calculation")
                return 0.0

            # Convert to numpy arrays
            a_np = np.array(a)
            b_np = np.array(b)

            # Calculate norms
            norm_a = np.linalg.norm(a_np)
            norm_b = np.linalg.norm(b_np)

            # Check for zero norms
            if norm_a == 0 or norm_b == 0:
                self.logger.warning("Zero norm detected in cosine similarity calculation")
                return 0.0

            # Calculate similarity
            similarity = np.dot(a_np, b_np) / (norm_a * norm_b)

            # Check for NaN
            if np.isnan(similarity):
                self.logger.warning("NaN detected in cosine similarity calculation")
                return 0.0

            return float(similarity)
        except Exception as e:
            self.logger.error(f"Error calculating cosine similarity: {str(e)}")
            return 0.0
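    # Illustrative expectations for the helper above (informal, not a test):
    #   self._cosine_similarity([1.0, 0.0], [1.0, 0.0]) -> 1.0   (identical direction)
    #   self._cosine_similarity([1.0, 0.0], [0.0, 1.0]) -> 0.0   (orthogonal vectors)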
    def _extract_field_value_search(self, field: str, chunks: List[Dict[str, Any]], context: str) -> Optional[str]:
        """Use LLM to extract field value from relevant chunks."""
        # Combine chunks into context
        chunk_texts = [chunk["text"] for chunk in chunks]
        combined_context = "\n".join(chunk_texts)

        self.logger.info(f"Extracting value for field '{field}' from {len(chunks)} chunks")
        self.logger.debug(f"Combined context preview: {combined_context[:500]}...")

        # Get filename from context if available
        filename = self.ctx.get("pdf_meta", {}).get("filename", "")
        filename_context = f"\nDocument filename: {filename}" if filename else ""

        # Get field descriptions from context if available
        field_descriptions = self.ctx.get("field_descriptions", {})

        # Format field descriptions for the prompt
        field_descriptions_text = ""
        if field_descriptions and field in field_descriptions:
            desc_info = field_descriptions[field]
            if isinstance(desc_info, dict):
                description = desc_info.get('description', '')
                format_info = desc_info.get('format', '')
                examples = desc_info.get('examples', '')
                possible_values = desc_info.get('possible_values', '')

                field_descriptions_text = f"\nField information for '{field}':"
                if description:
                    field_descriptions_text += f"\nDescription: {description}"
                if format_info:
                    field_descriptions_text += f"\nFormat: {format_info}"
                if examples:
                    field_descriptions_text += f"\nExamples: {examples}"
                if possible_values:
                    field_descriptions_text += f"\nPossible Values: {possible_values}"
                field_descriptions_text += "\n"

        prompt = f"""You are an expert in {context}

Your task is to extract the value for the field: {field}{filename_context}{field_descriptions_text}

Consider the following context from the document:
{combined_context}

Instructions:
1. Look for the field value in the context
2. If you find multiple potential values, choose the most relevant one
3. If you're not sure, return None
4. Return ONLY the value, no explanations

Field value:"""

        try:
            self.logger.info(f"Calling LLM to extract value for field '{field}'")
            self.logger.debug(f"Using prompt: {prompt}")

            # Get cost tracker from context
            cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None
            if cost_tracker:
                self.logger.info("Cost tracker found in context")
            else:
                self.logger.warning("No cost tracker found in context")

            value = self.llm.responses(
                prompt, temperature=0.0,
                ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
                description=f"Field Extraction - {field} (Search)"
            )

            # Log cost tracking results if available
            if cost_tracker:
                self.logger.info(f"Field extraction costs - Input tokens: {cost_tracker.llm_input_tokens}, Output tokens: {cost_tracker.llm_output_tokens}")
                self.logger.info(f"Field extraction cost: ${cost_tracker.calculate_current_file_costs()['openai']['total_cost']:.4f}")

            self.logger.debug(f"Raw LLM response: {value}")

            if value and value.lower() not in ["none", "null", "n/a"]:
                self.logger.info(f"Successfully extracted value: {value}")
                return value.strip()
            else:
                self.logger.warning(f"LLM returned no valid value for field '{field}'")
                return None
        except Exception as e:
            self.logger.error(f"Error extracting field value: {str(e)}", exc_info=True)
            return None
    def _extract_field_value_from_page(self, field: str, page_text: str, context: str) -> Optional[str]:
        """Use LLM to extract field value from a single page."""
        self.logger.info(f"Extracting value for field '{field}' from page")
        self.logger.debug(f"Page text preview: {page_text[:500]}...")

        # Get filename from context if available
        filename = self.ctx.get("pdf_meta", {}).get("filename", "")
        filename_context = f"\nDocument filename: {filename}" if filename else ""

        # Get field descriptions from context if available
        field_descriptions = self.ctx.get("field_descriptions", {})

        # Format field descriptions for the prompt
        field_descriptions_text = ""
        if field_descriptions and field in field_descriptions:
            desc_info = field_descriptions[field]
            if isinstance(desc_info, dict):
                description = desc_info.get('description', '')
                format_info = desc_info.get('format', '')
                examples = desc_info.get('examples', '')
                possible_values = desc_info.get('possible_values', '')

                field_descriptions_text = f"\nField information for '{field}':"
                if description:
                    field_descriptions_text += f"\nDescription: {description}"
                if format_info:
                    field_descriptions_text += f"\nFormat: {format_info}"
                if examples:
                    field_descriptions_text += f"\nExamples: {examples}"
                if possible_values:
                    field_descriptions_text += f"\nPossible Values: {possible_values}"
                field_descriptions_text += "\n"

        prompt = f"""You are an expert in {context}

Your task is to extract the value for the field: {field}{filename_context}{field_descriptions_text}

Consider the following page from the document:
{page_text}

Instructions:
1. Look for the field values in this page
2. Return the data in a tabular format where each field is a column
3. Each field should have an array of values
4. The arrays must be aligned (same length) to represent rows
5. Return ONLY the JSON value, no explanations
6. Format the response as a valid JSON object with field names as keys
7. Keep the structure flat - do not nest values under 'details' or other keys

Example response format:
{{
    "field1": ["value1", "value2", "value3"],
    "field2": ["value4", "value5", "value6"],
    "field3": ["value7", "value8", "value9"]
}}

Field value:"""

        try:
            self.logger.info(f"Calling LLM to extract value for field '{field}' from page")

            # Get cost tracker from context
            cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None
            if cost_tracker:
                self.logger.info("Cost tracker found in context")
            else:
                self.logger.warning("No cost tracker found in context")

            value = self.llm.responses(
                prompt, temperature=0.0,
                ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
                description=f"Field Extraction - {field} (Page)"
            )

            # Log cost tracking results if available
            if cost_tracker:
                self.logger.info(f"Page extraction costs - Input tokens: {cost_tracker.llm_input_tokens}, Output tokens: {cost_tracker.llm_output_tokens}")
                self.logger.info(f"Page extraction cost: ${cost_tracker.calculate_current_file_costs()['openai']['total_cost']:.4f}")

            self.logger.debug(f"Raw LLM response: {value}")

            if value and value.lower() not in ["none", "null", "n/a"]:
                # Try to parse as JSON to ensure it's valid
                try:
                    json_value = json.loads(value)
                    self.logger.info(f"Successfully extracted value: {json.dumps(json_value, indent=2)}")
                    return json.dumps(json_value, indent=2)
                except json.JSONDecodeError:
                    # If not valid JSON, wrap it in a JSON object
                    json_value = {field: value.strip()}
                    self.logger.info(f"Wrapped non-JSON value in JSON object: {json.dumps(json_value, indent=2)}")
                    return json.dumps(json_value, indent=2)
            else:
                self.logger.warning(f"LLM returned no valid value for field '{field}'")
                return None
        except Exception as e:
            self.logger.error(f"Error extracting field value from page: {str(e)}", exc_info=True)
            return None
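    # Note: some models wrap JSON answers in markdown fences (```json ... ```), which
    # json.loads() rejects, so the fallback above would wrap the raw string instead.
    # If that turns out to matter in practice, a pre-strip along these lines could be
    # applied before parsing (sketch only, not wired in; uses the `re` module imported
    # at the top of this file):
    #
    #   cleaned = re.sub(r"^```(?:json)?\s*|\s*```$", "", value.strip())
    #   json_value = json.loads(cleaned)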
    def _extract_with_unique_indices(self, text: str, context: str, unique_indices: List[str], fields_to_extract: List[str]) -> Optional[str]:
        """Extract values using unique indices strategy."""
        self.logger.info(f"Using unique indices strategy with indices: {unique_indices}")
        self.logger.info(f"Fields to extract: {fields_to_extract}")

        # Get filename from context if available
        filename = self.ctx.get("pdf_meta", {}).get("filename", "")
        filename_context = f"\nDocument filename: {filename}" if filename else ""

        # Get field descriptions from context if available
        field_descriptions = self.ctx.get("field_descriptions", {})
        unique_indices_descriptions = self.ctx.get("unique_indices_descriptions", {})

        # Format field descriptions for the prompt
        field_descriptions_text = ""
        if field_descriptions:
            field_descriptions_text = "\nField descriptions:\n"
            for field, desc_info in field_descriptions.items():
                if isinstance(desc_info, dict):
                    description = desc_info.get('description', '')
                    format_info = desc_info.get('format', '')
                    examples = desc_info.get('examples', '')
                    possible_values = desc_info.get('possible_values', '')

                    desc_line = f" {field}:"
                    if description:
                        desc_line += f" {description}"
                    if format_info:
                        desc_line += f" (Format: {format_info})"
                    if examples:
                        desc_line += f" (Examples: {examples})"
                    if possible_values:
                        desc_line += f" (Possible Values: {possible_values})"
                    field_descriptions_text += desc_line + "\n"
                else:
                    field_descriptions_text += f" {field}: {desc_info}\n"

        # Format unique indices descriptions for the prompt
        unique_indices_text = ""
        if unique_indices_descriptions:
            unique_indices_text = "\nUnique indices descriptions:\n"
            for index, desc_info in unique_indices_descriptions.items():
                if isinstance(desc_info, dict):
                    description = desc_info.get('description', '')
                    format_info = desc_info.get('format', '')
                    examples = desc_info.get('examples', '')
                    possible_values = desc_info.get('possible_values', '')

                    desc_line = f" {index}:"
                    if description:
                        desc_line += f" {description}"
                    if format_info:
                        desc_line += f" (Format: {format_info})"
                    if examples:
                        desc_line += f" (Examples: {examples})"
                    if possible_values:
                        desc_line += f" (Possible Values: {possible_values})"
                    unique_indices_text += desc_line + "\n"
                else:
                    unique_indices_text += f" {index}: {desc_info}\n"

        prompt = f"""You are an expert in {context}

Your task is to extract information from the document based on unique combinations of indices and their corresponding fields.

Unique Indices to look for: {', '.join(unique_indices)}
Fields to extract for each combination: {', '.join(fields_to_extract)}{filename_context}{field_descriptions_text}{unique_indices_text}

Consider the following document:
{text}

Instructions:
1. First, identify all unique combinations of the specified indices ({', '.join(unique_indices)}) in the document
2. For each unique combination found, extract the values for all specified fields ({', '.join(fields_to_extract)})
3. Return the data in a tabular format where:
   - Each row represents a unique combination
   - Each column represents a field value
4. Return ONLY the JSON value, no explanations
5. Format the response as a valid JSON object with field names as keys
6. Keep the structure flat - do not nest values under 'details' or other keys

Example response format:
{{
    "index1": ["value1", "value2", "value3"],
    "index2": ["value4", "value5", "value6"],
    "field1": ["value7", "value8", "value9"],
    "field2": ["value10", "value11", "value12"]
}}

Note: Each array in the response must have the same length, representing aligned rows of data.
For example, if there are 3 unique combinations found, each array should have 3 values.

Field values:"""

        try:
            self.logger.info("Calling LLM for unique indices extraction")

            # Get cost tracker from context
            cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None

            value = self.llm.responses(
                prompt, temperature=0.0,
                ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
                description="Unique Indices Field Extraction"
            )

            # Log cost tracking results if available
            if cost_tracker:
                self.logger.info(f"Unique indices extraction costs - Input tokens: {cost_tracker.llm_input_tokens}, Output tokens: {cost_tracker.llm_output_tokens}")
                self.logger.info(f"Unique indices extraction cost: ${cost_tracker.calculate_current_file_costs()['openai']['total_cost']:.4f}")

            self.logger.debug(f"Raw LLM response: {value}")

            if value and value.lower() not in ["none", "null", "n/a"]:
                # Try to parse as JSON to ensure it's valid
                try:
                    json_value = json.loads(value)
                    self.logger.info(f"Successfully extracted values: {json.dumps(json_value, indent=2)}")
                    return json.dumps(json_value, indent=2)
                except json.JSONDecodeError:
                    # If not valid JSON, wrap it in a JSON object
                    json_value = {", ".join(unique_indices + fields_to_extract): value.strip()}
                    self.logger.info(f"Wrapped non-JSON value in JSON object: {json.dumps(json_value, indent=2)}")
                    return json.dumps(json_value, indent=2)
            else:
                self.logger.warning("LLM returned no valid value")
                return None
        except Exception as e:
            self.logger.error(f"Error in unique indices extraction: {str(e)}", exc_info=True)
            return None
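    # The aligned-arrays JSON produced above is meant to be pivoted into row records by
    # downstream consumers. A minimal sketch of such a pivot (hypothetical helper, not
    # part of this class):
    #
    #   def rows_from_aligned_json(data: Dict[str, List[str]]) -> List[Dict[str, str]]:
    #       keys = list(data.keys())
    #       return [dict(zip(keys, row)) for row in zip(*(data[k] for k in keys))]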
    def execute(self, ctx: Dict[str, Any]):  # noqa: D401
        field = ctx.get("current_field")
        strategy = ctx.get("strategy", "original")  # Default to original strategy
        self.logger.info(f"Starting field mapping for: {field} using strategy: {strategy}")

        # Store context for use in extraction methods
        self.ctx = ctx

        # Get text and index
        text = ""
        index = None

        if "index" in ctx and isinstance(ctx["index"], dict):
            index = ctx["index"]
            text = index.get("text", "")
            self.logger.info(f"Using text from index (length: {len(text)})")
            self.logger.debug(f"Index contains {len(index.get('chunks', []))} chunks")
            self.logger.debug(f"Index contains {len(index.get('embeddings', []))} embeddings")
        elif "text" in ctx:
            text = ctx["text"]
            self.logger.info(f"Using text from direct context (length: {len(text)})")

        if not text:
            self.logger.warning("No text content found in context or index")
            return None

        # Infer document context if not already present
        if "document_context" not in ctx:
            ctx["document_context"] = self._infer_document_context(text)
        self.logger.info(f"Using document context: {ctx['document_context']}")

        # Process based on selected strategy
        if strategy == "unique_indices":
            unique_indices = ctx.get("unique_indices", [])
            fields_to_extract = ctx.get("fields_to_extract", [])
            if not unique_indices or not fields_to_extract:
                self.logger.warning("Missing unique indices or fields to extract")
                return None
            return self._extract_with_unique_indices(text, ctx["document_context"], unique_indices, fields_to_extract)
        else:
            # Original strategy
            if not field:
                self.logger.warning("No field provided in context")
                return None

            self.logger.info(f"Processing field: {field}")
            self.logger.info("Processing entire document...")

            # Note: despite its name, _extract_field_value_from_page is handed the
            # full document text here, not a single page.
            value = self._extract_field_value_from_page(field, text, ctx["document_context"])
            if value:
                return value

            # If no value found, try the search-based approach as fallback
            self.logger.warning("No value found in document analysis, falling back to search-based approach")
            if index and "embeddings" in index:
                self.logger.info("Using semantic search with embeddings")
                search_query = f"{field} in {ctx['document_context']}"
                similar_chunks = self._find_similar_chunks_search(search_query, index)
                if similar_chunks:
                    self.logger.info(f"Found {len(similar_chunks)} relevant chunks, attempting value extraction")
                    value = self._extract_field_value_search(field, similar_chunks, ctx["document_context"])
                    if value:
                        return value

            self.logger.warning(f"No candidate found for field: {field}")
            return f"<no candidate for {field}>"