"""Streamlit front‑end entry‑point.""" | |
import yaml | |
import json | |
import streamlit as st | |
import logging | |
from dotenv import load_dotenv | |
from orchestrator.planner import Planner | |
from orchestrator.executor import Executor | |
from config.settings import settings | |
import fitz  # PyMuPDF, used for the quick first-pages text preview
import pandas as pd
from datetime import datetime
from services.cost_tracker import CostTracker


# Create a custom stream handler to capture logs
class LogCaptureHandler(logging.StreamHandler):
    def __init__(self):
        super().__init__()
        self.logs = []

    def emit(self, record):
        try:
            msg = self.format(record)
            self.logs.append(msg)
        except Exception:
            self.handleError(record)

    def get_logs(self):
        return "\n".join(self.logs)

    def clear(self):
        self.logs = []


# Initialize session state for storing execution history
if 'execution_history' not in st.session_state:
    st.session_state.execution_history = []

# Set up logging capture
log_capture = LogCaptureHandler()
log_capture.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(log_capture)

# Configure specific loggers
for logger_name in ['orchestrator', 'agents', 'services']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    logger.addHandler(log_capture)

load_dotenv()
st.set_page_config(page_title="PDF Field Extractor", layout="wide")

# Sidebar navigation
st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Documentation", "Traces", "Execution"])

# Documentation Page
if page == "Documentation":
    st.title("Deep‑Research PDF Field Extractor")
st.markdown(""" | |
## Overview | |
This system uses a multi-agent architecture to extract fields from PDFs with high accuracy and reliability. | |
### Core Components | |
1. **Planner** | |
- Generates execution plans using Azure OpenAI | |
- Determines optimal extraction strategy | |
- Manages task dependencies | |
2. **Executor** | |
- Executes the generated plan | |
- Manages agent execution flow | |
- Handles context and result management | |
3. **Agents** | |
- `TableAgent`: Extracts text and tables using Azure Document Intelligence | |
- `FieldMapper`: Maps fields to values using extracted content | |
- `ForEachField`: Controls field iteration flow | |
### Processing Pipeline | |
1. **Document Processing** | |
- Text and table extraction using Azure Document Intelligence | |
- Layout and structure preservation | |
- Support for complex document formats | |
2. **Field Extraction** | |
- Document type inference | |
- User profile determination | |
- Page-by-page scanning | |
- Value extraction and validation | |
3. **Context Building** | |
- Document metadata | |
- Field descriptions | |
- User context | |
- Execution history | |
### Key Features | |
#### Smart Field Extraction | |
- Two-step extraction strategy: | |
1. Page-by-page scanning for precise extraction | |
2. Semantic search fallback if no value found | |
- Basic context awareness for improved extraction | |
- Support for tabular data extraction | |
#### Document Intelligence | |
- Azure Document Intelligence integration | |
- Layout and structure preservation | |
- Table extraction and formatting | |
- Complex document handling | |
#### Execution Monitoring | |
- Detailed execution traces | |
- Success/failure status | |
- Comprehensive logging | |
- Result storage and retrieval | |
### Technical Requirements | |
- Azure OpenAI API key | |
- Azure Document Intelligence endpoint | |
- Python 3.9 or higher | |
- Required Python packages (see requirements.txt) | |
### Getting Started | |
1. **Upload Your PDF** | |
- Click the "Upload PDF" button | |
- Select your PDF file | |
2. **Specify Fields** | |
- Enter comma-separated field names | |
- Example: `Date, Name, Value, Location` | |
3. **Optional: Add Field Descriptions**
   - Provide YAML-formatted field descriptions (see the example below this list)
   - Helps improve extraction accuracy
4. **Run Extraction**
   - Click "Run extraction"
   - Monitor progress in execution trace
   - View results in table format
5. **Download Results**
   - Export as CSV
   - View detailed execution logs
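
For example, field descriptions for the sample fields above might look like this (the descriptions are illustrative only):

```
Date: Date the measurement was taken
Name: Name of the sample or subject
Value: Measured value reported for the field
Location: Site where the sample was collected
```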

### Support
For detailed technical documentation, please refer to:
- [Architecture Overview](ARCHITECTURE.md)
- [Developer Documentation](DEVELOPER.md)
""")

# Traces Page
elif page == "Traces":
    st.title("Execution Traces")
    if not st.session_state.execution_history:
        st.info("No execution traces available yet. Run an extraction to see traces here.")
    else:
        # Create a DataFrame from the execution history
        history_data = []
        for record in st.session_state.execution_history:
            history_data.append({
                "filename": record["filename"],
                "datetime": record["datetime"],
                "fields": ", ".join(record.get("fields", [])),
                "logs": record.get("logs", []),
                "results": record.get("results", None)
            })
        history_df = pd.DataFrame(history_data)

        # Display column headers
        col1, col2, col3, col4, col5 = st.columns([2, 2, 3, 1, 1])
        with col1:
            st.markdown("**Filename**")
        with col2:
            st.markdown("**Timestamp**")
        with col3:
            st.markdown("**Fields**")
        with col4:
            st.markdown("**Logs**")
        with col5:
            st.markdown("**Results**")
        st.markdown("---")  # Add a separator line

        # Display the table with download buttons
        for idx, row in history_df.iterrows():
            col1, col2, col3, col4, col5 = st.columns([2, 2, 3, 1, 1])
            with col1:
                st.write(row["filename"])
            with col2:
                st.write(row["datetime"])
            with col3:
                st.write(row["fields"])
            with col4:
                if row["logs"]:  # Check if we have any logs
                    st.download_button(
                        "Download Logs",
                        row["logs"],  # Use the stored logs
                        file_name=f"logs_{row['filename']}_{row['datetime']}.txt",
                        key=f"logs_dl_{idx}"
                    )
                else:
                    st.write("No Logs")
            with col5:
                if row["results"] is not None:
                    results_df = pd.DataFrame(row["results"])
                    st.download_button(
                        "Download Results",
                        results_df.to_csv(index=False),
                        file_name=f"results_{row['filename']}_{row['datetime']}.csv",
                        key=f"results_dl_{idx}"
                    )
                else:
                    st.write("No Results")
            st.markdown("---")  # Add a separator line between rows

# Execution Page
else:  # page == "Execution"
    st.title("Deep‑Research PDF Field Extractor (POC)")
    pdf_file = st.file_uploader("Upload PDF", type=["pdf"])
    fields_str = st.text_input("Fields (comma‑separated)", "Protein Lot, Chain, Residue")
    desc_blob = st.text_area("Field descriptions / rules (YAML, optional)")

    # Add strategy selector
    strategy = st.radio(
        "Select Extraction Strategy",
        ["Original Strategy", "Unique Indices Strategy"],
        help="Original Strategy: Process document page by page. Unique Indices Strategy: Process entire document at once using unique indices."
    )

    # Add unique indices input if Unique Indices Strategy is selected
    unique_indices = None
    unique_indices_descriptions = None
    if strategy == "Unique Indices Strategy":
        unique_indices_str = st.text_input(
            "Unique Fields (comma-separated)",
            help="Enter the field names that uniquely identify each record (e.g., 'timepoint, Modification, peptide')"
        )
        if unique_indices_str:
            unique_indices = [idx.strip() for idx in unique_indices_str.split(",") if idx.strip()]

            # Add descriptions for each unique index
            st.subheader("Unique Fields Descriptions")
            st.markdown("""
Please provide a description for each unique field. This helps the system better understand what to look for.

Example:
```
Protein Lot: Batch number of the Proteins
Timepoint: Time at which modification was measured (e.g., 0w, 2w, 4w)
Modification: Type of post-translational modification
Peptide: Peptide sequence containing the modification
```
""")
            unique_indices_descriptions_str = st.text_area(
                "Unique Fields Descriptions (YAML format)",
                help="Enter descriptions for each unique field in YAML format"
            )
            if unique_indices_descriptions_str:
                try:
                    unique_indices_descriptions = yaml.safe_load(unique_indices_descriptions_str)
                    if not isinstance(unique_indices_descriptions, dict):
                        st.error("Descriptions must be in YAML format with field names as keys")
                        unique_indices_descriptions = None
                except yaml.YAMLError as e:
                    st.error(f"Invalid YAML format: {e}")
                    unique_indices_descriptions = None

    def flatten_json_response(json_data, fields):
        """Flatten the nested JSON response into a tabular structure with dynamic columns."""
        logger = logging.getLogger(__name__)
        logger.info("Starting flatten_json_response")
        logger.info(f"Input fields: {fields}")

        # Handle the case where the response is a string
        if isinstance(json_data, str):
            logger.info("Input is a string, attempting to parse as JSON")
            try:
                json_data = json.loads(json_data)
                logger.info("Successfully parsed JSON string")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse JSON string: {e}")
                return pd.DataFrame(columns=fields)

        # If the data is wrapped in an array, get the first item
        if isinstance(json_data, list) and len(json_data) > 0:
            logger.info("Data is wrapped in an array, extracting first item")
            json_data = json_data[0]

        # If the data is a dictionary with numeric keys, get the first value
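        # e.g. (illustrative) {"0": {"Chain": [...], "Residue": [...]}} is unwrapped to the inner dict under "0"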
        if isinstance(json_data, dict):
            keys = list(json_data.keys())
            logger.info(f"Checking dictionary keys: {keys}")
            # Check if all keys are integers or string representations of integers
            if all(isinstance(k, int) or (isinstance(k, str) and k.isdigit()) for k in keys):
                logger.info("Data has numeric keys, extracting first value")
                first_key = sorted(keys, key=lambda x: int(x) if isinstance(x, str) else x)[0]
                json_data = json_data[first_key]
                logger.info(f"Extracted data from key '{first_key}'")

        logger.info(f"JSON data keys: {list(json_data.keys()) if isinstance(json_data, dict) else 'Not a dict'}")

        # Create a list to store rows
        rows = []

        # Get the length of the first array to determine number of rows
        if isinstance(json_data, dict) and len(json_data) > 0:
            first_field = list(json_data.keys())[0]
            num_rows = len(json_data[first_field]) if isinstance(json_data[first_field], list) else 1
            logger.info(f"Number of rows to process: {num_rows}")

            # Create a row for each index
            for i in range(num_rows):
                logger.debug(f"Processing row {i}")
                row = {}
                for field in fields:
                    if field in json_data and isinstance(json_data[field], list) and i < len(json_data[field]):
                        row[field] = json_data[field][i]
                        logger.debug(f"Field '{field}' value at index {i}: {json_data[field][i]}")
                    else:
                        row[field] = None
                        logger.debug(f"Field '{field}' not found or index {i} out of bounds")
                rows.append(row)
        else:
            logger.error(f"Unexpected data structure: {type(json_data)}")
            return pd.DataFrame(columns=fields)

        # Create DataFrame with all requested fields as columns
        df = pd.DataFrame(rows)
        logger.info(f"Created DataFrame with shape: {df.shape}")
        logger.info(f"DataFrame columns: {df.columns.tolist()}")

        # Ensure columns are in the same order as the fields list
        df = df[fields]
        logger.info(f"Final DataFrame columns after reordering: {df.columns.tolist()}")
        return df
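
    # Minimal usage sketch for flatten_json_response (hypothetical data, not produced by the app):
    #   flatten_json_response({"Chain": ["H", "L"], "Residue": ["K98", "M255"]}, ["Chain", "Residue"])
    #   returns a two-row DataFrame with columns ["Chain", "Residue"].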

    if st.button("Run extraction") and pdf_file:
        field_list = [f.strip() for f in fields_str.split(",") if f.strip()]
        field_descs = yaml.safe_load(desc_blob) if desc_blob.strip() else {}
        # Logger for this page (avoid reusing the loop variable from the logging setup above)
        logger = logging.getLogger(__name__)
        try:
            with st.spinner("Planning …"):
                # quick preview of the opening pages' text to give the LLM document context
                doc = fitz.open(stream=pdf_file.getvalue(), filetype="pdf")  # type: ignore[arg-type]
                preview = "\n".join(page.get_text() for page in doc[:10])[:20000]  # first 10 pages, capped at 20,000 chars
                # Create a cost tracker for this run
                cost_tracker = CostTracker()
                planner = Planner(cost_tracker=cost_tracker)
                plan = planner.build_plan(
                    pdf_meta={"filename": pdf_file.name},
                    doc_preview=preview,
                    fields=field_list,
                    field_descs=field_descs,
                    strategy=strategy,
                    unique_indices=unique_indices,
                    unique_indices_descriptions=unique_indices_descriptions
                )

            # Add a visual separator
            st.markdown("---")

            with st.spinner("Executing …"):
                executor = Executor(settings=settings, cost_tracker=cost_tracker)
                results, logs = executor.run(plan, pdf_file)

            # Get detailed costs
            costs = executor.cost_tracker.calculate_current_file_costs()
            model_cost = costs["openai"]["total_cost"]
            di_cost = costs["document_intelligence"]["total_cost"]

            # Display detailed costs table
            st.subheader("Detailed Costs")
            costs_df = executor.cost_tracker.get_detailed_costs_table()
            st.dataframe(costs_df, use_container_width=True)
            st.info(
                f"LLM input tokens: {executor.cost_tracker.llm_input_tokens}, "
                f"LLM output tokens: {executor.cost_tracker.llm_output_tokens}, "
                f"DI pages: {executor.cost_tracker.di_pages}, "
                f"Model cost: ${model_cost:.4f}, "
                f"DI cost: ${di_cost:.4f}, "
                f"Total cost: ${model_cost + di_cost:.4f}"
            )

            # Add detailed logging about what executor returned
            logger.info(f"Executor returned results of type: {type(results)}")
            logger.info(f"Results content: {results}")

            # Check if results is already a DataFrame
            if isinstance(results, pd.DataFrame):
                logger.info(f"Results is already a DataFrame with shape: {results.shape}")
                logger.info(f"DataFrame columns: {results.columns.tolist()}")
                logger.info(f"DataFrame head: {results.head()}")
                df = results
            else:
                logger.info("Results is not a DataFrame, calling flatten_json_response")
                # Process results using flatten_json_response
                df = flatten_json_response(results, field_list)

            # Log final DataFrame info
            logger.info(f"Final DataFrame shape: {df.shape}")
            logger.info(f"Final DataFrame columns: {df.columns.tolist()}")
            if not df.empty:
                logger.info(f"Final DataFrame sample: {df.head()}")

            # Store execution in history
            execution_record = {
                "filename": pdf_file.name,
                "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "fields": field_list,
                "logs": log_capture.get_logs(),  # Store the actual logs
                "results": df.to_dict() if not df.empty else None
            }
            st.session_state.execution_history.append(execution_record)
            log_capture.clear()  # Clear logs after storing them

            # ----------------- UI: show execution tree -----------------
            st.subheader("Execution trace")
            for log in logs:
                indent = " " * 4 * log["depth"]
                # Add error indicator if there was an error
                error_indicator = "❌ " if log.get("error") else "✓ "
                # Use a fixed preview text instead of the result
                with st.expander(f"{indent}{error_indicator}{log['tool']} – Click to view result"):
                    st.markdown(f"**Args**: `{log['args']}`", unsafe_allow_html=True)
                    if log.get("error"):
                        st.error(f"Error: {log['error']}")
                    # Special handling for IndexAgent output
                    if log['tool'] == "IndexAgent" and isinstance(log["result"], dict):
                        # Display chunk statistics if available
                        if "chunk_stats" in log["result"]:
                            st.markdown("### Chunk Statistics")
                            # Create a DataFrame for better visualization
                            stats_df = pd.DataFrame(log["result"]["chunk_stats"])
                            st.dataframe(stats_df)

                            # Add summary statistics
                            st.markdown("### Summary")
                            st.markdown(f"""
- Total chunks: {len(stats_df)}
- Average chunk length: {stats_df['length'].mean():.0f} characters
- Shortest chunk: {stats_df['length'].min()} characters
- Longest chunk: {stats_df['length'].max()} characters
""")

                            # Add a bar chart of chunk lengths
                            st.markdown("### Chunk Length Distribution")
                            st.bar_chart(stats_df.set_index('chunk_number')['length'])
                    else:
                        st.code(log["result"])

            if not df.empty:
                st.success("Done ✓")
                st.dataframe(df)
                st.download_button("Download CSV", df.to_csv(index=False), "results.csv")
            else:
                st.warning("No results were extracted. Check the execution trace for errors.")
        except Exception as e:
            logging.exception("App error:")
            st.error(f"An error occurred: {e}")