import os
import base64
import json
from typing import Dict, List, Any
import fitz  # PyMuPDF
from PIL import Image
import io
import re
from dataclasses import dataclass
from pathlib import Path
from datetime import datetime
from docx import Document
from docx.shared import Inches, Pt
from docx.enum.text import WD_ALIGN_PARAGRAPH, WD_BREAK
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.oxml.shared import OxmlElement, qn
from docx.oxml.ns import nsdecls
from docx.oxml import parse_xml
import unicodedata
import docx
import camelot
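# PDF-to-Word conversion pipeline: extract text, images, and tables from a PDF
# with PyMuPDF (fitz) and Camelot, then rebuild the content in a .docx via
# python-docx. Assumed pip packages (not pinned by this script):
# PyMuPDF, python-docx, camelot-py[cv], Pillow.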
@dataclass
class TextBlock:
    """A positioned text span extracted from a PDF page."""
    text: str
    x: float
    y: float
    width: float
    height: float
    font_size: float
    font_name: str
    is_bold: bool = False
    is_italic: bool = False
    block_id: str = ""
    is_math: bool = False
class PDFToWordConverter:
    def __init__(self, huggingface_token: str = None):
        self.hf_token = huggingface_token
        # Only send an Authorization header when a token was actually supplied.
        self.hf_headers = (
            {"Authorization": f"Bearer {huggingface_token}"} if huggingface_token else {}
        )
        self.models = {
            "document_layout": "microsoft/layoutlm-base-uncased",
            "table_detection": "microsoft/table-transformer-detection",
            "ocr": "microsoft/trocr-base-printed",
            "math_detection": "facebook/detr-resnet-50"
        }
        self.hf_inference_url = "https://api-inference.huggingface.co/models"
        # Characters treated as legitimate mathematical symbols during text cleaning.
        self.math_symbols = {
            '√': '√', '∑': '∑', '∏': '∏', '∫': '∫', '∞': '∞', '≤': '≤', '≥': '≥', '≠': '≠', '±': '±',
            '×': '×', '÷': '÷', 'α': 'α', 'β': 'β', 'γ': 'γ', 'δ': 'δ', 'θ': 'θ', 'λ': 'λ', 'μ': 'μ',
            'π': 'π', 'σ': 'σ', 'φ': 'φ', 'ω': 'ω'
        }
    def detect_mathematical_content(self, text: str) -> bool:
        math_patterns = [
            r'\d+\s*[+\-*/=]\s*\d+', r'[a-zA-Z]\s*=\s*\d+', r'\b(?:sin|cos|tan|log|ln|exp)\s*\(',
            r'\d+\s*\^\s*\d+', r'√\d+', r'\d+/\d+', r'[∑∏∫]', r'[≤≥≠±×÷]', r'[αβγδθλμπσφω]',
            r'\bEquation\s+\d+', r'\d+\.\d+', r'\$\d+,?\d*', r'NORMSINV', r'using Equation'
        ]
        for pattern in math_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return True
        return False
    def preserve_mathematical_formatting(self, text: str) -> str:
        if not text:
            return ""
        text = text.replace('×', '×')
        text = text.replace('÷', '÷')
        text = text.replace('±', '±')
        text = text.replace('≤', '≤')
        text = text.replace('≥', '≥')
        text = text.replace('≠', '≠')
        text = text.replace('√', '√')
        text = text.replace('∑', '∑')
        text = text.replace('∏', '∏')
        text = text.replace('∫', '∫')
        text = text.replace('∞', '∞')
        text = re.sub(r'(\d+)\s*\^\s*(\d+)', r'\1^\2', text)
        text = re.sub(r'(\w+)\s*\(\s*([^)]+)\s*\)', r'\1(\2)', text)
        return text
    def clean_text_for_xml(self, text: str) -> str:
        if not text:
            return ""
        try:
            # Strip control characters that python-docx cannot serialize into XML.
            text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
            text = text.replace('\ufeff', '')
            text = text.replace('\u0000', '')
            text = unicodedata.normalize('NFKC', text)
            printable_chars = []
            for char in text:
                if char.isprintable() or char.isspace() or char in self.math_symbols:
                    printable_chars.append(char)
                else:
                    printable_chars.append(' ')
            text = ''.join(printable_chars)
            text = re.sub(r'\s+', ' ', text).strip()
            text = text.encode('utf-8', errors='ignore').decode('utf-8')
            return self.preserve_mathematical_formatting(text)
        except Exception:
            return ''.join(char for char in str(text) if ord(char) < 128).strip()
    def clean_font_name(self, font_name: str) -> str:
        if not font_name:
            return "Calibri"
        try:
            cleaned = self.clean_text_for_xml(font_name)
            cleaned = re.sub(r'[^\w\s-]', '', cleaned)
            if not cleaned.strip():
                return "Calibri"
            return cleaned.strip()
        except Exception:
            return "Calibri"
    def pdf_to_base64(self, pdf_path: str) -> str:
        try:
            with open(pdf_path, "rb") as pdf_file:
                return base64.b64encode(pdf_file.read()).decode('utf-8')
        except Exception as e:
            raise Exception(f"Error converting PDF to base64: {str(e)}")
    def extract_pdf_content(self, pdf_path: str) -> Dict[str, Any]:
        doc = None
        try:
            if not os.path.exists(pdf_path):
                raise FileNotFoundError(f"PDF file not found: {pdf_path}")
            doc = fitz.open(pdf_path)
            if doc is None:
                raise RuntimeError("Failed to open PDF document")
            if doc.page_count == 0:
                raise ValueError("PDF document has no pages")
            print(f"PDF opened successfully: {doc.page_count} pages")
            pages_content = []
            for page_num in range(doc.page_count):
                try:
                    page = doc[page_num]
                    print(f"Processing page {page_num + 1}/{doc.page_count}")
                    text_blocks = []
                    try:
                        page_dict = page.get_text("dict")
                        text_blocks = self._extract_text_blocks_from_dict(page_dict, page_num)
                    except Exception as e:
                        print(f"Dict method failed for page {page_num + 1}, using fallback: {e}")
                        text_blocks = self._extract_text_blocks_simple(page, page_num)
                    images = self._extract_images_safely(page, doc, page_num)
                    tables = self._detect_tables_with_camelot(pdf_path, page_num)
                    page_rect = page.rect
                    pages_content.append({
                        "page_number": page_num + 1,
                        "text_blocks": text_blocks,
                        "images": images,
                        "tables": tables,
                        "page_width": page_rect.width,
                        "page_height": page_rect.height
                    })
                except Exception as e:
                    print(f"Error processing page {page_num + 1}: {e}")
                    # Fall back to an empty A4-sized page so page numbering stays intact.
                    pages_content.append({
                        "page_number": page_num + 1,
                        "text_blocks": [],
                        "images": [],
                        "tables": [],
                        "page_width": 595,
                        "page_height": 842
                    })
            result = {
                "pages": pages_content,
                "total_pages": doc.page_count
            }
            return result
        except Exception as e:
            raise Exception(f"Error extracting PDF content: {str(e)}")
        finally:
            if doc is not None:
                try:
                    doc.close()
                    print("PDF document closed successfully")
                except Exception as e:
                    print(f"Error closing PDF document: {e}")
    def _extract_text_blocks_from_dict(self, page_dict: dict, page_num: int) -> List[TextBlock]:
        text_blocks = []
        for block_idx, block in enumerate(page_dict.get("blocks", [])):
            if "lines" not in block:
                continue
            for line_idx, line in enumerate(block["lines"]):
                for span_idx, span in enumerate(line["spans"]):
                    text_content = span.get("text", "").strip()
                    if text_content:
                        cleaned_text = self.clean_text_for_xml(text_content)
                        if not cleaned_text:
                            continue
                        bbox = span["bbox"]
                        font_name = self.clean_font_name(span.get("font", "Arial"))
                        font_info = {
                            "size": max(span.get("size", 12), 6),
                            "font": font_name,
                            "is_bold": "bold" in font_name.lower() or bool(span.get("flags", 0) & 16),
                            "is_italic": "italic" in font_name.lower() or bool(span.get("flags", 0) & 2)
                        }
                        is_math = self.detect_mathematical_content(cleaned_text)
                        text_block = TextBlock(
                            text=cleaned_text,
                            x=bbox[0], y=bbox[1],
                            width=bbox[2] - bbox[0], height=bbox[3] - bbox[1],
                            font_size=font_info["size"], font_name=font_info["font"],
                            is_bold=font_info["is_bold"], is_italic=font_info["is_italic"],
                            block_id=f"p{page_num}-b{block_idx}-l{line_idx}-s{span_idx}",
                            is_math=is_math
                        )
                        text_blocks.append(text_block)
        return text_blocks
    def _extract_text_blocks_simple(self, page, page_num: int) -> List[TextBlock]:
        text_blocks = []
        try:
            blocks_data = page.get_text("blocks")
            for block_idx, block in enumerate(blocks_data):
                if block[6] == 0:
                    text = block[4].strip()
                    if text:
                        x0, y0, x1, y1 = block[0], block[1], block[2], block[3]
                        # Split the raw block text into physical lines before cleaning,
                        # since cleaning collapses newlines into spaces.
                        lines = text.split('\n')
                        line_height = (y1 - y0) / max(len(lines), 1)
                        for line_idx, line in enumerate(lines):
                            line_text = self.clean_text_for_xml(line)
                            if line_text:
                                is_math = self.detect_mathematical_content(line_text)
                                text_block = TextBlock(
                                    text=line_text,
                                    x=x0, y=y0 + (line_idx * line_height),
                                    width=x1 - x0, height=line_height,
                                    font_size=12, font_name="Arial",
                                    is_bold=False, is_italic=False,
                                    block_id=f"p{page_num}-simple-b{block_idx}-l{line_idx}",
                                    is_math=is_math
                                )
                                text_blocks.append(text_block)
        except Exception as e:
            print(f"Simple text block extraction failed: {e}")
        return text_blocks
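    # Pixmap note: pix.n is the number of colour components including alpha, so
    # "pix.n - pix.alpha < 4" keeps grayscale/RGB images and skips CMYK ones,
    # which cannot be written to PNG directly.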
    def _extract_images_safely(self, page, doc, page_num) -> List[Dict]:
        images = []
        try:
            image_list = page.get_images(full=True)
            for img_index, img_info in enumerate(image_list):
                try:
                    xref = img_info[0]
                    img_rects = [r for r in page.get_image_rects(xref)]
                    if not img_rects:
                        continue
                    bbox = img_rects[0]
                    pix = fitz.Pixmap(doc, xref)
                    if pix.n - pix.alpha < 4:
                        img_data = pix.tobytes("png")
                        img_base64 = base64.b64encode(img_data).decode()
                        images.append({
                            "index": img_index,
                            "data": img_data,
                            "base64": img_base64,
                            "bbox": (bbox.x0, bbox.y0, bbox.x1, bbox.y1)
                        })
                    pix = None
                except Exception as e:
                    print(f"Error extracting image {img_index} on page {page_num + 1}: {e}")
                    continue
        except Exception as e:
            print(f"General error in image extraction for page {page_num + 1}: {e}")
        return images
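    # Camelot offers two extraction flavors: "lattice" for tables with ruled
    # cell borders and "stream" for tables aligned only by whitespace. The
    # method below tries lattice first and falls back to stream.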
    def _detect_tables_with_camelot(self, pdf_path: str, page_num: int) -> List[Dict]:
        tables = []
        try:
            try:
                camelot_tables = camelot.read_pdf(
                    pdf_path,
                    pages=str(page_num + 1),
                    flavor='lattice',
                    suppress_stdout=True
                )
                if len(camelot_tables) == 0:
                    camelot_tables = camelot.read_pdf(
                        pdf_path,
                        pages=str(page_num + 1),
                        flavor='stream',
                        suppress_stdout=True
                    )
            except Exception:
                camelot_tables = camelot.read_pdf(
                    pdf_path,
                    pages=str(page_num + 1),
                    flavor='stream',
                    suppress_stdout=True
                )
            for table in camelot_tables:
                table_data = table.df.values.tolist()
                if table_data and any(any(str(cell).strip() for cell in row) for row in table_data):
                    cleaned_data = []
                    for row in table_data:
                        cleaned_row = []
                        for cell in row:
                            cell_text = str(cell).strip() if cell is not None else ""
                            cleaned_cell = self.clean_text_for_xml(cell_text)
                            cleaned_row.append(cleaned_cell)
                        cleaned_data.append(cleaned_row)
                    tables.append({
                        "bbox": getattr(table, "bbox", None),
                        "data": cleaned_data,
                        "accuracy": getattr(table, 'accuracy', 0)
                    })
                    print(f"Found table with {len(cleaned_data)} rows and {len(cleaned_data[0]) if cleaned_data else 0} columns on page {page_num + 1}")
        except Exception as e:
            print(f"Error detecting tables with Camelot on page {page_num + 1}: {e}")
        return tables
    def _add_page_break(self, doc):
        try:
            paragraph = doc.add_paragraph()
            run = paragraph.runs[0] if paragraph.runs else paragraph.add_run()
            run.add_break(WD_BREAK.PAGE)
        except Exception:
            doc.add_page_break()
    def _set_font_properties(self, run, text_block: TextBlock):
        try:
            font_name = self.clean_font_name(text_block.font_name)
            if 'Times' in font_name or 'Roman' in font_name:
                run.font.name = 'Times New Roman'
            elif 'Arial' in font_name:
                run.font.name = 'Arial'
            elif 'Courier' in font_name:
                run.font.name = 'Courier New'
            else:
                run.font.name = 'Calibri'
            try:
                font_size_val = float(text_block.font_size)
                font_size = max(min(int(font_size_val), 72), 6)
                run.font.size = Pt(font_size)
            except (ValueError, TypeError):
                print(f"Warning: Invalid font_size '{text_block.font_size}'. Using default 11pt.")
                run.font.size = Pt(11)
            run.font.bold = bool(text_block.is_bold)
            run.font.italic = bool(text_block.is_italic)
            if text_block.is_math:
                run.font.name = 'Cambria Math'
        except Exception as e:
            print(f"Error setting font properties for text_block: {e}")
            run.font.name = 'Calibri'
            run.font.size = Pt(11)
            run.font.bold = False
            run.font.italic = False
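    # Text spans are grouped into visual lines by sorting on (y, x) and merging
    # spans whose vertical positions differ by at most 5 points (an assumed
    # tolerance; PDF coordinates are in points, 1/72 inch).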
    def _group_text_blocks_by_lines(self, text_blocks: List[TextBlock]) -> List[List[TextBlock]]:
        if not text_blocks:
            return []
        sorted_blocks = sorted(text_blocks, key=lambda b: (round(b.y, 1), b.x))
        lines = []
        current_line = []
        current_y = None
        for block in sorted_blocks:
            if current_y is None or abs(block.y - current_y) <= 5:
                current_line.append(block)
                current_y = block.y if current_y is None else current_y
            else:
                if current_line:
                    lines.append(current_line)
                current_line = [block]
                current_y = block.y
        if current_line:
            lines.append(current_line)
        return lines
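    # python-docx has no high-level API for cell borders, so the helper below
    # injects the w:tcBorders element into each cell's properties directly.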
    def _set_table_borders(self, table):
        tbl = table._tbl
        for row in tbl.tr_lst:
            for cell in row.tc_lst:
                tcPr = cell.get_or_add_tcPr()
                tcBorders = OxmlElement('w:tcBorders')
                for border_name in ['top', 'left', 'bottom', 'right']:
                    border = OxmlElement(f'w:{border_name}')
                    border.set(qn('w:val'), 'single')
                    border.set(qn('w:sz'), '4')
                    border.set(qn('w:space'), '0')
                    border.set(qn('w:color'), '000000')
                    tcBorders.append(border)
                tcPr.append(tcBorders)
    def _create_enhanced_table(self, doc, table_data):
        try:
            table_rows = table_data["data"]
            if not table_rows or not any(any(str(cell).strip() for cell in row) for row in table_rows):
                return None
            max_cols = max(len(row) for row in table_rows) if table_rows else 0
            if max_cols == 0:
                return None
            word_table = doc.add_table(rows=len(table_rows), cols=max_cols)
            self._set_table_borders(word_table)
            word_table.alignment = WD_TABLE_ALIGNMENT.CENTER
            word_table.autofit = False
            for row_idx, row_data in enumerate(table_rows):
                for col_idx in range(max_cols):
                    cell = word_table.cell(row_idx, col_idx)
                    cell_data = row_data[col_idx] if col_idx < len(row_data) else ""
                    clean_cell_data = self.clean_text_for_xml(str(cell_data) if cell_data else "")
                    paragraph = cell.paragraphs[0]
                    paragraph.clear()
                    run = paragraph.add_run(clean_cell_data)
                    if self.detect_mathematical_content(clean_cell_data):
                        run.font.name = 'Cambria Math'
                    else:
                        run.font.name = 'Calibri'
                    run.font.size = Pt(9)
                    if row_idx == 0:
                        run.font.bold = True
                    paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    cell.vertical_alignment = docx.enum.table.WD_ALIGN_VERTICAL.CENTER
            print(f"Created table with {len(table_rows)} rows and {max_cols} columns")
            return word_table
        except Exception as e:
            print(f"Error creating enhanced table: {e}")
            return None
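    # Document assembly: a title block, then for each extracted page a page
    # header, its images, its tables, and finally its text grouped into lines,
    # with a page break between pages.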
    def convert_to_word(self, pdf_content: Dict[str, Any], output_path: str = None) -> Document:
        print("Creating Word document...")
        doc = Document()
        doc.core_properties.title = "PDF to Word Conversion"
        doc.core_properties.author = "PDF Converter"
        doc.core_properties.created = datetime.now()
        header_para = doc.add_paragraph()
        header_run = header_para.add_run("PDF Document Conversion")
        header_run.font.size = Pt(16)
        header_run.font.bold = True
        header_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        info_para = doc.add_paragraph()
        info_run = info_para.add_run(f"Total Pages: {pdf_content.get('total_pages', 'Unknown')} | Converted on: {self._get_current_timestamp()}")
        info_run.font.size = Pt(10)
        info_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        doc.add_paragraph()
        for page_idx, page in enumerate(pdf_content["pages"]):
            print(f"Converting page {page['page_number']}/{pdf_content.get('total_pages', '?')}")
            page_header = doc.add_paragraph()
            page_header_run = page_header.add_run(f"--- Page {page['page_number']} ---")
            page_header_run.font.bold = True
            page_header_run.font.size = Pt(12)
            page_header.alignment = WD_ALIGN_PARAGRAPH.CENTER
            for img in page["images"]:
                try:
                    img_para = doc.add_paragraph()
                    img_run = img_para.add_run()
                    img_stream = io.BytesIO(img['data'])
                    img_bbox = img['bbox']
                    img_width_px = img_bbox[2] - img_bbox[0]
                    # Convert from PDF points (72 per inch) and cap at a 6.5" text width.
                    img_width = min(Inches(img_width_px / 72), Inches(6.5))
                    img_run.add_picture(img_stream, width=img_width)
                    img_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
                except Exception as e:
                    print(f"Error adding image to Word document: {e}")
                    img_para = doc.add_paragraph()
                    img_run = img_para.add_run(f"[Image {img['index']} - Could not be inserted]")
                    img_run.font.italic = True
            if page["tables"]:
                for table_data in page["tables"]:
                    try:
                        enhanced_table = self._create_enhanced_table(doc, table_data)
                        if enhanced_table:
                            doc.add_paragraph()
                    except Exception as e:
                        print(f"Error adding table to Word document: {e}")
            text_lines = self._group_text_blocks_by_lines(page["text_blocks"])
            for line_blocks in text_lines:
                if not line_blocks:
                    continue
                para = doc.add_paragraph()
                line_blocks.sort(key=lambda b: b.x)
                for block in line_blocks:
                    cleaned_text = self.clean_text_for_xml(block.text)
                    if cleaned_text:
                        run = para.add_run(cleaned_text + " ")
                        self._set_font_properties(run, block)
            if page_idx < len(pdf_content["pages"]) - 1:
                self._add_page_break(doc)
        if output_path:
            try:
                Path(output_path).parent.mkdir(parents=True, exist_ok=True)
                doc.save(output_path)
                print(f"Word document saved to: {output_path}")
            except Exception as e:
                print(f"Error saving Word document to {output_path}: {e}")
        return doc
    def _get_current_timestamp(self) -> str:
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    def process_pdf_to_word(self, pdf_path: str, output_path: str = None, use_hf_models: bool = False) -> Document:
        print(f"Processing PDF to Word: {pdf_path}")
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        print("Extracting PDF content...")
        pdf_content = self.extract_pdf_content(pdf_path)
        if use_hf_models and self.hf_token:
            print("Attempting to enhance with Hugging Face models...")
            try:
                print("Note: Hugging Face model integration requires further implementation.")
            except Exception as e:
                print(f"Hugging Face enhancement failed: {e}")
        print("Converting to Word document...")
        word_doc = self.convert_to_word(pdf_content, output_path)
        print("Processing complete!")
        return word_doc
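# The Hugging Face token is read from the HF_API_TOKEN environment variable;
# model-based enhancement is currently a stub, so conversion works without it.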
def main():
    HF_TOKEN = os.getenv("HF_API_TOKEN")
    converter = PDFToWordConverter(huggingface_token=HF_TOKEN)
    pdf_path = "supplychain (1).pdf"
    output_path = "converted_document_enhanced.docx"
    try:
        word_document = converter.process_pdf_to_word(
            pdf_path=pdf_path,
            output_path=output_path,
            use_hf_models=False
        )
        print(f"Successfully converted '{pdf_path}' to '{output_path}'")
        print(f"Open '{output_path}' in Microsoft Word to view the result!")
    except FileNotFoundError as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    main()