File size: 17,385 Bytes
f289d8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64ed036
a9d8551
f289d8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49d2559
a9d8551
 
 
 
 
 
 
 
 
 
 
64ed036
 
f289d8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64ed036
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# UVIS - Gradio App with Upload, URL & Video Support
"""
This script launches the UVIS (Unified Visual Intelligence System) as a Gradio Web App.
Supports image, video, and URL-based media inputs for detection, segmentation, and depth estimation.
Outputs include scene blueprint, structured JSON, and downloadable results.
"""

import gradio as gr
from PIL import Image
import numpy as np
import os
import io
import zipfile
import json
import tempfile
import logging
import cv2
import requests
from urllib.parse import urlparse
from registry import get_model
from core.describe_scene import describe_scene
import uuid
import time
import timeout_decorator
import socket
import ipaddress
from huggingface_hub import hf_hub_download
import spaces

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Model mappings
DETECTION_MODEL_MAP = {
    "YOLOv5-Nano": "yolov5n-seg",
    "YOLOv5-Small": "yolov5s-seg",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "RT-DETR": "rtdetr"  # For future support
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "nvidia/segformer-b0-finetuned-ade-512-512",
    "SegFormer-B5": "nvidia/segformer-b5-finetuned-ade-512-512",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50"
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512"
}

# Resource Limits
MAX_IMAGE_MB = 5
MAX_IMAGE_RES = (1920, 1080)
MAX_VIDEO_MB = 50
MAX_VIDEO_DURATION = 30  # seconds


@spaces.GPU
def preload_models():
    """
    This function is needed to activate ZeroGPU. It must be decorated with @spaces.GPU.
    It can be used to warm up models or load them into memory.
    """
    from registry import get_model
    print("Warming up models for ZeroGPU...")
    get_model("detection", "yolov5n-seg", device="cpu")
    get_model("segmentation", "deeplabv3_resnet50", device="cpu")
    get_model("depth", "midas_v21_small_256", device="cpu")


# Utility Functions
def format_error(message):
    """Formats error messages for consistent user feedback."""
    return {"error": message}

def toggle_visibility(show, *components):
    """Toggles visibility for multiple Gradio components."""
    return [gr.update(visible=show) for _ in components]

def generate_session_id():
    """Generates a unique session ID for tracking inputs."""
    return str(uuid.uuid4())

def log_runtime(start_time):
    """Logs the runtime of a process."""
    elapsed_time = time.time() - start_time
    logger.info(f"Process completed in {elapsed_time:.2f} seconds.")
    return elapsed_time

def is_public_ip(url):
    """
    Checks whether the resolved IP address of a URL is public (non-local).
    Prevents SSRF by blocking internal addresses like 127.0.0.1 or 192.168.x.x.
    """
    try:
        hostname = urlparse(url).hostname
        ip = socket.gethostbyname(hostname)
        ip_obj = ipaddress.ip_address(ip)
        return ip_obj.is_global  # Only allow globally routable IPs
    except Exception as e:
        logger.warning(f"URL IP validation failed: {e}")
        return False


def fetch_media_from_url(url):
    """
    Downloads media from a URL. Supports images and videos.
    Returns PIL.Image or video file path.
    """
    logger.info(f"Fetching media from URL: {url}")
    if not is_public_ip(url):
        logger.warning("Blocked non-public URL request (possible SSRF).")
        return None

    try:
        parsed_url = urlparse(url)
        ext = os.path.splitext(parsed_url.path)[-1].lower()
        headers = {"User-Agent": "Mozilla/5.0"}
        r = requests.get(url, headers=headers, timeout=10)

        if r.status_code != 200 or len(r.content) > 50 * 1024 * 1024:
            logger.warning(f"Download failed or file too large.")
            return None

        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
        tmp_file.write(r.content)
        tmp_file.close()

        if ext in [".jpg", ".jpeg", ".png"]:
            return Image.open(tmp_file.name).convert("RGB")
        elif ext in [".mp4", ".avi", ".mov"]:
            return tmp_file.name
        else:
            logger.warning("Unsupported file type from URL.")
            return None
    except Exception as e:
        logger.error(f"URL fetch failed: {e}")
        return None

# Input Validation Functions
def validate_image(img):
    """
    Validates the uploaded image based on size and resolution limits.

    Args:
        img (PIL.Image.Image): Image to validate.

    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info("Validating uploaded image.")
    try:
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        size_mb = len(buffer.getvalue()) / (1024 * 1024)

        if size_mb > MAX_IMAGE_MB:
            logger.warning("Image exceeds size limit of 5MB.")
            return False, "Image exceeds 5MB limit."

        if img.width > MAX_IMAGE_RES[0] or img.height > MAX_IMAGE_RES[1]:
            logger.warning("Image resolution exceeds 1920x1080.")
            return False, "Image resolution exceeds 1920x1080."

        logger.info("Image validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating image: {e}")
        return False, str(e)

def validate_video(path):
    """
    Validates the uploaded video based on size and duration limits.

    Args:
        path (str): Path to the video file.

    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info(f"Validating video file at: {path}")
    try:
        size_mb = os.path.getsize(path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_MB:
            logger.warning("Video exceeds size limit of 50MB.")
            return False, "Video exceeds 50MB limit."

        cap = cv2.VideoCapture(path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        duration = frames / fps if fps else 0
        cap.release()

        if duration > MAX_VIDEO_DURATION:
            logger.warning("Video exceeds 30 seconds duration limit.")
            return False, "Video exceeds 30 seconds duration limit."

        logger.info("Video validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating video: {e}")
        return False, str(e)

# Input Resolution
def resolve_input(mode, uploaded_img, uploaded_imgs, uploaded_vid, url):
    """
    Resolves the input source based on user selection.
    Supports single image, multiple images, video, or URL-based media.

    Args:
        mode (str): Input mode - 'Upload' or 'URL'.
        uploaded_img (PIL.Image.Image): Single uploaded image.
        uploaded_imgs (List[PIL.Image.Image]): List of uploaded images (batch).
        uploaded_vid (str): Uploaded video file path.
        url (str): URL pointing to media content.

    Returns:
        List[Union[PIL.Image.Image, str, None]]: A list of media items to process.
    """
    logger.info(f"Resolving input based on mode: {mode}")
    try:
        if mode == "Upload":
            # Prefer batch if provided
            if uploaded_imgs and len(uploaded_imgs) > 0:
                return uploaded_imgs
            elif uploaded_img:
                return [uploaded_img]
            elif uploaded_vid:
                return [uploaded_vid]
            else:
                logger.warning("No valid upload provided.")
                return None

        elif mode == "URL":
            media_from_url = fetch_media_from_url(url)
            if media_from_url:
                return [media_from_url]
            else:
                logger.warning("Failed to fetch valid media from URL.")
                return None

        else:
            logger.warning("Invalid input mode selected.")
            return None

    except Exception as e:
        logger.error(f"Error resolving input: {e}")
        return None

@timeout_decorator.timeout(35, use_signals=False)  # 35 sec limit per image
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Runs selected perception tasks on the input image and packages results.

    Args:
        image (PIL.Image): Input image.
        run_det (bool): Run object detection.
        det_model (str): Detection model key.
        det_confidence (float): Detection confidence threshold.
        run_seg (bool): Run segmentation.
        seg_model (str): Segmentation model key.
        run_depth (bool): Run depth estimation.
        depth_model (str): Depth model key.
        blend (float): Overlay blend alpha (0.0 - 1.0).

    Returns:
        Tuple[Image, dict, Tuple[str, bytes]]: Final image, scene JSON, and downloadable ZIP.
    """
    logger.info("Starting image processing pipeline.")
    start_time = time.time()
    outputs, scene = {}, {}
    combined_np = np.array(image)

    try:
        # Detection
        if run_det:
            logger.info(f"Running detection with model: {det_model}")
            load_start = time.time()
            model = get_model("detection", DETECTION_MODEL_MAP[det_model], device="cpu")
            logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")
            boxes = model.predict(image, conf_threshold=det_confidence)
            overlay = model.draw(image, boxes)
            combined_np = np.array(overlay)
            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["detection.png"] = buf.getvalue()
            scene["detection"] = boxes

        # Segmentation
        if run_seg:
            logger.info(f"Running segmentation with model: {seg_model}")
            load_start = time.time()
            model = get_model("segmentation", SEGMENTATION_MODEL_MAP[seg_model], device="cpu")
            logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")
            mask = model.predict(image)
            overlay = model.draw(image, mask, alpha=blend)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)
            buf = io.BytesIO()
            overlay.save(buf, format="PNG")
            outputs["segmentation.png"] = buf.getvalue()
            scene["segmentation"] = mask.tolist()

        # Depth Estimation
        if run_depth:
            logger.info(f"Running depth estimation with model: {depth_model}")
            load_start = time.time()
            model = get_model("depth", DEPTH_MODEL_MAP[depth_model], device="cpu")
            logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")
            dmap = model.predict(image)
            norm_dmap = ((dmap - dmap.min()) / (dmap.ptp()) * 255).astype(np.uint8)
            d_pil = Image.fromarray(norm_dmap)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
            buf = io.BytesIO()
            d_pil.save(buf, format="PNG")
            outputs["depth_map.png"] = buf.getvalue()
            scene["depth"] = dmap.tolist()

        # Final image overlay
        final_img = Image.fromarray(combined_np)
        buf = io.BytesIO()
        final_img.save(buf, format="PNG")
        outputs["scene_blueprint.png"] = buf.getvalue()

        # Scene description
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}
        telemetry = {
        "session_id": generate_session_id(),
        "runtime_sec": round(log_runtime(start_time), 2),
        "used_models": {
            "detection": det_model if run_det else None,
            "segmentation": seg_model if run_seg else None,
            "depth": depth_model if run_depth else None
            }
        }
        scene_json["telemetry"] = telemetry

        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")

        # ZIP file creation
        zip_buf = io.BytesIO()
        with zipfile.ZipFile(zip_buf, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)

        elapsed = log_runtime(start_time)
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")

        return final_img, scene_json, ("uvis_results.zip", zip_buf.getvalue())

    except Exception as e:
        logger.error(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None

# Main Handler
def handle(mode, img, imgs, vid, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend):
    """
    Master handler for resolving input and processing.
    Returns outputs for Gradio interface.
    """
    session_id = generate_session_id()
    logger.info(f"Session ID: {session_id} | Handler activated with mode: {mode}")
    start_time = time.time()

    media = resolve_input(mode, img, imgs, vid, url)
    if not media:
        return None, format_error("No valid input provided. Please check your upload or URL."), None

    results = []
    for single_media in media:
        if isinstance(single_media, str):  # Video file
            valid, err = validate_video(single_media)
            if not valid:
                return None, format_error(err), None
            cap = cv2.VideoCapture(single_media)
            ret, frame = cap.read()
            cap.release()
            if not ret:
                return None, format_error("Failed to read video frame."), None
            single_media = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        if isinstance(single_media, Image.Image):
            valid, err = validate_image(single_media)
            if not valid:
                return None, format_error(err), None
            try:
                return process_image(single_media, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend)
            except timeout_decorator.timeout_decorator.TimeoutError:
                logger.error("Image processing timed out.")
                return None, format_error("Processing timed out. Try a smaller image or simpler model."), None

    logger.warning("Unsupported media type resolved.")
    log_runtime(start_time)
    return None, format_error("Invalid input. Please check your upload or URL."), None

# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("## Unified Visual Intelligence System (UVIS)")

    # Input Mode Selection
    mode = gr.Radio(["Upload", "URL"], value="Upload", label="Input Mode")
    img = gr.Image(type="pil", label="Upload Image")
    imgs = gr.Gallery(label="Upload Multiple Images (Up to 5)")
    vid = gr.Video(label="Upload Video (<= 30s)")
    url = gr.Textbox(label="URL (Image/Video)")

    # Task Selection with parameters
    with gr.Accordion("Object Detection Settings", open=False):
        run_det = gr.Checkbox(label="Enable Object Detection")
        det_model = gr.Dropdown(list(DETECTION_MODEL_MAP), label="Detection Model", visible=False)
        det_confidence = gr.Slider(0.1, 1.0, 0.5, label="Detection Confidence Threshold", visible=False)

    with gr.Accordion("Semantic Segmentation Settings", open=False):
        run_seg = gr.Checkbox(label="Enable Segmentation")
        seg_model = gr.Dropdown(list(SEGMENTATION_MODEL_MAP), label="Segmentation Model", visible=False)

    with gr.Accordion("Depth Estimation Settings", open=False):
        run_depth = gr.Checkbox(label="Enable Depth Estimation")
        depth_model = gr.Dropdown(list(DEPTH_MODEL_MAP), label="Depth Model", visible=False)

    blend = gr.Slider(0.0, 1.0, 0.5, label="Overlay Blend")
    
    # Run Button
    run = gr.Button("Run Analysis")
    
    # Output Tabs
    with gr.Tab("Scene JSON"):
        json_out = gr.JSON()
    with gr.Tab("Scene Blueprint"):
        img_out = gr.Image()
    with gr.Tab("Download"):
        zip_out = gr.File()

    # Attach Visibility Logic
    run_det.change(toggle_visibility, run_det, [det_model, det_confidence])
    run_seg.change(toggle_visibility, run_seg, [seg_model])
    run_depth.change(toggle_visibility, run_depth, [depth_model])

    # Button Click Event
    run.click(
        handle,
        inputs=[mode, img, imgs, vid, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend],
        outputs=[img_out, json_out, zip_out]
    )

    # Footer Section
    gr.Markdown("---")
    gr.Markdown(
        """
        <div style='text-align: center; font-size: 14px;'>
            Built by <b>Durga Deepak Valluri</b><br>
            <a href="https://github.com/DurgaDeepakValluri/UVIS" target="_blank">GitHub</a> |
            <a href="https://deecoded.io" target="_blank">Website</a> |
            <a href="https://www.linkedin.com/in/durga-deepak-valluri" target="_blank">LinkedIn</a>
        </div>
        """,
    )

# Launch the Gradio App
demo.launch()