"""Streamlit front‑end entry‑point."""
import yaml
import json
import streamlit as st
import logging
from dotenv import load_dotenv
from orchestrator.planner import Planner
from orchestrator.executor import Executor
from config.settings import settings
import fitz  # PyMuPDF, used to build a quick text preview of the first pages
import pandas as pd
from datetime import datetime
from services.cost_tracker import CostTracker
# Create a custom stream handler to capture logs
class LogCaptureHandler(logging.StreamHandler):
    def __init__(self):
        super().__init__()
        self.logs = []

    def emit(self, record):
        try:
            msg = self.format(record)
            self.logs.append(msg)
        except Exception:
            self.handleError(record)

    def get_logs(self):
        return "\n".join(self.logs)

    def clear(self):
        self.logs = []
# Initialize session state for storing execution history
if 'execution_history' not in st.session_state:
    st.session_state.execution_history = []

# Set up logging capture
log_capture = LogCaptureHandler()
log_capture.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(log_capture)

# Configure specific loggers
for logger_name in ['orchestrator', 'agents', 'services']:
    pkg_logger = logging.getLogger(logger_name)
    pkg_logger.setLevel(logging.INFO)
    pkg_logger.addHandler(log_capture)

# Module-level logger used by the app itself (the extraction run below logs through it)
logger = logging.getLogger(__name__)
load_dotenv()
st.set_page_config(page_title="PDF Field Extractor", layout="wide")
# Sidebar navigation
st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Documentation", "Traces", "Execution"])
# Documentation Page
if page == "Documentation":
    st.title("Deep‑Research PDF Field Extractor")
    st.markdown("""
## Overview

This system uses a multi-agent architecture to extract fields from PDFs with high accuracy and reliability.

### Core Components

1. **Planner**
   - Generates execution plans using Azure OpenAI
   - Determines optimal extraction strategy
   - Manages task dependencies
2. **Executor**
   - Executes the generated plan
   - Manages agent execution flow
   - Handles context and result management
3. **Agents**
   - `TableAgent`: Extracts text and tables using Azure Document Intelligence
   - `FieldMapper`: Maps fields to values using extracted content
   - `ForEachField`: Controls field iteration flow

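The Execution page wires these components together. A rough sketch of that flow (the preview string and field names below are illustrative placeholders):

```
import streamlit as st
from orchestrator.planner import Planner
from orchestrator.executor import Executor
from config.settings import settings
from services.cost_tracker import CostTracker

pdf_file = st.file_uploader("Upload PDF", type=["pdf"])  # uploaded PDF object
cost_tracker = CostTracker()  # tracks LLM and Document Intelligence spend for this run
planner = Planner(cost_tracker=cost_tracker)
plan = planner.build_plan(
    pdf_meta={"filename": pdf_file.name},
    doc_preview="...text of the first pages...",  # illustrative preview string
    fields=["Protein Lot", "Chain", "Residue"],
    field_descs={},
    strategy="Original Strategy",
    unique_indices=None,
    unique_indices_descriptions=None,
)
executor = Executor(settings=settings, cost_tracker=cost_tracker)
results, logs = executor.run(plan, pdf_file)  # results table + execution trace logs
```
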
### Processing Pipeline

1. **Document Processing**
   - Text and table extraction using Azure Document Intelligence
   - Layout and structure preservation
   - Support for complex document formats
2. **Field Extraction**
   - Document type inference
   - User profile determination
   - Page-by-page scanning
   - Value extraction and validation
3. **Context Building**
   - Document metadata
   - Field descriptions
   - User context
   - Execution history

### Key Features

#### Smart Field Extraction
- Two-step extraction strategy:
  1. Page-by-page scanning for precise extraction
  2. Semantic search fallback if no value found
- Basic context awareness for improved extraction
- Support for tabular data extraction

#### Document Intelligence
- Azure Document Intelligence integration
- Layout and structure preservation
- Table extraction and formatting
- Complex document handling

#### Execution Monitoring
- Detailed execution traces
- Success/failure status
- Comprehensive logging
- Result storage and retrieval

### Technical Requirements
- Azure OpenAI API key
- Azure Document Intelligence endpoint
- Python 3.9 or higher
- Required Python packages (see requirements.txt)

### Getting Started

1. **Upload Your PDF**
   - Click the "Upload PDF" button
   - Select your PDF file
2. **Specify Fields**
   - Enter comma-separated field names
   - Example: `Date, Name, Value, Location`
3. **Optional: Add Field Descriptions**
   - Provide YAML-formatted field descriptions
   - Helps improve extraction accuracy
4. **Run Extraction**
   - Click "Run extraction"
   - Monitor progress in execution trace
   - View results in table format
5. **Download Results**
   - Export as CSV
   - View detailed execution logs

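For example, the field descriptions in step 3 might look like this (the descriptions below are purely illustrative):

```
Date: Date the sample or measurement was recorded
Name: Name of the sample or analyte
Value: Measured value for the record
Location: Site or laboratory where the measurement was taken
```
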
### Support
For detailed technical documentation, please refer to:
- [Architecture Overview](ARCHITECTURE.md)
- [Developer Documentation](DEVELOPER.md)
""")
# Traces Page
elif page == "Traces":
    st.title("Execution Traces")

    if not st.session_state.execution_history:
        st.info("No execution traces available yet. Run an extraction to see traces here.")
    else:
        # Create a DataFrame from the execution history
        history_data = []
        for record in st.session_state.execution_history:
            history_data.append({
                "filename": record["filename"],
                "datetime": record["datetime"],
                "fields": ", ".join(record.get("fields", [])),
                "logs": record.get("logs", []),
                "results": record.get("results", None)
            })
        history_df = pd.DataFrame(history_data)

        # Display column headers
        col1, col2, col3, col4, col5 = st.columns([2, 2, 3, 1, 1])
        with col1:
            st.markdown("**Filename**")
        with col2:
            st.markdown("**Timestamp**")
        with col3:
            st.markdown("**Fields**")
        with col4:
            st.markdown("**Logs**")
        with col5:
            st.markdown("**Results**")
        st.markdown("---")  # Add a separator line

        # Display the table with download buttons
        for idx, row in history_df.iterrows():
            col1, col2, col3, col4, col5 = st.columns([2, 2, 3, 1, 1])
            with col1:
                st.write(row["filename"])
            with col2:
                st.write(row["datetime"])
            with col3:
                st.write(row["fields"])
            with col4:
                if row["logs"]:  # Check if we have any logs
                    st.download_button(
                        "Download Logs",
                        row["logs"],  # Use the stored logs
                        file_name=f"logs_{row['filename']}_{row['datetime']}.txt",
                        key=f"logs_dl_{idx}"
                    )
                else:
                    st.write("No Logs")
            with col5:
                if row["results"] is not None:
                    results_df = pd.DataFrame(row["results"])
                    st.download_button(
                        "Download Results",
                        results_df.to_csv(index=False),
                        file_name=f"results_{row['filename']}_{row['datetime']}.csv",
                        key=f"results_dl_{idx}"
                    )
                else:
                    st.write("No Results")
            st.markdown("---")  # Add a separator line between rows
# Execution Page
else:  # page == "Execution"
    st.title("Deep‑Research PDF Field Extractor (POC)")
    pdf_file = st.file_uploader("Upload PDF", type=["pdf"])
    fields_str = st.text_input("Fields (comma‑separated)", "Protein Lot, Chain, Residue")
    desc_blob = st.text_area("Field descriptions / rules (YAML, optional)")

    # Add strategy selector
    strategy = st.radio(
        "Select Extraction Strategy",
        ["Original Strategy", "Unique Indices Strategy"],
        help="Original Strategy: Process document page by page. Unique Indices Strategy: Process entire document at once using unique indices."
    )

    # Add unique indices input if Unique Indices Strategy is selected
    unique_indices = None
    unique_indices_descriptions = None
    if strategy == "Unique Indices Strategy":
        unique_indices_str = st.text_input(
            "Unique Fields (comma-separated)",
            help="Enter the field names that uniquely identify each record (e.g., 'timepoint, Modification, peptide')"
        )
        if unique_indices_str:
            unique_indices = [idx.strip() for idx in unique_indices_str.split(",") if idx.strip()]

            # Add descriptions for each unique index
            st.subheader("Unique Fields Descriptions")
            st.markdown("""
Please provide a description for each unique field. This helps the system better understand what to look for.

Example:
```
Protein Lot: Batch number of the Proteins
Timepoint: Time at which modification was measured (e.g., 0w, 2w, 4w)
Modification: Type of post-translational modification
Peptide: Peptide sequence containing the modification
```
""")
            unique_indices_descriptions_str = st.text_area(
                "Unique Fields Descriptions (YAML format)",
                help="Enter descriptions for each unique field in YAML format"
            )
            if unique_indices_descriptions_str:
                try:
                    unique_indices_descriptions = yaml.safe_load(unique_indices_descriptions_str)
                    if not isinstance(unique_indices_descriptions, dict):
                        st.error("Descriptions must be in YAML format with field names as keys")
                        unique_indices_descriptions = None
                except yaml.YAMLError as e:
                    st.error(f"Invalid YAML format: {e}")
                    unique_indices_descriptions = None
    def flatten_json_response(json_data, fields):
        """Flatten the nested JSON response into a tabular structure with dynamic columns."""
        logger = logging.getLogger(__name__)
        logger.info("Starting flatten_json_response")
        logger.info(f"Input fields: {fields}")

        # Handle the case where the response is a string
        if isinstance(json_data, str):
            logger.info("Input is a string, attempting to parse as JSON")
            try:
                json_data = json.loads(json_data)
                logger.info("Successfully parsed JSON string")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse JSON string: {e}")
                return pd.DataFrame(columns=fields)

        # If the data is wrapped in an array, get the first item
        if isinstance(json_data, list) and len(json_data) > 0:
            logger.info("Data is wrapped in an array, extracting first item")
            json_data = json_data[0]

        # If the data is a dictionary with numeric keys, get the first value
        if isinstance(json_data, dict):
            keys = list(json_data.keys())
            logger.info(f"Checking dictionary keys: {keys}")
            # Check if all keys are integers or string representations of integers
            if keys and all(isinstance(k, int) or (isinstance(k, str) and k.isdigit()) for k in keys):
                logger.info("Data has numeric keys, extracting first value")
                first_key = sorted(keys, key=lambda x: int(x) if isinstance(x, str) else x)[0]
                json_data = json_data[first_key]
                logger.info(f"Extracted data from key '{first_key}'")

        logger.info(f"JSON data keys: {list(json_data.keys()) if isinstance(json_data, dict) else 'Not a dict'}")

        # Create a list to store rows
        rows = []

        # Get the length of the first array to determine number of rows
        if isinstance(json_data, dict) and len(json_data) > 0:
            first_field = list(json_data.keys())[0]
            num_rows = len(json_data[first_field]) if isinstance(json_data[first_field], list) else 1
            logger.info(f"Number of rows to process: {num_rows}")

            # Create a row for each index
            for i in range(num_rows):
                logger.debug(f"Processing row {i}")
                row = {}
                for field in fields:
                    if field in json_data and isinstance(json_data[field], list) and i < len(json_data[field]):
                        row[field] = json_data[field][i]
                        logger.debug(f"Field '{field}' value at index {i}: {json_data[field][i]}")
                    else:
                        row[field] = None
                        logger.debug(f"Field '{field}' not found or index {i} out of bounds")
                rows.append(row)
        else:
            logger.error(f"Unexpected data structure: {type(json_data)}")
            return pd.DataFrame(columns=fields)

        # Create DataFrame with all requested fields as columns
        # (passing columns=fields keeps the column set stable even if rows is empty)
        df = pd.DataFrame(rows, columns=fields)
        logger.info(f"Created DataFrame with shape: {df.shape}")
        logger.info(f"DataFrame columns: {df.columns.tolist()}")

        # Ensure columns are in the same order as the fields list
        df = df[fields]
        logger.info(f"Final DataFrame columns after reordering: {df.columns.tolist()}")

        return df
    if st.button("Run extraction") and pdf_file:
        field_list = [f.strip() for f in fields_str.split(",") if f.strip()]
        field_descs = yaml.safe_load(desc_blob) if desc_blob.strip() else {}
        try:
            with st.spinner("Planning …"):
                # quick text preview of the first pages to give the LLM document context
                doc = fitz.open(stream=pdf_file.getvalue(), filetype="pdf")  # type: ignore[arg-type]
                preview = "\n".join(page.get_text() for page in doc[:10])[:20000]  # first 10 pages, capped at 20k chars

                # Create a cost tracker for this run
                cost_tracker = CostTracker()
                planner = Planner(cost_tracker=cost_tracker)
                plan = planner.build_plan(
                    pdf_meta={"filename": pdf_file.name},
                    doc_preview=preview,
                    fields=field_list,
                    field_descs=field_descs,
                    strategy=strategy,
                    unique_indices=unique_indices,
                    unique_indices_descriptions=unique_indices_descriptions
                )

            # Add a visual separator
            st.markdown("---")

            with st.spinner("Executing …"):
                executor = Executor(settings=settings, cost_tracker=cost_tracker)
                results, logs = executor.run(plan, pdf_file)

            # Get detailed costs
            costs = executor.cost_tracker.calculate_current_file_costs()
            model_cost = costs["openai"]["total_cost"]
            di_cost = costs["document_intelligence"]["total_cost"]

            # Display detailed costs table
            st.subheader("Detailed Costs")
            costs_df = executor.cost_tracker.get_detailed_costs_table()
            st.dataframe(costs_df, use_container_width=True)

            st.info(
                f"LLM input tokens: {executor.cost_tracker.llm_input_tokens}, "
                f"LLM output tokens: {executor.cost_tracker.llm_output_tokens}, "
                f"DI pages: {executor.cost_tracker.di_pages}, "
                f"Model cost: ${model_cost:.4f}, "
                f"DI cost: ${di_cost:.4f}, "
                f"Total cost: ${model_cost + di_cost:.4f}"
            )

            # Add detailed logging about what the executor returned
            logger.info(f"Executor returned results of type: {type(results)}")
            logger.info(f"Results content: {results}")

            # Check if results is already a DataFrame
            if isinstance(results, pd.DataFrame):
                logger.info(f"Results is already a DataFrame with shape: {results.shape}")
                logger.info(f"DataFrame columns: {results.columns.tolist()}")
                logger.info(f"DataFrame head: {results.head()}")
                df = results
            else:
                logger.info("Results is not a DataFrame, calling flatten_json_response")
                # Process results using flatten_json_response
                df = flatten_json_response(results, field_list)

            # Log final DataFrame info
            logger.info(f"Final DataFrame shape: {df.shape}")
            logger.info(f"Final DataFrame columns: {df.columns.tolist()}")
            if not df.empty:
                logger.info(f"Final DataFrame sample: {df.head()}")

            # Store execution in history
            execution_record = {
                "filename": pdf_file.name,
                "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "fields": field_list,
                "logs": log_capture.get_logs(),  # Store the actual logs
                "results": df.to_dict() if not df.empty else None
            }
            st.session_state.execution_history.append(execution_record)
            log_capture.clear()  # Clear logs after storing them

            # ----------------- UI: show execution tree -----------------
            st.subheader("Execution trace")
            for log in logs:
                indent = "&nbsp;" * 4 * log["depth"]
                # Add error indicator if there was an error
                error_indicator = "❌ " if log.get("error") else "✓ "
                # Use a fixed preview text instead of the result
                with st.expander(f"{indent}{error_indicator}{log['tool']} – Click to view result"):
                    st.markdown(f"**Args**: `{log['args']}`", unsafe_allow_html=True)
                    if log.get("error"):
                        st.error(f"Error: {log['error']}")
                    # Special handling for IndexAgent output
                    if log['tool'] == "IndexAgent" and isinstance(log["result"], dict):
                        # Display chunk statistics if available
                        if "chunk_stats" in log["result"]:
                            st.markdown("### Chunk Statistics")
                            # Create a DataFrame for better visualization
                            stats_df = pd.DataFrame(log["result"]["chunk_stats"])
                            st.dataframe(stats_df)

                            # Add summary statistics
                            st.markdown("### Summary")
                            st.markdown(f"""
- Total chunks: {len(stats_df)}
- Average chunk length: {stats_df['length'].mean():.0f} characters
- Shortest chunk: {stats_df['length'].min()} characters
- Longest chunk: {stats_df['length'].max()} characters
""")

                            # Add a bar chart of chunk lengths
                            st.markdown("### Chunk Length Distribution")
                            st.bar_chart(stats_df.set_index('chunk_number')['length'])
                    else:
                        st.code(log["result"])

            if not df.empty:
                st.success("Done ✓")
                st.dataframe(df)
                st.download_button("Download CSV", df.to_csv(index=False), "results.csv")
            else:
                st.warning("No results were extracted. Check the execution trace for errors.")
        except Exception as e:
            logging.exception("App error:")
            st.error(f"An error occurred: {e}")