from vitpose import VitPose
import requests
import os
from config import API_URL, API_KEY
import logging
import cv2
import numpy as np
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Jump Analysis Constants
JUMP_THRESHOLD_PERCENT = 0.05
SMOOTHING_WINDOW = 5
HORIZONTAL_OFFSET_FACTOR = 0.75
VELOCITY_WINDOW = 3
METRICS_BELOW_FEET_OFFSET = 20

# Color Constants (BGR)
BLUE = (255, 0, 0)
GREEN = (0, 255, 0)
YELLOW = (0, 255, 255)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
GRAY = (128, 128, 128)
LIGHT_GRAY = (200, 200, 200)

COLORS = {
    "blue": BLUE,
    "green": GREEN,
    "yellow": YELLOW,
    "white": WHITE,
    "black": BLACK,
    "gray": GRAY,
    "light_gray": LIGHT_GRAY
}

# Keypoint indices (COCO-17 layout)
KEYPOINT_INDICES = {
    'Nose': 0, 'L_Eye': 1, 'R_Eye': 2, 'L_Ear': 3, 'R_Ear': 4,
    'L_Shoulder': 5, 'R_Shoulder': 6, 'L_Elbow': 7, 'R_Elbow': 8,
    'L_Wrist': 9, 'R_Wrist': 10, 'L_Hip': 11, 'R_Hip': 12,
    'L_Knee': 13, 'R_Knee': 14, 'L_Ankle': 15, 'R_Ankle': 16
}

# Skeleton connections
SKELETON_CONNECTIONS = [
    ("Nose", "L_Eye"), ("Nose", "R_Eye"),
    ("L_Eye", "L_Ear"), ("R_Eye", "R_Ear"),
    ("Nose", "L_Shoulder"), ("Nose", "R_Shoulder"),
    ("L_Shoulder", "R_Shoulder"),
    ("L_Shoulder", "L_Elbow"), ("R_Shoulder", "R_Elbow"),
    ("L_Elbow", "L_Wrist"), ("R_Elbow", "R_Wrist"),
    ("L_Shoulder", "L_Hip"), ("R_Shoulder", "R_Hip"),
    ("L_Hip", "R_Hip"),
    ("L_Hip", "L_Knee"), ("R_Hip", "R_Knee"),
    ("L_Knee", "L_Ankle"), ("R_Knee", "R_Ankle")
]


@dataclass
class JumpMetrics:
    max_jump_height: float = 0.0
    velocity_vertical: float = 0.0
    peak_power_sayer: float = 0.0
    jump_peak_power: float = 0.0
    repetition_count: int = 0
    ground_level: Optional[float] = None
    takeoff_head_y: Optional[float] = None
    max_head_height_px: Optional[float] = None
    jump_started: bool = False


@dataclass
class OverlayConfig:
    alpha: float = 0.7
    font: int = cv2.FONT_HERSHEY_SIMPLEX
    font_scale_title_metric: float = 0.5
    font_scale_value: float = 0.7
    font_scale_title_main: float = 1.2
    font_thickness_metric: int = 1
    font_thickness_title_main: int = 1
    line_height_title_metric: int = int(20 * 1.2)
    line_height_value: int = int(25 * 1.2)
    padding_vertical: int = int(15 * 1.2)
    padding_horizontal: int = int(15 * 1.2)
    border_thickness: int = 1
    corner_radius: int = 10
    spacing_horizontal: int = 30
    title_y_offset: int = 50
    metrics_y_offset_alto: int = 80


@dataclass
class FramePosition:
    x: int
    y: int
    width: int
    height: int
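
# A minimal sketch (not used by the pipeline itself): the skeleton edge names
# above resolve to COCO keypoint index pairs like this, which is what
# draw_skeleton does per connection at draw time.
EDGE_INDICES = [(KEYPOINT_INDICES[a], KEYPOINT_INDICES[b])
                for a, b in SKELETON_CONNECTIONS]
# e.g. ("L_Knee", "L_Ankle") -> (13, 15)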

def process_video(file_name: str, vitpose: VitPose, user_id: str, player_id: str):
    """
    Process a video file using VitPose for pose estimation and send results to a webhook.

    This function runs pose estimation on the video, saves the annotated video
    to the static directory, and uploads the processed video to the webhook
    endpoint.

    Args:
        file_name (str): Path to the input video file
        vitpose (VitPose): VitPose instance for pose estimation
        user_id (str): ID of the user uploading the video
        player_id (str): ID of the player in the video

    Returns:
        None

    Raises:
        ValueError: If the video file cannot be opened or processed
        requests.RequestException: If the webhook request fails
    """
    video_path = file_name
    logger.info(f"starting task {video_path}")

    new_file_name = os.path.join("static", video_path)
    logger.info(f"new file name {new_file_name}")
    vitpose.output_video_path = new_file_name

    annotated_frames = vitpose.run(video_path)
    vitpose.frames_to_video(annotated_frames)
    logger.info(f"Video processed {video_path}")

    with open(new_file_name, "rb") as f:
        contents = f.read()

    url = API_URL + "/excercises/webhooks/video-processed"
    logger.info(f"Sending video to {url}")
    files = {"file": (video_path, contents, "video/mp4")}
    logger.info(f"video_path: {video_path}")
    response = requests.post(
        url,
        files=files,
        data={
            "user_id": user_id,
            "typeMessage": "video_processed",
            "file_name": video_path,
            "player_id": player_id
        },
        stream=True,
        headers={"token": API_KEY}
    )
    logger.info(f"Response: {response.status_code}")
    logger.info(f"Response: {response.text}")
    logger.info(f"Video sent to {url}")
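
# Example invocation (hypothetical IDs and path), e.g. from a FastAPI
# background task after the upload has been written to disk:
#
#     background_tasks.add_task(process_video, "jump.mp4", vitpose,
#                               user_id="42", player_id="7")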

def process_salto_alto(file_name: str, vitpose: VitPose, player_data: dict,
                       exercise_id: str, repetitions) -> dict:
    """
    Process a high jump exercise video using VitPose for pose estimation and
    analyze jump metrics.

    This function analyzes pose keypoints in a high jump video to calculate
    jump metrics including height, velocity, and power. Results are sent to an
    API endpoint.

    Args:
        file_name (str): Path to the input video file
        vitpose (VitPose): VitPose instance for pose estimation
        player_data (dict): Dictionary containing player information including:
            - height: Player height in cm
            - weight: Player weight in kg
            - id: Player identifier
        exercise_id (str): Unique identifier for the exercise
        repetitions (int): Expected number of jump repetitions in the video

    Returns:
        dict: Dictionary containing analysis results and video information

    Raises:
        ValueError: If video processing fails or player data is invalid
        requests.RequestException: If the API request fails
    """
    # Use the provided VitPose instance
    print("start processing")
    model = vitpose.pipeline

    # Get player parameters from player_data or use defaults
    reference_height = player_data.get('height', 1.68)  # Player height, expected in cm (see docstring)
    body_mass_kg = player_data.get('weight', 64)  # Body mass in kg

    # Generate output paths
    output_video = file_name.replace('.mp4', '_analyzed.mp4')

    # Process the video and get the jump metrics
    results_dict = analyze_jump_video(
        model=model,
        input_video=file_name,
        output_video=output_video,
        player_height=float(reference_height) / 100,  # cm to m
        body_mass_kg=float(body_mass_kg),
        repetitions=repetitions
    )

    print(f"results_dict: {results_dict}")
    response = send_results_api(results_dict, player_data["id"], exercise_id, file_name)
    # os.remove(file_name)
    # os.remove(output_video)
    return results_dict


def send_results_api(results_dict: dict, player_id: str, exercise_id: str,
                     video_path: str) -> requests.Response:
    """
    Send video analysis results to the API webhook endpoint.

    This function uploads the analyzed video file along with the computed
    metrics to the API's webhook endpoint for processing and storage.

    Args:
        results_dict (dict): Dictionary containing analysis results including:
            - video_analysis: Information about the processed video
            - repetition_data: List of metrics for each jump repetition
        player_id (str): Unique identifier for the player
        exercise_id (str): Unique identifier for the exercise
        video_path (str): Path to the video file to upload

    Returns:
        requests.Response: HTTP response from the API endpoint

    Raises:
        FileNotFoundError: If the video file doesn't exist
        requests.RequestException: If the API request fails
        TypeError: If results_dict cannot be serialized to JSON
    """
    url = API_URL + "/excercises/webhooks/video-processed-results"
    logger.info(f"Sending video results to {url}")

    # Open the video file
    with open(video_path, 'rb') as video_file:
        # Prepare the files dictionary for file upload
        files = {
            'file': (video_path.split('/')[-1], video_file, 'video/mp4')
        }

        # Prepare the form data
        data = {
            'player_id': player_id,
            'exercise_id': exercise_id,
            'results': json.dumps(results_dict)  # Convert dict to JSON string
        }

        # Send the request with both files and data
        response = requests.post(
            url,
            headers={"token": API_KEY},
            files=files,
            data=data,
            stream=True
        )

        logger.info(f"Response: {response.status_code}")
        logger.info(f"Response: {response.text}")
        return response


def setup_video_capture(input_video: str, output_video: str) -> Tuple[cv2.VideoCapture, cv2.VideoWriter, int, int]:
    """
    Initialize video capture and writer objects for video processing.

    This function creates OpenCV VideoCapture and VideoWriter objects with
    matching properties (frame rate, dimensions) for reading from the input
    and writing to the output.

    Args:
        input_video (str): Path to the input video file
        output_video (str): Path for the output video file

    Returns:
        Tuple[cv2.VideoCapture, cv2.VideoWriter, int, int]: A tuple containing:
            - cap: VideoCapture object for reading the input video
            - out: VideoWriter object for writing the output video
            - width: Video frame width in pixels
            - height: Video frame height in pixels

    Raises:
        ValueError: If the input video cannot be opened or read
        cv2.error: If video writer initialization fails
    """
    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        raise ValueError("Could not open the video file")

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_video, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    return cap, out, width, height
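
# A minimal usage sketch (hypothetical paths) of the capture/writer pair this
# helper returns; always release both handles when done:
#
#     cap, out, w, h = setup_video_capture("input.mp4", "annotated.mp4")
#     ret, frame = cap.read()
#     if ret:
#         out.write(frame)
#     cap.release()
#     out.release()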

def calibrate_pose_detection(model, cap, player_height: float) -> Tuple[float, int, int]:
    """
    Calibrate pose detection scale and reference points using the first video frame.

    This function analyzes the first frame to establish the pixel-to-meter
    conversion ratio based on the player's known height, and detects the
    initial shoulder positions used as references during video processing.

    Args:
        model: VitPose model instance for pose estimation
        cap: OpenCV VideoCapture object
        player_height (float): Actual height of the player in meters

    Returns:
        Tuple[float, int, int]: A tuple containing:
            - PX_PER_METER: Conversion factor from pixels to meters
            - initial_left_shoulder_x: X-coordinate of the left shoulder in pixels
            - initial_right_shoulder_x: X-coordinate of the right shoulder in pixels

    Raises:
        ValueError: If the video cannot be read or pose detection fails on the first frame
        IndexError: If required keypoints are not detected in the first frame
    """
    ret, frame = cap.read()
    if not ret:
        raise ValueError("Could not read the first frame of the video")

    output = model(frame)
    keypoints = output.keypoints_xy.float().cpu().numpy()
    labels = model.pose_estimator_config.label2id
    nose_keypoint = labels["Nose"]
    L_ankle_keypoint = labels["L_Ankle"]
    R_ankle_keypoint = labels["R_Ankle"]
    L_shoulder_keypoint = labels["L_Shoulder"]
    R_shoulder_keypoint = labels["R_Shoulder"]

    PX_PER_METER = None
    initial_left_shoulder_x = None
    initial_right_shoulder_x = None

    if keypoints is not None and len(keypoints) > 0 and len(keypoints[0]) > 0:
        kpts_first = keypoints[0]
        if len(kpts_first[nose_keypoint]) > 0 and len(kpts_first[L_ankle_keypoint]) > 0:
            # Pixel distance from the nose down to the higher of the two ankles
            # (smaller Y is higher up, since image Y-coordinates grow downward)
            initial_person_height_px = min(kpts_first[L_ankle_keypoint][1],
                                           kpts_first[R_ankle_keypoint][1]) - kpts_first[nose_keypoint][1]
            PX_PER_METER = initial_person_height_px / player_height
        if len(kpts_first[L_shoulder_keypoint]) > 0 and len(kpts_first[R_shoulder_keypoint]) > 0:
            initial_left_shoulder_x = int(kpts_first[L_shoulder_keypoint][0])
            initial_right_shoulder_x = int(kpts_first[R_shoulder_keypoint][0])

    if PX_PER_METER is None or initial_left_shoulder_x is None or initial_right_shoulder_x is None:
        raise ValueError("Could not calibrate the scale or detect the shoulders in the first frame.")

    return PX_PER_METER, initial_left_shoulder_x, initial_right_shoulder_x
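
# Worked example of the calibration above (made-up pixel values): with the
# nose at y=300 px, ankles at y=900 px, and a 1.68 m player,
# PX_PER_METER = (900 - 300) / 1.68 ≈ 357.1 px/m, so a 50 px rise of the
# head corresponds to roughly 0.14 m of elevation.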

def process_frame_keypoints(model, frame):
    """
    Process a video frame and extract human pose keypoints.

    This function applies the pose estimation model to a frame and validates
    that all required keypoints (nose, ankles, shoulders) are detected and
    visible.

    Args:
        model: VitPose model instance for pose estimation
        frame: Input video frame as numpy array

    Returns:
        Tuple containing:
            - success (bool): True if all required keypoints were detected, False otherwise
            - current_ankle_y (float or None): Y-coordinate of the highest ankle point, if detected
            - current_head_y (float or None): Y-coordinate of the nose point, if detected
            - keypoints (numpy.ndarray or None): Array of detected keypoints, if successful
    """
    try:
        output = model(frame)
        keypoints = output.keypoints_xy.float().cpu().numpy()
        labels = model.pose_estimator_config.label2id
        nose_keypoint = labels["Nose"]
        L_ankle_keypoint = labels["L_Ankle"]
        R_ankle_keypoint = labels["R_Ankle"]
        L_shoulder_keypoint = labels["L_Shoulder"]
        R_shoulder_keypoint = labels["R_Shoulder"]

        if (keypoints is not None and len(keypoints) > 0
                and len(keypoints[0]) > 0 and keypoints.size > 0):
            kpts = keypoints[0]
            if (nose_keypoint < len(kpts) and L_ankle_keypoint < len(kpts)
                    and R_ankle_keypoint < len(kpts)
                    and L_shoulder_keypoint < len(kpts)
                    and R_shoulder_keypoint < len(kpts)):
                nose = kpts[nose_keypoint]
                ankles = [kpts[L_ankle_keypoint], kpts[R_ankle_keypoint]]
                left_shoulder = kpts[L_shoulder_keypoint]
                right_shoulder = kpts[R_shoulder_keypoint]

                # A keypoint with non-positive coordinates is treated as not detected
                if (nose[0] > 0 and nose[1] > 0
                        and all(a[0] > 0 and a[1] > 0 for a in ankles)
                        and left_shoulder[0] > 0 and left_shoulder[1] > 0
                        and right_shoulder[0] > 0 and right_shoulder[1] > 0):
                    current_ankle_y = min(a[1] for a in ankles)
                    current_head_y = nose[1]
                    return True, current_ankle_y, current_head_y, keypoints

        return False, None, None, None
    except Exception as e:
        print(f"Error processing frame: {e}")
        return False, None, None, None
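
# For a single detected person the keypoints array returned above has shape
# (1, 17, 2): one person, the 17 COCO keypoints listed in KEYPOINT_INDICES,
# and an (x, y) pixel coordinate per keypoint (an assumption based on the
# COCO-17 layout used throughout this module).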

def detect_jump_events(metrics: JumpMetrics, smoothed_ankle_y: float, smoothed_head_y: float,
                       repetition_data: List[Dict], player_height: float,
                       body_mass_kg: float, repetitions: int) -> bool:
    """
    Detect jump start and end events based on ankle position changes.

    This function monitors the ankle position relative to ground level to detect
    when a jump begins and ends. It calculates jump metrics for completed jumps
    and tracks the repetition count.

    Args:
        metrics (JumpMetrics): Object tracking current jump state and metrics
        smoothed_ankle_y (float): Current smoothed ankle Y-coordinate
        smoothed_head_y (float): Current smoothed head Y-coordinate
        repetition_data (List[Dict]): List to store completed jump data
        player_height (float): Player height in meters
        body_mass_kg (float): Player body mass in kilograms
        repetitions (int): Target number of repetitions to detect

    Returns:
        bool: True if the target number of repetitions has been reached, False otherwise

    Side Effects:
        - Updates the metrics object with jump state
        - Appends completed jump data to the repetition_data list
        - Modifies metrics.ground_level, metrics.jump_started, metrics.repetition_count
    """
    # The first valid frame establishes the ground-level reference
    if metrics.ground_level is None:
        metrics.ground_level = smoothed_ankle_y
        metrics.takeoff_head_y = smoothed_head_y
        return False

    relative_ankle_change = ((metrics.ground_level - smoothed_ankle_y) / metrics.ground_level
                             if metrics.ground_level > 0 else 0)

    # Detect jump start
    if not metrics.jump_started and relative_ankle_change > JUMP_THRESHOLD_PERCENT:
        metrics.jump_started = True
        metrics.takeoff_head_y = smoothed_head_y
        metrics.max_jump_height = 0
        metrics.max_head_height_px = smoothed_head_y
        metrics.jump_peak_power = 0.0
        return False

    # Detect jump end
    if metrics.jump_started and relative_ankle_change <= JUMP_THRESHOLD_PERCENT:
        high_jump = calculate_high_jump(player_height, metrics.max_jump_height)
        repetition_data.append({
            "repetition": metrics.repetition_count + 1,
            "distancia_elevada": round(metrics.max_jump_height, 2),
            "salto_alto": round(high_jump, 2),
            "potencia_sayer": round(metrics.jump_peak_power, 2)
        })
        metrics.repetition_count += 1
        metrics.jump_started = False
        return metrics.repetition_count >= repetitions

    return False
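
# Worked example of the takeoff test above (made-up pixel values): with
# ground_level at y=900 px, an ankle reading of y=850 px gives
# (900 - 850) / 900 ≈ 0.056, which exceeds JUMP_THRESHOLD_PERCENT (0.05),
# so that frame is treated as the start of a jump.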

def calculate_jump_metrics(metrics: JumpMetrics, smoothed_head_y: float, PX_PER_METER: float,
                           body_mass_kg: float, head_y_buffer: List[float], fps: float):
    """
    Calculate jump metrics during an active jump phase.

    This function continuously updates jump metrics while a jump is in progress,
    tracking maximum jump height, peak power, and other performance indicators.

    Args:
        metrics (JumpMetrics): Object containing current jump state and metrics
        smoothed_head_y (float): Current smoothed head Y-coordinate in pixels
        PX_PER_METER (float): Conversion factor from pixels to meters
        body_mass_kg (float): Player body mass in kilograms
        head_y_buffer (List[float]): Buffer of recent head positions (currently unused here)
        fps (float): Video frame rate in frames per second (currently unused here)

    Returns:
        None

    Side Effects:
        - Updates metrics.max_jump_height if the current jump exceeds the previous maximum
        - Updates metrics.max_head_height_px with the lowest Y-coordinate (highest position)
        - Updates metrics.jump_peak_power and metrics.peak_power_sayer with calculated power values
    """
    if not metrics.jump_started:
        return

    relative_jump = (metrics.takeoff_head_y - smoothed_head_y) / PX_PER_METER
    if relative_jump > metrics.max_jump_height:
        metrics.max_jump_height = relative_jump
    if smoothed_head_y < metrics.max_head_height_px:
        metrics.max_head_height_px = smoothed_head_y
    if relative_jump:
        current_power = calculate_peak_power_sayer(relative_jump, body_mass_kg)
        if current_power > metrics.jump_peak_power:
            metrics.jump_peak_power = current_power
        if current_power > metrics.peak_power_sayer:
            metrics.peak_power_sayer = current_power


def calculate_velocity(head_y_buffer: List[float], PX_PER_METER: float, fps: float) -> float:
    """
    Calculate vertical velocity based on head position changes over time.

    This function computes the vertical velocity by analyzing the change in head
    position over a fixed time window, converting from pixel coordinates to
    real-world units.

    Args:
        head_y_buffer (List[float]): Buffer containing recent head Y-coordinates in pixels
        PX_PER_METER (float): Conversion factor from pixels to meters
        fps (float): Video frame rate in frames per second

    Returns:
        float: Vertical velocity in meters per second (positive = upward motion).
            Returns 0.0 if the calculation cannot be performed.

    Note:
        - Requires at least VELOCITY_WINDOW frames in the buffer
        - Velocity is calculated as the change from the oldest to the newest position
        - Y-coordinates decrease as objects move upward in image coordinates
    """
    if len(head_y_buffer) < VELOCITY_WINDOW or PX_PER_METER is None or fps <= 0:
        return 0.0

    delta_y_pixels = head_y_buffer[0] - head_y_buffer[-1]
    delta_y_meters = delta_y_pixels / PX_PER_METER
    delta_t = VELOCITY_WINDOW / fps
    return delta_y_meters / delta_t
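
# Worked example of the velocity calculation above (made-up values): a buffer
# of [400.0, 390.0, 380.0] px with PX_PER_METER = 400 and fps = 30 gives
# delta_y = (400 - 380) / 400 = 0.05 m over delta_t = 3 / 30 = 0.1 s,
# i.e. 0.5 m/s upward.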

def draw_skeleton(frame, keypoints):
    """
    Draw a human pose skeleton on a video frame.

    This function visualizes the detected pose by drawing keypoints as circles
    and connecting them with lines according to the human body structure.

    Args:
        frame (numpy.ndarray): Video frame to draw on (modified in-place)
        keypoints (numpy.ndarray or None): Array of detected keypoints with shape
            (N, 17, 2) where N is batch size, 17 is the number of keypoints,
            and 2 represents the (x, y) coordinates

    Returns:
        None

    Side Effects:
        - Modifies the input frame by drawing circles for keypoints
        - Draws lines connecting related body parts (skeleton connections)
        - Uses GREEN for keypoints and YELLOW for connections

    Note:
        - Safely handles None or empty keypoint arrays
        - Only draws keypoints and connections with positive coordinates
        - Uses the SKELETON_CONNECTIONS constant for body part relationships
    """
    if keypoints is None or len(keypoints) == 0 or len(keypoints[0]) == 0:
        return
    try:
        kpts = keypoints[0]

        # Draw points
        for point in kpts:
            if point[0] > 0 and point[1] > 0:
                cv2.circle(frame, (int(point[0]), int(point[1])), 5, GREEN, -1)

        # Draw connections
        for connection in SKELETON_CONNECTIONS:
            start_name, end_name = connection
            start_idx = KEYPOINT_INDICES[start_name]
            end_idx = KEYPOINT_INDICES[end_name]
            if (start_idx < len(kpts) and end_idx < len(kpts)
                    and kpts[start_idx][0] > 0 and kpts[start_idx][1] > 0
                    and kpts[end_idx][0] > 0 and kpts[end_idx][1] > 0):
                start_point = (int(kpts[start_idx][0]), int(kpts[start_idx][1]))
                end_point = (int(kpts[end_idx][0]), int(kpts[end_idx][1]))
                cv2.line(frame, start_point, end_point, YELLOW, 2)
    except Exception as e:
        print(f"Error drawing skeleton: {e}")


def analyze_jump_video(model: VitPose, input_video: str, output_video: str, player_height: float,
                       body_mass_kg: float, repetitions: int) -> dict | None:
    """
    Analyze a jump video to calculate various jump metrics.

    Args:
        model: VitPose model instance
        input_video: Path to the input video
        output_video: Path to the output video
        player_height: Height of the person in meters
        body_mass_kg: Weight of the person in kg
        repetitions: Expected number of repetitions

    Returns:
        Dictionary containing jump metrics and video analysis data, or None on failure
    """
    try:
        # Set up video capture and writer
        cap, out, width, height = setup_video_capture(input_video, output_video)
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Calibrate pose detection
        PX_PER_METER, initial_left_shoulder_x, initial_right_shoulder_x = calibrate_pose_detection(
            model, cap, player_height)

        # Reset the video for processing
        cap.release()
        cap = cv2.VideoCapture(input_video)

        # Initialize tracking variables
        metrics = JumpMetrics()
        repetition_data = []
        head_y_history = []
        ankle_y_history = []
        head_y_buffer = []
        last_detected_ankles_y = None

        # Process each frame
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            annotated_frame = frame.copy()

            if metrics.repetition_count >= repetitions:
                out.write(annotated_frame)
                continue

            # Process frame keypoints
            keypoints_valid, current_ankle_y, current_head_y, keypoints = process_frame_keypoints(
                model, annotated_frame)

            if keypoints_valid:
                last_detected_ankles_y = current_ankle_y

                # Smooth positions with a sliding-window mean
                ankle_y_history.append(current_ankle_y)
                if len(ankle_y_history) > SMOOTHING_WINDOW:
                    ankle_y_history.pop(0)
                smoothed_ankle_y = np.mean(ankle_y_history)

                head_y_history.append(current_head_y)
                if len(head_y_history) > SMOOTHING_WINDOW:
                    head_y_history.pop(0)
                smoothed_head_y = np.mean(head_y_history)

                # Calculate velocity
                head_y_buffer.append(smoothed_head_y)
                if len(head_y_buffer) > VELOCITY_WINDOW:
                    head_y_buffer.pop(0)
                metrics.velocity_vertical = calculate_velocity(head_y_buffer, PX_PER_METER, fps)

                # Detect jump events
                should_stop = detect_jump_events(metrics, smoothed_ankle_y, smoothed_head_y,
                                                 repetition_data, player_height,
                                                 body_mass_kg, repetitions)
                if should_stop:
                    break

                # Calculate jump metrics during the jump
                calculate_jump_metrics(metrics, smoothed_head_y, PX_PER_METER,
                                       body_mass_kg, head_y_buffer, fps)
            else:
                last_detected_ankles_y = None
                metrics.velocity_vertical = 0.0

            # Draw overlay and skeleton
            high_jump = calculate_high_jump(player_height, metrics.max_jump_height)
            annotated_frame = draw_metrics_overlay(
                frame=annotated_frame,
                max_jump_height=metrics.max_jump_height,
                salto_alto=high_jump,
                velocity_vertical=metrics.velocity_vertical,
                peak_power_sayer=metrics.peak_power_sayer,
                repetition_count=metrics.repetition_count,
                last_detected_ankles_y=last_detected_ankles_y,
                initial_left_shoulder_x=initial_left_shoulder_x,
                initial_right_shoulder_x=initial_right_shoulder_x,
                width=width,
                height=height,
                colors=COLORS,
                metrics_below_feet_offset=METRICS_BELOW_FEET_OFFSET,
                horizontal_offset_factor=HORIZONTAL_OFFSET_FACTOR
            )
            if keypoints_valid and keypoints is not None:
                draw_skeleton(annotated_frame, keypoints)

            out.write(annotated_frame)

        # Prepare results
        results_dict = {
            "video_analysis": {
                "output_video": str(output_video),
            },
            "repetition_data": [
                {
                    "repetition": int(rep["repetition"]),
                    "distancia_elevada": float(rep["distancia_elevada"]),
                    "salto_alto": float(rep["salto_alto"]),
                    "potencia_sayer": float(rep["potencia_sayer"])
                }
                for rep in repetition_data
            ]
        }

        cap.release()
        out.release()
        return results_dict
    except Exception as e:
        print(f"Error in analyze_jump_video: {e}")
        return None
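
# A minimal sketch of calling the analyzer directly (hypothetical paths;
# constructing the VitPose pipeline is project-specific and omitted):
#
#     results = analyze_jump_video(model=vitpose.pipeline,
#                                  input_video="jump.mp4",
#                                  output_video="jump_analyzed.mp4",
#                                  player_height=1.68, body_mass_kg=64,
#                                  repetitions=3)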

def calculate_peak_power_sayer(jump_height_m, body_mass_kg):
    """
    Estimate peak anaerobic power using the Sayers equation.

    Args:
        jump_height_m: Jump height in meters
        body_mass_kg: Body mass in kg

    Returns:
        Estimated peak power in watts
    """
    jump_height_cm = jump_height_m * 100
    return (60.7 * jump_height_cm) + (45.3 * body_mass_kg) - 2055


def calculate_high_jump(player_height: float, max_jump_height: float) -> float:
    """
    Calculate the high jump height based on the player height and the maximum
    relative jump height.

    Args:
        player_height: Player height in meters
        max_jump_height: Relative jump height in meters

    Returns:
        The high jump height in meters
    """
    return player_height + max_jump_height
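
# Worked example of the Sayers estimate above: a 0.48 m jump by a 64 kg
# player gives 60.7 * 48 + 45.3 * 64 - 2055 = 2913.6 + 2899.2 - 2055
# ≈ 3757.8 W, in line with the per-repetition magnitudes this module reports.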

def draw_rounded_rect(img, pt1, pt2, color, thickness=-1, lineType=cv2.LINE_AA, radius=10):
    """
    Draw a rectangle with rounded corners on an image.

    This function creates a rounded rectangle by drawing four corner ellipses
    and connecting them with straight rectangular sections.

    Args:
        img (numpy.ndarray): Image to draw on (modified in-place)
        pt1 (tuple): Top-left corner coordinates (x, y)
        pt2 (tuple): Bottom-right corner coordinates (x, y)
        color (tuple): BGR color tuple (B, G, R)
        thickness (int, optional): Line thickness. -1 for a filled rectangle. Defaults to -1.
        lineType (int, optional): Type of line drawing. Defaults to cv2.LINE_AA.
        radius (int, optional): Corner radius in pixels. Defaults to 10.

    Returns:
        numpy.ndarray: The modified image with the rounded rectangle drawn

    Note:
        - If radius is 0, draws a regular rectangle
        - For filled rectangles, use thickness=-1
        - Corner ellipses are drawn at each corner with the specified radius
        - Rectangle sections fill the gaps between the ellipses
    """
    x1, y1 = pt1
    x2, y2 = pt2
    if radius > 0:
        # Quarter-circle arcs at each corner. The arc angles are fixed so each
        # arc faces outward (top-left 180-270, top-right 270-360, bottom-right
        # 0-90, bottom-left 90-180); the original angles pointed every arc into
        # the interior, leaving the corners unfilled.
        img = cv2.ellipse(img, (x1 + radius, y1 + radius), (radius, radius), 0, 180, 270, color, thickness, lineType)
        img = cv2.ellipse(img, (x2 - radius, y1 + radius), (radius, radius), 0, 270, 360, color, thickness, lineType)
        img = cv2.ellipse(img, (x2 - radius, y2 - radius), (radius, radius), 0, 0, 90, color, thickness, lineType)
        img = cv2.ellipse(img, (x1 + radius, y2 - radius), (radius, radius), 0, 90, 180, color, thickness, lineType)
        # Straight sections between the arcs
        img = cv2.rectangle(img, (x1, y1 + radius), (x2, y2 - radius), color, thickness, lineType)
        img = cv2.rectangle(img, (x1 + radius, y1), (x2 - radius, y2), color, thickness, lineType)
    else:
        img = cv2.rectangle(img, pt1, pt2, color, thickness, lineType)
    return img


def draw_main_title(overlay, config: OverlayConfig, width: int, colors: Dict):
    """
    Draw the main title text centered at the top of the video frame.

    This function renders "Ejercicio de Salto" (Jump Exercise) as the main
    title using the configured font settings, centered horizontally.

    Args:
        overlay (numpy.ndarray): Image overlay to draw on (modified in-place)
        config (OverlayConfig): Configuration object containing font settings
        width (int): Width of the video frame in pixels
        colors (Dict): Dictionary containing color definitions

    Returns:
        None

    Side Effects:
        - Draws text on the overlay image in white
        - Text is positioned at the top center of the frame
        - Uses config.font_scale_title_main and config.font_thickness_title_main
    """
    title_text = "Ejercicio de Salto"
    title_text_size = cv2.getTextSize(title_text, config.font, config.font_scale_title_main,
                                      config.font_thickness_title_main)[0]
    title_x = (width - title_text_size[0]) // 2
    title_y = config.title_y_offset
    cv2.putText(overlay, title_text, (title_x, title_y), config.font,
                config.font_scale_title_main, colors["white"],
                config.font_thickness_title_main, cv2.LINE_AA)


def calculate_metric_box_size(title: str, value: str, config: OverlayConfig) -> Tuple[int, int]:
    """
    Calculate the required dimensions for a metric display box.

    This function determines the width and height needed to display a metric
    with its title and value, including padding and spacing requirements.

    Args:
        title (str): The metric title text (e.g., "SALTO ALTO")
        value (str): The metric value text (e.g., "2.15 m")
        config (OverlayConfig): Configuration object with font and spacing settings

    Returns:
        Tuple[int, int]: A tuple containing:
            - bg_width: Required width in pixels for the metric box
            - bg_height: Required height in pixels for the metric box

    Note:
        - Width is based on the wider of the title and value texts
        - Height accounts for both text lines plus vertical padding
        - Includes horizontal padding on both sides
    """
    title_size = cv2.getTextSize(title, config.font, config.font_scale_title_metric,
                                 config.font_thickness_metric)[0]
    value_size = cv2.getTextSize(value, config.font, config.font_scale_value,
                                 config.font_thickness_metric)[0]
    bg_width = max(title_size[0], value_size[0]) + 2 * config.padding_horizontal
    bg_height = config.line_height_title_metric + config.line_height_value + 2 * config.padding_vertical
    return bg_width, bg_height
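
def _demo_rounded_rect() -> None:
    """Minimal rendering sketch, not called by the pipeline: draws one filled
    rounded box sized by calculate_metric_box_size on a blank canvas and
    writes it to a hypothetical local file for visual inspection."""
    config = OverlayConfig()
    bg_width, bg_height = calculate_metric_box_size("SALTO ALTO", "2.15 m", config)
    canvas = np.zeros((bg_height + 40, bg_width + 40, 3), dtype=np.uint8)
    draw_rounded_rect(canvas, (20, 20), (20 + bg_width, 20 + bg_height),
                      GRAY, cv2.FILLED, cv2.LINE_AA, radius=config.corner_radius)
    cv2.imwrite("demo_metric_box.png", canvas)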

def draw_metric_box(overlay, title: str, value: str, x: int, y: int, bg_width: int, bg_height: int,
                    config: OverlayConfig, colors: Dict):
    """
    Draw a styled metric box with title and value text.

    This function creates a rounded rectangle background and draws the metric
    information with proper text alignment and styling for video overlay display.

    Args:
        overlay (numpy.ndarray): Image overlay to draw on (modified in-place)
        title (str): Metric title text (displayed in the smaller font)
        value (str): Metric value text (displayed in the larger font)
        x (int): X-coordinate of the box's top-left corner
        y (int): Y-coordinate of the box's top-left corner
        bg_width (int): Width of the background box in pixels
        bg_height (int): Height of the background box in pixels
        config (OverlayConfig): Configuration object with styling settings
        colors (Dict): Dictionary containing color definitions

    Returns:
        numpy.ndarray: The modified overlay with the metric box drawn

    Side Effects:
        - Draws a rounded rectangle background with a gray fill and white border
        - Centers the title text in light gray
        - Centers the value text in white below the title
        - Uses different font scales for title and value
    """
    pt1 = (x, y)
    pt2 = (x + bg_width, y + bg_height)

    # Draw background
    overlay = draw_rounded_rect(overlay, pt1, pt2, colors["gray"], cv2.FILLED, cv2.LINE_AA,
                                config.corner_radius)
    cv2.rectangle(overlay, pt1, pt2, colors["white"], config.border_thickness, cv2.LINE_AA)

    # Draw title
    title_size = cv2.getTextSize(title, config.font, config.font_scale_title_metric,
                                 config.font_thickness_metric)[0]
    title_x = x + (bg_width - title_size[0]) // 2
    title_y = y + config.padding_vertical + config.line_height_title_metric // 2 + 2
    cv2.putText(overlay, title, (title_x, title_y), config.font, config.font_scale_title_metric,
                colors["light_gray"], config.font_thickness_metric, cv2.LINE_AA)

    # Draw value
    value_size = cv2.getTextSize(value, config.font, config.font_scale_value,
                                 config.font_thickness_metric)[0]
    value_x = x + (bg_width - value_size[0]) // 2
    value_y = y + config.padding_vertical + config.line_height_title_metric + config.line_height_value // 2 + 5
    cv2.putText(overlay, value, (value_x, value_y), config.font, config.font_scale_value,
                colors["white"], config.font_thickness_metric, cv2.LINE_AA)

    return overlay

def calculate_positions(width: int, height: int, last_detected_ankles_y: Optional[float],
                        initial_left_shoulder_x: Optional[int], initial_right_shoulder_x: Optional[int],
                        config: OverlayConfig, horizontal_offset_factor: float,
                        metrics_below_feet_offset: int) -> Dict[str, Tuple[int, int]]:
    """
    Calculate optimal positions for all metric display boxes on the video frame.

    This function determines where to place metric boxes based on detected body
    positions to avoid overlapping with the person while maintaining good
    visibility.

    Args:
        width (int): Video frame width in pixels
        height (int): Video frame height in pixels
        last_detected_ankles_y (Optional[float]): Y-coordinate of the last detected ankles
        initial_left_shoulder_x (Optional[int]): X-coordinate of the left shoulder reference
        initial_right_shoulder_x (Optional[int]): X-coordinate of the right shoulder reference
        config (OverlayConfig): Configuration object with layout settings
        horizontal_offset_factor (float): Factor for horizontal positioning relative to the shoulders
        metrics_below_feet_offset (int): Vertical offset below the feet for metric placement

    Returns:
        Dict[str, Tuple[int, int]]: Dictionary mapping metric names to (x, y) positions:
            - "relativo": Position of the relative jump metric
            - "alto": Position of the high jump metric
            - "reps": Position of the repetitions counter
            - "velocidad": Position of the velocity metric (only if ankles detected)
            - "potencia": Position of the power metric (only if ankles detected)

    Note:
        - Positions are calculated to avoid overlapping with the detected person
        - Some metrics are positioned relative to body parts when available
        - Falls back to default positions when body parts are not detected
    """
    positions = {}

    # Relative jump box (left side, dynamically positioned)
    relativo_bg_width, relativo_bg_height = calculate_metric_box_size("SALTO RELATIVO", "0.00 m", config)
    x_relativo = 20
    if last_detected_ankles_y is not None:
        y_relativo = int(last_detected_ankles_y - relativo_bg_height - 10)
        if y_relativo < config.title_y_offset + 50:
            y_relativo = int(last_detected_ankles_y + metrics_below_feet_offset)
    else:
        y_relativo = height - 150
    positions["relativo"] = (x_relativo, y_relativo)

    # High jump box (top right)
    alto_bg_width, alto_bg_height = calculate_metric_box_size("SALTO ALTO", "0.00 m", config)
    x_alto = width - alto_bg_width - 20
    if initial_right_shoulder_x is not None:
        available_space = width - initial_right_shoulder_x
        x_alto_calculated = (initial_right_shoulder_x
                             + int(available_space * (1 - horizontal_offset_factor))
                             - alto_bg_width)
        if (x_alto_calculated > x_relativo + relativo_bg_width + config.spacing_horizontal + 10
                and x_alto_calculated + alto_bg_width < width - 10):
            x_alto = x_alto_calculated
    positions["alto"] = (x_alto, config.metrics_y_offset_alto)

    # Repetitions box (below the relative jump box)
    positions["reps"] = (x_relativo, y_relativo + relativo_bg_height + 10)

    # Velocity and power boxes (centered below the feet)
    if last_detected_ankles_y is not None:
        velocidad_bg_width, velocidad_bg_height = calculate_metric_box_size("VELOCIDAD VERTICAL", "0.00 m/s", config)
        x_velocidad = int(width / 2 - velocidad_bg_width / 2)
        y_velocidad = int(last_detected_ankles_y + metrics_below_feet_offset + velocidad_bg_height)
        positions["velocidad"] = (x_velocidad, y_velocidad - velocidad_bg_height)
        positions["potencia"] = (x_velocidad, y_velocidad + 5)

    return positions

def draw_metrics_overlay(frame, max_jump_height, salto_alto, velocity_vertical, peak_power_sayer,
                         repetition_count, last_detected_ankles_y, initial_left_shoulder_x,
                         initial_right_shoulder_x, width, height, colors,
                         metrics_below_feet_offset=20, horizontal_offset_factor=0.75):
    """
    Draw the metrics overlay on the frame.

    Args:
        frame: Input frame
        max_jump_height: Maximum jump height in meters
        salto_alto: Absolute jump height in meters
        velocity_vertical: Vertical velocity in m/s
        peak_power_sayer: Peak power in watts
        repetition_count: Number of repetitions
        last_detected_ankles_y: Y-coordinate of the last detected ankles
        initial_left_shoulder_x: X-coordinate of the left shoulder
        initial_right_shoulder_x: X-coordinate of the right shoulder
        width: Frame width
        height: Frame height
        colors: Dictionary with color values
        metrics_below_feet_offset: Offset for metrics below the feet
        horizontal_offset_factor: Factor for horizontal offset

    Returns:
        Frame with the metrics overlay blended in
    """
    overlay = frame.copy()
    config = OverlayConfig()

    # Draw main title
    draw_main_title(overlay, config, width, colors)

    # Calculate positions for all metric boxes
    positions = calculate_positions(width, height, last_detected_ankles_y,
                                    initial_left_shoulder_x, initial_right_shoulder_x,
                                    config, horizontal_offset_factor, metrics_below_feet_offset)

    # Draw relative jump box
    if "relativo" in positions:
        relativo_value = f"{max(0, max_jump_height):.2f} m"
        bg_width, bg_height = calculate_metric_box_size("SALTO RELATIVO", relativo_value, config)
        x, y = positions["relativo"]
        overlay = draw_metric_box(overlay, "SALTO RELATIVO", relativo_value, x, y,
                                  bg_width, bg_height, config, colors)

    # Draw high jump box
    if "alto" in positions:
        alto_value = f"{max(0, salto_alto):.2f} m"
        bg_width, bg_height = calculate_metric_box_size("SALTO ALTO", alto_value, config)
        x, y = positions["alto"]
        overlay = draw_metric_box(overlay, "SALTO ALTO", alto_value, x, y,
                                  bg_width, bg_height, config, colors)

    # Draw repetitions box
    if "reps" in positions:
        reps_value = f"{repetition_count}"
        bg_width, bg_height = calculate_metric_box_size("REPETICIONES", reps_value, config)
        x, y = positions["reps"]
        overlay = draw_metric_box(overlay, "REPETICIONES", reps_value, x, y,
                                  bg_width, bg_height, config, colors)

    # Draw velocity box (only if ankles were detected)
    if "velocidad" in positions:
        velocidad_value = f"{abs(velocity_vertical):.2f} m/s"
        bg_width, bg_height = calculate_metric_box_size("VELOCIDAD VERTICAL", velocidad_value, config)
        x, y = positions["velocidad"]
        overlay = draw_metric_box(overlay, "VELOCIDAD VERTICAL", velocidad_value, x, y,
                                  bg_width, bg_height, config, colors)

    # Draw power box (only if ankles were detected)
    if "potencia" in positions:
        potencia_value = f"{peak_power_sayer:.2f} W"
        bg_width, bg_height = calculate_metric_box_size("POTENCIA SAYER", potencia_value, config)
        x, y = positions["potencia"]
        overlay = draw_metric_box(overlay, "POTENCIA SAYER", potencia_value, x, y,
                                  bg_width, bg_height, config, colors)

    # Blend the overlay with the original frame
    result = cv2.addWeighted(overlay, config.alpha, frame, 1 - config.alpha, 0)
    return result
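
if __name__ == "__main__":
    # Minimal rendering sketch, no pose model required: draws the overlay on a
    # blank frame with made-up metric values and writes it to a hypothetical
    # local file so the layout can be eyeballed.
    demo_frame = np.zeros((720, 1280, 3), dtype=np.uint8)
    rendered = draw_metrics_overlay(
        frame=demo_frame,
        max_jump_height=0.48,
        salto_alto=2.16,
        velocity_vertical=1.2,
        peak_power_sayer=3757.8,
        repetition_count=1,
        last_detected_ankles_y=500.0,
        initial_left_shoulder_x=560,
        initial_right_shoulder_x=720,
        width=1280,
        height=720,
        colors=COLORS
    )
    cv2.imwrite("overlay_demo.png", rendered)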