# Spaces: Sleeping
# (Hugging Face Spaces status banner captured with the page; not part of the program.)
"""Streamlit front‑end entry‑point.""" | |
import yaml | |
import json | |
import streamlit as st | |
import logging | |
from dotenv import load_dotenv | |
from orchestrator.planner import Planner | |
from orchestrator.executor import Executor | |
from config.settings import settings | |
from config.config_manager import config_manager | |
import fitz # PyMuPDF local import to avoid heavy load on startup | |
import pandas as pd | |
from datetime import datetime | |
from services.cost_tracker import CostTracker | |
# Create a custom stream handler to capture logs | |
class LogCaptureHandler(logging.StreamHandler):
    """Logging handler that keeps every formatted record in memory.

    Records are formatted with the handler's formatter and appended to
    ``self.logs`` instead of being written to a stream, so the UI can offer
    them for download after a run.
    """

    def __init__(self):
        super().__init__()
        # Formatted log lines, oldest first.
        self.logs = []

    def emit(self, record):
        """Format *record* and store it; delegate failures to ``handleError``."""
        try:
            self.logs.append(self.format(record))
        except Exception:
            # Same best-effort contract as the stdlib handlers: a formatting
            # problem must never propagate into the code that logged.
            self.handleError(record)

    def get_logs(self):
        """Return all captured lines joined by newlines ("" when empty)."""
        return "\n".join(self.logs)

    def clear(self):
        """Forget everything captured so far."""
        self.logs = []
# Initialize per-session state on the first run of a session.  Factories are
# used so that every session gets its own fresh list objects.
_SESSION_DEFAULTS = {
    'execution_history': list,                  # one record per extraction run
    'field_descriptions_table': list,           # rows of the field descriptions editor
    'unique_indices_descriptions_table': list,  # rows of the unique indices editor
    'fields_str': lambda: "Chain, Percentage, Seq Loc",
}
for _state_key, _make_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()

# Set up logging capture.
# Streamlit re-executes this script on every interaction.  Creating and
# attaching a new handler each time would leave one more handler on the
# process-wide root logger per rerun, so the handler is created once per
# session and re-used.  It is emptied at the start of each run, which keeps the
# "logs of the current run only" behaviour a freshly created handler had.
if 'log_capture' not in st.session_state:
    _new_capture = LogCaptureHandler()
    _new_capture.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    st.session_state['log_capture'] = _new_capture
log_capture = st.session_state['log_capture']
log_capture.clear()

# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
if log_capture not in root_logger.handlers:
    root_logger.addHandler(log_capture)

# Configure specific loggers.
# Records logged below these names propagate to the root logger by default, so
# attaching the capture handler here as well would store every message twice.
# It is attached directly only when propagation has been switched off.
# NOTE: the name ``logger`` stays bound at module level after this loop (to the
# 'services' logger); code further down the script uses it.
for logger_name in ['orchestrator', 'agents', 'services']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    if not logger.propagate and log_capture not in logger.handlers:
        logger.addHandler(log_capture)

load_dotenv()
st.set_page_config(page_title="PDF Field Extractor", layout="wide")

# Sidebar navigation
st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Documentation", "Traces", "Execution"])
# Documentation Page
if page == "Documentation":
    st.title("Deep‑Research PDF Field Extractor")
    # Static help text.  Sub-items are indented under their parent item so the
    # markdown renders as nested lists.
    st.markdown("""
    ## Overview
    This system uses a multi-agent architecture to extract fields from PDFs with high accuracy and reliability.

    ### Core Components
    1. **Planner**
       - Generates execution plans using Azure OpenAI
       - Determines optimal extraction strategy
       - Manages task dependencies
    2. **Executor**
       - Executes the generated plan
       - Manages agent execution flow
       - Handles context and result management
    3. **Agents**
       - `TableAgent`: Extracts text and tables using Azure Document Intelligence
       - `FieldMapper`: Maps fields to values using extracted content
       - `ForEachField`: Controls field iteration flow

    ### Processing Pipeline
    1. **Document Processing**
       - Text and table extraction using Azure Document Intelligence
       - Layout and structure preservation
       - Support for complex document formats
    2. **Field Extraction**
       - Document type inference
       - User profile determination
       - Page-by-page scanning
       - Value extraction and validation
    3. **Context Building**
       - Document metadata
       - Field descriptions
       - User context
       - Execution history

    ### Key Features
    #### Smart Field Extraction
    - Two-step extraction strategy:
      1. Page-by-page scanning for precise extraction
      2. Semantic search fallback if no value found
    - Basic context awareness for improved extraction
    - Support for tabular data extraction

    #### Document Intelligence
    - Azure Document Intelligence integration
    - Layout and structure preservation
    - Table extraction and formatting
    - Complex document handling

    #### Execution Monitoring
    - Detailed execution traces
    - Success/failure status
    - Comprehensive logging
    - Result storage and retrieval

    ### Technical Requirements
    - Azure OpenAI API key
    - Azure Document Intelligence endpoint
    - Python 3.9 or higher
    - Required Python packages (see requirements.txt)

    ### Getting Started
    1. **Upload Your PDF**
       - Click the "Upload PDF" button
       - Select your PDF file
    2. **Specify Fields**
       - Enter comma-separated field names
       - Example: `Date, Name, Value, Location`
    3. **Optional: Add Field Descriptions**
       - Provide YAML-formatted field descriptions
       - Helps improve extraction accuracy
    4. **Run Extraction**
       - Click "Run extraction"
       - Monitor progress in execution trace
       - View results in table format
    5. **Download Results**
       - Export as CSV
       - View detailed execution logs

    ### Support
    For detailed technical documentation, please refer to:
    - [Architecture Overview](ARCHITECTURE.md)
    - [Developer Documentation](DEVELOPER.md)
    """)
# Traces Page | |
elif page == "Traces": | |
st.title("Execution Traces") | |
if not st.session_state.execution_history: | |
st.info("No execution traces available yet. Run an extraction to see traces here.") | |
else: | |
# Create a DataFrame from the execution history | |
history_data = [] | |
for record in st.session_state.execution_history: | |
history_data.append({ | |
"filename": record["filename"], | |
"datetime": record["datetime"], | |
"fields": ", ".join(record.get("fields", [])), | |
"logs": record.get("logs", []), | |
"results": record.get("results", None) | |
}) | |
history_df = pd.DataFrame(history_data) | |
# Display column headers | |
col1, col2, col3, col4, col5 = st.columns([2, 2, 3, 1, 1]) | |
with col1: | |
st.markdown("**Filename**") | |
with col2: | |
st.markdown("**Timestamp**") | |
with col3: | |
st.markdown("**Fields**") | |
with col4: | |
st.markdown("**Logs**") | |
with col5: | |
st.markdown("**Results**") | |
st.markdown("---") # Add a separator line | |
# Display the table with download buttons | |
for idx, row in history_df.iterrows(): | |
col1, col2, col3, col4, col5 = st.columns([2, 2, 3, 1, 1]) | |
with col1: | |
st.write(row["filename"]) | |
with col2: | |
st.write(row["datetime"]) | |
with col3: | |
st.write(row["fields"]) | |
with col4: | |
if row["logs"]: # Check if we have any logs | |
st.download_button( | |
"Download Logs", | |
row["logs"], # Use the stored logs | |
file_name=f"logs_{row['filename']}_{row['datetime']}.txt", | |
key=f"logs_dl_{idx}" | |
) | |
else: | |
st.write("No Logs") | |
with col5: | |
if row["results"] is not None: | |
results_df = pd.DataFrame(row["results"]) | |
st.download_button( | |
"Download Results", | |
results_df.to_csv(index=False), | |
file_name=f"results_{row['filename']}_{row['datetime']}.csv", | |
key=f"results_dl_{idx}" | |
) | |
else: | |
st.write("No Results") | |
st.markdown("---") # Add a separator line between rows | |
# Execution Page | |
else: # page == "Execution" | |
st.title("Deep‑Research PDF Field Extractor (POC)") | |
def flatten_json_response(json_data, fields):
    """Flatten an extraction response into a DataFrame with one column per field.

    Args:
        json_data: The response to flatten.  May be a JSON string, a dict
            mapping field names to values, a non-empty list whose first item
            is such a dict, or a dict with only numeric keys whose
            lowest-numbered value is such a dict.
        fields: Field names to use as columns, in output order.

    Returns:
        A ``pandas.DataFrame`` whose columns are exactly ``fields``.  List
        values become one cell per row; a non-list value fills row 0 only.
        Missing fields and positions past the end of a list are ``None``.
        The frame is empty (columns only) when the input cannot be parsed,
        is not a non-empty dict, or yields no rows.
    """
    logger = logging.getLogger(__name__)
    logger.info("Starting flatten_json_response")
    logger.info(f"Input fields: {fields}")
    fields = list(fields)

    # Handle the case where the response is a string
    if isinstance(json_data, str):
        logger.info("Input is a string, attempting to parse as JSON")
        try:
            json_data = json.loads(json_data)
            logger.info("Successfully parsed JSON string")
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON string: {e}")
            return pd.DataFrame(columns=fields)

    # If the data is wrapped in an array, get the first item
    if isinstance(json_data, list) and json_data:
        logger.info("Data is wrapped in an array, extracting first item")
        json_data = json_data[0]

    # If the data is a dictionary with numeric keys, get the first value
    if isinstance(json_data, dict):
        keys = list(json_data)
        logger.info(f"Checking dictionary keys: {keys}")
        # ``keys`` must be non-empty: all() of nothing is True, and taking the
        # first of zero sorted keys would raise IndexError for ``{}``.
        # isdecimal() (not isdigit()) so every accepted string is valid for int().
        if keys and all(
            isinstance(k, int) or (isinstance(k, str) and k.isdecimal()) for k in keys
        ):
            logger.info("Data has numeric keys, extracting first value")
            first_key = min(keys, key=int)
            json_data = json_data[first_key]
            logger.info(f"Extracted data from key '{first_key}'")

    logger.info(f"JSON data keys: {list(json_data.keys()) if isinstance(json_data, dict) else 'Not a dict'}")

    if not (isinstance(json_data, dict) and json_data):
        logger.error(f"Unexpected data structure: {type(json_data)}")
        return pd.DataFrame(columns=fields)

    # Normalise every requested field to a list of cell values: lists are used
    # as they are, any other value is a single cell, missing fields are empty.
    cells_by_field = {}
    for field in fields:
        if field in json_data:
            value = json_data[field]
            cells_by_field[field] = value if isinstance(value, list) else [value]
        else:
            cells_by_field[field] = []

    # Row count: the length of the first entry of the response, extended when
    # a requested field is longer so no extracted value is cut off.
    first_value = next(iter(json_data.values()))
    first_length = len(first_value) if isinstance(first_value, list) else 1
    num_rows = max([first_length] + [len(cells) for cells in cells_by_field.values()])
    logger.info(f"Number of rows to process: {num_rows}")

    rows = [
        {
            field: (cells[i] if i < len(cells) else None)
            for field, cells in cells_by_field.items()
        }
        for i in range(num_rows)
    ]

    # Passing ``columns`` keeps the requested columns even when there are no
    # rows; selecting from a column-less frame afterwards would raise KeyError.
    df = pd.DataFrame(rows, columns=list(cells_by_field))
    logger.info(f"Created DataFrame with shape: {df.shape}")
    logger.info(f"DataFrame columns: {df.columns.tolist()}")

    # Ensure columns are in the same order as the fields list
    df = df[fields]
    logger.info(f"Final DataFrame columns after reordering: {df.columns.tolist()}")
    return df
# ============================================================================
# EXECUTION PAGE: helpers
# ============================================================================
# Column layout and headers shared by both description editors.
_DESCRIPTION_COLUMN_WIDTHS = [2, 3, 2, 2, 2, 1]
_DESCRIPTION_HEADERS = (
    "Field Name", "Field Description", "Format", "Examples", "Possible Values", "Actions",
)
# (row key, widget key stem, widget label) for each editable cell, in column order.
_DESCRIPTION_INPUTS = (
    ('field_name', 'field_name', "Field Name"),
    ('field_description', 'field_desc', "Field Description"),
    ('format', 'field_format', "Format"),
    ('examples', 'field_examples', "Examples"),
    ('possible_values', 'field_possible_values', "Possible Values"),
)


def _blank_description_row():
    """Return a new, empty editor row."""
    return {
        'field_name': '',
        'field_description': '',
        'format': '',
        'examples': '',
        'possible_values': ''
    }


def _rows_from_descriptions(descriptions):
    """Convert a ``{field name: info dict}`` mapping from a config into editor rows."""
    return [
        {
            'field_name': field_name,
            'field_description': field_info.get('description', ''),
            'format': field_info.get('format', ''),
            'examples': field_info.get('examples', ''),
            'possible_values': field_info.get('possible_values', '')
        }
        for field_name, field_info in descriptions.items()
    ]


def _descriptions_from_rows(rows):
    """Convert editor rows into ``{field name: info dict}``; unnamed rows are skipped."""
    return {
        row['field_name']: {
            'description': row['field_description'],
            'format': row['format'],
            'examples': row['examples'],
            'possible_values': row['possible_values']
        }
        for row in rows
        if row['field_name']
    }


def _named_fields(rows):
    """Return the non-empty field names of the editor rows, in row order."""
    return [row['field_name'] for row in rows if row['field_name']]


def _split_fields(fields_text):
    """Split a comma-separated field string into stripped, non-empty names."""
    return [name.strip() for name in fields_text.split(",") if name.strip()]


def _config_label(config_name):
    """Display name of a stored configuration, or the raw name if it cannot be loaded."""
    stored = config_manager.get_config(config_name)
    return stored['name'] if stored else config_name


def _info_box(background, accent, title, text):
    """Render a coloured call-out box with a bold title line."""
    st.markdown(
        f"""
<div style="background-color: {background}; padding: 1rem; border-radius: 0.5rem; border-left: 4px solid {accent}; color: #333;">
<strong>{title}</strong><br>
{text}
</div>
""",
        unsafe_allow_html=True,
    )


def _render_description_table(state_key, key_prefix, add_button_label):
    """Render an editable description table stored in ``st.session_state[state_key]``.

    Every cell is a text input whose current value is written back into the
    stored row on each run.  ``key_prefix`` namespaces the widget keys so two
    tables can be shown on the same page.
    """
    table = st.session_state[state_key]

    for header_col, header_text in zip(st.columns(_DESCRIPTION_COLUMN_WIDTHS), _DESCRIPTION_HEADERS):
        header_col.markdown(f"**{header_text}**")

    # Display existing rows
    for i, row in enumerate(table):
        cells = st.columns(_DESCRIPTION_COLUMN_WIDTHS)
        edited_row = {}
        for cell, (row_key, key_stem, label) in zip(cells, _DESCRIPTION_INPUTS):
            # A real label hidden with label_visibility replaces the empty
            # label, which Streamlit warns about and screen readers cannot use.
            edited_row[row_key] = cell.text_input(
                label,
                value=row.get(row_key, ''),
                key=f"{key_prefix}{key_stem}_{i}",
                label_visibility="collapsed",
            )
        if cells[-1].button("🗑️", key=f"{key_prefix}delete_{i}", help="Delete this row"):
            table.pop(i)
            st.rerun()  # stops this run; the write-back below is not reached
        # Update the row in session state
        table[i] = edited_row

    # Add new row button
    if st.button(add_button_label):
        table.append(_blank_description_row())
        st.rerun()


def _build_doc_preview(pdf_bytes, max_pages=10, max_chars=20000):
    """Return the text of the first ``max_pages`` pages, cut to ``max_chars`` characters."""
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")  # type: ignore[arg-type]
    try:
        page_total = min(max_pages, len(doc))
        preview_text = "\n".join(doc[page_no].get_text() for page_no in range(page_total))
    finally:
        # Release the parsed document as soon as the text has been read.
        doc.close()
    return preview_text[:max_chars]


def _render_execution_trace(logs):
    """Show one expander per executor log entry, indented by its depth."""
    for log in logs:
        indent = " " * 4 * log["depth"]
        # Add error indicator if there was an error
        error_indicator = "❌ " if log.get("error") else "✓ "
        # Use a fixed preview text instead of the result
        with st.expander(f"{indent}{error_indicator}{log['tool']} – Click to view result"):
            # Args may echo document-derived text, so raw HTML stays disabled here.
            st.markdown(f"**Args**: `{log['args']}`")
            if log.get("error"):
                st.error(f"Error: {log['error']}")

            # Special handling for IndexAgent output
            if log['tool'] == "IndexAgent" and isinstance(log["result"], dict):
                # Display chunk statistics if available
                if "chunk_stats" in log["result"]:
                    st.markdown("### Chunk Statistics")
                    # Create a DataFrame for better visualization
                    stats_df = pd.DataFrame(log["result"]["chunk_stats"])
                    st.dataframe(stats_df)

                    # Add summary statistics
                    st.markdown("### Summary")
                    st.markdown(f"""
                    - Total chunks: {len(stats_df)}
                    - Average chunk length: {stats_df['length'].mean():.0f} characters
                    - Shortest chunk: {stats_df['length'].min()} characters
                    - Longest chunk: {stats_df['length'].max()} characters
                    """)

                    # Add a bar chart of chunk lengths
                    st.markdown("### Chunk Length Distribution")
                    st.bar_chart(stats_df.set_index('chunk_number')['length'])
            else:
                st.code(log["result"])


# Everything below belongs to the Execution page only; the explicit check keeps
# it from rendering when another page is selected in the sidebar.
if page == "Execution":
    # Logger for this page's own diagnostics (captured through the root logger).
    run_logger = logging.getLogger(__name__)

    # ========================================================================
    # SECTION 1: FILE UPLOAD
    # ========================================================================
    st.header("📄 Step 1: Upload Document")
    pdf_file = st.file_uploader("Upload PDF", type=["pdf"], help="Select a PDF file to process")
    if pdf_file:
        st.success(f"✅ File uploaded: {pdf_file.name}")

    # ========================================================================
    # SECTION 2: STRATEGY SELECTION
    # ========================================================================
    st.header("🎯 Step 2: Select Extraction Strategy")
    strategy = st.radio(
        "Choose your extraction approach:",
        ["Original Strategy", "Unique Indices Strategy"],
        help="**Original Strategy**: Process document page by page, extracting each field individually. **Unique Indices Strategy**: Process entire document at once using unique combinations of indices.",
        horizontal=True
    )
    if strategy == "Original Strategy":
        st.info("📋 **Original Strategy**: Will extract fields one by one from the document pages.")
    else:
        st.info("🔍 **Unique Indices Strategy**: Will find unique combinations and extract additional fields for each.")

    # ========================================================================
    # SECTION 3: CONFIGURATION (Only for Unique Indices Strategy)
    # ========================================================================
    if strategy == "Unique Indices Strategy":
        st.header("⚙️ Step 3: Configuration")

        # File Type Selection
        col1, col2 = st.columns([3, 1])
        with col1:
            # Get available configurations
            config_names = config_manager.get_config_names()
            selected_config_name = st.selectbox(
                "Select File Type Configuration:",
                config_names,
                format_func=_config_label,
                help="Choose a predefined configuration or create a new one"
            )
        with col2:
            if st.button("🔄 Load Config", help="Load the selected configuration"):
                config = config_manager.get_config(selected_config_name)
                if config:
                    # Update fields
                    st.session_state.fields_str = config.get('fields', '')
                    # Update field descriptions table
                    st.session_state.field_descriptions_table = _rows_from_descriptions(
                        config.get('field_descriptions', {})
                    )
                    # Update unique indices descriptions table
                    st.session_state.unique_indices_descriptions_table = _rows_from_descriptions(
                        config.get('unique_indices_descriptions', {})
                    )
                    st.session_state.last_selected_config = selected_config_name
                    st.success(f"✅ Configuration '{config['name']}' loaded successfully!")
                    st.rerun()
                else:
                    st.error("❌ Failed to load configuration")

        # Clear Configuration Button
        # NOTE(review): rendered below the two columns here; the flattened
        # source does not show whether it originally sat inside the right-hand
        # column -- confirm against the deployed layout.
        if st.button("🗑️ Clear All Configuration", help="Clear all configuration and start fresh"):
            st.session_state.field_descriptions_table = []
            st.session_state.unique_indices_descriptions_table = []
            st.session_state.fields_str = ""
            st.session_state.last_selected_config = ""
            st.success("✅ Configuration cleared!")
            st.rerun()

        # ====================================================================
        # SECTION 4: FIELD DESCRIPTIONS
        # ====================================================================
        st.subheader("📝 Field Descriptions")
        _info_box(
            "#e8f4fd", "#1f77b4", "Field Descriptions",
            "Add descriptions for the fields you want to extract. These help the system understand what to look for.",
        )
        _render_description_table(
            'field_descriptions_table', "", "➕ Add Field Description Row"
        )

        # ====================================================================
        # SECTION 5: UNIQUE FIELD DESCRIPTIONS
        # ====================================================================
        st.subheader("🔑 Unique Field Descriptions")
        _info_box(
            "#fff8e1", "#ffc107", "Unique Field Descriptions",
            "Add descriptions for the unique fields that will be used to identify different combinations in the document.",
        )
        _render_description_table(
            'unique_indices_descriptions_table', "unique_", "➕ Add Unique Field Description Row"
        )

        # ====================================================================
        # SECTION 6: SAVE CONFIGURATION
        # ====================================================================
        st.subheader("💾 Save Configuration")
        _info_box(
            "#e1f5fe", "#17a2b8", "Save Current Configuration",
            "Save your current configuration as a new file type for future use.",
        )
        col1, col2 = st.columns([3, 1])
        with col1:
            save_config_name = st.text_input(
                "Configuration Name:",
                placeholder="Enter a name for this configuration (e.g., 'Biotech Report', 'Clinical Data')",
                help="Choose a descriptive name that will appear in the dropdown"
            )
        with col2:
            if st.button("💾 Save Config", help="Save the current configuration"):
                if save_config_name:
                    # Prepare configuration data; the saved field list is the
                    # set of named unique-index rows.
                    unique_rows = st.session_state.unique_indices_descriptions_table
                    config_data = {
                        'name': save_config_name,
                        'description': f"Configuration for {save_config_name}",
                        'fields': ", ".join(_named_fields(unique_rows)),
                        'field_descriptions': _descriptions_from_rows(
                            st.session_state.field_descriptions_table
                        ),
                        'unique_indices_descriptions': _descriptions_from_rows(unique_rows)
                    }
                    if config_manager.save_config(save_config_name, config_data):
                        st.success(f"✅ Configuration '{save_config_name}' saved successfully!")
                        config_manager.reload_configs()
                        st.rerun()
                    else:
                        st.error("❌ Failed to save configuration")
                else:
                    st.error("❌ Please enter a configuration name")

    # ========================================================================
    # SECTION 7: ORIGINAL STRATEGY CONFIGURATION
    # ========================================================================
    else:  # Original Strategy
        st.header("⚙️ Step 3: Field Configuration")
        fields_str = st.text_input(
            "Fields to Extract (comma-separated):",
            value=st.session_state.fields_str,
            key="fields_input",
            help="Enter the field names you want to extract, separated by commas"
        )
        st.session_state.fields_str = fields_str

    # ========================================================================
    # SECTION 8: EXECUTION
    # ========================================================================
    st.header("🚀 Step 4: Run Extraction")

    # Convert table to JSON for processing
    field_descs = _descriptions_from_rows(st.session_state.field_descriptions_table)

    # Prepare unique indices for Unique Indices Strategy
    unique_indices = None
    unique_indices_descriptions = None
    if strategy == "Unique Indices Strategy":
        unique_rows = st.session_state.unique_indices_descriptions_table
        unique_indices = _named_fields(unique_rows)
        unique_indices_descriptions = _descriptions_from_rows(unique_rows)

    # Status indicator
    if pdf_file:
        if strategy == "Original Strategy":
            field_count = len(_split_fields(st.session_state.fields_str))
            st.info(f"📊 Ready to extract {field_count} fields using Original Strategy")
        else:
            unique_count = len(unique_indices) if unique_indices else 0
            field_count = len(field_descs)
            st.info(f"📊 Ready to extract {field_count} additional fields for {unique_count} unique combinations using Unique Indices Strategy")

    # Run button
    if st.button("🚀 Run Extraction", type="primary", disabled=not pdf_file):
        if not pdf_file:
            st.error("❌ Please upload a PDF file first")
        else:
            # Prepare field list based on strategy
            if strategy == "Original Strategy":
                field_list = _split_fields(st.session_state.fields_str)
            else:  # Unique Indices Strategy: the named unique-index rows
                field_list = list(unique_indices)

            try:
                with st.spinner("Planning …"):
                    # Quick text preview (first 10 pages, at most 20,000
                    # characters) to give the LLM document context.
                    preview = _build_doc_preview(pdf_file.getvalue())

                    # Create a cost tracker for this run
                    cost_tracker = CostTracker()
                    planner = Planner(cost_tracker=cost_tracker)
                    plan = planner.build_plan(
                        pdf_meta={"filename": pdf_file.name},
                        doc_preview=preview,
                        fields=field_list,
                        field_descs=field_descs,
                        strategy=strategy,
                        unique_indices=unique_indices,
                        unique_indices_descriptions=unique_indices_descriptions
                    )

                # Add a visual separator
                st.markdown("---")

                # NOTE(review): the spinner is scoped to the executor run; the
                # flattened source does not show where the original block ended.
                with st.spinner("Executing …"):
                    executor = Executor(settings=settings, cost_tracker=cost_tracker)
                    results, logs = executor.run(plan, pdf_file)

                # Get detailed costs
                costs = executor.cost_tracker.calculate_current_file_costs()
                model_cost = costs["openai"]["total_cost"]
                di_cost = costs["document_intelligence"]["total_cost"]

                # Add debug logging for cost tracking
                run_logger.info("Cost tracker debug info:")
                run_logger.info(f" LLM input tokens: {executor.cost_tracker.llm_input_tokens}")
                run_logger.info(f" LLM output tokens: {executor.cost_tracker.llm_output_tokens}")
                run_logger.info(f" DI pages: {executor.cost_tracker.di_pages}")
                run_logger.info(f" LLM calls count: {len(executor.cost_tracker.llm_calls)}")
                run_logger.info(f" Current file costs: {executor.cost_tracker.current_file_costs}")
                run_logger.info(f" Calculated costs: {costs}")

                # Display detailed costs table
                st.subheader("Detailed Costs")
                costs_df = executor.cost_tracker.get_detailed_costs_table()
                st.dataframe(costs_df, use_container_width=True)
                st.info(
                    f"LLM input tokens: {executor.cost_tracker.llm_input_tokens}, "
                    f"LLM output tokens: {executor.cost_tracker.llm_output_tokens}, "
                    f"DI pages: {executor.cost_tracker.di_pages}, "
                    f"Model cost: ${model_cost:.4f}, "
                    f"DI cost: ${di_cost:.4f}, "
                    f"Total cost: ${model_cost + di_cost:.4f}"
                )

                # Add detailed logging about what executor returned
                run_logger.info(f"Executor returned results of type: {type(results)}")
                run_logger.info(f"Results content: {results}")

                # Check if results is already a DataFrame
                if isinstance(results, pd.DataFrame):
                    run_logger.info(f"Results is already a DataFrame with shape: {results.shape}")
                    run_logger.info(f"DataFrame columns: {results.columns.tolist()}")
                    run_logger.info(f"DataFrame head: {results.head()}")
                    df = results
                else:
                    run_logger.info("Results is not a DataFrame, calling flatten_json_response")
                    # Process results using flatten_json_response
                    df = flatten_json_response(results, field_list)

                # Log final DataFrame info
                run_logger.info(f"Final DataFrame shape: {df.shape}")
                run_logger.info(f"Final DataFrame columns: {df.columns.tolist()}")
                if not df.empty:
                    run_logger.info(f"Final DataFrame sample: {df.head()}")

                # Store execution in history
                execution_record = {
                    "filename": pdf_file.name,
                    "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "fields": field_list,
                    "logs": log_capture.get_logs(),  # Store the actual logs
                    "results": df.to_dict() if not df.empty else None
                }
                st.session_state.execution_history.append(execution_record)
                log_capture.clear()  # Clear logs after storing them

                # ----------------- UI: show execution tree -----------------
                st.subheader("Execution trace")
                _render_execution_trace(logs)

                if not df.empty:
                    st.success("Done ✓")
                    st.dataframe(df)
                    st.download_button("Download CSV", df.to_csv(index=False), "results.csv")
                else:
                    st.warning("No results were extracted. Check the execution trace for errors.")
            except Exception as e:
                # Top-level boundary of the run: log the traceback and show a
                # message instead of crashing the page.
                logging.exception("App error:")
                st.error(f"An error occurred: {e}")