Spaces:
Sleeping
Sleeping
""" | |
Digital Review System (DRS) application for LBW decisions | |
======================================================== | |
This application provides a simplified demonstration of how a cricket‑style | |
digital review system (DRS) could be implemented using open source | |
computer vision tools. It is not a complete Hawk‑Eye replacement, but | |
illustrates the key steps in building such a system: capturing video, | |
detecting and tracking the ball, estimating its flight trajectory, | |
analysing whether it would have hit the stumps, estimating speed and | |
generating a replay with annotations. A Gradio interface ties these | |
components together to provide an easy way to record a match, appeal | |
for an LBW decision and review the result. | |
The app has two main pages: | |
• **Live Match Recording** – allows the user to upload or record match video. | |
The video is stored on disk and can be analysed later. | |
• **LBW Review** – analyses the last few seconds of the recorded video | |
whenever an appeal is made. It performs ball tracking, trajectory | |
estimation and stumps intersection checks to predict whether the | |
batsman is out or not. An annotated replay and a 3D trajectory | |
visualisation are returned along with speed and impact information. | |
The implementation relies on simple background subtraction and circle | |
detection rather than proprietary tracking systems. It assumes a | |
single static camera behind the bowler and a fairly unobstructed view | |
of the pitch. See the individual modules in the ``modules`` package | |
for more details on each processing step. | |
Note: because this space is intended to run on Hugging Face, file | |
paths and heavy downloads are avoided wherever possible. The code is | |
fully self contained and uses only packages available in this runtime. | |
""" | |
from __future__ import annotations | |
import os | |
import shutil | |
import tempfile | |
from pathlib import Path | |
from typing import Any, Dict, Tuple | |
import gradio as gr | |
from drs_modules.video_processing import trim_last_seconds, save_uploaded_video | |
from drs_modules.detection import detect_and_track_ball | |
from drs_modules.trajectory import estimate_trajectory, predict_stumps_intersection | |
from drs_modules.lbw_decision import make_lbw_decision | |
from drs_modules.visualization import ( | |
generate_trajectory_plot, | |
annotate_video_with_tracking, | |
) | |
def analyse_appeal(video_path: str, review_seconds: int = 8) -> Tuple[str, Dict[str, Any]]: | |
"""Analyse the last few seconds of a match video and return DRS results. | |
Parameters | |
---------- | |
video_path: str | |
Path to the full match video recorded on the Live Match Recording page. | |
review_seconds: int, optional | |
Number of seconds from the end of the video to analyse. Defaults to 8. | |
Returns | |
------- | |
Tuple[str, Dict[str, Any]] | |
A message summarising the decision and a dictionary with the | |
underlying data for display (decision text, ball speed, impact | |
frame number, annotated video path and trajectory plot path). | |
""" | |
# Create a temporary directory to hold intermediate files | |
temp_dir = tempfile.mkdtemp() | |
trimmed_path = os.path.join(temp_dir, "trimmed.mp4") | |
# Step 1: Trim the last N seconds of the input video | |
trim_last_seconds(video_path, trimmed_path, review_seconds) | |
# Step 2: Detect and track the ball through the trimmed segment | |
tracking_data = detect_and_track_ball(trimmed_path) | |
# Step 3: Estimate the ball's trajectory (2D for simplicity) and predict | |
# whether it will hit the stumps | |
trajectory_model = estimate_trajectory(tracking_data["centers"], tracking_data["timestamps"]) | |
will_hit_stumps = predict_stumps_intersection(trajectory_model) | |
# Step 4: Make a decision based on trajectory and impact detection | |
decision, impact_frame_idx = make_lbw_decision( | |
tracking_data["centers"], | |
trajectory_model, | |
will_hit_stumps, | |
) | |
# Step 5: Calculate ball speed (pixels per second scaled to km/h) | |
total_distance_px = 0.0 | |
for i in range(1, len(tracking_data["centers"])): | |
cx0, cy0 = tracking_data["centers"][i - 1] | |
cx1, cy1 = tracking_data["centers"][i] | |
total_distance_px += ((cx1 - cx0) ** 2 + (cy1 - cy0) ** 2) ** 0.5 | |
# Duration of captured frames | |
duration = tracking_data["timestamps"][-1] - tracking_data["timestamps"][0] | |
if duration <= 0: | |
speed_kmh = 0.0 | |
else: | |
# Convert pixel distance per second to km/h using an assumed scale | |
pixels_per_metre = 50.0 | |
speed_mps = (total_distance_px / pixels_per_metre) / duration | |
speed_kmh = speed_mps * 3.6 | |
# Step 6: Generate annotated replay video and trajectory plot | |
annotated_video_path = os.path.join(temp_dir, "annotated.mp4") | |
annotate_video_with_tracking( | |
trimmed_path, | |
tracking_data["centers"], | |
trajectory_model, | |
will_hit_stumps, | |
impact_frame_idx, | |
annotated_video_path, | |
) | |
plot_path = os.path.join(temp_dir, "trajectory_plot.png") | |
generate_trajectory_plot( | |
tracking_data["centers"], trajectory_model, will_hit_stumps, plot_path | |
) | |
# Compose the message and result dictionary | |
decision_message = f"Decision: {decision}" | |
result = { | |
"decision": decision, | |
"ball_speed_kmh": round(speed_kmh, 2), | |
"impact_frame_index": impact_frame_idx, | |
"annotated_video": annotated_video_path, | |
"trajectory_plot": plot_path, | |
} | |
return decision_message, result | |
def build_interface() -> gr.Blocks: | |
"""Construct the Gradio interface with multiple pages.""" | |
with gr.Blocks(title="Cricket LBW DRS Demo") as demo: | |
gr.Markdown( | |
"""# Digital Review System (LBW) | |
This demo illustrates how a simplified digital review system can be | |
implemented using computer vision techniques. You can record or | |
upload match footage, and when an appeal occurs, the system will | |
analyse the last few seconds to decide whether the batsman is **OUT** | |
or **NOT OUT**. Alongside the decision you will receive an | |
annotated replay, a 3D trajectory plot and an estimate of the ball | |
speed. | |
""" | |
) | |
with gr.Tab("Live Match Recording"): | |
video_input = gr.Video( | |
label="Record or upload match video", | |
sources=["upload", "webcam"], | |
# Do not specify `type` because some versions of Gradio | |
# reject that argument. The file path is available via | |
# video_file.name in the callback. | |
) | |
out_video_path = gr.State() | |
def on_video_upload(video_file): | |
if video_file is None: | |
return None | |
save_path = save_uploaded_video(video_file, video_file) | |
return save_path | |
video_input.change( | |
fn=on_video_upload, | |
inputs=[video_input], | |
outputs=[out_video_path], | |
) | |
gr.Markdown( | |
""" | |
After recording or uploading a video, switch to the **LBW Review** | |
tab and press **Analyse Appeal** to review the last 8 seconds. | |
""" | |
) | |
with gr.Tab("LBW Review"): | |
with gr.Row(): | |
analyse_button = gr.Button("Analyse Appeal") | |
review_seconds = gr.Number( | |
value=8, label="Seconds to review", minimum=2, maximum=20 | |
) | |
decision_output = gr.Textbox(label="Decision", lines=1) | |
ball_speed_output = gr.Textbox( | |
label="Ball speed (km/h)", lines=1, interactive=False | |
) | |
impact_frame_output = gr.Textbox( | |
label="Impact frame index", lines=1, interactive=False | |
) | |
annotated_video_output = gr.Video( | |
label="Annotated replay video" | |
) | |
trajectory_plot_output = gr.Image( | |
label="3D Trajectory plot" | |
) | |
def on_analyse(_): | |
video_path = out_video_path.value | |
if not video_path or not os.path.exists(video_path): | |
return ( | |
"Please record or upload a video in the first tab.", | |
None, | |
None, | |
None, | |
None, | |
) | |
message, result = analyse_appeal(video_path, int(review_seconds.value)) | |
return ( | |
message, | |
str(result["ball_speed_kmh"]), | |
str(result["impact_frame_index"]), | |
result["annotated_video"], | |
result["trajectory_plot"], | |
) | |
analyse_button.click( | |
fn=on_analyse, | |
inputs=[analyse_button], | |
outputs=[ | |
decision_output, | |
ball_speed_output, | |
impact_frame_output, | |
annotated_video_output, | |
trajectory_plot_output, | |
], | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = build_interface() | |
demo.launch() | |