# Spaces: Running
import contextlib
import os
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, Union

import magic
from docling.document_converter import DocumentConverter

from .types import ParsedDocument, DocumentMetadata
from .exceptions import UnsupportedFormatError, ParseError
class DocumentParser:
    """
    A multiformat document parser using Docling.

    Each input file is copied into a per-instance temporary directory under a
    name whose suffix matches its detected format, converted with a Docling
    ``DocumentConverter``, and returned as a ``ParsedDocument``.
    """

    # MIME type -> short format name used for the temp-file suffix and for
    # ``DocumentMetadata.file_type``.
    SUPPORTED_FORMATS = {
        'application/pdf': 'pdf',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
        'text/plain': 'txt',
        'text/html': 'html',
        'text/markdown': 'md',
        # Add common variations
        'application/x-pdf': 'pdf',
        'application/acrobat': 'pdf',
        # NOTE(review): application/msword is staged with a .docx suffix;
        # confirm the converter can actually read such files.
        'application/msword': 'docx',
        'text/x-markdown': 'md',
        'text/x-html': 'html',
    }

    # Lower-cased file suffix -> MIME type; consulted before content sniffing.
    EXTENSION_TO_MIME = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.htm': 'text/html',
        '.md': 'text/markdown',
        '.markdown': 'text/markdown',
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Create the Docling converter and a private temporary directory.

        Args:
            config: Optional settings dictionary. It is stored on the
                instance; nothing in this class reads it.
        """
        self.config = config or {}
        self.converter = DocumentConverter()
        # Create a temporary directory for processing files
        self.temp_dir = Path(tempfile.mkdtemp(prefix="dockling_"))

    def __del__(self):
        """Cleanup temporary directory on object destruction"""
        # __init__ may have failed before temp_dir was assigned, hence getattr.
        temp_dir = getattr(self, 'temp_dir', None)
        if temp_dir is not None and temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _validate_and_copy_file(self, file_path: Union[str, Path]) -> Path:
        """
        Validate file and copy to temporary location with correct extension.

        The format is taken from the file suffix when it is listed in
        ``EXTENSION_TO_MIME``; otherwise the content is sniffed with ``magic``.

        Args:
            file_path: Path to the document file.

        Returns:
            Path of the copy inside ``self.temp_dir``.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            UnsupportedFormatError: If the suffix is unknown and the sniffed
                MIME type is not in ``SUPPORTED_FORMATS``.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Try to determine format from extension first
        extension = file_path.suffix.lower()
        mime_type = self.EXTENSION_TO_MIME.get(extension)

        # If extension not recognized, use magic
        if not mime_type:
            mime_type = magic.from_file(str(file_path), mime=True)
            if mime_type not in self.SUPPORTED_FORMATS:
                # sorted() keeps the message stable between runs.
                supported = ', '.join(sorted(set(self.SUPPORTED_FORMATS.values())))
                raise UnsupportedFormatError(
                    f"Unsupported file format: {mime_type}. "
                    f"Supported formats are: {supported}"
                )
            extension = f".{self.SUPPORTED_FORMATS[mime_type]}"

        # Copy file to temp directory with correct extension
        temp_file = self.temp_dir / f"doc{extension}"
        shutil.copy2(file_path, temp_file)
        return temp_file

    @classmethod
    def _resolve_format(cls, detected_mime: str, extension: str):
        """
        Pick the ``(mime_type, file_type)`` pair reported in the metadata.

        A sniffed MIME type listed in ``SUPPORTED_FORMATS`` is used as-is.
        Otherwise the suffix decides, so a file that was accepted by its
        extension is not rejected later because content sniffing reported a
        type missing from the table.

        Raises:
            UnsupportedFormatError: If neither the sniffed type nor the
                suffix is known.
        """
        if detected_mime in cls.SUPPORTED_FORMATS:
            return detected_mime, cls.SUPPORTED_FORMATS[detected_mime]

        fallback_mime = cls.EXTENSION_TO_MIME.get(extension.lower())
        if fallback_mime is None:
            raise UnsupportedFormatError(
                f"Unsupported file format: {detected_mime}. "
                f"Supported formats are: "
                f"{', '.join(sorted(set(cls.SUPPORTED_FORMATS.values())))}"
            )
        return fallback_mime, cls.SUPPORTED_FORMATS[fallback_mime]

    def _build_metadata(self, source_path: Path, temp_file: Path) -> "DocumentMetadata":
        """Collect file-level metadata for the document being parsed."""
        # Stat the original file: the temp copy was created moments ago, so
        # its st_ctime would only reflect the time of the copy.
        # NOTE: the meaning of st_ctime is platform-dependent (see os.stat docs).
        stats = source_path.stat()
        detected_mime = magic.from_file(str(temp_file), mime=True)
        mime_type, file_type = self._resolve_format(detected_mime, temp_file.suffix)
        return DocumentMetadata(
            filename=source_path.name,  # Use original filename
            file_type=file_type,
            size_bytes=stats.st_size,
            created_at=datetime.fromtimestamp(stats.st_ctime),
            modified_at=datetime.fromtimestamp(stats.st_mtime),
            mime_type=mime_type,
        )

    def _convert(self, temp_file: Path, metadata: "DocumentMetadata") -> "ParsedDocument":
        """Run Docling on ``temp_file`` and assemble the ``ParsedDocument``."""
        try:
            # Parse document using Docling
            doc = self.converter.convert(str(temp_file)).document

            try:
                content = doc.export_to_text()
            except Exception as e:
                raise ParseError(f"Failed to extract text content: {e}") from e

            # Extract structured content; attributes the document object
            # does not expose fall back to empty containers.
            structured_content = {
                'sections': getattr(doc, 'sections', []),
                'paragraphs': getattr(doc, 'paragraphs', []),
                'entities': getattr(doc, 'entities', {}),
                'metadata': getattr(doc, 'metadata', {}),
            }

            # Get raw text if available; any failure falls back to the plain
            # export.
            # NOTE(review): confirm export_to_text accepts include_layout;
            # if it does not, this branch always falls back.
            try:
                raw_text = doc.export_to_text(include_layout=True)
            except Exception:
                raw_text = content

            # Update metadata with document-specific information
            doc_metadata = getattr(doc, 'metadata', None)
            if doc_metadata:
                metadata.title = doc_metadata.get('title')
                metadata.author = doc_metadata.get('author')
                metadata.pages = doc_metadata.get('pages')
                metadata.extra.update(doc_metadata)

            return ParsedDocument(
                content=content,
                metadata=metadata,
                raw_text=raw_text,
                structured_content=structured_content,
                confidence_score=getattr(doc, 'confidence', 1.0),
            )
        except ParseError:
            # Already carries a specific message; do not wrap it again.
            raise
        except Exception as e:
            raise ParseError(f"Failed to parse document: {e}") from e

    def parse(self, file_path: Union[str, Path]) -> "ParsedDocument":
        """
        Parse a document file and return structured content

        Args:
            file_path: Path to the document file

        Returns:
            ParsedDocument object containing parsed content and metadata

        Raises:
            UnsupportedFormatError: If the file format is not supported
            ParseError: If parsing fails for any other reason (including a
                missing file); the original exception is chained as the cause
        """
        source_path = Path(file_path)
        temp_file: Optional[Path] = None
        try:
            # Validate and prepare file
            temp_file = self._validate_and_copy_file(source_path)
            # Get file metadata
            metadata = self._build_metadata(source_path, temp_file)
            return self._convert(temp_file, metadata)
        except (UnsupportedFormatError, ParseError):
            # Documented error types reach the caller unchanged.
            raise
        except Exception as e:
            raise ParseError(str(e)) from e
        finally:
            # Cleanup temporary files; best-effort, a failed delete must not
            # mask the parse result or error.
            if temp_file is not None:
                with contextlib.suppress(OSError):
                    temp_file.unlink()

    def supports_format(self, mime_type: str) -> bool:
        """Check if a given MIME type is supported"""
        return mime_type in self.SUPPORTED_FORMATS