# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import spaces

# Simple GPU function to ensure Zero GPU detection.
# NOTE: the @spaces.GPU decorator is assumed here; without it this function
# would not register GPU usage with the Zero GPU scheduler.
@spaces.GPU
def gpu_available():
    import torch
    return torch.cuda.is_available()
import os
import sys
import json
import time
import logging
import traceback
import subprocess
from datetime import datetime
from typing import List, Dict, Optional, Union
from contextlib import asynccontextmanager

import torch
import uvicorn
import threading
import requests
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
# ----------------------------------------------------------------------
# PATH SETUP
# ----------------------------------------------------------------------
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, script_dir)

# ----------------------------------------------------------------------
# LOCAL IMPORTS
# ----------------------------------------------------------------------
from src.utils import (
    ProcessingContext,
    ProcessingResponse,
    ProcessedImage,
    setup_logging,
    get_system_info,
    cleanup_memory,
    custom_dumps,
    LOG_LEVEL_MAP,
    EMOJI_MAP
)
from src.models.model_loader import (
    ensure_models_loaded,
    check_hardware_environment,
    MODELS_LOADED,
    LOAD_ERROR,
    DEVICE
)
from src.pipeline import run_functions_in_sequence, PIPELINE_STEPS
# ----------------------------------------------------------------------
# CONFIGURATION
# ----------------------------------------------------------------------
from src.config import (
    API_TITLE,
    API_VERSION,
    API_DESCRIPTION,
    API_HOST,
    API_PORT,
    GPU_DURATION_LONG,
    STATUS_SUCCESS,
    STATUS_ERROR,
    STATUS_PROCESSED,
    STATUS_NOT_PROCESSED,
    ERROR_NO_VALID_URLS,
    HTTP_OK,
    HTTP_BAD_REQUEST,
    HTTP_INTERNAL_SERVER_ERROR
)

# ----------------------------------------------------------------------
# IMPORT TEST CONFIGURATION
# ----------------------------------------------------------------------
try:
    from tests.config import RUN_TESTS
except ImportError:
    try:
        sys.path.insert(0, os.path.join(script_dir, 'tests'))
        from config import RUN_TESTS
    except ImportError:
        RUN_TESTS = False
        print("Warning: Could not import RUN_TESTS from tests.config, defaulting to False")
# ----------------------------------------------------------------------
# PYDANTIC MODELS
# ----------------------------------------------------------------------
class ImageRequest(BaseModel):
    urls: Union[str, List[str]] = Field(..., description="Image URL(s)")
    product_type: str = Field("General", description="Product type")
    options: Optional[Dict] = Field(default_factory=dict, description="Processing options")

class ShopifyWebhook(BaseModel):
    data: List = Field(..., description="Shopify webhook data")

class HealthResponse(BaseModel):
    status: str
    timestamp: float
    device: str
    models_loaded: bool
    gpu_available: bool = False
    system_info: Dict
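# ----------------------------------------------------------------------
# EXAMPLE PAYLOADS (illustrative)
# ----------------------------------------------------------------------
# Minimal sketches of the request bodies the models above accept. The
# ShopifyWebhook shape mirrors the payload used by the startup test further
# below; the ImageRequest example is an assumption based only on its field
# definitions, and example.com is a placeholder URL.
#
#   ImageRequest:
#     {"urls": ["https://example.com/shirt.jpg"], "product_type": "Shirt", "options": {}}
#
#   ShopifyWebhook (a list of image dicts followed by the product type):
#     {"data": [[{"url": "https://example.com/shirt.jpg"}], "Shirt"]}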
# ----------------------------------------------------------------------
# LIFESPAN MANAGEMENT
# ----------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    setup_logging()
    logging.info(f"{EMOJI_MAP['INFO']} Starting {API_TITLE} v{API_VERSION}")
    check_hardware_environment()
    # Load models FIRST
    try:
        ensure_models_loaded()
        if os.getenv("SPACE_ID"):
            logging.info(f"{EMOJI_MAP['INFO']} Zero GPU environment - models will be loaded on first request")
        else:
            if MODELS_LOADED:
                logging.info(f"{EMOJI_MAP['SUCCESS']} Models loaded successfully")
            else:
                logging.warning(f"{EMOJI_MAP['WARNING']} Models not fully loaded")

        # Run GPU initialization for Spaces
        if os.getenv("SPACE_ID"):
            try:
                init_gpu()
                logging.info(f"{EMOJI_MAP['SUCCESS']} GPU initialization completed")
            except Exception as e:
                error_msg = str(e)
                if "GPU task aborted" in error_msg:
                    logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization aborted (Zero GPU not ready yet) - this is normal during startup")
                    logging.info("GPU will be initialized on first request")
                else:
                    logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization failed: {error_msg}")
    except Exception as e:
        logging.error(f"{EMOJI_MAP['ERROR']} Failed to load models: {str(e)}")
    # Now run tests after models are loaded
    # Skip tests in Zero GPU if SKIP_STARTUP_TEST is set
    skip_startup_test = os.getenv("SKIP_STARTUP_TEST", "false").lower() == "true"
    if RUN_TESTS and os.environ.get("IN_PYTEST") != "true" and not skip_startup_test:
        logging.info(f"{EMOJI_MAP['INFO']} Running tests at startup...")

        # Run a simple test that calls the endpoint after the server starts
        def run_endpoint_test():
            logging.info(f"{EMOJI_MAP['INFO']} Starting endpoint test with RUN_TESTS={RUN_TESTS}")
            # Configuration for retries - increased for Zero GPU warming up
            max_retries = 5
            retry_delay = 60    # seconds - increased from 30
            initial_delay = 45  # seconds - increased from 30
            # Test payload
            payload = {
                "data": [
                    [{"url": "https://cdn.shopify.com/s/files/1/0505/0928/3527/files/hugging_face_test_image_shirt_product_type.jpg"}],
                    "Shirt"
                ]
            }
            # In Zero GPU environments, wait longer and handle GPU task aborts gracefully
            if os.getenv("SPACE_ID"):
                logging.info(f"{EMOJI_MAP['INFO']} Zero GPU environment detected - waiting {initial_delay}s for GPU to warm up...")
                time.sleep(initial_delay)  # Initial wait for the Zero GPU environment to be ready
                logging.info(f"{EMOJI_MAP['INFO']} Running full processing test with enhanced retry logic (max {max_retries} attempts)")
                for retry in range(max_retries):
                    try:
                        logging.info(f"{EMOJI_MAP['INFO']} Testing /api/rb_and_crop endpoint (attempt {retry + 1}/{max_retries})...")
                        response = requests.post(
                            "http://localhost:7860/api/rb_and_crop",
                            json=payload,
                            timeout=180  # Longer timeout for Zero GPU
                        )
                        if response.status_code == 200:
                            data = response.json()
                            if "processed_images" in data and data["processed_images"]:
                                img = data["processed_images"][0]
                                img_status = img.get('status')
                                if img_status == STATUS_PROCESSED:
                                    logging.info(f"{EMOJI_MAP['SUCCESS']} Test passed! Image status: {img_status}")
                                    if img.get('base64_image'):
                                        logging.info(f"{EMOJI_MAP['SUCCESS']} Image processed and base64 encoded successfully")
                                    logging.info(f"{EMOJI_MAP['SUCCESS']} Full image processing test completed successfully")
                                    break  # Success, exit retry loop
                                elif img_status == STATUS_ERROR:
                                    error_detail = img.get('error', 'Unknown error')
                                    if "GPU task aborted" in error_detail or "GPU resources temporarily unavailable" in error_detail:
                                        logging.warning(f"{EMOJI_MAP['WARNING']} GPU task aborted during processing (attempt {retry + 1}/{max_retries})")
                                        logging.info(f"{EMOJI_MAP['INFO']} Zero GPU is warming up - this is expected during startup")
                                        if retry < max_retries - 1:
                                            logging.info(f"{EMOJI_MAP['INFO']} Waiting {retry_delay}s for GPU to stabilize...")
                                            time.sleep(retry_delay)
                                            continue
                                    else:
                                        logging.error(f"{EMOJI_MAP['ERROR']} Processing error: {error_detail}")
                                else:
                                    logging.warning(f"{EMOJI_MAP['WARNING']} Unexpected image status: {img_status}")
                            else:
                                logging.warning(f"{EMOJI_MAP['WARNING']} Test returned no images")
                        elif response.status_code == 503:
                            # GPU resources temporarily unavailable
                            logging.warning(f"{EMOJI_MAP['WARNING']} GPU resources unavailable (503), will retry...")
                            if retry < max_retries - 1:
                                logging.info(f"{EMOJI_MAP['INFO']} Waiting {retry_delay}s for GPU to become available...")
                                time.sleep(retry_delay)
                                continue
                        elif response.status_code == 500:
                            # Check if it's a GPU abort error
                            try:
                                error_data = response.json()
                                error_detail = error_data.get('error', '')
                                if "GPU task aborted" in error_detail or "GPU resources temporarily unavailable" in error_detail:
                                    logging.warning(f"{EMOJI_MAP['WARNING']} GPU task aborted (500): {error_detail}")
                                    if retry < max_retries - 1:
                                        logging.info(f"{EMOJI_MAP['INFO']} Zero GPU is still warming up. Waiting {retry_delay}s before retry...")
                                        time.sleep(retry_delay)
                                        continue
                                else:
                                    logging.error(f"{EMOJI_MAP['ERROR']} Server error (500): {error_detail}")
                            except Exception:
                                logging.error(f"{EMOJI_MAP['ERROR']} Test failed with status 500: {response.text[:200]}")
                        else:
                            logging.error(f"{EMOJI_MAP['ERROR']} Test failed with status {response.status_code}")
                            if response.text:
                                try:
                                    error_data = response.json()
                                    logging.error(f"Error details: {error_data.get('error', 'Unknown error')}")
                                except Exception:
                                    logging.error(f"Response: {response.text[:200]}")
                    except requests.exceptions.Timeout:
                        logging.warning(f"{EMOJI_MAP['WARNING']} Request timeout on attempt {retry + 1} - GPU might be initializing")
                        if retry < max_retries - 1:
                            logging.info(f"{EMOJI_MAP['INFO']} Waiting {retry_delay}s before retry...")
                            time.sleep(retry_delay)
                            continue
                    except Exception as e:
                        error_msg = str(e)
                        if "GPU task aborted" in error_msg or "503" in error_msg or "Connection refused" in error_msg:
                            logging.warning(f"{EMOJI_MAP['WARNING']} Connection/GPU error on attempt {retry + 1}: {error_msg}")
                            if retry < max_retries - 1:
                                logging.info(f"{EMOJI_MAP['INFO']} Zero GPU warming up. Waiting {retry_delay}s before retry...")
                                time.sleep(retry_delay)
                                continue
                        else:
                            logging.error(f"{EMOJI_MAP['ERROR']} Test error: {error_msg}")
                            if retry < max_retries - 1:
                                logging.info(f"{EMOJI_MAP['INFO']} Will retry after {retry_delay}s...")
                                time.sleep(retry_delay)
                                continue
                else:
                    # Fallback health check: this for-else clause only runs when all
                    # retries were exhausted without a successful 'break' above
                    logging.warning(f"{EMOJI_MAP['WARNING']} Full test failed after {max_retries} attempts")
                    logging.info(f"{EMOJI_MAP['INFO']} This is normal for Zero GPU during startup - the GPU needs time to warm up")
                    try:
                        response = requests.get("http://localhost:7860/health", timeout=10)
                        if response.status_code == 200:
                            data = response.json()
                            logging.info(f"{EMOJI_MAP['SUCCESS']} Health check passed - service is running and ready")
                            logging.info(f"Device: {data.get('device')}, Models loaded: {data.get('models_loaded')}")
                            logging.info(f"{EMOJI_MAP['INFO']} The GPU will be fully initialized on the first real request")
                        else:
                            logging.warning(f"{EMOJI_MAP['WARNING']} Health check returned status {response.status_code}")
                    except Exception as e:
                        logging.warning(f"{EMOJI_MAP['WARNING']} Health check failed: {str(e)}")
                    logging.info(f"{EMOJI_MAP['INFO']} Service is available and will handle requests normally once GPU warms up")
            else:
                # Non-Zero GPU environment - run the full test after a shorter delay
                time.sleep(10)  # Wait for the server to fully start
                try:
                    logging.info(f"{EMOJI_MAP['INFO']} Testing /api/rb_and_crop endpoint...")
                    # Normal timeout for non-Zero GPU environments
                    response = requests.post(
                        "http://localhost:7860/api/rb_and_crop",
                        json=payload,
                        timeout=120
                    )
                    if response.status_code == 200:
                        data = response.json()
                        if "processed_images" in data and data["processed_images"]:
                            img = data["processed_images"][0]
                            img_status = img.get('status')
                            if img_status == STATUS_PROCESSED:
                                logging.info(f"{EMOJI_MAP['SUCCESS']} Test passed! Image status: {img_status}")
                                if img.get('base64_image'):
                                    logging.info(f"{EMOJI_MAP['SUCCESS']} Image processed and base64 encoded successfully")
                            elif img_status == STATUS_ERROR:
                                logging.error(f"{EMOJI_MAP['ERROR']} Processing error: {img.get('error', 'Unknown error')}")
                            else:
                                logging.warning(f"{EMOJI_MAP['WARNING']} Unexpected image status: {img_status}")
                        else:
                            logging.warning(f"{EMOJI_MAP['WARNING']} Test returned no images")
                    else:
                        logging.error(f"{EMOJI_MAP['ERROR']} Test failed with status {response.status_code}")
                        if response.text:
                            try:
                                error_data = response.json()
                                logging.error(f"Error details: {error_data.get('error', 'Unknown error')}")
                            except Exception:
                                logging.error(f"Response: {response.text[:200]}")
                except Exception as e:
                    logging.error(f"{EMOJI_MAP['ERROR']} Test error: {str(e)}")

        # Run the test in a background thread (threading is imported at module level)
        test_thread = threading.Thread(target=run_endpoint_test, daemon=True)
        test_thread.start()

    yield

    logging.info(f"{EMOJI_MAP['INFO']} API shutdown initiated")
    cleanup_memory()
# ----------------------------------------------------------------------
# FASTAPI APP
# ----------------------------------------------------------------------
app = FastAPI(
    title=API_TITLE,
    version=API_VERSION,
    description=API_DESCRIPTION,
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    lifespan=lifespan
)

# ----------------------------------------------------------------------
# MIDDLEWARE
# ----------------------------------------------------------------------
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ----------------------------------------------------------------------
# GPU INITIALIZATION
# ----------------------------------------------------------------------
# NOTE: the @spaces.GPU decorator is assumed here; it is what makes this call
# trigger a Zero GPU allocation (and what can raise "GPU task aborted").
@spaces.GPU
def init_gpu():
    """Initialize GPU for Spaces environment"""
    try:
        logging.info(f"{EMOJI_MAP['INFO']} Initializing GPU...")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            try:
                torch.cuda.ipc_collect()
            except Exception as e:
                logging.warning(f"IPC collect failed, continuing anyway: {e}")
            # Test GPU availability
            test_tensor = torch.tensor([1.0]).cuda()
            del test_tensor
            logging.info(f"{EMOJI_MAP['SUCCESS']} GPU is available and working")
        else:
            logging.warning(f"{EMOJI_MAP['WARNING']} CUDA not available in GPU context")
        return True
    except Exception as e:
        error_msg = str(e)
        if "GPU task aborted" in error_msg:
            logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization aborted - Zero GPU not ready")
        else:
            logging.error(f"{EMOJI_MAP['ERROR']} GPU initialization error: {error_msg}")
        raise
# ----------------------------------------------------------------------
# HELPER FUNCTIONS
# ----------------------------------------------------------------------
def _process_images_impl(urls: Union[str, List[str]], product_type: str) -> Dict:
    start_time = time.time()
    if isinstance(urls, str):
        url_list = [url.strip() for url in urls.split(",") if url.strip()]
    else:
        url_list = urls
    if not url_list:
        raise HTTPException(status_code=HTTP_BAD_REQUEST, detail=ERROR_NO_VALID_URLS)

    # Import build_keywords to generate keywords based on the product type
    from src.processing.bounding_box.bounding_box import build_keywords
    # Generate keywords for this product type
    keywords = build_keywords(product_type)

    contexts = [ProcessingContext(url=url, product_type=product_type, keywords=keywords) for url in url_list]
    batch_logs = []
    try:
        ensure_models_loaded()
        run_functions_in_sequence(contexts, PIPELINE_STEPS)
        processed_images = []
        for ctx in contexts:
            if hasattr(ctx, 'error') and ctx.error:
                processed_images.append({
                    "url": ctx.url,
                    "status": STATUS_ERROR,
                    "error": str(ctx.error)
                })
            elif hasattr(ctx, 'skip_processing') and ctx.skip_processing:
                # Check if there's a specific error message
                error_msg = "Processing skipped"
                if hasattr(ctx, 'processing_error'):
                    error_msg = str(ctx.processing_error)
                processed_images.append({
                    "url": ctx.url,
                    "status": STATUS_ERROR,
                    "error": error_msg
                })
            elif hasattr(ctx, 'result_image') and ctx.result_image:
                processed_images.append({
                    "url": ctx.url,
                    "status": STATUS_PROCESSED,
                    "base64_image": ctx.result_image,
                    "metadata": ctx.metadata,
                    "processing_logs": ctx.processing_logs
                })
            else:
                processed_images.append({
                    "url": ctx.url,
                    "status": STATUS_NOT_PROCESSED
                })
        total_time = time.time() - start_time
        return {
            "status": "success",
            "processed_images": processed_images,
            "total_time": total_time,
            "batch_logs": batch_logs,
            "system_info": get_system_info()
        }
    except Exception as e:
        logging.error(f"{EMOJI_MAP['ERROR']} Processing failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# NOTE: the @spaces.GPU decorator (with the imported GPU_DURATION_LONG budget)
# is assumed here; it is the standard Zero GPU pattern for running the heavy
# processing inside a GPU-allocated context.
@spaces.GPU(duration=GPU_DURATION_LONG)
def process_images_gpu(urls: Union[str, List[str]], product_type: str) -> Dict:
    """GPU-accelerated image processing for Spaces"""
    try:
        # Force model loading in GPU context for Zero GPU environment
        if not MODELS_LOADED:
            logging.info(f"{EMOJI_MAP['INFO']} Loading models in GPU context...")
            from src.models.model_loader import load_models
            try:
                load_models()
                logging.info(f"{EMOJI_MAP['SUCCESS']} Models loaded in GPU context")
            except Exception as e:
                logging.error(f"{EMOJI_MAP['ERROR']} Failed to load models in GPU context: {str(e)}")
                # Continue anyway - some steps might work without all models

        # Move models to GPU within the GPU context
        logging.info(f"{EMOJI_MAP['INFO']} Moving models to GPU...")
        from src.models.model_loader import move_models_to_gpu
        try:
            move_models_to_gpu()
            logging.info(f"{EMOJI_MAP['SUCCESS']} Models moved to GPU")
        except Exception as e:
            logging.warning(f"{EMOJI_MAP['WARNING']} Failed to move some models to GPU: {str(e)}")
            # Continue anyway - will run on CPU but slower

        return _process_images_impl(urls, product_type)
    except Exception as e:
        error_msg = str(e)
        if "GPU task aborted" in error_msg:
            logging.error(f"{EMOJI_MAP['ERROR']} GPU task was aborted - Zero GPU might be overloaded or warming up")
            logging.info(f"{EMOJI_MAP['INFO']} This often happens during startup - the GPU will be ready soon")
            raise HTTPException(
                status_code=503,
                detail="GPU resources temporarily unavailable. Zero GPU is warming up. Please try again in 30-60 seconds."
            )
        else:
            raise
def process_images(urls: Union[str, List[str]], product_type: str) -> Dict:
    """Process images with automatic GPU/CPU selection"""
    if os.getenv("SPACE_ID"):
        return process_images_gpu(urls, product_type)
    else:
        return _process_images_impl(urls, product_type)
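# Illustrative local usage of the dispatcher above (assumption: the src.*
# modules and model weights are importable in the current environment; the
# URL is a placeholder):
#
#   result = process_images("https://example.com/shirt.jpg", "Shirt")
#   print(result["status"], len(result["processed_images"]))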
# ----------------------------------------------------------------------
# ENDPOINTS
# ----------------------------------------------------------------------
# NOTE: the route decorators below appear to have been stripped from this
# listing; "/health" and "/api/rb_and_crop" are confirmed by the startup test
# in lifespan(), the remaining paths are assumptions and marked as such.
@app.get("/", response_class=HTMLResponse)
async def root():
    return f"""
    <html>
        <head>
            <title>{API_TITLE}</title>
        </head>
        <body>
            <h1>{API_TITLE} v{API_VERSION}</h1>
            <p>Visit <a href="/api/docs">/api/docs</a> for API documentation</p>
        </body>
    </html>
    """

@app.get("/health")
async def health():
    # Check GPU availability
    gpu_available = False
    gpu_name = None
    try:
        if torch.cuda.is_available():
            gpu_available = True
            gpu_name = torch.cuda.get_device_name(0)
    except Exception:
        pass
    system_info = get_system_info()
    system_info["gpu_available"] = gpu_available
    system_info["gpu_name"] = gpu_name
    system_info["space_id"] = os.getenv("SPACE_ID", None)
    system_info["zero_gpu"] = bool(os.getenv("SPACE_ID"))
    return HealthResponse(
        status="healthy",
        timestamp=time.time(),
        device=DEVICE,
        models_loaded=MODELS_LOADED,
        gpu_available=gpu_available,
        system_info=system_info
    )
@app.get("/wake_up")  # route method and path assumed
async def wake_up():
    """Lightweight endpoint for waking up the space"""
    logging.info(f"{EMOJI_MAP['INFO']} Wake-up request received")
    # Try to initialize GPU if in Zero GPU environment
    if os.getenv("SPACE_ID"):
        try:
            # This will trigger GPU allocation in Zero GPU spaces
            init_gpu()
            logging.info(f"{EMOJI_MAP['SUCCESS']} GPU initialized for wake-up")
        except Exception as e:
            logging.warning(f"{EMOJI_MAP['WARNING']} GPU initialization during wake-up: {str(e)}")
    # Ensure models are loaded
    try:
        ensure_models_loaded()
        logging.info(f"{EMOJI_MAP['SUCCESS']} Models loaded during wake-up")
    except Exception as e:
        logging.warning(f"{EMOJI_MAP['WARNING']} Model loading during wake-up: {str(e)}")
    return {
        "status": "awake",
        "timestamp": time.time(),
        "device": DEVICE,
        "models_loaded": MODELS_LOADED,
        "message": "Service is awake and ready"
    }

@app.get("/quota")  # route method and path assumed
async def quota_info():
    """Provide information about GPU quota (informational only - actual quota is managed by HF)"""
    return {
        "status": "info",
        "quota_management": "Hugging Face ZeroGPU Infrastructure",
        "quota_details": {
            "total_seconds": 300,
            "refill_rate": "1 GPU second per 30 real seconds",
            "half_life": "2 hours",
            "full_recovery_time": "2.5 hours (9000 seconds)"
        },
        "recovery_suggestions": {
            "light_usage": {
                "gpu_seconds": 30,
                "wait_minutes": 15,
                "suitable_for": "1-2 images"
            },
            "moderate_usage": {
                "gpu_seconds": 60,
                "wait_minutes": 30,
                "suitable_for": "2-4 images"
            },
            "heavy_usage": {
                "gpu_seconds": 120,
                "wait_minutes": 60,
                "suitable_for": "4-8 images"
            },
            "full_quota": {
                "gpu_seconds": 300,
                "wait_minutes": 150,
                "suitable_for": "10+ images"
            }
        },
        "note": "This endpoint provides information only. Actual quota tracking is done by Hugging Face infrastructure.",
        "timestamp": time.time()
    }
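# Worked example of the figures reported above: 300 GPU seconds refilling at
# 1 GPU second per 30 real seconds takes 300 * 30 = 9000 real seconds, i.e.
# the 2.5 hour full recovery time; the per-tier wait_minutes follow the same
# 30:1 ratio (e.g. 30 GPU seconds -> 900 s -> 15 minutes).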
@app.post("/api/predict")  # route path assumed
async def predict(request: ImageRequest):
    result = process_images(request.urls, request.product_type)
    return ProcessingResponse(
        status=result["status"],
        results=[
            ProcessedImage(
                image_url=img["url"],
                status=img["status"],
                base64=img.get("base64_image", ""),
                format="png",
                type="processed",
                metadata=img.get("metadata", {}),
                error=img.get("error")
            )
            for img in result["processed_images"]
        ],
        processed_count=len([img for img in result["processed_images"] if img["status"] == STATUS_PROCESSED]),
        total_time=result["total_time"],
        system_info=result["system_info"]
    )
@app.post("/api/rb_and_crop")  # path confirmed by the startup test in lifespan()
async def shopify_webhook(webhook: ShopifyWebhook):
    if not webhook.data or len(webhook.data) < 2:
        raise HTTPException(status_code=HTTP_BAD_REQUEST, detail="Invalid webhook data")
    images_info = webhook.data[0]
    product_type = webhook.data[1] if len(webhook.data) > 1 else "General"
    if not isinstance(images_info, list):
        raise HTTPException(status_code=HTTP_BAD_REQUEST, detail="Invalid images data")
    urls = []
    for img_dict in images_info:
        if isinstance(img_dict, dict) and "url" in img_dict:
            urls.append(img_dict["url"])
    if not urls:
        raise HTTPException(status_code=HTTP_BAD_REQUEST, detail=ERROR_NO_VALID_URLS)

    # Special handling for wake-up requests (single placeholder image with "Test" product type)
    if len(urls) == 1 and product_type == "Test" and "placeholder.com" in urls[0]:
        logging.info(f"{EMOJI_MAP['INFO']} Wake-up request detected, returning minimal response")
        return {
            "status": STATUS_SUCCESS,
            "processed_images": [{
                "url": urls[0],
                "status": STATUS_PROCESSED,
                "base64_image": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg==",  # 1x1 transparent PNG
                "color": "#ffffff",
                "image_type": "wake_up",
                "artifacts": "false"
            }]
        }

    result = process_images(urls, product_type)
    return {
        "status": result["status"],
        "processed_images": [
            {
                "url": img["url"],
                "status": img["status"],
                "base64_image": img.get("base64_image", ""),
                "color": "#ffffff",
                "image_type": "remove_background",
                "artifacts": "false"
            }
            for img in result["processed_images"]
        ]
    }
@app.post("/api/batch")  # route path assumed
async def batch_process(requests: List[ImageRequest]):
    results = []
    for req in requests:
        try:
            result = process_images(req.urls, req.product_type)
            results.append(result)
        except Exception as e:
            results.append({
                "status": "error",
                "error": str(e),
                "urls": req.urls
            })
    return {
        "status": "success",
        "batch_results": results,
        "total_requests": len(requests)
    }
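# Illustrative batch request body (a JSON list of ImageRequest objects; URLs
# are placeholders). Note that "urls" may be a list or a comma-separated
# string, since _process_images_impl splits strings on commas:
#
#   [
#     {"urls": ["https://example.com/a.jpg"], "product_type": "Shirt"},
#     {"urls": "https://example.com/b.jpg,https://example.com/c.jpg", "product_type": "General"}
#   ]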
# ----------------------------------------------------------------------
# ERROR HANDLERS
# ----------------------------------------------------------------------
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "status": "error",
            "error": exc.detail,
            "timestamp": time.time()
        }
    )

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    # Determine the error type and prepare a detailed response
    error_type = "UNKNOWN_ERROR"
    error_message = str(exc)
    error_details = {}

    # Check for specific error types
    if ("GPU" in error_message and ("limit" in error_message or "quota" in error_message)) or "ZeroGPU quota exceeded" in error_message:
        # For GPU quota errors, log a simple notification without a traceback
        logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota exceeded: Space app has reached its GPU limit")
        error_type = "GPU_LIMIT_ERROR"
        error_details["gpu_error"] = True
        # Provide quota recovery information
        error_details["quota_info"] = {
            "message": "GPU quota exceeded. ZeroGPU quota refills at 1 GPU second per 30 real seconds.",
            "recommended_wait_times": {
                "minimal": 900,    # 15 minutes for ~30s GPU quota
                "moderate": 1800,  # 30 minutes for ~60s GPU quota
                "full": 5400       # 90 minutes for ~180s GPU quota
            },
            "note": "Quota is managed by Hugging Face infrastructure, not this application."
        }
        error_details["retry_after"] = 900  # Suggest 15 minutes minimum
    elif "GPU task aborted" in error_message:
        logging.error(f"{EMOJI_MAP['ERROR']} GPU task aborted")
        error_type = "GPU_TASK_ABORTED"
        error_details["gpu_error"] = True
    elif "gradio.exceptions.Error" in str(type(exc)):
        error_type = "GRADIO_ERROR"
        error_details["gradio_error"] = True
        # For Gradio errors related to GPU limits, don't log a traceback
        if "GPU limit" in error_message:
            logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota exceeded: Space app has reached its GPU limit")
        else:
            logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}")
            logging.error(traceback.format_exc())
    elif isinstance(exc, ValueError):
        error_type = "VALIDATION_ERROR"
        logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}")
        logging.error(traceback.format_exc())
    elif isinstance(exc, TimeoutError):
        error_type = "TIMEOUT_ERROR"
        logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}")
        logging.error(traceback.format_exc())
    else:
        # For other errors, log with a traceback
        logging.error(f"{EMOJI_MAP['ERROR']} Unhandled exception: {str(exc)}")
        logging.error(traceback.format_exc())

    # Prepare the response with detailed error information
    error_response = {
        "status": "error",
        "error_type": error_type,
        "error_message": error_message,
        "error_details": error_details,
        "timestamp": time.time(),
        "request_path": str(request.url.path),
        "request_method": request.method
    }

    # Only include a traceback for non-GPU quota errors
    if error_type not in ["GPU_LIMIT_ERROR", "GPU_TASK_ABORTED"] and "GPU limit" not in error_message and "ZeroGPU quota exceeded" not in error_message:
        tb_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
        error_response["traceback"] = ''.join(tb_lines)

    # For GPU quota errors, log a simple summary instead of the full response
    if error_type == "GPU_LIMIT_ERROR":
        logging.warning(f"{EMOJI_MAP['WARNING']} GPU quota limit response sent to client with retry_after: {error_details.get('retry_after', 900)}s")
    else:
        # Log the full error details for other errors
        logging.error(f"{EMOJI_MAP['ERROR']} Error response: {json.dumps(error_response, indent=2)}")

    return JSONResponse(
        status_code=500,
        content=error_response
    )
# ----------------------------------------------------------------------
# MAIN
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Configure uvicorn logging to avoid duplicates
    log_config = uvicorn.config.LOGGING_CONFIG
    log_config["formatters"]["default"]["fmt"] = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    log_config["formatters"]["access"]["fmt"] = '%(asctime)s [%(levelname)s] %(name)s: %(client_addr)s - "%(request_line)s" %(status_code)s'
    # Disable duplicate logging from uvicorn
    log_config["loggers"]["uvicorn"]["propagate"] = False
    log_config["loggers"]["uvicorn.access"]["propagate"] = False

    uvicorn.run(
        app,
        host=API_HOST,
        port=API_PORT,
        log_level="info",
        log_config=log_config
    )
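# ----------------------------------------------------------------------
# USAGE SKETCH (illustrative)
# ----------------------------------------------------------------------
# A minimal client-side example, assuming the server is running locally on
# port 7860 (the port used by the startup test above); the image URL is a
# placeholder:
#
#   import requests
#   payload = {"data": [[{"url": "https://example.com/shirt.jpg"}], "Shirt"]}
#   resp = requests.post("http://localhost:7860/api/rb_and_crop", json=payload, timeout=180)
#   print(resp.status_code, resp.json().get("status"))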