# Enhanced PDF to Excel converter: extracts text blocks and tables from PDF
# files (PyMuPDF, optionally Camelot/Tabula) and writes formatted workbooks.
# --- Standard library ---
import logging
import os
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional

# --- Third-party (required) ---
import fitz  # PyMuPDF
import numpy as np
import openpyxl
import pandas as pd
from openpyxl.styles import Font, PatternFill, Border, Side, Alignment
from openpyxl.utils.dataframe import dataframe_to_rows

# --- Third-party (optional; degrade gracefully when missing) ---
try:
    import camelot  # For advanced table extraction
except ImportError:
    CAMELOT_AVAILABLE = False
    print("β οΈ Camelot not installed. Run: pip install camelot-py[cv]")
else:
    CAMELOT_AVAILABLE = True

try:
    import tabula  # Alternative table extraction
except ImportError:
    TABULA_AVAILABLE = False
    print("β οΈ Tabula not installed. Run: pip install tabula-py")
else:
    TABULA_AVAILABLE = True

# Module-wide logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class TextBlock:
    """A single span of text extracted from a PDF page, with layout metadata.

    Coordinates come straight from PyMuPDF span bounding boxes.
    BUGFIX: the @dataclass decorator was missing even though `dataclass` is
    imported and the class is constructed with keyword arguments elsewhere
    in this file (e.g. extract_text_blocks_advanced) — without it every
    TextBlock(...) call raises TypeError.
    """
    text: str                # Span text content (already stripped)
    x: float                 # Left edge of the bounding box
    y: float                 # Top edge of the bounding box
    width: float             # Bounding-box width
    height: float            # Bounding-box height
    font_size: float         # Point size reported by the PDF
    font_name: str           # Font name reported by the PDF
    is_bold: bool = False    # Derived from span flags or the font name
    is_italic: bool = False  # Derived from span flags or the font name
    page_num: int = 1        # 1-based page number
    block_id: str = ""       # Stable id: p{page}_b{block}_l{line}_s{span}
@dataclass
class TableData:
    """A table extracted from a PDF page, plus extraction metadata.

    BUGFIX: the @dataclass decorator was missing even though `dataclass` is
    imported and TableData(...) is constructed with keyword arguments by the
    extractor methods — without it every construction raises TypeError.
    """
    data: List[List[str]]                    # Rows of cell text (row 0 is the header when has_header)
    bbox: Tuple[float, float, float, float]  # (x0, y0, x1, y1); (0, 0, 100, 100) when the backend gives none
    page_num: int                            # 1-based page number
    confidence: float = 0.0                  # Extractor-specific confidence in [0, 1]
    has_header: bool = True                  # Whether row 0 should be treated as a header
class PDFToExcelConverter:
    """
    Enhanced PDF to Excel converter with multiple extraction methods
    for better accuracy and handling of complex documents.

    Text and tables are extracted per page with PyMuPDF, optionally
    cross-checked with Camelot/Tabula when those packages imported
    successfully, de-duplicated, and written to an openpyxl-backed
    Excel workbook in one of three layouts (see self.output_formats).
    """
def __init__(self):
    """Record which extraction backends are importable and the output layouts."""
    # PyMuPDF is a hard dependency, so it is always usable.
    methods = ['pymupdf']
    for flag, backend in ((CAMELOT_AVAILABLE, 'camelot'), (TABULA_AVAILABLE, 'tabula')):
        if flag:
            methods.append(backend)
    self.extraction_methods = methods
    # Human-readable descriptions of the supported workbook layouts.
    self.output_formats = {
        'separate_sheets': 'Each table and text section on separate sheets',
        'combined': 'All content combined logically',
        'structured': 'Maintain document structure with proper formatting'
    }
    logger.info(f"Available extraction methods: {', '.join(methods)}")
def extract_text_blocks_advanced(self, page, page_num: int) -> List["TextBlock"]:
    """Extract per-span text blocks with font and position metadata.

    Uses PyMuPDF's dict-mode extraction to visit every span of every line;
    on any failure falls back to a single whole-page text block.
    """
    blocks: List["TextBlock"] = []
    try:
        layout = page.get_text("dict")
        for b_idx, raw_block in enumerate(layout.get("blocks", [])):
            # type 0 == text; anything else (e.g. images) is skipped
            if raw_block.get("type", 1) != 0:
                continue
            for l_idx, raw_line in enumerate(raw_block.get("lines", [])):
                for s_idx, raw_span in enumerate(raw_line.get("spans", [])):
                    content = raw_span.get("text", "").strip()
                    if not content:
                        continue
                    x0, y0, x1, y1 = raw_span["bbox"]
                    style_flags = raw_span.get("flags", 0)
                    face = raw_span.get("font", "Arial")
                    blocks.append(TextBlock(
                        text=content,
                        x=x0, y=y0,
                        width=x1 - x0,
                        height=y1 - y0,
                        font_size=raw_span.get("size", 12),
                        font_name=face,
                        # Bit 16 of the span flags marks bold, bit 2 italic;
                        # the font name is also checked as a fallback.
                        is_bold=bool(style_flags & 16) or "bold" in face.lower(),
                        is_italic=bool(style_flags & 2) or "italic" in face.lower(),
                        page_num=page_num,
                        block_id=f"p{page_num}_b{b_idx}_l{l_idx}_s{s_idx}",
                    ))
    except Exception as e:
        logger.warning(f"Advanced text extraction failed for page {page_num}: {e}")
        # Fall back to the coarse whole-page extraction.
        blocks = self._extract_text_simple_fallback(page, page_num)
    return blocks
def _extract_text_simple_fallback(self, page, page_num: int) -> List["TextBlock"]:
    """Last-resort extraction: the whole page's text as a single TextBlock."""
    blocks: List["TextBlock"] = []
    try:
        raw = page.get_text()
        if raw.strip():
            bounds = page.rect
            # One synthetic block spanning the entire page.
            blocks = [TextBlock(
                text=raw.strip(),
                x=0, y=0,
                width=bounds.width,
                height=bounds.height,
                font_size=12,
                font_name="Arial",
                page_num=page_num,
                block_id=f"p{page_num}_fallback",
            )]
    except Exception as e:
        logger.error(f"Fallback text extraction failed for page {page_num}: {e}")
    return blocks
def extract_tables_multiple_methods(self, pdf_path: str, page_num: int) -> List["TableData"]:
    """Run every available table extractor on one page and merge the results.

    PyMuPDF always runs; Camelot and Tabula run only when their imports
    succeeded. Near-identical tables found by more than one backend are
    collapsed, keeping the higher-confidence copy.
    """
    candidates: List["TableData"] = list(self._extract_tables_pymupdf(pdf_path, page_num))
    optional_backends = (
        (CAMELOT_AVAILABLE, self._extract_tables_camelot, "Camelot"),
        (TABULA_AVAILABLE, self._extract_tables_tabula, "Tabula"),
    )
    for available, extractor, label in optional_backends:
        if not available:
            continue
        try:
            candidates.extend(extractor(pdf_path, page_num))
        except Exception as e:
            logger.warning(f"{label} extraction failed: {e}")
    # Collapse duplicates detected by more than one backend.
    return self._deduplicate_tables(candidates)
def _extract_tables_pymupdf(self, pdf_path: str, page_num: int) -> List["TableData"]:
    """Extract tables from one page using PyMuPDF's built-in table finder.

    Args:
        pdf_path: Path to the PDF file on disk (re-opened here).
        page_num: 1-based page number.

    Returns:
        TableData items with confidence 0.8 (PyMuPDF is generally reliable);
        an empty list on any failure.
    """
    tables: List["TableData"] = []
    doc = None
    try:
        doc = fitz.open(pdf_path)
        page = doc[page_num - 1]  # Convert to 0-based index
        detected_tables = page.find_tables()
        for i, table in enumerate(detected_tables):
            try:
                table_data = table.extract()
                if not table_data:
                    continue
                # Normalize each cell to stripped text and drop all-empty rows.
                cleaned_data = []
                for row in table_data:
                    cleaned_row = [str(cell).strip() if cell else "" for cell in row]
                    if any(cleaned_row):
                        cleaned_data.append(cleaned_row)
                if cleaned_data:
                    tables.append(TableData(
                        data=cleaned_data,
                        bbox=table.bbox,
                        page_num=page_num,
                        confidence=0.8,  # PyMuPDF generally reliable
                        has_header=True
                    ))
            except Exception as e:
                logger.warning(f"Error extracting PyMuPDF table {i}: {e}")
    except Exception as e:
        logger.error(f"PyMuPDF table extraction failed: {e}")
    finally:
        # BUGFIX: the document was only closed on the success path, so an
        # exception raised by find_tables()/indexing leaked the file handle.
        if doc is not None:
            doc.close()
    return tables
def _extract_tables_camelot(self, pdf_path: str, page_num: int) -> List["TableData"]:
    """Extract tables from one page with Camelot's lattice mode, if installed."""
    if not CAMELOT_AVAILABLE:
        return []
    results: List["TableData"] = []
    try:
        # Camelot addresses pages by 1-based number, passed as a string.
        found = camelot.read_pdf(pdf_path, pages=str(page_num), flavor='lattice')
        for table in found:
            frame = table.df
            if frame.empty:
                continue
            rows = frame.values.tolist()
            # Prepend the column labels as a header row when present.
            if not frame.columns.empty:
                rows.insert(0, frame.columns.tolist())
            # Camelot reports accuracy on a 0-100 scale; default 0.7 if absent.
            score = table.accuracy / 100.0 if hasattr(table, 'accuracy') else 0.7
            results.append(TableData(
                data=rows,
                bbox=(0, 0, 100, 100),  # Camelot doesn't provide bbox
                page_num=page_num,
                confidence=score,
                has_header=True
            ))
    except Exception as e:
        logger.warning(f"Camelot extraction failed: {e}")
    return results
def _extract_tables_tabula(self, pdf_path: str, page_num: int) -> List["TableData"]:
    """Extract tables from one page with Tabula, if installed."""
    if not TABULA_AVAILABLE:
        return []
    results: List["TableData"] = []
    try:
        # Tabula addresses pages by 1-based number.
        frames = tabula.read_pdf(pdf_path, pages=page_num, multiple_tables=True)
        for frame in frames:
            if frame.empty:
                continue
            # NaNs become empty strings; column labels become the header row.
            rows = frame.fillna('').values.tolist()
            rows.insert(0, frame.columns.tolist())
            results.append(TableData(
                data=rows,
                bbox=(0, 0, 100, 100),  # Tabula doesn't provide bbox
                page_num=page_num,
                confidence=0.7,
                has_header=True
            ))
    except Exception as e:
        logger.warning(f"Tabula extraction failed: {e}")
    return results
def _deduplicate_tables(self, tables: List[TableData]) -> List[TableData]: | |
""" | |
Remove duplicate tables by comparing content | |
""" | |
if not tables: | |
return tables | |
unique_tables = [] | |
for table in tables: | |
is_duplicate = False | |
for existing_table in unique_tables: | |
if self._tables_are_similar(table, existing_table): | |
# Keep the one with higher confidence | |
if table.confidence > existing_table.confidence: | |
unique_tables.remove(existing_table) | |
unique_tables.append(table) | |
is_duplicate = True | |
break | |
if not is_duplicate: | |
unique_tables.append(table) | |
return unique_tables | |
def _tables_are_similar(self, table1: TableData, table2: TableData, threshold: float = 0.8) -> bool: | |
""" | |
Check if two tables are similar (likely duplicates) | |
""" | |
if len(table1.data) != len(table2.data): | |
return False | |
if not table1.data or not table2.data: | |
return False | |
# Compare dimensions | |
if len(table1.data[0]) != len(table2.data[0]): | |
return False | |
# Compare content similarity | |
matching_cells = 0 | |
total_cells = len(table1.data) * len(table1.data[0]) | |
for i, (row1, row2) in enumerate(zip(table1.data, table2.data)): | |
for j, (cell1, cell2) in enumerate(zip(row1, row2)): | |
if str(cell1).strip().lower() == str(cell2).strip().lower(): | |
matching_cells += 1 | |
similarity = matching_cells / total_cells if total_cells > 0 else 0 | |
return similarity >= threshold | |
def process_pdf_to_excel(self, pdf_path: str, output_path: str, format_type: str = 'structured') -> str:
    """Convert a PDF file into an Excel workbook.

    Args:
        pdf_path: Input PDF path; must exist on disk.
        output_path: Destination .xlsx path.
        format_type: 'structured', 'combined', or 'separate_sheets'.

    Returns:
        The path of the written workbook.

    Raises:
        FileNotFoundError: If pdf_path does not exist.
    """
    logger.info(f"Starting PDF to Excel conversion: {pdf_path}")
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF file not found: {pdf_path}")
    extracted = self._extract_comprehensive_content(pdf_path)
    result_path = self._create_excel_workbook(extracted, output_path, format_type)
    logger.info(f"Successfully converted PDF to Excel: {result_path}")
    return result_path
def _extract_comprehensive_content(self, pdf_path: str) -> Dict[str, Any]:
    """Walk every page of the PDF, collecting text blocks, tables, and images.

    Returns:
        A dict with 'pages' (one dict per page), 'total_pages', and
        'metadata' (the PDF's own metadata mapping). Re-raises any
        extraction error after logging it.
    """
    content: Dict[str, Any] = {'pages': [], 'total_pages': 0, 'metadata': {}}
    try:
        doc = fitz.open(pdf_path)
        content['total_pages'] = doc.page_count
        content['metadata'] = doc.metadata
        logger.info(f"Processing {doc.page_count} pages...")
        for page_index in range(doc.page_count):
            page = doc[page_index]
            human_num = page_index + 1
            logger.info(f"Processing page {human_num}/{doc.page_count}")
            content['pages'].append({
                'page_number': human_num,
                'text_blocks': self.extract_text_blocks_advanced(page, human_num),
                # Table extraction re-opens the document itself, so it
                # takes the path rather than the live page object.
                'tables': self.extract_tables_multiple_methods(pdf_path, human_num),
                'images': self._extract_images_basic(page, human_num),
                'page_width': page.rect.width,
                'page_height': page.rect.height,
            })
        doc.close()
    except Exception as e:
        logger.error(f"Error extracting PDF content: {e}")
        raise
    return content
def _extract_images_basic(self, page, page_num: int) -> List[Dict]:
    """List the images referenced by a page (metadata only, no pixel data).

    NOTE(review): the 'bbox' key actually holds the raw tuple returned by
    page.get_images() (xref info), not a rectangle — kept as-is so the
    returned dict shape stays backward compatible.
    """
    records: List[Dict] = []
    try:
        for idx, img in enumerate(page.get_images()):
            records.append({'index': idx, 'page': page_num, 'bbox': img})
    except Exception as e:
        logger.warning(f"Image extraction failed for page {page_num}: {e}")
    return records
def _create_excel_workbook(self, content: Dict[str, Any], output_path: str, format_type: str) -> str:
    """Write extracted content to an Excel file in the requested layout,
    append a Summary sheet, then apply visual formatting to the saved file.

    Unknown format_type values fall back to the separate-sheets layout.
    """
    layout_builders = {
        'structured': self._create_structured_workbook,
        'combined': self._create_combined_workbook,
    }
    build = layout_builders.get(format_type, self._create_separate_sheets_workbook)
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        build(content, writer)
        # Summary goes last so it appears after the content sheets.
        self._add_summary_sheet(content, writer)
    # Formatting re-opens the file, so it must happen after the writer closes.
    self._apply_excel_formatting(output_path)
    return output_path
def _create_structured_workbook(self, content: Dict[str, Any], writer):
    """Emit one sheet per table and per grouped text run, named by page."""
    for page_data in content['pages']:
        page_num = page_data['page_number']
        # Tables first, numbered per page.
        emitted = 0
        for table in page_data['tables']:
            if not table.data:
                continue
            header = table.data[0] if table.has_header else None
            frame = pd.DataFrame(table.data[1:], columns=header)
            # Excel caps sheet names at 31 characters.
            frame.to_excel(writer, sheet_name=f"P{page_num}_Table{emitted + 1}"[:31], index=False)
            emitted += 1
        # Then text runs, grouped by similar formatting.
        if page_data['text_blocks']:
            grouped = self._group_text_blocks(page_data['text_blocks'])
            for idx, grp in enumerate(grouped):
                if not grp['content'].strip():
                    continue
                row = pd.DataFrame([{
                    'Content': grp['content'],
                    'Font_Size': grp.get('font_size', 12),
                    'Is_Bold': grp.get('is_bold', False),
                    'Position_X': grp.get('x', 0),
                    'Position_Y': grp.get('y', 0),
                }])
                row.to_excel(writer, sheet_name=f"P{page_num}_Text{idx + 1}"[:31], index=False)
def _create_combined_workbook(self, content: Dict[str, Any], writer):
    """Merge every table into one sheet and every page's text into another."""
    frames = []
    text_rows = []
    for page_data in content['pages']:
        page_num = page_data['page_number']
        # Collect every table, tagged with its origin.
        for idx, table in enumerate(page_data['tables']):
            if not table.data:
                continue
            header = table.data[0] if table.has_header else None
            frame = pd.DataFrame(table.data[1:], columns=header)
            frame['Source_Page'] = page_num
            frame['Table_Index'] = idx + 1
            frames.append(frame)
        # Collect the page's text as one joined record.
        joined = '\n'.join(block.text for block in page_data['text_blocks'])
        if joined.strip():
            text_rows.append({'Page': page_num, 'Content': joined.strip()})
    if frames:
        pd.concat(frames, ignore_index=True).to_excel(writer, sheet_name='All_Tables', index=False)
    if text_rows:
        pd.DataFrame(text_rows).to_excel(writer, sheet_name='All_Text', index=False)
def _create_separate_sheets_workbook(self, content: Dict[str, Any], writer):
    """Give every table and every page's text its own numbered sheet."""
    next_table = 1
    next_text = 1
    for page_data in content['pages']:
        page_num = page_data['page_number']
        for table in page_data['tables']:
            if not table.data:
                continue
            header = table.data[0] if table.has_header else None
            frame = pd.DataFrame(table.data[1:], columns=header)
            # Excel caps sheet names at 31 characters.
            frame.to_excel(writer, sheet_name=f"Table_{next_table}"[:31], index=False)
            next_table += 1
        if page_data['text_blocks']:
            joined = '\n'.join(block.text for block in page_data['text_blocks'])
            if joined.strip():
                sheet = pd.DataFrame([{'Page': page_num, 'Content': joined}])
                sheet.to_excel(writer, sheet_name=f"Text_{next_text}"[:31], index=False)
                next_text += 1
def _group_text_blocks(self, text_blocks: List[TextBlock]) -> List[Dict]: | |
""" | |
Group text blocks by proximity and formatting | |
""" | |
if not text_blocks: | |
return [] | |
# Sort by position (top to bottom, left to right) | |
sorted_blocks = sorted(text_blocks, key=lambda b: (b.y, b.x)) | |
groups = [] | |
current_group = { | |
'content': '', | |
'font_size': sorted_blocks[0].font_size, | |
'is_bold': sorted_blocks[0].is_bold, | |
'x': sorted_blocks[0].x, | |
'y': sorted_blocks[0].y | |
} | |
for block in sorted_blocks: | |
# Check if block should be in current group (similar formatting and position) | |
if (abs(current_group['font_size'] - block.font_size) < 2 and | |
current_group['is_bold'] == block.is_bold): | |
current_group['content'] += ' ' + block.text | |
else: | |
# Start new group | |
if current_group['content'].strip(): | |
groups.append(current_group) | |
current_group = { | |
'content': block.text, | |
'font_size': block.font_size, | |
'is_bold': block.is_bold, | |
'x': block.x, | |
'y': block.y | |
} | |
# Add last group | |
if current_group['content'].strip(): | |
groups.append(current_group) | |
return groups | |
def _add_summary_sheet(self, content: Dict[str, Any], writer):
    """Append a 'Summary' sheet with basic document statistics."""
    stats = [
        ('Total Pages', content['total_pages']),
        ('Total Tables', sum(len(page['tables']) for page in content['pages'])),
        ('Total Text Blocks', sum(len(page['text_blocks']) for page in content['pages'])),
        ('Processing Date', datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
        ('Document Title', content['metadata'].get('title', 'Unknown')),
    ]
    summary_df = pd.DataFrame({
        'Statistic': [name for name, _ in stats],
        'Value': [value for _, value in stats],
    })
    summary_df.to_excel(writer, sheet_name='Summary', index=False)
def _apply_excel_formatting(self, file_path: str):
    """Style an existing workbook in place.

    Every sheet gets a bold white-on-blue bordered header row (row 1 is
    always the header in the sheets this converter generates) and column
    widths sized to the longest value, capped at 50 characters.

    Best-effort: any failure is logged as a warning, never raised.
    """
    try:
        wb = openpyxl.load_workbook(file_path)
        header_font = Font(bold=True, color="FFFFFF")
        header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
        thin = Side(style='thin')
        border = Border(left=thin, right=thin, top=thin, bottom=thin)
        for sheet_name in wb.sheetnames:
            ws = wb[sheet_name]
            # Format the header row.
            if ws.max_row > 0:
                for cell in ws[1]:
                    cell.font = header_font
                    cell.fill = header_fill
                    cell.alignment = Alignment(horizontal='center', vertical='center')
                    cell.border = border
            # Auto-adjust column widths to content.
            for column in ws.columns:
                # BUGFIX: merged cells have no column_letter attribute; the
                # old code would raise here and abort formatting for the
                # entire workbook. Skip such columns instead.
                column_letter = getattr(column[0], 'column_letter', None)
                if column_letter is None:
                    continue
                max_length = 0
                for cell in column:
                    try:
                        # BUGFIX: was a bare `except:` (catches KeyboardInterrupt);
                        # also skip empty cells instead of measuring the
                        # 4-character string "None".
                        if cell.value is not None:
                            max_length = max(max_length, len(str(cell.value)))
                    except Exception:
                        pass
                ws.column_dimensions[column_letter].width = min(max_length + 2, 50)
        wb.save(file_path)
    except Exception as e:
        logger.warning(f"Could not apply formatting: {e}")
# Usage example and main function | |
def install_dependencies():
    """Report which dependencies are installed and how to install the rest."""
    print("π¦ INSTALLATION INSTRUCTIONS:")
    print("=" * 50)
    # (display name, install command, importable?); the first four are required
    # and, since this module imported successfully, always present.
    required_packages = [
        ("PyMuPDF", "pip install PyMuPDF", True),
        ("pandas", "pip install pandas", True),
        ("openpyxl", "pip install openpyxl", True),
        ("numpy", "pip install numpy", True),
        ("camelot-py", "pip install camelot-py[cv]", CAMELOT_AVAILABLE),
        ("tabula-py", "pip install tabula-py", TABULA_AVAILABLE)
    ]

    def report(packages):
        # One status line per package, plus an install hint when missing.
        for name, cmd, available in packages:
            status = "β Installed" if available else "β Missing"
            print(f" {name}: {status}")
            if not available:
                print(f" Install: {cmd}")

    print("\nβ CORE PACKAGES (Required):")
    report(required_packages[:4])
    print("\nπ§ OPTIONAL PACKAGES (For better table extraction):")
    report(required_packages[4:])
    print("\nπ‘ INSTALL ALL AT ONCE:")
    print("pip install PyMuPDF pandas openpyxl numpy camelot-py[cv] tabula-py")
    print("\n" + "=" * 50)
def main():
    """Demo entry point: report dependency status, then convert input.pdf."""
    print("π Enhanced PDF to Excel Converter")
    print("=" * 40)
    # Show installation status before attempting anything.
    install_dependencies()

    converter = PDFToExcelConverter()
    pdf_path = "input.pdf"  # Replace with your PDF path
    output_path = "output.xlsx"  # Replace with desired output path

    if not os.path.exists(pdf_path):
        print(f"\nβ PDF file not found: {pdf_path}")
        print("Please update the 'pdf_path' variable with your actual PDF file path.")
        return
    try:
        print(f"\nπ Converting: {pdf_path}")
        result = converter.process_pdf_to_excel(
            pdf_path=pdf_path,
            output_path=output_path,
            format_type='structured'  # Options: 'structured', 'combined', 'separate_sheets'
        )
        print(f"β Conversion completed successfully: {result}")
    except Exception as e:
        print(f"β Conversion failed: {e}")
        print("\nπ οΈ TROUBLESHOOTING:")
        print("1. Make sure all required packages are installed")
        print("2. Check that your PDF file exists and is readable")
        print("3. Ensure you have write permissions for the output directory")
# Run the demo conversion only when executed as a script, not on import.
if __name__ == "__main__":
    main()