"""Gradio app for comparing PDF text extractors side by side.

Each JSON file in the chosen directory holds the output of several extractors
for one PDF. The UI shows the PDF, lets the user view two extractor outputs
next to each other, and records the preferred extractor in a sibling
``<name>_best.txt`` file.
"""

import base64
import json
import os
import tempfile
from pathlib import Path

import gradio as gr

EXTRACTORS = [
    'pdf_plumber',
    'py_pdf',
    'docling',
    'extractous',
    'pypdfium2',
    'pymupdf',
    'pymupdf_llm',
]

# Marker inserted into extractor text at every page offset.
PAGE_BREAK_MARKER = "\n<---page-break--->\n"

# Static markup for the viewer. The element ids are looked up by DISPLAY_PDF_JS.
PDF_VIEWER_HTML = '''
<div style="width: 100%; height: 800px;">
    <iframe id="pdf-iframe" title="PDF Document"
            style="display: none; width: 100%; height: 100%; border: 1px solid #ddd;"></iframe>
    <div id="pdf-fallback" style="padding: 20px; text-align: center;">
        Click "Load PDFs" to start viewing documents.
    </div>
</div>
'''

# Frontend function run after every handler that updates the hidden base64
# textbox. It receives that textbox's value as its only argument.
DISPLAY_PDF_JS = """
(base64Data) => {
    const iframe = document.getElementById('pdf-iframe');
    const fallback = document.getElementById('pdf-fallback');
    if (!iframe || !fallback) {
        console.error('PDF viewer elements not found');
        return;
    }

    // Release the blob URL of the previously displayed document.
    if (window.__pdfObjectUrl) {
        URL.revokeObjectURL(window.__pdfObjectUrl);
        window.__pdfObjectUrl = null;
    }

    // No PDF for the current file: clear the viewer instead of leaving the
    // previous document on screen.
    if (!base64Data) {
        iframe.src = 'about:blank';
        iframe.style.display = 'none';
        fallback.textContent = 'No PDF available for this document.';
        fallback.style.display = 'block';
        return;
    }

    try {
        // Convert base64 to binary
        const binaryString = atob(base64Data);
        const bytes = new Uint8Array(binaryString.length);
        for (let i = 0; i < binaryString.length; i++) {
            bytes[i] = binaryString.charCodeAt(i);
        }

        // Create blob and URL, then point the iframe at it
        const blob = new Blob([bytes], { type: 'application/pdf' });
        window.__pdfObjectUrl = URL.createObjectURL(blob);
        iframe.src = window.__pdfObjectUrl;
        iframe.style.display = 'block';
        fallback.style.display = 'none';
    } catch (error) {
        console.error('Error displaying PDF:', error);
    }
}
"""


def add_page_breaks(text, page_offsets):
    """Add page break markers to text based on page_offsets.

    Args:
        text: The extracted text.
        page_offsets: Character offsets into ``text``; a marker is inserted
            after the slice ending at each offset. May be empty or None.

    Returns:
        ``text`` unchanged when ``page_offsets`` is empty, otherwise the text
        with ``PAGE_BREAK_MARKER`` inserted at every offset.
    """
    if not page_offsets:
        return text

    result = []
    last_offset = 0
    for offset in page_offsets:
        result.append(text[last_offset:offset])
        result.append(PAGE_BREAK_MARKER)
        last_offset = offset

    # Add any remaining text after the last offset
    if last_offset < len(text):
        result.append(text[last_offset:])

    return "".join(result)


class ExtractorComparer:
    """Holds the list of JSON result files and the state of the one being viewed."""

    def __init__(self):
        self.json_files = []
        self.current_index = 0
        self.current_data = None
        self.temp_pdf_path = None
        self.current_pdf_bytes = None

    @staticmethod
    def _best_extractor_path(json_file):
        """Return the path of the annotation file that belongs to ``json_file``."""
        return os.path.splitext(json_file)[0] + "_best.txt"

    @staticmethod
    def _extract_pdf_bytes(data):
        """Return the decoded PDF stored under the 'pdf_plumber' entry, or None.

        The bytes are read from ``data['pdf_plumber']['media'][0]['media_bytes']``
        (base64). Any missing level or undecodable payload yields None.
        """
        if not isinstance(data, dict):
            return None
        plumber_data = data.get('pdf_plumber')
        if not isinstance(plumber_data, dict):
            return None
        media = plumber_data.get('media')
        if not isinstance(media, list) or not media:
            return None
        media_item = media[0]
        if not isinstance(media_item, dict):
            return None
        encoded = media_item.get('media_bytes')
        if not encoded:
            return None
        try:
            return base64.b64decode(encoded)
        except (ValueError, TypeError):
            # binascii.Error is a ValueError; treat bad payloads as "no PDF".
            return None

    def _discard_temp_pdf(self):
        """Delete the temporary PDF of the previously loaded file, if any."""
        if self.temp_pdf_path:
            try:
                os.remove(self.temp_pdf_path)
            except OSError:
                pass  # Best-effort cleanup; the file may already be gone.
            self.temp_pdf_path = None

    def load_files(self, directory_path):
        """Load all JSON files from the specified directory.

        Returns:
            A ``(file_progress, annotation_status)`` tuple of display strings.
        """
        # Reset everything so a failed or empty load never shows stale data.
        self.json_files = []
        self.current_index = 0
        self.current_data = None
        self.current_pdf_bytes = None
        try:
            # Sorted so that navigation order is stable across runs.
            self.json_files = sorted(
                os.path.join(directory_path, filename)
                for filename in os.listdir(directory_path)
                if filename.endswith(('.json', '.jsonl'))
            )
        except Exception as e:
            return f"Error loading files: {str(e)}", "Error"

        if not self.json_files:
            return "No JSON files found", "No files loaded"
        return self.get_progress_info()

    def load_current_file(self):
        """Load the current JSON file data.

        Returns:
            A ``(base64_pdf, file_progress, annotation_status)`` tuple.
            ``base64_pdf`` is None when the file has no PDF or failed to load.
        """
        if not self.json_files:
            return None, "N/A", "N/A"

        # Drop the previous file's state first: if this load fails, the text
        # panels and annotations must not keep referring to the old file.
        self.current_data = None
        self.current_pdf_bytes = None
        self._discard_temp_pdf()

        try:
            with open(self.json_files[self.current_index], 'r', encoding='utf-8') as f:
                self.current_data = json.load(f)

            # Extract PDF bytes from pdf_plumber
            pdf_bytes = self._extract_pdf_bytes(self.current_data)

            # Create temporary file for the PDF if we have bytes
            if pdf_bytes:
                self.current_pdf_bytes = pdf_bytes
                with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                    temp_file.write(pdf_bytes)
                    self.temp_pdf_path = temp_file.name
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            return None, f"Error loading file: {e}", "No annotation"

        file_progress, annotation_status = self.get_progress_info()
        if not pdf_bytes:
            return None, file_progress, annotation_status

        # Convert to base64 for passing to the frontend
        base64_pdf = base64.b64encode(pdf_bytes).decode('utf-8')
        return base64_pdf, file_progress, annotation_status

    def get_progress_info(self):
        """Generate progress information and annotation status.

        Returns:
            A ``(file_progress, annotation_status)`` tuple of display strings.
        """
        if not self.json_files:
            return "No files loaded", "No annotation"

        current_file = self.json_files[self.current_index]
        filename = Path(current_file).name
        total = len(self.json_files)

        # Check if this file has been annotated with a best extractor
        annotation_status = "Not annotated"
        best_extractor_file = self._best_extractor_path(current_file)
        if os.path.exists(best_extractor_file):
            try:
                with open(best_extractor_file, 'r', encoding='utf-8') as f:
                    best_extractor = f.read().strip()
                annotation_status = f"Best extractor: {best_extractor}"
            except (OSError, UnicodeDecodeError):
                pass  # Unreadable annotation is shown as "Not annotated".

        # Count total annotated files
        annotated_count = sum(
            1
            for json_file in self.json_files
            if os.path.exists(self._best_extractor_path(json_file))
        )

        file_progress = (
            f"File {self.current_index + 1} of {total}: {filename}"
            f" (Annotated: {annotated_count}/{total})"
        )
        return file_progress, annotation_status

    def get_extractor_text(self, extractor_name):
        """Get text with page breaks for the specified extractor.

        Returns an empty string when no file is loaded or the extractor has no
        entry, and a "No text found" message when the entry has no text field.
        """
        data = self.current_data
        if not isinstance(data, dict) or extractor_name not in data:
            return ""

        extractor_data = data[extractor_name]
        if not isinstance(extractor_data, dict) or 'text' not in extractor_data:
            return f"No text found for {extractor_name}"

        text = extractor_data.get('text') or ''

        # Get page offsets; any missing level means "no page information".
        try:
            page_offsets = extractor_data['media'][0]['metadata']['pdf_metadata']['page_offsets']
        except (KeyError, IndexError, TypeError):
            page_offsets = []

        return add_page_breaks(text, page_offsets)

    def next_pdf(self):
        """Load the next PDF in the list, wrapping around at the end."""
        if not self.json_files:
            return None, "N/A", "N/A"

        self.current_index = (self.current_index + 1) % len(self.json_files)
        return self.load_current_file()

    def prev_pdf(self):
        """Load the previous PDF in the list, wrapping around at the start."""
        if not self.json_files:
            return None, "N/A", "N/A"

        self.current_index = (self.current_index - 1) % len(self.json_files)
        return self.load_current_file()

    def set_best_extractor(self, extractor_name):
        """Record that this extractor is the best for the current file.

        Returns:
            A ``(file_progress, annotation_status)`` tuple of display strings.
        """
        if not self.json_files or not self.current_data:
            return "N/A", "N/A"

        result_file = self._best_extractor_path(self.json_files[self.current_index])
        try:
            with open(result_file, 'w', encoding='utf-8') as f:
                f.write(extractor_name)
        except OSError as e:
            return f"Error saving annotation: {e}", "No annotation"

        # Get updated progress info after annotation
        return self.get_progress_info()


def create_interface():
    """Build and return the Gradio Blocks app for comparing extractors."""
    comparer = ExtractorComparer()

    # Custom CSS for basic font in text areas
    custom_css = """
    .extraction-text textarea {
        font-family: Arial, Helvetica, sans-serif !important;
        font-size: 14px !important;
        line-height: 1.5 !important;
    }
    """

    def load_directory(directory_path):
        """Scan the directory and load its first file in a single step.

        Doing both in one handler keeps the message from ``load_files``
        visible when the directory is missing or contains no JSON files.
        """
        file_progress, annotation_status = comparer.load_files(directory_path)
        if not comparer.json_files:
            return None, file_progress, annotation_status
        return comparer.load_current_file()

    with gr.Blocks(title="PDF Extractor Comparer", theme="soft", css=custom_css) as demo:
        gr.Markdown("## PDF Extractor Comparer")

        with gr.Row():
            directory_input = gr.Textbox(
                label="Path to JSON Directory",
                placeholder="e.g., /path/to/your/json/files"
            )
            load_button = gr.Button("Load PDFs", variant="primary")

        # Main layout: PDF viewer on left, status and controls on right
        with gr.Row():
            # Left column: PDF viewer
            with gr.Column(scale=3):
                pdf_viewer_html = gr.HTML(
                    label="PDF Document",
                    value=PDF_VIEWER_HTML
                )

                # Hidden component to store the Base64 PDF data
                pdf_data_hidden = gr.Textbox(visible=False, elem_id="pdf_base64_data")

            # Right column: Progress and controls
            with gr.Column(scale=1):
                # Progress information
                file_progress_output = gr.Textbox(label="File Progress", interactive=False)
                annotation_status_output = gr.Textbox(label="Annotation Status", interactive=False)

                # Navigation
                with gr.Row():
                    prev_button = gr.Button("⬅️ Previous", elem_id="prev_button")
                    next_button = gr.Button("Next ➡️", elem_id="next_button")

                # Best extractor selection
                gr.Markdown("### Select Best Extractor")
                extractor_buttons = []
                for extractor in EXTRACTORS:
                    button = gr.Button(extractor, variant="secondary")
                    extractor_buttons.append(button)
                    # A hidden textbox carries the extractor name to the handler.
                    button.click(
                        comparer.set_best_extractor,
                        inputs=[gr.Textbox(value=extractor, visible=False)],
                        outputs=[file_progress_output, annotation_status_output]
                    )

        # Extractors section below the PDF
        gr.Markdown("### Extractor Comparison")

        # Extractor dropdowns
        with gr.Row():
            extractor1_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 1",
                value=EXTRACTORS[0] if EXTRACTORS else None
            )
            extractor2_dropdown = gr.Dropdown(
                choices=EXTRACTORS,
                label="Extractor 2",
                value=EXTRACTORS[1] if len(EXTRACTORS) > 1 else EXTRACTORS[0] if EXTRACTORS else None
            )

        # Extractor text outputs with applied class for styling
        with gr.Row():
            extractor1_text = gr.Textbox(
                label="Extractor 1 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )
            extractor2_text = gr.Textbox(
                label="Extractor 2 Output",
                lines=15,
                elem_classes=["extraction-text"]
            )

        def refresh_after(event):
            """Chain the viewer update and both text panels after ``event``."""
            return event.then(
                # JS-only step: render (or clear) the PDF from the hidden textbox.
                fn=None,
                inputs=[pdf_data_hidden],
                outputs=None,
                js=DISPLAY_PDF_JS
            ).then(
                comparer.get_extractor_text,
                inputs=[extractor1_dropdown],
                outputs=[extractor1_text]
            ).then(
                comparer.get_extractor_text,
                inputs=[extractor2_dropdown],
                outputs=[extractor2_text]
            )

        # Event handlers
        file_outputs = [pdf_data_hidden, file_progress_output, annotation_status_output]

        refresh_after(load_button.click(
            load_directory,
            inputs=[directory_input],
            outputs=file_outputs
        ))
        refresh_after(prev_button.click(comparer.prev_pdf, outputs=file_outputs))
        refresh_after(next_button.click(comparer.next_pdf, outputs=file_outputs))

        extractor1_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor1_dropdown],
            outputs=[extractor1_text]
        )

        extractor2_dropdown.change(
            comparer.get_extractor_text,
            inputs=[extractor2_dropdown],
            outputs=[extractor2_text]
        )

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch()