# kpi_search_beta / app.py
# cnp-ai's picture
# Update app.py
# 7f00ec1 verified
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from openpyxl import load_workbook
import pandas as pd
import gradio as gr
import numpy as np
import re
# Load the persisted Chroma vector stores.
# --- Default embedding model (BGE-M3): used for both the KPI pool and the NCS store.
model_huggingface_ori = HuggingFaceEmbeddings(model_name='BAAI/bge-m3')
persist_directory_ori = './chroma_kpi_250416'
kpi_pool_ori = Chroma(persist_directory=persist_directory_ori, embedding_function=model_huggingface_ori)
# Print the number of entries in the collection.
# NOTE(review): `_collection` is a private attribute of the Chroma wrapper.
print(kpi_pool_ori._collection.count())
persist_directory_ori2 = './chroma_ncs_241230'
ncs_db_ori = Chroma(persist_directory=persist_directory_ori2, embedding_function=model_huggingface_ori)
print(ncs_db_ori._collection.count())
# --- KPI pool embedded with snunlp/KR-SBERT-V40K-klueNLI-augSTS.
model_huggingface = HuggingFaceEmbeddings(model_name='snunlp/KR-SBERT-V40K-klueNLI-augSTS')
persist_directory1 = './chroma_kpi_250528_SBERT'
kpi_pool = Chroma(persist_directory=persist_directory1, embedding_function=model_huggingface)
print(kpi_pool._collection.count())
# --- KPI pool embedded with jhgan/ko-sbert-sts.
model_huggingface2 = HuggingFaceEmbeddings(model_name='jhgan/ko-sbert-sts')
persist_directory_jhgan1 = './chroma_kpi_250528_SBERTjhgan'
kpi_pool2 = Chroma(persist_directory=persist_directory_jhgan1, embedding_function=model_huggingface2)
print(kpi_pool2._collection.count())
def search_unit(unit_query):
    """Search the NCS store and render the best matches as Markdown/HTML.

    Queries ``ncs_db_ori`` for the 7 entries most similar to ``unit_query``
    and returns one string containing a numbered section per hit: the
    sub-category name, the large > middle > small category path, the
    relevance score rounded to 3 decimals, and the page content with
    ", " separators replaced by "  /  ".
    """
    hits = ncs_db_ori.similarity_search_with_relevance_scores(unit_query, k=7)

    sections = []
    for rank, (document, score) in enumerate(hits, start=1):
        # Page text and metadata of this hit.
        unit = document.page_content.replace(", ", "  /  ")
        meta = document.metadata
        job_name = meta['세분류코드명']
        # Category hierarchy, from the broadest level down.
        code_str = " > ".join(
            [meta['대분류코드명'], meta['중분류코드명'], meta['소분류코드명']]
        )
        similarity = round(score, 3)
        # The template lines stay at column 0 on purpose: the string is
        # rendered as Markdown, where leading spaces are significant.
        sections.append(f"""
<span style="font-size: 18px;">**[{rank}] {job_name}**</span>
<span style="font-size: 13px;"> &nbsp; | &nbsp; {code_str} &nbsp; | &nbsp; similarity {similarity} </span>
<br> {unit} <br>
""")
    return "".join(sections)
def search_unit_all(unit_query):
    """Return the NCS search result for each result pane as ``(text_bge, text_snu)``.

    ``search_unit`` accepts only the query string, so the previous calls
    that passed a second ``mode`` argument raised ``TypeError``. Only one
    NCS store (BGE embeddings) is loaded in this file, so the search runs
    once and the same text is returned for both slots, keeping the
    two-value return shape.
    """
    text_bge = search_unit(unit_query)
    # NOTE(review): there is no SBERT-embedded NCS store to query here; if one
    # is added, replace this with a search against that store.
    text_snu = text_bge
    return text_bge, text_snu
# SentenceTransformer instances of the same three embedding models; they are
# used by filter_semantically_similar_texts_by_embedding() to encode texts
# ("BGE" -> _ori, "SBERT-snunlp" -> downsample_model, anything else -> _2).
downsample_model_ori = SentenceTransformer('BAAI/bge-m3')
downsample_model = SentenceTransformer('snunlp/KR-SBERT-V40K-klueNLI-augSTS')
downsample_model_2 = SentenceTransformer('jhgan/ko-sbert-sts')
def filter_semantically_similar_texts_by_embedding(df, mode, embedding_field='embedding_input', similarity_threshold=0.8):
    """Drop rows whose text is a near-duplicate of an earlier kept row.

    Args:
        df: DataFrame holding the texts in column ``embedding_field``.
        mode: "BGE" or "SBERT-snunlp" selects the matching encoder; any
            other value falls back to the jhgan encoder.
        embedding_field: Name of the column whose text is embedded.
        similarity_threshold: A row is dropped when its cosine similarity
            to a kept row is strictly greater than this value.

    Returns:
        The kept rows in their original order, with a fresh 0..n-1 index.
    """
    texts = df[embedding_field].tolist()
    if not texts:
        # Nothing to compare; skip encoding and the similarity matrix, which
        # are not meaningful for zero rows.
        return df.reset_index(drop=True)

    # Pick the encoder that corresponds to the requested mode.
    if mode == "BGE":
        encoder = downsample_model_ori
    elif mode == "SBERT-snunlp":
        encoder = downsample_model
    else:
        encoder = downsample_model_2
    embeddings = encoder.encode(texts)

    # Pairwise cosine similarities; zero the diagonal so that a row is
    # never treated as a duplicate of itself.
    similarity_matrix = cosine_similarity(embeddings)
    np.fill_diagonal(similarity_matrix, 0)

    # Greedy pass: keep the first unseen row, then exclude everything that
    # is too similar to it.
    filtered_indices = []
    excluded_indices = set()
    for i, similarities in enumerate(similarity_matrix):
        if i in excluded_indices:
            continue
        filtered_indices.append(i)
        similar_indices = np.where(similarities > similarity_threshold)[0]
        excluded_indices.update(similar_indices.tolist())
    return df.iloc[filtered_indices].reset_index(drop=True)
# Columns exposed to the download step (kept in state as `kpi_df`).
_KPI_DETAIL_COLUMNS = ['지표명', '정의', '산식', '유형', '비고', 'BSC 관점', '전략방향', '전략과제']


def _query_kpi_frame(kpi_query, kpi_count, mode):
    """Run the KPI similarity search for one model and return a tidy frame.

    Shared by ``search_kpi`` and ``search_kpi_one``. The store is chosen by
    ``mode`` ("BGE", "SBERT-snunlp", anything else -> jhgan store). The 50
    best hits are fetched, de-duplicated on (정의, 산식), capped at 15 and
    then at ``kpi_count`` rows, re-indexed from 0, and given a '카테고리'
    column ("BSC 관점 > 전략방향").
    """
    if mode == "BGE":
        print("BGE 검색 시작")
        store = kpi_pool_ori
    elif mode == "SBERT-snunlp":
        print("SBERT-snunlp 검색 시작")
        store = kpi_pool
    else:
        print("SBERT-jhgan 검색 시작")
        store = kpi_pool2
    results = store.similarity_search_with_relevance_scores(kpi_query, k=50)

    # Metadata of every hit plus its relevance score.
    records = [
        {**doc.metadata, '유사도점수': score}
        for doc, score in results
    ]
    if not records:
        # No hits: return an empty frame that still has the expected
        # columns so the callers' column selections do not raise KeyError.
        return pd.DataFrame(columns=_KPI_DETAIL_COLUMNS + ['카테고리'])

    df = pd.DataFrame(records)
    df = df.drop_duplicates(subset=['정의', '산식']).head(15)
    # int() guards against the slider delivering a float such as 7.0.
    df = df.iloc[:int(kpi_count)].reset_index(drop=True)
    # Category label (BSC perspective + strategic direction).
    df['카테고리'] = df['BSC 관점'] + " > " + df['전략방향']
    return df


def search_kpi(kpi_query, kpi_count, mode):
    """Search one KPI store and return the Gradio updates for a single-table view.

    Returns a 6-tuple: (show output area, checkbox choices update,
    table frame with 지표명/산식/비고/카테고리, detail frame for download,
    list of 1-based row numbers, hide the notice).
    """
    df = _query_kpi_frame(kpi_query, kpi_count, mode)
    visible_df = df[['지표명', '산식', '비고', '카테고리']].copy()
    kpi_list = list(range(1, len(visible_df) + 1))
    kpi_df = df[_KPI_DETAIL_COLUMNS].copy()
    return gr.update(visible=True), gr.update(choices=kpi_list), visible_df, kpi_df, kpi_list, gr.update(visible=False)


def search_kpi_one(kpi_query, kpi_count, mode):
    """Search one KPI store and return ``(visible_df, kpi_df, kpi_list)``.

    ``visible_df`` holds 지표명/산식/비고 for display, ``kpi_df`` the detail
    columns used for the Excel export, and ``kpi_list`` the 1-based row
    numbers offered as checkbox choices.
    """
    df = _query_kpi_frame(kpi_query, kpi_count, mode)
    visible_df = df[['지표명', '산식', '비고']].copy()
    kpi_list = list(range(1, len(visible_df) + 1))
    kpi_df = df[_KPI_DETAIL_COLUMNS].copy()
    return visible_df, kpi_df, kpi_list
def format_df_html(df):
    """Render the rows of ``df`` as a sequence of HTML cards.

    Each row needs the columns '지표명', '비고' and '산식'. Cards are
    numbered 1, 2, ... by row position, so the numbering is correct even
    when the frame's index is not a fresh 0..n-1 range. Values are
    interpolated as-is (no HTML escaping is applied). Returns "" for an
    empty frame.
    """
    cards = []
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        cards.append(f"""
<div style="margin-bottom: 5px;">
<span style="font-size: 18px; font-weight: bold;">[{position}] {row['지표명']}</span><br>
<span style="font-size: 13px; color: gray;">{row['비고']}</span><br>
<div style="margin-top: 5px; font-size: 14px; color: #333;">{row['산식']}
</div>
<div style="height: 8px;"></div>
</div>
""")
    return "".join(cards)
def search_kpi_all_models(kpi_query, kpi_count):
    """Run the KPI search against all three models and build the UI updates.

    The returned tuple lines up with the ``outputs`` of the query submit
    handler: show the output area, three checkbox-choice updates, three
    HTML tables, three detail frames (state), three choice lists (state),
    and finally hide the notice. The order is always BGE, snunlp, jhgan.
    """
    print("함수 호출, 테이블 생성 시작")
    # One (visible_df, kpi_df, kpi_list) triple per model.
    per_model = [
        search_kpi_one(kpi_query, kpi_count, model_key)
        for model_key in ("BGE", "SBERT-snunlp", "SBERT-jhgan")
    ]
    print("함수 종료")

    visible_frames, detail_frames, choice_lists = zip(*per_model)
    html_tables = [format_df_html(frame) for frame in visible_frames]
    checkbox_updates = [gr.update(choices=choices) for choices in choice_lists]

    return (
        gr.update(visible=True),
        *checkbox_updates,   # checkbox groups 1-3
        *html_tables,        # kpi_table1-3
        *detail_frames,
        *choice_lists,
        gr.update(visible=False),
    )
# Build a dictionary that maps Excel cell addresses to values.
def _column_letters_to_index(letters):
    """Convert Excel column letters to a 1-based index ('A' -> 1, 'AA' -> 27)."""
    index = 0
    for ch in letters.upper():
        index = index * 26 + (ord(ch) - ord('A') + 1)
    return index


def _column_index_to_letters(index):
    """Convert a 1-based column index to Excel letters (1 -> 'A', 27 -> 'AA')."""
    letters = ""
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        letters = chr(ord('A') + remainder) + letters
    return letters


def make_excel_table(dataframe, start_cell):
    """Map every value of ``dataframe`` to the worksheet cell it should fill.

    Args:
        dataframe: Data to lay out; the index and header are not included.
        start_cell: Address of the top-left target cell, e.g. "B4". Column
            letters are case-insensitive and may be multi-letter ("AA10").

    Returns:
        dict of ``{cell_address: value}``, row by row from ``start_cell``.

    Raises:
        ValueError: If ``start_cell`` lacks a column letter or a row number.
    """
    column_letters = ''.join(ch for ch in start_cell if ch.isascii() and ch.isalpha())
    row_digits = ''.join(ch for ch in start_cell if ch.isdigit())
    if not column_letters or not row_digits:
        raise ValueError(f"invalid start cell: {start_cell!r}")
    start_row = int(row_digits)
    start_col = _column_letters_to_index(column_letters)

    table_dict = {}
    for row_index, row in enumerate(dataframe.itertuples(index=False), start=start_row):
        for col_index, value in enumerate(row, start=start_col):
            # Proper base-26 conversion keeps addresses valid past column Z.
            table_dict[f"{_column_index_to_letters(col_index)}{row_index}"] = value
    return table_dict
# Build the downloadable Excel file.
def generate_excel(df1, df2, df3, kpi_list1, kpi_list2, kpi_list3, kpi_query):
    """Write the KPIs ticked in the three result panes into the Excel template.

    Args:
        df1, df2, df3: Detail frames of the BGE, snunlp and jhgan searches
            (may be ``None`` when no search has been stored yet).
        kpi_list1, kpi_list2, kpi_list3: 1-based row numbers selected in
            the corresponding checkbox group.
        kpi_query: Search text; used to name the output file.

    Returns:
        A Gradio update that shows the download component with the saved
        file. The file is written to the working directory as
        ``KPI_POOL_<query>.xlsx``, so exporting the same query again
        overwrites the earlier file.
    """
    def get_filtered(df, kpi_list, model_name):
        """Return the selected rows tagged with their source model, or None."""
        if df is None or not kpi_list:
            return None
        positions = [int(i) - 1 for i in kpi_list]  # 1-based -> 0-based
        filtered = df.iloc[positions].copy()
        filtered["출처"] = model_name
        return filtered

    selections = [
        get_filtered(df1, kpi_list1, "BGE3"),
        get_filtered(df2, kpi_list2, "snunlp"),
        get_filtered(df3, kpi_list3, "jhgan"),
    ]
    frames = [frame for frame in selections if frame is not None]
    if frames:
        # Merge the selections and drop rows that repeat the same formula.
        filtered_df = pd.concat(frames, ignore_index=True)
        filtered_df = filtered_df.drop_duplicates(subset='산식')
    else:
        # Nothing selected: the template is saved without any data rows.
        filtered_df = pd.DataFrame()

    # Open the template and fill it starting at cell B4.
    file_path = "./template.xlsx"
    workbook = load_workbook(file_path)
    sheet = workbook.active
    update_values = make_excel_table(filtered_df, 'B4')
    for cell, value in update_values.items():
        sheet[cell].value = value
    # Default zoom level of the worksheet (%).
    sheet.sheet_view.zoomScaleNormal = 85

    # Derive a file name from the query. Slashes become underscores, other
    # characters that are not allowed in file names on common platforms are
    # replaced too, and runs of whitespace collapse to a single space.
    filename = f"KPI_POOL_{kpi_query}.xlsx"
    safe_filename = re.sub(r'\s*/\s*', '_', filename)
    safe_filename = re.sub(r'[\\:*?"<>|\x00-\x1f]', '_', safe_filename)
    safe_filename = re.sub(r'\s+', ' ', safe_filename)
    output_file = safe_filename.strip()
    workbook.save(output_file)
    return gr.update(value=output_file, visible=True)
def toggle_selection(current_selection, kpi_list):
    """Toggle between "everything selected" and "nothing selected".

    Returns ``[]`` when ``current_selection`` already covers ``kpi_list``
    (order is ignored), otherwise returns the full ``kpi_list``. ``None``
    for either argument is treated as an empty list, so the handler also
    works before any choices have been stored.
    """
    selected = current_selection or []
    available = kpi_list or []
    if set(selected) == set(available):
        # Everything is already ticked: clear the selection.
        return []
    # Otherwise tick every available item.
    return available
def toggle_all_selections(sel1, list1, sel2, list2, sel3, list3):
    """Apply the select-all / clear-all toggle to the three checkbox groups.

    Each ``(selN, listN)`` pair is the current selection and the full list
    of choices of one group. Every group is toggled independently: a fully
    selected group is cleared, any other group becomes fully selected.
    ``None`` values are treated as empty lists.

    Returns:
        A 3-tuple with the new selection of each group.
    """
    def toggle(current, full_list):
        current = current or []
        full_list = full_list or []
        return [] if set(current) == set(full_list) else full_list

    return (
        toggle(sel1, list1),
        toggle(sel2, list2),
        toggle(sel3, list3),
    )
# Custom stylesheet passed to gr.Blocks(css=...). It styles tables/headers
# inside .gradio-container and defines the helper classes used through
# elem_classes in the layout below: v_check (checkbox column), left-padding,
# custom-markdown and no-margine.
css = """
/* 데이터프레임 스타일 */
.gradio-container table {
table-layout: fixed;
width: 100%;
}
.gradio-container td{
white-space: nowrap !important;
overflow-x: auto !important;
text-align: left;
font-size: 13px;
letter-spacing: -1px !important;
}
/* 헤더 기본 스타일 */
.gradio-container th[aria-sort]::after {
visibility: hidden !important; /* 아이콘만 감춤 */
}
.gradio-container th .header-content {
justify-content: center !important;
text-align: center;
font-size: 13px;
letter-spacing: -1px !important;
}
.gradio-container th span {
text-align: center !important;
display: block !important;
width: 100%;
}
.v_check { padding-top: 39px !important;
margin-right: 0px !important;
padding-right: 0px !important;
margin-left: 0px !important;
padding-left: 0px !important;
}
.v_check div { display: block !important; }
.v_check label {
max-width: 80%; /* 전체 너비 유지 */
padding: 2px; /* 내부 여백 조정 */
margin-bottom: 52px; /* 라벨 간 간격 설정 */
border: 1px solid transparent !important;
letter-spacing: -1px !important; /* 자간 좁게 설정*/
justify-content: center;
}
div.svelte-1nguped {
background: transparent !important;
border: none !important;
}
.left-padding { padding-left: 43px !important; /* 왼쪽 패딩 추가 */ }
.custom-markdown h3 {
font-size: 18px; /* 본문 및 목록 글자 크기 */
}
.custom-markdown blockquote {
margin-bottom: 8px !important;
}
.custom-markdown p, .custom-markdown li {
margin-top: 8px !important;
font-size: 15px; /* 본문 및 목록 글자 크기 */
line-height: 1.5;
}
.custom-markdown a {
font-size: 15px;
color: #000000;
}
.no-margine {
margin-bottom: 0px !important;
padding-bottom: 0px !important;
margin-top: 0px !important;
padding-top: 0px !important;
gap: 0px !important;
}
"""
# Markdown notice (copyright / usage notes and contact details) shown on the
# KPI search tab until a search is run; the search handler hides it and the
# Clear button shows it again.
guide = """> ### 저작권 및 유의사항 안내
- 본 앱은 시앤피컨설팅이 개발한 KPI POOL 검색 도구로, AI 기반 추천 결과는 참고용으로 제공됩니다.
- AI 기반 추천 알고리즘은 전문 컨설팅을 대체할 수 없으며, 반드시 조직의 전략, 평가 목적, 데이터 수집 가능성 등과의 적합성 검토가 필요합니다.
- KPI 설정과 적용에 대한 개별 맞춤 검토는 시앤피컨설팅의 전문 컨설턴트에게 문의해 주세요.
<br><br>
> ### Contact Us
시앤피컨설팅그룹 일터혁신본부  |  Tel. 02-6257-1448  |  http://www.cnp.re.kr  |  hpw@cnp.re.kr
"""
# Empty frame with the columns of the single-model result table.
# NOTE(review): not referenced anywhere else in the visible part of this file.
empty_df = pd.DataFrame(columns=["지표명", "산식", "비고", "카테고리"])
# Gradio UI: one tab for the KPI pool search (three models side by side) and
# one reference tab for the NCS competency-unit search.
# NOTE(review): the block nesting below was restored from a copy of the file
# that had lost its indentation; confirm the layout (which containers the
# notice, the button row and the download component live in) against the
# running app.
with gr.Blocks(css=css, fill_width=True) as demo:
    # Per-session state: detail frames of the three searches (read by the
    # Excel export) and the full choice list of each checkbox group (read by
    # the "All" toggle).
    df_state1 = gr.State()
    df_state2 = gr.State()
    df_state3 = gr.State()
    check_state1 = gr.State()
    check_state2 = gr.State()
    check_state3 = gr.State()

    with gr.Row():
        gr.Markdown(" ")

    with gr.Tab("KPI Pool 검색"):
        with gr.Column(elem_classes="left-padding"):
            with gr.Row(equal_height=True):
                kpi_query = gr.Textbox(scale=30, submit_btn=True,
                                       label= "성과평가를 진행할 [핵심업무 or 핵심성공요인]을 입력해주세요😊! (검색 키워드는 직무기술서 또는 NCS 능력단위 참고)",
                                       placeholder="예: 자금 → 자금조달 / 재무위험관리 / 자금운용")
                kpi_count = gr.Slider(label="KPI 출력 개수", value = 7, minimum=5, maximum=10, step=1, scale=7)
        copyright = gr.Markdown(guide, visible=True, elem_classes="custom-markdown")

        # Result area: hidden until the first search completes.
        with gr.Column(visible=False) as output_area:
            with gr.Group():
                with gr.Row():
                    with gr.Group():
                        with gr.Tab("BAAI"):
                            with gr.Row():
                                kpi_checkbox1 = gr.CheckboxGroup(choices=[], interactive=True, elem_classes="v_check", container=False, min_width=5, scale=1)
                                with gr.Column(scale=11):
                                    kpi_table1 = gr.HTML(label="BGE 결과")
                    with gr.Group():
                        with gr.Tab("snunlp"):
                            with gr.Row():
                                kpi_checkbox2 = gr.CheckboxGroup(choices=[], interactive=True, elem_classes="v_check", container=False, min_width=5, scale=1)
                                with gr.Column(scale=11):
                                    kpi_table2 = gr.HTML(label="SBERT-snunlp 결과")
                    with gr.Group():
                        with gr.Tab("jhgan"):
                            with gr.Row():
                                kpi_checkbox3 = gr.CheckboxGroup(choices=[], interactive=True, elem_classes="v_check", container=False, min_width=5, scale=1)
                                with gr.Column(scale=11):
                                    kpi_table3 = gr.HTML(label="SBERT-jhgan 결과")
            with gr.Row():
                # Empty columns on both sides centre the three buttons.
                gr.Column(scale=2)
                select_button = gr.Button("All", scale=1)
                download_button = gr.Button("Download",scale=1)
                clear_button = gr.Button("Clear",scale=1)
                gr.Column(scale=2)
            file_download = gr.Files(label="Download", interactive=False, visible=False)

        # Submitting the query runs all three searches; the outputs must stay
        # in the order of the tuple returned by search_kpi_all_models().
        kpi_query.submit(
            search_kpi_all_models,
            inputs = [kpi_query, kpi_count],
            outputs = [
                output_area,
                kpi_checkbox1, kpi_checkbox2, kpi_checkbox3,
                kpi_table1, kpi_table2, kpi_table3,
                df_state1, df_state2, df_state3,
                check_state1,check_state2,check_state3,
                copyright
            ]
        )

        # "All": select every item of a group, or clear a group that is
        # already fully selected.
        select_button.click(
            fn=toggle_all_selections,
            inputs=[
                kpi_checkbox1, check_state1,
                kpi_checkbox2, check_state2,
                kpi_checkbox3, check_state3
            ],
            outputs=[
                kpi_checkbox1,
                kpi_checkbox2,
                kpi_checkbox3
            ],
            show_progress='hidden'
        )

        # "Download": export the ticked KPIs into the Excel template.
        download_button.click(
            generate_excel,
            inputs=[df_state1, df_state2, df_state3, kpi_checkbox1, kpi_checkbox2, kpi_checkbox3, kpi_query],
            outputs=[file_download]
        )

        # "Clear": reset the stored frames, the query box, the checkbox
        # groups and the tables, hide the results and the download link, and
        # show the notice again. The values line up 1:1 with `outputs`.
        clear_button.click(
            fn=lambda: (None, None, None,
                        None, gr.update(visible=False),
                        gr.update(choices=[], value=[]),gr.update(choices=[], value=[]),gr.update(choices=[], value=[]),
                        gr.update(value=""),gr.update(value=""),gr.update(value=""),
                        gr.update(value=None, visible=False), gr.update(visible=True)),
            outputs=[df_state1, df_state2, df_state3,
                     kpi_query, output_area,
                     kpi_checkbox1, kpi_checkbox2, kpi_checkbox3,
                     kpi_table1, kpi_table2, kpi_table3,
                     file_download, copyright],
            show_progress='hidden'
        )

    with gr.Tab("[참고] NCS 능력단위"):
        unit_query = gr.Textbox(label="업종 or 직종 + 직무명을 입력하세요😊", scale=1, submit_btn=True,
                                placeholder="예: 의약품 법률자문, 공공행정 경영기획, 재무회계 자금")
        with gr.Row():
            with gr.Group():
                unit_result = gr.Markdown()
        unit_query.submit(search_unit, inputs=[unit_query], outputs=[unit_result])

demo.launch(debug=True)