File size: 17,929 Bytes
eda02a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8659b57
 
eda02a7
 
8659b57
 
 
 
 
 
 
 
 
 
 
eda02a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import os
import contextlib
import asyncio
import json # json 모듈 추가
import shutil
import sys # sys 모듈 추가
from urllib.parse import urlparse, parse_qs

from loguru import logger
from yt_dlp import YoutubeDL
from proxy_manager import get_proxy_url
from dotenv import load_dotenv

# 환경 변수 로드 (코드의 가장 위에 위치)
load_dotenv()

# --- Loguru 설정 시작 ---
# 기본 핸들러(콘솔 출력) 제거
logger.remove()
# 콘솔에 INFO 레벨 이상 로그 출력
logger.add(sys.stderr, level="INFO") 
# --- Loguru 설정 끝 ---

def validate_youtube_url(url):
    ydl_opts = {
        'quiet': True
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=False)
        return True
    except:
        return False
        
async def get_youtube_video_id(url: str) -> str | None:
    """
    유튜브 URL에서 비디오 ID를 추출합니다.
    표준 유튜브 URL (youtube.com/watch?v=..., youtu.be/...)을 처리합니다.
    """
    parsed_url = urlparse(url)
    
    # 표준 YouTube Watch 페이지 도메인 확인
    # www.youtube.com, m.youtube.com, youtube.com 등을 포함합니다.
    # 'www.youtube.com', 'm.youtube.com', 'youtube.com'은 실제 YouTube 도메인을 의미합니다.
    if parsed_url.hostname and any(domain in parsed_url.hostname for domain in ['www.youtube.com', 'm.youtube.com', 'youtube.com']):
        query_params = parse_qs(parsed_url.query)
        if 'v' in query_params:
            return query_params['v'][0]
    
    # 짧은 YouTube URL (youtu.be/VIDEO_ID)
    elif parsed_url.hostname == 'youtu.be':
        # path가 /VIDEO_ID 형태이므로 맨 앞의 '/'를 제거
        video_id = parsed_url.path.strip('/')
        # 유튜브 비디오 ID는 보통 11자리이므로, 유효성 검사
        if len(video_id) == 11:
            return video_id
        
    logger.warning(f"알 수 없는 형식의 YouTube URL: {url}")
    return None
    
async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None:
    logger.info(f"비디오 ID '{video_id}'에 대한 자막 가져오기 시도.")

    processed_chunks = []
    proxy_address = await get_proxy_url()

    # yt-dlp 옵션 설정
    ydl_opts = {
        'writesubtitles': True,          # 사용자가 업로드한 수동 자막 파일 쓰기 활성화
        'writeautomaticsub': True,       # YouTube에서 자동으로 생성된 자막 파일 쓰기 활성화
        'subtitleslangs': ['ko', 'en'],  # 다운로드할 자막 언어 목록 (한국어 우선, 없으면 영어)
        'skip_download': True,           # 동영상 자체는 다운로드하지 않고 자막만 다운로드
        'outtmpl': '%(id)s.%(language)s.%(ext)s', # 다운로드될 파일 이름 템플릿 (temp_dir 안에서 상대 경로로 저장됨)
        'quiet': False,                  # 콘솔 출력 활성화 (디버깅용)
        'no_warnings': False,            # 경고 메시지 활성화 (디버깅용)
        'extractor_args': {              # 특정 extractor (예: 유튜브)에 대한 추가 인자
            'youtube': {'skip': ['dash']} # dash manifest 관련 오류 회피 시도 (유튜브 관련)
        }
        # 프록시가 필요한 경우, 'proxy': 'http://your.proxy.com:port' 형태로 여기에 추가됩니다.
    }

    if proxy_address:
        ydl_opts['proxy'] = proxy_address
        logger.info(f"yt-dlp에 프록시 적용: {proxy_address}")
    else:
        logger.info("yt-dlp에 프록시가 적용되지 않았습니다.")

    temp_dir = "./temp_captions"
    os.makedirs(temp_dir, exist_ok=True)
    original_cwd = os.getcwd()

    try:
        with contextlib.chdir(temp_dir): # 임시 디렉토리로 작업 디렉토리 변경
            # outtmpl은 현재 chdir된 디렉토리 내의 상대 경로로 지정
            # yt-dlp가 파일을 temp_dir 안에 바로 생성하도록 함
            ydl_opts['outtmpl'] = '%(id)s.%(ext)s' 
            
            logger.debug(f"yt-dlp 실행 전 현재 작업 디렉토리: {os.getcwd()}")
            logger.debug(f"yt-dlp 옵션: {ydl_opts}")
            
            with YoutubeDL(ydl_opts) as ydl:
                # download=True 설정으로 자막 다운로드 시도
                # 비디오 ID만 전달해도 yt-dlp가 알아서 처리합니다.
                info_dict = await asyncio.to_thread(ydl.extract_info, video_id, download=True)
                
            logger.debug(f"yt-dlp extract_info 결과 (자세한 정보는 debug_yt_dlp.log 파일 확인): {json.dumps(info_dict.get('requested_subtitles', 'No subtitles requested'), indent=2, ensure_ascii=False)}") 
            
            caption_file_path = None
            
            # 1. info_dict에서 직접 자막 파일 경로를 찾으려는 시도 (가장 정확)
            # yt-dlp 0.0.12 버전 이상에서는 _download_lock이 반환됨. info_dict에서 직접 파일을 찾아야 함
            if 'requested_subtitles' in info_dict and info_dict['requested_subtitles']:
                for lang_code in ydl_opts['subtitleslangs']:
                    if lang_code in info_dict['requested_subtitles']:
                        sub_info = info_dict['requested_subtitles'][lang_code]
                        # 'filepath' 키가 없거나 None일 수 있으므로 확인
                        if 'filepath' in sub_info and sub_info['filepath']:
                            # filepath는 이미 현재 작업 디렉토리(temp_dir) 기준으로 되어 있을 것
                            caption_file_path = sub_info['filepath']
                            logger.info(f"yt-dlp가 '{lang_code}' 자막 파일을 info_dict에서 찾았습니다: {caption_file_path}")
                            break # 찾았으면 루프 종료
            
            # 2. info_dict에서 찾지 못했을 경우, 폴백으로 임시 디렉토리를 탐색
            if not caption_file_path:
                logger.debug(f"info_dict에서 자막 파일 경로를 찾지 못했습니다. 임시 디렉토리 스캔 시작.")
                downloaded_files = [f for f in os.listdir('.') if f.startswith(video_id) and ('sub' in f or 'vtt' in f or 'json' in f or 'srt' in f)]
                logger.debug(f"임시 디렉토리의 파일 목록: {downloaded_files}")
                
                # 한국어 자막 우선 검색 (vtt, srt, json 순)
                for ext in ['vtt', 'srt', 'json']:
                    ko_file = next((f for f in downloaded_files if f.endswith(f'.ko.{ext}')), None)
                    if ko_file:
                        caption_file_path = os.path.join(os.getcwd(), ko_file) # 현재 작업 디렉토리 기준으로 경로 조합
                        logger.info(f"폴백: yt-dlp로 한국어 {ext.upper()} 자막 파일 '{ko_file}'을 다운로드했습니다.")
                        break
                
                if not caption_file_path:
                    # 한국어 없으면 첫 번째 사용 가능한 자막 찾기
                    for ext in ['vtt', 'srt', 'json']:
                        any_file = next((f for f in downloaded_files if f.endswith(f'.{ext}')), None)
                        if any_file:
                            caption_file_path = os.path.join(os.getcwd(), any_file) # 현재 작업 디렉토리 기준으로 경로 조합
                            logger.warning(f"폴백: 한국어 자막이 없어 첫 번째 {ext.upper()} 자막 파일 '{any_file}'을 사용합니다.")
                            break

            # 3. 자막 파일이 찾아졌으면 파싱 시작
            if caption_file_path and os.path.exists(caption_file_path):
                # VTT, SRT, JSON 등 다양한 자막 파일 형식에 대한 파싱 로직 분기
                if caption_file_path.endswith('.vtt'):
                    with open(caption_file_path, 'r', encoding='utf-8') as f:
                        vtt_content = f.read()
                    
                    # WEBVTT 파싱
                    segments = vtt_content.split('\n\n')
                    for segment in segments:
                        if '-->' in segment: # 시간 정보 포함 세그먼트
                            lines = segment.split('\n')
                            time_str = lines[0].strip()
                            text_content = ' '.join(lines[1:]).strip()
                            
                            try:
                                # VTT 시간은 HH:MM:SS.ms 형태로 제공되므로, 밀리초를 float로 처리 후 정수로 변환
                                start_time_parts = time_str.split(' --> ')[0].split(':')
                                if len(start_time_parts) == 3: # HH:MM:SS.ms
                                    hours = int(start_time_parts[0])
                                    minutes = int(start_time_parts[1])
                                    seconds = int(float(start_time_parts[2].split('.')[0]))
                                elif len(start_time_parts) == 2: # MM:SS.ms
                                    hours = 0
                                    minutes = int(start_time_parts[0])
                                    seconds = int(float(start_time_parts[1].split('.')[0]))
                                else:
                                    raise ValueError("Unsupported time format")

                                # HH:MM:SS 포맷으로 맞춤
                                if hours > 0:
                                    timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
                                else:
                                    timestamp_str = f"{minutes:02d}:{seconds:02d}"
                                    
                                processed_chunks.append({
                                    "text": text_content,
                                    "timestamp": timestamp_str
                                })
                            except Exception as e:
                                logger.warning(f"VTT 시간 파싱 오류: {time_str} - {e}")
                    
                    logger.info(f"yt-dlp로 VTT 자막 {len(processed_chunks)}개 청크 처리 완료.")
                
                elif caption_file_path.endswith('.json'):
                    # JSON 자막 파싱 (yt-dlp가 가끔 JSON 포맷으로도 다운로드함)
                    with open(caption_file_path, 'r', encoding='utf-8') as f:
                        json_content = json.load(f)
                    
                    # yt-dlp의 JSON 자막 형식에 맞춰 파싱 (예시, 실제 구조는 info_dict를 통해 확인 필요)
                    for entry in json_content:
                        if 'start' in entry and 'text' in entry:
                            total_seconds = int(entry['start'])
                            hours = total_seconds // 3600
                            minutes = (total_seconds % 3600) // 60
                            seconds = total_seconds % 60

                            if hours > 0:
                                timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
                            else:
                                timestamp_str = f"{minutes:02d}:{seconds:02d}"
                                
                            processed_chunks.append({
                                "text": entry['text'],
                                "timestamp": timestamp_str
                            })
                    logger.info(f"yt-dlp로 JSON 자막 {len(processed_chunks)}개 청크 처리 완료.")

                elif caption_file_path.endswith('.srt'):
                    # SRT 자막 파싱 (간단한 예시, 실제로는 정규식 등으로 파싱)
                    with open(caption_file_path, 'r', encoding='utf-8') as f:
                        srt_content = f.read()

                    # SRT 파싱 로직 (매우 간단한 예시, 실제론 srt 라이브러리 사용 권장)
                    blocks = srt_content.strip().split('\n\n')
                    for block in blocks:
                        lines = block.split('\n')
                        # 최소한 순번, 시간, 텍스트가 있어야 함
                        if len(lines) >= 3 and '-->' in lines[1]:
                            time_str = lines[1].strip()
                            text_content = ' '.join(lines[2:]).strip()

                            try:
                                # SRT 시간은 HH:MM:SS,ms 형태로 제공
                                start_time_parts = time_str.split(' --> ')[0].split(':')
                                seconds_ms = float(start_time_parts[-1].replace(',', '.')) # 밀리초 처리
                                seconds = int(seconds_ms)
                                minutes = int(start_time_parts[-2])
                                hours = int(start_time_parts[0]) if len(start_time_parts) == 3 else 0

                                if hours > 0:
                                    timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
                                else:
                                    timestamp_str = f"{minutes:02d}:{seconds:02d}"
                                    
                                processed_chunks.append({
                                    "text": text_content,
                                    "timestamp": timestamp_str
                                })
                            except Exception as e:
                                logger.warning(f"SRT 시간 파싱 오류: {time_str} - {e}")
                    logger.info(f"yt-dlp로 SRT 자막 {len(processed_chunks)}개 청크 처리 완료.")

                else:
                    logger.warning(f"지원하지 않는 자막 파일 형식입니다: {caption_file_path}")

            else:
                logger.warning(f"비디오 ID '{video_id}'에 대한 yt-dlp 자막 파일을 찾을 수 없습니다. 최종 시도 경로: {caption_file_path}")

    except Exception as e:
        logger.error(f"yt-dlp 자막 추출 중 예기치 않은 오류 발생 for video ID '{video_id}': {type(e).__name__}: {e}")
        return []
    finally:
        if os.path.exists(temp_dir):
            for file_name in os.listdir(temp_dir):
                file_path = os.path.join(temp_dir, file_name)
                try:
                    if os.path.isfile(file_path):
                        os.remove(file_path)
                except Exception as e:
                    logger.error(f"임시 파일 삭제 실패 {file_path}: {e}")
            os.rmdir(temp_dir)
            logger.info(f"임시 자막 디렉토리 '{temp_dir}' 정리 완료.")
        os.chdir(original_cwd) # 원래 작업 디렉토리로 돌아옴
        
    return processed_chunks

def parse_srt_content(srt_content: str) -> list[dict]:
    chunks = []
    # 간단한 SRT 파싱 로직 (yt-dlp의 SRT 출력은 더 간단할 수 있음)
    # 실제 프로덕션에서는 더 견고한 SRT 파서 라이브러리를 사용하는 것이 좋습니다.
    import re
    # SRT 패턴: 1\n00:00:01,000 --> 00:00:03,000\nHello World\n\n
    blocks = re.split(r'\n\s*\n', srt_content.strip())
    for block in blocks:
        lines = block.split('\n')
        if len(lines) >= 3:
            # 첫 번째 라인은 순번, 두 번째 라인은 시간, 나머지는 텍스트
            time_str = lines[1]
            text = " ".join(lines[2:]).strip()
            
            # 시간 형식: 00:00:01,000 --> 00:00:03,000
            time_parts = time_str.split(' --> ')
            if len(time_parts) == 2:
                start_time = time_parts[0].replace(',', '.') # yt-dlp의 VTT 파서와 일관성을 위해 쉼표를 점으로 변경
                chunks.append({"text": text, "timestamp": start_time})
    return chunks


def parse_vtt_content(vtt_content: str) -> list[dict]:
    chunks = []
    lines = vtt_content.split('\n')
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        if '-->' in line:
            # 시간 정보 라인
            time_str = line.split(' ')[0] # 예: 00:00:01.000
            # 다음 라인부터 텍스트 시작
            text_lines = []
            j = i + 1
            while j < len(lines) and lines[j].strip() != '':
                text_lines.append(lines[j].strip())
                j += 1
            text = ' '.join(text_lines)
            if text:
                chunks.append({"text": text, "timestamp": time_str})
            i = j # 다음 자막 블록으로 이동
        i += 1
    return chunks

def parse_json_content(json_content: dict) -> list[dict]:
    chunks = []
    for entry in json_content.get('events', []):
        if 'segs' in entry:
            text = "".join([seg.get('utf8', '') for seg in entry['segs']])
            # JSON3 형식은 밀리초까지 표현된 시작 시간이 't' 키에 있을 수 있음
            # yt-dlp가 생성하는 json3 파일 구조에 따라 유연하게 처리 필요
            start_ms = entry.get('t', 0)
            # 밀리초를 HH:MM:SS.mmm 형식으로 변환 (yt-dlp의 VTT timestamp와 유사하게)
            total_seconds = start_ms / 1000
            hours = int(total_seconds // 3600)
            minutes = int((total_seconds % 3600) // 60)
            seconds = total_seconds % 60
            timestamp = f"{hours:02d}:{minutes:02d}:{seconds:06.3f}"
            chunks.append({"text": text, "timestamp": timestamp})
    return chunks

async def process_youtube_video_data(video_url: str) -> list[dict] | None:
    video_id = await get_youtube_video_id(video_url)
    if not video_id:
        logger.error(f"유효하지 않은 YouTube URL: {video_url}")
        return None
    
    return await get_transcript_with_timestamps(video_id)