|
import os |
|
import time |
|
import tempfile |
|
import traceback |
|
import logging |
|
from pathlib import Path |
|
from typing import List, Union, Optional, Dict, Any, Literal |
|
import uuid |
|
|
|
from fastapi import FastAPI, File, UploadFile, HTTPException, status, Request |
|
from pydantic import BaseModel, Field |
|
|
|
|
|
|
|
|
|
from mdr_pdf_parser import ( |
|
MagicPDFProcessor, |
|
MDRStructuredBlock, |
|
MDRTextBlock, |
|
MDRTableBlock, |
|
MDRFormulaBlock, |
|
MDRFigureBlock, |
|
MDRTextKind, |
|
MDRTableFormat, |
|
MDRRectangle, |
|
MDRTextSpan, |
|
MDRExtractedTableFormat |
|
) |
|
|
|
|
|
# Logging configuration, driven by the LOG_LEVEL environment variable.
LOG_LEVEL_STR = os.environ.get("LOG_LEVEL", "INFO").upper()

# A plain getattr() lookup is not enough: upper-case attributes of the logging
# module that are not levels (e.g. logging.BASIC_FORMAT, a str) would otherwise
# be passed to basicConfig() as if they were levels. Only integer level
# constants are accepted; anything else falls back to INFO.
_resolved_log_level = getattr(logging, LOG_LEVEL_STR, None)
LOG_LEVEL = _resolved_log_level if isinstance(_resolved_log_level, int) else logging.INFO

logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Module-wide logger used by every handler in this service.
logger = logging.getLogger("mdr_fastapi_service")
|
|
|
|
|
|
|
# Processor settings; each one can be overridden through its MDR_* environment
# variable, otherwise the default shown here applies.
MODEL_DIR = os.getenv("MDR_MODEL_DIR", "/models")
DEVICE = os.getenv("MDR_DEVICE", "cuda")
TABLE_FORMAT_STR = os.getenv("MDR_TABLE_FORMAT", "MARKDOWN")
# No default: stays None when MDR_DEBUG_DIR_PATH is not set.
DEBUG_DIR_PATH_STR = os.getenv("MDR_DEBUG_DIR_PATH")
|
|
|
|
|
# Look the configured table format up by (upper-cased) name on
# MDRExtractedTableFormat; an unknown name is reported and replaced by DISABLE.
try:
    TABLE_FORMAT = MDRExtractedTableFormat[TABLE_FORMAT_STR.upper()]
except KeyError:
    TABLE_FORMAT = MDRExtractedTableFormat.DISABLE
    logger.warning(f"Invalid MDR_TABLE_FORMAT '{TABLE_FORMAT_STR}'. Defaulting to DISABLE.")

# Shared processor instance. It is None at import time and is assigned by the
# startup handler; request handlers treat None as "service unavailable".
mdr_processor: Optional[MagicPDFProcessor] = None
|
|
|
|
|
|
|
class MDRPointModel(BaseModel):
    """A 2D point expressed as ``x`` / ``y`` float coordinates."""

    x: float
    y: float
|
|
|
class MDRRectangleModel(BaseModel):
    """API representation of a rectangle as four corner points.

    The corners are named ``lt``, ``rt``, ``lb`` and ``rb`` (presumably
    left-top, right-top, left-bottom and right-bottom -- confirm against
    ``MDRRectangle``).
    """

    lt: MDRPointModel
    rt: MDRPointModel
    lb: MDRPointModel
    rb: MDRPointModel

    @classmethod
    def from_mdr_rectangle(cls, rect: MDRRectangle):
        """Build the API model from an internal ``MDRRectangle``.

        Each corner of ``rect`` is read by index: element 0 becomes ``x`` and
        element 1 becomes ``y``.
        """

        def as_point(corner) -> MDRPointModel:
            # Index access (not unpacking) so only the first two items are used.
            return MDRPointModel(x=corner[0], y=corner[1])

        return cls(
            lt=as_point(rect.lt),
            rt=as_point(rect.rt),
            lb=as_point(rect.lb),
            rb=as_point(rect.rb),
        )
|
|
|
class MDRTextSpanModel(BaseModel):
    """API representation of one text span: its content, rank and rectangle."""

    content: str
    rank: float
    rect: MDRRectangleModel

    @classmethod
    def from_mdr_text_span(cls, span: MDRTextSpan):
        """Build the API model from an internal ``MDRTextSpan``.

        ``content`` and ``rank`` are copied as-is; ``rect`` is converted with
        ``MDRRectangleModel.from_mdr_rectangle``.
        """
        field_values = {
            "content": span.content,
            "rank": span.rank,
            "rect": MDRRectangleModel.from_mdr_rectangle(span.rect),
        }
        return cls(**field_values)
|
|
|
class MDRBasicBlockModel(BaseModel):
    """Fields shared by every block model returned by the API.

    Subclasses narrow ``block_type`` to a single literal value so that each
    serialized block identifies its own kind.
    """

    block_type: str
    rect: MDRRectangleModel
    # default_factory gives every instance its own empty list.
    texts: List[MDRTextSpanModel] = Field(default_factory=list)
    font_size: float
|
|
|
class MDRTextBlockModel(MDRBasicBlockModel):
    """API representation of a text block (``block_type`` is ``"TextBlock"``)."""

    block_type: Literal["TextBlock"] = "TextBlock"
    kind: str
    has_paragraph_indentation: bool
    last_line_touch_end: bool
    # Re-declared without a default, so ``texts`` is required for text blocks.
    texts: List[MDRTextSpanModel]

    @classmethod
    def from_mdr_text_block(cls, block: MDRTextBlock):
        """Build the API model from an internal ``MDRTextBlock``.

        ``kind`` is exported as the name of ``block.kind``; the rectangle and
        every text span are converted to their API models.
        """
        rect_model = MDRRectangleModel.from_mdr_rectangle(block.rect)
        span_models = list(map(MDRTextSpanModel.from_mdr_text_span, block.texts))
        return cls(
            rect=rect_model,
            texts=span_models,
            font_size=block.font_size,
            kind=block.kind.name,
            has_paragraph_indentation=block.has_paragraph_indentation,
            last_line_touch_end=block.last_line_touch_end,
        )
|
|
|
class MDRTableBlockModel(MDRBasicBlockModel):
    """API representation of a table block (``block_type`` is ``"TableBlock"``)."""

    block_type: Literal["TableBlock"] = "TableBlock"
    content: str
    format: str

    @classmethod
    def from_mdr_table_block(cls, block: MDRTableBlock):
        """Build the API model from an internal ``MDRTableBlock``.

        ``format`` is exported as the name of ``block.format``; ``content`` is
        copied unchanged.
        """
        rect_model = MDRRectangleModel.from_mdr_rectangle(block.rect)
        span_models = list(map(MDRTextSpanModel.from_mdr_text_span, block.texts))
        return cls(
            rect=rect_model,
            texts=span_models,
            font_size=block.font_size,
            content=block.content,
            format=block.format.name,
        )
|
|
|
class MDRFormulaBlockModel(MDRBasicBlockModel):
    """API representation of a formula block (``block_type`` is ``"FormulaBlock"``)."""

    block_type: Literal["FormulaBlock"] = "FormulaBlock"
    # Optional: defaults to None when no content is supplied.
    content: Optional[str] = None

    @classmethod
    def from_mdr_formula_block(cls, block: MDRFormulaBlock):
        """Build the API model from an internal ``MDRFormulaBlock``."""
        rect_model = MDRRectangleModel.from_mdr_rectangle(block.rect)
        span_models = list(map(MDRTextSpanModel.from_mdr_text_span, block.texts))
        return cls(
            rect=rect_model,
            texts=span_models,
            font_size=block.font_size,
            content=block.content,
        )
|
|
|
class MDRFigureBlockModel(MDRBasicBlockModel):
    """API representation of a figure block (``block_type`` is ``"FigureBlock"``).

    Adds no fields beyond the shared base ones.
    """

    block_type: Literal["FigureBlock"] = "FigureBlock"

    @classmethod
    def from_mdr_figure_block(cls, block: MDRFigureBlock):
        """Build the API model from an internal ``MDRFigureBlock``."""
        rect_model = MDRRectangleModel.from_mdr_rectangle(block.rect)
        span_models = list(map(MDRTextSpanModel.from_mdr_text_span, block.texts))
        return cls(
            rect=rect_model,
            texts=span_models,
            font_size=block.font_size,
        )
|
|
|
# Union of every block model the API can return; used as the element type of
# the /process-pdf/ response.
MDRStructuredBlockModelAPI = Union[
    MDRTextBlockModel,
    MDRTableBlockModel,
    MDRFormulaBlockModel,
    MDRFigureBlockModel,
]
|
|
|
|
|
# The FastAPI application object; route and startup handlers below attach to it.
app = FastAPI(
    title="MagicDataReadiness PDF Processor",
    description="API service to extract structured content from PDF files.",
    version="1.0.0",
)
|
|
|
|
|
def _convert_block_to_api_model(block: MDRStructuredBlock) -> Optional[MDRStructuredBlockModelAPI]:
    """Map an internal MDR block onto the matching API model.

    Returns:
        The converted API model, or ``None`` (after logging a warning) when
        ``block`` is not an instance of any supported block type.
    """
    # Checked in order; the first matching type decides the conversion.
    converters = (
        (MDRTextBlock, MDRTextBlockModel.from_mdr_text_block),
        (MDRTableBlock, MDRTableBlockModel.from_mdr_table_block),
        (MDRFormulaBlock, MDRFormulaBlockModel.from_mdr_formula_block),
        (MDRFigureBlock, MDRFigureBlockModel.from_mdr_figure_block),
    )
    for internal_type, to_api_model in converters:
        if isinstance(block, internal_type):
            return to_api_model(block)

    logger.warning(f"Unknown block type encountered: {type(block)}. Skipping conversion.")
    return None
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Log the active configuration and create the shared MagicPDFProcessor.

    A failed initialization is logged at CRITICAL level and not re-raised, so
    the application still starts and ``mdr_processor`` keeps its prior value.
    """
    global mdr_processor

    logger.info("Application startup sequence initiated.")
    logger.info("--- Configuration ---")
    config_rows = (
        ("MDR_MODEL_DIR", MODEL_DIR),
        ("MDR_DEVICE", DEVICE),
        ("MDR_TABLE_FORMAT", TABLE_FORMAT.name),
        ("MDR_DEBUG_DIR_PATH", DEBUG_DIR_PATH_STR or 'Not set'),
        ("LOG_LEVEL", LOG_LEVEL_STR),
    )
    for label, value in config_rows:
        logger.info(f" {label}: {value}")
    logger.info("---------------------")

    logger.info("Initializing MagicPDFProcessor...")
    started_at = time.time()
    try:
        processor = MagicPDFProcessor(
            device=DEVICE,
            model_dir_path=MODEL_DIR,
            extract_table_format=TABLE_FORMAT,
            debug_dir_path=DEBUG_DIR_PATH_STR,
        )
    except Exception as e:
        logger.critical(f"Failed to initialize MagicPDFProcessor: {e}", exc_info=True)
        return

    # Publish the instance only once construction has succeeded.
    mdr_processor = processor
    init_duration = time.time() - started_at
    logger.info(f"MagicPDFProcessor initialized successfully ({init_duration:.2f}s)")
|
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event_check():
    """Log whether the shared processor is available at startup.

    Only reports the state; it neither raises nor retries initialization.
    """
    if mdr_processor is not None:
        logger.info("MagicDataReadiness Service is ready and processor is available.")
        return

    logger.error("MagicPDFProcessor is not initialized. Service cannot function correctly.")
|
|
|
|
|
|
|
@app.get("/health", summary="Health Check")
async def health_check():
    """Report service health.

    Raises:
        HTTPException: 503 when the processor has not been initialized.
    """
    processor_ready = mdr_processor is not None
    if not processor_ready:
        logger.warning("/health endpoint called but processor is not initialized.")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Processor not initialized",
        )

    payload = {"status": "ok", "message": "MagicPDFProcessor is running."}
    return payload
|
|
|
# Number of bytes read from the upload per iteration while spooling it to disk.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _remove_temp_pdf(temp_path: Optional[Path], request_id: str) -> None:
    """Best-effort removal of a request's temporary PDF; an OSError is logged, not raised."""
    if temp_path is None:
        return
    if not temp_path.exists():
        logger.info(f"RID-{request_id}: Temporary file {temp_path} not found for cleanup (may have failed to save).")
        return
    try:
        os.remove(temp_path)
    except OSError as e:
        logger.warning(f"RID-{request_id}: Could not remove temporary file {temp_path}: {e}")
    else:
        logger.info(f"RID-{request_id}: Cleaned up temporary file: {temp_path}")


async def _save_upload_to_temp_pdf(file: UploadFile, request_id: str) -> Path:
    """Stream the upload into a new file under ``./temp_uploads`` and return its path.

    If reading or writing fails, the partially written file is removed before
    the exception propagates, so a failed save leaves nothing behind.
    """
    temp_dir = Path("./temp_uploads")
    temp_dir.mkdir(parents=True, exist_ok=True)

    temp_path: Optional[Path] = None
    try:
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=".pdf", dir=temp_dir, prefix=f"req_{request_id}_"
        ) as temp_file:
            # Record the path before any read/write so a failure can clean up.
            temp_path = Path(temp_file.name)
            # Copy in chunks rather than holding the whole upload in memory.
            while True:
                chunk = await file.read(_UPLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                temp_file.write(chunk)
    except BaseException:
        # BaseException so that task cancellation also removes the file;
        # the original exception is always re-raised.
        _remove_temp_pdf(temp_path, request_id)
        raise
    return temp_path


@app.post(
    "/process-pdf/",
    response_model=List[MDRStructuredBlockModelAPI],
    summary="Process a PDF file",
    description="Upload a PDF file to extract structured blocks (text, tables, figures, formulas).",
)
async def process_pdf_endpoint(request: Request, file: UploadFile = File(..., description="The PDF file to process.")):
    """Handle a PDF upload, run the processor on it and return the extracted blocks.

    The upload is written to a temporary file, passed to
    ``mdr_processor.process_document`` and each resulting block is converted
    to its API model. The temporary file is removed whether processing
    succeeds or fails, and also when saving the upload fails part-way.

    Args:
        request: Incoming request; only used to log the client host.
        file: The uploaded file; its name must end with ``.pdf``.

    Returns:
        The list of converted API block models (blocks of unknown type are
        skipped).

    Raises:
        HTTPException: 503 if the processor is not initialized, 400 if the
            file name is missing or does not end with ``.pdf``, 500 if saving
            or processing the file fails.
    """
    request_id = str(uuid.uuid4())
    client_host = request.client.host if request.client else "unknown"
    logger.info(f"RID-{request_id}: Received /process-pdf request from {client_host} for file: '{file.filename}' (type: {file.content_type}, size: {file.size})")

    if mdr_processor is None:
        logger.error(f"RID-{request_id}: Processor not initialized. Cannot process request.")
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Processor not initialized")

    if not file.filename or not file.filename.lower().endswith(".pdf"):
        logger.warning(f"RID-{request_id}: Invalid file type uploaded: '{file.filename}'")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid file type. Please upload a PDF.")

    # perf_counter() is monotonic, so durations are immune to clock changes.
    save_start_time = time.perf_counter()
    try:
        temp_pdf_path_obj = await _save_upload_to_temp_pdf(file, request_id)
    except Exception as e:
        logger.error(f"RID-{request_id}: Failed to save uploaded file '{file.filename}': {e}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to save uploaded file: {e}") from e
    save_duration = time.perf_counter() - save_start_time
    logger.info(f"RID-{request_id}: File '{file.filename}' saved temporarily to '{temp_pdf_path_obj}' ({save_duration:.2f}s)")

    extracted_blocks_api: List[MDRStructuredBlockModelAPI] = []
    processing_start_time = time.perf_counter()

    try:
        logger.info(f"RID-{request_id}: Starting PDF processing for '{temp_pdf_path_obj}'...")

        def log_progress(completed_pages: int, total_pages: int):
            # Progress callback handed to the processor; guards the division.
            if total_pages > 0:
                percent_done = (completed_pages / total_pages) * 100
                logger.info(f"RID-{request_id}: Processor progress - Page {completed_pages}/{total_pages} ({percent_done:.1f}%) scanned.")
            else:
                logger.info(f"RID-{request_id}: Processor progress - Page {completed_pages} scanned (total pages unknown or zero).")

        # NOTE(review): process_document is consumed synchronously inside this
        # async handler, so the event loop is occupied until it finishes.
        # Moving it to a worker thread would change how requests overlap on
        # the shared processor -- confirm it is safe before doing so.
        all_blocks_internal = list(mdr_processor.process_document(
            pdf_input=str(temp_pdf_path_obj),
            report_progress=log_progress,
        ))
        collection_duration = time.perf_counter() - processing_start_time
        logger.info(f"RID-{request_id}: Extracted {len(all_blocks_internal)} raw blocks from processor ({collection_duration:.2f}s).")

        conversion_start_time = time.perf_counter()
        for i, block_internal in enumerate(all_blocks_internal):
            logger.debug(f"RID-{request_id}: Converting internal block {i+1}/{len(all_blocks_internal)} of type {type(block_internal)} to API model.")
            api_block = _convert_block_to_api_model(block_internal)
            # None means "unknown block type"; test identity, not truthiness.
            if api_block is not None:
                extracted_blocks_api.append(api_block)
        conversion_duration = time.perf_counter() - conversion_start_time
        logger.info(f"RID-{request_id}: Converted {len(extracted_blocks_api)} blocks to API models ({conversion_duration:.2f}s).")

        total_processing_duration = time.perf_counter() - processing_start_time
        logger.info(f"RID-{request_id}: PDF processing finished in {total_processing_duration:.2f}s. Returning {len(extracted_blocks_api)} blocks.")

    except Exception as e:
        logger.error(f"RID-{request_id}: Error during PDF processing for '{temp_pdf_path_obj}': {e}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"An error occurred during PDF processing: {e}") from e
    finally:
        # Always remove the temporary upload, on success and on failure.
        _remove_temp_pdf(temp_pdf_path_obj, request_id)

    return extracted_blocks_api
|
|
|
@app.get("/", summary="Root Endpoint")
async def read_root():
    """Return a welcome message, documentation links and the active configuration."""
    active_configuration = {
        "model_directory": MODEL_DIR,
        "target_device": DEVICE,
        "table_format": TABLE_FORMAT.name,
        "log_level": LOG_LEVEL_STR,
        # An unset/empty debug path is reported as "Disabled".
        "processor_debug_output": DEBUG_DIR_PATH_STR or "Disabled",
    }
    return {
        "message": "Welcome to the MagicDataReadiness PDF Processor API!",
        "docs_url": "/docs",
        "redoc_url": "/redoc",
        "health_url": "/health",
        "active_configuration": active_configuration,
    }
|
|
|
|
|
if __name__ == "__main__":
    # Local development entry point: uvicorn is imported only when the module
    # is executed directly.
    import uvicorn

    logger.info("Starting Uvicorn server for local development...")
    uvicorn_log_level = LOG_LEVEL_STR.lower()
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level=uvicorn_log_level)