# new-project / pdf_json.py
# amit01Xindus's picture
# Upload 8 files
# 96c003e verified
import os
import base64
import json
import requests
from typing import Dict, List, Any, Optional
import fitz # PyMuPDF
from PIL import Image
import io
import re
from dataclasses import dataclass, asdict
from pathlib import Path
from datetime import datetime
@dataclass
class TextBlock:
    """One piece of positioned text together with its font attributes."""

    # The text content of the block.
    text: str
    # Position of the block (left and top coordinates).
    x: float
    y: float
    # Extent of the block.
    width: float
    height: float
    # Font attributes.
    font_size: float
    font_name: str
    is_bold: bool = False
    is_italic: bool = False
    # Identifier string for the block; empty unless the creator supplies one.
    block_id: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields of this block as a plain dictionary."""
        as_mapping: Dict[str, Any] = asdict(self)
        return as_mapping
@dataclass
class ImageData:
    """An image encoded as base64 text plus its placement information."""

    # Index assigned to the image by whoever creates the record.
    index: int
    # The encoded image bytes as a base64 string.
    base64_data: str
    # Bounding box of the image.
    bbox: tuple
    # Extent of the image.
    width: float
    height: float
    # Name of the image encoding; defaults to "PNG".
    format: str = "PNG"

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields of this image record as a plain dictionary."""
        as_mapping: Dict[str, Any] = asdict(self)
        return as_mapping
@dataclass
class TableData:
    """The cell contents of one table and where the table sits."""

    # Bounding box of the table.
    bbox: tuple
    # Cell text, one inner list per row.
    data: List[List[str]]
    # Row and column counts supplied by the creator of the record.
    rows: int
    columns: int

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields of this table as a plain dictionary.

        ``asdict`` copies nested lists, so the returned ``data`` can be
        modified without affecting this instance.
        """
        as_mapping: Dict[str, Any] = asdict(self)
        return as_mapping
@dataclass
class PageData:
    """Everything collected for a single page, plus simple text statistics."""

    # Page number as assigned by the creator of the record.
    page_number: int
    # Content found on the page.
    text_blocks: List[TextBlock]
    images: List[ImageData]
    tables: List[TableData]
    # Page dimensions.
    page_width: float
    page_height: float
    # Text statistics; both default to zero.
    word_count: int = 0
    character_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dictionary, serialising each child via its ``to_dict``."""
        serialised: Dict[str, Any] = {"page_number": self.page_number}
        # Children are converted through their own ``to_dict`` methods so the
        # result contains only plain containers.
        serialised["text_blocks"] = [entry.to_dict() for entry in self.text_blocks]
        serialised["images"] = [entry.to_dict() for entry in self.images]
        serialised["tables"] = [entry.to_dict() for entry in self.tables]
        serialised["page_width"] = self.page_width
        serialised["page_height"] = self.page_height
        serialised["word_count"] = self.word_count
        serialised["character_count"] = self.character_count
        return serialised
class PDFToJSONConverter:
    """Convert a PDF into a JSON-serialisable structure using PyMuPDF.

    Text spans, embedded images and detected tables are collected page by
    page and combined with document-level statistics.  The Hugging Face
    settings stored by ``__init__`` are configuration only: no method of this
    class sends a request to the inference API.
    """

    # Dimensions recorded for a page whose processing failed (roughly A4 in
    # PDF points), so the page list still has one entry per page.
    _FALLBACK_PAGE_SIZE = (595, 842)

    # Text written in place of image payloads when they are excluded.
    _BASE64_PLACEHOLDER = "[Base64 data removed - set include_base64_images=True to include]"

    def __init__(self, huggingface_token: Optional[str] = None):
        """Store the optional Hugging Face token and model configuration.

        Args:
            huggingface_token: API token; when omitted the ``Authorization``
                header value is ``None``.
        """
        self.hf_token = huggingface_token
        self.hf_headers = {
            "Authorization": f"Bearer {huggingface_token}" if huggingface_token else None
        }
        self.models = {
            "document_layout": "microsoft/layoutlm-base-uncased",
            "table_detection": "microsoft/table-transformer-detection",
            "ocr": "microsoft/trocr-base-printed",
            "math_detection": "facebook/detr-resnet-50"
        }
        self.hf_inference_url = "https://api-inference.huggingface.co/models"

    def pdf_to_base64(self, pdf_path: str) -> str:
        """Read the file at ``pdf_path`` and return its bytes as base64 text.

        Raises:
            Exception: if the file cannot be read; the original error is
                attached as ``__cause__``.
        """
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except Exception as e:
            raise Exception(f"Error converting PDF to base64: {str(e)}") from e

    def extract_pdf_content(self, pdf_path: str) -> Dict[str, Any]:
        """Extract text, images and tables from every page of a PDF.

        A page that fails to process is reported and replaced by an empty
        page entry so that later pages are still extracted.

        Args:
            pdf_path: path of the PDF file to read.

        Returns:
            A dictionary with ``document_info``, ``document_statistics`` and
            ``pages`` (one plain dictionary per page).

        Raises:
            Exception: for any failure, including a missing file or a PDF
                without pages; the original error is attached as
                ``__cause__``.
        """
        doc = None
        try:
            if not os.path.exists(pdf_path):
                raise FileNotFoundError(f"PDF file not found: {pdf_path}")
            doc = fitz.open(pdf_path)
            if doc.page_count == 0:
                raise ValueError("PDF document has no pages")
            print(f"📄 PDF opened successfully: {doc.page_count} pages")

            pages_data = []
            document_stats = {
                "total_pages": doc.page_count,
                "total_words": 0,
                "total_characters": 0,
                "total_images": 0,
                "total_tables": 0
            }
            for page_num in range(doc.page_count):
                try:
                    page_data = self._extract_page(doc, page_num)
                except Exception as e:
                    print(f"❌ Error processing page {page_num + 1}: {e}")
                    fallback_width, fallback_height = self._FALLBACK_PAGE_SIZE
                    # Keep a placeholder so page numbering stays contiguous.
                    page_data = PageData(
                        page_number=page_num + 1,
                        text_blocks=[],
                        images=[],
                        tables=[],
                        page_width=fallback_width,
                        page_height=fallback_height,
                        word_count=0,
                        character_count=0
                    )
                else:
                    document_stats["total_words"] += page_data.word_count
                    document_stats["total_characters"] += page_data.character_count
                    document_stats["total_images"] += len(page_data.images)
                    document_stats["total_tables"] += len(page_data.tables)
                pages_data.append(page_data)

            return {
                "document_info": {
                    "filename": os.path.basename(pdf_path),
                    "file_size": os.path.getsize(pdf_path),
                    "conversion_timestamp": self._get_current_timestamp(),
                    "converter_version": "1.0.0"
                },
                "document_statistics": document_stats,
                "pages": [page.to_dict() for page in pages_data]
            }
        except Exception as e:
            raise Exception(f"Error extracting PDF content: {str(e)}") from e
        finally:
            if doc is not None:
                try:
                    doc.close()
                    print("✅ PDF document closed successfully")
                except Exception as e:
                    print(f"⚠️ Error closing PDF document: {e}")

    def _extract_page(self, doc, page_num: int) -> PageData:
        """Collect text blocks, images, tables and statistics for one page."""
        page = doc[page_num]
        print(f"🔄 Processing page {page_num + 1}/{doc.page_count}")

        # Prefer the detailed "dict" extraction; fall back to plain blocks.
        try:
            page_dict = page.get_text("dict")
            text_blocks = self._extract_text_blocks_from_dict(page_dict, page_num)
        except Exception as e:
            print(f"⚠️ Dict method failed for page {page_num + 1}, falling back to simple text extraction: {e}")
            text_blocks = self._extract_text_blocks_simple(page, page_num)

        images = self._extract_images_safely(page, doc, page_num)
        tables = self._detect_tables_safely(page)
        page_rect = page.rect

        page_text = " ".join(block.text for block in text_blocks)
        return PageData(
            page_number=page_num + 1,
            text_blocks=text_blocks,
            images=images,
            tables=tables,
            page_width=page_rect.width,
            page_height=page_rect.height,
            word_count=len(page_text.split()),
            character_count=len(page_text)
        )

    def _extract_text_blocks_from_dict(self, page_dict: dict, page_num: int) -> List[TextBlock]:
        """Build one ``TextBlock`` per non-empty span of a page dictionary.

        Args:
            page_dict: mapping with a ``blocks`` list whose entries may hold
                ``lines``, each with ``spans``.
            page_num: page index as passed by the caller; it is embedded
                unchanged in each ``block_id``.

        Returns:
            The text blocks in block/line/span order.
        """
        text_blocks = []
        for block_idx, block in enumerate(page_dict.get("blocks", [])):
            if "lines" not in block:
                continue
            for line_idx, line in enumerate(block["lines"]):
                for span_idx, span in enumerate(line["spans"]):
                    text_content = span.get("text", "").strip()
                    if not text_content:
                        continue
                    x0, y0, x1, y1 = span["bbox"]
                    font_name = span.get("font", "Arial")
                    lowered_font = font_name.lower()
                    flags = span.get("flags", 0)
                    text_blocks.append(TextBlock(
                        text=text_content,
                        x=round(x0, 2),
                        y=round(y0, 2),
                        width=round(x1 - x0, 2),
                        height=round(y1 - y0, 2),
                        font_size=round(span.get("size", 12), 2),
                        font_name=font_name,
                        # bool() keeps the JSON value true/false rather than
                        # the raw masked integer (16 or 2).
                        is_bold="bold" in lowered_font or bool(flags & 16),
                        is_italic="italic" in lowered_font or bool(flags & 2),
                        block_id=f"p{page_num}-b{block_idx}-l{line_idx}-s{span_idx}"
                    ))
        return text_blocks

    def _extract_text_blocks_simple(self, page, page_num: int) -> List[TextBlock]:
        """Fallback extraction from ``page.get_text("blocks")``.

        Each text block is split into lines; the block height is divided
        evenly between them and a fixed 12 pt "Arial" font is recorded.
        Any failure is reported and the blocks gathered so far are returned.
        """
        text_blocks = []
        try:
            for block_idx, block in enumerate(page.get_text("blocks")):
                if block[6] != 0:  # only entries flagged as text blocks
                    continue
                text = block[4].strip()
                if not text:
                    continue
                x0, y0, x1, y1 = block[0], block[1], block[2], block[3]
                lines = text.split('\n')
                line_height = (y1 - y0) / max(len(lines), 1)
                for line_idx, line in enumerate(lines):
                    stripped = line.strip()
                    if not stripped:
                        continue
                    text_blocks.append(TextBlock(
                        text=stripped,
                        x=round(x0, 2),
                        y=round(y0 + (line_idx * line_height), 2),
                        width=round(x1 - x0, 2),
                        height=round(line_height, 2),
                        font_size=12.0,
                        font_name="Arial",
                        is_bold=False,
                        is_italic=False,
                        block_id=f"p{page_num}-simple-b{block_idx}-l{line_idx}"
                    ))
        except Exception as e:
            print(f"⚠️ Simple text block extraction failed: {e}")
        return text_blocks

    def _extract_images_safely(self, page, doc, page_num) -> List[ImageData]:
        """Return the images placed on ``page`` as base64-encoded PNG data.

        Images without a placement rectangle are skipped.  A failure on one
        image is reported and does not stop the remaining images.
        """
        images = []
        try:
            image_list = page.get_images(full=True)
            for img_index, img_info in enumerate(image_list):
                try:
                    xref = img_info[0]
                    img_rects = list(page.get_image_rects(xref))
                    if not img_rects:
                        continue
                    bbox = img_rects[0]
                    pix = fitz.Pixmap(doc, xref)
                    if pix.n - pix.alpha >= 4:
                        # More than three colour channels (e.g. CMYK) cannot
                        # be written as PNG directly; convert to RGB first
                        # instead of dropping the image.
                        pix = fitz.Pixmap(fitz.csRGB, pix)
                    img_base64 = base64.b64encode(pix.tobytes("png")).decode()
                    pix = None  # release the pixmap promptly
                    images.append(ImageData(
                        index=img_index,
                        base64_data=img_base64,
                        bbox=(round(bbox.x0, 2), round(bbox.y0, 2),
                              round(bbox.x1, 2), round(bbox.y1, 2)),
                        width=round(bbox.x1 - bbox.x0, 2),
                        height=round(bbox.y1 - bbox.y0, 2),
                        format="PNG"
                    ))
                except Exception as e:
                    print(f"⚠️ Error extracting image {img_index} on page {page_num+1}: {e}")
                    continue
        except Exception as e:
            print(f"⚠️ General error in image extraction for page {page_num+1}: {e}")
        return images

    def _detect_tables_safely(self, page) -> List[TableData]:
        """Return the tables found by ``page.find_tables()`` with cleaned cells.

        Cell values are converted to stripped strings, empty cells become
        ``""`` and rows without any content are dropped.  A failure on one
        table is reported and does not stop the remaining tables.
        """
        tables = []
        try:
            tabs = page.find_tables()
            for tab_index, tab in enumerate(tabs):
                try:
                    table_data = tab.extract()
                    if not table_data:
                        continue
                    cleaned_data = []
                    for row in table_data:
                        cleaned_row = [str(cell).strip() if cell else "" for cell in row]
                        if any(cleaned_row):  # only keep rows with content
                            cleaned_data.append(cleaned_row)
                    if not cleaned_data:
                        continue
                    # Unpack rather than read .x0/.y0 attributes: the bbox is
                    # a plain (x0, y0, x1, y1) tuple, and unpacking also works
                    # for rectangle objects.
                    x0, y0, x1, y1 = tab.bbox
                    tables.append(TableData(
                        bbox=(round(x0, 2), round(y0, 2), round(x1, 2), round(y1, 2)),
                        data=cleaned_data,
                        rows=len(cleaned_data),
                        columns=max(len(row) for row in cleaned_data)
                    ))
                except Exception as e:
                    print(f"⚠️ Error extracting table {tab_index}: {e}")
                    continue
        except Exception as e:
            print(f"⚠️ General error in table detection: {e}")
        return tables

    def convert_to_json(self, pdf_content: Dict[str, Any], output_path: Optional[str] = None,
                        pretty_print: bool = True, include_base64_images: bool = True) -> str:
        """Serialise extracted content to a JSON string and optionally save it.

        ``pdf_content`` itself is never modified: the options block and the
        image placeholders are applied to copies.

        Args:
            pdf_content: content dictionary; must contain ``pages`` when
                ``include_base64_images`` is false.
            output_path: file to write; parent directories are created.  A
                write failure is reported but does not raise.
            pretty_print: indent the JSON by two spaces when true.
            include_base64_images: when false, each image's ``base64_data``
                is replaced by a placeholder in the output.

        Returns:
            The JSON text.

        Raises:
            Exception: if the content cannot be serialised; the original
                error is attached as ``__cause__``.
        """
        print("🔄 Converting to JSON format...")
        try:
            # Top-level copy so the options block is not added to the input.
            json_content = dict(pdf_content)
            json_content["conversion_options"] = {
                "pretty_print": pretty_print,
                "include_base64_images": include_base64_images,
                "json_schema_version": "1.0"
            }
            if not include_base64_images:
                # Rebuild the page and image dictionaries instead of editing
                # them in place; they are shared with the caller's data.
                json_content["pages"] = [
                    {
                        **page,
                        "images": [
                            {**image, "base64_data": self._BASE64_PLACEHOLDER}
                            for image in page["images"]
                        ],
                    }
                    for page in json_content["pages"]
                ]
            json_string = json.dumps(
                json_content,
                indent=2 if pretty_print else None,
                ensure_ascii=False,
            )
            if output_path:
                try:
                    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write(json_string)
                    print(f"✅ JSON saved to: {output_path}")
                    print(f"📊 File size: {len(json_string):,} characters")
                except Exception as e:
                    print(f"⚠️ Error saving JSON to {output_path}: {e}")
            return json_string
        except Exception as e:
            raise Exception(f"Error converting to JSON: {str(e)}") from e

    def create_json_summary(self, pdf_content: Dict[str, Any]) -> Dict[str, Any]:
        """Build a compact per-page overview without the full page data.

        Each page summary holds element counts, text statistics, page
        dimensions and a sample made from the first three text blocks,
        cut to 200 characters and followed by ``"..."``.
        """
        summary = {
            "document_info": pdf_content.get("document_info", {}),
            "document_statistics": pdf_content.get("document_statistics", {}),
            "page_summaries": []
        }
        for page in pdf_content.get("pages", []):
            blocks = page["text_blocks"]
            if blocks:
                sample_text = " ".join(block["text"] for block in blocks[:3])[:200] + "..."
            else:
                sample_text = ""
            summary["page_summaries"].append({
                "page_number": page["page_number"],
                "text_blocks_count": len(blocks),
                "images_count": len(page["images"]),
                "tables_count": len(page["tables"]),
                "word_count": page["word_count"],
                "character_count": page["character_count"],
                "page_dimensions": {
                    "width": page["page_width"],
                    "height": page["page_height"]
                },
                "sample_text": sample_text
            })
        return summary

    def _get_current_timestamp(self) -> str:
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _summary_path_for(output_path: str) -> str:
        """Return the summary file path that belongs to ``output_path``.

        ``name.json`` becomes ``name_summary.json``.  For any other name the
        suffix ``_summary.json`` is appended, so the result always differs
        from ``output_path`` and the summary cannot overwrite the main file.
        """
        root, ext = os.path.splitext(output_path)
        if ext.lower() == ".json":
            return f"{root}_summary{ext}"
        return f"{output_path}_summary.json"

    def process_pdf_to_json(self, pdf_path: str, output_path: Optional[str] = None,
                            pretty_print: bool = True, include_base64_images: bool = True,
                            create_summary: bool = False, use_hf_models: bool = False) -> str:
        """Extract a PDF, convert it to JSON and optionally write a summary.

        Args:
            pdf_path: PDF file to process.
            output_path: where to save the JSON; nothing is written if empty.
            pretty_print: passed through to ``convert_to_json``.
            include_base64_images: passed through to ``convert_to_json``.
            create_summary: also write a summary file next to ``output_path``
                (only when ``output_path`` is given).
            use_hf_models: when true and a token is configured, only a notice
                is printed; no model is called.

        Returns:
            The JSON text of the full document.

        Raises:
            FileNotFoundError: if ``pdf_path`` does not exist.
            Exception: if extraction or conversion fails.
        """
        print(f"🚀 Processing PDF to JSON: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        print("📄 Extracting PDF content...")
        pdf_content = self.extract_pdf_content(pdf_path)

        if use_hf_models and self.hf_token:
            print("🤖 Attempting to enhance with Hugging Face models...")
            print("Note: Hugging Face model integration requires further implementation.")

        print("🔄 Converting to JSON...")
        json_content = self.convert_to_json(
            pdf_content,
            output_path,
            pretty_print,
            include_base64_images
        )

        if create_summary and output_path:
            summary_path = self._summary_path_for(output_path)
            summary_data = self.create_json_summary(pdf_content)
            summary_json = json.dumps(summary_data, indent=2, ensure_ascii=False)
            try:
                with open(summary_path, 'w', encoding='utf-8') as f:
                    f.write(summary_json)
                print(f"✅ Summary JSON saved to: {summary_path}")
            except Exception as e:
                print(f"⚠️ Error saving summary: {e}")

        print("✅ Processing complete!")
        return json_content
def main():
    """Convert a sample PDF to JSON and print a short preview.

    Reads the optional Hugging Face token from the ``HF_API_TOKEN``
    environment variable, converts ``new-pdf.pdf`` into
    ``converted_document.json`` (with a summary file) and reports the
    outcome on standard output.  Errors are printed rather than raised.
    """
    hf_token = os.getenv("HF_API_TOKEN")
    converter = PDFToJSONConverter(huggingface_token=hf_token)

    pdf_path = "new-pdf.pdf"  # Change this to your PDF file path
    output_path = "converted_document.json"  # Output JSON file path

    try:
        json_content = converter.process_pdf_to_json(
            pdf_path=pdf_path,
            output_path=output_path,
            pretty_print=True,  # Format JSON with indentation
            include_base64_images=True,  # Set False to reduce file size
            create_summary=True,  # Create additional summary file
            use_hf_models=False  # Set True to use Hugging Face models
        )
        print(f"✅ Successfully converted '{pdf_path}' to '{output_path}'")
        print(f"📊 JSON length: {len(json_content):,} characters")
        print(f"📄 Open '{output_path}' to view the structured JSON data!")

        # Show at most the first 500 characters as a preview.
        print("\n📋 JSON Preview (first 500 characters):")
        print("-" * 50)
        if len(json_content) > 500:
            preview = json_content[:500] + "..."
        else:
            preview = json_content
        print(preview)
    except FileNotFoundError as e:
        print(f"❌ Error: {e}")
        print("Please ensure the PDF file exists at the specified path.")
    except Exception as e:
        print(f"❌ An unexpected error occurred: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()