# Spaces: Running
"""
Advanced Video Analyzer for GAIA Agent - Phase 5

Comprehensive video analysis tool for YouTube videos with object detection and temporal tracking.

Features:
- YouTube video downloading and processing
- Advanced object detection using YOLO models
- Bird and animal species identification
- Temporal object tracking across frames
- Simultaneous object counting
- Integration with AGNO framework
"""
import os | |
import logging | |
import cv2 | |
import numpy as np | |
from typing import Dict, Any, List, Optional, Tuple | |
import json | |
import tempfile | |
import shutil | |
from pathlib import Path | |
from datetime import datetime | |
import yt_dlp | |
# Import detection engines | |
try: | |
from .object_detection_engine import ObjectDetectionEngine | |
from .video_content_analyzer import create_video_content_analyzer | |
except ImportError: | |
try: | |
from object_detection_engine import ObjectDetectionEngine | |
from video_content_analyzer import create_video_content_analyzer | |
except ImportError: | |
ObjectDetectionEngine = None | |
create_video_content_analyzer = None | |
# Configure logging | |
logger = logging.getLogger(__name__) | |
class AdvancedVideoAnalyzer:
    """Download a video, run per-frame object detection and answer counting questions.

    The analyzer wires together three steps: downloading the video with
    yt-dlp, running the optional ``ObjectDetectionEngine`` on sampled frames,
    and condensing the per-frame detections into a report (maximum number of
    simultaneous birds / animals / species / objects).

    The detection engine and the content analyzer are optional: when their
    modules could not be imported or fail to initialise, the corresponding
    attribute stays ``None`` and that step is skipped.
    """

    # Default upper bound (seconds) on the length of a video accepted for download.
    DEFAULT_MAX_DURATION = 300
    # File extensions recognised when locating the file written by yt-dlp.
    VIDEO_EXTENSIONS = ('.mp4', '.webm', '.mkv')

    def __init__(self):
        """Initialize the advanced video analyzer."""
        self.available = True
        self.temp_dir = tempfile.mkdtemp()

        # Initialize detection engine (optional dependency)
        self.detection_engine = None
        if ObjectDetectionEngine:
            try:
                self.detection_engine = ObjectDetectionEngine()
                if not self.detection_engine.available:
                    logger.warning("⚠️ Object detection engine not available")
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize object detection engine: {e}")

        # Initialize content analyzer (optional dependency)
        self.content_analyzer = None
        if create_video_content_analyzer:
            try:
                self.content_analyzer = create_video_content_analyzer()
                if not self.content_analyzer.available:
                    logger.warning("⚠️ Video content analyzer not available")
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize video content analyzer: {e}")

        # Analysis parameters
        self.frame_sampling_rate = 1  # Analyze every frame by default
        self.max_frames = 1000  # Maximum frames to analyze
        self.confidence_threshold = 0.3
        self.nms_threshold = 0.4

        logger.info(f"📹 Advanced Video Analyzer initialized - Available: {self.available}")

    def analyze_video(self, video_url: str, question: Optional[str] = None,
                      max_duration: int = DEFAULT_MAX_DURATION) -> Dict[str, Any]:
        """
        Analyze a video comprehensively for object detection and counting.

        Args:
            video_url: URL of the video (YouTube supported)
            question: Optional question to guide analysis
            max_duration: Maximum video duration to process (seconds)

        Returns:
            The analysis report built by ``_create_analysis_report`` on
            success, otherwise ``{'success': False, 'error': <message>}``.
            This method does not raise; failures are reported in the result.
        """
        video_path = None
        try:
            logger.info(f"📹 Starting video analysis for: {video_url}")

            # Download video
            video_path = self._download_video(video_url, max_duration)
            if not video_path:
                return {
                    'success': False,
                    'error': 'Failed to download video'
                }

            # Extract video metadata
            metadata = self._extract_video_metadata(video_path)

            # Perform frame-by-frame object detection
            detection_results = self._analyze_video_frames(video_path, question)

            # Perform content analysis
            content_analysis = None
            if self.content_analyzer:
                content_analysis = self.content_analyzer.analyze_video_content(
                    video_path, detection_results.get('frame_detections', []), question
                )

            # Generate comprehensive analysis report
            return self._create_analysis_report(
                video_url, metadata, detection_results, content_analysis, question
            )
        except Exception as e:
            logger.error(f"❌ Video analysis failed: {e}")
            return {
                'success': False,
                'error': f'Video analysis failed: {str(e)}'
            }
        finally:
            # Always clean up, including on failure: a leftover 'video.*' file
            # would otherwise be picked up by the next download.
            self._cleanup_temp_files(video_path)

    def _download_video(self, video_url: str,
                        max_duration: int = DEFAULT_MAX_DURATION) -> Optional[str]:
        """Download video from URL using yt-dlp.

        Args:
            video_url: URL handed to yt-dlp.
            max_duration: Videos reported as longer than this (seconds) are rejected.

        Returns:
            Path of the downloaded file inside the temp directory, or ``None``
            when the video is too long, no file was produced, or the download failed.
        """
        try:
            # _cleanup_temp_files removes the temp directory after each
            # analysis, so recreate it when this instance is reused.
            if not os.path.isdir(self.temp_dir):
                self.temp_dir = tempfile.mkdtemp()

            output_path = os.path.join(self.temp_dir, 'video.%(ext)s')

            def _reject_long_videos(info_dict):
                # 'duration' may be absent or None; treat both as 0 seconds.
                if (info_dict.get('duration') or 0) <= max_duration:
                    return None
                return "Video too long"

            ydl_opts = {
                'format': 'best[height<=720][ext=mp4]/best[ext=mp4]/best',
                'outtmpl': output_path,
                'quiet': True,
                'no_warnings': True,
                'extract_flat': False,
                'writethumbnail': False,
                'writeinfojson': False,
                'match_filter': _reject_long_videos,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Extract info first to check duration
                info = ydl.extract_info(video_url, download=False) or {}
                duration = info.get('duration') or 0
                if duration > max_duration:
                    logger.warning(f"⚠️ Video duration ({duration}s) exceeds maximum ({max_duration}s)")
                    return None

                # Download the video
                ydl.download([video_url])

            # Find the downloaded file
            for file_name in os.listdir(self.temp_dir):
                if file_name.startswith('video.') and file_name.endswith(self.VIDEO_EXTENSIONS):
                    video_path = os.path.join(self.temp_dir, file_name)
                    logger.info(f"✅ Video downloaded: {video_path}")
                    return video_path

            logger.error("❌ Downloaded video file not found")
            return None
        except Exception as e:
            logger.error(f"❌ Video download failed: {e}")
            return None

    def _extract_video_metadata(self, video_path: str) -> Dict[str, Any]:
        """Extract video metadata using OpenCV.

        Returns:
            Dict with ``duration_seconds``, ``fps``, ``frame_count``,
            ``resolution``, ``file_size`` and ``analysis_timestamp``;
            an empty dict when the file cannot be read.
        """
        cap = None
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise RuntimeError("Failed to open video file")

            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Duration is reported as 0 when the FPS is not positive.
            duration = frame_count / fps if fps > 0 else 0

            metadata = {
                'duration_seconds': duration,
                'fps': fps,
                'frame_count': frame_count,
                'resolution': {'width': width, 'height': height},
                'file_size': os.path.getsize(video_path),
                'analysis_timestamp': datetime.now().isoformat()
            }
            logger.info(f"📊 Video metadata: {duration:.1f}s, {width}x{height}, {fps:.1f} FPS")
            return metadata
        except Exception as e:
            logger.error(f"❌ Failed to extract video metadata: {e}")
            return {}
        finally:
            # Release the capture handle on every path, including errors.
            if cap is not None:
                cap.release()

    def _analyze_video_frames(self, video_path: str,
                              question: Optional[str] = None) -> Dict[str, Any]:
        """Run object detection on sampled frames of the video.

        Each detection dict returned by the engine is annotated in place with
        ``frame_number`` and ``timestamp``. At most ``self.max_frames`` frames
        are analysed; longer videos are sampled at a coarser rate, computed
        per call without modifying ``self.frame_sampling_rate``.

        Returns:
            Dict with ``frame_detections`` (one list of detections per
            analysed frame), ``summary`` and ``analysis_params``. On failure,
            or when no detection engine is available, ``frame_detections`` is
            empty and ``summary`` is ``{}``.
        """
        cap = None
        try:
            if not self.detection_engine or not self.detection_engine.available:
                logger.warning("⚠️ Object detection engine not available")
                return {'frame_detections': [], 'summary': {}}

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise RuntimeError("Failed to open video file")

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            # If the frame count is not positive, fall back to reading until
            # the stream ends instead of analysing nothing.
            frame_count_known = total_frames > 0

            # Use a local sampling rate so one long video does not change the
            # rate used for later videos analysed by the same instance.
            sampling_rate = max(1, int(self.frame_sampling_rate))
            if total_frames > self.max_frames:
                sampling_rate = max(1, total_frames // self.max_frames)
                logger.info(f"📊 Sampling every {sampling_rate} frames")

            frame_detections = []
            frame_index = 0
            while cap.isOpened() and len(frame_detections) < self.max_frames:
                if frame_count_known and frame_index >= total_frames:
                    break
                ret, frame = cap.read()
                if not ret:
                    break

                # Sample frames based on sampling rate
                if frame_index % sampling_rate == 0:
                    detections = self.detection_engine.detect_objects(
                        frame,
                        confidence_threshold=self.confidence_threshold,
                        nms_threshold=self.nms_threshold
                    )

                    # Add temporal information; timestamp is 0.0 when the FPS
                    # is unknown, mirroring the metadata duration fallback.
                    timestamp = frame_index / fps if fps > 0 else 0.0
                    for detection in detections:
                        detection['frame_number'] = frame_index
                        detection['timestamp'] = timestamp
                    frame_detections.append(detections)

                    # Progress logging, once per 50 analysed frames
                    if len(frame_detections) % 50 == 0:
                        if frame_count_known:
                            progress = (frame_index / total_frames) * 100
                            logger.info(f"📈 Analysis progress: {progress:.1f}% ({len(frame_detections)} frames analyzed)")
                        else:
                            logger.info(f"📈 Analysis progress: {len(frame_detections)} frames analyzed")

                frame_index += 1

            # Generate detection summary
            summary = self._generate_detection_summary(frame_detections, question)
            logger.info(f"✅ Frame analysis complete: {len(frame_detections)} frames analyzed")

            return {
                'frame_detections': frame_detections,
                'summary': summary,
                'analysis_params': {
                    'frame_sampling_rate': sampling_rate,
                    'confidence_threshold': self.confidence_threshold,
                    'nms_threshold': self.nms_threshold,
                    'frames_analyzed': len(frame_detections)
                }
            }
        except Exception as e:
            logger.error(f"❌ Frame analysis failed: {e}")
            return {'frame_detections': [], 'summary': {}}
        finally:
            # Release the capture handle on every path, including errors.
            if cap is not None:
                cap.release()

    def _generate_detection_summary(self, frame_detections: List[List[Dict[str, Any]]],
                                    question: Optional[str] = None) -> Dict[str, Any]:
        """Generate summary of detection results.

        Args:
            frame_detections: One list of detection dicts per analysed frame.
                Each detection is read through its ``species_type`` key
                (missing keys count as ``'unknown'``).
            question: Optional question; when given, ``answer_analysis`` is
                filled by ``_analyze_question_specific_patterns``.

        Returns:
            Summary dict with totals, per-``species_type`` counts, per-frame
            maxima and averages; an empty dict if summarising fails.
        """
        try:
            summary = {
                'total_frames_analyzed': len(frame_detections),
                'total_detections': 0,
                'species_counts': {},
                'max_simultaneous_objects': 0,
                'max_simultaneous_birds': 0,
                'max_simultaneous_animals': 0,
                'temporal_patterns': {},
                'answer_analysis': {}
            }
            species_counts = summary['species_counts']

            # Per-frame tallies, index-aligned with frame_detections
            simultaneous_counts = []
            bird_counts = []
            animal_counts = []
            for frame_dets in frame_detections:
                frame_birds = 0
                frame_animals = 0
                for detection in frame_dets:
                    species_type = detection.get('species_type', 'unknown')
                    species_counts[species_type] = species_counts.get(species_type, 0) + 1
                    if species_type == 'bird':
                        frame_birds += 1
                    elif species_type == 'animal':
                        frame_animals += 1
                simultaneous_counts.append(len(frame_dets))
                bird_counts.append(frame_birds)
                animal_counts.append(frame_animals)

            summary['total_detections'] = sum(simultaneous_counts)

            # Calculate maximums (0 when no frame was analysed)
            summary['max_simultaneous_objects'] = max(simultaneous_counts, default=0)
            summary['max_simultaneous_birds'] = max(bird_counts, default=0)
            summary['max_simultaneous_animals'] = max(animal_counts, default=0)

            # Analyze question-specific patterns
            if question:
                summary['answer_analysis'] = self._analyze_question_specific_patterns(
                    question, frame_detections, bird_counts, animal_counts
                )

            # Generate temporal patterns; plain floats keep the summary
            # free of numpy scalar types.
            has_frames = bool(simultaneous_counts)
            summary['temporal_patterns'] = {
                'avg_objects_per_frame': float(np.mean(simultaneous_counts)) if has_frames else 0.0,
                'avg_birds_per_frame': float(np.mean(bird_counts)) if has_frames else 0.0,
                'avg_animals_per_frame': float(np.mean(animal_counts)) if has_frames else 0.0,
                'object_variance': float(np.var(simultaneous_counts)) if has_frames else 0.0
            }
            return summary
        except Exception as e:
            logger.error(f"❌ Detection summary generation failed: {e}")
            return {}

    def _analyze_question_specific_patterns(self, question: str,
                                            frame_detections: List[List[Dict[str, Any]]],
                                            bird_counts: List[int],
                                            animal_counts: List[int]) -> Dict[str, Any]:
        """Analyze patterns specific to the question asked.

        Only "highest"/"maximum" questions are recognised. The ``species``
        keyword is checked before ``bird``/``animal`` so that a question such
        as "highest number of bird species" is answered with a count of
        distinct classes rather than a count of individual birds.

        Args:
            question: Natural-language question.
            frame_detections: One list of detection dicts per analysed frame.
            bird_counts: Number of bird detections per frame.
            animal_counts: Number of animal detections per frame.

        Returns:
            Dict with ``question_type``, ``target_answer`` (``None`` when the
            question is not recognised), ``confidence`` and ``reasoning``.
        """
        try:
            analysis = {
                'question_type': 'unknown',
                'target_answer': None,
                'confidence': 0.0,
                'reasoning': []
            }
            question_lower = question.lower()
            wants_maximum = 'highest' in question_lower or 'maximum' in question_lower
            if not wants_maximum:
                return analysis

            if 'species' in question_lower:
                analysis['question_type'] = 'max_species_simultaneous'
                # Restrict to the kind the question names; with neither named,
                # count both birds and animals.
                if 'bird' in question_lower:
                    counted_types = ('bird',)
                elif 'animal' in question_lower:
                    counted_types = ('animal',)
                else:
                    counted_types = ('bird', 'animal')

                # Species are approximated by distinct 'class' labels per frame.
                max_species = 0
                for frame_dets in frame_detections:
                    unique_species = {
                        det.get('class', 'unknown')
                        for det in frame_dets
                        if det.get('species_type', 'unknown') in counted_types
                    }
                    max_species = max(max_species, len(unique_species))

                analysis['target_answer'] = max_species
                analysis['confidence'] = 0.8 if max_species > 0 else 0.1
                analysis['reasoning'].append(f"Maximum simultaneous species detected: {analysis['target_answer']}")
            elif 'bird' in question_lower:
                analysis['question_type'] = 'max_birds_simultaneous'
                analysis['target_answer'] = max(bird_counts) if bird_counts else 0
                analysis['confidence'] = 0.9 if bird_counts else 0.1
                analysis['reasoning'].append(f"Maximum simultaneous birds detected: {analysis['target_answer']}")

                # Find frames with maximum birds
                max_bird_count = analysis['target_answer']
                peak_frames = [i for i, count in enumerate(bird_counts) if count == max_bird_count]
                analysis['reasoning'].append(f"Maximum occurred in {len(peak_frames)} frame(s)")
            elif 'animal' in question_lower:
                analysis['question_type'] = 'max_animals_simultaneous'
                analysis['target_answer'] = max(animal_counts) if animal_counts else 0
                analysis['confidence'] = 0.9 if animal_counts else 0.1
                analysis['reasoning'].append(f"Maximum simultaneous animals detected: {analysis['target_answer']}")

            return analysis
        except Exception as e:
            logger.error(f"❌ Question-specific analysis failed: {e}")
            return {'question_type': 'unknown', 'target_answer': None,
                    'confidence': 0.0, 'reasoning': []}

    def _create_analysis_report(self, video_url: str, metadata: Dict[str, Any],
                                detection_results: Dict[str, Any],
                                content_analysis: Optional[Dict[str, Any]] = None,
                                question: Optional[str] = None) -> Dict[str, Any]:
        """Create comprehensive analysis report.

        The final answer comes from the question-specific analysis when it
        produced one; otherwise it falls back to the per-frame maxima in the
        detection summary. When no frame was analysed the fallback answer is
        reported with zero confidence.

        Returns:
            Report dict with ``success: True``, or
            ``{'success': False, 'error': <message>}`` if building it fails.
        """
        try:
            report = {
                'success': True,
                'video_url': video_url,
                'question': question,
                'analysis_timestamp': datetime.now().isoformat(),
                'metadata': metadata,
                'detection_results': detection_results,
                'content_analysis': content_analysis,
                'final_answer': None,
                'confidence': 0.0,
                'reasoning': []
            }

            # Extract final answer from detection summary
            summary = detection_results.get('summary', {})
            answer_analysis = summary.get('answer_analysis', {})
            if answer_analysis.get('target_answer') is not None:
                report['final_answer'] = answer_analysis['target_answer']
                report['confidence'] = answer_analysis.get('confidence', 0.0)
                report['reasoning'] = answer_analysis.get('reasoning', [])
            else:
                # Fallback to general analysis
                if question and 'bird' in question.lower():
                    report['final_answer'] = summary.get('max_simultaneous_birds', 0)
                    report['confidence'] = 0.7
                    report['reasoning'] = [f"Maximum simultaneous birds detected: {report['final_answer']}"]
                elif question and 'animal' in question.lower():
                    report['final_answer'] = summary.get('max_simultaneous_animals', 0)
                    report['confidence'] = 0.7
                    report['reasoning'] = [f"Maximum simultaneous animals detected: {report['final_answer']}"]
                else:
                    report['final_answer'] = summary.get('max_simultaneous_objects', 0)
                    report['confidence'] = 0.5
                    report['reasoning'] = [f"Maximum simultaneous objects detected: {report['final_answer']}"]

                # A zero count with no analysed frames is not evidence that
                # nothing was present, so do not report it with confidence.
                if not summary.get('total_frames_analyzed'):
                    report['confidence'] = 0.0
                    report['reasoning'].append(
                        "No frames were analyzed; the reported count is not backed by detections"
                    )

            # Add analysis insights
            insights = []
            if summary.get('total_frames_analyzed', 0) > 0:
                insights.append(f"Analyzed {summary['total_frames_analyzed']} frames")
            if summary.get('total_detections', 0) > 0:
                insights.append(f"Total detections: {summary['total_detections']}")
            if summary.get('species_counts'):
                species_info = ", ".join(f"{k}: {v}" for k, v in summary['species_counts'].items())
                insights.append(f"Species distribution: {species_info}")
            report['insights'] = insights

            logger.info("📊 Analysis report generated successfully")
            return report
        except Exception as e:
            logger.error(f"❌ Failed to create analysis report: {e}")
            return {
                'success': False,
                'error': f'Failed to create analysis report: {str(e)}'
            }

    def _cleanup_temp_files(self, video_path: Optional[str] = None):
        """Clean up temporary files.

        Removes ``video_path`` if it exists, then removes the temp directory
        and anything left in it. Best effort: failures are logged, not raised.
        """
        try:
            if video_path and os.path.exists(video_path):
                os.remove(video_path)
            # Remove the temp directory together with any leftover files
            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir, ignore_errors=True)
        except Exception as e:
            logger.warning(f"⚠️ Cleanup failed: {e}")

    def get_capabilities(self) -> Dict[str, Any]:
        """Get video analyzer capabilities.

        Returns:
            Dict describing availability of the analyzer and its optional
            components, supported formats, limits and feature names.
        """
        return {
            'available': self.available,
            'detection_engine_available': bool(
                self.detection_engine is not None and self.detection_engine.available
            ),
            'content_analyzer_available': bool(
                self.content_analyzer is not None and self.content_analyzer.available
            ),
            'supported_formats': ['YouTube URLs', 'MP4', 'WebM', 'MKV'],
            'max_duration': self.DEFAULT_MAX_DURATION,
            'max_frames': self.max_frames,
            'features': [
                'YouTube video downloading',
                'Object detection and classification',
                'Bird and animal species identification',
                'Temporal object tracking',
                'Simultaneous object counting',
                'Content analysis and summarization',
                'Question-specific analysis'
            ]
        }
# AGNO Framework Integration Functions
def get_advanced_video_analysis_tools() -> List[AdvancedVideoAnalyzer]:
    """Build the analyzer tool list handed to the AGNO framework.

    Returns:
        A one-element list holding a new ``AdvancedVideoAnalyzer`` when it
        reports itself available; an empty list when it does not or when
        constructing it raises.
    """
    try:
        tool = AdvancedVideoAnalyzer()
        # Guard clause: nothing to register when the analyzer is unavailable.
        if not tool.available:
            logger.warning("⚠️ Advanced video analyzer not available")
            return []
        return [tool]
    except Exception as err:
        logger.error(f"❌ Failed to create advanced video analysis tools: {err}")
        return []
if __name__ == "__main__":
    # Manual smoke test: construct the analyzer and print what it reports.
    analyzer = AdvancedVideoAnalyzer()
    print(f"Video analyzer available: {analyzer.available}")
    capabilities_json = json.dumps(analyzer.get_capabilities(), indent=2)
    print(f"Capabilities: {capabilities_json}")

    # Sample YouTube video and question; only printed, not analyzed.
    sample_url = "https://www.youtube.com/watch?v=L1vXCYZAYYM"
    sample_question = "What is the highest number of bird species to be on camera simultaneously?"
    print(f"\nTesting with: {sample_url}")
    print(f"Question: {sample_question}")

    # Note: Actual testing would require running the analyzer
    # result = analyzer.analyze_video(test_url, test_question)
    # print(f"Result: {json.dumps(result, indent=2)}")