# MCP_Track3_Discover / get_transcripts_with_openai.py
# Author: RCaz — commit ef6c3af ("movie bug resolved")
import os
import math
import tempfile
from pydub import AudioSegment
from langchain.schema import Document
from openai import OpenAI
from moviepy import *
from dotenv import load_dotenv
load_dotenv()
def get_langchain_Document_for_rag(video_path):
    """Transcribe/translate a video's audio track and return LangChain Documents.

    Extracts the audio from ``video_path`` to a temporary MP3, splits it into
    5-minute chunks, sends each chunk to the OpenAI Whisper translation
    endpoint, and wraps each chunk's transcript in a ``Document`` whose
    metadata records the chunk id and its start/end times (milliseconds).

    Args:
        video_path: Path to a video file readable by MoviePy.

    Returns:
        list[Document]: One Document per 5-minute audio chunk, in order.

    Raises:
        ValueError: If the video has no audio track.
    """
    # Extract audio from the video into a temporary MP3 file.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio_file:
        temp_audio_path = temp_audio_file.name
    # BUG FIX: the file does `from moviepy import *`, so the module name
    # `moviepy` is never bound and `moviepy.editor` was removed in MoviePy 2.x.
    # `VideoFileClip` is the name that star-import actually provides.
    video_clip = VideoFileClip(video_path)
    try:
        if video_clip.audio is None:
            raise ValueError(f"Video has no audio track: {video_path}")
        video_clip.audio.write_audiofile(temp_audio_path, logger=None)
    finally:
        # Close the clip even if extraction fails, so the file handle is released.
        video_clip.close()

    # Instantiate llm client
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    try:
        # Load extracted audio
        audio = AudioSegment.from_file(temp_audio_path)

        # Chunk audio for translation (Whisper has an upload size limit,
        # so long recordings are sent in 5-minute pieces).
        translations = []
        chunk_duration_ms = 5 * 60 * 1000  # 5 minutes
        num_chunks = math.ceil(len(audio) / chunk_duration_ms)
        for i in range(num_chunks):
            start = i * chunk_duration_ms
            end = min((i + 1) * chunk_duration_ms, len(audio))
            chunk = audio[start:end]

            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as chunk_file:
                chunk_path = chunk_file.name
            try:
                chunk.export(chunk_path, format="mp3")
                with open(chunk_path, "rb") as f:
                    translation = client.audio.translations.create(
                        model="whisper-1",  # or use your preferred model
                        file=f,
                    )
                translations.append({
                    'chunk_id': i,
                    'start_time': start,
                    'end_time': end,
                    'transcript': translation.text,
                })
            finally:
                os.unlink(chunk_path)  # clean up chunk file even on API failure
    finally:
        os.unlink(temp_audio_path)  # clean up extracted audio file

    # Create LangChain documents
    langchain_documents = []
    for data in translations:
        content = f"Transcript: {data['transcript']}"
        doc = Document(
            page_content=content,
            metadata={
                "start_time": data['start_time'],
                "end_time": data['end_time'],
                "chunk_id": data['chunk_id']
            }
        )
        langchain_documents.append(doc)

    return langchain_documents