Spaces:
Build error
Build error
# ./backend/app/youtube_parser.py | |
import os | |
import re | |
import contextlib | |
import asyncio | |
import json # json 모듈 추가 | |
import shutil | |
import sys # sys 모듈 추가 | |
from urllib.parse import urlparse, parse_qs | |
from loguru import logger | |
from yt_dlp import YoutubeDL | |
from proxy_manager import proxy_manager | |
from dotenv import load_dotenv | |
# 환경 변수 로드 (코드의 가장 위에 위치) | |
load_dotenv() | |
# --- Loguru 설정 시작 --- | |
# 기본 핸들러(콘솔 출력) 제거 | |
logger.remove() | |
# 콘솔에 INFO 레벨 이상 로그 출력 | |
logger.add(sys.stderr, level="INFO") | |
# --- Loguru 설정 끝 --- | |
def validate_youtube_url(url): | |
ydl_opts = { | |
'quiet': True | |
} | |
try: | |
with YoutubeDL(ydl_opts) as ydl: | |
ydl.extract_info(url, download=False) | |
return True | |
except: | |
return False | |
async def get_youtube_video_id(url: str) -> str | None: | |
""" | |
유튜브 URL에서 비디오 ID를 추출합니다. | |
표준 유튜브 URL (youtube.com/watch?v=..., youtu.be/...)을 처리합니다. | |
""" | |
parsed_url = urlparse(url) | |
# 표준 YouTube Watch 페이지 도메인 확인 | |
# www.youtube.com, m.youtube.com, youtube.com 등을 포함합니다. | |
# 'www.youtube.com', 'm.youtube.com', 'youtube.com'은 실제 YouTube 도메인을 의미합니다. | |
if parsed_url.hostname and any(domain in parsed_url.hostname for domain in ['www.youtube.com', 'm.youtube.com', 'youtube.com']): | |
query_params = parse_qs(parsed_url.query) | |
if 'v' in query_params: | |
return query_params['v'][0] | |
# 짧은 YouTube URL (youtu.be/VIDEO_ID) | |
elif parsed_url.hostname == 'youtu.be': | |
# path가 /VIDEO_ID 형태이므로 맨 앞의 '/'를 제거 | |
video_id = parsed_url.path.strip('/') | |
# 유튜브 비디오 ID는 보통 11자리이므로, 유효성 검사 | |
if len(video_id) == 11: | |
return video_id | |
logger.warning(f"알 수 없는 형식의 YouTube URL: {url}") | |
return None | |
def clean_caption_text(text: str) -> str: | |
# <00:00:00.000><c>...</c> 또는 </c> 같은 태그 제거 | |
cleaned_text = re.sub(r'<[^>]+>', '', text) | |
# 여러 공백을 하나의 공백으로 줄이고 양쪽 끝 공백 제거 | |
cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip() | |
return cleaned_text | |
async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None: | |
logger.info(f"비디오 ID '{video_id}'에 대한 자막 가져오기 시도.") | |
processed_chunks = [] | |
# yt-dlp 옵션 설정 | |
ydl_opts = { | |
'writesubtitles': True, # 사용자가 업로드한 수동 자막 파일 쓰기 활성화 | |
'writeautomaticsub': True, # YouTube에서 자동으로 생성된 자막 파일 쓰기 활성화 | |
'subtitleslangs': ['ko', 'en'], # 다운로드할 자막 언어 목록 (한국어 우선, 없으면 영어) | |
'skip_download': True, # 동영상 자체는 다운로드하지 않고 자막만 다운로드 | |
'outtmpl': '%(id)s.%(language)s.%(ext)s', # 다운로드될 파일 이름 템플릿 (temp_dir 안에서 상대 경로로 저장됨) | |
'quiet': False, # 콘솔 출력 활성화 (디버깅용) | |
'no_warnings': False, # 경고 메시지 활성화 (디버깅용) | |
'extractor_args': { # 특정 extractor (예: 유튜브)에 대한 추가 인자 | |
'youtube': {'skip': ['dash']} # dash manifest 관련 오류 회피 시도 (유튜브 관련) | |
} | |
} | |
logger.info("yt-dlp에 프록시가 적용되지 않았습니다.") | |
temp_dir = "./temp_captions" | |
os.makedirs(temp_dir, exist_ok=True) | |
original_cwd = os.getcwd() | |
try: | |
with contextlib.chdir(temp_dir): # 임시 디렉토리로 작업 디렉토리 변경 | |
# outtmpl은 현재 chdir된 디렉토리 내의 상대 경로로 지정 | |
# yt-dlp가 파일을 temp_dir 안에 바로 생성하도록 함 | |
ydl_opts['outtmpl'] = '%(id)s.%(ext)s' | |
logger.debug(f"yt-dlp 실행 전 현재 작업 디렉토리: {os.getcwd()}") | |
logger.debug(f"yt-dlp 옵션: {ydl_opts}") | |
with YoutubeDL(ydl_opts) as ydl: | |
# download=True 설정으로 자막 다운로드 시도 | |
# 비디오 ID만 전달해도 yt-dlp가 알아서 처리합니다. | |
info_dict = await asyncio.to_thread(ydl.extract_info, video_id, download=True) | |
logger.debug(f"yt-dlp extract_info 결과 (자세한 정보는 debug_yt_dlp.log 파일 확인): {json.dumps(info_dict.get('requested_subtitles', 'No subtitles requested'), indent=2, ensure_ascii=False)}") | |
caption_file_path = None | |
# 1. info_dict에서 직접 자막 파일 경로를 찾으려는 시도 (가장 정확) | |
# yt-dlp 0.0.12 버전 이상에서는 _download_lock이 반환됨. info_dict에서 직접 파일을 찾아야 함 | |
if 'requested_subtitles' in info_dict and info_dict['requested_subtitles']: | |
for lang_code in ydl_opts['subtitleslangs']: | |
if lang_code in info_dict['requested_subtitles']: | |
sub_info = info_dict['requested_subtitles'][lang_code] | |
# 'filepath' 키가 없거나 None일 수 있으므로 확인 | |
if 'filepath' in sub_info and sub_info['filepath']: | |
# filepath는 이미 현재 작업 디렉토리(temp_dir) 기준으로 되어 있을 것 | |
caption_file_path = sub_info['filepath'] | |
logger.info(f"yt-dlp가 '{lang_code}' 자막 파일을 info_dict에서 찾았습니다: {caption_file_path}") | |
break # 찾았으면 루프 종료 | |
# 2. info_dict에서 찾지 못했을 경우, 폴백으로 임시 디렉토리를 탐색 | |
if not caption_file_path: | |
logger.debug(f"info_dict에서 자막 파일 경로를 찾지 못했습니다. 임시 디렉토리 스캔 시작.") | |
downloaded_files = [f for f in os.listdir('.') if f.startswith(video_id) and ('sub' in f or 'vtt' in f or 'json' in f or 'srt' in f)] | |
logger.debug(f"임시 디렉토리의 파일 목록: {downloaded_files}") | |
# 한국어 자막 우선 검색 (vtt, srt, json 순) | |
for ext in ['vtt', 'srt', 'json']: | |
ko_file = next((f for f in downloaded_files if f.endswith(f'.ko.{ext}')), None) | |
if ko_file: | |
caption_file_path = os.path.join(os.getcwd(), ko_file) # 현재 작업 디렉토리 기준으로 경로 조합 | |
logger.info(f"폴백: yt-dlp로 한국어 {ext.upper()} 자막 파일 '{ko_file}'을 다운로드했습니다.") | |
break | |
if not caption_file_path: | |
# 한국어 없으면 첫 번째 사용 가능한 자막 찾기 | |
for ext in ['vtt', 'srt', 'json']: | |
any_file = next((f for f in downloaded_files if f.endswith(f'.{ext}')), None) | |
if any_file: | |
caption_file_path = os.path.join(os.getcwd(), any_file) # 현재 작업 디렉토리 기준으로 경로 조합 | |
logger.warning(f"폴백: 한국어 자막이 없어 첫 번째 {ext.upper()} 자막 파일 '{any_file}'을 사용합니다.") | |
break | |
# 3. 자막 파일이 찾아졌으면 파싱 시작 | |
if caption_file_path and os.path.exists(caption_file_path): | |
if caption_file_path.endswith('.vtt'): | |
with open(caption_file_path, 'r', encoding='utf-8') as f: | |
vtt_content = f.read() | |
# WEBVTT 파싱 | |
segments = vtt_content.split('\n\n') | |
for segment in segments: | |
if '-->' in segment: | |
lines = segment.split('\n') | |
time_str = lines[0].strip() | |
text_content = ' '.join(lines[1:]).strip() | |
text_content = clean_caption_text(text_content) | |
try: | |
# VTT 시간은 HH:MM:SS.ms 형태로 제공되므로, 밀리초를 float로 처리 후 정수로 변환 | |
start_time_parts = time_str.split(' --> ')[0].split(':') | |
if len(start_time_parts) == 3: # HH:MM:SS.ms | |
hours = int(start_time_parts[0]) | |
minutes = int(start_time_parts[1]) | |
seconds = int(float(start_time_parts[2].split('.')[0])) | |
elif len(start_time_parts) == 2: # MM:SS.ms | |
hours = 0 | |
minutes = int(start_time_parts[0]) | |
seconds = int(float(start_time_parts[1].split('.')[0])) | |
else: | |
raise ValueError("Unsupported time format") | |
# HH:MM:SS 포맷으로 맞춤 | |
if hours > 0: | |
timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" | |
else: | |
timestamp_str = f"{minutes:02d}:{seconds:02d}" | |
processed_chunks.append({ | |
"text": text_content, | |
"timestamp": timestamp_str | |
}) | |
except Exception as e: | |
logger.warning(f"VTT 시간 파싱 오류: {time_str} - {e}") | |
logger.info(f"yt-dlp로 VTT 자막 {len(processed_chunks)}개 청크 처리 완료.") | |
elif caption_file_path.endswith('.json'): | |
# JSON 자막 파싱 (yt-dlp가 가끔 JSON 포맷으로도 다운로드함) | |
with open(caption_file_path, 'r', encoding='utf-8') as f: | |
json_content = json.load(f) | |
# yt-dlp의 JSON 자막 형식에 맞춰 파싱 (예시, 실제 구조는 info_dict를 통해 확인 필요) | |
for entry in json_content: | |
if 'start' in entry and 'text' in entry: | |
total_seconds = int(entry['start']) | |
hours = total_seconds // 3600 | |
minutes = (total_seconds % 3600) // 60 | |
seconds = total_seconds % 60 | |
text = entry['text'] | |
text = clean_caption_text(text) | |
if hours > 0: | |
timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" | |
else: | |
timestamp_str = f"{minutes:02d}:{seconds:02d}" | |
processed_chunks.append({ | |
"text": text, | |
"timestamp": timestamp_str | |
}) | |
logger.info(f"yt-dlp로 JSON 자막 {len(processed_chunks)}개 청크 처리 완료.") | |
elif caption_file_path.endswith('.srt'): | |
# SRT 자막 파싱 (간단한 예시, 실제로는 정규식 등으로 파싱) | |
with open(caption_file_path, 'r', encoding='utf-8') as f: | |
srt_content = f.read() | |
# SRT 파싱 로직 (매우 간단한 예시, 실제론 srt 라이브러리 사용 권장) | |
blocks = srt_content.strip().split('\n\n') | |
for block in blocks: | |
lines = block.split('\n') | |
# 최소한 순번, 시간, 텍스트가 있어야 함 | |
if len(lines) >= 3 and '-->' in lines[1]: | |
time_str = lines[1].strip() | |
text_content = ' '.join(lines[2:]).strip() | |
text_content = clean_caption_text(text_content) | |
try: | |
# SRT 시간은 HH:MM:SS,ms 형태로 제공 | |
start_time_parts = time_str.split(' --> ')[0].split(':') | |
seconds_ms = float(start_time_parts[-1].replace(',', '.')) # 밀리초 처리 | |
seconds = int(seconds_ms) | |
minutes = int(start_time_parts[-2]) | |
hours = int(start_time_parts[0]) if len(start_time_parts) == 3 else 0 | |
if hours > 0: | |
timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" | |
else: | |
timestamp_str = f"{minutes:02d}:{seconds:02d}" | |
processed_chunks.append({ | |
"text": text_content, | |
"timestamp": timestamp_str | |
}) | |
except Exception as e: | |
logger.warning(f"SRT 시간 파싱 오류: {time_str} - {e}") | |
logger.info(f"yt-dlp로 SRT 자막 {len(processed_chunks)}개 청크 처리 완료.") | |
else: | |
logger.warning(f"지원하지 않는 자막 파일 형식입니다: {caption_file_path}") | |
else: | |
logger.warning(f"비디오 ID '{video_id}'에 대한 yt-dlp 자막 파일을 찾을 수 없습니다. 최종 시도 경로: {caption_file_path}") | |
except Exception as e: | |
logger.error(f"yt-dlp 자막 추출 중 예기치 않은 오류 발생 for video ID '{video_id}': {type(e).__name__}: {e}") | |
return [] | |
finally: | |
if os.path.exists(temp_dir): | |
for file_name in os.listdir(temp_dir): | |
file_path = os.path.join(temp_dir, file_name) | |
try: | |
if os.path.isfile(file_path): | |
os.remove(file_path) | |
except Exception as e: | |
logger.error(f"임시 파일 삭제 실패 {file_path}: {e}") | |
os.rmdir(temp_dir) | |
logger.info(f"임시 자막 디렉토리 '{temp_dir}' 정리 완료.") | |
os.chdir(original_cwd) # 원래 작업 디렉토리로 돌아옴 | |
return processed_chunks | |
def parse_srt_content(srt_content: str) -> list[dict]: | |
chunks = [] | |
# 간단한 SRT 파싱 로직 (yt-dlp의 SRT 출력은 더 간단할 수 있음) | |
# 실제 프로덕션에서는 더 견고한 SRT 파서 라이브러리를 사용하는 것이 좋습니다. | |
import re | |
# SRT 패턴: 1\n00:00:01,000 --> 00:00:03,000\nHello World\n\n | |
blocks = re.split(r'\n\s*\n', srt_content.strip()) | |
for block in blocks: | |
lines = block.split('\n') | |
if len(lines) >= 3: | |
# 첫 번째 라인은 순번, 두 번째 라인은 시간, 나머지는 텍스트 | |
time_str = lines[1] | |
text = " ".join(lines[2:]).strip() | |
# 시간 형식: 00:00:01,000 --> 00:00:03,000 | |
time_parts = time_str.split(' --> ') | |
if len(time_parts) == 2: | |
start_time = time_parts[0].replace(',', '.') # yt-dlp의 VTT 파서와 일관성을 위해 쉼표를 점으로 변경 | |
chunks.append({"text": text, "timestamp": start_time}) | |
return chunks | |
def parse_vtt_content(vtt_content: str) -> list[dict]: | |
chunks = [] | |
lines = vtt_content.split('\n') | |
i = 0 | |
while i < len(lines): | |
line = lines[i].strip() | |
if '-->' in line: | |
# 시간 정보 라인 | |
time_str = line.split(' ')[0] # 예: 00:00:01.000 | |
# 다음 라인부터 텍스트 시작 | |
text_lines = [] | |
j = i + 1 | |
while j < len(lines) and lines[j].strip() != '': | |
text_lines.append(lines[j].strip()) | |
j += 1 | |
text = ' '.join(text_lines) | |
if text: | |
chunks.append({"text": text, "timestamp": time_str}) | |
i = j # 다음 자막 블록으로 이동 | |
i += 1 | |
return chunks | |
def parse_json_content(json_content: dict) -> list[dict]: | |
chunks = [] | |
for entry in json_content.get('events', []): | |
if 'segs' in entry: | |
text = "".join([seg.get('utf8', '') for seg in entry['segs']]) | |
# JSON3 형식은 밀리초까지 표현된 시작 시간이 't' 키에 있을 수 있음 | |
# yt-dlp가 생성하는 json3 파일 구조에 따라 유연하게 처리 필요 | |
start_ms = entry.get('t', 0) | |
# 밀리초를 HH:MM:SS.mmm 형식으로 변환 (yt-dlp의 VTT timestamp와 유사하게) | |
total_seconds = start_ms / 1000 | |
hours = int(total_seconds // 3600) | |
minutes = int((total_seconds % 3600) // 60) | |
seconds = total_seconds % 60 | |
timestamp = f"{hours:02d}:{minutes:02d}:{seconds:06.3f}" | |
chunks.append({"text": text, "timestamp": timestamp}) | |
return chunks | |
async def process_youtube_video_data(video_url: str) -> list[dict] | None: | |
video_id = await get_youtube_video_id(video_url) | |
if not video_id: | |
logger.error(f"유효하지 않은 YouTube URL: {video_url}") | |
return None | |
return await get_transcript_with_timestamps(video_id) |