# RCaz's picture
# resolve transcript bug
# 9d315b3
import yt_dlp
import cv2
import os
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm
def download_video(url):
    """Download the video and audio streams of a YouTube URL with yt-dlp.

    Before downloading, a best-effort attempt is made to export the Firefox
    cookies to ``/tmp/exported_cookies.txt``. A failure of that step is
    reported on stdout and never aborts the download.

    Args:
        url (str): YouTube video URL.

    Returns:
        dict: ``{"video_path": str, "audio_path": str}`` with the file names
        yt-dlp prepared for the ``worst[ext=mp4]`` video stream and the
        ``bestaudio[ext=m4a]`` audio stream.
    """
    # Make sure the output directory exists.
    output_path = '/tmp'
    os.makedirs(output_path, exist_ok=True)

    # Export browser cookies (best effort).
    # NOTE(review): the exported file is not passed to the two downloads
    # below; presumably it is consumed elsewhere -- confirm.
    export_cookies_path = "/tmp/exported_cookies.txt"
    os.makedirs(os.path.dirname(export_cookies_path), exist_ok=True)
    try:
        ydl_opts_export_cookies = {
            'cookiesfrombrowser': ('firefox', None, None, None),
            'cookiefile': export_cookies_path,
            'quiet': True,
        }
        print(f"Attempting to export cookies from Firefox to {export_cookies_path}...")
        with yt_dlp.YoutubeDL(ydl_opts_export_cookies) as ydl:
            # A dummy URL is often sufficient for cookie export
            ydl.extract_info("https://www.youtube.com", download=False)
        print("Cookies exported successfully (if Firefox was installed and logged in).")
    except (yt_dlp.utils.YoutubeDLError, OSError) as e:
        # A missing browser profile surfaces as an OSError or as a yt-dlp
        # error that is not a DownloadError; this step must stay optional.
        print(f"Could not export cookies from browser: {e}")
        print("Please ensure a supported browser is installed and logged in, or manually create a 'cookies.txt' file.")

    # Download the video stream.
    ydl_opts_video = {
        'format': 'worst[ext=mp4]',
        'outtmpl': output_path + '/video/' + '%(title)s_video.%(ext)s',
        'quiet': True,
        # Same as the audio download: a URL carrying a playlist id must yield
        # a single video so prepare_filename() receives a video info dict.
        'noplaylist': True,
    }
    print('Downloading video...')
    with yt_dlp.YoutubeDL(ydl_opts_video) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        video_filename = ydl.prepare_filename(info_dict)

    # Download the audio stream.
    audio_opts = {
        'format': 'bestaudio[ext=m4a]',
        'outtmpl': output_path + '/audio/' + '%(title)s.audio.%(ext)s',
        'quiet': False,
        'noplaylist': True,
    }
    print('Downloading audio...')
    with yt_dlp.YoutubeDL(audio_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        audio_filename = ydl.prepare_filename(info_dict)

    return {
        "video_path": video_filename,
        "audio_path": audio_filename,
    }
def is_significantly_different(img1, img2, threshold=0.1):
    """Tell whether two BGR images differ noticeably according to SSIM.

    Both images are converted to grayscale and their structural similarity
    score is compared with ``threshold``.

    Args:
        img1 (numpy.ndarray): First BGR image.
        img2 (numpy.ndarray): Second BGR image.
        threshold (float): Images scoring strictly below this SSIM value are
            reported as different.

    Returns:
        bool: True when the SSIM score is below ``threshold``.
    """
    gray_pair = [cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) for image in (img1, img2)]
    # With full=True the call returns (score, similarity map); only the score matters.
    similarity = ssim(*gray_pair, full=True)[0]
    # The lower the similarity, the more the images differ.
    return similarity < threshold
def extract_keyframes(video_path, diff_threshold=0.4):
    """Save the frames of a video that differ from the last kept frame.

    Every frame is compared (SSIM, via ``is_significantly_different``) with
    the previously kept frame, starting from the first frame of the video.
    Frames that differ are written to ``/tmp/video/frames`` as
    ``keyframe_NNNN.jpg`` and become the new reference.

    Args:
        video_path (str): Path to the input video file.
        diff_threshold (float): SSIM threshold below which a frame is
            considered different from the reference frame.

    Returns:
        str | None: ``"success"`` once the video has been processed, ``None``
        when the first frame cannot be read.
    """
    output_path = '/tmp/video/frames'
    cap = cv2.VideoCapture(video_path)
    try:
        success, prev_frame = cap.read()
        if not success:
            print("Failed to read video.")
            return None

        # makedirs: the parent '/tmp/video' may not exist yet.
        os.makedirs(output_path, exist_ok=True)

        saved_id = 0
        while True:
            success, frame = cap.read()
            if not success:
                break
            if is_significantly_different(prev_frame, frame, threshold=diff_threshold):
                filename = os.path.join(output_path, f"keyframe_{saved_id:04d}.jpg")
                cv2.imwrite(filename, frame)
                prev_frame = frame
                saved_id += 1
                print(f"frame{saved_id} saved")
    finally:
        # Release the capture on every path, including early return and errors.
        cap.release()

    print(f"Extracted {saved_id} key frames.")
    return "success"
def extract_nfps_frames(video_path, nfps=30, diff_threshold=0.4):
    """Sample a video every ``nfps`` seconds and describe the frames that changed.

    A frame is kept when its index is a multiple of ``int(fps) * nfps`` and it
    differs (SSIM, via ``is_significantly_different``) from the previously
    kept frame. Kept frames are written to ``/tmp/video/frames`` as
    ``frame_NNNN.jpg`` and passed to ``VideoAnalyzer.describe_frame`` and
    ``VideoAnalyzer.detect_objects``.

    Args:
        video_path (str): Path to the input video file.
        nfps (int): Number of seconds between two sampled frames.
        diff_threshold (float): SSIM threshold below which a sampled frame is
            considered different from the previously kept one.

    Returns:
        list[dict] | None: One record per kept frame with the keys
        ``frame_id`` (1-based count of kept frames), ``timestamp_sec``,
        ``description``, ``detected_objects`` and ``frame_path``. ``None``
        when the video cannot be opened; an empty list when no frame can be
        read.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Failed to read video.")
        cap.release()
        return

    output_path = '/tmp/video/frames'
    os.makedirs(output_path, exist_ok=True)

    all_frames_data = []
    saved_id = 0
    pbar = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_interval = int(fps) * nfps  # Capture one frame every nfps seconds
        if frame_interval < 1:
            # The reported FPS can be 0; avoid a modulo by zero by falling
            # back to checking every frame.
            frame_interval = 1

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        pbar = tqdm(total=total_frames, desc="Processing Frames")

        success, prev_frame = cap.read()
        if not success:
            print("Failed to read video.")
            return all_frames_data

        # Imported lazily, as in the original code.
        from load_vision_model_locally import VideoAnalyzer
        analyser = VideoAnalyzer()

        frame_id = 0
        while True:
            success, frame = cap.read()
            if not success:
                break
            if frame_id % frame_interval == 0 and is_significantly_different(prev_frame, frame, threshold=diff_threshold):
                filename = os.path.join(output_path, f"frame_{saved_id:04d}.jpg")
                cv2.imwrite(filename, frame)
                prev_frame = frame
                saved_id += 1
                # Append to the list that will constitute the RAG documents.
                timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
                timestamp_sec = timestamp_ms / 1000.0
                description = analyser.describe_frame(filename)
                objects = analyser.detect_objects(filename)
                frame_data = {
                    "frame_id": saved_id,
                    "timestamp_sec": timestamp_sec,
                    "description": description,
                    "detected_objects": objects,
                    "frame_path": filename  # Optional: path to the saved frame
                }
                all_frames_data.append(frame_data)
                print(5*"{*}\n",f"--> description {description}")
            frame_id += 1
            pbar.update(1)
    finally:
        # Clean up on every path, including errors raised by the analyser.
        if pbar is not None:
            pbar.close()
        cap.release()

    print(f"Extracted {saved_id} frames (1 every {nfps} second(s)).")
    return all_frames_data
from langchain.docstore.document import Document
def provide_video_RAG(all_frames_data):
    """Convert per-frame analysis records into LangChain ``Document`` objects.

    Args:
        all_frames_data (list[dict]): Records providing the keys
            ``description``, ``detected_objects``, ``timestamp_sec`` and
            ``frame_id``.

    Returns:
        list[Document]: One document per record. The page content combines the
        description with the comma-joined detected objects; the metadata holds
        ``timestamp`` and ``frame_id``.
    """
    def _to_document(record):
        # Merge the textual analysis of one frame into a single content string.
        description = record['description']
        detected = ', '.join(record['detected_objects'])
        return Document(
            page_content=f"Description: {description}\nObjects Detected: {detected}",
            metadata={
                "timestamp": record['timestamp_sec'],
                "frame_id": record['frame_id'],
            },
        )

    return [_to_document(record) for record in all_frames_data]
# Now 'langchain_documents' is ready to be indexed in a vector store for your RAG system