Spaces:
Running
on
Zero
Running
on
Zero
def is_public_ip(url):
    """
    Report whether every address a URL's host maps to is globally routable.

    Used as an SSRF guard: loopback, private, link-local and other
    non-global addresses (for example 127.0.0.1, 192.168.x.x, ::1) are
    rejected.

    A host written as an IP literal is checked directly, without DNS.
    Any other host is resolved with ``socket.getaddrinfo`` and *all*
    returned IPv4/IPv6 addresses must be global; a single internal record
    is enough to reject the URL.

    NOTE(review): this only validates the name at call time. A caller that
    later lets an HTTP client resolve the hostname again may see a
    different address (DNS rebinding) -- confirm whether that matters here.

    Args:
        url (str): URL whose host should be checked.

    Returns:
        bool: True only if the host is present and all of its addresses
        are global. False otherwise, including on any resolution or
        parsing error (the check fails closed).
    """
    try:
        hostname = urlparse(url).hostname
        if not hostname:
            raise ValueError("URL has no hostname")

        try:
            # IP-literal hosts need no DNS lookup.
            candidates = {ipaddress.ip_address(hostname)}
        except ValueError:
            # getaddrinfo returns both A and AAAA results; the address is
            # the first item of each sockaddr tuple. A trailing "%scope"
            # on link-local IPv6 addresses is stripped before parsing.
            candidates = {
                ipaddress.ip_address(info[4][0].split("%", 1)[0])
                for info in socket.getaddrinfo(hostname, None)
            }

        if not candidates:
            return False

        for address in candidates:
            # Judge an IPv4-mapped IPv6 address (::ffff:a.b.c.d) by the
            # IPv4 address it actually points at.
            mapped = getattr(address, "ipv4_mapped", None)
            if mapped is not None:
                address = mapped
            if not address.is_global:
                return False
        return True
    except Exception as e:
        # Fail closed: anything unexpected counts as "not public".
        logger.warning(f"URL IP validation failed: {e}")
        return False
def fetch_media_from_url(url):
    """
    Download an image or a video from a public URL.

    The extension of the URL path selects the handling:
      * ``.jpg`` / ``.jpeg`` / ``.png`` -> decoded in memory and returned
        as an RGB ``PIL.Image``.
      * ``.mp4`` / ``.avi`` / ``.mov`` -> written to a temporary file whose
        path is returned. The file is created with ``delete=False``, so
        the caller is responsible for removing it.

    Safety measures:
      * The URL, and every redirect target, must pass ``is_public_ip``.
        Redirects are followed manually so a public URL cannot bounce the
        request to an internal address.
      * The body is streamed and the download is aborted as soon as it
        exceeds 50 MB, instead of buffering an unbounded response first.
      * Unsupported extensions are rejected before any request is made.

    Args:
        url (str): Address of the media file.

    Returns:
        PIL.Image.Image, str or None: The image, the temporary video path,
        or None if the URL is blocked, unsupported, too large, not served
        with status 200, or any error occurs.
    """
    # Function-scope imports keep this block self-contained.
    import io
    from urllib.parse import urljoin

    image_exts = (".jpg", ".jpeg", ".png")
    video_exts = (".mp4", ".avi", ".mov")
    max_bytes = 50 * 1024 * 1024
    max_redirects = 5

    logger.info(f"Fetching media from URL: {url}")
    if not is_public_ip(url):
        logger.warning("Blocked non-public URL request (possible SSRF).")
        return None

    tmp_path = None
    try:
        ext = os.path.splitext(urlparse(url).path)[-1].lower()
        if ext not in image_exts and ext not in video_exts:
            logger.warning("Unsupported file type from URL.")
            return None

        headers = {"User-Agent": "Mozilla/5.0"}
        target = url
        for _ in range(max_redirects + 1):
            response = requests.get(
                target,
                headers=headers,
                timeout=10,
                stream=True,
                allow_redirects=False,
            )
            if not response.is_redirect:
                break
            # is_redirect is only true when a Location header is present.
            next_url = urljoin(target, response.headers["Location"])
            response.close()
            if not is_public_ip(next_url):
                logger.warning("Blocked non-public URL request (possible SSRF).")
                return None
            target = next_url
        else:
            logger.warning("Too many redirects while fetching URL.")
            return None

        try:
            if response.status_code != 200:
                logger.warning("Download failed or file too large.")
                return None

            # Cheap early exit; the byte counter below is authoritative
            # because the header may be missing or inaccurate.
            declared = response.headers.get("Content-Length", "")
            if declared.isdigit() and int(declared) > max_bytes:
                logger.warning("Download failed or file too large.")
                return None

            chunks = []
            received = 0
            for chunk in response.iter_content(chunk_size=64 * 1024):
                received += len(chunk)
                if received > max_bytes:
                    logger.warning("Download failed or file too large.")
                    return None
                chunks.append(chunk)
        finally:
            response.close()

        payload = b"".join(chunks)

        if ext in image_exts:
            # Decode straight from memory: no temporary file to leak.
            return Image.open(io.BytesIO(payload)).convert("RGB")

        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(payload)
        return tmp_path
    except Exception as e:
        logger.error(f"URL fetch failed: {e}")
        if tmp_path is not None:
            # Do not leave a partially written video behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return None
# Input Validation Functions
def validate_image(img):
    """
    Check an image against the configured size and resolution limits.

    The image is encoded to PNG in memory and the encoded size (in MB) is
    compared with ``MAX_IMAGE_MB``; the width and height are then compared
    with ``MAX_IMAGE_RES[0]`` and ``MAX_IMAGE_RES[1]``. The size check
    takes precedence when both limits are violated.

    Args:
        img (PIL.Image.Image): Image to validate.

    Returns:
        Tuple[bool, str or None]: ``(True, None)`` when the image is
        acceptable, otherwise ``(False, reason)``. If validation itself
        raises, the reason is the exception text.
    """
    logger.info("Validating uploaded image.")
    try:
        # Measure the size of the PNG encoding, not the raw pixel data.
        png_buffer = io.BytesIO()
        img.save(png_buffer, format="PNG")
        encoded_mb = len(png_buffer.getvalue()) / (1024 * 1024)

        rejection = None
        if encoded_mb > MAX_IMAGE_MB:
            logger.warning("Image exceeds size limit of 5MB.")
            rejection = "Image exceeds 5MB limit."
        elif not (img.width <= MAX_IMAGE_RES[0] and img.height <= MAX_IMAGE_RES[1]):
            logger.warning("Image resolution exceeds 1920x1080.")
            rejection = "Image resolution exceeds 1920x1080."

        if rejection is not None:
            return False, rejection

        logger.info("Image validation passed.")
        return True, None
    except Exception as err:
        logger.error(f"Error validating image: {err}")
        return False, str(err)
def validate_video(path):
    """
    Check a video file against the configured size and duration limits.

    The file size (in MB) is compared with ``MAX_VIDEO_MB``. The file is
    then opened with OpenCV and its duration, computed as frame count
    divided by FPS, is compared with ``MAX_VIDEO_DURATION``.

    A file that OpenCV cannot open is rejected explicitly. Without that
    check an unreadable file would yield a duration of 0 and be accepted.
    If the file opens but no positive FPS is reported, the duration is
    treated as 0.

    Args:
        path (str): Path to the video file.

    Returns:
        Tuple[bool, str or None]: ``(True, None)`` when the video is
        acceptable, otherwise ``(False, reason)``. If validation itself
        raises, the reason is the exception text.
    """
    logger.info(f"Validating video file at: {path}")
    try:
        size_mb = os.path.getsize(path) / (1024 * 1024)
        if size_mb > MAX_VIDEO_MB:
            logger.warning("Video exceeds size limit of 50MB.")
            return False, "Video exceeds 50MB limit."

        cap = cv2.VideoCapture(path)
        try:
            if not cap.isOpened():
                logger.warning("Video file could not be opened or decoded.")
                return False, "Video file could not be opened or decoded."
            fps = cap.get(cv2.CAP_PROP_FPS)
            frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        finally:
            # Release the capture handle even if a property read fails.
            cap.release()

        duration = frames / fps if fps > 0 else 0
        if duration > MAX_VIDEO_DURATION:
            logger.warning("Video exceeds 30 seconds duration limit.")
            return False, "Video exceeds 30 seconds duration limit."

        logger.info("Video validation passed.")
        return True, None
    except Exception as e:
        logger.error(f"Error validating video: {e}")
        return False, str(e)
# Input Resolution
def resolve_input(mode, uploaded_files, url):
    """
    Turn the user's selection into a list of media items.

    * ``mode == "Upload"``: ``uploaded_files`` must contain either 1 to 5
      ``PIL.Image`` objects or exactly one video path (a string ending in
      ``.mp4``, ``.mov`` or ``.avi``, case-insensitive). Mixing images and
      videos is rejected. Items that are neither are ignored when sorting
      the upload into images and videos.
    * ``mode == "URL"``: the media is fetched with ``fetch_media_from_url``
      and wrapped in a one-element list.

    Args:
        mode (str): 'Upload' or 'URL'.
        uploaded_files (List[Union[PIL.Image.Image, str]]): Uploaded media.
        url (str): URL to an image or video.

    Returns:
        List[Union[PIL.Image.Image, str]] or None: The resolved media, or
        None when the input is missing, invalid, or an exception occurs.
    """
    video_suffixes = (".mp4", ".mov", ".avi")

    def looks_like_video(item):
        # A video upload is represented by its file path.
        return isinstance(item, str) and item.lower().endswith(video_suffixes)

    try:
        logger.info(f"Resolving input for mode: {mode}")

        if mode == "URL":
            if not url:
                logger.warning("URL mode selected but URL is empty.")
                return None
            fetched = fetch_media_from_url(url)
            if not fetched:
                logger.warning("Failed to resolve media from URL.")
                return None
            logger.info("Media successfully fetched from URL.")
            return [fetched]

        if mode != "Upload":
            logger.error(f"Invalid mode selected: {mode}")
            return None

        # From here on we are handling an upload.
        if not uploaded_files:
            logger.warning("No upload detected.")
            return None

        pictures = [item for item in uploaded_files if isinstance(item, Image.Image)]
        clips = list(filter(looks_like_video, uploaded_files))

        if pictures and clips:
            logger.warning("Mixed media upload not supported (images + video).")
            return None

        if pictures:
            if not 1 <= len(pictures) <= 5:
                logger.warning("Invalid number of images. Must be 1 to 5.")
                return None
            logger.info(f"Accepted {len(pictures)} image(s).")
            return pictures

        if clips:
            if len(clips) != 1:
                logger.warning("Only one video allowed.")
                return None
            logger.info("Accepted single video upload.")
            return clips

        logger.warning("Unsupported upload type.")
        return None
    except Exception as err:
        logger.error(f"Exception in resolve_input(): {err}")
        return None