# ---------------------------------------------------------------------- # IMPORTS # ---------------------------------------------------------------------- import numpy as np import logging import time from PIL import Image, ImageDraw from typing import Dict, List, Optional, Tuple, Any from scipy.ndimage import ( binary_fill_holes, binary_closing, label, find_objects ) from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP # ---------------------------------------------------------------------- # CONSTANTS # ---------------------------------------------------------------------- UNIVERSAL_PAD_RATIO = 0.075 COVERAGE_THRESHOLD = 0.25 FEATHER_THRESHOLD_MIN = 0.3 FEATHER_THRESHOLD_MAX = 0.7 ENABLE_CROPPING_PADDING = True PRODUCT_TYPE_LIST = ["jacket", "shirt", "vest", "jeans", "shorts", "skirt", "overall", "dress"] HEAD_LIST = ["head"] SHOES_LIST = ["shoes"] CLOTHING_FEATURES_LIST = ["neckline", "collar", "sleeve", "closure", "pocket"] # ---------------------------------------------------------------------- # HELPER FUNCTIONS # ---------------------------------------------------------------------- def calculate_transparency(image, coverage_threshold=0.9999): alpha = image.getchannel("A") px = alpha.load() w, h = image.size total_pixels = w * h non_transparent = 0 for y in range(h): for x in range(w): if px[x, y] >= 1: non_transparent += 1 ratio = non_transparent / float(total_pixels) if total_pixels else 0 return ratio def parse_line_flag(val) -> bool: if isinstance(val, bool): return val if isinstance(val, str): return val.strip().lower().startswith("true") return False def partial_pad_square( img: Image.Image, pad_left: int, pad_right: int, pad_top: int, pad_bottom: int ) -> Tuple[Image.Image, Dict[str,int]]: w,h= img.size new_w= w+ pad_left + pad_right new_h= h+ pad_top + pad_bottom side= max(new_w, new_h) out= Image.new("RGBA",(side, side),(0,0,0,0)) offx= (side- new_w)//2 + pad_left offy= (side- new_h)//2 + pad_top out.paste(img,(offx,offy)) changes= { "left": pad_left, "right": pad_right, "top": pad_top, "bottom": pad_bottom } return (out, changes) def two_step_pad_to_square( img: Image.Image, orientation: str, border_pad: int )->Tuple[Image.Image, Dict[str,int]]: changes= {"left":0,"right":0,"top":0,"bottom":0} w2,h2= img.size working= img if orientation=="Landscape" and w2>h2: diff= w2- h2 tpad= diff//2 bpad= diff- tpad wtmp,c1= partial_pad_square(working,0,0,tpad,bpad) working= wtmp for k_ in c1: changes[k_]+= c1[k_] elif orientation=="Portrait" and h2> w2: diff= h2- w2 lpad= diff//2 rpad= diff- lpad wtmp,c2= partial_pad_square(working,lpad,rpad,0,0) working= wtmp for k_ in c2: changes[k_]+= c2[k_] wtmp2,c3= partial_pad_square(working, border_pad,border_pad,border_pad,border_pad) for k_ in c3: changes[k_]+= c3[k_] return (wtmp2, changes) def pad_left_right_only( img: Image.Image, pad_val: int )->Tuple[Image.Image, Dict[str,int]]: changes= {"left":0,"right":0,"top":0,"bottom":0} w3,h3= img.size new_w= w3+ pad_val*2 new_h= h3 side3= max(new_w, new_h) out3= Image.new("RGBA",(side3,side3),(0,0,0,0)) offx3= (side3- new_w)//2 + pad_val offy3= (side3- new_h)//2 out3.paste(img,(offx3,offy3)) changes["left"] += pad_val changes["right"]+= pad_val return (out3, changes) def _center_min_square( img: Image.Image, orientation: str )->Tuple[Image.Image, Dict[str,int]]: w,h= img.size side_= min(w,h) l_= (w-side_)//2 t_= (h-side_)//2 r_= l_+ side_ b_= t_+ side_ crp= img.crop((l_,t_,r_,b_)) chg={ "left": -l_, "top": -t_, "right": -(w- r_), "bottom": -(h- b_) } return (crp, chg) def coverage_crop_with_shorter_dimension( img: Image.Image, ctx: "ProcessingContext", orientation: str, force_side_to_min: bool )->Tuple[Image.Image, Dict[str,int]]: w,h= img.size rbc_min= min(w,h) def fallback_center_crop(): return _center_min_square(img, orientation) dr= ctx.detection_result if not dr or dr.get("status")!="ok": return fallback_center_crop() bxs= dr.get("boxes",[]) kws= dr.get("final_keywords",[]) if not bxs or not kws or len(bxs)!= len(kws): return fallback_center_crop() cf_ = [bx for (bx,kw) in zip(bxs,kws) if kw in CLOTHING_FEATURES_LIST] if not cf_: fallback_box= ctx.define_result.get("largest_box") if fallback_box and isinstance(fallback_box,list) and len(fallback_box)==4: cf_=[fallback_box] else: return fallback_center_crop() x1= max(0, min(b[0] for b in cf_)) y1= max(0, min(b[1] for b in cf_)) x2= min(w, max(b[2] for b in cf_)) y2= min(h, max(b[3] for b in cf_)) bw= x2- x1 bh= y2- y1 if bw<=0 or bh<=0: return fallback_center_crop() side0= min(bw,bh) if side0< rbc_min: side0= rbc_min cx= (x1+ x2)//2 cy= (y1+ y2)//2 half= side0//2 left_= cx- half top_= cy- half right_= left_ + side0 bot_= top_ + side0 if left_<0: left_=0 right_= side0 if top_<0: top_=0 bot_= side0 if right_> w: right_= w left_= w- side0 if bot_> h: bot_= h top_= h- side0 cropped= img.crop((left_,top_,right_,bot_)) changes={ "left": -left_, "top": -top_, "right": -(w- right_), "bottom": -(h- bot_) } return (cropped, changes) # ---------------------------------------------------------------------- # ACTION DICTIONARIES # ---------------------------------------------------------------------- ACTION_LIB_SQUARE = { "square_shoes_exception": "SQUARE_SHOES => pad bottom ignoring lower_line, no top", "square_head_exception": "SQUARE_HEAD => lines => no changes", "square_has_lines": "SQUARE_HAS_LINES => lines => no changes", "square_all_false": "SQUARE_ALL_FALSE => no lines => 2-step pad => square", } ACTION_LIB_LANDSCAPE = { "landscape_shoes_exception":"LANDSCAPE_SHOES => pad bottom, coverage-crop => square", "landscape_coverage": "LANDSCAPE_COVERAGE => any line => coverage-crop => square", "landscape_all_false": "LANDSCAPE_ALL_FALSE => no lines => 2-step pad => square", "landscape_head_exception": "LANDSCAPE_HEAD => remove top pad, keep shape square", } ACTION_LIB_PORTRAIT = { "portrait_shoes_exception": "PORTRAIT_SHOES => never pad top, pad bottom ignoring lower_line", "portrait_lr_any": "PORTRAIT_LR_COVERAGE => (left_line/right_line) => coverage-crop => square", "portrait_up_low_both": "PORTRAIT_BOTH_UP_LOW => pad left/right only => no top/bottom => square", "portrait_any_up_low": "PORTRAIT_ANY_UP_OR_LOW => exactly 1 => 2-step pad => square", "portrait_all_false": "PORTRAIT_ALL_FALSE => no lines => 2-step pad => square", "portrait_head_exception": "PORTRAIT_HEAD => pad left/right only => no top/bottom => square" } # ---------------------------------------------------------------------- # PIPELINE FUNCTIONS # ---------------------------------------------------------------------- def cropp_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): function_name = "cropp_batch" start_time = time.perf_counter() logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") processed_count = 0 skipped_count = 0 error_count = 0 morph_structure = np.ones((5, 5), dtype=np.uint8) for ctx in contexts: it_ = {"image_url": ctx.url, "function": function_name} if ctx.skip_run or ctx.skip_processing: it_["status"] = "skipped" batch_logs.append(it_) skipped_count += 1 continue dr = ctx.detection_result if dr.get("status") != "ok": it_["status"] = "no_detection" batch_logs.append(it_) error_count += 1 continue if "original" not in ctx.pil_img: it_["status"] = "no_original" batch_logs.append(it_) error_count += 1 continue try: pi_rgba, orig_filename, _ = ctx.pil_img["original"] if ctx.adjusted_blue_box: abx1, aby1, abx2, aby2 = ctx.adjusted_blue_box W_full = pi_rgba.width H_full = pi_rgba.height x1 = 0 x2 = W_full y1 = max(0, min(aby1, H_full)) y2 = max(0, min(aby2, H_full)) if y2 <= y1: it_["status"] = "invalid_crop_range" batch_logs.append(it_) error_count += 1 continue cropped = pi_rgba.crop((x1, y1, x2, y2)) else: cropped = pi_rgba cropped_np = np.array(cropped) if cropped_np.shape[2] < 4: cropped = cropped.convert("RGBA") cropped_np = np.array(cropped) alpha = cropped_np[:, :, 3] bin_mask = (alpha > 0).astype(np.uint8) bin_mask = binary_fill_holes(bin_mask).astype(np.uint8) bin_mask = binary_closing(bin_mask, structure=morph_structure, iterations=1).astype(np.uint8) labeled, num_components = label(bin_mask) if num_components > 1: largest_area = 0 largest_label = None for i in range(1, num_components + 1): area = (labeled == i).sum() if area > largest_area: largest_area = area largest_label = i bin_mask = (labeled == largest_label).astype(np.uint8) alpha_clean = alpha.copy() alpha_clean[bin_mask == 0] = 0 cropped_np[:, :, 3] = alpha_clean non_zero_rows = np.where(np.any(bin_mask != 0, axis=1))[0] non_zero_cols = np.where(np.any(bin_mask != 0, axis=0))[0] if len(non_zero_rows) > 0 and len(non_zero_cols) > 0: row_min, row_max = non_zero_rows[0], non_zero_rows[-1] col_min, col_max = non_zero_cols[0], non_zero_cols[-1] cropped_np = cropped_np[row_min:row_max + 1, col_min:col_max + 1, :] final_img = Image.fromarray(cropped_np, mode="RGBA") ctx.pil_img["original"] = [final_img, orig_filename, None] it_["status"] = "ok" processed_count += 1 except Exception as e: it_["status"] = "error" it_["exception"] = str(e) error_count += 1 batch_logs.append(it_) processing_time = time.perf_counter() - start_time logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") return batch_logs def shrink_primary_box_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): function_name = "shrink_primary_box_batch" start_time = time.perf_counter() logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") processed_count = 0 skipped_count = 0 error_count = 0 WHITE_CUTOFF = 240 for ctx in contexts: step_log = { "function": function_name, "image_url": ctx.url, "status": None, "data": { "primary_box": None, "primary_box_dimensions": None, "primary_box_orientation": None, "primary_box_transparency": None, "primary_box_border_lines_transparency": {}, "primary_shrinked_box_dimensions": None, "primary_shrinked_box_transparency": None, "primary_shrinked_box_border_lines_transparency": {}, "shrink_top": None, "shrink_bottom": None, "shrink_left": None, "shrink_right": None, "notes": "" } } if ctx.skip_run or ctx.skip_processing: step_log["status"] = "skipped" batch_logs.append(step_log) skipped_count += 1 continue if "original" not in ctx.pil_img: step_log["status"] = "error" step_log["data"]["notes"] = "No original image found in context." batch_logs.append(step_log) ctx.skip_run = True error_count += 1 continue try: pil_img_obj = ctx.pil_img["original"][0] width, height = pil_img_obj.size alpha = pil_img_obj.getchannel("A") top, bottom = 0, height - 1 left, right = 0, width - 1 while top < height: row_data = alpha.crop((0, top, width, top + 1)).tobytes() if all(v == 0 for v in row_data): top += 1 else: break while bottom >= 0: row_data = alpha.crop((0, bottom, width, bottom + 1)).tobytes() if all(v == 0 for v in row_data): bottom -= 1 else: break while left < width: col_data = alpha.crop((left, 0, left + 1, height)).tobytes() if all(v == 0 for v in col_data): left += 1 else: break while right >= 0: col_data = alpha.crop((right, 0, right + 1, height)).tobytes() if all(v == 0 for v in col_data): right -= 1 else: break pil_rgb = pil_img_obj.convert("RGB") px = pil_rgb.load() def is_white_row(row_idx: int) -> bool: for x in range(left, right + 1): r, g, b = px[x, row_idx] if not (r >= WHITE_CUTOFF and g >= WHITE_CUTOFF and b >= WHITE_CUTOFF): return False return True def is_white_col(col_idx: int) -> bool: for y in range(top, bottom + 1): r, g, b = px[col_idx, y] if not (r >= WHITE_CUTOFF and g >= WHITE_CUTOFF and b >= WHITE_CUTOFF): return False return True while top <= bottom: if is_white_row(top): top += 1 else: break while bottom >= top: if is_white_row(bottom): bottom -= 1 else: break while left <= right: if is_white_col(left): left += 1 else: break while right >= left: if is_white_col(right): right -= 1 else: break if left > right or top > bottom: step_log["data"]["notes"] += " Entire image trimmed away by alpha/white => skipping" step_log["status"] = "error" batch_logs.append(step_log) ctx.skip_run = True error_count += 1 continue shrink_top = top shrink_bottom = (height - 1) - bottom shrink_left = left shrink_right = (width - 1) - right step_log["data"]["shrink_top"] = shrink_top step_log["data"]["shrink_bottom"] = shrink_bottom step_log["data"]["shrink_left"] = shrink_left step_log["data"]["shrink_right"] = shrink_right primary_box = [left, top, right, bottom] w = right - left + 1 h = bottom - top + 1 step_log["data"]["primary_box"] = primary_box step_log["data"]["primary_box_dimensions"] = [w, h] orientation = "Square" if h > w: orientation = "Portrait" elif w > h: orientation = "Landscape" step_log["data"]["primary_box_orientation"] = orientation cropped_img = pil_img_obj.crop((left, top, right + 1, bottom + 1)) box_transparency = calculate_transparency(cropped_img) step_log["data"]["primary_box_transparency"] = box_transparency ctx.pil_img["original"] = [cropped_img, ctx.pil_img["original"][1], None] cw, ch = cropped_img.size step_log["data"]["primary_shrinked_box_dimensions"] = [cw, ch] step_log["data"]["primary_shrinked_box_transparency"] = box_transparency step_log["status"] = "ok" step_log["data"]["notes"] += " alpha+white trim done." ctx.define_result["primary_box_transparency"] = box_transparency processed_count += 1 except Exception as e: step_log["status"] = "error" step_log["data"]["notes"] = f"Exception: {e}" error_count += 1 batch_logs.append(step_log) processing_time = time.perf_counter() - start_time logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") return batch_logs def detect_border_stright_line_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): function_name = "detect_border_stright_line_batch" start_time = time.perf_counter() logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") processed_count = 0 skipped_count = 0 error_count = 0 local_patch_size = 7 std_threshold = 5 for ctx in contexts: step_log = { "function": function_name, "image_url": ctx.url, "status": None, "data": { "left_line": False, "right_line": False, "upper_line": False, "lower_line": False, "left_line_coverage": 0.0, "right_line_coverage": 0.0, "upper_line_coverage": 0.0, "lower_line_coverage": 0.0, "left_feather_ratio": 0.0, "right_feather_ratio": 0.0, "upper_feather_ratio": 0.0, "lower_feather_ratio": 0.0, "performed_action": "single_px_border_feather_ratio_inverted_logic", "current_feather_threshold": (0.0, 0.0), "current_coverage_threshold": 0.0 } } if ctx.skip_run or ctx.skip_processing: step_log["status"] = "skipped" batch_logs.append(step_log) skipped_count += 1 continue if "original" not in ctx.pil_img: step_log["status"] = "error" step_log["error"] = "No padded image found in context." ctx.skip_run = True batch_logs.append(step_log) error_count += 1 continue try: pil_img_obj = ctx.pil_img["original"][0] w, h = pil_img_obj.size if w == 0 or h == 0: step_log["status"] = "error" step_log["error"] = f"Invalid dims (w={w}, h={h})" ctx.skip_run = True batch_logs.append(step_log) error_count += 1 continue pil_rgba = pil_img_obj.convert("RGBA") top_cov, bot_cov, left_cov, right_cov = 0.0, 0.0, 0.0, 0.0 if h > 0: strip_top = pil_rgba.crop((0, 0, w, 1)) top_cov = calculate_transparency(strip_top) step_log["data"]["upper_line_coverage"] = round(top_cov, 3) if h > 1: strip_bot = pil_rgba.crop((0, h - 1, w, h)) bot_cov = calculate_transparency(strip_bot) step_log["data"]["lower_line_coverage"] = round(bot_cov, 3) if w > 0: strip_left = pil_rgba.crop((0, 0, 1, h)) left_cov = calculate_transparency(strip_left) step_log["data"]["left_line_coverage"] = round(left_cov, 3) if w > 1: strip_right = pil_rgba.crop((w - 1, 0, w, h)) right_cov = calculate_transparency(strip_right) step_log["data"]["right_line_coverage"] = round(right_cov, 3) px_data = pil_rgba.load() def patch_alpha_values(cx, cy): half = local_patch_size // 2 vals = [] for dy in range(-half, half+1): for dx in range(-half, half+1): nx = cx + dx ny = cy + dy if 0 <= nx < w and 0 <= ny < h: _, _, _, a_ = px_data[nx, ny] vals.append(a_) return vals def is_feather_pixel(alpha_vals): if len(alpha_vals) <= 1: return True avg_ = sum(alpha_vals) / len(alpha_vals) var_ = sum((v - avg_)**2 for v in alpha_vals) / len(alpha_vals) return (var_**0.5 < std_threshold) def measure_feather_ratio(x1, y1, x2, y2): ww = x2 - x1 hh = y2 - y1 total_ = ww * hh if total_ <= 0: return 0.0 c_ = 0 for yy in range(y1, y2): for xx in range(x1, x2): pv = patch_alpha_values(xx, yy) if is_feather_pixel(pv): c_ += 1 return c_ / float(total_) top_f = 0.0 bot_f = 0.0 left_f = 0.0 right_f = 0.0 if h > 0: top_f = measure_feather_ratio(0, 0, w, 1) step_log["data"]["upper_feather_ratio"] = round(top_f, 3) if h > 1: bot_f = measure_feather_ratio(0, h - 1, w, h) step_log["data"]["lower_feather_ratio"] = round(bot_f, 3) if w > 0: left_f = measure_feather_ratio(0, 0, 1, h) step_log["data"]["left_feather_ratio"] = round(left_f, 3) if w > 1: right_f = measure_feather_ratio(w - 1, 0, w, h) step_log["data"]["right_feather_ratio"] = round(right_f, 3) if top_cov >= COVERAGE_THRESHOLD and (top_f < FEATHER_THRESHOLD_MIN or top_f > FEATHER_THRESHOLD_MAX): step_log["data"]["upper_line"] = True if bot_cov >= COVERAGE_THRESHOLD and (bot_f < FEATHER_THRESHOLD_MIN or bot_f > FEATHER_THRESHOLD_MAX): step_log["data"]["lower_line"] = True if left_cov >= COVERAGE_THRESHOLD and (left_f < FEATHER_THRESHOLD_MIN or left_f > FEATHER_THRESHOLD_MAX): step_log["data"]["left_line"] = True if right_cov >= COVERAGE_THRESHOLD and (right_f < FEATHER_THRESHOLD_MIN or right_f > FEATHER_THRESHOLD_MAX): step_log["data"]["right_line"] = True ctx.define_result["borders"] = { "left_line": step_log["data"]["left_line"], "right_line": step_log["data"]["right_line"], "upper_line": step_log["data"]["upper_line"], "lower_line": step_log["data"]["lower_line"] } step_log["data"]["current_feather_threshold"] = (FEATHER_THRESHOLD_MIN, FEATHER_THRESHOLD_MAX) step_log["data"]["current_coverage_threshold"] = COVERAGE_THRESHOLD step_log["status"] = "ok" processed_count += 1 except Exception as e: step_log["status"] = "error" step_log["error"] = str(e) error_count += 1 batch_logs.append(step_log) processing_time = time.perf_counter() - start_time logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") return batch_logs def pad_image_box_to_squere_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): function_name = "pad_image_box_to_squere_batch" start_time = time.perf_counter() logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") processed_count = 0 skipped_count = 0 error_count = 0 for ctx in contexts: step_log = { "function": function_name, "image_url": ctx.url, "status": None, "data": { "primary_width": None, "primary_height": None, "primary_orientation": None, "border_lines": {}, "final_width": None, "final_height": None, "condition": None, "actions": [] } } if ctx.skip_run or ctx.skip_processing: step_log["status"]="skipped" batch_logs.append(step_log) skipped_count += 1 continue if "original" not in ctx.pil_img: step_log["status"]="error" step_log["data"]["actions"].append("ERROR => RBC missing or no original image.") ctx.skip_run= True batch_logs.append(step_log) error_count += 1 continue try: im, fn, _= ctx.pil_img["original"] w,h= im.size step_log["data"]["primary_width"]= w step_log["data"]["primary_height"]= h if w==h: orientation= "Square" elif w> h: orientation= "Landscape" else: orientation= "Portrait" step_log["data"]["primary_orientation"]= orientation brds= ctx.define_result.get("borders",{}) left_line = parse_line_flag(brds.get("left_line",False)) right_line = parse_line_flag(brds.get("right_line",False)) upper_line = parse_line_flag(brds.get("upper_line",False)) lower_line = parse_line_flag(brds.get("lower_line",False)) step_log["data"]["border_lines"]={ "left_line": str(left_line), "right_line": str(right_line), "upper_line": str(upper_line), "lower_line": str(lower_line) } dr= ctx.detection_result final_kws= dr.get("final_keywords",[]) if dr else [] shoes_detected= any(k in SHOES_LIST for k in final_kws) head_detected= any(k in HEAD_LIST for k in final_kws) border_pad= int(0.075* max(w,h)) final_img= im px_info= {"left":0,"right":0,"top":0,"bottom":0} scenario= None if orientation=="Square": if shoes_detected: scenario= "square_shoes_exception" pl= border_pad if not left_line else 0 pr= border_pad if not right_line else 0 pt= 0 pb= border_pad padded,cA= partial_pad_square(final_img, pl,pr,pt,pb) for kk in cA: px_info[kk]+= cA[kk] final_img= padded elif (left_line or right_line or upper_line or lower_line): scenario= "square_has_lines" else: scenario= "square_all_false" wtmp2,cB= two_step_pad_to_square(final_img,"Landscape",border_pad) for kk in cB: px_info[kk]+= cB[kk] final_img= wtmp2 if head_detected and not shoes_detected: scenario= "square_head_exception" step_log["data"]["condition"]= scenario step_log["data"]["actions"]= [ ACTION_LIB_SQUARE[scenario] ] elif orientation=="Landscape": if shoes_detected: scenario= "landscape_shoes_exception" pad_l=0; pad_r=0; pad_t=0; pad_b= border_pad padded0,c0= partial_pad_square(final_img,pad_l,pad_r,pad_t,pad_b) for kk in c0: px_info[kk]+= c0[kk] final_img= padded0 cimg,cx1= coverage_crop_with_shorter_dimension( final_img, ctx, "landscape", False ) for kk in cx1: px_info[kk]+= cx1[kk] final_img= cimg elif (left_line or right_line or upper_line or lower_line): scenario= "landscape_coverage" cimg2,cx2= coverage_crop_with_shorter_dimension( final_img, ctx, "landscape", False ) for kk in cx2: px_info[kk]+= cx2[kk] final_img= cimg2 else: scenario= "landscape_all_false" wtmp3,c3= two_step_pad_to_square(final_img,"Landscape",border_pad) for kk in c3: px_info[kk]+= c3[kk] final_img= wtmp3 if head_detected and not shoes_detected: scenario= "landscape_head_exception" if h> w: forced_y = h - w ctx.define_result["largest_box"] = [0, forced_y, w, h] cimgH,cxH= coverage_crop_with_shorter_dimension( final_img, ctx, "landscape", False ) for kk in cxH: px_info[kk]+= cxH[kk] final_img= cimgH step_log["data"]["condition"]= scenario step_log["data"]["actions"]= [ ACTION_LIB_LANDSCAPE[scenario] ] else: up_low_count= (1 if upper_line else 0)+(1 if lower_line else 0) if shoes_detected: scenario= "portrait_shoes_exception" pl= border_pad if not left_line else 0 pr= border_pad if not right_line else 0 pt=0 pb= border_pad paddedP,cS= partial_pad_square(final_img, pl,pr,pt,pb) for kk in cS: px_info[kk]+= cS[kk] final_img= paddedP elif head_detected: scenario= "portrait_head_exception" side_diff= h - w if h> w else 0 half_ = side_diff//2 leftover= side_diff- half_ pimg, cHL= partial_pad_square(final_img, half_, leftover, 0, 0) for kk in cHL: px_info[kk]+= cHL[kk] final_img= pimg elif (left_line or right_line): scenario= "portrait_lr_any" cimg3,c33= coverage_crop_with_shorter_dimension( final_img, ctx, "portrait", False ) for kk in c33: px_info[kk]+= c33[kk] final_img= cimg3 elif up_low_count==2: scenario= "portrait_up_low_both" wtmp4,c44= pad_left_right_only(final_img,border_pad) for kk in c44: px_info[kk]+= c44[kk] final_img= wtmp4 elif up_low_count==1: scenario= "portrait_any_up_low" wtmp5,c55= two_step_pad_to_square(final_img,"Portrait",border_pad) for kk in c55: px_info[kk]+= c55[kk] final_img= wtmp5 else: scenario= "portrait_all_false" wtmp6,c66= two_step_pad_to_square(final_img,"Portrait",border_pad) for kk in c66: px_info[kk]+= c66[kk] final_img= wtmp6 step_log["data"]["condition"]= scenario step_log["data"]["actions"]= [ ACTION_LIB_PORTRAIT[scenario] ] step_log["status"]="ok" fw,fh= final_img.size step_log["data"]["final_width"] = fw step_log["data"]["final_height"]= fh def plus_minus(dx): return f"+{dx}" if dx>=0 else str(dx) step_log["data"]["border_lines"] = { "left_line": f"{left_line} {plus_minus(px_info['left'])}px", "right_line": f"{right_line} {plus_minus(px_info['right'])}px", "upper_line": f"{upper_line} {plus_minus(px_info['top'])}px", "lower_line": f"{lower_line} {plus_minus(px_info['bottom'])}px" } ctx.pil_img["original"]= [final_img, fn, None] ctx.pad_info.update(px_info) processed_count += 1 except Exception as e: step_log["status"]="error" step_log["data"]["actions"].append(f"ERROR => exception => {repr(e)}") error_count += 1 batch_logs.append(step_log) processing_time = time.perf_counter() - start_time logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") return batch_logs def center_object_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): function_name = "center_object_batch" start_time = time.perf_counter() logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") processed_count = 0 skipped_count = 0 error_count = 0 for ctx in contexts: step_log = { "function": function_name, "image_url": ctx.url, "status": None, "data": { "leftmost_x": None, "rightmost_x": None, "midpoint_x": None, "shift_x": None, "bbox": None, "notes": "" } } if ctx.skip_run or ctx.skip_processing: step_log["status"] = "skipped" batch_logs.append(step_log) skipped_count += 1 continue if "original" not in ctx.pil_img: step_log["status"] = "error" step_log["data"]["notes"] = "No final image found in context." ctx.skip_run = True batch_logs.append(step_log) error_count += 1 continue try: pil_img, _, _ = ctx.pil_img["original"] image = pil_img.convert("RGBA") width, height = image.size center_y = height // 2 alpha = image.split()[3] non_transparent_xs = [] for x in range(width): if alpha.getpixel((x, center_y)) != 0: non_transparent_xs.append(x) if not non_transparent_xs: step_log["status"] = "no_op" step_log["data"]["notes"] = "No non-transparent pixel found on horizontal mid-line." batch_logs.append(step_log) skipped_count += 1 continue leftmost = min(non_transparent_xs) rightmost = max(non_transparent_xs) midpoint_x = (leftmost + rightmost) / 2.0 image_center_x = width / 2.0 shift_x = image_center_x - midpoint_x bbox = alpha.getbbox() if not bbox: step_log["status"] = "no_op" step_log["data"]["notes"] = "Image has no non-transparent bounding box." batch_logs.append(step_log) skipped_count += 1 continue region = image.crop(bbox) new_left = int(bbox[0] + shift_x) new_top = bbox[1] new_image = Image.new("RGBA", (width, height), (0, 0, 0, 0)) new_image.paste(region, (new_left, new_top), region) ctx.pil_img["original"] = [new_image, ctx.pil_img["original"][1], None] step_log["status"] = "ok" step_log["data"]["leftmost_x"] = leftmost step_log["data"]["rightmost_x"] = rightmost step_log["data"]["midpoint_x"] = round(midpoint_x, 2) step_log["data"]["shift_x"] = round(shift_x, 2) step_log["data"]["bbox"] = bbox step_log["data"]["notes"] = "Object horizontally centered (red lines removed)." processed_count += 1 except Exception as e: step_log["status"] = "error" step_log["data"]["notes"] = f"Error: {str(e)}" error_count += 1 batch_logs.append(step_log) processing_time = time.perf_counter() - start_time logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") return batch_logs # ---------------------------------------------------------------------- # MAIN PIPELINE FUNCTION # ---------------------------------------------------------------------- def ensure_models_loaded(): import app app.ensure_models_loaded() pipeline_step = create_pipeline_step(ensure_models_loaded) @pipeline_step def cropping_padding(contexts: List[ProcessingContext], batch_logs: List[dict] = None): if batch_logs is None: batch_logs = [] start_time = time.perf_counter() logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting cropping_padding pipeline for {len(contexts)} items") if not ENABLE_CROPPING_PADDING: logging.log(LOG_LEVEL_MAP["WARNING"], f"{EMOJI_MAP['WARNING']} Cropping and padding operations are disabled (ENABLE_CROPPING_PADDING=False)") logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Returning original images unchanged") for ctx in contexts: skip_log = { "function": "cropping_padding_pipeline", "image_url": ctx.url, "status": "skipped", "reason": "ENABLE_CROPPING_PADDING is False", "data": {"operations_performed": "none", "original_image_preserved": True} } batch_logs.append(skip_log) processing_time = time.perf_counter() - start_time logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed cropping_padding pipeline (skipped) for {len(contexts)} items in {processing_time:.3f}s") return batch_logs cropp_batch(contexts, batch_logs) shrink_primary_box_batch(contexts, batch_logs) detect_border_stright_line_batch(contexts, batch_logs) pad_image_box_to_squere_batch(contexts, batch_logs) center_object_batch(contexts, batch_logs) processing_time = time.perf_counter() - start_time logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed cropping_padding pipeline for {len(contexts)} items in {processing_time:.3f}s") return batch_logs