Sodagraph commited on
Commit
f6b1133
·
1 Parent(s): 5fd4118
backend/app/main.py CHANGED
@@ -2,21 +2,20 @@
2
 
3
  from fastapi import FastAPI, HTTPException
4
  from fastapi.middleware.cors import CORSMiddleware
5
- from fastapi.staticfiles import StaticFiles # StaticFiles 임포트
6
  import os
7
  from pydantic import BaseModel
8
- import httpx
9
 
10
  from youtube_parser import process_youtube_video_data
11
- from rag_core import perform_rag_query
12
 
13
  app = FastAPI()
14
 
15
- # CORS 설정: 프론트엔드와 백엔드가 다른 포트에서 실행될 때 필요
16
  origins = [
17
- "http://localhost:8080", # Vue 개발 서버 기본 포트
18
- "http://localhost:5173", # Vue Vite 개발 서버 기본 포트
19
- "https://sodagraph-po.hf.space", # 여러분의 Hugging Face Space URL
20
  ]
21
 
22
  app.add_middleware(
@@ -32,102 +31,47 @@ current_file_dir = os.path.dirname(os.path.abspath(__file__))
32
  project_root_dir = os.path.join(current_file_dir, "..", "..")
33
  static_files_dir = os.path.join(project_root_dir, "static")
34
 
35
- # OLLAMA_API_BASE_URL 환경 변수 설정
36
- OLLAMA_API_BASE_URL = os.getenv("OLLAMA_API_BASE_URL", "http://127.0.0.1:11434")
37
-
38
- async def generate_answer_with_ollama(model_name: str, prompt: str) -> str:
39
- """
40
- Ollama 서버에 질의하여 답변을 생성합니다.
41
- """
42
- url = f"{OLLAMA_API_BASE_URL}/api/generate"
43
- headers = {"Content-Type": "application/json"}
44
- data = {
45
- "model": model_name,
46
- "prompt": prompt,
47
- "stream": False # 스트리밍을 사용하지 않고 한 번에 답변을 받습니다.
48
- }
49
- print(f"INFO: Ollama API 호출 시작. 모델: {model_name}")
50
- print(f"INFO: 프롬프트 미리보기: {prompt[:200]}...")
51
-
52
- try:
53
- async with httpx.AsyncClient(timeout=600.0) as client:
54
- response = await client.post(url, headers=headers, json=data)
55
- response.raise_for_status()
56
-
57
- response_data = response.json()
58
- full_response = response_data.get("response", "").strip()
59
- return full_response
60
- except httpx.HTTPStatusError as e:
61
- print(f"ERROR: Ollama API 호출 실패: {e}")
62
- raise HTTPException(status_code=500, detail="Ollama API 호출 실패")
63
- except httpx.RequestError as e:
64
- print(f"ERROR: 네트워크 오류: {e}")
65
- raise HTTPException(status_code=500, detail="네트워크 오류가 발생했습니다. 잠시 후 다시 시도해주세요.")
66
- except Exception as e:
67
- print(f"ERROR: 알 수 없는 오류: {e}")
68
- raise HTTPException(status_code=500, detail="알 수 없는 오류가 발생했습니다. 잠시 후 다시 시도해주세요.")
69
-
70
  class VideoProcessRequest(BaseModel):
71
  video_url: str
72
  query: str
73
- # 사용자가 사용할 Ollama 모델 이름을 지정할 수 있도록 추가
74
  ollama_model_name: str = "hf.co/DevQuasar/naver-hyperclovax.HyperCLOVAX-SEED-Text-Instruct-0.5B-GGUF:F16"
75
 
76
  # ✅ 유튜브 영상 처리 API
77
  @app.post("/api/process_youtube_video")
78
  async def process_youtube_video(request: VideoProcessRequest):
79
  try:
 
80
  processed_chunks_with_timestamps = await process_youtube_video_data(request.video_url)
81
 
82
  if not processed_chunks_with_timestamps:
83
  return {"message": "자막 또는 내용을 추출할 수 없습니다.", "results": []}
84
 
85
- # 1. RAG 검색 수행
86
- rag_results = await perform_rag_query(
87
- chunks_with_timestamps=processed_chunks_with_timestamps,
88
  query=request.query,
 
 
89
  top_k=50
90
  )
91
 
92
- if not rag_results:
93
- return {
94
- "status": "error",
95
- "message": "검색 결과가 없습니다.",
96
- "video_url": request.video_url,
97
- "query": request.query,
98
- "results": []
99
- }
100
-
101
- # 2. 검색 결과를 프롬프트에 추가
102
- context = "\\n\\n".join([chunk["text"] for chunk in rag_results])
103
- prompt = f"다음 정보와 대화 내용을 참고하여 사용자의 질문에 답변하세요. 제공된 정보에서 답을 찾을 수 없다면 '정보 부족'이라고 명시하세요.\\n\\n참고 정보:\\n{context}\\n\\n사용자 질문: {request.query}\\n\\n답변:"
104
-
105
- # 3. Ollama 모델에 질의하여 답변 생성
106
- generated_answer = await generate_answer_with_ollama(
107
- model_name=request.ollama_model_name,
108
- prompt=prompt
109
- )
110
-
111
  return {
112
- "status": "success",
113
- "message": "성공적으로 영상을 처리하고 RAG 검색을 수행했습니다.",
114
  "video_url": request.video_url,
115
  "query": request.query,
116
  "ollama_model_used": request.ollama_model_name,
117
- "retrieved_chunks": rag_results,
118
- "generated_answer": generated_answer
119
  }
 
120
  except Exception as e:
121
  print(f"ERROR: 서버 처리 중 오류 발생: {str(e)}")
122
- raise HTTPException(status_code=500, detail="서버 처리 오류가 발생했습니다. 잠시 후 다시 시도해주세요.")
 
123
 
124
  # ✅ 정적 파일은 마지막에 mount
125
  app.mount("/", StaticFiles(directory=static_files_dir, html=True), name="static")
126
 
127
- # 서버 실행을 위한 메인 진입점 (Docker에서는 Uvicorn이 직접 호출하므로 필수는 아님)
128
  if __name__ == "__main__":
129
  import uvicorn
130
- import os
131
-
132
- port = int(os.environ.get("PORT", 7860)) # Hugging Face가 전달하는 포트를 우선 사용
133
- uvicorn.run(app, host="0.0.0.0", port=port)
 
2
 
3
  from fastapi import FastAPI, HTTPException
4
  from fastapi.middleware.cors import CORSMiddleware
5
+ from fastapi.staticfiles import StaticFiles
6
  import os
7
  from pydantic import BaseModel
 
8
 
9
  from youtube_parser import process_youtube_video_data
10
+ from rag_core import perform_rag_and_generate # 수정된 함수를 임포트
11
 
12
  app = FastAPI()
13
 
14
+ # CORS 설정
15
  origins = [
16
+ "http://localhost:8080",
17
+ "http://localhost:5173",
18
+ "https://sodagraph-po.hf.space",
19
  ]
20
 
21
  app.add_middleware(
 
31
  project_root_dir = os.path.join(current_file_dir, "..", "..")
32
  static_files_dir = os.path.join(project_root_dir, "static")
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  class VideoProcessRequest(BaseModel):
35
  video_url: str
36
  query: str
 
37
  ollama_model_name: str = "hf.co/DevQuasar/naver-hyperclovax.HyperCLOVAX-SEED-Text-Instruct-0.5B-GGUF:F16"
38
 
39
  # ✅ 유튜브 영상 처리 API
40
  @app.post("/api/process_youtube_video")
41
  async def process_youtube_video(request: VideoProcessRequest):
42
  try:
43
+ # 1. 유튜브 영상에서 자막/콘텐츠 추출
44
  processed_chunks_with_timestamps = await process_youtube_video_data(request.video_url)
45
 
46
  if not processed_chunks_with_timestamps:
47
  return {"message": "자막 또는 내용을 추출할 수 없습니다.", "results": []}
48
 
49
+ # 2. RAG 프로세스 실행 (검색 + 생성)
50
+ rag_result = await perform_rag_and_generate(
 
51
  query=request.query,
52
+ chunks_with_timestamps=processed_chunks_with_timestamps,
53
+ ollama_model_name=request.ollama_model_name,
54
  top_k=50
55
  )
56
 
57
+ # 3. 최종 결과 반환
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  return {
59
+ **rag_result, # rag_core에서 반환된 결과 딕셔너리를 그대로 사용
 
60
  "video_url": request.video_url,
61
  "query": request.query,
62
  "ollama_model_used": request.ollama_model_name,
 
 
63
  }
64
+
65
  except Exception as e:
66
  print(f"ERROR: 서버 처리 중 오류 발생: {str(e)}")
67
+ # 실제 Exception의 세부 정보를 로깅하는 것이 좋음
68
+ raise HTTPException(status_code=500, detail=f"서버 처리 중 오류가 발생했습니다: {e}")
69
 
70
  # ✅ 정적 파일은 마지막에 mount
71
  app.mount("/", StaticFiles(directory=static_files_dir, html=True), name="static")
72
 
73
+ # 서버 실행을 위한 메인 진입점
74
  if __name__ == "__main__":
75
  import uvicorn
76
+ port = int(os.environ.get("PORT", 7860))
77
+ uvicorn.run(app, host="0.0.0.0", port=port)
 
 
backend/app/rag_core.py CHANGED
@@ -1,19 +1,21 @@
1
  # ./backend/app/rag_core.py
2
-
 
 
3
  from sentence_transformers import SentenceTransformer
4
  import faiss
5
  import numpy as np
6
  from typing import List, Dict, Tuple
7
 
 
 
 
8
  # 전역 변수로 모델 로드 (앱 시작 시 한 번만 로드되도록)
9
- # 네이버 클로바의 한국어 SentenceBERT 모델을 로드합니다.
10
  try:
11
- # 네이버 클로바 HyperCLOVAX-SEED-Text-Instruct-0.5B 모델 로드
12
  model = SentenceTransformer('jhgan/ko-sroberta-multitask', device='cpu')
13
  print("INFO: 임베딩 모델 'jhgan/ko-sroberta-multitask' 로드 완료.")
14
  except Exception as e:
15
  print(f"ERROR: 임베딩 모델 'jhgan/ko-sroberta-multitask' 로드 실패: {e}. 다국어 모델로 시도합니다.")
16
- # 대체 모델 로직 (필요하다면 유지하거나 제거할 수 있습니다)
17
  try:
18
  model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2', device='cpu')
19
  print("INFO: 임베딩 모델 'sentence-transformers/paraphrase-multilingual-L12-v2' 로드 완료.")
@@ -21,79 +23,131 @@ except Exception as e:
21
  print(f"ERROR: 대체 임베딩 모델 로드 실패: {e}. RAG 기능을 사용할 수 없습니다.")
22
  raise
23
 
24
- async def perform_rag_query(chunks_with_timestamps: List[Dict], query: str, top_k: int = 5) -> List[Dict]:
 
 
25
  """
26
- 제공된 텍스트 청크들과 쿼리를 사용하여 RAG(Retrieval-Augmented Generation) 검색을 수행합니다.
27
- 현재는 임베딩 기반 유사도 검색만 수행하며, LLM 호출은 추후 추가됩니다.
 
 
 
 
 
 
 
28
 
29
- Args:
30
- chunks_with_timestamps: [{"text": "...", "timestamp": "...", "start_seconds": ...}] 형태의 리스트.
31
- query: 사용자 쿼리 문자열.
32
- top_k: 쿼리와 가장 유사한 상위 N개의 청크를 반환.
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
- Returns:
35
- 쿼리와 가장 관련성 높은 상위 N개의 청크 (Dict) 리스트.
 
36
  """
37
  if not chunks_with_timestamps:
38
  print("WARNING: RAG 검색을 위한 텍스트 청크가 없습니다.")
39
  return []
40
 
41
- # 1. 텍스트 임베딩 생성
42
- # 모든 청크의 텍스트만 추출
43
  texts = [chunk["text"] for chunk in chunks_with_timestamps]
44
  print(f"INFO: 총 {len(texts)}개의 텍스트 청크 임베딩 시작.")
45
 
46
- # 모델의 encode 메서드는 비동기가 아니므로, 직접 호출.
47
- # 만약 시간이 오래 걸린다면 FastAPI의 `run_in_threadpool` 등을 고려.
48
  try:
49
  chunk_embeddings = model.encode(texts, convert_to_numpy=True)
50
- print("INFO: 텍스트 청크 임베딩 완료.")
51
  except Exception as e:
52
  print(f"ERROR: 텍스트 청크 임베딩 중 오류 발생: {e}")
53
  return []
54
 
55
- # 2. FAISS 인덱스 생성 및 청크 임베딩 추가
56
- dimension = chunk_embeddings.shape[1] # 임베딩 벡터의 차원
57
- index = faiss.IndexFlatL2(dimension) # L2 유클리드 거리를 사용하는 간단한 인덱스
 
58
  index.add(chunk_embeddings)
59
- print("INFO: FAISS 인덱스 생성 및 임베딩 추가 완료.")
60
 
61
- # 3. 쿼리 임베딩
62
  query_embedding = model.encode([query], convert_to_numpy=True)
63
- print("INFO: 쿼리 임베딩 완료.")
64
-
65
- # 4. 유사도 검색 (FAISS)
66
- # D: 거리 (Distance), I: 인덱스 (Index)
67
- distances, indices = index.search(query_embedding, top_k)
68
- print(f"INFO: FAISS 유사도 검색 완료. 상위 {top_k}개 결과.")
69
 
 
 
70
  retrieved_chunks = []
71
-
72
- # ✨✨✨ 유사도 임계값 설정 및 필터링 추가 ✨✨✨
73
- # 이 값은 실험을 통해 최적의 값을 찾아야 합니다.
74
- # 거리가 낮을수록 유사하므로, 이 값보다 '거리(score)'가 낮아야만 결과를 포함합니다.
75
- # 예를 들어, 0.5는 '거리가 0.5 미만인 경우에만 결과에 포함하라'는 의미입니다.
76
- # 거리가 0이면 완벽히 일치합니다.
77
- MIN_DISTANCE_THRESHOLD = 150 # 예시 값: 이 값보다 거리가 작아야 합니다 (더 유사해야 함)
78
 
79
  for i in range(len(indices[0])):
80
  idx = indices[0][i]
81
  original_chunk = chunks_with_timestamps[idx]
82
- score = float(distances[0][i]) # FAISS에서 반환된 거리 값 (낮을수록 유사)
83
 
84
- # 설정된 임계값보다 거리가 작을 때만 (즉, 유사도가 높을 때만) 결과에 포함
85
- if score < MIN_DISTANCE_THRESHOLD:
86
  retrieved_chunks.append({
87
  "text": original_chunk["text"],
88
  "timestamp": original_chunk["timestamp"],
89
- "score": score
 
90
  })
91
  else:
92
- # 디버깅용: 임계값 때문에 제외된 청크를 로그로 확인
93
- print(f"DEBUG: 유사도 임계값({MIN_DISTANCE_THRESHOLD:.4f}) 초과로 제외된 청크 (거리: {score:.4f}): {original_chunk['text'][:50]}...")
94
 
95
- # 거리가 작은 순서(유사도가 높은 순서)로 정렬하여 반환
96
- retrieved_chunks.sort(key=lambda x: x['timestamp'])
97
-
98
  print(f"DEBUG: 최종 검색된 청크 수: {len(retrieved_chunks)}")
99
- return retrieved_chunks
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # ./backend/app/rag_core.py
2
+ import os
3
+ import httpx
4
+ from fastapi import HTTPException
5
  from sentence_transformers import SentenceTransformer
6
  import faiss
7
  import numpy as np
8
  from typing import List, Dict, Tuple
9
 
10
+ # OLLAMA_API_BASE_URL 환경 변수 설정
11
+ OLLAMA_API_BASE_URL = os.getenv("OLLAMA_API_BASE_URL", "http://127.0.0.1:11434")
12
+
13
  # 전역 변수로 모델 로드 (앱 시작 시 한 번만 로드되도록)
 
14
  try:
 
15
  model = SentenceTransformer('jhgan/ko-sroberta-multitask', device='cpu')
16
  print("INFO: 임베딩 모델 'jhgan/ko-sroberta-multitask' 로드 완료.")
17
  except Exception as e:
18
  print(f"ERROR: 임베딩 모델 'jhgan/ko-sroberta-multitask' 로드 실패: {e}. 다국어 모델로 시도합니다.")
 
19
  try:
20
  model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2', device='cpu')
21
  print("INFO: 임베딩 모델 'sentence-transformers/paraphrase-multilingual-L12-v2' 로드 완료.")
 
23
  print(f"ERROR: 대체 임베딩 모델 로드 실패: {e}. RAG 기능을 사용할 수 없습니다.")
24
  raise
25
 
26
+ async def generate_answer_with_ollama(model_name: str, prompt: str) -> str:
27
+ """
28
+ Ollama 서버에 질의하여 답변을 생성합니다.
29
  """
30
+ url = f"{OLLAMA_API_BASE_URL}/api/generate"
31
+ headers = {"Content-Type": "application/json"}
32
+ data = {
33
+ "model": model_name,
34
+ "prompt": prompt,
35
+ "stream": False
36
+ }
37
+ print(f"INFO: Ollama API 호출 시작. 모델: {model_name}")
38
+ print(f"INFO: 프롬프트 미리보기: {prompt[:200]}...")
39
 
40
+ try:
41
+ async with httpx.AsyncClient(timeout=600.0) as client:
42
+ response = await client.post(url, headers=headers, json=data)
43
+ response.raise_for_status()
44
+
45
+ response_data = response.json()
46
+ full_response = response_data.get("response", "").strip()
47
+ return full_response
48
+ except httpx.HTTPStatusError as e:
49
+ print(f"ERROR: Ollama API 호출 실패: {e}")
50
+ raise HTTPException(status_code=500, detail="Ollama API 호출 실패")
51
+ except httpx.RequestError as e:
52
+ print(f"ERROR: 네트워크 오류: {e}")
53
+ raise HTTPException(status_code=500, detail="네트워크 오류가 발생했습니다. 잠시 후 다시 시도해주세요.")
54
+ except Exception as e:
55
+ print(f"ERROR: 알 수 없는 오류: {e}")
56
+ raise HTTPException(status_code=500, detail="알 수 없는 오류가 발생했습니다. 잠시 후 다시 시도해주세요.")
57
 
58
+ async def perform_retrieval(chunks_with_timestamps: List[Dict], query: str, top_k: int = 5) -> List[Dict]:
59
+ """
60
+ 제공된 텍스트 청크에서 쿼리와 가장 유사한 부분을 검색합니다. (Retrieval-only)
61
  """
62
  if not chunks_with_timestamps:
63
  print("WARNING: RAG 검색을 위한 텍스트 청크가 없습니다.")
64
  return []
65
 
 
 
66
  texts = [chunk["text"] for chunk in chunks_with_timestamps]
67
  print(f"INFO: 총 {len(texts)}개의 텍스트 청크 임베딩 시작.")
68
 
 
 
69
  try:
70
  chunk_embeddings = model.encode(texts, convert_to_numpy=True)
 
71
  except Exception as e:
72
  print(f"ERROR: 텍스트 청크 임베딩 중 오류 발생: {e}")
73
  return []
74
 
75
+ dimension = chunk_embeddings.shape[1]
76
+ index = faiss.IndexFlatIP(dimension)
77
+
78
+ faiss.normalize_L2(chunk_embeddings)
79
  index.add(chunk_embeddings)
 
80
 
 
81
  query_embedding = model.encode([query], convert_to_numpy=True)
82
+ faiss.normalize_L2(query_embedding)
 
 
 
 
 
83
 
84
+ similarities, indices = index.search(query_embedding, top_k)
85
+
86
  retrieved_chunks = []
87
+ MIN_SIMILARITY_THRESHOLD = 0.35 # 임계값
 
 
 
 
 
 
88
 
89
  for i in range(len(indices[0])):
90
  idx = indices[0][i]
91
  original_chunk = chunks_with_timestamps[idx]
92
+ score = float(similarities[0][i])
93
 
94
+ if score > MIN_SIMILARITY_THRESHOLD:
 
95
  retrieved_chunks.append({
96
  "text": original_chunk["text"],
97
  "timestamp": original_chunk["timestamp"],
98
+ "score": score,
99
+ "start_seconds": original_chunk["start_seconds"]
100
  })
101
  else:
102
+ print(f"DEBUG: 유사도 임계값({MIN_SIMILARITY_THRESHOLD:.4f}) 미만으로 제외된 청크 (유사도: {score:.4f}): {original_chunk['text'][:50]}...")
 
103
 
104
+ retrieved_chunks.sort(key=lambda x: x['start_seconds'])
 
 
105
  print(f"DEBUG: 최종 검색된 청크 수: {len(retrieved_chunks)}")
106
+ return retrieved_chunks
107
+
108
+ async def perform_rag_and_generate(query: str, chunks_with_timestamps: List[Dict], ollama_model_name: str, top_k: int = 50) -> Dict:
109
+ """
110
+ RAG의 전체 프로세스(검색, 프롬프트 구성, 생성)를 수행합니다.
111
+ """
112
+ # 1. RAG 검색 수행
113
+ retrieved_chunks = await perform_retrieval(
114
+ chunks_with_timestamps=chunks_with_timestamps,
115
+ query=query,
116
+ top_k=top_k
117
+ )
118
+
119
+ if not retrieved_chunks:
120
+ return {
121
+ "status": "error",
122
+ "message": "검색 결과가 없습니다.",
123
+ "retrieved_chunks": [],
124
+ "generated_answer": "관련 정보를 찾지 못해 답변을 생성할 수 없습니다."
125
+ }
126
+
127
+ # 2. 검색 결과를 프롬프트에 추가
128
+ context = "\n\n".join([chunk["text"] for chunk in retrieved_chunks])
129
+ prompt = f"""당신은 유튜브 영상 내용을 완벽하게 이해하고 사용자의 질문에 답변하는 AI 어시스턴트입니다.
130
+
131
+ 아래는 분석한 유튜브 영상의 자막 내용입니다. 이 정보를 바탕으로 사용자의 질문에 대해 상세하고 친절하게 답변하세요.
132
+ 답변은 반드시 영상 내용에 근거해야 하며, 내용과 관련 없는 질문에는 '영상 내용과 관련이 없어 답변할 수 없습니다'라고 솔직하게 말해야 합니다.
133
+
134
+ --- 유튜브 영상 자막 내용 ---
135
+ {context}
136
+ --------------------------
137
+
138
+ 사용자 질문: {query}
139
+
140
+ 답변:"""
141
+
142
+ # 3. Ollama 모델에 질의하여 답변 생성
143
+ generated_answer = await generate_answer_with_ollama(
144
+ model_name=ollama_model_name,
145
+ prompt=prompt
146
+ )
147
+
148
+ return {
149
+ "status": "success",
150
+ "message": "성공적으로 영상을 처리하고 RAG 검색을 수행했습니다.",
151
+ "retrieved_chunks": retrieved_chunks,
152
+ "generated_answer": generated_answer
153
+ }
backend/app/youtube_parser.py CHANGED
@@ -37,120 +37,79 @@ def validate_youtube_url(url):
37
  async def get_youtube_video_id(url: str) -> str | None:
38
  """
39
  유튜브 URL에서 비디오 ID를 추출합니다.
40
- 표준 유튜브 URL (youtube.com/watch?v=..., youtu.be/...)을 처리합니다.
41
  """
42
  parsed_url = urlparse(url)
43
-
44
- # 표준 YouTube Watch 페이지 도메인 확인
45
- # www.youtube.com, m.youtube.com, youtube.com 등을 포함합니다.
46
- # 'www.youtube.com', 'm.youtube.com', 'youtube.com'은 실제 YouTube 도메인을 의미합니다.
47
  if parsed_url.hostname and any(domain in parsed_url.hostname for domain in ['www.youtube.com', 'm.youtube.com', 'youtube.com']):
48
  query_params = parse_qs(parsed_url.query)
49
  if 'v' in query_params:
50
  return query_params['v'][0]
51
-
52
- # 짧은 YouTube URL (youtu.be/VIDEO_ID)
53
  elif parsed_url.hostname == 'youtu.be':
54
- # path가 /VIDEO_ID 형태이므로 맨 앞의 '/'를 제거
55
  video_id = parsed_url.path.strip('/')
56
- # 유튜브 비디오 ID는 보통 11자리이므로, 유효성 검사
57
  if len(video_id) == 11:
58
  return video_id
59
-
60
  logger.warning(f"알 수 없는 형식의 YouTube URL: {url}")
61
  return None
62
 
63
  def clean_caption_text(text: str) -> str:
64
- # <00:00:00.000><c>...</c> 또는 </c> 같은 태그 제거
65
  cleaned_text = re.sub(r'<[^>]+>', '', text)
66
- # 여러 공백을 하나의 공백으로 줄이고 양쪽 끝 공백 제거
67
  cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
68
  return cleaned_text
69
 
70
  async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None:
71
  logger.info(f"비디오 ID '{video_id}'에 대한 자막 가져오기 시도.")
72
-
73
  processed_chunks = []
74
- # yt-dlp 옵션 설정
75
  ydl_opts = {
76
- 'writesubtitles': True, # 사용자가 업로드한 수동 자막 파일 쓰기 활성화
77
- 'writeautomaticsub': True, # YouTube에서 자동으로 생성된 자막 파일 쓰기 활성화
78
- 'subtitleslangs': ['ko', 'en'], # 다운로드할 자막 언어 목록 (한국어 우선, 없으면 영어)
79
- 'skip_download': True, # 동영상 자체는 다운로드하지 않고 자막만 다운로드
80
- 'outtmpl': '%(id)s.%(language)s.%(ext)s', # 다운로드될 파일 이름 템플릿 (temp_dir 안에서 상대 경로로 저장됨)
81
- 'quiet': False, # 콘솔 출력 활성화 (디버깅용)
82
- 'no_warnings': False, # 경고 메시지 활성화 (디버깅용)
83
- 'extractor_args': { # 특정 extractor (예: 유튜브)에 대한 추가 인자
84
- 'youtube': {'skip': ['dash']} # dash manifest 관련 오류 회피 시도 (유튜브 관련)
85
  }
86
  }
87
  logger.info("yt-dlp에 프록시가 적용되지 않았습니다.")
88
 
89
- temp_dir = "./temp_captions"
90
  os.makedirs(temp_dir, exist_ok=True)
91
  original_cwd = os.getcwd()
92
 
93
  try:
94
- with contextlib.chdir(temp_dir): # 임시 디렉토리로 작업 디렉토리 변경
95
- # outtmpl 현재 chdir된 디렉토리 내의 상대 경로로 지정
96
- # yt-dlp가 파일을 temp_dir 안에 바로 생성하도록 함
97
- ydl_opts['outtmpl'] = '%(id)s.%(ext)s'
98
-
99
- logger.debug(f"yt-dlp 실행 전 현재 작업 디렉토리: {os.getcwd()}")
100
- logger.debug(f"yt-dlp 옵션: {ydl_opts}")
101
-
102
  with YoutubeDL(ydl_opts) as ydl:
103
- # download=True 설정으로 자막 다운로드 시도
104
- # 비디오 ID만 전달해도 yt-dlp가 알아서 처리합니다.
105
  info_dict = await asyncio.to_thread(ydl.extract_info, video_id, download=True)
106
-
107
- logger.debug(f"yt-dlp extract_info 결과 (자세한 정보는 debug_yt_dlp.log 파일 확인): {json.dumps(info_dict.get('requested_subtitles', 'No subtitles requested'), indent=2, ensure_ascii=False)}")
108
 
109
  caption_file_path = None
110
-
111
- # 1. info_dict에서 직접 자막 파일 경로를 찾으려는 시도 (가장 정확)
112
- # yt-dlp 0.0.12 버전 이상에서는 _download_lock이 반환됨. info_dict에서 직접 파일을 찾아야 함
113
  if 'requested_subtitles' in info_dict and info_dict['requested_subtitles']:
114
  for lang_code in ydl_opts['subtitleslangs']:
115
  if lang_code in info_dict['requested_subtitles']:
116
  sub_info = info_dict['requested_subtitles'][lang_code]
117
- # 'filepath' 키가 없거나 None일 수 있으므로 확인
118
  if 'filepath' in sub_info and sub_info['filepath']:
119
- # filepath는 이미 현재 작업 디렉토리(temp_dir) 기준으로 되어 있을 것
120
  caption_file_path = sub_info['filepath']
121
  logger.info(f"yt-dlp가 '{lang_code}' 자막 파일을 info_dict에서 찾았습니다: {caption_file_path}")
122
- break # 찾았으면 루프 종료
123
 
124
- # 2. info_dict에서 찾지 못했을 경우, 폴백으로 임시 디렉토리를 탐색
125
  if not caption_file_path:
126
- logger.debug(f"info_dict에서 자막 파일 경로를 찾지 못했습니다. 임시 디렉토리 스캔 시작.")
127
- downloaded_files = [f for f in os.listdir('.') if f.startswith(video_id) and ('sub' in f or 'vtt' in f or 'json' in f or 'srt' in f)]
128
- logger.debug(f"임시 디렉토리의 파일 목록: {downloaded_files}")
129
-
130
- # 한국어 자막 우선 검색 (vtt, srt, json 순)
131
  for ext in ['vtt', 'srt', 'json']:
132
  ko_file = next((f for f in downloaded_files if f.endswith(f'.ko.{ext}')), None)
133
  if ko_file:
134
- caption_file_path = os.path.join(os.getcwd(), ko_file) # 현재 작업 디렉토리 기준으로 경로 조합
135
- logger.info(f"폴백: yt-dlp로 한국어 {ext.upper()} 자막 파일 '{ko_file}'을 다운로드했습니다.")
136
  break
137
-
138
  if not caption_file_path:
139
- # 한국어 없으면 첫 번째 사용 가능한 자막 찾기
140
  for ext in ['vtt', 'srt', 'json']:
141
  any_file = next((f for f in downloaded_files if f.endswith(f'.{ext}')), None)
142
  if any_file:
143
- caption_file_path = os.path.join(os.getcwd(), any_file) # 현재 작업 디렉토리 기준으로 경로 조합
144
- logger.warning(f"폴백: 한국어 자막이 없어 첫 번째 {ext.upper()} 자막 파일 '{any_file}'을 사용합니다.")
145
  break
146
 
147
- # 3. 자막 파일이 찾아졌으면 파싱 시작
148
  if caption_file_path and os.path.exists(caption_file_path):
149
  if caption_file_path.endswith('.vtt'):
150
  with open(caption_file_path, 'r', encoding='utf-8') as f:
151
  vtt_content = f.read()
152
-
153
- # WEBVTT 파싱
154
  segments = vtt_content.split('\n\n')
155
  for segment in segments:
156
  if '-->' in segment:
@@ -158,41 +117,33 @@ async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None:
158
  time_str = lines[0].strip()
159
  text_content = ' '.join(lines[1:]).strip()
160
  text_content = clean_caption_text(text_content)
161
-
162
  try:
163
- # VTT 시간은 HH:MM:SS.ms 형태로 제공되므로, 밀리초를 float로 처리 후 정수로 변환
164
  start_time_parts = time_str.split(' --> ')[0].split(':')
165
- if len(start_time_parts) == 3: # HH:MM:SS.ms
166
  hours = int(start_time_parts[0])
167
  minutes = int(start_time_parts[1])
168
  seconds = int(float(start_time_parts[2].split('.')[0]))
169
- elif len(start_time_parts) == 2: # MM:SS.ms
170
  hours = 0
171
  minutes = int(start_time_parts[0])
172
  seconds = int(float(start_time_parts[1].split('.')[0]))
173
- else:
174
- raise ValueError("Unsupported time format")
175
-
176
- # HH:MM:SS 포맷으로 맞춤
177
- if hours > 0:
178
- timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
179
- else:
180
- timestamp_str = f"{minutes:02d}:{seconds:02d}"
181
-
182
  processed_chunks.append({
183
  "text": text_content,
184
- "timestamp": timestamp_str
 
185
  })
186
  except Exception as e:
187
  logger.warning(f"VTT 시간 파싱 오류: {time_str} - {e}")
188
- logger.info(f"yt-dlp로 VTT 자막 {len(processed_chunks)}개 청크 처리 완료.")
189
 
190
  elif caption_file_path.endswith('.json'):
191
- # JSON 자막 파싱 (yt-dlp가 가끔 JSON 포맷으로도 다운로드함)
192
  with open(caption_file_path, 'r', encoding='utf-8') as f:
193
  json_content = json.load(f)
194
 
195
- # yt-dlp의 JSON 자막 형식에 맞춰 파싱 (예시, 실제 구조는 info_dict를 통해 확인 필요)
196
  for entry in json_content:
197
  if 'start' in entry and 'text' in entry:
198
  total_seconds = int(entry['start'])
@@ -202,52 +153,43 @@ async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None:
202
  text = entry['text']
203
  text = clean_caption_text(text)
204
 
205
- if hours > 0:
206
- timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
207
- else:
208
- timestamp_str = f"{minutes:02d}:{seconds:02d}"
209
-
210
  processed_chunks.append({
211
  "text": text,
212
- "timestamp": timestamp_str
 
213
  })
214
- logger.info(f"yt-dlp로 JSON 자막 {len(processed_chunks)}개 청크 처리 완료.")
215
 
216
  elif caption_file_path.endswith('.srt'):
217
- # SRT 자막 파싱 (간단한 예시, 실제로는 정규식 등으로 파싱)
218
  with open(caption_file_path, 'r', encoding='utf-8') as f:
219
  srt_content = f.read()
220
 
221
- # SRT 파싱 로직 (매우 간단한 예시, 실제론 srt 라이브러리 사용 권장)
222
  blocks = srt_content.strip().split('\n\n')
223
  for block in blocks:
224
  lines = block.split('\n')
225
- # 최소한 순번, 시간, 텍스트가 있어야 함
226
  if len(lines) >= 3 and '-->' in lines[1]:
227
  time_str = lines[1].strip()
228
  text_content = ' '.join(lines[2:]).strip()
229
  text_content = clean_caption_text(text_content)
230
 
231
  try:
232
- # SRT 시간은 HH:MM:SS,ms 형태로 제공
233
  start_time_parts = time_str.split(' --> ')[0].split(':')
234
- seconds_ms = float(start_time_parts[-1].replace(',', '.')) # 밀리초 처리
235
  seconds = int(seconds_ms)
236
  minutes = int(start_time_parts[-2])
237
  hours = int(start_time_parts[0]) if len(start_time_parts) == 3 else 0
238
-
239
- if hours > 0:
240
- timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
241
- else:
242
- timestamp_str = f"{minutes:02d}:{seconds:02d}"
243
-
244
  processed_chunks.append({
245
  "text": text_content,
246
- "timestamp": timestamp_str
 
247
  })
248
  except Exception as e:
249
  logger.warning(f"SRT 시간 파싱 오류: {time_str} - {e}")
250
- logger.info(f"yt-dlp로 SRT 자막 {len(processed_chunks)}개 청크 처리 완료.")
251
 
252
  else:
253
  logger.warning(f"지원하지 않는 자막 파일 형식입니다: {caption_file_path}")
@@ -255,83 +197,82 @@ async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None:
255
  logger.warning(f"비디오 ID '{video_id}'에 대한 yt-dlp 자막 파일을 찾을 수 없습니다. 최종 시도 경로: {caption_file_path}")
256
 
257
  except Exception as e:
258
- logger.error(f"yt-dlp 자막 추출 중 예기치 않은 오류 발생 for video ID '{video_id}': {type(e).__name__}: {e}")
259
  return []
260
  finally:
 
261
  if os.path.exists(temp_dir):
262
- for file_name in os.listdir(temp_dir):
263
- file_path = os.path.join(temp_dir, file_name)
264
- try:
265
- if os.path.isfile(file_path):
266
- os.remove(file_path)
267
- except Exception as e:
268
- logger.error(f"임시 파일 삭제 실패 {file_path}: {e}")
269
- os.rmdir(temp_dir)
270
  logger.info(f"임시 자막 디렉토리 '{temp_dir}' 정리 완료.")
271
- os.chdir(original_cwd) # 원래 작업 디렉토리로 돌아옴
272
 
273
  return processed_chunks
274
 
275
- def parse_srt_content(srt_content: str) -> list[dict]:
276
- chunks = []
277
- # 간단한 SRT 파싱 로직 (yt-dlp의 SRT 출력은 더 간단할 수 있음)
278
- # 실제 프로덕션에서는 더 견고한 SRT 파서 라이브러리를 사용하는 것이 좋습니다.
279
- import re
280
- # SRT 패턴: 1\n00:00:01,000 --> 00:00:03,000\nHello World\n\n
281
- blocks = re.split(r'\n\s*\n', srt_content.strip())
282
- for block in blocks:
283
- lines = block.split('\n')
284
- if len(lines) >= 3:
285
- # 번째 라인은 순번, 두 번째 라인은 시간, 나머지는 텍스트
286
- time_str = lines[1]
287
- text = " ".join(lines[2:]).strip()
 
 
 
 
 
 
 
 
 
288
 
289
- # 시간 형식: 00:00:01,000 --> 00:00:03,000
290
- time_parts = time_str.split(' --> ')
291
- if len(time_parts) == 2:
292
- start_time = time_parts[0].replace(',', '.') # yt-dlp의 VTT 파서와 일관성을 위해 쉼표를 점으로 변경
293
- chunks.append({"text": text, "timestamp": start_time})
294
- return chunks
295
 
296
 
297
- def parse_vtt_content(vtt_content: str) -> list[dict]:
298
- chunks = []
299
- lines = vtt_content.split('\n')
300
- i = 0
301
- while i < len(lines):
302
- line = lines[i].strip()
303
- if '-->' in line:
304
- # 시간 정보 라인
305
- time_str = line.split(' ')[0] # 예: 00:00:01.000
306
- # 다음 라인부터 텍스트 시작
307
- text_lines = []
308
- j = i + 1
309
- while j < len(lines) and lines[j].strip() != '':
310
- text_lines.append(lines[j].strip())
311
- j += 1
312
- text = ' '.join(text_lines)
313
- if text:
314
- chunks.append({"text": text, "timestamp": time_str})
315
- i = j # 다음 자막 블록으로 이동
316
- i += 1
317
- return chunks
318
 
319
- def parse_json_content(json_content: dict) -> list[dict]:
320
- chunks = []
321
- for entry in json_content.get('events', []):
322
- if 'segs' in entry:
323
- text = "".join([seg.get('utf8', '') for seg in entry['segs']])
324
- # JSON3 형식은 밀리초까지 표현된 시작 시간이 't' 키에 있을 수 있음
325
- # yt-dlp가 생성하는 json3 파일 구조에 따라 유연하게 처리 필요
326
- start_ms = entry.get('t', 0)
327
- # 밀리초를 HH:MM:SS.mmm 형식으로 변환 (yt-dlp의 VTT timestamp와 유사하게)
328
- total_seconds = start_ms / 1000
329
- hours = int(total_seconds // 3600)
330
- minutes = int((total_seconds % 3600) // 60)
331
- seconds = total_seconds % 60
332
- timestamp = f"{hours:02d}:{minutes:02d}:{seconds:06.3f}"
333
- chunks.append({"text": text, "timestamp": timestamp})
334
- return chunks
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
335
 
336
  async def process_youtube_video_data(video_url: str) -> list[dict] | None:
337
  video_id = await get_youtube_video_id(video_url)
@@ -339,4 +280,13 @@ async def process_youtube_video_data(video_url: str) -> list[dict] | None:
339
  logger.error(f"유효하지 않은 YouTube URL: {video_url}")
340
  return None
341
 
342
- return await get_transcript_with_timestamps(video_id)
 
 
 
 
 
 
 
 
 
 
37
  async def get_youtube_video_id(url: str) -> str | None:
38
  """
39
  유튜브 URL에서 비디오 ID를 추출합니다.
 
40
  """
41
  parsed_url = urlparse(url)
 
 
 
 
42
  if parsed_url.hostname and any(domain in parsed_url.hostname for domain in ['www.youtube.com', 'm.youtube.com', 'youtube.com']):
43
  query_params = parse_qs(parsed_url.query)
44
  if 'v' in query_params:
45
  return query_params['v'][0]
 
 
46
  elif parsed_url.hostname == 'youtu.be':
 
47
  video_id = parsed_url.path.strip('/')
 
48
  if len(video_id) == 11:
49
  return video_id
 
50
  logger.warning(f"알 수 없는 형식의 YouTube URL: {url}")
51
  return None
52
 
53
  def clean_caption_text(text: str) -> str:
 
54
  cleaned_text = re.sub(r'<[^>]+>', '', text)
 
55
  cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
56
  return cleaned_text
57
 
58
  async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None:
59
  logger.info(f"비디오 ID '{video_id}'에 대한 자막 가져오기 시도.")
 
60
  processed_chunks = []
 
61
  ydl_opts = {
62
+ 'writesubtitles': True,
63
+ 'writeautomaticsub': True,
64
+ 'subtitleslangs': ['ko', 'en'],
65
+ 'skip_download': True,
66
+ 'outtmpl': '%(id)s.%(language)s.%(ext)s',
67
+ 'quiet': False,
68
+ 'no_warnings': False,
69
+ 'extractor_args': {
70
+ 'youtube': {'skip': ['dash']}
71
  }
72
  }
73
  logger.info("yt-dlp에 프록시가 적용되지 않았습니다.")
74
 
75
+ temp_dir = f"./temp_captions_{video_id}" # 각 요청별 고유 디렉토리 생성
76
  os.makedirs(temp_dir, exist_ok=True)
77
  original_cwd = os.getcwd()
78
 
79
  try:
80
+ with contextlib.chdir(temp_dir):
81
+ ydl_opts['outtmpl'] = '%(id)s.%(ext)s'
 
 
 
 
 
 
82
  with YoutubeDL(ydl_opts) as ydl:
 
 
83
  info_dict = await asyncio.to_thread(ydl.extract_info, video_id, download=True)
 
 
84
 
85
  caption_file_path = None
 
 
 
86
  if 'requested_subtitles' in info_dict and info_dict['requested_subtitles']:
87
  for lang_code in ydl_opts['subtitleslangs']:
88
  if lang_code in info_dict['requested_subtitles']:
89
  sub_info = info_dict['requested_subtitles'][lang_code]
 
90
  if 'filepath' in sub_info and sub_info['filepath']:
 
91
  caption_file_path = sub_info['filepath']
92
  logger.info(f"yt-dlp가 '{lang_code}' 자막 파일을 info_dict에서 찾았습니다: {caption_file_path}")
93
+ break
94
 
 
95
  if not caption_file_path:
96
+ downloaded_files = [f for f in os.listdir('.') if f.startswith(video_id) and any(ext in f for ext in ['vtt', 'srt', 'json'])]
 
 
 
 
97
  for ext in ['vtt', 'srt', 'json']:
98
  ko_file = next((f for f in downloaded_files if f.endswith(f'.ko.{ext}')), None)
99
  if ko_file:
100
+ caption_file_path = os.path.join(os.getcwd(), ko_file)
 
101
  break
 
102
  if not caption_file_path:
 
103
  for ext in ['vtt', 'srt', 'json']:
104
  any_file = next((f for f in downloaded_files if f.endswith(f'.{ext}')), None)
105
  if any_file:
106
+ caption_file_path = os.path.join(os.getcwd(), any_file)
 
107
  break
108
 
 
109
  if caption_file_path and os.path.exists(caption_file_path):
110
  if caption_file_path.endswith('.vtt'):
111
  with open(caption_file_path, 'r', encoding='utf-8') as f:
112
  vtt_content = f.read()
 
 
113
  segments = vtt_content.split('\n\n')
114
  for segment in segments:
115
  if '-->' in segment:
 
117
  time_str = lines[0].strip()
118
  text_content = ' '.join(lines[1:]).strip()
119
  text_content = clean_caption_text(text_content)
120
+ if not text_content: continue
121
  try:
 
122
  start_time_parts = time_str.split(' --> ')[0].split(':')
123
+ if len(start_time_parts) == 3:
124
  hours = int(start_time_parts[0])
125
  minutes = int(start_time_parts[1])
126
  seconds = int(float(start_time_parts[2].split('.')[0]))
127
+ else: # MM:SS.ms
128
  hours = 0
129
  minutes = int(start_time_parts[0])
130
  seconds = int(float(start_time_parts[1].split('.')[0]))
131
+
132
+ total_seconds = hours * 3600 + minutes * 60 + seconds
133
+ timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" if hours > 0 else f"{minutes:02d}:{seconds:02d}"
 
 
 
 
 
 
134
  processed_chunks.append({
135
  "text": text_content,
136
+ "timestamp": timestamp_str,
137
+ "start_seconds": total_seconds # Added
138
  })
139
  except Exception as e:
140
  logger.warning(f"VTT 시간 파싱 오류: {time_str} - {e}")
141
+ logger.info(f"VTT 자막 {len(processed_chunks)}개 청크 처리 완료.")
142
 
143
  elif caption_file_path.endswith('.json'):
 
144
  with open(caption_file_path, 'r', encoding='utf-8') as f:
145
  json_content = json.load(f)
146
 
 
147
  for entry in json_content:
148
  if 'start' in entry and 'text' in entry:
149
  total_seconds = int(entry['start'])
 
153
  text = entry['text']
154
  text = clean_caption_text(text)
155
 
156
+ timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" if hours > 0 else f"{minutes:02d}:{seconds:02d}"
 
 
 
 
157
  processed_chunks.append({
158
  "text": text,
159
+ "timestamp": timestamp_str,
160
+ "start_seconds": total_seconds # Added
161
  })
162
+ logger.info(f"JSON 자막 {len(processed_chunks)}개 청크 처리 완료.")
163
 
164
  elif caption_file_path.endswith('.srt'):
 
165
  with open(caption_file_path, 'r', encoding='utf-8') as f:
166
  srt_content = f.read()
167
 
 
168
  blocks = srt_content.strip().split('\n\n')
169
  for block in blocks:
170
  lines = block.split('\n')
 
171
  if len(lines) >= 3 and '-->' in lines[1]:
172
  time_str = lines[1].strip()
173
  text_content = ' '.join(lines[2:]).strip()
174
  text_content = clean_caption_text(text_content)
175
 
176
  try:
 
177
  start_time_parts = time_str.split(' --> ')[0].split(':')
178
+ seconds_ms = float(start_time_parts[-1].replace(',', '.'))
179
  seconds = int(seconds_ms)
180
  minutes = int(start_time_parts[-2])
181
  hours = int(start_time_parts[0]) if len(start_time_parts) == 3 else 0
182
+
183
+ total_seconds = hours * 3600 + minutes * 60 + seconds
184
+ timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" if hours > 0 else f"{minutes:02d}:{seconds:02d}"
 
 
 
185
  processed_chunks.append({
186
  "text": text_content,
187
+ "timestamp": timestamp_str,
188
+ "start_seconds": total_seconds # Added
189
  })
190
  except Exception as e:
191
  logger.warning(f"SRT 시간 파싱 오류: {time_str} - {e}")
192
+ logger.info(f"SRT 자막 {len(processed_chunks)}개 청크 처리 완료.")
193
 
194
  else:
195
  logger.warning(f"지원하지 않는 자막 파일 형식입니다: {caption_file_path}")
 
197
  logger.warning(f"비디오 ID '{video_id}'에 대한 yt-dlp 자막 파일을 찾을 수 없습니다. 최종 시도 경로: {caption_file_path}")
198
 
199
  except Exception as e:
200
+ logger.error(f"자막 추출 중 오류: {e}")
201
  return []
202
  finally:
203
+ os.chdir(original_cwd)
204
  if os.path.exists(temp_dir):
205
+ shutil.rmtree(temp_dir) # 임시 디렉토리와 내용물 모두 삭제
 
 
 
 
 
 
 
206
  logger.info(f"임시 자막 디렉토리 '{temp_dir}' 정리 완료.")
 
207
 
208
  return processed_chunks
209
 
210
+ def remove_duplicate_captions(chunks: list[dict]) -> list[dict]:
211
+ if not chunks:
212
+ return []
213
+
214
+ # 첫 번째 청크는 항상 포함
215
+ deduplicated_chunks = [chunks[0]]
216
+
217
+ for i in range(1, len(chunks)):
218
+ # 이전 청크의 텍스트와 현재 청크의 텍스트를 가져옴
219
+ # .strip()으로 양쪽 공백을 제거하여 비교 정확도 향상
220
+ prev_text = deduplicated_chunks[-1]["text"].strip()
221
+ current_text = chunks[i]["text"].strip()
222
+
223
+ # 현재 텍스트가 이전 텍스트로 시작하고, 길이가 더 긴 경우 (점진적 구성)
224
+ # 예: prev="안녕하세요", current="안녕하세요 제 이름은"
225
+ if current_text.startswith(prev_text) and len(current_text) > len(prev_text):
226
+ # 이전 항목을 현재의 더 완전한 문장으로 교체
227
+ deduplicated_chunks[-1] = chunks[i]
228
+ # 현재 텍스트와 이전 텍스트가 완전히 다른 내용일 경우에만 새로 추가
229
+ # (완전히 똑같은 중복도 이 조건에서 걸러짐)
230
+ elif prev_text != current_text:
231
+ deduplicated_chunks.append(chunks[i])
232
 
233
+ logger.info(f"중복 제거 최종 청크 수: {len(deduplicated_chunks)}")
234
+ return deduplicated_chunks
 
 
 
 
235
 
236
 
237
+ def merge_incomplete_sentences(chunks: list[dict]) -> list[dict]:
238
+ if not chunks:
239
+ return []
240
+
241
+ merged_chunks = []
242
+ current_merged_chunk = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
243
 
244
+ for i, chunk in enumerate(chunks):
245
+ text = chunk["text"].strip()
246
+ if not text:
247
+ continue
248
+
249
+ if current_merged_chunk is None:
250
+ current_merged_chunk = chunk.copy()
251
+ else:
252
+ # 이전 청크가 문장 종결 부호로 끝나는지 확인
253
+ prev_text_ends_with_punctuation = current_merged_chunk["text"].strip().endswith(('.', '?', '!', '...'))
254
+
255
+ # 시간 간격 확인 (예: 0.5초 미만)
256
+ time_gap_small = True
257
+ if "start_seconds" in current_merged_chunk and "start_seconds" in chunk:
258
+ if chunk["start_seconds"] - current_merged_chunk["start_seconds"] > 0.5: # 0.5초 이상 차이나면 병합하지 않음
259
+ time_gap_small = False
260
+
261
+ # 이전 청크가 문장 종결 부호로 끝나지 않았고, 시간 간격이 작을 때만 병합
262
+ if not prev_text_ends_with_punctuation and time_gap_small:
263
+ current_merged_chunk["text"] += " " + text
264
+ # 병합된 청크의 시간은 첫 청크의 시간을 유지
265
+ else:
266
+ # 병합하지 않는 경우, 현재까지 병합된 청크를 추가하고 새 청크 시작
267
+ merged_chunks.append(current_merged_chunk)
268
+ current_merged_chunk = chunk.copy()
269
+
270
+ # 마지막으로 남아있는 병합된 청크 추가
271
+ if current_merged_chunk is not None:
272
+ merged_chunks.append(current_merged_chunk)
273
+
274
+ logger.info(f"스마트 병합 후 청크 수: {len(merged_chunks)}")
275
+ return merged_chunks
276
 
277
  async def process_youtube_video_data(video_url: str) -> list[dict] | None:
278
  video_id = await get_youtube_video_id(video_url)
 
280
  logger.error(f"유효하지 않은 YouTube URL: {video_url}")
281
  return None
282
 
283
+ processed_chunks = await get_transcript_with_timestamps(video_id)
284
+
285
+ if processed_chunks:
286
+ # 1. 중복 제거
287
+ deduplicated_chunks = remove_duplicate_captions(processed_chunks)
288
+ # 2. 불완전한 문장 병합
289
+ final_chunks = merge_incomplete_sentences(deduplicated_chunks)
290
+ return final_chunks
291
+ else:
292
+ return processed_chunks