File size: 14,062 Bytes
9a6a4dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
#!/usr/bin/env python3
"""
Video Analysis Tool for GAIA Agent
Provides video frame extraction and visual analysis capabilities for YouTube videos.
Specifically designed to handle questions requiring visual analysis (e.g., counting objects).
"""

import os
import logging
import tempfile
import subprocess
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import requests
import re

try:
    import cv2
    import numpy as np
    from PIL import Image
    CV2_AVAILABLE = True
except ImportError:
    cv2 = None
    np = None
    Image = None
    CV2_AVAILABLE = False

try:
    import yt_dlp
    YT_DLP_AVAILABLE = True
except ImportError:
    YT_DLP_AVAILABLE = False

# Import existing multimodal tools
try:
    from agents.mistral_multimodal_agent import OpenSourceMultimodalTools
    MULTIMODAL_AVAILABLE = True
except ImportError:
    MULTIMODAL_AVAILABLE = False

logger = logging.getLogger(__name__)

class VideoAnalysisTool:
    """
    Video Analysis Tool for extracting frames and performing visual analysis.
    
    Capabilities:
    - Extract frames from YouTube videos
    - Analyze frames using multimodal image analysis
    - Count objects across multiple frames
    - Handle visual questions that require frame-by-frame analysis
    """
    
    def __init__(self):
        """Initialize the video analysis tool."""
        logger.info("🎬 Initializing Video Analysis Tool...")
        
        # Check dependencies
        self.cv2_available = CV2_AVAILABLE
        self.yt_dlp_available = YT_DLP_AVAILABLE
        self.multimodal_available = MULTIMODAL_AVAILABLE
        
        # Initialize multimodal tools if available
        self.multimodal_tools = None
        if self.multimodal_available:
            try:
                self.multimodal_tools = OpenSourceMultimodalTools()
                logger.info("βœ… Multimodal tools initialized")
            except Exception as e:
                logger.warning(f"⚠️ Multimodal tools initialization failed: {e}")
                self.multimodal_available = False
        
        # Log capabilities
        capabilities = []
        if self.cv2_available:
            capabilities.append("Frame extraction (OpenCV)")
        if self.yt_dlp_available:
            capabilities.append("YouTube download (yt-dlp)")
        if self.multimodal_available:
            capabilities.append("Image analysis (Multimodal)")
        
        logger.info(f"πŸ“Š Available capabilities: {', '.join(capabilities)}")
        
        if not any([self.cv2_available, self.yt_dlp_available]):
            logger.warning("⚠️ Limited functionality - install opencv-python and yt-dlp for full capabilities")
    
    def extract_video_id(self, youtube_url: str) -> Optional[str]:
        """Extract video ID from YouTube URL."""
        patterns = [
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([^&\n?#]+)',
            r'youtube\.com/watch\?.*v=([^&\n?#]+)'
        ]
        
        for pattern in patterns:
            match = re.search(pattern, youtube_url)
            if match:
                return match.group(1)
        
        return None
    
    def download_video(self, youtube_url: str, output_dir: str) -> Optional[str]:
        """
        Download YouTube video for frame extraction.
        
        Args:
            youtube_url: YouTube video URL
            output_dir: Directory to save the video
            
        Returns:
            Path to downloaded video file or None if failed
        """
        if not self.yt_dlp_available:
            logger.error("❌ yt-dlp not available for video download")
            return None
        
        try:
            video_id = self.extract_video_id(youtube_url)
            if not video_id:
                logger.error(f"❌ Could not extract video ID from URL: {youtube_url}")
                return None
            
            output_path = os.path.join(output_dir, f"{video_id}.%(ext)s")
            
            ydl_opts = {
                'format': 'best[height<=720]',  # Limit quality for faster processing
                'outtmpl': output_path,
                'quiet': True,
                'no_warnings': True,
            }
            
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([youtube_url])
            
            # Find the downloaded file
            for file in os.listdir(output_dir):
                if file.startswith(video_id):
                    downloaded_path = os.path.join(output_dir, file)
                    logger.info(f"βœ… Video downloaded: {downloaded_path}")
                    return downloaded_path
            
            logger.error("❌ Downloaded video file not found")
            return None
            
        except Exception as e:
            logger.error(f"❌ Video download failed: {e}")
            return None
    
    def extract_frames(self, video_path: str, max_frames: int = 10, interval_seconds: float = 5.0) -> List[Any]:
        """
        Extract frames from video at regular intervals.
        
        Args:
            video_path: Path to video file
            max_frames: Maximum number of frames to extract
            interval_seconds: Interval between frames in seconds
            
        Returns:
            List of frame arrays
        """
        if not self.cv2_available:
            logger.error("❌ OpenCV not available for frame extraction")
            return []
        
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                logger.error(f"❌ Could not open video: {video_path}")
                return []
            
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps if fps > 0 else 0
            
            logger.info(f"πŸ“Ή Video info: {duration:.1f}s, {fps:.1f} FPS, {total_frames} frames")
            
            frames = []
            frame_interval = int(fps * interval_seconds) if fps > 0 else 30
            
            frame_count = 0
            extracted_count = 0
            
            while extracted_count < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                
                if frame_count % frame_interval == 0:
                    # Convert BGR to RGB for PIL compatibility
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(frame_rgb)
                    extracted_count += 1
                    logger.info(f"πŸ“Έ Extracted frame {extracted_count} at {frame_count/fps:.1f}s")
                
                frame_count += 1
            
            cap.release()
            logger.info(f"βœ… Extracted {len(frames)} frames from video")
            return frames
            
        except Exception as e:
            logger.error(f"❌ Frame extraction failed: {e}")
            return []
    
    def analyze_frame(self, frame: Any, question: str) -> str:
        """
        Analyze a single frame using multimodal image analysis.
        
        Args:
            frame: Frame array (RGB format)
            question: Question about the frame
            
        Returns:
            Analysis result
        """
        if not self.multimodal_available or not self.multimodal_tools:
            return "Error: Multimodal analysis not available"
        
        try:
            # Convert numpy array to PIL Image
            pil_image = Image.fromarray(frame)
            
            # Use multimodal tools for analysis
            result = self.multimodal_tools.analyze_image(pil_image, question)
            return result
            
        except Exception as e:
            logger.error(f"❌ Frame analysis failed: {e}")
            return f"Error analyzing frame: {e}"
    
    def analyze_video_for_objects(self, youtube_url: str, question: str, max_frames: int = 10) -> str:
        """
        Analyze YouTube video for object counting or visual questions.
        
        Args:
            youtube_url: YouTube video URL
            question: Question about the video (e.g., "count bird species")
            max_frames: Maximum frames to analyze
            
        Returns:
            Analysis result with object counts or visual information
        """
        logger.info(f"🎬 Starting video analysis for: {youtube_url}")
        logger.info(f"❓ Question: {question}")
        
        with tempfile.TemporaryDirectory() as temp_dir:
            # Step 1: Download video
            video_path = self.download_video(youtube_url, temp_dir)
            if not video_path:
                return "Error: Could not download video for analysis"
            
            # Step 2: Extract frames
            frames = self.extract_frames(video_path, max_frames=max_frames)
            if not frames:
                return "Error: Could not extract frames from video"
            
            # Step 3: Analyze each frame
            frame_analyses = []
            for i, frame in enumerate(frames):
                logger.info(f"πŸ” Analyzing frame {i+1}/{len(frames)}")
                analysis = self.analyze_frame(frame, question)
                frame_analyses.append({
                    'frame_number': i + 1,
                    'timestamp': f"{i * 5.0:.1f}s",  # Assuming 5s intervals
                    'analysis': analysis
                })
            
            # Step 4: Synthesize results
            return self._synthesize_video_analysis(frame_analyses, question)
    
    def _synthesize_video_analysis(self, frame_analyses: List[Dict], question: str) -> str:
        """
        Synthesize analysis results from multiple frames.
        
        Args:
            frame_analyses: List of frame analysis results
            question: Original question
            
        Returns:
            Synthesized answer
        """
        if not frame_analyses:
            return "No frames were analyzed"
        
        # For counting questions, extract numbers and find maximum
        if any(word in question.lower() for word in ['count', 'number', 'how many', 'species']):
            numbers_found = []
            
            for frame_analysis in frame_analyses:
                analysis_text = frame_analysis['analysis'].lower()
                
                # Extract numbers from analysis
                import re
                numbers = re.findall(r'\b(\d+)\b', analysis_text)
                for num_str in numbers:
                    try:
                        num = int(num_str)
                        if 1 <= num <= 20:  # Reasonable range for object counting
                            numbers_found.append(num)
                    except ValueError:
                        continue
            
            if numbers_found:
                max_count = max(numbers_found)
                logger.info(f"πŸ”’ Found counts across frames: {numbers_found}, max: {max_count}")
                
                # Build detailed response
                response_parts = [
                    f"Analysis of {len(frame_analyses)} video frames:",
                    ""
                ]
                
                for frame_analysis in frame_analyses:
                    response_parts.append(
                        f"Frame {frame_analysis['frame_number']} ({frame_analysis['timestamp']}): "
                        f"{frame_analysis['analysis'][:100]}..."
                    )
                
                response_parts.extend([
                    "",
                    f"Maximum count detected: {max_count}",
                    f"Answer: {max_count}"
                ])
                
                return "\n".join(response_parts)
        
        # For non-counting questions, provide comprehensive analysis
        response_parts = [
            f"Video analysis results ({len(frame_analyses)} frames):",
            ""
        ]
        
        for frame_analysis in frame_analyses:
            response_parts.append(
                f"Frame {frame_analysis['frame_number']} ({frame_analysis['timestamp']}): "
                f"{frame_analysis['analysis']}"
            )
        
        return "\n".join(response_parts)
    
    def get_capabilities(self) -> Dict[str, bool]:
        """Get current tool capabilities."""
        return {
            'video_download': self.yt_dlp_available,
            'frame_extraction': self.cv2_available,
            'image_analysis': self.multimodal_available,
            'full_video_analysis': all([
                self.yt_dlp_available,
                self.cv2_available,
                self.multimodal_available
            ])
        }

# AGNO Tool Integration
def analyze_youtube_video(url: str, question: str) -> str:
    """
    AGNO-compatible function for YouTube video analysis.
    
    Args:
        url: YouTube video URL
        question: Question about the video
        
    Returns:
        Analysis result
    """
    tool = VideoAnalysisTool()
    return tool.analyze_video_for_objects(url, question)

if __name__ == "__main__":
    # Test the video analysis tool
    tool = VideoAnalysisTool()
    
    print("🎬 Video Analysis Tool Test")
    print("=" * 50)
    print(f"Capabilities: {tool.get_capabilities()}")
    
    # Test with the bird species question
    test_url = "https://www.youtube.com/watch?v=LivXCYZAYYM"
    test_question = "What is the highest number of bird species to be on camera simultaneously?"
    
    print(f"\nπŸ§ͺ Testing with:")
    print(f"URL: {test_url}")
    print(f"Question: {test_question}")
    
    if tool.get_capabilities()['full_video_analysis']:
        result = tool.analyze_video_for_objects(test_url, test_question, max_frames=5)
        print(f"\nπŸ“Š Result:\n{result}")
    else:
        print("\n⚠️ Cannot run full test - missing dependencies")
        print("Install: pip install opencv-python yt-dlp")