DurgaDeepak commited on
Commit
aec7cfd
·
verified ·
1 Parent(s): 4af0fae

Update core/input_handler.py

Browse files
Files changed (1) hide show
  1. core/input_handler.py +182 -0
core/input_handler.py CHANGED
@@ -0,0 +1,182 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ def is_public_ip(url):
2
+ """
3
+ Checks whether the resolved IP address of a URL is public (non-local).
4
+ Prevents SSRF by blocking internal addresses like 127.0.0.1 or 192.168.x.x.
5
+ """
6
+ try:
7
+ hostname = urlparse(url).hostname
8
+ ip = socket.gethostbyname(hostname)
9
+ ip_obj = ipaddress.ip_address(ip)
10
+ return ip_obj.is_global # Only allow globally routable IPs
11
+ except Exception as e:
12
+ logger.warning(f"URL IP validation failed: {e}")
13
+ return False
14
+
15
+
16
+ def fetch_media_from_url(url):
17
+ """
18
+ Downloads media from a URL. Supports images and videos.
19
+ Returns PIL.Image or video file path.
20
+ """
21
+ logger.info(f"Fetching media from URL: {url}")
22
+ if not is_public_ip(url):
23
+ logger.warning("Blocked non-public URL request (possible SSRF).")
24
+ return None
25
+
26
+ try:
27
+ parsed_url = urlparse(url)
28
+ ext = os.path.splitext(parsed_url.path)[-1].lower()
29
+ headers = {"User-Agent": "Mozilla/5.0"}
30
+ r = requests.get(url, headers=headers, timeout=10)
31
+
32
+ if r.status_code != 200 or len(r.content) > 50 * 1024 * 1024:
33
+ logger.warning(f"Download failed or file too large.")
34
+ return None
35
+
36
+ tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
37
+ tmp_file.write(r.content)
38
+ tmp_file.close()
39
+
40
+ if ext in [".jpg", ".jpeg", ".png"]:
41
+ return Image.open(tmp_file.name).convert("RGB")
42
+ elif ext in [".mp4", ".avi", ".mov"]:
43
+ return tmp_file.name
44
+ else:
45
+ logger.warning("Unsupported file type from URL.")
46
+ return None
47
+ except Exception as e:
48
+ logger.error(f"URL fetch failed: {e}")
49
+ return None
50
+
51
+ # Input Validation Functions
52
+ def validate_image(img):
53
+ """
54
+ Validates the uploaded image based on size and resolution limits.
55
+
56
+ Args:
57
+ img (PIL.Image.Image): Image to validate.
58
+
59
+ Returns:
60
+ Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
61
+ """
62
+ logger.info("Validating uploaded image.")
63
+ try:
64
+ buffer = io.BytesIO()
65
+ img.save(buffer, format="PNG")
66
+ size_mb = len(buffer.getvalue()) / (1024 * 1024)
67
+
68
+ if size_mb > MAX_IMAGE_MB:
69
+ logger.warning("Image exceeds size limit of 5MB.")
70
+ return False, "Image exceeds 5MB limit."
71
+
72
+ if img.width > MAX_IMAGE_RES[0] or img.height > MAX_IMAGE_RES[1]:
73
+ logger.warning("Image resolution exceeds 1920x1080.")
74
+ return False, "Image resolution exceeds 1920x1080."
75
+
76
+ logger.info("Image validation passed.")
77
+ return True, None
78
+ except Exception as e:
79
+ logger.error(f"Error validating image: {e}")
80
+ return False, str(e)
81
+
82
+ def validate_video(path):
83
+ """
84
+ Validates the uploaded video based on size and duration limits.
85
+
86
+ Args:
87
+ path (str): Path to the video file.
88
+
89
+ Returns:
90
+ Tuple[bool, str or None]: (True, None) if valid; (False, reason) otherwise.
91
+ """
92
+ logger.info(f"Validating video file at: {path}")
93
+ try:
94
+ size_mb = os.path.getsize(path) / (1024 * 1024)
95
+ if size_mb > MAX_VIDEO_MB:
96
+ logger.warning("Video exceeds size limit of 50MB.")
97
+ return False, "Video exceeds 50MB limit."
98
+
99
+ cap = cv2.VideoCapture(path)
100
+ fps = cap.get(cv2.CAP_PROP_FPS)
101
+ frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
102
+ duration = frames / fps if fps else 0
103
+ cap.release()
104
+
105
+ if duration > MAX_VIDEO_DURATION:
106
+ logger.warning("Video exceeds 30 seconds duration limit.")
107
+ return False, "Video exceeds 30 seconds duration limit."
108
+
109
+ logger.info("Video validation passed.")
110
+ return True, None
111
+ except Exception as e:
112
+ logger.error(f"Error validating video: {e}")
113
+ return False, str(e)
114
+
115
+ # Input Resolution
116
+ def resolve_input(mode, uploaded_files, url):
117
+ """
118
+ Resolves the media input based on selected mode.
119
+ - If mode is 'Upload', accepts either:
120
+ * 1–5 images (PIL.Image)
121
+ * OR 1 video file (file path as string)
122
+ - If mode is 'URL', fetches remote image or video.
123
+
124
+ Args:
125
+ mode (str): 'Upload' or 'URL'
126
+ uploaded_files (List[Union[PIL.Image.Image, str]]): Uploaded media
127
+ url (str): URL to image or video
128
+
129
+ Returns:
130
+ List[Union[PIL.Image.Image, str]] or None
131
+ """
132
+ try:
133
+ logger.info(f"Resolving input for mode: {mode}")
134
+
135
+ if mode == "Upload":
136
+ if not uploaded_files:
137
+ logger.warning("No upload detected.")
138
+ return None
139
+
140
+ image_files = [f for f in uploaded_files if isinstance(f, Image.Image)]
141
+ video_files = [f for f in uploaded_files if isinstance(f, str) and f.lower().endswith((".mp4", ".mov", ".avi"))]
142
+
143
+ if image_files and video_files:
144
+ logger.warning("Mixed media upload not supported (images + video).")
145
+ return None
146
+
147
+ if image_files:
148
+ if 1 <= len(image_files) <= 5:
149
+ logger.info(f"Accepted {len(image_files)} image(s).")
150
+ return image_files
151
+ logger.warning("Invalid number of images. Must be 1 to 5.")
152
+ return None
153
+
154
+ if video_files:
155
+ if len(video_files) == 1:
156
+ logger.info("Accepted single video upload.")
157
+ return video_files
158
+ logger.warning("Only one video allowed.")
159
+ return None
160
+
161
+ logger.warning("Unsupported upload type.")
162
+ return None
163
+
164
+ elif mode == "URL":
165
+ if not url:
166
+ logger.warning("URL mode selected but URL is empty.")
167
+ return None
168
+ media = fetch_media_from_url(url)
169
+ if media:
170
+ logger.info("Media successfully fetched from URL.")
171
+ return [media]
172
+ else:
173
+ logger.warning("Failed to resolve media from URL.")
174
+ return None
175
+
176
+ else:
177
+ logger.error(f"Invalid mode selected: {mode}")
178
+ return None
179
+
180
+ except Exception as e:
181
+ logger.error(f"Exception in resolve_input(): {e}")
182
+ return None