Spaces:
Runtime error
Runtime error
import streamlit as st | |
import os | |
from pinecone import Pinecone | |
from sentence_transformers import SentenceTransformer | |
from typing import List, Dict | |
import re # For parsing timestamp and extracting video ID | |
import streamlit.components.v1 as components # For embedding HTML | |
from openai import OpenAI # Import OpenAI library | |
import logging | |
# Setup logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# --- Helper Functions (Existing: parse_timestamp_to_seconds, get_youtube_video_id, add_timestamp_to_youtube_url, generate_youtube_embed_html) --- | |
def parse_timestamp_to_seconds(timestamp: str) -> int | None: | |
"""HH:MM:SS 또는 HH:MM:SS.ms 형식의 타임스탬프를 초 단위로 변환합니다.""" | |
if not isinstance(timestamp, str): | |
return None | |
# Remove milliseconds part if present | |
timestamp_no_ms = timestamp.split('.')[0] | |
parts = timestamp_no_ms.split(':') | |
try: | |
if len(parts) == 3: | |
h, m, s = map(int, parts) | |
return h * 3600 + m * 60 + s | |
elif len(parts) == 2: | |
m, s = map(int, parts) | |
return m * 60 + s | |
elif len(parts) == 1: | |
return int(parts[0]) | |
else: | |
return None | |
except ValueError: | |
return None | |
def get_youtube_video_id(url: str) -> str | None: | |
"""YouTube URL에서 비디오 ID를 추출합니다.""" | |
if not isinstance(url, str): | |
return None | |
# Standard YouTube URLs (youtube.com/watch?v=...), shortened URLs (youtu.be/...), etc. | |
match = re.search(r"(?:v=|/|youtu\.be/|embed/|shorts/)([0-9A-Za-z_-]{11})", url) | |
return match.group(1) if match else None | |
def add_timestamp_to_youtube_url(youtube_url: str, timestamp: str) -> str: | |
"""YouTube URL에 타임스탬프를 추가합니다.""" | |
seconds = parse_timestamp_to_seconds(timestamp) | |
if seconds is None or not youtube_url: | |
return youtube_url # Return original URL if timestamp is invalid or URL is empty | |
separator = '&' if '?' in youtube_url else '?' | |
# Remove existing t= parameter if present | |
cleaned_url = re.sub(r'[?&]t=\d+s?', '', youtube_url) | |
separator = '&' if '?' in cleaned_url else '?' # Re-check separator after cleaning | |
return f"{cleaned_url}{separator}t={seconds}s" | |
def generate_youtube_embed_html(youtube_url: str, timestamp: str) -> str | None: | |
"""타임스탬프가 적용된 YouTube 임베드 HTML 코드를 생성합니다. 가로 800px 고정, 세로 자동 조절.""" | |
video_id = get_youtube_video_id(youtube_url) | |
start_seconds = parse_timestamp_to_seconds(timestamp) | |
if not video_id: | |
logger.warning(f"Could not extract video ID from URL: {youtube_url}") | |
return None # Cannot generate embed code without video ID | |
start_param = f"start={start_seconds}" if start_seconds is not None else "" | |
# Use aspect ratio approach with fixed width 800px | |
return f''' | |
<div style="position: relative; width: 800px; padding-bottom: 450px; /* 800px * 9 / 16 = 450px */ height: 0; overflow: hidden;"> | |
<iframe | |
src="https://www.youtube.com/embed/{video_id}?{start_param}&autoplay=0&rel=0" | |
frameborder="0" | |
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" | |
referrerpolicy="strict-origin-when-cross-origin" | |
allowfullscreen | |
style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;"> | |
</iframe> | |
</div> | |
''' | |
# --- 설정 --- | |
# Pinecone 설정 | |
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY","pcsk_PZHLK_TRAvMCyNmJM4FKGCX7rbbY22a58fhnWYasx1mf3WL6sRasoASZXfsbnJYvCQ13w") # Load from environment variable | |
PINECONE_ENV = os.getenv("PINECONE_ENV", "us-east-1") | |
INDEX_NAME = "video-embeddings" | |
EMBEDDING_MODEL = "jhgan/ko-sroberta-multitask" | |
# OpenAI 설정 | |
OPENAI_API_KEY = "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" | |
# --- 리소스 로딩 (캐싱 활용) --- | |
def init_pinecone(): | |
"""Pinecone 클라이언트를 초기화합니다.""" | |
api_key = PINECONE_API_KEY | |
if not api_key: | |
st.error("Pinecone API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.") | |
st.stop() | |
try: | |
pc = Pinecone(api_key=api_key) | |
logger.info("Successfully connected to Pinecone.") | |
return pc | |
except Exception as e: | |
st.error(f"Pinecone 초기화 중 오류 발생: {e}") | |
st.stop() | |
def load_embedding_model(): | |
"""Sentence Transformer 모델을 로드합니다.""" | |
try: | |
model = SentenceTransformer(EMBEDDING_MODEL) | |
logger.info(f"Successfully loaded embedding model: {EMBEDDING_MODEL}") | |
return model | |
except Exception as e: | |
st.error(f"임베딩 모델 로딩 중 오류 발생: {e}") | |
st.stop() | |
def get_pinecone_index(_pc: Pinecone, index_name: str): | |
"""Pinecone 인덱스 객체를 가져옵니다.""" | |
try: | |
index = _pc.Index(index_name) | |
# Optionally, do a quick check like index.describe_index_stats() to confirm connection | |
stats = index.describe_index_stats() | |
logger.info(f"Successfully connected to Pinecone index '{index_name}'. Stats: {stats.get('total_vector_count', 'N/A')} vectors") | |
return index | |
except Exception as e: | |
st.error(f"Pinecone 인덱스 '{index_name}' 연결 중 오류 발생: {e}. 인덱스가 존재하고 활성 상태인지 확인하세요.") | |
st.stop() | |
def init_openai_client(): | |
"""OpenAI 클라이언트를 초기화합니다.""" | |
if not OPENAI_API_KEY: | |
st.error("OpenAI API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.") | |
st.stop() | |
try: | |
client = OpenAI(api_key=OPENAI_API_KEY) | |
# Test connection (optional, but recommended) | |
client.models.list() | |
logger.info("Successfully connected to OpenAI.") | |
return client | |
except Exception as e: | |
st.error(f"OpenAI 클라이언트 초기화 또는 연결 테스트 중 오류 발생: {e}") | |
st.stop() | |
# --- 검색 함수 --- | |
def search(query: str, top_k: int = 5, _index=None, _model=None) -> List[Dict]: | |
"""Pinecone 인덱스에서 검색을 수행하고 title과 original_text를 포함합니다.""" | |
if not query or _index is None or _model is None: | |
return [] | |
try: | |
query_vec = _model.encode(query, convert_to_numpy=True).tolist() | |
result = _index.query(vector=query_vec, top_k=top_k, include_metadata=True) | |
matches = result.get("matches", []) | |
search_results = [] | |
for m in matches: | |
metadata = m.get("metadata", {}) | |
search_results.append({ | |
"URL": metadata.get("url", "N/A"), | |
"타임스탬프": metadata.get("timestamp", "N/A"), | |
"타입": metadata.get("type", "N/A"), | |
"제목": metadata.get("title", "N/A"), # 제목 추가 | |
"요약": metadata.get("summary", "N/A"), | |
"원본텍스트": metadata.get("original_text", "N/A"), # 컨텍스트로 활용할 원본 텍스트 | |
"점수": m.get("score", 0.0) | |
}) | |
logger.info(f"Pinecone search returned {len(search_results)} results for query: '{query[:50]}...'") | |
return search_results | |
except Exception as e: | |
st.error(f"Pinecone 검색 중 오류 발생: {e}") | |
logger.error(f"Error during Pinecone search: {e}", exc_info=True) | |
return [] | |
# --- OpenAI 답변 생성 함수 --- | |
def generate_khan_answer(query: str, search_results: List[Dict], client: OpenAI) -> str: | |
"""사용자 질문과 검색 결과를 바탕으로 Khan 페르소나 답변을 생성합니다.""" | |
if not search_results: | |
# Return a persona-consistent message even when no results are found | |
return "현재 질문에 대해 참고할 만한 관련 영상을 찾지 못했습니다. 질문을 조금 더 명확하게 해주시거나 다른 방식으로 질문해주시면 도움이 될 것 같습니다." | |
# Build context string for OpenAI more robustly, including timestamped URL | |
context_parts = [] | |
for i, r in enumerate(search_results): | |
original_text_snippet = "" | |
if r.get('원본텍스트'): | |
snippet = r['원본텍스트'][:200] | |
original_text_snippet = f"\n(원본 내용 일부: {snippet}...)" | |
# Generate timestamped URL if possible | |
timestamped_url_str = "N/A" | |
url = r.get('URL', 'N/A') | |
timestamp = r.get('타임스탬프', 'N/A') | |
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url) | |
has_valid_timestamp = timestamp and timestamp != 'N/A' and parse_timestamp_to_seconds(timestamp) is not None | |
if is_youtube and has_valid_timestamp: | |
try: | |
timestamped_url_str = add_timestamp_to_youtube_url(url, timestamp) | |
except Exception: | |
timestamped_url_str = url # Fallback to original URL on error | |
elif url != "N/A": | |
timestamped_url_str = url # Use original URL if not YouTube/no timestamp | |
context_parts.append( | |
f"관련 정보 {i+1}:\n" | |
f"제목: {r.get('제목', 'N/A')}\n" | |
f"영상 URL (원본): {url}\n" | |
f"타임스탬프: {timestamp}\n" | |
f"타임스탬프 적용 URL: {timestamped_url_str}\n" # Add the timestamped URL here | |
f"내용 타입: {r.get('타입', 'N/A')}\n" | |
f"요약: {r.get('요약', 'N/A')}" | |
f"{original_text_snippet}" # Append the snippet safely | |
) | |
context = "\n\n---\n\n".join(context_parts) # Join the parts | |
# Updated system prompt to instruct Markdown link usage | |
system_prompt = """너는 현실적인 조언을 잘하는 PM 멘토 Khan이다. | |
- 말투는 단호하지만 공감력이 있다. "~입니다." 또는 "~죠." 와 같이 명확하게 끝맺는다. 존댓말을 사용한다. | |
- 모호한 위로보다는 구조적이고 실용적인 제안을 한다. 문제의 핵심을 파악하고 구체적인 해결책이나 다음 단계를 제시한다. | |
- 질문이 막연하면 구체화해서 되물어본다. | |
- (이전 대화 기록은 없으므로) 반복 질문에는 "이전에 유사한 내용을 찾아봤었죠. 다시 한번 살펴보면..." 과 같이 언급할 수 있다. | |
- 긴 설명보단 핵심을 빠르게 전달하고, 필요하다면 간결한 비유를 활용한다. | |
- 주어진 '관련 정보' (영상 제목, 요약, 원본 내용, 타임스탬프 적용 URL 등)를 바탕으로 답변해야 한다. 정보가 부족하거나 질문과 관련성이 낮으면, 그 점을 명확히 밝히고 추가 정보를 요청하거나 질문을 구체화하도록 유도한다. | |
- **답변 중 관련 정보를 참조할 때는, 반드시 '타임스탬프 적용 URL'을 사용하여 다음과 같은 Markdown 링크 형식으로 만들어야 한다: `[영상 제목](타임스탬프_적용_URL)`. 예를 들어, "자세한 내용은 [비개발자가 연봉 2억을 받는 현실적인 방법](https://www.youtube.com/watch?v=VIDEO_ID&t=178s) 영상을 참고하시면 도움이 될 겁니다." 와 같이 표시한다.** | |
- 답변은 한국어로 한다.""" | |
# Use triple quotes for the multi-line f-string | |
user_message = f"""사용자 질문: {query} | |
아래 관련 정보를 바탕으로 Khan 멘토로서 답변해주세요: | |
{context}""" | |
try: | |
logger.info("Calling OpenAI API...") | |
completion = client.chat.completions.create( | |
model="gpt-4o-mini", # Use gpt-4 if available and preferred | |
messages=[ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_message} | |
], | |
temperature=0.5, # Slightly less creative, more focused on instructions | |
) | |
answer = completion.choices[0].message.content | |
logger.info("Received response from OpenAI.") | |
return answer.strip() | |
except Exception as e: | |
st.error(f"OpenAI 답변 생성 중 오류 발생: {e}") | |
logger.error(f"Error during OpenAI API call: {e}", exc_info=True) | |
return "답변을 생성하는 중에 문제가 발생했습니다. OpenAI API 키 또는 서비스 상태를 확인해주세요." | |
# --- Streamlit 앱 UI (리팩토링) --- | |
st.set_page_config(page_title="Khan 멘토 (PM 영상 기반)", layout="wide") | |
st.title("✨ Khan 멘토에게 질문하기") | |
st.markdown( | |
'<a href="https://forms.gle/SUqrGBT3dktSB7v26" target="_blank" style="display:inline-block; background:#f9e79f; color:#1a237e; font-weight:bold; padding:0.5em 1.2em; border-radius:8px; text-decoration:none; font-size:1.1em; margin-bottom:16px;">📝 서비스 사용성 설문조사 참여하기</a>', | |
unsafe_allow_html=True | |
) | |
st.markdown("PM 관련 영상 내용을 기반으로 Khan 멘토가 답변해 드립니다.") | |
# --- API 키 확인 및 리소스 초기화 --- | |
openai_client = init_openai_client() | |
pc = init_pinecone() | |
model = load_embedding_model() | |
index = get_pinecone_index(pc, INDEX_NAME) | |
# --- 상태 관리 --- | |
if 'user_state' not in st.session_state: | |
st.session_state['user_state'] = '' | |
if 'empathy_message' not in st.session_state: | |
st.session_state['empathy_message'] = '' | |
if 'example_questions' not in st.session_state: | |
st.session_state['example_questions'] = [] | |
if 'selected_question' not in st.session_state: | |
st.session_state['selected_question'] = '' | |
if 'khan_answer' not in st.session_state: | |
st.session_state['khan_answer'] = '' | |
if 'step' not in st.session_state: | |
st.session_state['step'] = 1 | |
# --- 1단계: 사용자 상태 입력 --- | |
if st.session_state['step'] == 1: | |
user_state = st.text_area("지금 어떤 상황이신가요? 고민/상황을 자유롭게 적어주세요.", value=st.session_state['user_state']) | |
if st.button("상태 말하기"): | |
st.session_state['user_state'] = user_state | |
# 2단계로 이동 | |
st.session_state['step'] = 2 | |
st.rerun() | |
# --- 2단계: 공감 메시지 + 예시 질문 생성 --- | |
if st.session_state['step'] == 2: | |
with st.spinner("어떤 상황인지 고민해볼께요..."): | |
# 1. 공감 메시지 생성 | |
empathy_prompt = f""" | |
너는 따뜻하고 공감 능력이 뛰어난 상담가야. | |
아래 사용자의 상황을 듣고, 충분히 감정적으로 공감해주고, 용기를 북돋아주는 말을 해줘. | |
상황: "{st.session_state['user_state']}" | |
""" | |
try: | |
empathy_response = openai_client.chat.completions.create( | |
model="gpt-4o", | |
messages=[{"role": "system", "content": empathy_prompt}], | |
temperature=0.7, | |
) | |
st.session_state['empathy_message'] = empathy_response.choices[0].message.content.strip() | |
except Exception as e: | |
st.session_state['empathy_message'] = f"공감 메시지 생성 중 오류: {e}" | |
# 2. 예시 질문 생성 | |
example_prompt = f""" | |
아래 상황에서 Khan 멘토에게 할 수 있는 구체적이고 실용적인 질문 3~4가지를 한국어로 만들어줘.\n각 질문은 한 문장으로, 실제로 도움이 될 만한 내용이어야 해.\n상황: "{st.session_state['user_state']}" | |
""" | |
try: | |
example_response = openai_client.chat.completions.create( | |
model="gpt-4o", | |
messages=[{"role": "system", "content": example_prompt}], | |
temperature=0.5, | |
) | |
# 응답에서 질문만 추출 (숫자/기호/줄바꿈 등 정제) | |
import re | |
raw = example_response.choices[0].message.content.strip() | |
questions = re.findall(r'\d+\.\s*(.+)', raw) | |
if not questions: | |
# 숫자 없이 줄바꿈만 있을 경우 | |
questions = [q.strip('-• ').strip() for q in raw.split('\n') if q.strip()] | |
st.session_state['example_questions'] = questions[:4] | |
except Exception as e: | |
st.session_state['example_questions'] = [f"예시 질문 생성 중 오류: {e}"] | |
# 3단계로 이동 | |
st.session_state['step'] = 3 | |
st.rerun() | |
# --- 3단계: 공감 메시지 + 예시 질문 버튼/직접입력 + Khan 답변 --- | |
if st.session_state['step'] == 3: | |
st.success(st.session_state['empathy_message']) | |
st.markdown("#### 이런 질문을 해볼 수 있어요!") | |
cols = st.columns(len(st.session_state['example_questions'])) | |
for i, q in enumerate(st.session_state['example_questions']): | |
if cols[i].button(q): | |
st.session_state['selected_question'] = q | |
st.session_state['step'] = 4 | |
st.rerun() | |
st.markdown("---") | |
user_q = st.text_input("직접 궁금한 점을 입력해도 좋아요!", value=st.session_state['selected_question']) | |
if st.button("Khan 멘토에게 질문하기"): | |
st.session_state['selected_question'] = user_q | |
st.session_state['step'] = 4 | |
st.rerun() | |
# --- 4단계: Khan 멘토 답변 --- | |
if st.session_state['step'] == 4: | |
with st.spinner("Khan 멘토가 답변을 준비하는 중..."): | |
pinecone_results = search(st.session_state['selected_question'], top_k=5, _index=index, _model=model) | |
khan_answer = generate_khan_answer(st.session_state['selected_question'], pinecone_results, openai_client) | |
st.session_state['khan_answer'] = khan_answer | |
st.subheader("💡 Khan 멘토의 답변") | |
st.markdown(st.session_state['khan_answer']) | |
# 참고 영상 정보 표시 | |
if pinecone_results: | |
with st.expander("답변에 참고한 영상 정보 보기"): | |
displayed_urls = set() | |
for i, r in enumerate(pinecone_results): | |
url = r.get('URL', 'N/A') | |
if url in displayed_urls or url == 'N/A': | |
continue | |
displayed_urls.add(url) | |
st.markdown(f"--- **참고 자료 {len(displayed_urls)} (유사도: {r['점수']:.4f})** ---") | |
st.markdown(f"**제목:** {r.get('제목', 'N/A')}") | |
st.markdown(f"**요약:** {r.get('요약', 'N/A')}") | |
timestamp = r.get('타임스탬프', 'N/A') | |
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url) | |
start_seconds = None | |
if is_youtube and timestamp and timestamp != 'N/A': | |
start_seconds = parse_timestamp_to_seconds(timestamp) | |
if is_youtube and start_seconds is not None: | |
try: | |
timestamped_link_url = add_timestamp_to_youtube_url(url, timestamp) | |
st.markdown(f"**영상 링크 (타임스탬프 포함):** [{timestamped_link_url}]({timestamped_link_url})") | |
except Exception as e: | |
logger.error(f"Error creating timestamped URL for link: {e}") | |
st.markdown(f"**영상 링크 (원본):** [{url}]({url})") | |
elif url != "N/A" and isinstance(url, str) and url.startswith("http"): | |
st.markdown(f"**URL:** [{url}]({url})") | |
else: | |
st.markdown(f"**URL:** {url}") | |
if is_youtube and url != "N/A": | |
col1, col2 = st.columns(2) | |
with col1: | |
try: | |
st.video(url, start_time=start_seconds or 0) | |
except Exception as e: | |
st.error(f"비디오({url}) 재생 중 오류 발생: {e}") | |
st.markdown(f"[YouTube에서 보기]({url})") | |
elif url != "N/A": | |
col1, col2 = st.columns(2) | |
with col1: | |
try: | |
st.video(url) | |
except Exception as e: | |
logger.warning(f"st.video failed for non-YouTube URL {url}: {e}") | |
st.markdown("---") | |
st.caption("Powered by Pinecone, Sentence Transformers, and OpenAI") | |
# 다시 처음으로 돌아가기 버튼 | |
if st.button("다시 질문 흐름 시작하기"): | |
for k in ['user_state','empathy_message','example_questions','selected_question','khan_answer','step']: | |
st.session_state[k] = '' if k != 'step' else 1 | |
st.rerun() | |