khanpm / app.py
morethanair's picture
Update app.py
4b6ebf0
raw
history blame
20.7 kB
import streamlit as st
import os
from pinecone import Pinecone
from sentence_transformers import SentenceTransformer
from typing import List, Dict
import re # For parsing timestamp and extracting video ID
import streamlit.components.v1 as components # For embedding HTML
from openai import OpenAI # Import OpenAI library
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Helper Functions (Existing: parse_timestamp_to_seconds, get_youtube_video_id, add_timestamp_to_youtube_url, generate_youtube_embed_html) ---
def parse_timestamp_to_seconds(timestamp: str) -> int | None:
"""HH:MM:SS 또는 HH:MM:SS.ms 형식의 타임스탬프를 초 단위로 변환합니다."""
if not isinstance(timestamp, str):
return None
# Remove milliseconds part if present
timestamp_no_ms = timestamp.split('.')[0]
parts = timestamp_no_ms.split(':')
try:
if len(parts) == 3:
h, m, s = map(int, parts)
return h * 3600 + m * 60 + s
elif len(parts) == 2:
m, s = map(int, parts)
return m * 60 + s
elif len(parts) == 1:
return int(parts[0])
else:
return None
except ValueError:
return None
def get_youtube_video_id(url: str) -> str | None:
"""YouTube URL에서 비디오 ID를 추출합니다."""
if not isinstance(url, str):
return None
# Standard YouTube URLs (youtube.com/watch?v=...), shortened URLs (youtu.be/...), etc.
match = re.search(r"(?:v=|/|youtu\.be/|embed/|shorts/)([0-9A-Za-z_-]{11})", url)
return match.group(1) if match else None
def add_timestamp_to_youtube_url(youtube_url: str, timestamp: str) -> str:
"""YouTube URL에 타임스탬프를 추가합니다."""
seconds = parse_timestamp_to_seconds(timestamp)
if seconds is None or not youtube_url:
return youtube_url # Return original URL if timestamp is invalid or URL is empty
separator = '&' if '?' in youtube_url else '?'
# Remove existing t= parameter if present
cleaned_url = re.sub(r'[?&]t=\d+s?', '', youtube_url)
separator = '&' if '?' in cleaned_url else '?' # Re-check separator after cleaning
return f"{cleaned_url}{separator}t={seconds}s"
def generate_youtube_embed_html(youtube_url: str, timestamp: str) -> str | None:
"""타임스탬프가 적용된 YouTube 임베드 HTML 코드를 생성합니다. 가로 800px 고정, 세로 자동 조절."""
video_id = get_youtube_video_id(youtube_url)
start_seconds = parse_timestamp_to_seconds(timestamp)
if not video_id:
logger.warning(f"Could not extract video ID from URL: {youtube_url}")
return None # Cannot generate embed code without video ID
start_param = f"start={start_seconds}" if start_seconds is not None else ""
# Use aspect ratio approach with fixed width 800px
return f'''
<div style="position: relative; width: 800px; padding-bottom: 450px; /* 800px * 9 / 16 = 450px */ height: 0; overflow: hidden;">
<iframe
src="https://www.youtube.com/embed/{video_id}?{start_param}&autoplay=0&rel=0"
frameborder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;">
</iframe>
</div>
'''
# --- 설정 ---
# Pinecone 설정
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY","pcsk_PZHLK_TRAvMCyNmJM4FKGCX7rbbY22a58fhnWYasx1mf3WL6sRasoASZXfsbnJYvCQ13w") # Load from environment variable
PINECONE_ENV = os.getenv("PINECONE_ENV", "us-east-1")
INDEX_NAME = "video-embeddings"
EMBEDDING_MODEL = "jhgan/ko-sroberta-multitask"
# OpenAI 설정
OPENAI_API_KEY = "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"
# --- 리소스 로딩 (캐싱 활용) ---
@st.cache_resource
def init_pinecone():
"""Pinecone 클라이언트를 초기화합니다."""
api_key = PINECONE_API_KEY
if not api_key:
st.error("Pinecone API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.")
st.stop()
try:
pc = Pinecone(api_key=api_key)
logger.info("Successfully connected to Pinecone.")
return pc
except Exception as e:
st.error(f"Pinecone 초기화 중 오류 발생: {e}")
st.stop()
@st.cache_resource
def load_embedding_model():
"""Sentence Transformer 모델을 로드합니다."""
try:
model = SentenceTransformer(EMBEDDING_MODEL)
logger.info(f"Successfully loaded embedding model: {EMBEDDING_MODEL}")
return model
except Exception as e:
st.error(f"임베딩 모델 로딩 중 오류 발생: {e}")
st.stop()
@st.cache_resource
def get_pinecone_index(_pc: Pinecone, index_name: str):
"""Pinecone 인덱스 객체를 가져옵니다."""
try:
index = _pc.Index(index_name)
# Optionally, do a quick check like index.describe_index_stats() to confirm connection
stats = index.describe_index_stats()
logger.info(f"Successfully connected to Pinecone index '{index_name}'. Stats: {stats.get('total_vector_count', 'N/A')} vectors")
return index
except Exception as e:
st.error(f"Pinecone 인덱스 '{index_name}' 연결 중 오류 발생: {e}. 인덱스가 존재하고 활성 상태인지 확인하세요.")
st.stop()
@st.cache_resource
def init_openai_client():
"""OpenAI 클라이언트를 초기화합니다."""
if not OPENAI_API_KEY:
st.error("OpenAI API 키가 설정되지 않았습니다. 환경 변수를 확인하세요.")
st.stop()
try:
client = OpenAI(api_key=OPENAI_API_KEY)
# Test connection (optional, but recommended)
client.models.list()
logger.info("Successfully connected to OpenAI.")
return client
except Exception as e:
st.error(f"OpenAI 클라이언트 초기화 또는 연결 테스트 중 오류 발생: {e}")
st.stop()
# --- 검색 함수 ---
def search(query: str, top_k: int = 5, _index=None, _model=None) -> List[Dict]:
"""Pinecone 인덱스에서 검색을 수행하고 title과 original_text를 포함합니다."""
if not query or _index is None or _model is None:
return []
try:
query_vec = _model.encode(query, convert_to_numpy=True).tolist()
result = _index.query(vector=query_vec, top_k=top_k, include_metadata=True)
matches = result.get("matches", [])
search_results = []
for m in matches:
metadata = m.get("metadata", {})
search_results.append({
"URL": metadata.get("url", "N/A"),
"타임스탬프": metadata.get("timestamp", "N/A"),
"타입": metadata.get("type", "N/A"),
"제목": metadata.get("title", "N/A"), # 제목 추가
"요약": metadata.get("summary", "N/A"),
"원본텍스트": metadata.get("original_text", "N/A"), # 컨텍스트로 활용할 원본 텍스트
"점수": m.get("score", 0.0)
})
logger.info(f"Pinecone search returned {len(search_results)} results for query: '{query[:50]}...'")
return search_results
except Exception as e:
st.error(f"Pinecone 검색 중 오류 발생: {e}")
logger.error(f"Error during Pinecone search: {e}", exc_info=True)
return []
# --- OpenAI 답변 생성 함수 ---
def generate_khan_answer(query: str, search_results: List[Dict], client: OpenAI) -> str:
"""사용자 질문과 검색 결과를 바탕으로 Khan 페르소나 답변을 생성합니다."""
if not search_results:
# Return a persona-consistent message even when no results are found
return "현재 질문에 대해 참고할 만한 관련 영상을 찾지 못했습니다. 질문을 조금 더 명확하게 해주시거나 다른 방식으로 질문해주시면 도움이 될 것 같습니다."
# Build context string for OpenAI more robustly, including timestamped URL
context_parts = []
for i, r in enumerate(search_results):
original_text_snippet = ""
if r.get('원본텍스트'):
snippet = r['원본텍스트'][:200]
original_text_snippet = f"\n(원본 내용 일부: {snippet}...)"
# Generate timestamped URL if possible
timestamped_url_str = "N/A"
url = r.get('URL', 'N/A')
timestamp = r.get('타임스탬프', 'N/A')
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url)
has_valid_timestamp = timestamp and timestamp != 'N/A' and parse_timestamp_to_seconds(timestamp) is not None
if is_youtube and has_valid_timestamp:
try:
timestamped_url_str = add_timestamp_to_youtube_url(url, timestamp)
except Exception:
timestamped_url_str = url # Fallback to original URL on error
elif url != "N/A":
timestamped_url_str = url # Use original URL if not YouTube/no timestamp
context_parts.append(
f"관련 정보 {i+1}:\n"
f"제목: {r.get('제목', 'N/A')}\n"
f"영상 URL (원본): {url}\n"
f"타임스탬프: {timestamp}\n"
f"타임스탬프 적용 URL: {timestamped_url_str}\n" # Add the timestamped URL here
f"내용 타입: {r.get('타입', 'N/A')}\n"
f"요약: {r.get('요약', 'N/A')}"
f"{original_text_snippet}" # Append the snippet safely
)
context = "\n\n---\n\n".join(context_parts) # Join the parts
# Updated system prompt to instruct Markdown link usage
system_prompt = """너는 현실적인 조언을 잘하는 PM 멘토 Khan이다.
- 말투는 단호하지만 공감력이 있다. "~입니다." 또는 "~죠." 와 같이 명확하게 끝맺는다. 존댓말을 사용한다.
- 모호한 위로보다는 구조적이고 실용적인 제안을 한다. 문제의 핵심을 파악하고 구체적인 해결책이나 다음 단계를 제시한다.
- 질문이 막연하면 구체화해서 되물어본다.
- (이전 대화 기록은 없으므로) 반복 질문에는 "이전에 유사한 내용을 찾아봤었죠. 다시 한번 살펴보면..." 과 같이 언급할 수 있다.
- 긴 설명보단 핵심을 빠르게 전달하고, 필요하다면 간결한 비유를 활용한다.
- 주어진 '관련 정보' (영상 제목, 요약, 원본 내용, 타임스탬프 적용 URL 등)를 바탕으로 답변해야 한다. 정보가 부족하거나 질문과 관련성이 낮으면, 그 점을 명확히 밝히고 추가 정보를 요청하거나 질문을 구체화하도록 유도한다.
- **답변 중 관련 정보를 참조할 때는, 반드시 '타임스탬프 적용 URL'을 사용하여 다음과 같은 Markdown 링크 형식으로 만들어야 한다: `[영상 제목](타임스탬프_적용_URL)`. 예를 들어, "자세한 내용은 [비개발자가 연봉 2억을 받는 현실적인 방법](https://www.youtube.com/watch?v=VIDEO_ID&t=178s) 영상을 참고하시면 도움이 될 겁니다." 와 같이 표시한다.**
- 답변은 한국어로 한다."""
# Use triple quotes for the multi-line f-string
user_message = f"""사용자 질문: {query}
아래 관련 정보를 바탕으로 Khan 멘토로서 답변해주세요:
{context}"""
try:
logger.info("Calling OpenAI API...")
completion = client.chat.completions.create(
model="gpt-4o-mini", # Use gpt-4 if available and preferred
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message}
],
temperature=0.5, # Slightly less creative, more focused on instructions
)
answer = completion.choices[0].message.content
logger.info("Received response from OpenAI.")
return answer.strip()
except Exception as e:
st.error(f"OpenAI 답변 생성 중 오류 발생: {e}")
logger.error(f"Error during OpenAI API call: {e}", exc_info=True)
return "답변을 생성하는 중에 문제가 발생했습니다. OpenAI API 키 또는 서비스 상태를 확인해주세요."
# --- Streamlit 앱 UI (리팩토링) ---
st.set_page_config(page_title="Khan 멘토 (PM 영상 기반)", layout="wide")
st.title("✨ Khan 멘토에게 질문하기")
st.markdown(
'<a href="https://forms.gle/SUqrGBT3dktSB7v26" target="_blank" style="display:inline-block; background:#f9e79f; color:#1a237e; font-weight:bold; padding:0.5em 1.2em; border-radius:8px; text-decoration:none; font-size:1.1em; margin-bottom:16px;">📝 서비스 사용성 설문조사 참여하기</a>',
unsafe_allow_html=True
)
st.markdown("PM 관련 영상 내용을 기반으로 Khan 멘토가 답변해 드립니다.")
# --- API 키 확인 및 리소스 초기화 ---
openai_client = init_openai_client()
pc = init_pinecone()
model = load_embedding_model()
index = get_pinecone_index(pc, INDEX_NAME)
# --- 상태 관리 ---
if 'user_state' not in st.session_state:
st.session_state['user_state'] = ''
if 'empathy_message' not in st.session_state:
st.session_state['empathy_message'] = ''
if 'example_questions' not in st.session_state:
st.session_state['example_questions'] = []
if 'selected_question' not in st.session_state:
st.session_state['selected_question'] = ''
if 'khan_answer' not in st.session_state:
st.session_state['khan_answer'] = ''
if 'step' not in st.session_state:
st.session_state['step'] = 1
# --- 1단계: 사용자 상태 입력 ---
if st.session_state['step'] == 1:
user_state = st.text_area("지금 어떤 상황이신가요? 고민/상황을 자유롭게 적어주세요.", value=st.session_state['user_state'])
if st.button("상태 말하기"):
st.session_state['user_state'] = user_state
# 2단계로 이동
st.session_state['step'] = 2
st.rerun()
# --- 2단계: 공감 메시지 + 예시 질문 생성 ---
if st.session_state['step'] == 2:
with st.spinner("어떤 상황인지 고민해볼께요..."):
# 1. 공감 메시지 생성
empathy_prompt = f"""
너는 따뜻하고 공감 능력이 뛰어난 상담가야.
아래 사용자의 상황을 듣고, 충분히 감정적으로 공감해주고, 용기를 북돋아주는 말을 해줘.
상황: "{st.session_state['user_state']}"
"""
try:
empathy_response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "system", "content": empathy_prompt}],
temperature=0.7,
)
st.session_state['empathy_message'] = empathy_response.choices[0].message.content.strip()
except Exception as e:
st.session_state['empathy_message'] = f"공감 메시지 생성 중 오류: {e}"
# 2. 예시 질문 생성
example_prompt = f"""
아래 상황에서 Khan 멘토에게 할 수 있는 구체적이고 실용적인 질문 3~4가지를 한국어로 만들어줘.\n각 질문은 한 문장으로, 실제로 도움이 될 만한 내용이어야 해.\n상황: "{st.session_state['user_state']}"
"""
try:
example_response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "system", "content": example_prompt}],
temperature=0.5,
)
# 응답에서 질문만 추출 (숫자/기호/줄바꿈 등 정제)
import re
raw = example_response.choices[0].message.content.strip()
questions = re.findall(r'\d+\.\s*(.+)', raw)
if not questions:
# 숫자 없이 줄바꿈만 있을 경우
questions = [q.strip('-• ').strip() for q in raw.split('\n') if q.strip()]
st.session_state['example_questions'] = questions[:4]
except Exception as e:
st.session_state['example_questions'] = [f"예시 질문 생성 중 오류: {e}"]
# 3단계로 이동
st.session_state['step'] = 3
st.rerun()
# --- 3단계: 공감 메시지 + 예시 질문 버튼/직접입력 + Khan 답변 ---
if st.session_state['step'] == 3:
st.success(st.session_state['empathy_message'])
st.markdown("#### 이런 질문을 해볼 수 있어요!")
cols = st.columns(len(st.session_state['example_questions']))
for i, q in enumerate(st.session_state['example_questions']):
if cols[i].button(q):
st.session_state['selected_question'] = q
st.session_state['step'] = 4
st.rerun()
st.markdown("---")
user_q = st.text_input("직접 궁금한 점을 입력해도 좋아요!", value=st.session_state['selected_question'])
if st.button("Khan 멘토에게 질문하기"):
st.session_state['selected_question'] = user_q
st.session_state['step'] = 4
st.rerun()
# --- 4단계: Khan 멘토 답변 ---
if st.session_state['step'] == 4:
with st.spinner("Khan 멘토가 답변을 준비하는 중..."):
pinecone_results = search(st.session_state['selected_question'], top_k=5, _index=index, _model=model)
khan_answer = generate_khan_answer(st.session_state['selected_question'], pinecone_results, openai_client)
st.session_state['khan_answer'] = khan_answer
st.subheader("💡 Khan 멘토의 답변")
st.markdown(st.session_state['khan_answer'])
# 참고 영상 정보 표시
if pinecone_results:
with st.expander("답변에 참고한 영상 정보 보기"):
displayed_urls = set()
for i, r in enumerate(pinecone_results):
url = r.get('URL', 'N/A')
if url in displayed_urls or url == 'N/A':
continue
displayed_urls.add(url)
st.markdown(f"--- **참고 자료 {len(displayed_urls)} (유사도: {r['점수']:.4f})** ---")
st.markdown(f"**제목:** {r.get('제목', 'N/A')}")
st.markdown(f"**요약:** {r.get('요약', 'N/A')}")
timestamp = r.get('타임스탬프', 'N/A')
is_youtube = url and isinstance(url, str) and ('youtube.com' in url or 'youtu.be' in url)
start_seconds = None
if is_youtube and timestamp and timestamp != 'N/A':
start_seconds = parse_timestamp_to_seconds(timestamp)
if is_youtube and start_seconds is not None:
try:
timestamped_link_url = add_timestamp_to_youtube_url(url, timestamp)
st.markdown(f"**영상 링크 (타임스탬프 포함):** [{timestamped_link_url}]({timestamped_link_url})")
except Exception as e:
logger.error(f"Error creating timestamped URL for link: {e}")
st.markdown(f"**영상 링크 (원본):** [{url}]({url})")
elif url != "N/A" and isinstance(url, str) and url.startswith("http"):
st.markdown(f"**URL:** [{url}]({url})")
else:
st.markdown(f"**URL:** {url}")
if is_youtube and url != "N/A":
col1, col2 = st.columns(2)
with col1:
try:
st.video(url, start_time=start_seconds or 0)
except Exception as e:
st.error(f"비디오({url}) 재생 중 오류 발생: {e}")
st.markdown(f"[YouTube에서 보기]({url})")
elif url != "N/A":
col1, col2 = st.columns(2)
with col1:
try:
st.video(url)
except Exception as e:
logger.warning(f"st.video failed for non-YouTube URL {url}: {e}")
st.markdown("---")
st.caption("Powered by Pinecone, Sentence Transformers, and OpenAI")
# 다시 처음으로 돌아가기 버튼
if st.button("다시 질문 흐름 시작하기"):
for k in ['user_state','empathy_message','example_questions','selected_question','khan_answer','step']:
st.session_state[k] = '' if k != 'step' else 1
st.rerun()