File size: 5,660 Bytes
2db7738
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Ball detection and tracking for the DRS application.

This module implements a simple motion-based tracker to follow the cricket
ball in a video.  Professional ball tracking systems use multiple high
frame-rate cameras and sophisticated object detectors.  Here, each frame is
first searched for small circles with the Hough transform; when no circle is
found, the tracker falls back to small moving blobs extracted from a
background-subtraction mask.  The tracker keeps the centre coordinates,
radius and timestamp of every detection so that downstream modules can
estimate the ball's trajectory and predict whether it will hit the stumps.

The detection pipeline makes the following assumptions:

  * Only one ball is present in the scene at a time.
  * The ball is approximately circular in appearance.
  * The camera is static or moves little compared to the ball.

These assumptions hold for many amateur cricket recordings but are
obviously simplified compared to a true DRS system.
"""

from __future__ import annotations

import cv2
import numpy as np
from typing import Dict, List, Tuple


def _select_candidate(candidates, previous_center: Tuple[int, int] | None, size_key):
    """Return the candidate nearest to ``previous_center``.

    When there is no previous detection, the candidate with the smallest
    ``size_key`` value is returned instead.  Each candidate must expose its
    x and y coordinates at indices 0 and 1.  Ties resolve to the first
    candidate encountered.
    """
    if previous_center is None:
        return min(candidates, key=size_key)
    px, py = previous_center
    # Convert to Python ints so the squared distance cannot wrap around in a
    # fixed-width NumPy integer type.
    return min(
        candidates,
        key=lambda c: (int(c[0]) - px) ** 2 + (int(c[1]) - py) ** 2,
    )


def _find_hough_circle(
    blurred: np.ndarray, previous_center: Tuple[int, int] | None
) -> Tuple[Tuple[int, int], int] | None:
    """Locate the ball as a Hough circle in a blurred grayscale frame.

    Returns ``((x, y), radius)`` for the chosen circle, or ``None`` when the
    Hough transform reports no circles.
    """
    circles = cv2.HoughCircles(
        blurred,
        cv2.HOUGH_GRADIENT,
        dp=1.2,
        minDist=20,
        param1=50,
        param2=30,
        minRadius=3,
        maxRadius=30,
    )
    if circles is None:
        return None
    circles = np.round(circles[0, :]).astype("int")
    if len(circles) == 0:
        return None
    # Prefer continuity with the previous detection; with no history, the
    # smallest circle is taken as the most likely ball.
    x, y, r = _select_candidate(circles, previous_center, size_key=lambda c: c[2])[:3]
    return (int(x), int(y)), int(r)


def _find_moving_blob(
    fg_mask: np.ndarray, previous_center: Tuple[int, int] | None
) -> Tuple[Tuple[int, int], int] | None:
    """Locate the ball as a small blob in a cleaned foreground mask.

    Returns ``((x, y), radius)`` where the centre is the centre of the blob's
    bounding box and the radius is half of its larger side, or ``None`` when
    no contour has an area strictly between 10 and 800 pixels.
    """
    # ``[-2]`` selects the contour list for both the two-value return
    # (OpenCV 2.x/4.x) and the three-value return (OpenCV 3.x).
    contours = cv2.findContours(
        fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )[-2]

    candidates = []
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if 10 < area < 800:  # adjust thresholds as necessary
            x, y, w, h = cv2.boundingRect(cnt)
            candidates.append((x + w // 2, y + h // 2, w, h, area))
    if not candidates:
        return None

    # Closest blob to the previous centre, otherwise the smallest-area blob.
    cx, cy, w, h, _ = _select_candidate(
        candidates, previous_center, size_key=lambda c: c[4]
    )
    return (int(cx), int(cy)), int(max(w, h) / 2)


def detect_and_track_ball(video_path: str) -> Dict[str, List]:
    """Detect and track the cricket ball in a video.

    Every frame is converted to grayscale, blurred and searched for circles
    with the Hough transform.  If no circle is found, small blobs in the
    background-subtraction mask are used as a fallback.  Frames without any
    detection contribute nothing to the output.

    Parameters
    ----------
    video_path: str
        Path to the trimmed video segment containing the delivery and appeal.

    Returns
    -------
    Dict[str, List]
        A dictionary containing:
            ``centers``: list of (x, y) coordinates of the ball in successive detections.
            ``timestamps``: list of timestamps (in seconds) corresponding to each centre,
            computed as ``frame_index / fps``.
            ``radii``: list of detected circle radii (in pixels).

    Raises
    ------
    RuntimeError
        If the video at ``video_path`` cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture handle is released even if opening
    # fails or a frame-processing step raises.
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video {video_path}")

        # Some containers report 0, NaN or a negative frame rate; fall back
        # to 30 fps so timestamps remain finite and increasing.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not np.isfinite(fps) or fps <= 0:
            fps = 30.0

        # Background subtractor for motion detection
        bg_sub = cv2.createBackgroundSubtractorMOG2(
            history=500, varThreshold=32, detectShadows=False
        )
        # Loop-invariant kernel used to remove speckle noise from the mask.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

        centers: List[Tuple[int, int]] = []
        radii: List[int] = []
        timestamps: List[float] = []

        previous_center: Tuple[int, int] | None = None
        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Preprocess: grayscale and blur
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            blurred = cv2.GaussianBlur(gray, (5, 5), 0)

            # The background model must see every frame, even when the Hough
            # stage succeeds, so it stays current for later fallbacks.
            fg_mask = bg_sub.apply(frame)

            detection = _find_hough_circle(blurred, previous_center)
            if detection is None:
                # Fallback: look for small moving blobs in the cleaned mask.
                cleaned = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)
                detection = _find_moving_blob(cleaned, previous_center)

            if detection is not None:
                center, radius = detection
                centers.append(center)
                # A zero radius is replaced by a nominal 5 px.
                radii.append(radius or 5)
                timestamps.append(frame_idx / fps)
                previous_center = center
            # Increment frame index regardless of detection
            frame_idx += 1
    finally:
        cap.release()

    return {"centers": centers, "radii": radii, "timestamps": timestamps}