File size: 5,823 Bytes
eda02a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# backend/app/rag_core.py

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from typing import List, Dict, Tuple

# 전역 변수로 모델 로드 (앱 시작 시 한 번만 로드되도록)
# 네이버 클로바의 한국어 SentenceBERT 모델을 로드합니다.
try:
    # 네이버 클로바 HyperCLOVAX-SEED-Text-Instruct-0.5B 모델 로드
    model = SentenceTransformer('jhgan/ko-sroberta-multitask')
    print("INFO: 임베딩 모델 'jhgan/ko-sroberta-multitask' 로드 완료.")
except Exception as e:
    print(f"ERROR: 임베딩 모델 'jhgan/ko-sroberta-multitask' 로드 실패: {e}. 다국어 모델로 시도합니다.")
    # 대체 모델 로직 (필요하다면 유지하거나 제거할 수 있습니다)
    try:
        model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
        print("INFO: 임베딩 모델 'sentence-transformers/paraphrase-multilingual-L12-v2' 로드 완료.")
    except Exception as e:
        print(f"ERROR: 대체 임베딩 모델 로드 실패: {e}. RAG 기능을 사용할 수 없습니다.")
        raise

async def perform_rag_query(chunks_with_timestamps: List[Dict], query: str, top_k: int = 5) -> List[Dict]:
    """
    제공된 텍스트 청크들과 쿼리를 사용하여 RAG(Retrieval-Augmented Generation) 검색을 수행합니다.
    현재는 임베딩 기반 유사도 검색만 수행하며, LLM 호출은 추후 추가됩니다.

    Args:
        chunks_with_timestamps: [{"text": "...", "timestamp": "...", "start_seconds": ...}] 형태의 리스트.
        query: 사용자 쿼리 문자열.
        top_k: 쿼리와 가장 유사한 상위 N개의 청크를 반환.

    Returns:
        쿼리와 가장 관련성 높은 상위 N개의 청크 (Dict) 리스트.
    """
    if not chunks_with_timestamps:
        print("WARNING: RAG 검색을 위한 텍스트 청크가 없습니다.")
        return []

    # 1. 텍스트 임베딩 생성
    # 모든 청크의 텍스트만 추출
    texts = [chunk["text"] for chunk in chunks_with_timestamps]
    print(f"INFO: 총 {len(texts)}개의 텍스트 청크 임베딩 시작.")
    
    # 모델의 encode 메서드는 비동기가 아니므로, 직접 호출.
    # 만약 시간이 오래 걸린다면 FastAPI의 `run_in_threadpool` 등을 고려.
    try:
        chunk_embeddings = model.encode(texts, convert_to_numpy=True)
        print("INFO: 텍스트 청크 임베딩 완료.")
    except Exception as e:
        print(f"ERROR: 텍스트 청크 임베딩 중 오류 발생: {e}")
        return []

    # 2. FAISS 인덱스 생성 및 청크 임베딩 추가
    dimension = chunk_embeddings.shape[1] # 임베딩 벡터의 차원
    index = faiss.IndexFlatL2(dimension) # L2 유클리드 거리를 사용하는 간단한 인덱스
    index.add(chunk_embeddings)
    print("INFO: FAISS 인덱스 생성 및 임베딩 추가 완료.")

    # 3. 쿼리 임베딩
    query_embedding = model.encode([query], convert_to_numpy=True)
    print("INFO: 쿼리 임베딩 완료.")

    # 4. 유사도 검색 (FAISS)
    # D: 거리 (Distance), I: 인덱스 (Index)
    distances, indices = index.search(query_embedding, top_k)
    print(f"INFO: FAISS 유사도 검색 완료. 상위 {top_k}개 결과.")

    retrieved_chunks = []
    for i, idx in enumerate(indices[0]):
        if idx >= 0: # 유효한 인덱스만 (FAISS가 -1을 반환할 수 있음)
            original_chunk = chunks_with_timestamps[idx]
            retrieved_chunks.append({
                "text": original_chunk["text"],
                "timestamp": original_chunk["timestamp"],
                "score": float(distances[0][i]) # 유사도 점수 (거리가 작을수록 유사)
            })
    
    # 거리가 작은 순서(유사도가 높은 순서)로 정렬하여 반환
    retrieved_chunks.sort(key=lambda x: x['score'])

    print(f"DEBUG: 최종 검색된 청크 수: {len(retrieved_chunks)}")
    return retrieved_chunks

# ... (나머지 perform_rag_query 함수 코드는 동일합니다) ...

# 테스트용 코드 (직접 실행 시)
if __name__ == "__main__":
    import asyncio

    sample_chunks = [
        {"text": "안녕하세요, 오늘 우리는 AI와 머신러닝의 미래에 대해 이야기할 것입니다.", "timestamp": "00:00:05", "start_seconds": 5},
        {"text": "특히 딥러닝과 신경망이 어떻게 혁신을 이끄는지 살펴보겠습니다.", "timestamp": "00:00:15", "start_seconds": 15},
        {"text": "이번 영상에서는 유튜브 영상 데이트 서비스 개발 과정을 보여드립니다.", "timestamp": "00:00:25", "start_seconds": 25},
        {"text": "파이썬과 FastAPI를 사용하여 백엔드를 구축하고 있습니다.", "timestamp": "00:00:35", "start_seconds": 35},
        {"text": "데이트 앱 개발은 쉬운 일이 아니지만, 재미있습니다.", "timestamp": "00:00:45", "start_seconds": 45},
        {"text": "이 모델은 자연어 처리(NLP) 작업에 매우 유용합니다.", "timestamp": "00:00:55", "start_seconds": 55},
        {"text": "다음주에는 새로운 데이트 장소를 탐험할 계획입니다.", "timestamp": "01:00:05", "start_seconds": 65},
    ]

    async def main():
        print("\n[테스트 1] 쿼리: 데이트 앱")
        query1 = "데이트 앱"
        results1 = await perform_rag_query(sample_chunks, query1, top_k=3)
        for r in results1:
            print(f"  [{r['score']:.4f}] {r['timestamp']}: {r['text']}")

        print("\n[테스트 2] 쿼리: 인공지능")
        query2 = "인공지능"
        results2 = await perform_rag_query(sample_chunks, query2, top_k=2)
        for r in results2:
            print(f"  [{r['score']:.4f}] {r['timestamp']}: {r['text']}")

    asyncio.run(main())