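"""PDF-to-JSON converter built on PyMuPDF.

Extracts text blocks (with position and font metadata), images (base64-encoded
PNG), and tables from each page of a PDF, then serializes everything to a
structured JSON document, optionally alongside a lightweight summary file.

Requires: pip install pymupdf requests
"""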
import os
import base64
import copy
import json
import requests
from typing import Dict, List, Any, Optional
import fitz  # PyMuPDF
from dataclasses import dataclass, asdict
from pathlib import Path
from datetime import datetime
@dataclass
class TextBlock:
    text: str
    x: float
    y: float
    width: float
    height: float
    font_size: float
    font_name: str
    is_bold: bool = False
    is_italic: bool = False
    block_id: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Convert TextBlock to dictionary"""
        return asdict(self)
@dataclass
class ImageData:
    index: int
    base64_data: str
    bbox: tuple
    width: float
    height: float
    format: str = "PNG"

    def to_dict(self) -> Dict[str, Any]:
        """Convert ImageData to dictionary"""
        return asdict(self)
@dataclass
class TableData:
    bbox: tuple
    data: List[List[str]]
    rows: int
    columns: int

    def to_dict(self) -> Dict[str, Any]:
        """Convert TableData to dictionary"""
        return asdict(self)
@dataclass
class PageData:
    page_number: int
    text_blocks: List[TextBlock]
    images: List[ImageData]
    tables: List[TableData]
    page_width: float
    page_height: float
    word_count: int = 0
    character_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert PageData to dictionary"""
        return {
            "page_number": self.page_number,
            "text_blocks": [block.to_dict() for block in self.text_blocks],
            "images": [img.to_dict() for img in self.images],
            "tables": [table.to_dict() for table in self.tables],
            "page_width": self.page_width,
            "page_height": self.page_height,
            "word_count": self.word_count,
            "character_count": self.character_count
        }
class PDFToJSONConverter:
    def __init__(self, huggingface_token: Optional[str] = None):
        self.hf_token = huggingface_token
        # Only send an Authorization header when a token was actually provided
        self.hf_headers = (
            {"Authorization": f"Bearer {huggingface_token}"} if huggingface_token else {}
        )
        self.models = {
            "document_layout": "microsoft/layoutlm-base-uncased",
            "table_detection": "microsoft/table-transformer-detection",
            "ocr": "microsoft/trocr-base-printed",
            "math_detection": "facebook/detr-resnet-50"
        }
        self.hf_inference_url = "https://api-inference.huggingface.co/models"
    def pdf_to_base64(self, pdf_path: str) -> str:
        """Convert PDF file to base64 string"""
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except Exception as e:
            raise RuntimeError(f"Error converting PDF to base64: {e}") from e
    def extract_pdf_content(self, pdf_path: str) -> Dict[str, Any]:
        """Extract all content from PDF and return structured data"""
        doc = None
        try:
            if not os.path.exists(pdf_path):
                raise FileNotFoundError(f"PDF file not found: {pdf_path}")
            doc = fitz.open(pdf_path)
            if doc is None:
                raise RuntimeError("Failed to open PDF document")
            if doc.page_count == 0:
                raise ValueError("PDF document has no pages")
            print(f"📄 PDF opened successfully: {doc.page_count} pages")
            pages_data = []
            document_stats = {
                "total_pages": doc.page_count,
                "total_words": 0,
                "total_characters": 0,
                "total_images": 0,
                "total_tables": 0
            }
            for page_num in range(doc.page_count):
                try:
                    page = doc[page_num]
                    print(f"📄 Processing page {page_num + 1}/{doc.page_count}")
                    # Extract text blocks, falling back to simple extraction on failure
                    text_blocks = []
                    try:
                        page_dict = page.get_text("dict")
                        text_blocks = self._extract_text_blocks_from_dict(page_dict, page_num)
                    except Exception as e:
                        print(f"⚠️ Dict method failed for page {page_num + 1}, falling back to simple text extraction: {e}")
                        text_blocks = self._extract_text_blocks_simple(page, page_num)
                    # Extract images
                    images = self._extract_images_safely(page, doc, page_num)
                    # Extract tables
                    tables = self._detect_tables_safely(page)
                    # Get page dimensions
                    page_rect = page.rect
                    # Calculate statistics
                    page_text = " ".join(block.text for block in text_blocks)
                    word_count = len(page_text.split())
                    char_count = len(page_text)
                    # Create page data
                    page_data = PageData(
                        page_number=page_num + 1,
                        text_blocks=text_blocks,
                        images=images,
                        tables=tables,
                        page_width=page_rect.width,
                        page_height=page_rect.height,
                        word_count=word_count,
                        character_count=char_count
                    )
                    pages_data.append(page_data)
                    # Update document statistics
                    document_stats["total_words"] += word_count
                    document_stats["total_characters"] += char_count
                    document_stats["total_images"] += len(images)
                    document_stats["total_tables"] += len(tables)
                except Exception as e:
                    print(f"❌ Error processing page {page_num + 1}: {e}")
                    # Create empty page data for failed pages (A4 dimensions in points as a fallback)
                    empty_page = PageData(
                        page_number=page_num + 1,
                        text_blocks=[],
                        images=[],
                        tables=[],
                        page_width=595,
                        page_height=842,
                        word_count=0,
                        character_count=0
                    )
                    pages_data.append(empty_page)
            result = {
                "document_info": {
                    "filename": os.path.basename(pdf_path),
                    "file_size": os.path.getsize(pdf_path),
                    "conversion_timestamp": self._get_current_timestamp(),
                    "converter_version": "1.0.0"
                },
                "document_statistics": document_stats,
                "pages": [page.to_dict() for page in pages_data]
            }
            return result
        except Exception as e:
            raise RuntimeError(f"Error extracting PDF content: {e}") from e
        finally:
            if doc is not None:
                try:
                    doc.close()
                    print("✅ PDF document closed successfully")
                except Exception as e:
                    print(f"⚠️ Error closing PDF document: {e}")
    def _extract_text_blocks_from_dict(self, page_dict: dict, page_num: int) -> List[TextBlock]:
        """Extract text blocks from page dictionary with detailed formatting"""
        text_blocks = []
        for block_idx, block in enumerate(page_dict.get("blocks", [])):
            if "lines" not in block:
                continue
            for line_idx, line in enumerate(block["lines"]):
                for span_idx, span in enumerate(line["spans"]):
                    text_content = span.get("text", "").strip()
                    if text_content:
                        bbox = span["bbox"]
                        # PyMuPDF span flags: bit value 16 marks bold, bit value 2 marks italic
                        font_info = {
                            "size": span.get("size", 12),
                            "font": span.get("font", "Arial"),
                            "is_bold": "bold" in span.get("font", "").lower() or bool(span.get("flags", 0) & 16),
                            "is_italic": "italic" in span.get("font", "").lower() or bool(span.get("flags", 0) & 2)
                        }
                        text_block = TextBlock(
                            text=text_content,
                            x=round(bbox[0], 2),
                            y=round(bbox[1], 2),
                            width=round(bbox[2] - bbox[0], 2),
                            height=round(bbox[3] - bbox[1], 2),
                            font_size=round(font_info["size"], 2),
                            font_name=font_info["font"],
                            is_bold=font_info["is_bold"],
                            is_italic=font_info["is_italic"],
                            block_id=f"p{page_num}-b{block_idx}-l{line_idx}-s{span_idx}"
                        )
                        text_blocks.append(text_block)
        return text_blocks
    def _extract_text_blocks_simple(self, page, page_num: int) -> List[TextBlock]:
        """Fallback method for text extraction"""
        text_blocks = []
        try:
            blocks_data = page.get_text("blocks")
            for block_idx, block in enumerate(blocks_data):
                if block[6] == 0:  # Block type 0 is text (1 would be an image block)
                    text = block[4].strip()
                    if text:
                        x0, y0, x1, y1 = block[0], block[1], block[2], block[3]
                        # Approximate per-line geometry by dividing the block height evenly
                        lines = text.split('\n')
                        line_height = (y1 - y0) / max(len(lines), 1)
                        for line_idx, line in enumerate(lines):
                            if line.strip():
                                text_block = TextBlock(
                                    text=line.strip(),
                                    x=round(x0, 2),
                                    y=round(y0 + (line_idx * line_height), 2),
                                    width=round(x1 - x0, 2),
                                    height=round(line_height, 2),
                                    font_size=12.0,
                                    font_name="Arial",
                                    is_bold=False,
                                    is_italic=False,
                                    block_id=f"p{page_num}-simple-b{block_idx}-l{line_idx}"
                                )
                                text_blocks.append(text_block)
        except Exception as e:
            print(f"⚠️ Simple text block extraction failed: {e}")
        return text_blocks
    def _extract_images_safely(self, page, doc, page_num) -> List[ImageData]:
        """Extract images from page and return structured data"""
        images = []
        try:
            image_list = page.get_images(full=True)
            for img_index, img_info in enumerate(image_list):
                try:
                    xref = img_info[0]
                    # Get the rectangles where this image is placed on the page
                    img_rects = page.get_image_rects(xref)
                    if not img_rects:
                        continue
                    bbox = img_rects[0]
                    # Extract image data
                    pix = fitz.Pixmap(doc, xref)
                    if pix.n - pix.alpha < 4:  # GRAY or RGB; CMYK images are skipped
                        img_data = pix.tobytes("png")
                        img_base64 = base64.b64encode(img_data).decode()
                        image_data = ImageData(
                            index=img_index,
                            base64_data=img_base64,
                            bbox=(round(bbox.x0, 2), round(bbox.y0, 2),
                                  round(bbox.x1, 2), round(bbox.y1, 2)),
                            width=round(bbox.x1 - bbox.x0, 2),
                            height=round(bbox.y1 - bbox.y0, 2),
                            format="PNG"
                        )
                        images.append(image_data)
                    pix = None  # Release Pixmap resources
                except Exception as e:
                    print(f"⚠️ Error extracting image {img_index} on page {page_num + 1}: {e}")
                    continue
        except Exception as e:
            print(f"⚠️ General error in image extraction for page {page_num + 1}: {e}")
        return images
    def _detect_tables_safely(self, page) -> List[TableData]:
        """Extract tables from page and return structured data"""
        tables = []
        try:
            tabs = page.find_tables()
            for tab_index, tab in enumerate(tabs.tables):
                try:
                    table_data = tab.extract()
                    if table_data:
                        # Clean table data: stringify cells and drop empty rows
                        cleaned_data = []
                        for row in table_data:
                            cleaned_row = [str(cell).strip() if cell else "" for cell in row]
                            if any(cleaned_row):  # Only add non-empty rows
                                cleaned_data.append(cleaned_row)
                        if cleaned_data:
                            tab_rect = fitz.Rect(tab.bbox)  # bbox may be a plain tuple; Rect normalizes access
                            table_obj = TableData(
                                bbox=(round(tab_rect.x0, 2), round(tab_rect.y0, 2),
                                      round(tab_rect.x1, 2), round(tab_rect.y1, 2)),
                                data=cleaned_data,
                                rows=len(cleaned_data),
                                columns=max(len(row) for row in cleaned_data)
                            )
                            tables.append(table_obj)
                except Exception as e:
                    print(f"⚠️ Error extracting table {tab_index}: {e}")
                    continue
        except Exception as e:
            print(f"⚠️ General error in table detection: {e}")
        return tables
    def convert_to_json(self, pdf_content: Dict[str, Any], output_path: Optional[str] = None,
                        pretty_print: bool = True, include_base64_images: bool = True) -> str:
        """Convert PDF content to JSON format"""
        print("🔄 Converting to JSON format...")
        try:
            # Deep-copy when stripping image data so the caller's dict is never mutated;
            # otherwise a shallow copy suffices because only a top-level key is added
            json_content = copy.deepcopy(pdf_content) if not include_base64_images else dict(pdf_content)
            # Add metadata
            json_content["conversion_options"] = {
                "pretty_print": pretty_print,
                "include_base64_images": include_base64_images,
                "json_schema_version": "1.0"
            }
            # Optionally remove base64 image data to reduce file size
            if not include_base64_images:
                for page in json_content["pages"]:
                    for image in page["images"]:
                        image["base64_data"] = "[Base64 data removed - set include_base64_images=True to include]"
            # Convert to JSON string
            if pretty_print:
                json_string = json.dumps(json_content, indent=2, ensure_ascii=False)
            else:
                json_string = json.dumps(json_content, ensure_ascii=False)
            # Save to file if output path provided
            if output_path:
                try:
                    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write(json_string)
                    print(f"✅ JSON saved to: {output_path}")
                    print(f"📊 File size: {len(json_string):,} characters")
                except Exception as e:
                    print(f"⚠️ Error saving JSON to {output_path}: {e}")
            return json_string
        except Exception as e:
            raise RuntimeError(f"Error converting to JSON: {e}") from e
    def create_json_summary(self, pdf_content: Dict[str, Any]) -> Dict[str, Any]:
        """Create a summary of the PDF content without full data"""
        summary = {
            "document_info": pdf_content.get("document_info", {}),
            "document_statistics": pdf_content.get("document_statistics", {}),
            "page_summaries": []
        }
        for page in pdf_content.get("pages", []):
            # The first three text blocks, truncated, give a quick preview of the page
            sample_text = ""
            if page["text_blocks"]:
                sample_text = " ".join(block["text"] for block in page["text_blocks"][:3])[:200] + "..."
            page_summary = {
                "page_number": page["page_number"],
                "text_blocks_count": len(page["text_blocks"]),
                "images_count": len(page["images"]),
                "tables_count": len(page["tables"]),
                "word_count": page["word_count"],
                "character_count": page["character_count"],
                "page_dimensions": {
                    "width": page["page_width"],
                    "height": page["page_height"]
                },
                "sample_text": sample_text
            }
            summary["page_summaries"].append(page_summary)
        return summary
    def _get_current_timestamp(self) -> str:
        """Get current timestamp as string"""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
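
    # Hypothetical helper (sketch, not part of the original pipeline): the HF
    # enhancement path in process_pdf_to_json below is unimplemented, so this shows
    # what a raw call to the Hugging Face Inference API could look like, assuming the
    # standard POST <hf_inference_url>/<model_id> endpoint. The method name and the
    # payload shape are assumptions; the expected payload varies per model.
    def _query_hf_model(self, model_id: str, payload: Dict[str, Any]) -> Any:
        """Minimal sketch of a Hugging Face Inference API request."""
        if not self.hf_token:
            raise ValueError("A Hugging Face token is required to query the Inference API")
        response = requests.post(
            f"{self.hf_inference_url}/{model_id}",  # e.g. self.models["table_detection"]
            headers=self.hf_headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()  # Surface HTTP errors (rate limits, model loading, etc.)
        return response.json()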
    def process_pdf_to_json(self, pdf_path: str, output_path: Optional[str] = None,
                            pretty_print: bool = True, include_base64_images: bool = True,
                            create_summary: bool = False, use_hf_models: bool = False) -> str:
        """Main method to process PDF and convert to JSON"""
        print(f"🚀 Processing PDF to JSON: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        print("📄 Extracting PDF content...")
        pdf_content = self.extract_pdf_content(pdf_path)
        if use_hf_models and self.hf_token:
            print("🤖 Attempting to enhance with Hugging Face models...")
            try:
                # Placeholder: see the _query_hf_model sketch above for a possible starting point
                print("Note: Hugging Face model integration requires further implementation.")
            except Exception as e:
                print(f"⚠️ Hugging Face enhancement failed: {e}")
        print("🔄 Converting to JSON...")
        json_content = self.convert_to_json(
            pdf_content,
            output_path,
            pretty_print,
            include_base64_images
        )
        # Create summary file if requested
        if create_summary and output_path:
            summary_path = output_path.replace('.json', '_summary.json')
            summary_data = self.create_json_summary(pdf_content)
            summary_json = json.dumps(summary_data, indent=2, ensure_ascii=False)
            try:
                with open(summary_path, 'w', encoding='utf-8') as f:
                    f.write(summary_json)
                print(f"✅ Summary JSON saved to: {summary_path}")
            except Exception as e:
                print(f"⚠️ Error saving summary: {e}")
        print("✅ Processing complete!")
        return json_content
def main():
    """Main function to demonstrate PDF to JSON conversion"""
    # Set your Hugging Face token if needed
    HF_TOKEN = os.getenv("HF_API_TOKEN")
    # Initialize converter
    converter = PDFToJSONConverter(huggingface_token=HF_TOKEN)
    # Define paths
    pdf_path = "new-pdf.pdf"  # Change this to your PDF file path
    output_path = "converted_document.json"  # Output JSON file path
    try:
        # Convert PDF to JSON
        json_content = converter.process_pdf_to_json(
            pdf_path=pdf_path,
            output_path=output_path,
            pretty_print=True,            # Format JSON with indentation
            include_base64_images=True,   # Include image data (set False to reduce file size)
            create_summary=True,          # Create additional summary file
            use_hf_models=False           # Set to True if you want to use HuggingFace models
        )
        print(f"✅ Successfully converted '{pdf_path}' to '{output_path}'")
        print(f"📊 JSON length: {len(json_content):,} characters")
        print(f"👀 Open '{output_path}' to view the structured JSON data!")
        # Optional: Print first 500 characters of JSON as preview
        print("\n📋 JSON Preview (first 500 characters):")
        print("-" * 50)
        print((json_content[:500] + "...") if len(json_content) > 500 else json_content)
    except FileNotFoundError as e:
        print(f"❌ Error: {e}")
        print("Please ensure the PDF file exists at the specified path.")
    except Exception as e:
        print(f"❌ An unexpected error occurred: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    main()