"""Streamlit front-end entry point for the PDF field extractor.

The script renders three pages selected from the sidebar:

* **Documentation** - static usage notes.
* **Traces** - history of previous runs stored in ``st.session_state``.
* **Execution** - upload a PDF, configure fields, run the planner/executor
  pipeline and display results, costs and the execution trace.

Streamlit re-executes this whole file on every user interaction, so all
module-level setup below has to be safe to run repeatedly.
"""
import json
import logging
from datetime import datetime

import fitz  # PyMuPDF
import pandas as pd
import streamlit as st
import yaml
from dotenv import load_dotenv

from orchestrator.planner import Planner
from orchestrator.executor import Executor
from config.settings import settings
from config.config_manager import config_manager
from services.cost_tracker import CostTracker

logger = logging.getLogger(__name__)

LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
CAPTURED_LOGGER_NAMES = ('orchestrator', 'agents', 'services')

ORIGINAL_STRATEGY = "Original Strategy"
UNIQUE_INDICES_STRATEGY = "Unique Indices Strategy"

# Limits for the text preview handed to the planner.
PREVIEW_MAX_PAGES = 10
PREVIEW_MAX_CHARS = 20000

TRACE_COLUMN_WIDTHS = [2, 2, 3, 1, 1]
DESCRIPTION_COLUMN_WIDTHS = [2, 3, 2, 2, 2, 1]
DESCRIPTION_HEADERS = (
    "Field Name",
    "Field Description",
    "Format",
    "Examples",
    "Possible Values",
    "Actions",
)
# (row dict key, widget key stem, accessible label) for each editable column.
DESCRIPTION_INPUTS = (
    ('field_name', 'field_name', "Field name"),
    ('field_description', 'field_desc', "Field description"),
    ('format', 'field_format', "Format"),
    ('examples', 'field_examples', "Examples"),
    ('possible_values', 'field_possible_values', "Possible values"),
)

# NOTE(review): the three intro texts below are rendered with
# unsafe_allow_html=True although they contain no markup here - confirm
# whether wrapper HTML was lost from these literals.
FIELD_DESCRIPTIONS_INTRO = (
    "\n"
    "Field Descriptions\n"
    "Add descriptions for the fields you want to extract. "
    "These help the system understand what to look for.\n"
)
UNIQUE_FIELD_DESCRIPTIONS_INTRO = (
    "\n"
    "Unique Field Descriptions\n"
    "Add descriptions for the unique fields that will be used to identify "
    "different combinations in the document.\n"
)
SAVE_CONFIGURATION_INTRO = (
    "\n"
    "Save Current Configuration\n"
    "Save your current configuration as a new file type for future use.\n"
)

DOCUMENTATION_MD = """
## Overview
This system uses a multi-agent architecture to extract fields from PDFs with high accuracy and reliability.

### Core Components

1. **Planner**
   - Generates execution plans using Azure OpenAI
   - Determines optimal extraction strategy
   - Manages task dependencies

2. **Executor**
   - Executes the generated plan
   - Manages agent execution flow
   - Handles context and result management

3. **Agents**
   - `TableAgent`: Extracts text and tables using Azure Document Intelligence
   - `FieldMapper`: Maps fields to values using extracted content
   - `ForEachField`: Controls field iteration flow

### Processing Pipeline

1. **Document Processing**
   - Text and table extraction using Azure Document Intelligence
   - Layout and structure preservation
   - Support for complex document formats

2. **Field Extraction**
   - Document type inference
   - User profile determination
   - Page-by-page scanning
   - Value extraction and validation

3. **Context Building**
   - Document metadata
   - Field descriptions
   - User context
   - Execution history

### Key Features

#### Smart Field Extraction
- Two-step extraction strategy:
  1. Page-by-page scanning for precise extraction
  2. Semantic search fallback if no value found
- Basic context awareness for improved extraction
- Support for tabular data extraction

#### Document Intelligence
- Azure Document Intelligence integration
- Layout and structure preservation
- Table extraction and formatting
- Complex document handling

#### Execution Monitoring
- Detailed execution traces
- Success/failure status
- Comprehensive logging
- Result storage and retrieval

### Technical Requirements
- Azure OpenAI API key
- Azure Document Intelligence endpoint
- Python 3.9 or higher
- Required Python packages (see requirements.txt)

### Getting Started

1. **Upload Your PDF**
   - Click the "Upload PDF" button
   - Select your PDF file

2. **Specify Fields**
   - Enter comma-separated field names
   - Example: `Date, Name, Value, Location`

3. **Optional: Add Field Descriptions**
   - Provide YAML-formatted field descriptions
   - Helps improve extraction accuracy

4. **Run Extraction**
   - Click "Run extraction"
   - Monitor progress in execution trace
   - View results in table format

5. **Download Results**
   - Export as CSV
   - View detailed execution logs

### Support
For detailed technical documentation, please refer to:
- [Architecture Overview](ARCHITECTURE.md)
- [Developer Documentation](DEVELOPER.md)
"""


# ---------------------------------------------------------------------------
# Logging capture
# ---------------------------------------------------------------------------
class LogCaptureHandler(logging.StreamHandler):
    """Logging handler that keeps formatted records in memory.

    ``emit`` is overridden, so nothing is written to the underlying stream;
    records are only appended to ``self.logs``.
    """

    # Marker used to recognise instances created by earlier script runs.
    # ``isinstance`` cannot be used for that: the class object itself is
    # re-created every time Streamlit re-executes this file.
    is_log_capture = True

    def __init__(self):
        super().__init__()
        self.logs = []

    def emit(self, record):
        """Format *record* and store the resulting line."""
        try:
            self.logs.append(self.format(record))
        except Exception:
            self.handleError(record)

    def get_logs(self):
        """Return all captured lines joined with newlines."""
        return "\n".join(self.logs)

    def clear(self):
        """Drop all captured lines."""
        self.logs = []


def _install_log_capture():
    """Attach a fresh ``LogCaptureHandler`` to the root logger and return it.

    Capture handlers left behind by previous script runs are removed first;
    otherwise every rerun would add another handler. The handler is attached
    to the root logger only: records from the named application loggers reach
    it through propagation, and attaching it to them as well would capture
    each of their records twice.

    NOTE(review): the logging configuration is process-wide, so concurrent
    sessions share it and a rerun in one session also strips the capture
    handler of another.
    """
    handler = LogCaptureHandler()
    handler.setFormatter(logging.Formatter(LOG_FORMAT))

    targets = [logging.getLogger()]
    targets.extend(logging.getLogger(name) for name in CAPTURED_LOGGER_NAMES)
    for target in targets:
        target.setLevel(logging.INFO)
        stale = [h for h in target.handlers if getattr(h, "is_log_capture", False)]
        for old_handler in stale:
            target.removeHandler(old_handler)

    logging.getLogger().addHandler(handler)
    return handler


# ---------------------------------------------------------------------------
# Session state
# ---------------------------------------------------------------------------
def _init_session_state():
    """Create the session-state entries this app reads, if they are missing."""
    defaults = {
        'execution_history': list,
        'field_descriptions_table': list,
        'unique_indices_descriptions_table': list,
        'fields_str': lambda: "Chain, Percentage, Seq Loc",
    }
    for key, factory in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = factory()


# ---------------------------------------------------------------------------
# Pure helpers
# ---------------------------------------------------------------------------
def _blank_description_row():
    """Return an empty row for a description table."""
    return {
        'field_name': '',
        'field_description': '',
        'format': '',
        'examples': '',
        'possible_values': '',
    }


def _rows_to_descriptions(rows):
    """Convert description-table rows into ``{field_name: details}``.

    Rows whose ``field_name`` is empty are skipped.
    """
    return {
        row['field_name']: {
            'description': row['field_description'],
            'format': row['format'],
            'examples': row['examples'],
            'possible_values': row['possible_values'],
        }
        for row in rows
        if row['field_name']
    }


def _descriptions_to_rows(descriptions):
    """Convert a ``{field_name: details}`` mapping into description-table rows."""
    return [
        {
            'field_name': field_name,
            'field_description': field_info.get('description', ''),
            'format': field_info.get('format', ''),
            'examples': field_info.get('examples', ''),
            'possible_values': field_info.get('possible_values', ''),
        }
        for field_name, field_info in descriptions.items()
    ]


def _named_fields(rows):
    """Return the non-empty ``field_name`` values of *rows*, in order."""
    return [row['field_name'] for row in rows if row['field_name']]


def _split_fields(fields_str):
    """Split a comma-separated field string into stripped, non-empty names."""
    return [name.strip() for name in fields_str.split(",") if name.strip()]


def _unwrap_response(json_data):
    """Strip the list / numeric-key wrappers from a parsed response."""
    if isinstance(json_data, list) and json_data:
        logger.info("Data is wrapped in an array, extracting first item")
        json_data = json_data[0]

    if isinstance(json_data, dict):
        keys = list(json_data)
        logger.info("Checking dictionary keys: %s", keys)
        # ``keys`` must be non-empty: all() of an empty sequence is True and
        # the lookup of a "first key" below would fail.
        if keys and all(
            isinstance(k, int) or (isinstance(k, str) and k.isdigit()) for k in keys
        ):
            logger.info("Data has numeric keys, extracting first value")
            first_key = min(keys, key=int)
            json_data = json_data[first_key]
            logger.info("Extracted data from key '%s'", first_key)
    return json_data


def flatten_json_response(json_data, fields):
    """Flatten an extraction response into a DataFrame with columns *fields*.

    Accepted shapes for *json_data*:

    * a JSON string (parsed first);
    * a list, of which only the first item is used;
    * a dict keyed by integers / digit strings, of which the value under the
      numerically smallest key is used;
    * a dict mapping field names to a list of values (one per row) or to a
      single value.

    The number of rows is the longest list among the dict's values (a single
    value counts as one row). Missing fields and short lists are padded with
    ``None``; a single value is placed in the first row.

    Returns an empty DataFrame with columns *fields* when the string is not
    valid JSON or the data is not a non-empty dict after unwrapping.
    """
    logger.info("Starting flatten_json_response")
    logger.info("Input fields: %s", fields)

    if isinstance(json_data, str):
        logger.info("Input is a string, attempting to parse as JSON")
        try:
            json_data = json.loads(json_data)
        except json.JSONDecodeError as e:
            logger.error("Failed to parse JSON string: %s", e)
            return pd.DataFrame(columns=fields)
        logger.info("Successfully parsed JSON string")

    json_data = _unwrap_response(json_data)

    if not isinstance(json_data, dict) or not json_data:
        logger.error("Unexpected data structure: %s", type(json_data))
        return pd.DataFrame(columns=fields)

    logger.info("JSON data keys: %s", list(json_data))

    num_rows = max(
        len(value) if isinstance(value, list) else 1 for value in json_data.values()
    )
    logger.info("Number of rows to process: %s", num_rows)

    rows = []
    for i in range(num_rows):
        row = {}
        for field in fields:
            value = json_data.get(field)
            if isinstance(value, list):
                row[field] = value[i] if i < len(value) else None
            else:
                # Single value (or missing field -> None): first row only.
                row[field] = value if i == 0 else None
        rows.append(row)

    # ``columns=fields`` fixes the column order and keeps the columns even
    # when there are no rows at all.
    df = pd.DataFrame(rows, columns=fields)
    logger.info("Created DataFrame with shape: %s", df.shape)
    logger.info("DataFrame columns: %s", df.columns.tolist())
    return df


def _build_preview(pdf_bytes):
    """Return text of the first pages of a PDF, truncated for the planner.

    At most ``PREVIEW_MAX_PAGES`` pages are read and the joined text is cut
    to ``PREVIEW_MAX_CHARS`` characters. The document is closed afterwards.
    """
    with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:  # type: ignore[arg-type]
        text = "\n".join(page.get_text() for page in doc[:PREVIEW_MAX_PAGES])
    return text[:PREVIEW_MAX_CHARS]


# ---------------------------------------------------------------------------
# Documentation page
# ---------------------------------------------------------------------------
def render_documentation_page():
    """Render the static documentation page."""
    st.title("Deep\u2011Research PDF Field Extractor")
    st.markdown(DOCUMENTATION_MD)


# ---------------------------------------------------------------------------
# Traces page
# ---------------------------------------------------------------------------
def render_traces_page():
    """Render one row per stored execution with log/result downloads."""
    st.title("Execution Traces")

    history = st.session_state.execution_history
    if not history:
        st.info("No execution traces available yet. Run an extraction to see traces here.")
        return

    headers = ("Filename", "Timestamp", "Fields", "Logs", "Results")
    for column, title in zip(st.columns(TRACE_COLUMN_WIDTHS), headers):
        with column:
            st.markdown(f"**{title}**")
    st.markdown("---")

    for idx, record in enumerate(history):
        filename = record["filename"]
        timestamp = record["datetime"]
        logs = record.get("logs", [])
        results = record.get("results")
        # ':' and spaces are awkward or invalid in file names on some systems.
        file_stamp = timestamp.replace(":", "-").replace(" ", "_")

        name_col, time_col, fields_col, logs_col, results_col = st.columns(
            TRACE_COLUMN_WIDTHS
        )
        with name_col:
            st.write(filename)
        with time_col:
            st.write(timestamp)
        with fields_col:
            st.write(", ".join(record.get("fields", [])))
        with logs_col:
            if logs:
                st.download_button(
                    "Download Logs",
                    logs,
                    file_name=f"logs_{filename}_{file_stamp}.txt",
                    key=f"logs_dl_{idx}",
                )
            else:
                st.write("No Logs")
        with results_col:
            if results is not None:
                st.download_button(
                    "Download Results",
                    pd.DataFrame(results).to_csv(index=False),
                    file_name=f"results_{filename}_{file_stamp}.csv",
                    key=f"results_dl_{idx}",
                )
            else:
                st.write("No Results")
        st.markdown("---")


# ---------------------------------------------------------------------------
# Execution page: configuration widgets
# ---------------------------------------------------------------------------
def _config_display_name(config_name):
    """Return the configuration's ``name`` entry, or *config_name* if unknown."""
    config = config_manager.get_config(config_name)
    return config['name'] if config else config_name


def _render_description_table(state_key, key_prefix, add_label):
    """Render an editable description table stored at ``st.session_state[state_key]``.

    *key_prefix* is prepended to every widget key so that the two tables on
    the page do not collide. Edits are written back to the row on every run.
    """
    for column, title in zip(st.columns(DESCRIPTION_COLUMN_WIDTHS), DESCRIPTION_HEADERS):
        with column:
            st.markdown(f"**{title}**")

    table = st.session_state[state_key]
    for i, row in enumerate(table):
        columns = st.columns(DESCRIPTION_COLUMN_WIDTHS)
        edited = {}
        # zip() stops after the five input columns; the sixth holds the button.
        for column, (row_key, key_stem, label) in zip(columns, DESCRIPTION_INPUTS):
            with column:
                edited[row_key] = st.text_input(
                    f"{label} (row {i + 1})",
                    value=row.get(row_key, ''),
                    key=f"{key_prefix}{key_stem}_{i}",
                    label_visibility="collapsed",
                )
        with columns[-1]:
            if st.button("🗑️", key=f"{key_prefix}delete_{i}", help="Delete this row"):
                table.pop(i)
                st.rerun()  # stops this run; nothing below executes
        table[i] = edited

    if st.button(add_label):
        table.append(_blank_description_row())
        st.rerun()


def _load_selected_config(selected_config_name):
    """Copy the selected stored configuration into session state."""
    config = config_manager.get_config(selected_config_name)
    if not config:
        st.error("❌ Failed to load configuration")
        return

    st.session_state.fields_str = config.get('fields', '')
    st.session_state.field_descriptions_table = _descriptions_to_rows(
        config.get('field_descriptions', {})
    )
    st.session_state.unique_indices_descriptions_table = _descriptions_to_rows(
        config.get('unique_indices_descriptions', {})
    )
    st.session_state.last_selected_config = selected_config_name
    st.success(f"✅ Configuration '{config['name']}' loaded successfully!")
    st.rerun()


def _save_current_config(save_config_name):
    """Persist the tables currently in session state under *save_config_name*."""
    if not save_config_name:
        st.error("❌ Please enter a configuration name")
        return

    unique_rows = st.session_state.unique_indices_descriptions_table
    config_data = {
        'name': save_config_name,
        'description': f"Configuration for {save_config_name}",
        # The stored field list is made of the unique-index field names.
        'fields': ", ".join(_named_fields(unique_rows)),
        'field_descriptions': _rows_to_descriptions(
            st.session_state.field_descriptions_table
        ),
        'unique_indices_descriptions': _rows_to_descriptions(unique_rows),
    }

    if config_manager.save_config(save_config_name, config_data):
        st.success(f"✅ Configuration '{save_config_name}' saved successfully!")
        config_manager.reload_configs()
        st.rerun()
    else:
        st.error("❌ Failed to save configuration")


def _render_unique_indices_configuration():
    """Render Step 3 for the Unique Indices strategy (load, edit, save)."""
    st.header("⚙️ Step 3: Configuration")

    select_col, load_col = st.columns([3, 1])
    with select_col:
        selected_config_name = st.selectbox(
            "Select File Type Configuration:",
            config_manager.get_config_names(),
            format_func=_config_display_name,
            help="Choose a predefined configuration or create a new one",
        )
    with load_col:
        if st.button("🔄 Load Config", help="Load the selected configuration"):
            _load_selected_config(selected_config_name)

    if st.button(
        "🗑️ Clear All Configuration",
        help="Clear all configuration and start fresh",
    ):
        st.session_state.field_descriptions_table = []
        st.session_state.unique_indices_descriptions_table = []
        st.session_state.fields_str = ""
        st.session_state.last_selected_config = ""
        st.success("✅ Configuration cleared!")
        st.rerun()

    st.subheader("📝 Field Descriptions")
    st.markdown(FIELD_DESCRIPTIONS_INTRO, unsafe_allow_html=True)
    _render_description_table(
        'field_descriptions_table',
        key_prefix="",
        add_label="➕ Add Field Description Row",
    )

    st.subheader("🔑 Unique Field Descriptions")
    st.markdown(UNIQUE_FIELD_DESCRIPTIONS_INTRO, unsafe_allow_html=True)
    _render_description_table(
        'unique_indices_descriptions_table',
        key_prefix="unique_",
        add_label="➕ Add Unique Field Description Row",
    )

    st.subheader("💾 Save Configuration")
    st.markdown(SAVE_CONFIGURATION_INTRO, unsafe_allow_html=True)
    name_col, save_col = st.columns([3, 1])
    with name_col:
        save_config_name = st.text_input(
            "Configuration Name:",
            placeholder="Enter a name for this configuration (e.g., 'Biotech Report', 'Clinical Data')",
            help="Choose a descriptive name that will appear in the dropdown",
        )
    with save_col:
        if st.button("💾 Save Config", help="Save the current configuration"):
            _save_current_config(save_config_name)


def _render_original_strategy_configuration():
    """Render Step 3 for the Original strategy (comma-separated field list)."""
    st.header("⚙️ Step 3: Field Configuration")
    st.session_state.fields_str = st.text_input(
        "Fields to Extract (comma-separated):",
        value=st.session_state.fields_str,
        key="fields_input",
        help="Enter the field names you want to extract, separated by commas",
    )


# ---------------------------------------------------------------------------
# Execution page: running and displaying an extraction
# ---------------------------------------------------------------------------
def _render_costs(tracker):
    """Log and display the cost figures exposed by the run's cost tracker."""
    costs = tracker.calculate_current_file_costs()
    model_cost = costs["openai"]["total_cost"]
    di_cost = costs["document_intelligence"]["total_cost"]

    logger.info("Cost tracker debug info:")
    logger.info(" LLM input tokens: %s", tracker.llm_input_tokens)
    logger.info(" LLM output tokens: %s", tracker.llm_output_tokens)
    logger.info(" DI pages: %s", tracker.di_pages)
    logger.info(" LLM calls count: %s", len(tracker.llm_calls))
    logger.info(" Current file costs: %s", tracker.current_file_costs)
    logger.info(" Calculated costs: %s", costs)

    st.subheader("Detailed Costs")
    st.dataframe(tracker.get_detailed_costs_table(), use_container_width=True)
    st.info(
        f"LLM input tokens: {tracker.llm_input_tokens}, "
        f"LLM output tokens: {tracker.llm_output_tokens}, "
        f"DI pages: {tracker.di_pages}, "
        f"Model cost: ${model_cost:.4f}, "
        f"DI cost: ${di_cost:.4f}, "
        f"Total cost: ${model_cost + di_cost:.4f}"
    )


def _render_chunk_stats(chunk_stats):
    """Show the table, summary and length chart for IndexAgent chunk statistics."""
    st.markdown("### Chunk Statistics")
    stats_df = pd.DataFrame(chunk_stats)
    st.dataframe(stats_df)

    st.markdown("### Summary")
    lengths = stats_df['length']
    st.markdown(
        "\n".join(
            [
                f"- Total chunks: {len(stats_df)}",
                f"- Average chunk length: {lengths.mean():.0f} characters",
                f"- Shortest chunk: {lengths.min()} characters",
                f"- Longest chunk: {lengths.max()} characters",
            ]
        )
    )

    st.markdown("### Chunk Length Distribution")
    st.bar_chart(stats_df.set_index('chunk_number')['length'])


def _render_execution_trace(logs):
    """Render one expander per executor log entry."""
    st.subheader("Execution trace")
    for log in logs:
        indent = " " * 4 * log["depth"]
        error_indicator = "❌ " if log.get("error") else "✓ "
        with st.expander(f"{indent}{error_indicator}{log['tool']} – Click to view result"):
            # Plain markdown only: the args may contain document-derived text
            # and must not be interpreted as HTML.
            st.markdown(f"**Args**: `{log['args']}`")
            if log.get("error"):
                st.error(f"Error: {log['error']}")

            result = log.get("result")
            has_chunk_stats = (
                log['tool'] == "IndexAgent"
                and isinstance(result, dict)
                and "chunk_stats" in result
            )
            if has_chunk_stats:
                _render_chunk_stats(result["chunk_stats"])
            else:
                st.code(result)


def _run_extraction(
    pdf_file,
    strategy,
    field_list,
    field_descs,
    unique_indices,
    unique_indices_descriptions,
    log_capture,
):
    """Plan and execute an extraction, then display and record the outcome.

    Any exception is logged with its traceback and shown as an error message
    instead of aborting the page.
    """
    try:
        with st.spinner("Planning …"):
            # Short text preview to give the planner document context.
            preview = _build_preview(pdf_file.getvalue())
            cost_tracker = CostTracker()
            planner = Planner(cost_tracker=cost_tracker)
            plan = planner.build_plan(
                pdf_meta={"filename": pdf_file.name},
                doc_preview=preview,
                fields=field_list,
                field_descs=field_descs,
                strategy=strategy,
                unique_indices=unique_indices,
                unique_indices_descriptions=unique_indices_descriptions,
            )

        st.markdown("---")

        with st.spinner("Executing …"):
            executor = Executor(settings=settings, cost_tracker=cost_tracker)
            results, logs = executor.run(plan, pdf_file)

        _render_costs(executor.cost_tracker)

        logger.info("Executor returned results of type: %s", type(results))
        logger.info("Results content: %s", results)

        if isinstance(results, pd.DataFrame):
            logger.info("Results is already a DataFrame with shape: %s", results.shape)
            logger.info("DataFrame columns: %s", results.columns.tolist())
            logger.info("DataFrame head: %s", results.head())
            df = results
        else:
            logger.info("Results is not a DataFrame, calling flatten_json_response")
            df = flatten_json_response(results, field_list)

        logger.info("Final DataFrame shape: %s", df.shape)
        logger.info("Final DataFrame columns: %s", df.columns.tolist())
        if not df.empty:
            logger.info("Final DataFrame sample: %s", df.head())

        st.session_state.execution_history.append(
            {
                "filename": pdf_file.name,
                "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "fields": field_list,
                "logs": log_capture.get_logs(),
                "results": df.to_dict() if not df.empty else None,
            }
        )
        log_capture.clear()  # the captured lines now live in the history record

        _render_execution_trace(logs)

        if not df.empty:
            st.success("Done ✓")
            st.dataframe(df)
            st.download_button("Download CSV", df.to_csv(index=False), "results.csv")
        else:
            st.warning("No results were extracted. Check the execution trace for errors.")
    except Exception as e:
        logger.exception("App error:")
        st.error(f"An error occurred: {e}")


def render_execution_page(log_capture):
    """Render the upload / configure / run workflow.

    *log_capture* is the handler whose captured lines are stored with each
    run's history record.
    """
    st.title("Deep\u2011Research PDF Field Extractor (POC)")

    # Step 1: file upload
    st.header("📄 Step 1: Upload Document")
    pdf_file = st.file_uploader(
        "Upload PDF", type=["pdf"], help="Select a PDF file to process"
    )
    if pdf_file:
        st.success(f"✅ File uploaded: {pdf_file.name}")

    # Step 2: strategy selection
    st.header("🎯 Step 2: Select Extraction Strategy")
    strategy = st.radio(
        "Choose your extraction approach:",
        [ORIGINAL_STRATEGY, UNIQUE_INDICES_STRATEGY],
        help=(
            "**Original Strategy**: Process document page by page, extracting each field individually. "
            "**Unique Indices Strategy**: Process entire document at once using unique combinations of indices."
        ),
        horizontal=True,
    )
    if strategy == ORIGINAL_STRATEGY:
        st.info("📋 **Original Strategy**: Will extract fields one by one from the document pages.")
    else:
        st.info("🔍 **Unique Indices Strategy**: Will find unique combinations and extract additional fields for each.")

    # Step 3: configuration
    if strategy == UNIQUE_INDICES_STRATEGY:
        _render_unique_indices_configuration()
    else:
        _render_original_strategy_configuration()

    # Step 4: execution
    st.header("🚀 Step 4: Run Extraction")

    field_rows = st.session_state.field_descriptions_table
    field_descs = _rows_to_descriptions(field_rows)

    unique_indices = None
    unique_indices_descriptions = None
    if strategy == UNIQUE_INDICES_STRATEGY:
        unique_rows = st.session_state.unique_indices_descriptions_table
        unique_indices = _named_fields(unique_rows)
        unique_indices_descriptions = _rows_to_descriptions(unique_rows)

    if pdf_file:
        if strategy == ORIGINAL_STRATEGY:
            field_count = len(_split_fields(st.session_state.fields_str))
            st.info(f"📊 Ready to extract {field_count} fields using Original Strategy")
        else:
            unique_count = len(unique_indices) if unique_indices else 0
            field_count = len(field_descs)
            st.info(
                f"📊 Ready to extract {field_count} additional fields for "
                f"{unique_count} unique combinations using Unique Indices Strategy"
            )

    if st.button("🚀 Run Extraction", type="primary", disabled=not pdf_file):
        if not pdf_file:
            st.error("❌ Please upload a PDF file first")
            return

        if strategy == ORIGINAL_STRATEGY:
            field_list = _split_fields(st.session_state.fields_str)
        else:
            # The additional fields come from the field descriptions table.
            field_list = _named_fields(field_rows)

        _run_extraction(
            pdf_file,
            strategy,
            field_list,
            field_descs,
            unique_indices,
            unique_indices_descriptions,
            log_capture,
        )


# ---------------------------------------------------------------------------
# Script body (executed on every Streamlit run)
# ---------------------------------------------------------------------------
_init_session_state()
log_capture = _install_log_capture()

load_dotenv()

st.set_page_config(page_title="PDF Field Extractor", layout="wide")

st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Documentation", "Traces", "Execution"])

if page == "Documentation":
    render_documentation_page()
elif page == "Traces":
    render_traces_page()
else:  # page == "Execution"
    render_execution_page(log_capture)