# NOTE(review): the lines "Spaces:" / "Runtime error" below were a Hugging Face
# Spaces status banner accidentally pasted into the source; commented out so
# the file parses.
import yt_dlp
import cv2
import os
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm
def download_video(url):
    """Download the video and audio streams of a YouTube URL into /tmp.

    Args:
        url (str): YouTube video URL.

    Returns:
        dict: {"video_path": str, "audio_path": str} — paths to the
        downloaded video file and audio file.
    """
    output_path = '/tmp'
    # makedirs(exist_ok=True): idempotent, and creates parents if needed.
    os.makedirs(output_path, exist_ok=True)

    # Export browser cookies first so restricted videos can be fetched.
    export_cookies_path = "/tmp/exported_cookies.txt"
    os.makedirs(os.path.dirname(export_cookies_path), exist_ok=True)
    try:
        ydl_opts_export_cookies = {
            'cookiesfrombrowser': ('firefox', None, None, None),
            'cookiefile': export_cookies_path,
            'quiet': True,
        }
        print(f"Attempting to export cookies from Firefox to {export_cookies_path}...")
        with yt_dlp.YoutubeDL(ydl_opts_export_cookies) as ydl:
            # A dummy URL is often sufficient for cookie export
            ydl.extract_info("https://www.youtube.com", download=False)
        print("Cookies exported successfully (if Firefox was installed and logged in).")
    except yt_dlp.utils.DownloadError as e:
        # Best-effort: continue without cookies rather than aborting.
        print(f"Could not export cookies from browser: {e}")
        print("Please ensure a supported browser is installed and logged in, or manually create a 'cookies.txt' file.")

    # Video stream: lowest-quality mp4 (frames are analyzed, not displayed,
    # so a small download is preferable).
    ydl_opts_video = {
        'format': 'worst[ext=mp4]',
        'outtmpl': os.path.join(output_path, 'video', '%(title)s_video.%(ext)s'),
        'quiet': True
    }
    print('Downloading video...')
    with yt_dlp.YoutubeDL(ydl_opts_video) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        video_filename = ydl.prepare_filename(info_dict)

    # Audio stream: best m4a, downloaded separately.
    audio_opts = {
        'format': 'bestaudio[ext=m4a]',
        'outtmpl': os.path.join(output_path, 'audio', '%(title)s.audio.%(ext)s'),
        'quiet': False,
        'noplaylist': True,
    }
    print('Downloading audio...')
    with yt_dlp.YoutubeDL(audio_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        audio_filename = ydl.prepare_filename(info_dict)

    return {
        "video_path": video_filename,
        "audio_path": audio_filename,
    }
def is_significantly_different(img1, img2, threshold=0.1):
    """Decide whether two frames differ meaningfully, using SSIM.

    Args:
        img1 (numpy.ndarray): First image (BGR, per cv2 convention).
        img2 (numpy.ndarray): Second image (BGR).
        threshold (float): SSIM score below which the pair counts as
            significantly different.

    Returns:
        bool: True when the SSIM score is under ``threshold``.
    """
    gray_a, gray_b = (cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) for img in (img1, img2))
    score, _diff = ssim(gray_a, gray_b, full=True)
    # SSIM is 1.0 for identical images; a lower score means more different.
    return score < threshold
def extract_keyframes(video_path, diff_threshold=0.4, output_path='/tmp/video/frames'):
    """Extract key frames from a video based on significant differences.

    Consecutive frames are compared with SSIM; a frame is written out
    whenever it differs enough from the last saved frame.

    Args:
        video_path (str): Path to the input video file.
        diff_threshold (float): SSIM threshold below which two frames are
            considered significantly different.
        output_path (str): Directory to save the extracted key frames.
            Defaults to the previously hard-coded '/tmp/video/frames'.

    Returns:
        str | None: "success" when extraction ran, None when the video
        could not be read.
    """
    cap = cv2.VideoCapture(video_path)
    success, prev_frame = cap.read()
    if not success:
        print("Failed to read video.")
        cap.release()  # release the capture even on the failure path
        return
    # makedirs (not mkdir): also creates the intermediate /tmp/video dir,
    # which plain mkdir would fail on.
    os.makedirs(output_path, exist_ok=True)
    saved_id = 0
    while True:
        success, frame = cap.read()
        if not success:
            break
        if is_significantly_different(prev_frame, frame, threshold=diff_threshold):
            filename = os.path.join(output_path, f"keyframe_{saved_id:04d}.jpg")
            cv2.imwrite(filename, frame)
            # Compare future frames against the last SAVED frame, not the
            # immediately preceding one, so slow drifts still trigger a save.
            prev_frame = frame
            saved_id += 1
            print(f"frame{saved_id} saved")
    cap.release()
    print(f"Extracted {saved_id} key frames.")
    return "success"
def extract_nfps_frames(video_path, nfps=30, diff_threshold=0.4):
    """Sample one candidate frame every `nfps` seconds and describe each one.

    A frame is saved only when it lies on the sampling interval AND is
    significantly different (SSIM) from the previously saved frame. Each
    saved frame is captioned and object-detected by the local vision model.

    Args:
        video_path (str): Path to the input video file.
        nfps (int): Sampling period in seconds (one candidate frame every
            `nfps` seconds of video).
        diff_threshold (float): SSIM threshold for "significantly different".

    Returns:
        list[dict] | None: One dict per saved frame (keys: frame_id,
        timestamp_sec, description, detected_objects, frame_path); None
        when the video could not be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Failed to read video.")
        return
    output_path = '/tmp/video/frames'
    os.makedirs(output_path, exist_ok=True)
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Frames between two candidate captures; clamp to >=1 so the modulo below
    # cannot divide by zero when the container reports fps == 0.
    frame_interval = max(1, int(fps) * nfps)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    pbar = tqdm(total=total_frames, desc="Processing Frames")
    frame_id = 0
    saved_id = 0
    success, prev_frame = cap.read()
    if not success:
        # Opened but unreadable/empty stream: nothing to sample.
        pbar.close()
        cap.release()
        return []
    all_frames_data = []
    # Imported lazily: loading the vision model is heavy and only needed here.
    from load_vision_model_locally import VideoAnalyzer
    analyser = VideoAnalyzer()
    while True:
        success, frame = cap.read()
        if not success:
            break
        if frame_id % frame_interval == 0 and is_significantly_different(prev_frame, frame, threshold=diff_threshold):
            filename = os.path.join(output_path, f"frame_{saved_id:04d}.jpg")
            cv2.imwrite(filename, frame)
            prev_frame = frame
            saved_id += 1
            # Collect per-frame metadata that will become a RAG document.
            timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            timestamp_sec = timestamp_ms / 1000.0
            description = analyser.describe_frame(filename)
            objects = analyser.detect_objects(filename)
            frame_data = {
                "frame_id": saved_id,
                "timestamp_sec": timestamp_sec,
                "description": description,
                "detected_objects": objects,
                "frame_path": filename  # path to the saved frame image
            }
            all_frames_data.append(frame_data)
            print(5*"{*}\n", f"--> description {description}")
        frame_id += 1
        pbar.update(1)
    pbar.close()  # was leaked: leave the terminal/progress state clean
    cap.release()
    print(f"Extracted {saved_id} frames (1 per {nfps} seconds).")
    return all_frames_data
from langchain.docstore.document import Document
def provide_video_RAG(all_frames_data):
    """Convert per-frame analysis dicts into LangChain Documents.

    Args:
        all_frames_data (list[dict]): Entries with 'description',
            'detected_objects', 'timestamp_sec' and 'frame_id' keys, as
            produced by extract_nfps_frames.

    Returns:
        list[Document]: One Document per frame, ready to be indexed in a
        vector store for the RAG system.
    """
    return [
        Document(
            # Fold the visual analysis into a single searchable text body.
            page_content=(
                f"Description: {frame['description']}\n"
                f"Objects Detected: {', '.join(frame['detected_objects'])}"
            ),
            metadata={
                "timestamp": frame['timestamp_sec'],
                "frame_id": frame['frame_id'],
            },
        )
        for frame in all_frames_data
    ]
# Now the returned documents are ready to be indexed in a vector store for your RAG system