import os

BASE = "/home/user/app"
CACHE = f"{BASE}/cache"
UPLOAD_DIR = f"{CACHE}/uploads"
OUTPUT_DIR = f"{BASE}/outputs"
MODEL_DIR = f"{BASE}/model"
SPRITE = f"{BASE}/sprite.png"

os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(CACHE, exist_ok=True)

# Redirect the Hugging Face caches *before* transformers is imported; the cache
# location is resolved at import time, so setting it afterwards may have no effect.
os.environ["TRANSFORMERS_CACHE"] = CACHE
os.environ["HF_HOME"] = CACHE

import heapq
import shutil
import threading
import uuid

import cv2
import numpy as np
import requests
from PIL import Image

import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import (
    FileResponse,
    HTMLResponse,
    JSONResponse,
    Response,
    StreamingResponse,
)
from fastapi.staticfiles import StaticFiles

import torch
import yolov5
import ffmpeg
from ultralytics import YOLO
from transformers import (
    DetrImageProcessor, DetrForObjectDetection,
    SegformerFeatureExtractor, SegformerForSemanticSegmentation,
)

print("🔄 Loading models …")

model_self = YOLO(f"{MODEL_DIR}/garbage_detector.pt")
model_yolo5 = yolov5.load(f"{MODEL_DIR}/yolov5-detect-trash-classification.pt")
processor_detr = DetrImageProcessor.from_pretrained(f"{MODEL_DIR}/detr")
model_detr = DetrForObjectDetection.from_pretrained(f"{MODEL_DIR}/detr")
feat_extractor = SegformerFeatureExtractor.from_pretrained(
    "nvidia/segformer-b4-finetuned-ade-512-512")
segformer = SegformerForSemanticSegmentation.from_pretrained(
    "nvidia/segformer-b4-finetuned-ade-512-512")

model_animal = YOLO(f"{MODEL_DIR}/yolov8n.pt")
model_garbage_cls = YOLO(f"{MODEL_DIR}/garbage_cls_yolov8s.pt")
print("✅ Models ready\n")

# ADE20K-style colour palette indexed by SegFormer class id. A leading black
# entry is used for "undetectable"; if the table is shorter than the model's
# 150 classes it is padded with black rows below.
ade_palette = np.array([
    [0, 0, 0], [120, 120, 120], [180, 120, 120], [6, 230, 230], [80, 50, 50],
    [4, 200, 3], [120, 120, 80], [140, 140, 140], [204, 5, 255], [230, 230, 230],
    [4, 250, 7], [224, 5, 255], [235, 255, 7], [150, 5, 61], [120, 120, 70],
    [8, 255, 51], [255, 6, 82], [143, 255, 140], [204, 255, 4], [255, 51, 7],
    [204, 70, 3], [0, 102, 200], [61, 230, 250], [255, 6, 51], [11, 102, 255],
    [255, 7, 71], [255, 9, 224], [9, 7, 230], [220, 220, 220],
    [112, 9, 255], [8, 255, 214], [7, 255, 224], [255, 184, 6], [10, 255, 71],
    [255, 41, 10], [7, 255, 255], [224, 255, 8], [102, 8, 255], [255, 61, 6],
    [255, 194, 7], [255, 122, 8], [0, 255, 20], [255, 8, 41], [255, 5, 153],
    [6, 51, 255], [235, 12, 255], [160, 150, 20], [0, 163, 255], [140, 140, 140],
    [250, 10, 15], [20, 255, 0], [31, 255, 0], [255, 31, 0], [255, 224, 0],
    [153, 255, 0], [0, 0, 255], [255, 71, 0], [0, 235, 255], [0, 173, 255],
    [31, 0, 255], [11, 200, 200], [255, 82, 0], [0, 255, 245], [0, 61, 255],
    [0, 255, 112], [0, 255, 133], [255, 0, 0], [255, 163, 0], [255, 102, 0],
    [194, 255, 0], [0, 143, 255], [51, 255, 0], [0, 82, 255], [0, 255, 41],
    [255, 0, 255], [255, 0, 245], [255, 0, 102], [255, 173, 0], [255, 0, 20],
    [255, 184, 184], [0, 31, 255], [0, 255, 61], [0, 71, 255], [255, 0, 204],
    [0, 255, 194], [0, 255, 82], [0, 10, 255], [0, 112, 255], [51, 0, 255],
    [0, 194, 255], [0, 122, 255], [0, 255, 163], [255, 153, 0], [0, 255, 10],
    [255, 112, 0], [143, 255, 0], [82, 0, 255], [163, 255, 0], [255, 235, 0],
    [8, 184, 170], [133, 0, 255], [0, 255, 92], [184, 0, 255], [255, 0, 31],
    [0, 184, 255], [0, 214, 255], [255, 0, 112], [92, 255, 0], [0, 224, 255],
    [112, 224, 255], [70, 184, 160], [163, 0, 255], [153, 0, 255], [71, 255, 0],
    [255, 0, 163], [255, 204, 0], [255, 0, 143], [0, 255, 235], [133, 255, 0],
    [255, 0, 235], [245, 0, 255], [255, 0, 122], [255, 245, 0], [10, 190, 212],
    [214, 255, 0], [0, 204, 255], [20, 0, 255], [255, 255, 0], [0, 153, 255],
    [0, 41, 255], [0, 255, 204], [41, 0, 255], [41, 255, 0], [173, 0, 255],
    [0, 245, 255], [71, 0, 255], [122, 0, 255], [0, 255, 184], [0, 92, 255],
    [184, 255, 0], [0, 133, 255], [255, 214, 0], [25, 194, 194], [102, 255, 0],
    [92, 0, 255]
], dtype=np.uint8)
if ade_palette.shape[0] < 150:
    missing = 150 - ade_palette.shape[0]
    ade_palette = np.vstack([ade_palette, np.zeros((missing, 3), dtype=np.uint8)])

# Reference colours for the scene categories the planner cares about. Colours
# are matched with a per-channel tolerance rather than exact equality.
custom_class_map = {
    "Garbage": [(255, 8, 41), (235, 255, 7), (255, 5, 153), (255, 0, 102)],
    "Water": [(0, 102, 200), (11, 102, 255), (31, 0, 255), (10, 0, 255), (9, 7, 230)],
    "Grass / Vegetation": [(10, 255, 71), (143, 255, 140)],
    "Tree / Natural Obstacle": [(4, 200, 3), (235, 12, 255), (255, 6, 82), (255, 163, 0)],
    "Sand / Soil / Ground": [(80, 50, 50), (230, 230, 230)],
    "Buildings / Structures": [(255, 0, 255), (184, 0, 255), (120, 120, 120), (7, 255, 224)],
    "Sky / Background": [(180, 120, 120)],
    "Undetectable": [(0, 0, 0)],
    "Unknown Class": [],
}
TOL = 30
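
# Matching is a Chebyshev (max-channel) test: a pixel belongs to a class when
# every RGB channel is within TOL of one of its reference colours. For example,
# a decoded pixel (20, 110, 210) matches Water's (0, 102, 200) because the
# per-channel differences (20, 8, 10) are all <= 30.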


def interpret_rgb_class(decoded_img):
    """Disambiguate the palette colour (150, 5, 61), which this pipeline sees
    for both garbage heaps and bare ground: if it covers more than 15% of the
    frame, treat it as garbage, otherwise as sand."""
    ambiguous_rgb = np.array([150, 5, 61])
    matches = np.all(np.abs(decoded_img - ambiguous_rgb) <= TOL, axis=-1)
    match_ratio = np.count_nonzero(matches) / matches.size
    return "garbage" if match_ratio > 0.15 else "sand"


def build_masks(seg):
    """
    Returns three binary masks at (H, W):
      water_mask   - 1 = water
      garbage_mask - 1 = semantic "Garbage" pixels
      movable_mask - union of water & garbage (the robot can travel here)
    """
    decoded = ade_palette[seg].astype(np.int16)  # signed, so |decoded - rgb| is safe
    water_mask = np.zeros(seg.shape, np.uint8)
    garbage_mask = np.zeros_like(water_mask)

    # Resolve the ambiguous colour into either Garbage or Sand for this frame.
    # Copy the per-class lists so the module-level map is never mutated.
    context_label = interpret_rgb_class(decoded)
    resolved_map = {k: list(v) for k, v in custom_class_map.items()}

    if context_label == "garbage":
        resolved_map["Garbage"].append((150, 5, 61))
        resolved_map["Sand / Soil / Ground"] = [
            rgb for rgb in resolved_map["Sand / Soil / Ground"] if rgb != (150, 5, 61)]
    else:
        resolved_map["Sand / Soil / Ground"].append((150, 5, 61))
        resolved_map["Garbage"] = [
            rgb for rgb in resolved_map["Garbage"] if rgb != (150, 5, 61)]

    # Use the resolved map (not the original) so the context decision takes effect.
    for rgb in resolved_map["Water"]:
        water_mask |= (np.abs(decoded - rgb).max(axis=-1) <= TOL)
    for rgb in resolved_map["Garbage"]:
        garbage_mask |= (np.abs(decoded - rgb).max(axis=-1) <= TOL)

    movable_mask = water_mask | garbage_mask
    return water_mask, garbage_mask, movable_mask
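
# movable_mask doubles as the A* occupancy grid below: the robot may traverse
# water and floating garbage, but nothing else (sand, vegetation, structures).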


def highlight_chunk_masks_on_frame(
    frame, labels, objs,
    color_uncollected=(0, 0, 128),
    color_collected=(0, 128, 0),
    color_unreachable=(0, 255, 255),
    alpha=0.8,
):
    """
    Overlays semi-transparent coloured regions for garbage chunks on the frame.
    Each entry in `objs` needs 'pos' and 'col' keys; collection status and
    reachability select the overlay colour.
    """
    overlay = frame.copy()
    for obj in objs:
        x, y = obj["pos"]
        lab = labels[y, x]
        if lab == 0:  # centre does not fall on a labelled garbage chunk
            continue
        mask = (labels == lab).astype(np.uint8)
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        if obj.get("unreachable"):
            color = color_unreachable
        elif obj["col"]:
            color = color_collected
        else:
            color = color_uncollected
        cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)

    return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)


def highlight_water_mask_on_frame(frame, binary_mask, color=(255, 0, 0), alpha=0.3):
    """Overlays a semi-transparent colour over a binary mask on the frame."""
    overlay = frame.copy()
    mask = binary_mask.astype(np.uint8) * 255
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cv2.drawContours(overlay, contours, -1, color, thickness=cv2.FILLED)
    return cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)


def astar(start, goal, occ):
    """A* over an 8-connected grid; occ is an (H, W) mask, nonzero = traversable."""
    h, w = occ.shape[:2]
    # Chebyshev distance is admissible here because diagonal steps cost 1,
    # the same as axis-aligned steps (Manhattan would overestimate diagonals).
    heur = lambda a, b: max(abs(a[0] - b[0]), abs(a[1] - b[1]))
    N8 = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
    openq = [(0, start)]
    g = {start: 0}
    came = {}
    while openq:
        _, cur = heapq.heappop(openq)
        if cur == goal:
            p = [cur]
            while cur in came:
                cur = came[cur]
                p.append(cur)
            return p[::-1]
        for dx, dy in N8:
            nx, ny = cur[0] + dx, cur[1] + dy
            if not (0 <= nx < w and 0 <= ny < h) or occ[ny, nx] == 0:
                continue
            # Forbid diagonal moves that would cut the corner of a blocked cell.
            if abs(dx) == 1 and abs(dy) == 1:
                if occ[cur[1] + dy, cur[0]] == 0 or occ[cur[1], cur[0] + dx] == 0:
                    continue
            ng = g[cur] + 1
            if (nx, ny) not in g or ng < g[(nx, ny)]:
                g[(nx, ny)] = ng
                heapq.heappush(openq, (ng + heur((nx, ny), goal), (nx, ny)))
                came[(nx, ny)] = cur
    return []
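
# Example: on an empty grid, astar((0, 0), (5, 3), occ) returns 6 waypoints
# (max(5, 3) + 1, start included), since diagonal steps cost the same as
# axis-aligned ones.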


def knn_path(start, targets, occ):
    """Greedy nearest-neighbour tour: repeatedly A*-route to the closest
    remaining target. Targets with no route are reported as unreachable."""
    todo = targets[:]
    path = []
    cur = tuple(start)
    reachable, unreachable = [], []
    while todo:
        best, best_len, best_seg = None, float("inf"), []
        for t in todo:
            seg = astar(cur, tuple(t), occ)
            if seg and len(seg) < best_len:
                best, best_len, best_seg = tuple(t), len(seg), seg
        if not best:
            # Nothing left is reachable from the current position.
            for u in todo:
                print(f"⚠️ Garbage unreachable at {u}")
                unreachable.append(u)
            break
        if path and path[-1] == best_seg[0]:
            best_seg = best_seg[1:]  # avoid duplicating the junction waypoint
        path.extend(best_seg)
        reachable.append(list(best))
        cur = best
        todo.remove(list(best))
    return path, unreachable
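
# Note: each round re-plans A* from the current position to every remaining
# target, so a k-target tour costs O(k^2) A* searches. Caching pairwise path
# lengths (or a proper TSP heuristic) would be the natural upgrade for large k.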


class Robot:
    def __init__(self, sprite, speed=2000):
        # speed = travel budget in pixels per call to step().
        img = Image.open(sprite).convert("RGBA").resize((40, 40))
        # PIL yields RGBA; convert to BGRA so the colour channels match the
        # BGR frames the sprite is composited onto.
        self.png = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2BGRA)
        if self.png.shape[-1] != 4:
            raise ValueError("Sprite image must have 4 channels (RGBA)")
        self.speed = speed
        self.pos = [20, 20]

    def step(self, path):
        """Advance along `path` (consuming waypoints) by at most `speed` pixels."""
        budget = self.speed
        while path and budget > 0:
            dx, dy = path[0][0] - self.pos[0], path[0][1] - self.pos[1]
            dist = (dx * dx + dy * dy) ** 0.5
            if dist <= budget:
                self.pos = list(path.pop(0))
                budget -= dist
            else:
                r = budget / dist
                new_x = self.pos[0] + dx * r
                new_y = self.pos[1] + dy * r
                # Keep the 40x40 sprite footprint fully inside the 640x640 frame.
                self.pos = [
                    int(np.clip(new_x, 20, 640 - 20)),
                    int(np.clip(new_y, 20, 640 - 20)),
                ]
                break
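
# Minimal usage sketch (hypothetical values): plan a tour over an all-water
# occupancy grid and advance the robot one frame at a time.
#
#   occ = np.ones((640, 640), np.uint8)
#   bot = Robot(SPRITE, speed=40)
#   tour, missed = knn_path(bot.pos, [[100, 100], [300, 50]], occ)
#   while tour:
#       bot.step(tour)   # consumes <= 40 px of the tour, then render bot.pos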


app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
app.mount("/statics", StaticFiles(directory="statics"), name="statics")

# uid -> True once the rendered video exists on disk (written by _pipeline).
video_ready = {}


@app.get("/ui", response_class=HTMLResponse)
async def serve_index():
    p = "statics/index.html"
    if os.path.exists(p):
        print("[STATIC] Serving index.html")
        return FileResponse(p)
    print("[STATIC] index.html not found")
    return JSONResponse(status_code=404, content={"detail": "Not found"})


def _uid():
    return uuid.uuid4().hex[:8]


@app.post("/upload/")
async def upload(file: UploadFile = File(...)):
    uid = _uid()
    dest = f"{UPLOAD_DIR}/{uid}_{file.filename}"
    with open(dest, "wb") as bf:
        shutil.copyfileobj(file.file, bf)
    # Render in a background thread so the request returns immediately.
    threading.Thread(target=_pipeline, args=(uid, dest)).start()
    return {"user_id": uid}


@app.get("/check_video/{uid}")
def chk(uid: str):
    return {"ready": video_ready.get(uid, False)}


@app.get("/video/{uid}")
def stream(uid: str):
    vid = f"{OUTPUT_DIR}/{uid}.mp4"
    if not os.path.exists(vid):
        return Response(status_code=404)
    return StreamingResponse(open(vid, "rb"), media_type="video/mp4")


def roboflow_infer(image_path, api_url, api_key):
    """POST an image to a Roboflow hosted-inference endpoint and return the
    decoded JSON (or {} on failure)."""
    with open(image_path, "rb") as image_file:
        res = requests.post(
            f"{api_url}?api_key={api_key}&confidence=70",
            files={"file": image_file},
        )
    print(f"[Roboflow] {res.status_code} response")
    try:
        return res.json()
    except Exception as e:
        print("[Roboflow JSON decode error]", e)
        return {}
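
# Roboflow detection responses use centre-based boxes:
#   {"predictions": [{"x": ..., "y": ..., "width": ..., "height": ...,
#                     "confidence": ..., ...}]}
# The endpoints below convert these to corner (x1, y1, x2, y2) coordinates.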


@app.post("/animal/")
async def detect_animals(file: UploadFile = File(...)):
    img_id = _uid()
    img_path = f"{UPLOAD_DIR}/{img_id}_{file.filename}"
    with open(img_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    print(f"[Animal] Uploaded image: {img_path}")

    image = cv2.imread(img_path)
    detections = []

    print("[Animal] Detecting via YOLOv8…")
    try:
        results = model_animal(image)[0]
        for box in results.boxes:
            conf = box.conf[0].item()
            if conf >= 0.70:
                cls_id = int(box.cls[0].item())
                label = model_animal.names[cls_id].lower()
                if label in ["dog", "cat", "cow", "horse", "elephant", "bear",
                             "zebra", "giraffe", "bird"]:
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    detections.append(((x1, y1, x2, y2), f"Animal Alert {conf:.2f}"))
    except Exception as e:
        print("[YOLOv8 Error]", e)

    print("[API] Roboflow key:", os.getenv("ROBOFLOW_KEY", "❌ not set"))

    try:
        print("[Animal] Detecting via Roboflow Fish model…")
        fish_response = roboflow_infer(
            img_path,
            "https://detect.roboflow.com/hydroquest/1",
            api_key=os.getenv("ROBOFLOW_KEY", ""),
        )
        for pred in fish_response.get("predictions", []):
            if pred["confidence"] >= 0.70:
                acc = pred["confidence"]
                x1 = int(pred["x"] - pred["width"] / 2)
                y1 = int(pred["y"] - pred["height"] / 2)
                x2 = int(pred["x"] + pred["width"] / 2)
                y2 = int(pred["y"] + pred["height"] / 2)
                detections.append(((x1, y1, x2, y2), f"Fish Alert {acc:.2f}"))
        print("[Roboflow Fish Response]", fish_response)
    except Exception as e:
        print("[Roboflow Fish Error]", e)

    try:
        print("[Animal] Detecting via Roboflow Bird model…")
        bird_response = roboflow_infer(
            img_path,
            "https://detect.roboflow.com/bird_only-pt0bm/1",
            api_key=os.getenv("ROBOFLOW_KEY", ""),
        )
        for pred in bird_response.get("predictions", []):
            if pred["confidence"] >= 0.70:
                acc = pred["confidence"]
                x1 = int(pred["x"] - pred["width"] / 2)
                y1 = int(pred["y"] - pred["height"] / 2)
                x2 = int(pred["x"] + pred["width"] / 2)
                y2 = int(pred["y"] + pred["height"] / 2)
                detections.append(((x1, y1, x2, y2), f"Bird Alert {acc:.2f}"))
        print("[Roboflow Bird Response]", bird_response)
    except Exception as e:
        print("[Roboflow Bird Error]", e)

    print(f"[Animal] Total detections: {len(detections)}")

    for (x1, y1, x2, y2), label in detections:
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)
        cv2.putText(image, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8,
                    (0, 0, 255), 2)

    result_path = f"{OUTPUT_DIR}/{img_id}_animal.jpg"
    cv2.imwrite(result_path, image)
    return FileResponse(result_path, media_type="image/jpeg")
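
# /animal/ fuses three detectors: local YOLOv8 for common COCO animal classes
# plus two Roboflow-hosted models (fish, birds). Every box at or above 0.70
# confidence is drawn onto the returned JPEG.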


@app.post("/classification/")
async def classify_garbage(file: UploadFile = File(...)):
    img_id = _uid()
    img_path = f"{UPLOAD_DIR}/{img_id}_{file.filename}"
    out_path = f"{OUTPUT_DIR}/{img_id}_classified.jpg"

    with open(img_path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    print(f"[Classification] Received image: {img_path}")
    image = cv2.imread(img_path)
    h, w = image.shape[:2]
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    pil = Image.fromarray(rgb)

    detections = []

    for r in model_self(image):
        detections += [b.xyxy[0].tolist() for b in r.boxes]

    r = model_yolo5(image)
    if hasattr(r, 'pred') and len(r.pred) > 0:
        detections += [p[:4].tolist() for p in r.pred[0]]

    with torch.no_grad():
        out = model_detr(**processor_detr(images=pil, return_tensors="pt"))
    results = processor_detr.post_process_object_detection(
        outputs=out,
        target_sizes=torch.tensor([pil.size[::-1]]),
        threshold=0.5,
    )[0]
    detections += [b.tolist() for b in results["boxes"]]
    print(f"[Classification] Total detections from 3 models: {len(detections)}")

    for box in detections:
        x1, y1, x2, y2 = map(int, box)
        # Clamp to the actual image bounds (uploads are not resized here).
        x1, x2 = max(0, min(x1, w - 1)), max(0, min(x2, w - 1))
        y1, y2 = max(0, min(y1, h - 1)), max(0, min(y2, h - 1))

        crop = image[y1:y2, x1:x2]
        if crop.shape[0] < 10 or crop.shape[1] < 10:
            continue

        pil_crop = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        with torch.no_grad():
            pred = model_garbage_cls(pil_crop, verbose=False)[0]
        class_id = int(pred.probs.top1)
        class_name = model_garbage_cls.names[class_id]
        conf = float(pred.probs.top1conf)

        label = f"{class_name} ({conf:.2f})"

        # Box colour encodes classifier confidence (BGR).
        if conf < 0.4:
            color = (0, 0, 255)
        elif conf < 0.6:
            color = (0, 255, 0)
        elif conf < 0.8:
            color = (255, 255, 0)
        else:
            color = (255, 0, 255)

        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        cv2.putText(image, label, (x1, y1 - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    cv2.imwrite(out_path, image)
    print(f"[Classification] Output saved: {out_path}")
    return FileResponse(out_path, media_type="image/jpeg")


def _pipeline(uid, img_path):
    print(f"▶️ [{uid}] processing")
    bgr = cv2.resize(cv2.imread(img_path), (640, 640))
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    pil = Image.fromarray(rgb)

    # Semantic segmentation (SegFormer, ADE20K classes).
    with torch.no_grad():
        inputs = feat_extractor(pil, return_tensors="pt")
        seg_logits = segformer(**inputs).logits

    seg_tensor = seg_logits.argmax(1)[0].cpu()
    if seg_tensor.numel() == 0:
        print(f"❌ [{uid}] segmentation failed (empty tensor)")
        video_ready[uid] = True
        return

    print(f"🧪 [{uid}] segmentation input shape: {inputs['pixel_values'].shape}")
    # cv2.resize does not support int64; the 150 class ids fit in uint8.
    seg = cv2.resize(seg_tensor.numpy().astype(np.uint8), (640, 640),
                     interpolation=cv2.INTER_NEAREST)
    water_mask, garbage_mask, movable_mask = build_masks(seg)

    detections = []

    # One connected component of the garbage mask = one "chunk"; its centroid
    # becomes a collection target.
    num_cc, labels = cv2.connectedComponents(garbage_mask.astype(np.uint8))
    chunk_centres = []
    for lab in range(1, num_cc):
        ys, xs = np.where(labels == lab)
        if xs.size == 0:
            continue
        chunk_centres.append([int(xs.mean()), int(ys.mean())])
    print(f"🧠 {len(chunk_centres)} garbage chunks detected")

    # Pool bounding boxes from the three detectors.
    for r in model_self(bgr):
        detections += [b.xyxy[0].tolist() for b in r.boxes]
    r = model_yolo5(bgr)
    if hasattr(r, 'pred') and len(r.pred) > 0:
        detections += [p[:4].tolist() for p in r.pred[0]]
    inp = processor_detr(images=pil, return_tensors="pt")
    with torch.no_grad():
        out = model_detr(**inp)
    post = processor_detr.post_process_object_detection(
        outputs=out,
        target_sizes=torch.tensor([pil.size[::-1]]),
        threshold=0.5,
    )[0]
    detections += [b.tolist() for b in post["boxes"]]

    centres = []
    for x1, y1, x2, y2 in detections:
        # Keep a detection only if at least half of its bbox lies inside the
        # travelable (water + garbage) zone segmented earlier.
        x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
        x1 = max(0, min(x1, 639)); y1 = max(0, min(y1, 639))
        x2 = max(0, min(x2, 639)); y2 = max(0, min(y2, 639))
        box_mask = movable_mask[y1:y2, x1:x2]
        if box_mask.size == 0:
            continue
        if np.count_nonzero(box_mask) / box_mask.size >= 0.5:
            centres.append([int((x1 + x2) / 2), int((y1 + y2) / 2)])

    centres.extend(chunk_centres)

    if not centres:
        print(f"🛑 [{uid}] no reachable garbage")
        video_ready[uid] = True
        return
    print(f"🧠 {len(centres)} garbage objects on water selected from {len(detections)} detections")

    # Spawn on the topmost (then leftmost) water pixel, nudged inward so the
    # 40x40 sprite stays inside the frame.
    ys, xs = np.where(water_mask)
    if len(ys) == 0:
        print(f"❌ [{uid}] no water to spawn on")
        video_ready[uid] = True
        return

    idx = np.lexsort((xs, ys))
    spawn_y, spawn_x = int(ys[idx[0]]), int(xs[idx[0]])
    spawn_x = int(np.clip(spawn_x, 20, 640 - 20))
    spawn_y = int(np.clip(spawn_y, 20, 640 - 20))
    robot = Robot(SPRITE)
    robot.pos = [spawn_x, spawn_y]

    path, unreachable = knn_path(robot.pos, centres, movable_mask)
    if unreachable:
        print(f"⚠️ Unreachable garbage chunks at: {unreachable}")

    out_tmp = f"{OUTPUT_DIR}/{uid}_tmp.mp4"
    vw = cv2.VideoWriter(out_tmp, cv2.VideoWriter_fourcc(*"mp4v"), 10.0, (640, 640))
    objs = [{"pos": p, "col": False, "unreachable": False} for p in centres if p not in unreachable]
    objs += [{"pos": p, "col": False, "unreachable": True} for p in unreachable]
    bg = bgr.copy()
    for _ in range(15000):  # hard cap on frames in case the tour never finishes
        frame = bg.copy()
        frame = highlight_chunk_masks_on_frame(
            frame,
            labels,
            objs,
            color_uncollected=(0, 0, 128),
            color_collected=(0, 128, 0),
            color_unreachable=(0, 255, 255),
        )
        frame = highlight_water_mask_on_frame(frame, water_mask)

        for o in objs:
            color = (0, 0, 128) if not o["col"] else (0, 128, 0)
            x, y = o["pos"]
            cv2.circle(frame, (x, y), 6, color, -1)

        robot.step(path)

        # Alpha-composite the BGRA sprite onto the frame, clipping at the
        # borders: out = alpha * sprite + (1 - alpha) * background.
        sp = robot.png
        sprite_h, sprite_w = sp.shape[:2]
        rx, ry = robot.pos
        x1, y1 = rx - sprite_w // 2, ry - sprite_h // 2
        x2, y2 = x1 + sprite_w, y1 + sprite_h
        x1_clip, x2_clip = max(0, x1), min(frame.shape[1], x2)
        y1_clip, y2_clip = max(0, y1), min(frame.shape[0], y2)
        sx1, sy1 = x1_clip - x1, y1_clip - y1
        sx2, sy2 = sprite_w - (x2 - x2_clip), sprite_h - (y2 - y2_clip)
        sprite_crop = sp[sy1:sy2, sx1:sx2]
        alpha = sprite_crop[:, :, 3] / 255.0
        alpha = np.stack([alpha] * 3, axis=-1)
        bgroi = frame[y1_clip:y2_clip, x1_clip:x2_clip]
        blended = (alpha * sprite_crop[:, :, :3] + (1 - alpha) * bgroi).astype(np.uint8)
        frame[y1_clip:y2_clip, x1_clip:x2_clip] = blended

        # Mark anything within 20 px of the robot as collected.
        for o in objs:
            if not o["col"] and np.hypot(o["pos"][0] - robot.pos[0], o["pos"][1] - robot.pos[1]) <= 20:
                o["col"] = True
        vw.write(frame)
        if all(o["col"] for o in objs):
            break
        if not path:
            break
    vw.release()

    # Re-encode mp4v -> H.264/yuv420p so the result plays in browsers.
    final = f"{OUTPUT_DIR}/{uid}.mp4"
    ffmpeg.input(out_tmp).output(final, vcodec="libx264", pix_fmt="yuv420p").run(
        overwrite_output=True, quiet=True)
    os.remove(out_tmp)
    video_ready[uid] = True
    print(f"✅ [{uid}] video ready → {final}")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)