# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import functools
import json
import logging
import re
import time
import traceback
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch
from PIL import Image
from pydantic import BaseModel, Field

# Import from logging_utils
try:
    from .logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
except ImportError:
    # Try alternative import paths
    try:
        from logging_utils import get_system_info, cleanup_memory, LOG_LEVEL_MAP, EMOJI_MAP
    except ImportError:
        # Fallback no-op implementations so the module stays importable
        def get_system_info():
            return {}
        
        def cleanup_memory():
            pass
        
        LOG_LEVEL_MAP = {}
        EMOJI_MAP = {}

# ----------------------------------------------------------------------
# CUSTOM EXCEPTIONS
# ----------------------------------------------------------------------
class ModelNotLoadedException(Exception):
    """Raised when inference is attempted before models are loaded."""

class PipelineExecutionError(Exception):
    """Raised when a pipeline step fails; optionally records the step name."""
    def __init__(self, message: str, step: Optional[str] = None):
        self.message = message
        self.step = step
        super().__init__(self.message)

class ConfigurationError(Exception):
    """Raised for invalid or missing configuration."""

class ModelInferenceError(Exception):
    """Raised when model inference fails, including after retries."""

class ImageProcessingError(Exception):
    """Raised when an image cannot be decoded or transformed."""

# ----------------------------------------------------------------------
# REQUEST/RESPONSE MODELS
# ----------------------------------------------------------------------
class DetectRequest(BaseModel):
    data: List[Any]
    options: Optional[Dict[str, Any]] = Field(default_factory=dict)

class ProcessingError(BaseModel):
    type: str
    message: str
    step: str
    traceback: Optional[str] = None

class ProcessingWarning(BaseModel):
    type: str
    message: str
    step: str

class ProcessedImage(BaseModel):
    url: str
    status: str
    base64_image: Optional[str] = None
    color: Optional[str] = None
    image_type: Optional[str] = None
    artifacts: Optional[str] = None
    processing_time: Optional[float] = None
    detections: Optional[Dict[str, int]] = None

class ProcessingResponse(BaseModel):
    processed_images: List[ProcessedImage]
    status: Optional[str] = None
    warnings: Optional[List[ProcessingWarning]] = None
    total_processing_time: Optional[float] = None
    system_info: Optional[Dict[str, Any]] = None
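
# Example response payload (illustrative values only; ``base64_image`` is
# omitted here for brevity):
#
#     ProcessingResponse(
#         processed_images=[
#             ProcessedImage(url="https://example.com/shoe.jpg",
#                            status="success",
#                            color="blue",
#                            processing_time=1.23),
#         ],
#         status="completed",
#     )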

# ----------------------------------------------------------------------
# DETECTION RESULT MODELS
# ----------------------------------------------------------------------
class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int
    
    def to_list(self) -> List[int]:
        return [self.x1, self.y1, self.x2, self.y2]
    
    def area(self) -> int:
        return (self.x2 - self.x1) * (self.y2 - self.y1)
    
    def center(self) -> Tuple[float, float]:
        return ((self.x1 + self.x2) / 2, (self.y1 + self.y2) / 2)

class Detection(BaseModel):
    box: BoundingBox
    label: str
    score: float
    model: str
    keyword: Optional[str] = None
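
# Example (illustrative values): a Detection pairs a box with its label,
# confidence score, and the model that produced it.
#
#     det = Detection(
#         box=BoundingBox(x1=10, y1=20, x2=110, y2=220),
#         label="logo",
#         score=0.91,
#         model="grounding-dino",
#     )
#     det.box.area()    # -> 20000
#     det.box.center()  # -> (60.0, 120.0)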

# ----------------------------------------------------------------------
# PROCESSING CONTEXT CLASS
# ----------------------------------------------------------------------
class ProcessingContext:
    """Mutable per-image state carried through the pipeline steps."""

    def __init__(self, url: str, product_type: str, keywords: List[str]):
        self.url = url
        self.product_type = product_type
        self.keywords = keywords
        self.skip_run = False
        self.skip_processing = False
        self.filename: Optional[str] = None
        self.final_base64: Optional[str] = None

        self.pil_img: Dict[str, Any] = {}
        self.define_result: Dict[str, Any] = {}
        self.detection_result: Dict[str, Any] = {}
        self.grounding_dino_result: Dict[str, Any] = {}
        self.box_colors: List[Tuple[int, int, int, int]] = []
        self.adjusted_blue_box: Optional[Tuple[int, int, int, int]] = None

        self.final_color: str = "none"
        self.final_image_type: str = "none"

        self.pad_info = {"left": 0, "right": 0, "top": 0, "bottom": 0}
        
        # Performance tracking
        self.timing: Dict[str, float] = {}
        self.memory_usage: Dict[str, Dict[str, float]] = {}
    
    def add_timing(self, step: str, duration: float):
        self.timing[step] = duration
    
    def add_memory_usage(self, step: str):
        self.memory_usage[step] = get_system_info()
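
# Example: one context per input image, filled in as it moves through the
# pipeline (illustrative values):
#
#     ctx = ProcessingContext(
#         url="https://example.com/shoe.jpg",
#         product_type="footwear",
#         keywords=["shoe", "logo"],
#     )
#     ctx.add_timing("download", 0.42)
#     ctx.add_memory_usage("download")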

# ----------------------------------------------------------------------
# PIPELINE DECORATOR
# ----------------------------------------------------------------------
def create_pipeline_step(ensure_models_loaded_func: Callable) -> Callable:
    """Build a decorator factory: the returned decorator wraps a pipeline step
    with lazy model loading, timing, structured JSON logging, and memory
    cleanup for large batches."""
    def pipeline_step(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(contexts: List[ProcessingContext], batch_logs: Optional[List[Dict]] = None) -> Any:
            if batch_logs is None:
                batch_logs = []
                
            # Only load models if not already loaded
            # The ensure_models_loaded_func should internally check if models are loaded
            ensure_models_loaded_func()
            start_time = time.time()
            
            try:
                # Memory cleanup before processing
                if len(contexts) > 10:  # For large batches
                    cleanup_memory()
                
                result = func(contexts, batch_logs)
                processing_time = round(time.time() - start_time, 3)
                processed_count = sum(
                    not context.skip_run and not context.skip_processing 
                    for context in contexts
                )
                
                log_data = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "processed_image_count": processed_count,
                    "batch_log": batch_logs,
                    "system_info": get_system_info()
                }
                
                log_content = custom_dumps(log_data)
                print(log_content, flush=True)
                
                # Memory cleanup after processing large batches
                if processed_count > 10:
                    cleanup_memory()
                
                return result
                
            except Exception as e:
                error_trace = traceback.format_exc()
                processing_time = round(time.time() - start_time, 3)
                
                logging.error(f"Error in {func.__name__}: {str(e)}")
                
                error_log = {
                    "function_name": func.__name__,
                    "spent_time_seconds": processing_time,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "traceback": error_trace,
                    "system_info": get_system_info()
                }
                
                log_content = custom_dumps(error_log)
                print(log_content, flush=True)
                
                for context in contexts:
                    context.skip_run = True
                    
                batch_logs.append({
                    "function": func.__name__,
                    "status": "error",
                    "error": str(e),
                    "error_type": type(e).__name__
                })
                
                # Cleanup on error - but skip if CUDA initialization error
                if "CUDA must not be initialized" not in str(e):
                    try:
                        cleanup_memory()
                    except Exception:
                        pass  # Ignore cleanup errors
                
                raise
        return wrapper
    return pipeline_step
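
# Usage sketch (hypothetical names; ``load_models`` and ``classify_colors``
# are not part of this module):
#
#     pipeline_step = create_pipeline_step(load_models)
#
#     @pipeline_step
#     def classify_colors(contexts, batch_logs):
#         for ctx in contexts:
#             if not (ctx.skip_run or ctx.skip_processing):
#                 ctx.final_color = "blue"  # placeholder for real logic
#         return contexts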

# ----------------------------------------------------------------------
# IMAGE UTILITIES
# ----------------------------------------------------------------------
def validate_image(image: Union[Image.Image, np.ndarray]) -> bool:
    """Return True if the image has positive width and height."""
    if isinstance(image, Image.Image):
        return image.size[0] > 0 and image.size[1] > 0
    elif isinstance(image, np.ndarray):
        return image.shape[0] > 0 and image.shape[1] > 0
    return False

def resize_image_aspect_ratio(image: Image.Image, max_size: int = 1920) -> Image.Image:
    """Downscale so the longer side is at most ``max_size``, preserving the
    aspect ratio; images already within bounds are returned unchanged."""
    width, height = image.size
    if width > max_size or height > max_size:
        if width > height:
            new_width = max_size
            new_height = int(height * (max_size / width))
        else:
            new_height = max_size
            new_width = int(width * (max_size / height))
        return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
    return image
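
# Example: a 4000x3000 image scales to 1920x1440 (3000 * 1920/4000 = 1440);
# an 800x600 image is returned unchanged.
#
#     img = Image.new("RGB", (4000, 3000))
#     resize_image_aspect_ratio(img).size  # -> (1920, 1440)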

# ----------------------------------------------------------------------
# JSON FORMATTING UTILITIES
# ----------------------------------------------------------------------
def custom_dumps(data: Any) -> str:
    """Serialize ``data`` to indented JSON, converting pydantic models, numpy
    arrays/scalars, and torch tensors; four-number lists (bounding boxes) are
    then collapsed onto a single line for readability."""
    def default_handler(obj):
        if isinstance(obj, (BoundingBox, Detection)):
            return obj.model_dump()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif torch.is_tensor(obj):
            # detach() so tensors that require grad can still be converted
            return obj.detach().cpu().numpy().tolist()
        elif isinstance(obj, (np.integer, np.floating)):
            return obj.item()
        elif hasattr(obj, '__dict__'):
            return obj.__dict__
        else:
            return str(obj)
    
    text = json.dumps(data, indent=2, default=default_handler)
    
    # Format bounding boxes on single lines
    box_pattern = re.compile(
        r'\[\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?),\s*(-?\d+(?:\.\d+)?)(?:,\s*)?\s*\]',
        re.MULTILINE
    )
    text = box_pattern.sub(r'[\1, \2, \3, \4]', text)
    
    return text
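
# Example: numpy scalars become plain numbers and four-number lists are
# collapsed onto one line:
#
#     custom_dumps({"box": [10, 20, 110, 220], "score": np.float32(0.9)})
#     # "box" renders as: "box": [10, 20, 110, 220]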

# ----------------------------------------------------------------------
# ERROR RECOVERY UTILITIES
# ----------------------------------------------------------------------
def safe_model_inference(model_func: Callable, *args, **kwargs) -> Any:
    """Call ``model_func(*args, **kwargs)`` with retries; GPU out-of-memory
    errors trigger a memory cleanup before the next attempt. ``max_retries``
    and ``retry_delay`` are consumed here and not forwarded to the model."""
    max_retries = kwargs.pop('max_retries', 3)
    retry_delay = kwargs.pop('retry_delay', 1.0)
    
    for attempt in range(max_retries):
        try:
            return model_func(*args, **kwargs)
        except (torch.cuda.OutOfMemoryError, RuntimeError) as e:
            if "out of memory" in str(e).lower():
                logging.warning(f"GPU OOM on attempt {attempt + 1}, cleaning memory...")
                cleanup_memory()
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    continue
            raise ModelInferenceError(f"Model inference failed: {str(e)}") from e
        except Exception as e:
            if attempt < max_retries - 1:
                logging.warning(f"Model inference attempt {attempt + 1} failed: {str(e)}")
                time.sleep(retry_delay)
                continue
            raise ModelInferenceError(f"Model inference failed after {max_retries} attempts: {str(e)}") from e
    
    raise ModelInferenceError("Model inference failed: max retries exceeded")