# UVIS - Gradio App with Upload, URL & Video Support
"""
This script launches the UVIS (Unified Visual Intelligence System) as a Gradio Web App.
Supports image, video, and URL-based media inputs for detection, segmentation, and depth estimation.
Outputs include scene blueprint, structured JSON, and downloadable results.
"""

import gradio as gr
from PIL import Image
import numpy as np
import os
import io
import zipfile
import json
import tempfile
import logging
import cv2
import requests
from urllib.parse import urlparse
from registry import get_model
from core.describe_scene import describe_scene
import uuid
import time
import timeout_decorator
import socket
import ipaddress
from huggingface_hub import hf_hub_download
import spaces

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Model mappings
DETECTION_MODEL_MAP = {
    "YOLOv5-Nano": "yolov5n-seg",
    "YOLOv5-Small": "yolov5s-seg",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "RT-DETR": "rtdetr"  # For future support
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "nvidia/segformer-b0-finetuned-ade-512-512",
    "SegFormer-B5": "nvidia/segformer-b5-finetuned-ade-512-512",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50"
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512"
}

# Resource Limits
MAX_IMAGE_MB = 5
MAX_IMAGE_RES = (1920, 1080)
MAX_VIDEO_MB = 50
MAX_VIDEO_DURATION = 30  # seconds

@spaces.GPU
def preload_models():
    """
    This function is needed to activate ZeroGPU, which requires at least one
    function decorated with @spaces.GPU. It warms up the default models by
    loading them into memory.
    """
    logger.info("Warming up models for ZeroGPU...")
    get_model("detection", "yolov5n-seg", device="cpu")
    get_model("segmentation", "deeplabv3_resnet50", device="cpu")
    get_model("depth", "midas_v21_small_256", device="cpu")

# Utility Functions
def format_error(message):
    """Formats error messages for consistent user feedback."""
    return {"error": message}

def toggle_visibility(show, *components):
    """Returns visibility updates for the given Gradio components (a single update if only one is passed)."""
    updates = [gr.update(visible=show) for _ in components]
    return updates[0] if len(updates) == 1 else updates

def generate_session_id():
    """Generates a unique session ID for tracking inputs."""
    return str(uuid.uuid4())

def log_runtime(start_time):
    """Logs the runtime of a process."""
    elapsed_time = time.time() - start_time
    logger.info(f"Process completed in {elapsed_time:.2f} seconds.")
    return elapsed_time

def is_public_ip(url):
    """
    Checks whether the resolved IP address of a URL is public (non-local).
    Prevents SSRF by blocking internal addresses like 127.0.0.1 or 192.168.x.x.
    """
    try:
        hostname = urlparse(url).hostname
        ip = socket.gethostbyname(hostname)
        ip_obj = ipaddress.ip_address(ip)
        return ip_obj.is_global  # Only allow globally routable IPs
    except Exception as e:
        logger.warning(f"URL IP validation failed: {e}")
        return False
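
# Illustrative behaviour (not exhaustive): loopback and private ranges are rejected,
# while a host that resolves to a globally routable address passes.
#   is_public_ip("http://127.0.0.1/admin")        -> False (loopback)
#   is_public_ip("http://192.168.1.10/cam.jpg")   -> False (private range)
#   is_public_ip("https://example.com/image.jpg") -> True, assuming public DNS resolution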

def fetch_media_from_url(url):
    """
    Downloads media from a URL. Supports images and videos.
    Returns PIL.Image or video file path.
    """
    logger.info(f"Fetching media from URL: {url}")
    if not is_public_ip(url):
        logger.warning("Blocked non-public URL request (possible SSRF).")
        return None
    try:
        parsed_url = urlparse(url)
        ext = os.path.splitext(parsed_url.path)[-1].lower()
        headers = {"User-Agent": "Mozilla/5.0"}
        r = requests.get(url, headers=headers, timeout=10)
        if r.status_code != 200 or len(r.content) > 50 * 1024 * 1024:
            logger.warning("Download failed or file too large.")
            return None
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
        tmp_file.write(r.content)
        tmp_file.close()
        if ext in [".jpg", ".jpeg", ".png"]:
            return Image.open(tmp_file.name).convert("RGB")
        elif ext in [".mp4", ".avi", ".mov"]:
            return tmp_file.name
        else:
            logger.warning("Unsupported file type from URL.")
            return None
    except Exception as e:
        logger.error(f"URL fetch failed: {e}")
        return None

# Input Validation Functions
def validate_image(img):
    """
    Validates the uploaded image based on size and resolution limits.
    Args:
        img (PIL.Image.Image): Image to validate.
    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info("Validating uploaded image.")
    try:
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        size_mb = len(buffer.getvalue()) / (1024 * 1024)
        if size_mb > MAX_IMAGE_MB:
            logger.warning("Image exceeds size limit of 5MB.")
            return False, "Image exceeds 5MB limit."
        if img.width > MAX_IMAGE_RES[0] or img.height > MAX_IMAGE_RES[1]:
            logger.warning("Image resolution exceeds 1920x1080.")
            return False, "Image resolution exceeds 1920x1080."
        logger.info("Image validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating image: {e}")
        return False, str(e)

def validate_video(path):
    """
    Validates the uploaded video based on size and duration limits.
    Args:
        path (str): Path to the video file.
    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info(f"Validating video file at: {path}")
    try:
        size_mb = os.path.getsize(path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_MB:
            logger.warning("Video exceeds size limit of 50MB.")
            return False, "Video exceeds 50MB limit."
        cap = cv2.VideoCapture(path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        duration = frames / fps if fps else 0
        cap.release()
        if duration > MAX_VIDEO_DURATION:
            logger.warning("Video exceeds 30 seconds duration limit.")
            return False, "Video exceeds 30 seconds duration limit."
        logger.info("Video validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating video: {e}")
        return False, str(e)
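
# Example of the duration check: a clip at 24 fps with 960 frames has
# duration 960 / 24 = 40 s, which exceeds MAX_VIDEO_DURATION (30 s) and is rejected.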

# Input Resolution
def resolve_input(mode, uploaded_img, uploaded_imgs, uploaded_vid, url):
    """
    Resolves the input source based on user selection.
    Supports single image, multiple images, video, or URL-based media.
    Args:
        mode (str): Input mode - 'Upload' or 'URL'.
        uploaded_img (PIL.Image.Image): Single uploaded image.
        uploaded_imgs (List[PIL.Image.Image]): List of uploaded images (batch).
        uploaded_vid (str): Uploaded video file path.
        url (str): URL pointing to media content.
    Returns:
        List[Union[PIL.Image.Image, str, None]]: A list of media items to process.
    """
    logger.info(f"Resolving input based on mode: {mode}")
    try:
        if mode == "Upload":
            # Prefer the batch upload if provided
            if uploaded_imgs and len(uploaded_imgs) > 0:
                return uploaded_imgs
            elif uploaded_img:
                return [uploaded_img]
            elif uploaded_vid:
                return [uploaded_vid]
            else:
                logger.warning("No valid upload provided.")
                return None
        elif mode == "URL":
            media_from_url = fetch_media_from_url(url)
            if media_from_url:
                return [media_from_url]
            else:
                logger.warning("Failed to fetch valid media from URL.")
                return None
        else:
            logger.warning("Invalid input mode selected.")
            return None
    except Exception as e:
        logger.error(f"Error resolving input: {e}")
        return None

# 35 sec limit per image; use_signals=False so the timeout also works when Gradio
# runs the handler in a worker thread (signal-based timeouts only work in the main thread).
@timeout_decorator.timeout(35, use_signals=False)
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
""" | |
Runs selected perception tasks on the input image and packages results. | |
Args: | |
image (PIL.Image): Input image. | |
run_det (bool): Run object detection. | |
det_model (str): Detection model key. | |
det_confidence (float): Detection confidence threshold. | |
run_seg (bool): Run segmentation. | |
seg_model (str): Segmentation model key. | |
run_depth (bool): Run depth estimation. | |
depth_model (str): Depth model key. | |
blend (float): Overlay blend alpha (0.0 - 1.0). | |
Returns: | |
Tuple[Image, dict, Tuple[str, bytes]]: Final image, scene JSON, and downloadable ZIP. | |
""" | |
logger.info("Starting image processing pipeline.") | |
start_time = time.time() | |
outputs, scene = {}, {} | |
combined_np = np.array(image) | |
try: | |
# Detection | |
if run_det: | |
logger.info(f"Running detection with model: {det_model}") | |
load_start = time.time() | |
model = get_model("detection", DETECTION_MODEL_MAP[det_model], device="cpu") | |
logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.") | |
boxes = model.predict(image, conf_threshold=det_confidence) | |
overlay = model.draw(image, boxes) | |
combined_np = np.array(overlay) | |
buf = io.BytesIO() | |
overlay.save(buf, format="PNG") | |
outputs["detection.png"] = buf.getvalue() | |
scene["detection"] = boxes | |
# Segmentation | |
if run_seg: | |
logger.info(f"Running segmentation with model: {seg_model}") | |
load_start = time.time() | |
model = get_model("segmentation", SEGMENTATION_MODEL_MAP[seg_model], device="cpu") | |
logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.") | |
mask = model.predict(image) | |
overlay = model.draw(image, mask, alpha=blend) | |
combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0) | |
buf = io.BytesIO() | |
overlay.save(buf, format="PNG") | |
outputs["segmentation.png"] = buf.getvalue() | |
scene["segmentation"] = mask.tolist() | |
# Depth Estimation | |
if run_depth: | |
logger.info(f"Running depth estimation with model: {depth_model}") | |
load_start = time.time() | |
model = get_model("depth", DEPTH_MODEL_MAP[depth_model], device="cpu") | |
logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.") | |
dmap = model.predict(image) | |
            norm_dmap = ((dmap - dmap.min()) / (np.ptp(dmap) + 1e-8) * 255).astype(np.uint8)  # epsilon guards against a flat depth map
            d_pil = Image.fromarray(norm_dmap)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
            buf = io.BytesIO()
            d_pil.save(buf, format="PNG")
            outputs["depth_map.png"] = buf.getvalue()
            scene["depth"] = dmap.tolist()

        # Final image overlay
        final_img = Image.fromarray(combined_np)
        buf = io.BytesIO()
        final_img.save(buf, format="PNG")
        outputs["scene_blueprint.png"] = buf.getvalue()

        # Scene description
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}

        telemetry = {
            "session_id": generate_session_id(),
            "runtime_sec": round(log_runtime(start_time), 2),
            "used_models": {
                "detection": det_model if run_det else None,
                "segmentation": seg_model if run_seg else None,
                "depth": depth_model if run_depth else None
            }
        }
        scene_json["telemetry"] = telemetry
        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")

        # ZIP file creation
        zip_buf = io.BytesIO()
        with zipfile.ZipFile(zip_buf, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)

        elapsed = log_runtime(start_time)
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")
        return final_img, scene_json, ("uvis_results.zip", zip_buf.getvalue())
    except Exception as e:
        logger.error(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None

# Main Handler
def handle(mode, img, imgs, vid, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend):
    """
    Master handler for resolving input and processing.
    Returns outputs for the Gradio interface.
    """
    session_id = generate_session_id()
    logger.info(f"Session ID: {session_id} | Handler activated with mode: {mode}")
    start_time = time.time()
    media = resolve_input(mode, img, imgs, vid, url)
    if not media:
        return None, format_error("No valid input provided. Please check your upload or URL."), None

    results = []  # currently unused: the loop below returns after the first processed item
    for single_media in media:
        if isinstance(single_media, str):  # Video file: validate it, then analyze its first frame
            valid, err = validate_video(single_media)
            if not valid:
                return None, format_error(err), None
            cap = cv2.VideoCapture(single_media)
            ret, frame = cap.read()
            cap.release()
            if not ret:
                return None, format_error("Failed to read video frame."), None
            single_media = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        if isinstance(single_media, Image.Image):
            valid, err = validate_image(single_media)
            if not valid:
                return None, format_error(err), None
            try:
                return process_image(single_media, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend)
            except timeout_decorator.timeout_decorator.TimeoutError:
                logger.error("Image processing timed out.")
                return None, format_error("Processing timed out. Try a smaller image or simpler model."), None

        logger.warning("Unsupported media type resolved.")

    log_runtime(start_time)
    return None, format_error("Invalid input. Please check your upload or URL."), None

# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("## Unified Visual Intelligence System (UVIS)")

    # Input Mode Selection
    mode = gr.Radio(["Upload", "URL"], value="Upload", label="Input Mode")
    img = gr.Image(type="pil", label="Upload Image")
    imgs = gr.Gallery(label="Upload Multiple Images (Up to 5)")
    vid = gr.Video(label="Upload Video (<= 30s)")
    url = gr.Textbox(label="URL (Image/Video)")

    # Task Selection with parameters
    with gr.Accordion("Object Detection Settings", open=False):
        run_det = gr.Checkbox(label="Enable Object Detection")
        det_model = gr.Dropdown(list(DETECTION_MODEL_MAP), label="Detection Model", visible=False)
        det_confidence = gr.Slider(0.1, 1.0, 0.5, label="Detection Confidence Threshold", visible=False)
    with gr.Accordion("Semantic Segmentation Settings", open=False):
        run_seg = gr.Checkbox(label="Enable Segmentation")
        seg_model = gr.Dropdown(list(SEGMENTATION_MODEL_MAP), label="Segmentation Model", visible=False)
    with gr.Accordion("Depth Estimation Settings", open=False):
        run_depth = gr.Checkbox(label="Enable Depth Estimation")
        depth_model = gr.Dropdown(list(DEPTH_MODEL_MAP), label="Depth Model", visible=False)
    blend = gr.Slider(0.0, 1.0, 0.5, label="Overlay Blend")

    # Run Button
    run = gr.Button("Run Analysis")

    # Output Tabs
    with gr.Tab("Scene JSON"):
        json_out = gr.JSON()
    with gr.Tab("Scene Blueprint"):
        img_out = gr.Image()
    with gr.Tab("Download"):
        zip_out = gr.File()

    # Attach Visibility Logic: pass the target components explicitly so that
    # toggle_visibility returns one update per output component.
    run_det.change(lambda show: toggle_visibility(show, det_model, det_confidence), run_det, [det_model, det_confidence])
    run_seg.change(lambda show: toggle_visibility(show, seg_model), run_seg, [seg_model])
    run_depth.change(lambda show: toggle_visibility(show, depth_model), run_depth, [depth_model])

    # Button Click Event
    run.click(
        handle,
        inputs=[mode, img, imgs, vid, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend],
        outputs=[img_out, json_out, zip_out]
    )

    # Footer Section
    gr.Markdown("---")
    gr.Markdown(
        """
        <div style='text-align: center; font-size: 14px;'>
        Built by <b>Durga Deepak Valluri</b><br>
        <a href="https://github.com/DurgaDeepakValluri/UVIS" target="_blank">GitHub</a> |
        <a href="https://deecoded.io" target="_blank">Website</a> |
        <a href="https://www.linkedin.com/in/durga-deepak-valluri" target="_blank">LinkedIn</a>
        </div>
        """,
    )

# Launch the Gradio App
demo.launch()