import logging
import time
import io
import os
import zipfile
import json
from typing import Optional, Tuple

import timeout_decorator
import cv2
import torch
import numpy as np
from PIL import Image

from registry import get_model
from core.describe_scene import describe_scene
from utils.helpers import generate_session_id, log_runtime

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Model mappings: human-readable display name -> internal model key.
DETECTION_MODEL_MAP = {
    "YOLOv8-Nano": "yolov8n",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "YOLOv11-Beta": "yolov11b",
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "segformer_b0",
    "SegFormer-B5": "segformer_b5",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50",
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512",
}


def _png_bytes(img):
    """Encode a PIL image as PNG and return the encoded bytes."""
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return buf.getvalue()


def _depth_to_uint8(dmap):
    """Min-max scale a depth map to the 0-255 range as a uint8 array.

    A constant map (zero value range) becomes all zeros instead of dividing
    by zero. ``np.ptp`` is used because the ``ndarray.ptp`` method was removed
    in NumPy 2.0.
    """
    arr = np.asarray(dmap)
    span = np.ptp(arr)
    if span == 0:
        return np.zeros(arr.shape, dtype=np.uint8)
    return ((arr - arr.min()) / span * 255).astype(np.uint8)


# NOTE: the per-image timeout below is currently disabled.
#@timeout_decorator.timeout(35, use_signals=False)  # 35 sec limit per image
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
) -> Tuple[Optional[Image.Image], dict, Optional[str]]:
    """
    Runs selected perception tasks on the input image and packages results.

    Each enabled task contributes a PNG to the result archive and an entry to
    the keyword arguments passed to ``describe_scene``. The archive is written
    to ``outputs/uvis_results.zip``, replacing any file already at that path.

    Args:
        image (PIL.Image): Input image.
        run_det (bool): Run object detection.
        det_model (str): Detection model key.
        det_confidence (float): Detection confidence threshold.
        run_seg (bool): Run segmentation.
        seg_model (str): Segmentation model key.
        run_depth (bool): Run depth estimation.
        depth_model (str): Depth model key.
        blend (float): Overlay blend alpha (0.0 - 1.0).

    Returns:
        Tuple[Image, dict, str]: Final composited image, scene JSON (with a
        ``telemetry`` entry added), and the path of the ZIP written to disk.
        If any step raises, the exception is logged and
        ``(None, {"error": <message>}, None)`` is returned instead.
    """
    logger.info("Starting image processing pipeline.")
    start_time = time.time()
    outputs, scene = {}, {}
    combined_np = np.array(image)

    try:
        # NOTE(review): cv2.addWeighted needs both inputs to share shape and
        # channel count; this assumes the overlays and the depth map match the
        # input image - confirm against the model wrappers.

        # Detection
        if run_det:
            logger.info(f"Running detection with model: {det_model}")
            load_start = time.time()
            model = get_model("detection", det_model, device="cpu")
            # NOTE(review): only the detection branch calls load_model();
            # confirm whether the other model types load inside get_model().
            model.load_model()
            logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")
            boxes = model.predict(image, conf_threshold=det_confidence)
            overlay = model.draw(image, boxes)
            # The detection overlay replaces the base layer rather than being blended.
            combined_np = np.array(overlay)
            outputs["detection.png"] = _png_bytes(overlay)
            scene["detection"] = boxes

        # Segmentation
        if run_seg:
            logger.info(f"Running segmentation with model: {seg_model}")
            load_start = time.time()
            model = get_model("segmentation", seg_model, device="cpu")
            logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")
            mask = model.predict(image)
            overlay = model.draw(image, mask, alpha=blend)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)
            outputs["segmentation.png"] = _png_bytes(overlay)
            scene["segmentation"] = mask.tolist()

        # Depth Estimation
        if run_depth:
            logger.info(f"Running depth estimation with model: {depth_model}")
            load_start = time.time()
            model = get_model("depth", depth_model, device="cpu")
            logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")
            dmap = model.predict(image)
            d_pil = Image.fromarray(_depth_to_uint8(dmap))
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
            outputs["depth_map.png"] = _png_bytes(d_pil)
            # The scene keeps the raw depth values, not the normalised ones.
            scene["depth"] = dmap.tolist()

        # Final image overlay
        final_img = Image.fromarray(combined_np)
        outputs["scene_blueprint.png"] = _png_bytes(final_img)

        # Scene description: best effort, a failure here must not discard
        # the images that were already produced.
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}

        telemetry = {
            "session_id": generate_session_id(),
            "runtime_sec": round(log_runtime(start_time), 2),
            "used_models": {
                "detection": det_model if run_det else None,
                "segmentation": seg_model if run_seg else None,
                "depth": depth_model if run_depth else None
            }
        }
        scene_json["telemetry"] = telemetry
        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")

        # ZIP file creation
        zip_buf = io.BytesIO()
        with zipfile.ZipFile(zip_buf, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)

        elapsed = log_runtime(start_time)
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")

        # Save ZIP to disk for Gradio file output.
        # NOTE(review): the path is fixed, so each call overwrites the
        # previous archive.
        zip_path = "outputs/uvis_results.zip"
        os.makedirs(os.path.dirname(zip_path), exist_ok=True)
        with open(zip_path, "wb") as f:
            f.write(zip_buf.getvalue())

        return final_img, scene_json, zip_path

    except Exception as e:
        # logger.exception logs at ERROR level and keeps the traceback.
        logger.exception(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None