Spaces:
Build error
Build error
File size: 18,083 Bytes
9b9d44e eda02a7 9b9d44e eda02a7 efb5a4e eda02a7 8659b57 eda02a7 8659b57 eda02a7 9b9d44e eda02a7 efb5a4e eda02a7 9b9d44e eda02a7 9b9d44e eda02a7 9b9d44e eda02a7 9b9d44e eda02a7 9b9d44e eda02a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 |
# ./backend/app/youtube_parser.py
import os
import re
import contextlib
import asyncio
import json # json 모듈 추가
import shutil
import sys # sys 모듈 추가
from urllib.parse import urlparse, parse_qs
from loguru import logger
from yt_dlp import YoutubeDL
from proxy_manager import proxy_manager
from dotenv import load_dotenv
# 환경 변수 로드 (코드의 가장 위에 위치)
load_dotenv()
# --- Loguru 설정 시작 ---
# 기본 핸들러(콘솔 출력) 제거
logger.remove()
# 콘솔에 INFO 레벨 이상 로그 출력
logger.add(sys.stderr, level="INFO")
# --- Loguru 설정 끝 ---
def validate_youtube_url(url):
ydl_opts = {
'quiet': True
}
try:
with YoutubeDL(ydl_opts) as ydl:
ydl.extract_info(url, download=False)
return True
except:
return False
async def get_youtube_video_id(url: str) -> str | None:
"""
유튜브 URL에서 비디오 ID를 추출합니다.
표준 유튜브 URL (youtube.com/watch?v=..., youtu.be/...)을 처리합니다.
"""
parsed_url = urlparse(url)
# 표준 YouTube Watch 페이지 도메인 확인
# www.youtube.com, m.youtube.com, youtube.com 등을 포함합니다.
# 'www.youtube.com', 'm.youtube.com', 'youtube.com'은 실제 YouTube 도메인을 의미합니다.
if parsed_url.hostname and any(domain in parsed_url.hostname for domain in ['www.youtube.com', 'm.youtube.com', 'youtube.com']):
query_params = parse_qs(parsed_url.query)
if 'v' in query_params:
return query_params['v'][0]
# 짧은 YouTube URL (youtu.be/VIDEO_ID)
elif parsed_url.hostname == 'youtu.be':
# path가 /VIDEO_ID 형태이므로 맨 앞의 '/'를 제거
video_id = parsed_url.path.strip('/')
# 유튜브 비디오 ID는 보통 11자리이므로, 유효성 검사
if len(video_id) == 11:
return video_id
logger.warning(f"알 수 없는 형식의 YouTube URL: {url}")
return None
def clean_caption_text(text: str) -> str:
# <00:00:00.000><c>...</c> 또는 </c> 같은 태그 제거
cleaned_text = re.sub(r'<[^>]+>', '', text)
# 여러 공백을 하나의 공백으로 줄이고 양쪽 끝 공백 제거
cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
return cleaned_text
async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None:
logger.info(f"비디오 ID '{video_id}'에 대한 자막 가져오기 시도.")
processed_chunks = []
# yt-dlp 옵션 설정
ydl_opts = {
'writesubtitles': True, # 사용자가 업로드한 수동 자막 파일 쓰기 활성화
'writeautomaticsub': True, # YouTube에서 자동으로 생성된 자막 파일 쓰기 활성화
'subtitleslangs': ['ko', 'en'], # 다운로드할 자막 언어 목록 (한국어 우선, 없으면 영어)
'skip_download': True, # 동영상 자체는 다운로드하지 않고 자막만 다운로드
'outtmpl': '%(id)s.%(language)s.%(ext)s', # 다운로드될 파일 이름 템플릿 (temp_dir 안에서 상대 경로로 저장됨)
'quiet': False, # 콘솔 출력 활성화 (디버깅용)
'no_warnings': False, # 경고 메시지 활성화 (디버깅용)
'extractor_args': { # 특정 extractor (예: 유튜브)에 대한 추가 인자
'youtube': {'skip': ['dash']} # dash manifest 관련 오류 회피 시도 (유튜브 관련)
}
}
logger.info("yt-dlp에 프록시가 적용되지 않았습니다.")
temp_dir = "./temp_captions"
os.makedirs(temp_dir, exist_ok=True)
original_cwd = os.getcwd()
try:
with contextlib.chdir(temp_dir): # 임시 디렉토리로 작업 디렉토리 변경
# outtmpl은 현재 chdir된 디렉토리 내의 상대 경로로 지정
# yt-dlp가 파일을 temp_dir 안에 바로 생성하도록 함
ydl_opts['outtmpl'] = '%(id)s.%(ext)s'
logger.debug(f"yt-dlp 실행 전 현재 작업 디렉토리: {os.getcwd()}")
logger.debug(f"yt-dlp 옵션: {ydl_opts}")
with YoutubeDL(ydl_opts) as ydl:
# download=True 설정으로 자막 다운로드 시도
# 비디오 ID만 전달해도 yt-dlp가 알아서 처리합니다.
info_dict = await asyncio.to_thread(ydl.extract_info, video_id, download=True)
logger.debug(f"yt-dlp extract_info 결과 (자세한 정보는 debug_yt_dlp.log 파일 확인): {json.dumps(info_dict.get('requested_subtitles', 'No subtitles requested'), indent=2, ensure_ascii=False)}")
caption_file_path = None
# 1. info_dict에서 직접 자막 파일 경로를 찾으려는 시도 (가장 정확)
# yt-dlp 0.0.12 버전 이상에서는 _download_lock이 반환됨. info_dict에서 직접 파일을 찾아야 함
if 'requested_subtitles' in info_dict and info_dict['requested_subtitles']:
for lang_code in ydl_opts['subtitleslangs']:
if lang_code in info_dict['requested_subtitles']:
sub_info = info_dict['requested_subtitles'][lang_code]
# 'filepath' 키가 없거나 None일 수 있으므로 확인
if 'filepath' in sub_info and sub_info['filepath']:
# filepath는 이미 현재 작업 디렉토리(temp_dir) 기준으로 되어 있을 것
caption_file_path = sub_info['filepath']
logger.info(f"yt-dlp가 '{lang_code}' 자막 파일을 info_dict에서 찾았습니다: {caption_file_path}")
break # 찾았으면 루프 종료
# 2. info_dict에서 찾지 못했을 경우, 폴백으로 임시 디렉토리를 탐색
if not caption_file_path:
logger.debug(f"info_dict에서 자막 파일 경로를 찾지 못했습니다. 임시 디렉토리 스캔 시작.")
downloaded_files = [f for f in os.listdir('.') if f.startswith(video_id) and ('sub' in f or 'vtt' in f or 'json' in f or 'srt' in f)]
logger.debug(f"임시 디렉토리의 파일 목록: {downloaded_files}")
# 한국어 자막 우선 검색 (vtt, srt, json 순)
for ext in ['vtt', 'srt', 'json']:
ko_file = next((f for f in downloaded_files if f.endswith(f'.ko.{ext}')), None)
if ko_file:
caption_file_path = os.path.join(os.getcwd(), ko_file) # 현재 작업 디렉토리 기준으로 경로 조합
logger.info(f"폴백: yt-dlp로 한국어 {ext.upper()} 자막 파일 '{ko_file}'을 다운로드했습니다.")
break
if not caption_file_path:
# 한국어 없으면 첫 번째 사용 가능한 자막 찾기
for ext in ['vtt', 'srt', 'json']:
any_file = next((f for f in downloaded_files if f.endswith(f'.{ext}')), None)
if any_file:
caption_file_path = os.path.join(os.getcwd(), any_file) # 현재 작업 디렉토리 기준으로 경로 조합
logger.warning(f"폴백: 한국어 자막이 없어 첫 번째 {ext.upper()} 자막 파일 '{any_file}'을 사용합니다.")
break
# 3. 자막 파일이 찾아졌으면 파싱 시작
if caption_file_path and os.path.exists(caption_file_path):
if caption_file_path.endswith('.vtt'):
with open(caption_file_path, 'r', encoding='utf-8') as f:
vtt_content = f.read()
# WEBVTT 파싱
segments = vtt_content.split('\n\n')
for segment in segments:
if '-->' in segment:
lines = segment.split('\n')
time_str = lines[0].strip()
text_content = ' '.join(lines[1:]).strip()
text_content = clean_caption_text(text_content)
try:
# VTT 시간은 HH:MM:SS.ms 형태로 제공되므로, 밀리초를 float로 처리 후 정수로 변환
start_time_parts = time_str.split(' --> ')[0].split(':')
if len(start_time_parts) == 3: # HH:MM:SS.ms
hours = int(start_time_parts[0])
minutes = int(start_time_parts[1])
seconds = int(float(start_time_parts[2].split('.')[0]))
elif len(start_time_parts) == 2: # MM:SS.ms
hours = 0
minutes = int(start_time_parts[0])
seconds = int(float(start_time_parts[1].split('.')[0]))
else:
raise ValueError("Unsupported time format")
# HH:MM:SS 포맷으로 맞춤
if hours > 0:
timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
else:
timestamp_str = f"{minutes:02d}:{seconds:02d}"
processed_chunks.append({
"text": text_content,
"timestamp": timestamp_str
})
except Exception as e:
logger.warning(f"VTT 시간 파싱 오류: {time_str} - {e}")
logger.info(f"yt-dlp로 VTT 자막 {len(processed_chunks)}개 청크 처리 완료.")
elif caption_file_path.endswith('.json'):
# JSON 자막 파싱 (yt-dlp가 가끔 JSON 포맷으로도 다운로드함)
with open(caption_file_path, 'r', encoding='utf-8') as f:
json_content = json.load(f)
# yt-dlp의 JSON 자막 형식에 맞춰 파싱 (예시, 실제 구조는 info_dict를 통해 확인 필요)
for entry in json_content:
if 'start' in entry and 'text' in entry:
total_seconds = int(entry['start'])
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
text = entry['text']
text = clean_caption_text(text)
if hours > 0:
timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
else:
timestamp_str = f"{minutes:02d}:{seconds:02d}"
processed_chunks.append({
"text": text,
"timestamp": timestamp_str
})
logger.info(f"yt-dlp로 JSON 자막 {len(processed_chunks)}개 청크 처리 완료.")
elif caption_file_path.endswith('.srt'):
# SRT 자막 파싱 (간단한 예시, 실제로는 정규식 등으로 파싱)
with open(caption_file_path, 'r', encoding='utf-8') as f:
srt_content = f.read()
# SRT 파싱 로직 (매우 간단한 예시, 실제론 srt 라이브러리 사용 권장)
blocks = srt_content.strip().split('\n\n')
for block in blocks:
lines = block.split('\n')
# 최소한 순번, 시간, 텍스트가 있어야 함
if len(lines) >= 3 and '-->' in lines[1]:
time_str = lines[1].strip()
text_content = ' '.join(lines[2:]).strip()
text_content = clean_caption_text(text_content)
try:
# SRT 시간은 HH:MM:SS,ms 형태로 제공
start_time_parts = time_str.split(' --> ')[0].split(':')
seconds_ms = float(start_time_parts[-1].replace(',', '.')) # 밀리초 처리
seconds = int(seconds_ms)
minutes = int(start_time_parts[-2])
hours = int(start_time_parts[0]) if len(start_time_parts) == 3 else 0
if hours > 0:
timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
else:
timestamp_str = f"{minutes:02d}:{seconds:02d}"
processed_chunks.append({
"text": text_content,
"timestamp": timestamp_str
})
except Exception as e:
logger.warning(f"SRT 시간 파싱 오류: {time_str} - {e}")
logger.info(f"yt-dlp로 SRT 자막 {len(processed_chunks)}개 청크 처리 완료.")
else:
logger.warning(f"지원하지 않는 자막 파일 형식입니다: {caption_file_path}")
else:
logger.warning(f"비디오 ID '{video_id}'에 대한 yt-dlp 자막 파일을 찾을 수 없습니다. 최종 시도 경로: {caption_file_path}")
except Exception as e:
logger.error(f"yt-dlp 자막 추출 중 예기치 않은 오류 발생 for video ID '{video_id}': {type(e).__name__}: {e}")
return []
finally:
if os.path.exists(temp_dir):
for file_name in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, file_name)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
logger.error(f"임시 파일 삭제 실패 {file_path}: {e}")
os.rmdir(temp_dir)
logger.info(f"임시 자막 디렉토리 '{temp_dir}' 정리 완료.")
os.chdir(original_cwd) # 원래 작업 디렉토리로 돌아옴
return processed_chunks
def parse_srt_content(srt_content: str) -> list[dict]:
chunks = []
# 간단한 SRT 파싱 로직 (yt-dlp의 SRT 출력은 더 간단할 수 있음)
# 실제 프로덕션에서는 더 견고한 SRT 파서 라이브러리를 사용하는 것이 좋습니다.
import re
# SRT 패턴: 1\n00:00:01,000 --> 00:00:03,000\nHello World\n\n
blocks = re.split(r'\n\s*\n', srt_content.strip())
for block in blocks:
lines = block.split('\n')
if len(lines) >= 3:
# 첫 번째 라인은 순번, 두 번째 라인은 시간, 나머지는 텍스트
time_str = lines[1]
text = " ".join(lines[2:]).strip()
# 시간 형식: 00:00:01,000 --> 00:00:03,000
time_parts = time_str.split(' --> ')
if len(time_parts) == 2:
start_time = time_parts[0].replace(',', '.') # yt-dlp의 VTT 파서와 일관성을 위해 쉼표를 점으로 변경
chunks.append({"text": text, "timestamp": start_time})
return chunks
def parse_vtt_content(vtt_content: str) -> list[dict]:
chunks = []
lines = vtt_content.split('\n')
i = 0
while i < len(lines):
line = lines[i].strip()
if '-->' in line:
# 시간 정보 라인
time_str = line.split(' ')[0] # 예: 00:00:01.000
# 다음 라인부터 텍스트 시작
text_lines = []
j = i + 1
while j < len(lines) and lines[j].strip() != '':
text_lines.append(lines[j].strip())
j += 1
text = ' '.join(text_lines)
if text:
chunks.append({"text": text, "timestamp": time_str})
i = j # 다음 자막 블록으로 이동
i += 1
return chunks
def parse_json_content(json_content: dict) -> list[dict]:
chunks = []
for entry in json_content.get('events', []):
if 'segs' in entry:
text = "".join([seg.get('utf8', '') for seg in entry['segs']])
# JSON3 형식은 밀리초까지 표현된 시작 시간이 't' 키에 있을 수 있음
# yt-dlp가 생성하는 json3 파일 구조에 따라 유연하게 처리 필요
start_ms = entry.get('t', 0)
# 밀리초를 HH:MM:SS.mmm 형식으로 변환 (yt-dlp의 VTT timestamp와 유사하게)
total_seconds = start_ms / 1000
hours = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
seconds = total_seconds % 60
timestamp = f"{hours:02d}:{minutes:02d}:{seconds:06.3f}"
chunks.append({"text": text, "timestamp": timestamp})
return chunks
async def process_youtube_video_data(video_url: str) -> list[dict] | None:
video_id = await get_youtube_video_id(video_url)
if not video_id:
logger.error(f"유효하지 않은 YouTube URL: {video_url}")
return None
return await get_transcript_with_timestamps(video_id) |