# UVIS - Gradio App with Upload, URL & Video Support
"""
This script launches the UVIS (Unified Visual Intelligence System) as a Gradio Web App.
Supports image, video, and URL-based media inputs for detection, segmentation, and depth estimation.
Outputs include scene blueprint, structured JSON, and downloadable results.
"""
import time
import logging
import traceback
import gradio as gr
from PIL import Image
import cv2
import timeout_decorator
import spaces
import tempfile
import shutil
import os

from registry import get_model
from core.describe_scene import describe_scene
from core.process import process_image, process_video  # process_video is assumed to live alongside process_image
from core.input_handler import resolve_input, validate_video, validate_image
from utils.helpers import format_error, generate_session_id
from huggingface_hub import hf_hub_download
# Clear any stale Hugging Face model cache left over from a previous runtime
try:
    shutil.rmtree(os.path.expanduser("~/.cache/huggingface"), ignore_errors=True)
    shutil.rmtree("/home/user/.cache/huggingface", ignore_errors=True)
    print("🔥 Nuked HF model cache from runtime.")
except Exception as e:
    print("🚫 Failed to nuke cache:", e)
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
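
# `log_runtime` is called at the end of `handle()` but was never defined in this
# script; a minimal sketch of the intended helper:
def log_runtime(start_time: float) -> None:
    """Log elapsed wall-clock time since `start_time` (a time.time() value)."""
    elapsed = time.time() - start_time
    logger.info(f"Total runtime: {elapsed:.2f} seconds")
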
# Model mappings: UI dropdown labels -> registry keys
DETECTION_MODEL_MAP = {
    "YOLOv8-Nano": "yolov8n",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "YOLOv11-Beta": "yolov11b"
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "segformer_b0",
    "SegFormer-B5": "segformer_b5",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50"
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512"
}
# # Resource Limits
# MAX_IMAGE_MB = 15
# MAX_IMAGE_RES = (1920, 1080)
# MAX_VIDEO_MB = 50
# MAX_VIDEO_DURATION = 15  # seconds

# def preload_models():
#     """
#     This function is needed to activate ZeroGPU. It must be decorated with @spaces.GPU.
#     It can be used to warm up models or load them into memory.
#     """
#     from registry import get_model
#     print("Warming up models for ZeroGPU...")
#     get_model("detection", "yolov8n", device="cpu")
#     get_model("segmentation", "deeplabv3_resnet50", device="cpu")
#     get_model("depth", "midas_v21_small_256", device="cpu")
def handle(mode, media_upload, url,
           run_det, det_model, det_confidence,
           run_seg, seg_model,
           run_depth, depth_model,
           blend):
    """
    Master handler for resolving input and processing.
    Returns: (img_out, vid_out, json_out, zip_out)
    """
    session_id = generate_session_id()
    logger.info(f"Session ID: {session_id} | Handler activated with mode: {mode}")
    start_time = time.time()

    media = resolve_input(mode, media_upload, url)
    if not media:
        return (
            gr.update(visible=False),
            gr.update(visible=False),
            format_error("No valid input provided. Please check your upload or URL."),
            None
        )

    first_input = media[0]

    # Resolve dropdown labels to registry model keys
    resolved_det_model = DETECTION_MODEL_MAP.get(det_model, det_model)
    resolved_seg_model = SEGMENTATION_MODEL_MAP.get(seg_model, seg_model)
    resolved_depth_model = DEPTH_MODEL_MAP.get(depth_model, depth_model)
    # --- VIDEO PATH ---
    if isinstance(first_input, str) and first_input.lower().endswith((".mp4", ".mov", ".avi")):
        valid, err = validate_video(first_input)
        if not valid:
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(err),
                None
            )
        try:
            _, msg, output_video_path = process_video(
                video_path=first_input,
                run_det=run_det,
                det_model=resolved_det_model,
                det_confidence=det_confidence,
                run_seg=run_seg,
                seg_model=resolved_seg_model,
                run_depth=run_depth,
                depth_model=resolved_depth_model,
                blend=blend
            )
            return (
                gr.update(visible=False),                          # hide image
                gr.update(value=output_video_path, visible=True),  # show video
                msg,
                output_video_path                                  # for download
            )
        except Exception as e:
            logger.error(f"Video processing failed: {e}")
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(str(e)),
                None
            )
    # --- IMAGE PATH ---
    elif isinstance(first_input, Image.Image):
        valid, err = validate_image(first_input)
        if not valid:
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(err),
                None
            )
        try:
            result_img, msg, output_zip = process_image(
                image=first_input,
                run_det=run_det,
                det_model=resolved_det_model,
                det_confidence=det_confidence,
                run_seg=run_seg,
                seg_model=resolved_seg_model,
                run_depth=run_depth,
                depth_model=resolved_depth_model,
                blend=blend
            )
            return (
                gr.update(value=result_img, visible=True),  # show image
                gr.update(visible=False),                   # hide video
                msg,
                output_zip
            )
        except timeout_decorator.TimeoutError:
            logger.error("Image processing timed out.")
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error("Processing timed out. Try a smaller image or simpler model."),
                None
            )
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Image processing failed: {e}")
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                format_error(str(e)),
                None
            )
    logger.warning("Unsupported media type resolved.")
    log_runtime(start_time)
    return (
        gr.update(visible=False),
        gr.update(visible=False),
        format_error("Unsupported input type."),
        None
    )
def show_preview_from_upload(files):
    if not files:
        return gr.update(visible=False), gr.update(visible=False)
    # Gradio file uploads arrive as temp-file objects; work from their .name
    # path, since the underlying handle may already be closed.
    file_path = files[0].name
    filename = file_path.lower()
    if filename.endswith((".png", ".jpg", ".jpeg", ".webp")):
        img = Image.open(file_path).convert("RGB")
        return gr.update(value=img, visible=True), gr.update(visible=False)
    elif filename.endswith((".mp4", ".mov", ".avi")):
        # Copy the uploaded video to a known temp location
        temp_dir = tempfile.mkdtemp()
        ext = os.path.splitext(filename)[-1]
        safe_path = os.path.join(temp_dir, f"uploaded_video{ext}")
        shutil.copy(file_path, safe_path)
        return gr.update(visible=False), gr.update(value=safe_path, visible=True)
    return gr.update(visible=False), gr.update(visible=False)
def show_preview_from_url(url_input):
    if not url_input:
        return gr.update(visible=False), gr.update(visible=False)
    path = url_input.strip().lower()
    if path.endswith((".png", ".jpg", ".jpeg", ".webp")):
        return gr.update(value=url_input, visible=True), gr.update(visible=False)
    elif path.endswith((".mp4", ".mov", ".avi")):
        return gr.update(visible=False), gr.update(value=url_input, visible=True)
    return gr.update(visible=False), gr.update(visible=False)
def clear_model_cache():
    """
    Deletes all model weight folders so they are redownloaded fresh.
    """
    folders = [
        "models/detection/weights",
        "models/segmentation/weights",
        "models/depth/weights"
    ]
    for folder in folders:
        shutil.rmtree(folder, ignore_errors=True)
        logger.info(f"Cleared: {folder}")
    return "Model cache cleared. Models will be reloaded on next run."
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("## Unified Visual Intelligence System (UVIS)")

    with gr.Row():
        # Left panel
        with gr.Column(scale=2):
            # Input Mode Toggle
            mode = gr.Radio(["Upload", "URL"], value="Upload", label="Input Mode")

            # File upload: accepts multiple images or one video (user chooses wisely)
            media_upload = gr.File(
                label="Upload Images (1–5) or 1 Video",
                file_types=["image", ".mp4", ".mov", ".avi"],
                file_count="multiple",
                visible=True
            )

            # URL input
            url = gr.Textbox(label="URL (Image/Video)", visible=False)
            # Toggle visibility between Upload and URL inputs
            def toggle_inputs(selected_mode):
                return [
                    gr.update(visible=(selected_mode == "Upload")),  # media_upload
                    gr.update(visible=(selected_mode == "URL"))      # url
                ]
            mode.change(toggle_inputs, inputs=mode, outputs=[media_upload, url])
            # Visibility logic function
            def toggle_visibility(checked):
                return gr.update(visible=checked)

            # def toggle_det_visibility(checked):
            #     return [gr.update(visible=checked), gr.update(visible=checked)]

            run_det = gr.Checkbox(label="Object Detection")
            run_seg = gr.Checkbox(label="Semantic Segmentation")
            run_depth = gr.Checkbox(label="Depth Estimation")
            with gr.Row():
                with gr.Column(visible=False) as OD_Settings:
                    with gr.Accordion("Object Detection Settings", open=True):
                        # Dropdowns default to the lightest model in each family
                        det_model = gr.Dropdown(choices=list(DETECTION_MODEL_MAP), value="YOLOv8-Nano", label="Detection Model")
                        det_confidence = gr.Slider(0.1, 1.0, 0.5, label="Detection Confidence Threshold")
                        nms_thresh = gr.Slider(0.1, 1.0, 0.45, label="NMS Threshold")
                        max_det = gr.Slider(1, 100, 20, step=1, label="Max Detections")
                        iou_thresh = gr.Slider(0.1, 1.0, 0.5, label="IoU Threshold")
                        class_filter = gr.CheckboxGroup(["Person", "Car", "Dog"], label="Class Filter")

                with gr.Column(visible=False) as SS_Settings:
                    with gr.Accordion("Semantic Segmentation Settings", open=True):
                        seg_model = gr.Dropdown(choices=list(SEGMENTATION_MODEL_MAP), value="SegFormer-B0", label="Segmentation Model")
                        resize_strategy = gr.Dropdown(["Crop", "Pad", "Scale"], label="Resize Strategy")
                        overlay_alpha = gr.Slider(0.0, 1.0, 0.5, label="Overlay Opacity")
                        seg_classes = gr.CheckboxGroup(["Road", "Sky", "Building"], label="Target Classes")
                        enable_crf = gr.Checkbox(label="Postprocessing (CRF)")

                with gr.Column(visible=False) as DE_Settings:
                    with gr.Accordion("Depth Estimation Settings", open=True):
                        depth_model = gr.Dropdown(choices=list(DEPTH_MODEL_MAP), value="MiDaS v21 Small 256", label="Depth Model")
                        output_type = gr.Dropdown(["Raw", "Disparity", "Scaled"], label="Output Type")
                        colormap = gr.Dropdown(["Jet", "Viridis", "Plasma"], label="Colormap")
                        blend = gr.Slider(0.0, 1.0, 0.5, label="Overlay Blend")
                        normalize = gr.Checkbox(label="Normalize Depth")
                        max_depth = gr.Slider(0.1, 10.0, 5.0, label="Max Depth (meters)")
            # Attach Visibility Logic
            run_det.change(fn=toggle_visibility, inputs=[run_det], outputs=[OD_Settings])
            run_seg.change(fn=toggle_visibility, inputs=[run_seg], outputs=[SS_Settings])
            run_depth.change(fn=toggle_visibility, inputs=[run_depth], outputs=[DE_Settings])

            # Run Button
            run = gr.Button("Run Analysis")
        # Right panel
        with gr.Column(scale=1):
            # single_img_preview = gr.Image(label="Preview (Image)", visible=False)
            # gallery_preview = gr.Gallery(label="Preview (Gallery)", columns=3, height="auto", visible=False)
            # video_preview = gr.Video(label="Preview (Video)", visible=False)

            # Only one is shown at a time: image or video
            img_out = gr.Image(label="Preview / Processed Output", visible=False)
            vid_out = gr.Video(label="Preview / Processed Video", visible=False, streaming=True, autoplay=True)
            json_out = gr.JSON(label="Scene JSON")
            zip_out = gr.File(label="Download Results")

            clear_button = gr.Button("🧹 Clear Model Cache")
            status_box = gr.Textbox(label="Status", interactive=False)

    clear_button.click(fn=clear_model_cache, inputs=[], outputs=[status_box])
    media_upload.change(show_preview_from_upload, inputs=media_upload, outputs=[img_out, vid_out])
    url.submit(show_preview_from_url, inputs=url, outputs=[img_out, vid_out])
    # Unified run click: switch visibility based on image or video output
    def route_output(image_output, json_output, zip_file):
        # Show img_out if an image was returned, else show the video
        if isinstance(image_output, Image.Image):
            return gr.update(value=image_output, visible=True), gr.update(visible=False), json_output, zip_file
        elif isinstance(zip_file, str) and zip_file.endswith(".mp4"):
            return gr.update(visible=False), gr.update(value=zip_file, visible=True), json_output, zip_file
        else:
            return gr.update(visible=False), gr.update(visible=False), json_output, zip_file
    # # Output Tabs
    # with gr.Tab("Scene JSON"):
    #     json_out = gr.JSON()
    # with gr.Tab("Scene Blueprint"):
    #     img_out = gr.Image()
    # with gr.Tab("Download"):
    #     zip_out = gr.File()

    # Button Click Event
    run.click(
        fn=handle,
        inputs=[
            mode, media_upload, url,
            run_det, det_model, det_confidence,
            run_seg, seg_model,
            run_depth, depth_model,
            blend
        ],
        outputs=[
            img_out,   # visible only if the result is an image
            vid_out,   # visible only if the result is a video
            json_out,
            zip_out
        ]
    )
    # Footer Section
    gr.Markdown("---")
    gr.Markdown(
        """
        <div style='text-align: center; font-size: 14px;'>
        Built by <b>Durga Deepak Valluri</b><br>
        <a href="https://github.com/DurgaDeepakValluri" target="_blank">GitHub</a> |
        <a href="https://deecoded.io" target="_blank">Website</a> |
        <a href="https://www.linkedin.com/in/durga-deepak-valluri" target="_blank">LinkedIn</a>
        </div>
        """
    )

# Launch the Gradio App
demo.launch(share=True)