import yt_dlp
import cv2
import os
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm


def download_video(url):
    """Download the video and audio streams of a YouTube URL into /tmp.

    Exports Firefox cookies first (best effort) so restricted videos can
    still be fetched, then downloads the smallest mp4 video stream and the
    best m4a audio stream separately.

    Args:
        url (str): YouTube video URL.

    Returns:
        dict: {"video_path": str, "audio_path": str} — paths to the
        downloaded video and audio files.
    """
    # Instantiate output path
    output_path = '/tmp'
    if not os.path.exists(output_path):
        os.mkdir(output_path)

    # Best-effort cookie export from Firefox; failure is logged, not fatal.
    export_cookies_path = "/tmp/exported_cookies.txt"
    os.makedirs(os.path.dirname(export_cookies_path), exist_ok=True)
    try:
        ydl_opts_export_cookies = {
            'cookiesfrombrowser': ('firefox', None, None, None),
            'cookiefile': export_cookies_path,
            'quiet': True,
        }
        print(f"Attempting to export cookies from Firefox to {export_cookies_path}...")
        with yt_dlp.YoutubeDL(ydl_opts_export_cookies) as ydl:
            # A dummy URL is often sufficient for cookie export
            ydl.extract_info("https://www.youtube.com", download=False)
        print("Cookies exported successfully (if Firefox was installed and logged in).")
    except yt_dlp.utils.DownloadError as e:
        print(f"Could not export cookies from browser: {e}")
        print("Please ensure a supported browser is installed and logged in, or manually create a 'cookies.txt' file.")

    # Download the (smallest) mp4 video stream — keyframe extraction does
    # not need high resolution.
    ydl_opts_video = {
        'format': 'worst[ext=mp4]',
        'outtmpl': output_path + '/video/' + '%(title)s_video.%(ext)s',
        'quiet': True
    }
    print('Downloading video...')
    with yt_dlp.YoutubeDL(ydl_opts_video) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        video_filename = ydl.prepare_filename(info_dict)

    # Download the best m4a audio stream separately.
    audio_opts = {
        'format': 'bestaudio[ext=m4a]',
        'outtmpl': output_path + '/audio/' + '%(title)s.audio.%(ext)s',
        'quiet': False,
        'noplaylist': True,
    }
    print('Downloading audio...')
    with yt_dlp.YoutubeDL(audio_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        audio_filename = ydl.prepare_filename(info_dict)

    return {
        "video_path": video_filename,
        "audio_path": audio_filename,
    }


def is_significantly_different(img1, img2, threshold=0.1):
    """Check if two images are significantly different using SSIM.

    Args:
        img1 (numpy.ndarray): First image (BGR, as read by OpenCV).
        img2 (numpy.ndarray): Second image (BGR); assumed same shape as
            img1 — frames of the same video satisfy this.
        threshold (float): SSIM score below which the images are
            considered significantly different.

    Returns:
        bool: True if images are significantly different, False otherwise.
    """
    grayA = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    grayB = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
    score, _ = ssim(grayA, grayB, full=True)
    return score < threshold  # Lower score means more different


def extract_keyframes(video_path, diff_threshold=0.4):
    """Extract key frames from a video based on significant differences.

    Consecutive frames are compared via SSIM; a frame is saved to
    /tmp/video/frames whenever it differs enough from the last saved one.

    Args:
        video_path (str): Path to the input video file.
        diff_threshold (float): SSIM threshold to determine significant
            difference.

    Returns:
        str | None: "success" when frames were processed, None if the
        video could not be read.
    """
    cap = cv2.VideoCapture(video_path)
    frame_id = 0
    saved_id = 0

    success, prev_frame = cap.read()
    if not success:
        print("Failed to read video.")
        return

    output_path = '/tmp/video/frames'
    # makedirs with exist_ok: os.mkdir would fail if /tmp/video is missing.
    os.makedirs(output_path, exist_ok=True)

    while True:
        success, frame = cap.read()
        if not success:
            break
        frame_id += 1
        if is_significantly_different(prev_frame, frame, threshold=diff_threshold):
            filename = os.path.join(output_path, f"keyframe_{saved_id:04d}.jpg")
            cv2.imwrite(filename, frame)
            # Compare future frames against the last SAVED frame, not the
            # immediately preceding one, to avoid drift.
            prev_frame = frame
            saved_id += 1
            print(f"frame{saved_id} saved")

    cap.release()
    print(f"Extracted {saved_id} key frames.")
    return "success"


def extract_nfps_frames(video_path, nfps=30, diff_threshold=0.4):
    """Sample, analyze and describe frames from a video for RAG indexing.

    One candidate frame is taken every `nfps` seconds; it is kept only if
    it is significantly different (SSIM) from the last kept frame. Each
    kept frame is saved to /tmp/video/frames and run through the local
    vision model for a description and object detections.

    Args:
        video_path (str): Path to the input video file.
        nfps (int): Sampling interval in seconds between candidate frames.
        diff_threshold (float): SSIM threshold to determine significant
            difference.

    Returns:
        list[dict] | None: One dict per kept frame with keys "frame_id",
        "timestamp_sec", "description", "detected_objects", "frame_path";
        None if the video could not be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Failed to read video.")
        return

    output_path = '/tmp/video/frames'
    os.makedirs(output_path, exist_ok=True)

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(fps) * nfps  # Capture one candidate frame every nfps seconds
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    pbar = tqdm(total=total_frames, desc="Processing Frames")

    frame_id = 0
    saved_id = 0
    success, prev_frame = cap.read()
    all_frames_data = []

    # Imported lazily: loading the vision model is expensive and only
    # needed when this function actually runs.
    from load_vision_model_locally import VideoAnalyzer
    analyser = VideoAnalyzer()

    while True:
        success, frame = cap.read()
        if not success:
            break
        if frame_id % frame_interval == 0 and is_significantly_different(prev_frame, frame, threshold=diff_threshold):
            filename = os.path.join(output_path, f"frame_{saved_id:04d}.jpg")
            cv2.imwrite(filename, frame)
            prev_frame = frame
            saved_id += 1
            # Append to the list that will constitute the RAG documents.
            timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            timestamp_sec = timestamp_ms / 1000.0
            description = analyser.describe_frame(filename)
            objects = analyser.detect_objects(filename)
            frame_data = {
                "frame_id": saved_id,
                "timestamp_sec": timestamp_sec,
                "description": description,
                "detected_objects": objects,
                "frame_path": filename  # Optional: path to the saved frame
            }
            all_frames_data.append(frame_data)
            print(5*"{*}\n", f"--> description {description}")
        frame_id += 1
        pbar.update(1)

    cap.release()
    print(f"Extracted {saved_id} frames (1 per second).")
    return all_frames_data


from langchain.docstore.document import Document


def provide_video_RAG(all_frames_data):
    """Convert per-frame analysis dicts into LangChain Documents.

    Args:
        all_frames_data (list[dict]): Output of `extract_nfps_frames`.

    Returns:
        list[Document]: Documents ready to be indexed in a vector store
        for the RAG system, with timestamp/frame_id metadata.
    """
    langchain_documents = []
    for data in all_frames_data:
        # Combine the analysis into a single string for the document content
        content = f"Description: {data['description']}\nObjects Detected: {', '.join(data['detected_objects'])}"
        doc = Document(
            page_content=content,
            metadata={
                "timestamp": data['timestamp_sec'],
                "frame_id": data['frame_id']
            }
        )
        langchain_documents.append(doc)
    return langchain_documents