# Source: UVIS / app.py (Hugging Face Space file view, uploaded by DurgaDeepak)
# Commit: "Update app.py" (9438399, verified), 19.6 kB
# UVIS - Gradio App with Upload, URL & Video Support
"""
This script launches the UVIS (Unified Visual Intelligence System) as a Gradio Web App.
Supports image, video, and URL-based media inputs for detection, segmentation, and depth estimation.
Outputs include scene blueprint, structured JSON, and downloadable results.
"""
import io
import ipaddress
import json
import logging
import os
import socket
import tempfile
import time
import uuid
import zipfile
from urllib.parse import urlparse

import cv2
import gradio as gr
import numpy as np
import requests
import spaces
import timeout_decorator
from huggingface_hub import hf_hub_download
from PIL import Image

from core.describe_scene import describe_scene
from core.input_handler import resolve_input, validate_video, validate_image
from core.process import process_image
from registry import get_model
from utils.helpers import format_error, generate_session_id, toggle_visibility
# Logging setup: root logger emits timestamped, level-tagged records at INFO and above.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)
# Model mappings: UI display name -> model key passed to registry.get_model().
DETECTION_MODEL_MAP = {
    "YOLOv5-Nano": "yolov5n-seg",
    "YOLOv5-Small": "yolov5s-seg",
    "YOLOv8-Small": "yolov8s",
    "YOLOv8-Large": "yolov8l",
    "RT-DETR": "rtdetr",  # For future support
}

SEGMENTATION_MODEL_MAP = {
    "SegFormer-B0": "nvidia/segformer-b0-finetuned-ade-512-512",
    "SegFormer-B5": "nvidia/segformer-b5-finetuned-ade-512-512",
    "DeepLabV3-ResNet50": "deeplabv3_resnet50",
}

DEPTH_MODEL_MAP = {
    "MiDaS v21 Small 256": "midas_v21_small_256",
    "MiDaS v21 384": "midas_v21_384",
    "DPT Hybrid 384": "dpt_hybrid_384",
    "DPT Swin2 Large 384": "dpt_swin2_large_384",
    "DPT Beit Large 512": "dpt_beit_large_512",
}

# Resource limits enforced by the validation helpers.
MAX_IMAGE_MB = 5  # megabytes
MAX_IMAGE_RES = (1920, 1080)  # (width, height) in pixels
MAX_VIDEO_MB = 50  # megabytes
MAX_VIDEO_DURATION = 30  # seconds
@spaces.GPU
def preload_models():
    """
    Warm up one detection, one segmentation and one depth model.

    The function is decorated with ``@spaces.GPU``; per the original author's
    note this decoration is what activates ZeroGPU. Each model is requested
    from the registry with ``device="cpu"``.
    """
    from registry import get_model

    print("Warming up models for ZeroGPU...")
    # (task, model key) pairs requested from the registry, in order.
    warmup_targets = (
        ("detection", "yolov5n-seg"),
        ("segmentation", "deeplabv3_resnet50"),
        ("depth", "midas_v21_small_256"),
    )
    for task, model_key in warmup_targets:
        get_model(task, model_key, device="cpu")
# Utility Functions
def format_error(message):
    """Wrap *message* in a single-key ``{"error": message}`` dict for user feedback."""
    return dict(error=message)
def toggle_visibility(show, *components):
    """Return one ``gr.update(visible=show)`` per component passed in."""
    updates = []
    for _component in components:
        updates.append(gr.update(visible=show))
    return updates
def generate_session_id():
    """Return a fresh random UUID (version 4) rendered as a string."""
    session_uuid = uuid.uuid4()
    return str(session_uuid)
def log_runtime(start_time):
    """
    Log how long a process has been running and return that duration.

    Args:
        start_time (float): ``time.time()`` value captured when the process began.
    Returns:
        float: Seconds elapsed since ``start_time``.
    """
    now = time.time()
    elapsed_time = now - start_time
    logger.info(f"Process completed in {elapsed_time:.2f} seconds.")
    return elapsed_time
def is_public_ip(url):
    """
    Checks whether every address a URL's host resolves to is public (globally routable).

    Used as an SSRF guard: loopback, private, link-local and other non-global
    addresses (e.g. 127.0.0.1, 192.168.x.x, 169.254.169.254) are rejected.
    All resolved addresses (IPv4 and IPv6) must be global; a single
    non-global record makes the URL non-public.

    Args:
        url (str): URL whose host should be checked.
    Returns:
        bool: True only if the host resolves and all resolved addresses are global.
              False on any parsing/resolution error (fails closed).
    """
    try:
        hostname = urlparse(url).hostname
        if not hostname:
            logger.warning("URL IP validation failed: URL has no hostname.")
            return False

        try:
            # Literal IP in the URL: no DNS lookup needed.
            candidates = [ipaddress.ip_address(hostname)]
        except ValueError:
            # Host name: check every record, not just the first IPv4 one.
            resolved = socket.getaddrinfo(hostname, None)
            candidates = [
                # sockaddr[0] is the address; drop any IPv6 zone suffix ("%eth0").
                ipaddress.ip_address(info[4][0].split("%", 1)[0])
                for info in resolved
            ]

        # NOTE(review): the HTTP client resolves the host again when it
        # connects, so this check alone does not rule out DNS rebinding.
        return bool(candidates) and all(ip.is_global for ip in candidates)
    except Exception as e:
        logger.warning(f"URL IP validation failed: {e}")
        return False
def fetch_media_from_url(url):
    """
    Downloads media from a URL. Supports images and videos.

    The file type is decided from the URL path's extension. Unsupported
    extensions are rejected before any download happens. The body is streamed
    and the transfer is aborted as soon as it exceeds ``MAX_VIDEO_MB``, so an
    oversized response is never fully buffered in memory.

    Args:
        url (str): Remote URL ending in .jpg/.jpeg/.png or .mp4/.avi/.mov.
    Returns:
        PIL.Image.Image (RGB) for images, a temp-file path (str) for videos,
        or None if the URL is blocked, unsupported, too large, or the fetch fails.
    """
    logger.info(f"Fetching media from URL: {url}")
    if not is_public_ip(url):
        logger.warning("Blocked non-public URL request (possible SSRF).")
        return None

    image_exts = (".jpg", ".jpeg", ".png")
    video_exts = (".mp4", ".avi", ".mov")
    max_bytes = MAX_VIDEO_MB * 1024 * 1024
    tmp_path = None
    try:
        parsed_url = urlparse(url)
        ext = os.path.splitext(parsed_url.path)[-1].lower()
        if ext not in image_exts + video_exts:
            logger.warning("Unsupported file type from URL.")
            return None

        headers = {"User-Agent": "Mozilla/5.0"}
        payload = bytearray()
        # NOTE(review): redirects are followed by default, and the redirect
        # target is not re-checked by is_public_ip().
        with requests.get(url, headers=headers, timeout=10, stream=True) as r:
            if r.status_code != 200:
                logger.warning(f"Download failed or file too large.")
                return None
            for chunk in r.iter_content(chunk_size=64 * 1024):
                payload.extend(chunk)
                if len(payload) > max_bytes:
                    logger.warning(f"Download failed or file too large.")
                    return None

        if ext in image_exts:
            # Decode from memory; convert() forces a full load, so no temp file is needed.
            return Image.open(io.BytesIO(bytes(payload))).convert("RGB")

        # Videos are returned by path, so the temp file is deliberately kept.
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(payload)
        return tmp_path
    except Exception as e:
        logger.error(f"URL fetch failed: {e}")
        # Do not leave a half-written temp file behind on failure.
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)
        return None
# Input Validation Functions
def validate_image(img):
    """
    Validates the uploaded image based on size and resolution limits.

    Size is measured as the byte length of the image re-encoded as PNG and
    compared against ``MAX_IMAGE_MB``; width and height are compared against
    ``MAX_IMAGE_RES``. Messages are built from those constants so they cannot
    drift from the enforced limits.

    Args:
        img (PIL.Image.Image): Image to validate.
    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info("Validating uploaded image.")
    max_width, max_height = MAX_IMAGE_RES
    try:
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        # getbuffer() measures the encoded size without copying the bytes.
        size_mb = buffer.getbuffer().nbytes / (1024 * 1024)
        if size_mb > MAX_IMAGE_MB:
            logger.warning(f"Image exceeds size limit of {MAX_IMAGE_MB}MB.")
            return False, f"Image exceeds {MAX_IMAGE_MB}MB limit."
        if img.width > max_width or img.height > max_height:
            logger.warning(f"Image resolution exceeds {max_width}x{max_height}.")
            return False, f"Image resolution exceeds {max_width}x{max_height}."
        logger.info("Image validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating image: {e}")
        return False, str(e)
def validate_video(path):
    """
    Validates the uploaded video based on size and duration limits.

    The file size is compared against ``MAX_VIDEO_MB``. Duration is derived
    from the frame count and FPS reported by OpenCV and compared against
    ``MAX_VIDEO_DURATION``. A file OpenCV cannot open is rejected instead of
    being treated as a zero-length (valid) video.

    Args:
        path (str): Path to the video file.
    Returns:
        Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
    """
    logger.info(f"Validating video file at: {path}")
    try:
        size_mb = os.path.getsize(path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_MB:
            logger.warning(f"Video exceeds size limit of {MAX_VIDEO_MB}MB.")
            return False, f"Video exceeds {MAX_VIDEO_MB}MB limit."

        cap = cv2.VideoCapture(path)
        try:
            if not cap.isOpened():
                logger.warning("Video could not be opened for validation.")
                return False, "Video could not be opened."
            fps = cap.get(cv2.CAP_PROP_FPS)
            frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        finally:
            # Always free the capture handle, even if a property read raises.
            cap.release()

        # A missing/zero FPS gives duration 0 rather than a ZeroDivisionError.
        duration = frames / fps if fps else 0
        if duration > MAX_VIDEO_DURATION:
            logger.warning(f"Video exceeds {MAX_VIDEO_DURATION} seconds duration limit.")
            return False, f"Video exceeds {MAX_VIDEO_DURATION} seconds duration limit."
        logger.info("Video validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating video: {e}")
        return False, str(e)
# Input Resolution
_VIDEO_EXTENSIONS = (".mp4", ".mov", ".avi")


def _classify_upload(item):
    """Classify one uploaded item as ("image", PIL.Image), ("video", path) or (None, None)."""
    if isinstance(item, Image.Image):
        return "image", item
    # File uploads arrive as path strings, or as file-like objects exposing
    # the path via ``.name``.
    path = item if isinstance(item, str) else getattr(item, "name", None)
    if not isinstance(path, str):
        return None, None
    if path.lower().endswith(_VIDEO_EXTENSIONS):
        return "video", path
    try:
        with Image.open(path) as opened:
            # convert() loads the pixel data, so the file can be closed afterwards.
            return "image", opened.convert("RGB")
    except (OSError, ValueError) as e:
        logger.warning(f"Skipping unsupported upload '{path}': {e}")
        return None, None


def resolve_input(mode, media_upload, url):
    """
    Resolves the media input based on selected mode.
    - If mode is 'Upload', accepts either:
    * 1–5 images (PIL.Image objects, or paths / file objects of image files,
      which are opened and converted to RGB)
    * OR 1 video file (path ending in .mp4/.mov/.avi)
    - If mode is 'URL', fetches remote image or video.

    Items that are neither a readable image nor a video path are ignored.

    Args:
        mode (str): 'Upload' or 'URL'
        media_upload (List[Union[PIL.Image.Image, str]]): Uploaded media
        url (str): URL to image or video
    Returns:
        List[Union[PIL.Image.Image, str]] or None
    """
    try:
        logger.info(f"Resolving input for mode: {mode}")
        if mode == "Upload":
            if not media_upload:
                logger.warning("No upload detected.")
                return None

            # Tolerate a single item as well as a list of items.
            if not isinstance(media_upload, (list, tuple)):
                media_upload = [media_upload]

            image_files, video_files = [], []
            for item in media_upload:
                kind, value = _classify_upload(item)
                if kind == "image":
                    image_files.append(value)
                elif kind == "video":
                    video_files.append(value)

            if image_files and video_files:
                logger.warning("Mixed media upload not supported (images + video).")
                return None
            if image_files:
                if 1 <= len(image_files) <= 5:
                    logger.info(f"Accepted {len(image_files)} image(s).")
                    return image_files
                logger.warning("Invalid number of images. Must be 1 to 5.")
                return None
            if video_files:
                if len(video_files) == 1:
                    logger.info("Accepted single video upload.")
                    return video_files
                logger.warning("Only one video allowed.")
                return None
            logger.warning("Unsupported upload type.")
            return None

        if mode == "URL":
            if not url:
                logger.warning("URL mode selected but URL is empty.")
                return None
            media = fetch_media_from_url(url)
            if media:
                logger.info("Media successfully fetched from URL.")
                return [media]
            logger.warning("Failed to resolve media from URL.")
            return None

        logger.error(f"Invalid mode selected: {mode}")
        return None
    except Exception as e:
        logger.error(f"Exception in resolve_input(): {e}")
        return None
def _png_bytes(pil_image):
    """Encode a PIL image as PNG and return the raw bytes."""
    buf = io.BytesIO()
    pil_image.save(buf, format="PNG")
    return buf.getvalue()


@timeout_decorator.timeout(35, use_signals=False)  # 35 sec limit per image
def process_image(
    image: Image.Image,
    run_det: bool,
    det_model: str,
    det_confidence: float,
    run_seg: bool,
    seg_model: str,
    run_depth: bool,
    depth_model: str,
    blend: float
):
    """
    Runs selected perception tasks on the input image and packages results.

    Each enabled task loads its model through ``get_model(..., device="cpu")``,
    adds a PNG of its output to the result archive and records its raw output
    for ``describe_scene``. Segmentation and depth results are alpha-blended
    onto the running composite image with weight ``blend``.

    Args:
        image (PIL.Image): Input image. Converted to RGB if in another mode.
        run_det (bool): Run object detection.
        det_model (str): Detection model key (a key of DETECTION_MODEL_MAP).
        det_confidence (float): Detection confidence threshold.
        run_seg (bool): Run segmentation.
        seg_model (str): Segmentation model key (a key of SEGMENTATION_MODEL_MAP).
        run_depth (bool): Run depth estimation.
        depth_model (str): Depth model key (a key of DEPTH_MODEL_MAP).
        blend (float): Overlay blend alpha (0.0 - 1.0).
    Returns:
        Tuple[Image, dict, Tuple[str, bytes]]: Final image, scene JSON, and downloadable ZIP.
        On any processing error: (None, {"error": message}, None).
    """
    logger.info("Starting image processing pipeline.")
    start_time = time.time()
    outputs, scene = {}, {}
    # The depth blend below mixes against a 3-channel array; RGBA or
    # grayscale input would make cv2.addWeighted fail on mismatched shapes.
    if image.mode != "RGB":
        image = image.convert("RGB")
    combined_np = np.array(image)
    try:
        # Detection
        if run_det:
            logger.info(f"Running detection with model: {det_model}")
            load_start = time.time()
            model = get_model("detection", DETECTION_MODEL_MAP[det_model], device="cpu")
            logger.info(f"{det_model} detection model loaded in {time.time() - load_start:.2f} seconds.")
            boxes = model.predict(image, conf_threshold=det_confidence)
            overlay = model.draw(image, boxes)
            combined_np = np.array(overlay)
            outputs["detection.png"] = _png_bytes(overlay)
            scene["detection"] = boxes

        # Segmentation
        if run_seg:
            logger.info(f"Running segmentation with model: {seg_model}")
            load_start = time.time()
            model = get_model("segmentation", SEGMENTATION_MODEL_MAP[seg_model], device="cpu")
            logger.info(f"{seg_model} segmentation model loaded in {time.time() - load_start:.2f} seconds.")
            mask = model.predict(image)
            overlay = model.draw(image, mask, alpha=blend)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(overlay), blend, 0)
            outputs["segmentation.png"] = _png_bytes(overlay)
            scene["segmentation"] = mask.tolist()

        # Depth Estimation
        if run_depth:
            logger.info(f"Running depth estimation with model: {depth_model}")
            load_start = time.time()
            model = get_model("depth", DEPTH_MODEL_MAP[depth_model], device="cpu")
            logger.info(f"{depth_model} depth model loaded in {time.time() - load_start:.2f} seconds.")
            dmap = model.predict(image)
            # np.ptp() instead of ndarray.ptp(): the method was removed in NumPy 2.0.
            depth_range = float(np.ptp(dmap))
            if depth_range > 0:
                scaled = (dmap - dmap.min()) / depth_range * 255
            else:
                # Constant depth map: avoid dividing by zero, render as all black.
                scaled = np.zeros(np.shape(dmap))
            norm_dmap = scaled.astype(np.uint8)
            d_pil = Image.fromarray(norm_dmap)
            combined_np = cv2.addWeighted(combined_np, 1 - blend, np.array(d_pil.convert("RGB")), blend, 0)
            outputs["depth_map.png"] = _png_bytes(d_pil)
            scene["depth"] = dmap.tolist()

        # Final image overlay
        final_img = Image.fromarray(combined_np)
        outputs["scene_blueprint.png"] = _png_bytes(final_img)

        # Scene description (best effort: a failure is reported inside the JSON)
        try:
            scene_json = describe_scene(**scene)
        except Exception as e:
            logger.warning(f"describe_scene failed: {e}")
            scene_json = {"error": str(e)}

        telemetry = {
            "session_id": generate_session_id(),
            "runtime_sec": round(log_runtime(start_time), 2),
            "used_models": {
                "detection": det_model if run_det else None,
                "segmentation": seg_model if run_seg else None,
                "depth": depth_model if run_depth else None
            }
        }
        scene_json["telemetry"] = telemetry
        outputs["scene_description.json"] = json.dumps(scene_json, indent=2).encode("utf-8")

        # ZIP file creation
        zip_buf = io.BytesIO()
        with zipfile.ZipFile(zip_buf, "w") as zipf:
            for name, data in outputs.items():
                zipf.writestr(name, data)

        # log_runtime() already ran for the telemetry entry; measure directly
        # here so the generic "Process completed" line is not logged twice.
        elapsed = time.time() - start_time
        logger.info(f"Image processing completed in {elapsed:.2f} seconds.")
        return final_img, scene_json, ("uvis_results.zip", zip_buf.getvalue())
    except Exception as e:
        logger.error(f"Error in processing pipeline: {e}")
        return None, {"error": str(e)}, None
# Main Handler
def _zip_payload_to_path(payload):
    """Write an in-memory ``(filename, bytes)`` payload to a temp dir and return the file path."""
    if not (isinstance(payload, tuple) and len(payload) == 2):
        return payload  # None or already a path: pass through unchanged
    filename, data = payload
    if not isinstance(data, (bytes, bytearray)):
        return payload
    out_dir = tempfile.mkdtemp(prefix="uvis_")
    out_path = os.path.join(out_dir, os.path.basename(str(filename)) or "uvis_results.zip")
    with open(out_path, "wb") as fh:
        fh.write(data)
    return out_path


def handle(mode, media_upload, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend):
    """
    Master handler for resolving input and processing.

    Resolves the input, validates it, and runs ``process_image`` on it. For a
    video, only the first frame is analysed. The function returns as soon as
    the first resolved item has been processed or rejected, so when several
    images are supplied only the first one is analysed.

    Returns:
        Tuple of (blueprint image, scene JSON dict, path to results ZIP) for the
        Gradio outputs. On failure: (None, {"error": message}, None).
    """
    session_id = generate_session_id()
    logger.info(f"Session ID: {session_id} | Handler activated with mode: {mode}")
    start_time = time.time()
    media = resolve_input(mode, media_upload, url)
    if not media:
        return None, format_error("No valid input provided. Please check your upload or URL."), None

    for single_media in media:
        if isinstance(single_media, str):  # Video file
            valid, err = validate_video(single_media)
            if not valid:
                return None, format_error(err), None
            cap = cv2.VideoCapture(single_media)
            ret, frame = cap.read()
            cap.release()
            if not ret:
                return None, format_error("Failed to read video frame."), None
            # OpenCV decodes frames as BGR; the pipeline works on RGB PIL images.
            single_media = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        if isinstance(single_media, Image.Image):
            valid, err = validate_image(single_media)
            if not valid:
                return None, format_error(err), None
            try:
                final_img, scene_json, zip_payload = process_image(
                    single_media, run_det, det_model, det_confidence,
                    run_seg, seg_model, run_depth, depth_model, blend
                )
            except timeout_decorator.timeout_decorator.TimeoutError:
                logger.error("Image processing timed out.")
                return None, format_error("Processing timed out. Try a smaller image or simpler model."), None
            # The download output is a file component, which needs a path on
            # disk rather than the in-memory (name, bytes) tuple.
            return final_img, scene_json, _zip_payload_to_path(zip_payload)

        logger.warning("Unsupported media type resolved.")

    log_runtime(start_time)
    return None, format_error("Invalid input. Please check your upload or URL."), None
# Gradio Interface
# Layout: left column holds inputs and task options, right column holds results.
with gr.Blocks() as demo:
    gr.Markdown("## Unified Visual Intelligence System (UVIS)")
    with gr.Row():
        with gr.Column(scale=2):
            # Input Mode Toggle
            mode = gr.Radio(["Upload", "URL"], value="Upload", label="Input Mode")
            # File upload: accepts multiple images or one video
            media_upload = gr.File(
                label="Upload Images (1–5) or 1 Video",
                file_types=["image", ".mp4", ".mov", ".avi"],
                file_count="multiple"
            )
            # URL input (hidden until "URL" mode is selected)
            url = gr.Textbox(label="URL (Image/Video)", visible=False)

            def toggle_inputs(selected_mode):
                """Show the upload widget in 'Upload' mode and the URL box in 'URL' mode."""
                return [
                    gr.update(visible=(selected_mode == "Upload")),  # media_upload
                    gr.update(visible=(selected_mode == "URL"))  # url
                ]

            mode.change(toggle_inputs, inputs=mode, outputs=[media_upload, url])

            # Task switches
            run_det = gr.Checkbox(label="Object Detection")
            run_seg = gr.Checkbox(label="Semantic Segmentation")
            run_depth = gr.Checkbox(label="Depth Estimation")

            # Per-task options stay hidden until the task is ticked. Each
            # dropdown defaults to its first entry so that an enabled task
            # never reaches the pipeline with model=None.
            det_model = gr.Dropdown(
                choices=list(DETECTION_MODEL_MAP),
                value=next(iter(DETECTION_MODEL_MAP)),
                label="Detection Model",
                visible=False
            )
            seg_model = gr.Dropdown(
                choices=list(SEGMENTATION_MODEL_MAP),
                value=next(iter(SEGMENTATION_MODEL_MAP)),
                label="Segmentation Model",
                visible=False
            )
            depth_model = gr.Dropdown(
                choices=list(DEPTH_MODEL_MAP),
                value=next(iter(DEPTH_MODEL_MAP)),
                label="Depth Model",
                visible=False
            )
            # Must exist: it is wired into run_det.change and run.click below.
            det_confidence = gr.Slider(0.1, 1.0, 0.5, label="Detection Confidence Threshold", visible=False)

            # Visibility callbacks are defined before the .change() calls that
            # reference them, since this block executes top to bottom.
            def toggle_det(checked):
                """Show/hide the detection model dropdown and confidence slider."""
                return [gr.update(visible=checked), gr.update(visible=checked)]

            def toggle_seg(checked):
                """Show/hide the segmentation model dropdown."""
                return [gr.update(visible=checked)]

            def toggle_depth(checked):
                """Show/hide the depth model dropdown."""
                return [gr.update(visible=checked)]

            # Attach Visibility Logic
            run_det.change(fn=toggle_det, inputs=[run_det], outputs=[det_model, det_confidence])
            run_seg.change(fn=toggle_seg, inputs=[run_seg], outputs=[seg_model])
            run_depth.change(fn=toggle_depth, inputs=[run_depth], outputs=[depth_model])

            blend = gr.Slider(0.0, 1.0, 0.5, label="Overlay Blend")
            # Run Button
            run = gr.Button("Run Analysis")

        # Right panel
        with gr.Column(scale=1):
            img_out = gr.Image(label="Scene Blueprint")
            json_out = gr.JSON(label="Scene JSON")
            zip_out = gr.File(label="Download Results")

    # Button Click Event
    run.click(
        handle,
        inputs=[mode, media_upload, url, run_det, det_model, det_confidence, run_seg, seg_model, run_depth, depth_model, blend],
        outputs=[img_out, json_out, zip_out]
    )

    # Footer Section
    gr.Markdown("---")
    gr.Markdown(
        """
<div style='text-align: center; font-size: 14px;'>
Built by <b>Durga Deepak Valluri</b><br>
<a href="https://github.com/DurgaDeepakValluri" target="_blank">GitHub</a> |
<a href="https://deecoded.io" target="_blank">Website</a> |
<a href="https://www.linkedin.com/in/durga-deepak-valluri" target="_blank">LinkedIn</a>
</div>
""",
    )

# Launch the Gradio App
demo.launch()