import io
import json
import logging
import os
import re
import time
import traceback
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch
from PIL import Image
from pydantic import BaseModel, Field

# Import shared logging helpers; fall back to no-op stubs so the module
# still imports when logging_utils is unavailable (e.g., standalone runs).
try:
    from .logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
except ImportError:
    try:
        from logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
    except ImportError:
        def get_system_info():
            return {}

        def cleanup_memory():
            pass

        LOG_LEVEL_MAP = {}
        EMOJI_MAP = {}

# Custom exceptions

class ModelNotLoadedException(Exception):
    """Raised when inference is requested before models have been loaded."""
    pass


class PipelineExecutionError(Exception):
    """Raised when a pipeline step fails, optionally recording the step name."""

    def __init__(self, message: str, step: Optional[str] = None):
        self.message = message
        self.step = step
        super().__init__(self.message)


class ConfigurationError(Exception):
    """Raised for invalid or missing configuration."""
    pass


class ModelInferenceError(Exception):
    """Raised when a model call fails, including after retries."""
    pass


class ImageProcessingError(Exception):
    """Raised when an image cannot be decoded or transformed."""
    pass

# Request/response schemas

class DetectRequest(BaseModel):
    data: List[Any]
    options: Optional[Dict[str, Any]] = Field(default_factory=dict)


class ProcessingError(BaseModel):
    type: str
    message: str
    step: str
    traceback: Optional[str] = None


class ProcessingWarning(BaseModel):
    type: str
    message: str
    step: str


class ProcessedImage(BaseModel):
    url: str
    status: str
    base64_image: Optional[str] = None
    color: Optional[str] = None
    image_type: Optional[str] = None
    artifacts: Optional[str] = None
    processing_time: Optional[float] = None
    detections: Optional[Dict[str, int]] = None


class ProcessingResponse(BaseModel):
    processed_images: List[ProcessedImage]
    status: Optional[str] = None
    warnings: Optional[List[ProcessingWarning]] = None
    total_processing_time: Optional[float] = None
    system_info: Optional[Dict[str, Any]] = None

# Geometry and detection models

class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int

    def to_list(self) -> List[int]:
        return [self.x1, self.y1, self.x2, self.y2]

    def area(self) -> int:
        return (self.x2 - self.x1) * (self.y2 - self.y1)

    def center(self) -> Tuple[float, float]:
        return ((self.x1 + self.x2) / 2, (self.y1 + self.y2) / 2)


class Detection(BaseModel):
    box: BoundingBox
    label: str
    score: float
    model: str
    keyword: Optional[str] = None

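# Illustrative usage (hypothetical values, e.g. from a detector's raw output):
#
#     det = Detection(
#         box=BoundingBox(x1=10, y1=20, x2=110, y2=220),
#         label="logo",
#         score=0.87,
#         model="grounding-dino",
#     )
#     det.box.area()    # -> 20000
#     det.box.center()  # -> (60.0, 120.0)
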
class ProcessingContext:
    """Mutable per-image state passed between pipeline steps."""

    def __init__(self, url: str, product_type: str, keywords: List[str]):
        self.url = url
        self.product_type = product_type
        self.keywords = keywords
        self.skip_run = False
        self.skip_processing = False
        self.filename: Optional[str] = None
        self.final_base64: Optional[str] = None

        # Intermediate results produced by individual steps
        self.pil_img: Dict[str, Any] = {}
        self.define_result: Dict[str, Any] = {}
        self.detection_result: Dict[str, Any] = {}
        self.grounding_dino_result: Dict[str, Any] = {}
        self.box_colors: List[Tuple[int, int, int, int]] = []
        self.adjusted_blue_box: Optional[Tuple[int, int, int, int]] = None

        # Final classification outputs
        self.final_color: str = "none"
        self.final_image_type: str = "none"

        # Padding applied during preprocessing
        self.pad_info = {"left": 0, "right": 0, "top": 0, "bottom": 0}

        # Per-step diagnostics
        self.timing: Dict[str, float] = {}
        self.memory_usage: Dict[str, Dict[str, float]] = {}

    def add_timing(self, step: str, duration: float):
        self.timing[step] = duration

    def add_memory_usage(self, step: str):
        self.memory_usage[step] = get_system_info()

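# Typical lifecycle (sketch, with hypothetical values): one context is created
# per input URL, mutated by each step, and passed over once a skip flag is set:
#
#     ctx = ProcessingContext("https://example.com/img.jpg", "shirt", ["logo"])
#     ctx.add_timing("resize", 0.12)
#     if ctx.skip_run or ctx.skip_processing:
#         ...  # a step would skip this context
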
def create_pipeline_step(ensure_models_loaded_func: Callable) -> Callable:
    """Build a pipeline_step decorator that wraps batch steps with model
    loading, timing, structured JSON logging, and error handling."""

    def pipeline_step(func: Callable) -> Callable:
        def wrapper(contexts: List[ProcessingContext], batch_logs: Optional[List[Dict]] = None) -> Any:
            if batch_logs is None:
                batch_logs = []

            ensure_models_loaded_func()
            start_time = time.time()

            try:
                # Free memory up front for large batches
                if len(contexts) > 10:
                    cleanup_memory()

                result = func(contexts, batch_logs)
                processing_time = round(time.time() - start_time, 3)
                processed_count = sum(
                    not context.skip_run and not context.skip_processing
                    for context in contexts
                )

                log_data = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "processed_image_count": processed_count,
                    "batch_log": batch_logs,
                    "system_info": get_system_info(),
                }
                print(custom_dumps(log_data), flush=True)

                if processed_count > 10:
                    cleanup_memory()

                return result

            except Exception as e:
                error_trace = traceback.format_exc()
                processing_time = round(time.time() - start_time, 3)

                logging.error(f"Error in {func.__name__}: {str(e)}")

                error_log = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "traceback": error_trace,
                    "system_info": get_system_info(),
                }
                print(custom_dumps(error_log), flush=True)

                # Mark every context so downstream steps skip this batch
                for context in contexts:
                    context.skip_run = True

                batch_logs.append({
                    "function": func.__name__,
                    "status": "error",
                    "error": str(e),
                    "error_type": type(e).__name__,
                })

                # Avoid touching CUDA when initialization itself was the failure
                if "CUDA must not be initialized" not in str(e):
                    try:
                        cleanup_memory()
                    except Exception:
                        pass

                raise

        return wrapper

    return pipeline_step

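# Illustrative wiring (hypothetical step and loader names):
#
#     pipeline_step = create_pipeline_step(ensure_models_loaded)
#
#     @pipeline_step
#     def detect_objects(contexts, batch_logs):
#         for ctx in contexts:
#             if ctx.skip_run or ctx.skip_processing:
#                 continue
#             ...  # run detection for ctx
#
#     detect_objects(contexts)  # emits timing/system info as JSON on stdout
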
def validate_image(image: Union[Image.Image, np.ndarray]) -> bool:
    """Return True if the image has non-zero width and height."""
    if isinstance(image, Image.Image):
        return image.size[0] > 0 and image.size[1] > 0
    elif isinstance(image, np.ndarray):
        return image.shape[0] > 0 and image.shape[1] > 0
    return False


def resize_image_aspect_ratio(image: Image.Image, max_size: int = 1920) -> Image.Image:
    """Downscale so the longer side is at most max_size, preserving aspect ratio."""
    width, height = image.size
    if width > max_size or height > max_size:
        if width > height:
            new_width = max_size
            new_height = int(height * (max_size / width))
        else:
            new_height = max_size
            new_width = int(width * (max_size / height))
        return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
    return image

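# For example, a 4032x3024 photo is scaled by 1920/4032 to 1920x1440, while an
# image already within 1920px on both sides is returned unchanged.
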
def custom_dumps(data: Any) -> str:
    """JSON-serialize pipeline objects, then compact four-number lists
    (bounding boxes) onto single lines for readable logs."""

    def default_handler(obj):
        if isinstance(obj, (BoundingBox, Detection)):
            return obj.model_dump()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif torch.is_tensor(obj):
            return obj.cpu().numpy().tolist()
        elif isinstance(obj, (np.integer, np.floating)):
            return obj.item()
        elif hasattr(obj, '__dict__'):
            return obj.__dict__
        else:
            return str(obj)

    text = json.dumps(data, indent=2, default=default_handler)

    # json.dumps with indent puts every list element on its own line; collapse
    # four-element numeric lists (x1, y1, x2, y2 boxes) back onto one line.
    box_pattern = re.compile(
        r'\[\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?)(?:,\s*)?\s*\]',
        re.MULTILINE
    )
    text = box_pattern.sub(r'[\1, \2, \3, \4]', text)

    return text

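# For instance, a box that indented json.dumps would spread across six lines
# comes back as a single line:
#
#     custom_dumps({"box": [10, 20, 110, 220]})
#     # -> '{\n  "box": [10, 20, 110, 220]\n}'
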
def safe_model_inference(model_func: Callable, *args, **kwargs) -> Any:
    """Call model_func with retries, cleaning GPU memory after OOM errors."""
    max_retries = kwargs.pop('max_retries', 3)
    retry_delay = kwargs.pop('retry_delay', 1.0)

    for attempt in range(max_retries):
        try:
            return model_func(*args, **kwargs)
        except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
            # Retry only genuine OOM failures; other runtime errors are fatal.
            if "out of memory" in str(e).lower():
                logging.warning(f"GPU OOM on attempt {attempt + 1}, cleaning memory...")
                cleanup_memory()
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    continue
            raise ModelInferenceError(f"Model inference failed: {str(e)}") from e
        except Exception as e:
            if attempt < max_retries - 1:
                logging.warning(f"Model inference attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)
                continue
            raise ModelInferenceError(f"Model inference failed after {max_retries} attempts: {str(e)}") from e

    # Unreachable in practice; keeps the function's failure contract explicit.
    raise ModelInferenceError("Model inference failed: max retries exceeded")
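

# Sketch of intended use (model_fn and inputs are placeholder names):
#
#     outputs = safe_model_inference(
#         model_fn, inputs,
#         max_retries=3,    # popped from kwargs before the model call
#         retry_delay=1.0,  # seconds between attempts
#     )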