""" | |
Video Content Analyzer for GAIA Agent - Phase 5 | |
Provides comprehensive video content analysis including scene segmentation, temporal patterns, and content summarization. | |
Features: | |
- Scene segmentation and analysis | |
- Temporal pattern recognition | |
- Object interaction analysis | |
- Content summarization and reporting | |
- Key frame identification and extraction | |
- Video metadata analysis | |
""" | |
import os
import logging
import cv2
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import json
from datetime import datetime, timedelta
from pathlib import Path
import tempfile

# Configure logging
logger = logging.getLogger(__name__)

class VideoContentAnalyzer:
    """Advanced video content analyzer for scene understanding and temporal analysis."""

    def __init__(self):
        """Initialize the video content analyzer."""
        self.available = True
        self.temp_dir = tempfile.mkdtemp()

        # Analysis parameters
        self.scene_change_threshold = 0.3
        self.keyframe_interval = 30  # Extract a keyframe every 30 frames
        self.min_scene_duration = 2.0  # Minimum scene duration in seconds
        self.max_scenes = 50  # Maximum number of scenes to analyze

        # Initialize analysis components
        self._init_scene_analyzer()
        self._init_temporal_analyzer()

        logger.info(f"📹 Video Content Analyzer initialized - Available: {self.available}")

    def _init_scene_analyzer(self):
        """Initialize scene analysis components."""
        try:
            # Scene change detection parameters
            self.scene_detector_params = {
                'histogram_bins': 32,
                'color_spaces': ['HSV', 'RGB'],
                'comparison_methods': [cv2.HISTCMP_CORREL, cv2.HISTCMP_CHISQR],
                'motion_threshold': 0.1
            }
            logger.info("✅ Scene analyzer initialized")
        except Exception as e:
            logger.warning(f"⚠️ Scene analyzer initialization failed: {e}")

    def _init_temporal_analyzer(self):
        """Initialize temporal analysis components."""
        try:
            # Temporal pattern analysis parameters
            self.temporal_params = {
                'pattern_window': 10,  # Analyze patterns over 10-frame windows
                'smoothing_factor': 0.3,
                'trend_threshold': 0.1,
                'periodicity_detection': True
            }
            logger.info("✅ Temporal analyzer initialized")
        except Exception as e:
            logger.warning(f"⚠️ Temporal analyzer initialization failed: {e}")

    def analyze_video_content(self, video_path: str,
                              object_detections: Optional[List[List[Dict[str, Any]]]] = None,
                              question: Optional[str] = None) -> Dict[str, Any]:
        """
        Perform comprehensive video content analysis.

        Args:
            video_path: Path to video file
            object_detections: Optional pre-computed object detections per frame
            question: Optional question to guide analysis

        Returns:
            Comprehensive content analysis results
        """
        try:
            logger.info(f"📹 Starting video content analysis for: {video_path}")

            # Extract video metadata
            metadata = self._extract_video_metadata(video_path)

            # Perform scene segmentation
            scenes = self._segment_scenes(video_path)

            # Extract key frames
            keyframes = self._extract_keyframes(video_path, scenes)

            # Analyze temporal patterns
            temporal_analysis = self._analyze_temporal_patterns(
                video_path, object_detections, scenes
            )

            # Perform content summarization
            content_summary = self._summarize_content(
                scenes, keyframes, temporal_analysis, object_detections
            )

            # Generate interaction analysis
            interaction_analysis = self._analyze_object_interactions(
                object_detections, scenes
            )

            # Create comprehensive report
            analysis_report = self._create_content_report(
                metadata, scenes, keyframes, temporal_analysis,
                content_summary, interaction_analysis, question
            )

            return analysis_report

        except Exception as e:
            logger.error(f"❌ Video content analysis failed: {e}")
            return {
                'success': False,
                'error': f'Content analysis failed: {str(e)}'
            }

    def _extract_video_metadata(self, video_path: str) -> Dict[str, Any]:
        """Extract comprehensive video metadata."""
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            # Basic properties
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            duration = frame_count / fps if fps > 0 else 0

            # Additional properties
            fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
            codec = "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)])

            cap.release()

            metadata = {
                'filename': os.path.basename(video_path),
                'duration_seconds': duration,
                'fps': fps,
                'frame_count': frame_count,
                'resolution': {'width': width, 'height': height},
                'aspect_ratio': width / height if height > 0 else 1.0,
                'codec': codec,
                'file_size': os.path.getsize(video_path) if os.path.exists(video_path) else 0,
                'analysis_timestamp': datetime.now().isoformat()
            }

            logger.info(f"📊 Video metadata extracted: {duration:.1f}s, {width}x{height}, {fps:.1f} FPS")
            return metadata

        except Exception as e:
            logger.error(f"❌ Failed to extract video metadata: {e}")
            return {}
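
    # Example (hypothetical) of the metadata dict returned above for a short clip:
    #   {'filename': 'sample.mp4', 'duration_seconds': 12.5, 'fps': 30.0,
    #    'frame_count': 375, 'resolution': {'width': 1280, 'height': 720},
    #    'aspect_ratio': 1.78, 'codec': 'avc1', 'file_size': 2048000,
    #    'analysis_timestamp': '2024-01-01T12:00:00'}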

    def _segment_scenes(self, video_path: str) -> List[Dict[str, Any]]:
        """Segment video into distinct scenes based on visual changes."""
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            scenes = []
            prev_hist = None
            scene_start = 0
            frame_count = 0
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                fps = 30.0  # Fall back to a nominal frame rate if metadata is missing
            scene_id = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Calculate histogram for scene change detection
                hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
                hist = cv2.calcHist([hsv], [0, 1, 2], None,
                                    [self.scene_detector_params['histogram_bins']] * 3,
                                    [0, 180, 0, 256, 0, 256])

                # Detect scene change
                if prev_hist is not None:
                    correlation = cv2.compareHist(hist, prev_hist, cv2.HISTCMP_CORREL)
                    if correlation < self.scene_change_threshold:
                        # Scene change detected
                        scene_end = frame_count
                        scene_duration = (scene_end - scene_start) / fps

                        if scene_duration >= self.min_scene_duration:
                            scene = {
                                'id': scene_id,
                                'start_frame': scene_start,
                                'end_frame': scene_end,
                                'start_time': scene_start / fps,
                                'end_time': scene_end / fps,
                                'duration': scene_duration,
                                'frame_count': scene_end - scene_start
                            }
                            scenes.append(scene)
                            scene_id += 1

                            if len(scenes) >= self.max_scenes:
                                break

                        scene_start = frame_count

                prev_hist = hist
                frame_count += 1

            # Add final scene
            if scene_start < frame_count:
                scene_duration = (frame_count - scene_start) / fps
                if scene_duration >= self.min_scene_duration:
                    scene = {
                        'id': scene_id,
                        'start_frame': scene_start,
                        'end_frame': frame_count,
                        'start_time': scene_start / fps,
                        'end_time': frame_count / fps,
                        'duration': scene_duration,
                        'frame_count': frame_count - scene_start
                    }
                    scenes.append(scene)

            cap.release()

            logger.info(f"🎬 Scene segmentation complete: {len(scenes)} scenes detected")
            return scenes

        except Exception as e:
            logger.error(f"❌ Scene segmentation failed: {e}")
            return []
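
    # Example (hypothetical) of a single scene record produced above, for a 30 FPS
    # clip with a cut detected at frame 150:
    #   {'id': 0, 'start_frame': 0, 'end_frame': 150, 'start_time': 0.0,
    #    'end_time': 5.0, 'duration': 5.0, 'frame_count': 150}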

    def _extract_keyframes(self, video_path: str, scenes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Extract representative keyframes from video scenes."""
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            keyframes = []
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                fps = 30.0  # Fall back to a nominal frame rate if metadata is missing

            for scene in scenes:
                # Extract keyframes from each scene
                scene_keyframes = []

                # Extract keyframe from middle of scene
                mid_frame = (scene['start_frame'] + scene['end_frame']) // 2
                cap.set(cv2.CAP_PROP_POS_FRAMES, mid_frame)
                ret, frame = cap.read()

                if ret:
                    keyframe = {
                        'scene_id': scene['id'],
                        'frame_number': mid_frame,
                        'timestamp': mid_frame / fps,
                        'type': 'scene_representative',
                        'frame_data': frame,
                        'visual_features': self._extract_visual_features(frame)
                    }
                    scene_keyframes.append(keyframe)

                # Extract additional keyframes for longer scenes
                if scene['duration'] > 10:  # For scenes longer than 10 seconds
                    # Extract keyframes at 1/4 and 3/4 points
                    for fraction in [0.25, 0.75]:
                        frame_pos = int(scene['start_frame'] +
                                        fraction * (scene['end_frame'] - scene['start_frame']))
                        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_pos)
                        ret, frame = cap.read()

                        if ret:
                            keyframe = {
                                'scene_id': scene['id'],
                                'frame_number': frame_pos,
                                'timestamp': frame_pos / fps,
                                'type': 'temporal_sample',
                                'frame_data': frame,
                                'visual_features': self._extract_visual_features(frame)
                            }
                            scene_keyframes.append(keyframe)

                keyframes.extend(scene_keyframes)

            cap.release()

            logger.info(f"🖼️ Keyframe extraction complete: {len(keyframes)} keyframes extracted")
            return keyframes

        except Exception as e:
            logger.error(f"❌ Keyframe extraction failed: {e}")
            return []

    def _extract_visual_features(self, frame: np.ndarray) -> Dict[str, Any]:
        """Extract visual features from a frame."""
        try:
            features = {}

            # Color histogram
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            hist_h = cv2.calcHist([hsv], [0], None, [32], [0, 180])
            hist_s = cv2.calcHist([hsv], [1], None, [32], [0, 256])
            hist_v = cv2.calcHist([hsv], [2], None, [32], [0, 256])

            features['color_histogram'] = {
                'hue': hist_h.flatten().tolist(),
                'saturation': hist_s.flatten().tolist(),
                'value': hist_v.flatten().tolist()
            }

            # Edge density
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            edge_density = np.sum(edges > 0) / (edges.shape[0] * edges.shape[1])
            features['edge_density'] = float(edge_density)

            # Brightness and contrast
            features['brightness'] = float(np.mean(gray))
            features['contrast'] = float(np.std(gray))

            # Dominant colors
            features['dominant_colors'] = self._get_dominant_colors(frame)

            return features

        except Exception as e:
            logger.error(f"❌ Visual feature extraction failed: {e}")
            return {}

    def _get_dominant_colors(self, frame: np.ndarray, k: int = 3) -> List[List[int]]:
        """Extract dominant colors from frame using k-means clustering."""
        try:
            # Reshape frame to list of pixels
            pixels = frame.reshape(-1, 3)

            # Use k-means to find dominant colors
            from sklearn.cluster import KMeans
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            kmeans.fit(pixels)

            # Get dominant colors
            colors = kmeans.cluster_centers_.astype(int)
            return colors.tolist()

        except ImportError:
            # Fallback without sklearn
            return [[128, 128, 128]]  # Gray as default
        except Exception as e:
            logger.error(f"❌ Dominant color extraction failed: {e}")
            return [[128, 128, 128]]
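
    # Note: OpenCV frames are BGR, so the cluster centers above are BGR triplets.
    # Example (hypothetical) return value for k=3 on a frame dominated by sky and grass:
    #   [[200, 150, 90], [60, 140, 70], [30, 30, 30]]  # BGR: sky blue, grass green, shadow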

    def _analyze_temporal_patterns(self, video_path: str,
                                   object_detections: Optional[List[List[Dict[str, Any]]]] = None,
                                   scenes: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Analyze temporal patterns in video content."""
        try:
            temporal_analysis = {
                'motion_patterns': [],
                'object_appearance_patterns': [],
                'scene_transition_patterns': [],
                'activity_levels': [],
                'periodicity': {}
            }

            if not object_detections:
                return temporal_analysis

            # Analyze motion patterns
            motion_levels = []
            for frame_detections in object_detections:
                # Estimate motion level from the number and size of detected objects
                motion_level = len(frame_detections)
                if frame_detections:
                    avg_area = np.mean([det.get('area', 0) for det in frame_detections])
                    motion_level += avg_area / 10000  # Normalize area contribution
                motion_levels.append(motion_level)

            temporal_analysis['motion_patterns'] = motion_levels

            # Analyze object appearance patterns
            object_counts_over_time = []
            bird_counts_over_time = []
            animal_counts_over_time = []

            for frame_detections in object_detections:
                object_count = len(frame_detections)
                bird_count = sum(1 for det in frame_detections
                                 if det.get('species_type') == 'bird')
                animal_count = sum(1 for det in frame_detections
                                   if det.get('species_type') == 'animal')

                object_counts_over_time.append(object_count)
                bird_counts_over_time.append(bird_count)
                animal_counts_over_time.append(animal_count)

            temporal_analysis['object_appearance_patterns'] = {
                'total_objects': object_counts_over_time,
                'birds': bird_counts_over_time,
                'animals': animal_counts_over_time
            }

            # Analyze activity levels
            window_size = self.temporal_params['pattern_window']
            activity_levels = []

            for i in range(0, len(motion_levels), window_size):
                window = motion_levels[i:i + window_size]
                if window:
                    activity_level = {
                        'start_frame': i,
                        'end_frame': min(i + window_size, len(motion_levels)),
                        'avg_motion': np.mean(window),
                        'max_motion': np.max(window),
                        'motion_variance': np.var(window)
                    }
                    activity_levels.append(activity_level)

            temporal_analysis['activity_levels'] = activity_levels

            # Detect periodicity in object appearances
            if len(bird_counts_over_time) > 20:  # Need sufficient data
                temporal_analysis['periodicity'] = self._detect_periodicity(
                    bird_counts_over_time, animal_counts_over_time
                )

            logger.info("📈 Temporal pattern analysis complete")
            return temporal_analysis

        except Exception as e:
            logger.error(f"❌ Temporal pattern analysis failed: {e}")
            return {}

    def _detect_periodicity(self, bird_counts: List[int],
                            animal_counts: List[int]) -> Dict[str, Any]:
        """Detect periodic patterns in object appearances."""
        try:
            periodicity = {
                'bird_patterns': {},
                'animal_patterns': {},
                'combined_patterns': {}
            }

            # Simple autocorrelation-based periodicity detection
            def autocorrelation(signal, max_lag=50):
                signal = np.asarray(signal, dtype=float)
                n = len(signal)
                signal = signal - np.mean(signal)
                autocorr = []
                for lag in range(min(max_lag, n // 2)):
                    if lag == 0:
                        # Lag 0 is perfect correlation by definition
                        # (and signal[:-0] would be an empty slice).
                        autocorr.append(1.0)
                        continue
                    corr = np.corrcoef(signal[:-lag], signal[lag:])[0, 1]
                    autocorr.append(corr if not np.isnan(corr) else 0.0)
                return autocorr

            # Analyze bird count periodicity
            bird_autocorr = autocorrelation(bird_counts)
            if len(bird_autocorr) > 1:
                max_corr_idx = np.argmax(bird_autocorr[1:]) + 1  # Skip lag 0
                periodicity['bird_patterns'] = {
                    'dominant_period': int(max_corr_idx),
                    'correlation_strength': bird_autocorr[max_corr_idx],
                    'is_periodic': bird_autocorr[max_corr_idx] > 0.3
                }

            # Analyze animal count periodicity
            animal_autocorr = autocorrelation(animal_counts)
            if len(animal_autocorr) > 1:
                max_corr_idx = np.argmax(animal_autocorr[1:]) + 1
                periodicity['animal_patterns'] = {
                    'dominant_period': int(max_corr_idx),
                    'correlation_strength': animal_autocorr[max_corr_idx],
                    'is_periodic': animal_autocorr[max_corr_idx] > 0.3
                }

            return periodicity

        except Exception as e:
            logger.error(f"❌ Periodicity detection failed: {e}")
            return {}
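
    # Worked example (hypothetical): if birds alternate between feeder visits, e.g.
    # bird_counts = [2, 0, 2, 0, 2, 0, ...], the autocorrelation peaks at lag 2,
    # so 'dominant_period' would be 2 frames with a correlation_strength near 1.0.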

    def _summarize_content(self, scenes: List[Dict[str, Any]],
                           keyframes: List[Dict[str, Any]],
                           temporal_analysis: Dict[str, Any],
                           object_detections: Optional[List[List[Dict[str, Any]]]] = None) -> Dict[str, Any]:
        """Generate comprehensive content summary."""
        try:
            summary = {
                'overview': {},
                'scene_summary': [],
                'key_moments': [],
                'content_highlights': [],
                'statistical_summary': {}
            }

            # Overview
            total_duration = sum(scene.get('duration', 0) for scene in scenes)
            summary['overview'] = {
                'total_scenes': len(scenes),
                'total_duration': total_duration,
                'avg_scene_duration': total_duration / len(scenes) if scenes else 0,
                'keyframes_extracted': len(keyframes)
            }

            # Scene summary
            for scene in scenes:
                scene_summary = {
                    'scene_id': scene['id'],
                    'duration': scene['duration'],
                    'description': f"Scene {scene['id'] + 1}: {scene['duration']:.1f}s",
                    'activity_level': 'unknown'
                }

                # Determine activity level from temporal analysis
                if temporal_analysis.get('activity_levels'):
                    scene_start_frame = scene['start_frame']
                    scene_end_frame = scene['end_frame']

                    relevant_activities = [
                        activity for activity in temporal_analysis['activity_levels']
                        if (activity['start_frame'] <= scene_end_frame and
                            activity['end_frame'] >= scene_start_frame)
                    ]

                    if relevant_activities:
                        avg_motion = np.mean([a['avg_motion'] for a in relevant_activities])
                        if avg_motion > 2:
                            scene_summary['activity_level'] = 'high'
                        elif avg_motion > 1:
                            scene_summary['activity_level'] = 'medium'
                        else:
                            scene_summary['activity_level'] = 'low'

                summary['scene_summary'].append(scene_summary)

            # Key moments (high activity periods)
            if temporal_analysis.get('activity_levels'):
                high_activity_moments = [
                    activity for activity in temporal_analysis['activity_levels']
                    if activity['avg_motion'] > 2
                ]

                summary['key_moments'] = [
                    {
                        'timestamp': moment['start_frame'] / 30,  # Assume 30 FPS
                        'duration': (moment['end_frame'] - moment['start_frame']) / 30,
                        'activity_level': moment['avg_motion'],
                        'description': f"High activity period: {moment['avg_motion']:.1f}"
                    }
                    for moment in high_activity_moments[:5]  # Top 5 moments
                ]

            # Statistical summary
            if object_detections:
                all_detections = [det for frame_dets in object_detections for det in frame_dets]
                species_counts = {}
                for detection in all_detections:
                    species = detection.get('species_type', 'unknown')
                    species_counts[species] = species_counts.get(species, 0) + 1

                summary['statistical_summary'] = {
                    'total_detections': len(all_detections),
                    'species_distribution': species_counts,
                    'avg_detections_per_frame': len(all_detections) / len(object_detections) if object_detections else 0
                }

            logger.info("📋 Content summarization complete")
            return summary

        except Exception as e:
            logger.error(f"❌ Content summarization failed: {e}")
            return {}

    def _analyze_object_interactions(self, object_detections: Optional[List[List[Dict[str, Any]]]] = None,
                                     scenes: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Analyze interactions between detected objects."""
        try:
            interaction_analysis = {
                'proximity_interactions': [],
                'temporal_interactions': [],
                'species_interactions': {},
                'interaction_summary': {}
            }

            if not object_detections:
                return interaction_analysis

            # Analyze proximity interactions within frames
            for frame_idx, frame_detections in enumerate(object_detections):
                if len(frame_detections) > 1:
                    # Check all pairs of objects in the frame
                    for i, obj1 in enumerate(frame_detections):
                        for j, obj2 in enumerate(frame_detections[i + 1:], i + 1):
                            distance = self._calculate_object_distance(obj1, obj2)

                            if distance < 100:  # Close proximity threshold (pixels)
                                interaction = {
                                    'frame': frame_idx,
                                    'timestamp': frame_idx / 30,  # Assume 30 FPS
                                    'object1': obj1.get('class', 'unknown'),
                                    'object2': obj2.get('class', 'unknown'),
                                    'distance': distance,
                                    'interaction_type': 'proximity'
                                }
                                interaction_analysis['proximity_interactions'].append(interaction)

            # Analyze species interactions
            species_pairs = {}
            for interaction in interaction_analysis['proximity_interactions']:
                obj1_type = interaction['object1']
                obj2_type = interaction['object2']
                pair_key = tuple(sorted([obj1_type, obj2_type]))

                if pair_key not in species_pairs:
                    species_pairs[pair_key] = []
                species_pairs[pair_key].append(interaction)

            interaction_analysis['species_interactions'] = {
                f"{pair[0]}-{pair[1]}": {
                    'interaction_count': len(interactions),
                    'avg_distance': np.mean([i['distance'] for i in interactions]),
                    'duration': len(interactions) / 30  # Approximate duration in seconds
                }
                for pair, interactions in species_pairs.items()
            }

            # Interaction summary
            interaction_analysis['interaction_summary'] = {
                'total_proximity_interactions': len(interaction_analysis['proximity_interactions']),
                'unique_species_pairs': len(species_pairs),
                'most_interactive_pair': max(species_pairs.keys(),
                                             key=lambda x: len(species_pairs[x])) if species_pairs else None
            }

            logger.info("🤝 Object interaction analysis complete")
            return interaction_analysis

        except Exception as e:
            logger.error(f"❌ Object interaction analysis failed: {e}")
            return {}

    def _calculate_object_distance(self, obj1: Dict[str, Any], obj2: Dict[str, Any]) -> float:
        """Calculate distance between two objects based on their centers."""
        try:
            center1 = obj1.get('center', [0, 0])
            center2 = obj2.get('center', [0, 0])

            distance = np.sqrt((center1[0] - center2[0])**2 + (center1[1] - center2[1])**2)
            return float(distance)

        except Exception as e:
            logger.error(f"❌ Distance calculation failed: {e}")
            return float('inf')
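
    # Worked example (hypothetical): centers [0, 0] and [30, 40] give a Euclidean
    # distance of sqrt(30**2 + 40**2) = 50.0 pixels, which would fall under the
    # 100-pixel proximity threshold used above.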

    def _create_content_report(self, metadata: Dict[str, Any],
                               scenes: List[Dict[str, Any]],
                               keyframes: List[Dict[str, Any]],
                               temporal_analysis: Dict[str, Any],
                               content_summary: Dict[str, Any],
                               interaction_analysis: Dict[str, Any],
                               question: Optional[str] = None) -> Dict[str, Any]:
        """Create comprehensive content analysis report."""
        try:
            report = {
                'success': True,
                'analysis_timestamp': datetime.now().isoformat(),
                'question': question,
                'metadata': metadata,
                'content_analysis': {
                    'scenes': scenes,
                    'keyframes': [
                        {k: v for k, v in kf.items() if k != 'frame_data'}  # Exclude raw frame data
                        for kf in keyframes
                    ],
                    'temporal_patterns': temporal_analysis,
                    'content_summary': content_summary,
                    'interactions': interaction_analysis
                },
                'insights': [],
                'recommendations': []
            }

            # Generate insights
            insights = []

            # Scene insights
            if scenes:
                avg_scene_duration = np.mean([s['duration'] for s in scenes])
                insights.append(f"Video contains {len(scenes)} distinct scenes with average duration of {avg_scene_duration:.1f}s")

            # Activity insights
            if temporal_analysis.get('activity_levels'):
                high_activity_count = sum(1 for a in temporal_analysis['activity_levels'] if a['avg_motion'] > 2)
                insights.append(f"Detected {high_activity_count} high-activity periods in the video")

            # Interaction insights
            if interaction_analysis.get('interaction_summary', {}).get('total_proximity_interactions', 0) > 0:
                total_interactions = interaction_analysis['interaction_summary']['total_proximity_interactions']
                insights.append(f"Found {total_interactions} object proximity interactions")

            report['insights'] = insights

            # Generate recommendations
            recommendations = []

            if question and 'bird' in question.lower():
                if temporal_analysis.get('object_appearance_patterns', {}).get('birds'):
                    max_birds = max(temporal_analysis['object_appearance_patterns']['birds'])
                    recommendations.append(f"Maximum simultaneous birds detected: {max_birds}")

            if len(scenes) > 10:
                recommendations.append("Video has many scene changes - consider analyzing key scenes only")

            report['recommendations'] = recommendations

            logger.info("📄 Content analysis report generated successfully")
            return report

        except Exception as e:
            logger.error(f"❌ Failed to create content report: {e}")
            return {
                'success': False,
                'error': f'Failed to create content report: {str(e)}'
            }

    def get_capabilities(self) -> Dict[str, Any]:
        """Get video content analyzer capabilities."""
        return {
            'available': self.available,
            'scene_change_threshold': self.scene_change_threshold,
            'keyframe_interval': self.keyframe_interval,
            'min_scene_duration': self.min_scene_duration,
            'max_scenes': self.max_scenes,
            'features': [
                'Scene segmentation',
                'Keyframe extraction',
                'Temporal pattern analysis',
                'Object interaction analysis',
                'Content summarization',
                'Visual feature extraction',
                'Activity level detection',
                'Periodicity detection'
            ]
        }

# Factory function for creating content analyzer
def create_video_content_analyzer() -> VideoContentAnalyzer:
    """Create and return a video content analyzer instance."""
    return VideoContentAnalyzer()

if __name__ == "__main__":
    # Test the content analyzer
    analyzer = VideoContentAnalyzer()
    print(f"Content analyzer available: {analyzer.available}")
    print(f"Capabilities: {json.dumps(analyzer.get_capabilities(), indent=2)}")