LiamKhoaLe commited on
Commit
1d88641
·
1 Parent(s): f214779

System update 13/5. 1. Spawn in valid zone. 2. Keep body offset (20x20). 3. Prevent skip-jump over obstacle. 4. Enhance garbage overlay. 5. Truncate KNN to focus on A*.

Browse files
Files changed (1) hide show
  1. app.py +62 -16
app.py CHANGED
@@ -134,7 +134,7 @@ def build_masks(seg):
134
  return water_mask, garbage_mask, movable_mask
135
 
136
  # Garbage mask can be highlighted in red
137
- def highlight_chunk_masks_on_frame(frame, labels, objs, color_uncollected=(0, 0, 128), color_collected=(0, 128, 0), alpha=0.3):
138
  """
139
  Overlays semi-transparent colored regions for garbage chunks on the frame.
140
  `objs` must have 'pos' and 'col' keys. The collection status changes the overlay color.
@@ -148,6 +148,7 @@ def highlight_chunk_masks_on_frame(frame, labels, objs, color_uncollected=(0, 0,
148
  mask = (labels == lab).astype(np.uint8)
149
  contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
150
  color = color_collected if obj["col"] else color_uncollected
 
151
  cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)
152
  # Blend overlay with original frame using alpha
153
  return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
@@ -159,6 +160,7 @@ def highlight_water_mask_on_frame(frame, binary_mask, color=(255, 0, 0), alpha=0
159
  overlay = frame.copy()
160
  mask = binary_mask.astype(np.uint8) * 255
161
  contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
 
162
  cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)
163
  return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
164
 
@@ -175,8 +177,12 @@ def astar(start, goal, occ):
175
  return p[::-1]
176
  for dx,dy in N8:
177
  nx,ny=cur[0]+dx,cur[1]+dy
178
- if not (0<=nx<640 and 0<=ny<640): continue
179
- if occ[ny,nx]==0: continue
 
 
 
 
180
  ng=g[cur]+1
181
  if (nx,ny) not in g or ng<g[(nx,ny)]:
182
  g[(nx,ny)]=ng
@@ -184,19 +190,37 @@ def astar(start, goal, occ):
184
  heapq.heappush(openq,(f,(nx,ny)))
185
  came[(nx,ny)]=cur
186
  return []
187
-
188
- # KNN fit
189
  def knn_path(start, targets, occ):
190
  todo = targets[:]; path=[]
191
  cur = tuple(start)
192
  while todo:
193
- nbrs = NearestNeighbors(n_neighbors=1).fit(todo)
194
- _,idx = nbrs.kneighbors([cur]); nxt=tuple(todo[idx[0][0]])
195
- seg = astar(cur, nxt, occ)
196
- if seg:
197
- if path and seg[0]==path[-1]: seg=seg[1:]
198
- path.extend(seg)
199
- cur = nxt; todo.remove(list(nxt))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200
  return path
201
 
202
  # ── Robot sprite/class -──────────────────────────────────────────────────
@@ -207,16 +231,23 @@ class Robot:
207
  if self.png.shape[-1] != 4:
208
  raise ValueError("Sprite image must have 4 channels (RGBA)")
209
  self.png = np.array(Image.open(sprite).convert("RGBA").resize((40,40)))
210
- self.pos = [20,20]; self.speed=speed
 
211
  def step(self, path):
212
  while path:
213
  dx, dy = path[0][0] - self.pos[0], path[0][1] - self.pos[1]
214
  dist = (dx * dx + dy * dy) ** 0.5
215
  if dist <= self.speed:
216
  self.pos = list(path.pop(0))
217
- else:
218
  r = self.speed / dist
219
- self.pos = [int(self.pos[0] + dx * r), int(self.pos[1] + dy * r)]
 
 
 
 
 
 
220
  # Break after one logical move to avoid overshooting
221
  break
222
 
@@ -461,8 +492,23 @@ def _pipeline(uid,img_path):
461
  else: # Garbage within valid travelable zone
462
  print(f"🧠 {len(centres)} garbage objects on water selected from {len(detections)} detections")
463
 
464
- # 3- Global route
 
 
 
 
 
 
 
 
 
 
 
 
 
465
  robot = Robot(SPRITE)
 
 
466
  path = knn_path(robot.pos, centres, movable_mask)
467
 
468
  # 4- Video synthesis
 
134
  return water_mask, garbage_mask, movable_mask
135
 
136
  # Garbage mask can be highlighted in red
137
+ def highlight_chunk_masks_on_frame(frame, labels, objs, color_uncollected=(0, 0, 128), color_collected=(0, 128, 0), alpha=0.8):
138
  """
139
  Overlays semi-transparent colored regions for garbage chunks on the frame.
140
  `objs` must have 'pos' and 'col' keys. The collection status changes the overlay color.
 
148
  mask = (labels == lab).astype(np.uint8)
149
  contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
150
  color = color_collected if obj["col"] else color_uncollected
151
+ # drawContours on overlay
152
  cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)
153
  # Blend overlay with original frame using alpha
154
  return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
 
160
  overlay = frame.copy()
161
  mask = binary_mask.astype(np.uint8) * 255
162
  contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
163
+ # drawContours on overlay
164
  cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)
165
  return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
166
 
 
177
  return p[::-1]
178
  for dx,dy in N8:
179
  nx,ny=cur[0]+dx,cur[1]+dy
180
+ # out-of-bounds / blocked
181
+ if not (0<=nx<640 and 0<=ny<640) or occ[ny,nx]==0: continue
182
+ # if diagonal, ensure both orthogonals are free
183
+ if abs(dx)==1 and abs(dy)==1:
184
+ if occ[cur[1]+dy, cur[0]]==0 or occ[cur[1], cur[0]+dx]==0:
185
+ continue
186
  ng=g[cur]+1
187
  if (nx,ny) not in g or ng<g[(nx,ny)]:
188
  g[(nx,ny)]=ng
 
190
  heapq.heappush(openq,(f,(nx,ny)))
191
  came[(nx,ny)]=cur
192
  return []
193
+ # KNN fit optimal path
 
194
  def knn_path(start, targets, occ):
195
  todo = targets[:]; path=[]
196
  cur = tuple(start)
197
  while todo:
198
+ # KNN follow a Greedy approach, which may not guarantee shortest path, hence only use A*
199
+ # nbrs = NearestNeighbors(n_neighbors=1).fit(todo)
200
+ # _,idx = nbrs.kneighbors([cur]); nxt=tuple(todo[idx[0][0]])
201
+ # seg = astar(cur, nxt, occ)
202
+ # if seg:
203
+ # if path and seg[0]==path[-1]: seg=seg[1:]
204
+ # path.extend(seg)
205
+ # cur = nxt; todo.remove(list(nxt))
206
+ best = None
207
+ best_len = float('inf')
208
+ best_seg = []
209
+ # Try A* to each target, find shortest actual path
210
+ for t in todo:
211
+ seg = astar(cur, tuple(t), occ)
212
+ if seg and len(seg) < best_len:
213
+ best = tuple(t)
214
+ best_len = len(seg)
215
+ best_seg = seg
216
+ if not best:
217
+ print("⚠️ Some garbage unreachable")
218
+ break # stop if no reachable targets left
219
+ if path and path[-1] == best_seg[0]:
220
+ best_seg = best_seg[1:] # avoid duplicate point
221
+ path.extend(best_seg)
222
+ cur = best
223
+ todo.remove(list(best))
224
  return path
225
 
226
  # ── Robot sprite/class -──────────────────────────────────────────────────
 
231
  if self.png.shape[-1] != 4:
232
  raise ValueError("Sprite image must have 4 channels (RGBA)")
233
  self.png = np.array(Image.open(sprite).convert("RGBA").resize((40,40)))
234
+ self.speed = speed
235
+ self.pos = [20, 20] # Fallback spawn with body offset at top-left
236
  def step(self, path):
237
  while path:
238
  dx, dy = path[0][0] - self.pos[0], path[0][1] - self.pos[1]
239
  dist = (dx * dx + dy * dy) ** 0.5
240
  if dist <= self.speed:
241
  self.pos = list(path.pop(0))
242
+ else: # If valid path within
243
  r = self.speed / dist
244
+ new_x = self.pos[0] + dx * r
245
+ new_y = self.pos[1] + dy * r
246
+ # Clip to valid region with 20px margin (for body offset)
247
+ self.pos = [
248
+ int(np.clip(new_x, 20, 640 - 20)),
249
+ int(np.clip(new_y, 20, 640 - 20))
250
+ ]
251
  # Break after one logical move to avoid overshooting
252
  break
253
 
 
492
  else: # Garbage within valid travelable zone
493
  print(f"🧠 {len(centres)} garbage objects on water selected from {len(detections)} detections")
494
 
495
+ # 3- Robot initialization, position and navigation
496
+ # find all (y,x) within movable_mask
497
+ ys, xs = np.where(movable_mask)
498
+ if len(ys)==0:
499
+ # no travelable zone → bail out
500
+ print(f"❌ [{uid}] no water to spawn on")
501
+ video_ready[uid] = True
502
+ return
503
+ # sort by y, then x
504
+ idx = np.lexsort((xs, ys))
505
+ spawn_y, spawn_x = int(ys[idx[0]]), int(xs[idx[0]])
506
+ # enforce 20px margin so sprite never pokes out
507
+ spawn_x = np.clip(spawn_x, 20, 640-20)
508
+ spawn_y = np.clip(spawn_y, 20, 640-20)
509
  robot = Robot(SPRITE)
510
+ # Robot will be spawn on the closest movable mask to top-left
511
+ robot.pos = [spawn_x, spawn_y]
512
  path = knn_path(robot.pos, centres, movable_mask)
513
 
514
  # 4- Video synthesis