from fastapi import FastAPI, File, UploadFile, HTTPException from pydantic import BaseModel import base64 import io import os import logging from PIL import Image, UnidentifiedImageError import torch import asyncio from utils import ( check_ocr_box, get_yolo_model, get_caption_model_processor, get_som_labeled_img, ) from transformers import AutoProcessor, AutoModelForCausalLM # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # Load YOLO model yolo_model = get_yolo_model(model_path="weights/best.pt") # Handle device placement device = torch.device("cuda" if torch.cuda.is_available() else "cpu") yolo_model = yolo_model.to(device) # Load caption model and processor try: processor = AutoProcessor.from_pretrained( "microsoft/Florence-2-base", trust_remote_code=True ) model = AutoModelForCausalLM.from_pretrained( "weights/icon_caption_florence", torch_dtype=torch.float16, trust_remote_code=True, ).to("cuda" if torch.cuda.is_available() else "cpu") except Exception as e: logger.warning(f"Failed to load caption model on GPU: {e}. Falling back to CPU.") model = AutoModelForCausalLM.from_pretrained( "weights/icon_caption_florence", torch_dtype=torch.float16, trust_remote_code=True, ) caption_model_processor = {"processor": processor, "model": model} logger.info("Finished loading models!") # Initialize FastAPI app app = FastAPI() MAX_QUEUE_SIZE = 10 # Set a reasonable limit based on your system capacity request_queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) # Define response model class ProcessResponse(BaseModel): image: str # Base64 encoded image parsed_content_list: str label_coordinates: str # Background worker to process queue tasks async def worker(): while True: task = await request_queue.get() try: await task except Exception as e: logger.error(f"Error while processing task: {e}") finally: request_queue.task_done() # Start worker on startup @app.on_event("startup") async def startup_event(): logger.info("Starting background worker...") asyncio.create_task(worker()) # Image processing function async def process(image_input: Image.Image, box_threshold: float, iou_threshold: float) -> ProcessResponse: try: # Define save path image_save_path = "imgs/saved_image_demo.png" os.makedirs(os.path.dirname(image_save_path), exist_ok=True) # Save image image_input.save(image_save_path) logger.debug(f"Image saved to: {image_save_path}") # YOLO and caption model inference box_overlay_ratio = image_input.size[0] / 3200 draw_bbox_config = { "text_scale": 0.8 * box_overlay_ratio, "text_thickness": max(int(2 * box_overlay_ratio), 1), "text_padding": max(int(3 * box_overlay_ratio), 1), "thickness": max(int(3 * box_overlay_ratio), 1), } ocr_bbox_rslt, is_goal_filtered = await asyncio.to_thread( check_ocr_box, image_save_path, display_img=False, output_bb_format="xyxy", goal_filtering=None, easyocr_args={"paragraph": False, "text_threshold": 0.9}, use_paddleocr=True, ) text, ocr_bbox = ocr_bbox_rslt dino_labled_img, label_coordinates, parsed_content_list = await asyncio.to_thread( get_som_labeled_img, image_save_path, yolo_model, BOX_TRESHOLD=box_threshold, output_coord_in_ratio=True, ocr_bbox=ocr_bbox, draw_bbox_config=draw_bbox_config, caption_model_processor=caption_model_processor, ocr_text=text, iou_threshold=iou_threshold, ) # Convert labeled image to base64 image = Image.open(io.BytesIO(base64.b64decode(dino_labled_img))) buffered = io.BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") # Join parsed content list parsed_content_list_str = "\n".join([str(item) for item in parsed_content_list]) return ProcessResponse( image=img_str, parsed_content_list=parsed_content_list_str, label_coordinates=str(label_coordinates), ) except Exception as e: logger.error(f"Error in process function: {e}") raise HTTPException(status_code=500, detail=f"Failed to process the image: {e}") # API endpoint for processing images @app.post("/process_image", response_model=ProcessResponse) async def process_image( image_file: UploadFile = File(...), box_threshold: float = 0.05, iou_threshold: float = 0.1, ): try: # Read image file contents = await image_file.read() try: image_input = Image.open(io.BytesIO(contents)).convert("RGB") except UnidentifiedImageError: logger.error("Unsupported image format.") raise HTTPException(status_code=400, detail="Unsupported image format.") # Create processing task task = asyncio.create_task(process(image_input, box_threshold, iou_threshold)) # Add task to queue await request_queue.put(task) logger.info(f"Task added to queue. Current queue size: {request_queue.qsize()}") # Wait for processing to complete response = await task return response except HTTPException as he: raise he except Exception as e: logger.error(f"Error processing image: {e}") raise HTTPException(status_code=500, detail=f"Internal server error: {e}")