from langchain_chroma import Chroma from langchain_huggingface import HuggingFaceEmbeddings from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity from openpyxl import load_workbook import pandas as pd import gradio as gr import numpy as np import re # Chroma DB 로드 ###기본(BGE) model_huggingface_ori = HuggingFaceEmbeddings(model_name='BAAI/bge-m3') persist_directory_ori = './chroma_kpi_250416' kpi_pool_ori = Chroma(persist_directory=persist_directory_ori, embedding_function=model_huggingface_ori) print(kpi_pool_ori._collection.count()) persist_directory_ori2 = './chroma_ncs_241230' ncs_db_ori = Chroma(persist_directory=persist_directory_ori2, embedding_function=model_huggingface_ori) print(ncs_db_ori._collection.count()) ### model_huggingface = HuggingFaceEmbeddings(model_name='snunlp/KR-SBERT-V40K-klueNLI-augSTS') persist_directory1 = './chroma_kpi_250528_SBERT' kpi_pool = Chroma(persist_directory=persist_directory1, embedding_function=model_huggingface) print(kpi_pool._collection.count()) #### model_huggingface2 = HuggingFaceEmbeddings(model_name='jhgan/ko-sbert-sts') persist_directory_jhgan1 = './chroma_kpi_250528_SBERTjhgan' kpi_pool2 = Chroma(persist_directory=persist_directory_jhgan1, embedding_function=model_huggingface2) print(kpi_pool2._collection.count()) def search_unit(unit_query): results = ncs_db_ori.similarity_search_with_relevance_scores(unit_query, k=7) # 검색 결과 텍스트 포맷팅 text = "" for i, doc in enumerate(results): # 텍스트와 메타데이터 처리 unit = doc[0].page_content.replace(", ", "  /  ") job_name = doc[0].metadata['세분류코드명'] # 코드 계층 처리 code = [ doc[0].metadata['대분류코드명'], doc[0].metadata['중분류코드명'], doc[0].metadata['소분류코드명'] ] code_str = " > ".join(code) # 텍스트 포맷팅 similarity = round(doc[1], 3) text += f""" **[{i+1}] {job_name}**   |   {code_str}   |   similarity {similarity}
{unit}
""" return text def search_unit_all(unit_query): text_bge = search_unit(unit_query, "BGE") text_snu = search_unit(unit_query, "SBERT-snunlp") #text_jh = search_unit(unit_query, "SBERT-jhgan") return text_bge, text_snu downsample_model_ori = SentenceTransformer('BAAI/bge-m3') downsample_model = SentenceTransformer('snunlp/KR-SBERT-V40K-klueNLI-augSTS') downsample_model_2 = SentenceTransformer('jhgan/ko-sbert-sts') def filter_semantically_similar_texts_by_embedding(df, mode, embedding_field='embedding_input', similarity_threshold=0.8): texts = df[embedding_field].tolist() # 텍스트를 임베딩 if mode == "BGE": embeddings = downsample_model_ori.encode(texts) elif mode == "SBERT-snunlp": embeddings = downsample_model.encode(texts) else: embeddings = downsample_model_2.encode(texts) # 코사인 유사도 행렬 계산 similarity_matrix = cosine_similarity(embeddings) np.fill_diagonal(similarity_matrix, 0) # 유사도가 threshold 이상인 항목 필터링 filtered_indices = [] excluded_indices = set() for i in range(len(texts)): if i not in excluded_indices: filtered_indices.append(i) similar_indices = np.where(similarity_matrix[i] > similarity_threshold)[0] excluded_indices.update(similar_indices) return df.iloc[filtered_indices].reset_index(drop=True) def search_kpi(kpi_query, kpi_count, mode): if mode == "BGE": print("BGE 검색 시작") results = kpi_pool_ori.similarity_search_with_relevance_scores(kpi_query, k=50) elif mode == "SBERT-snunlp": print("SBERT-snunlp 검색 시작") results = kpi_pool.similarity_search_with_relevance_scores(kpi_query, k=50) else: print("SBERT-jhgan 검색 시작") results = kpi_pool2.similarity_search_with_relevance_scores(kpi_query, k=50) # 메타데이터 + 점수 추출 records = [ {**doc.metadata, '유사도점수': score} for doc, score in results ] # DataFrame으로 변환 df = pd.DataFrame(records) df['카테고리'] = df['BSC 관점'] + " > " + df['전략방향'] df = df.drop_duplicates(subset=['정의', '산식']).head(15) df = df.iloc[:kpi_count] df = df.reset_index(drop=True) # 카테고리 생성 (BSC 관점 + 전략방향) df['카테고리'] = df['BSC 관점'] + " > " + df['전략방향'] visible_df = df[['지표명', '산식', '비고', '카테고리']].copy() kpi_list = list(range(1, len(visible_df) + 1)) kpi_df = df[['지표명', '정의', '산식', '유형', '비고', 'BSC 관점', '전략방향', '전략과제']].copy() return gr.update(visible=True), gr.update(choices=kpi_list), visible_df, kpi_df, kpi_list, gr.update(visible=False) def search_kpi_one(kpi_query, kpi_count, mode): if mode == "BGE": print("BGE 검색 시작") results = kpi_pool_ori.similarity_search_with_relevance_scores(kpi_query, k=50) elif mode == "SBERT-snunlp": print("SBERT-snunlp 검색 시작") results = kpi_pool.similarity_search_with_relevance_scores(kpi_query, k=50) else: print("SBERT-jhgan 검색 시작") results = kpi_pool2.similarity_search_with_relevance_scores(kpi_query, k=50) # 메타데이터 + 점수 추출 records = [ {**doc.metadata, '유사도점수': score} for doc, score in results ] # DataFrame으로 변환 df = pd.DataFrame(records) df['카테고리'] = df['BSC 관점'] + " > " + df['전략방향'] df = df.drop_duplicates(subset=['정의', '산식']).head(15) df = df.iloc[:kpi_count] df = df.reset_index(drop=True) # 카테고리 생성 (BSC 관점 + 전략방향) df['카테고리'] = df['BSC 관점'] + " > " + df['전략방향'] visible_df = df[['지표명', '산식', '비고']].copy() kpi_list = list(range(1, len(visible_df) + 1)) kpi_df = df[['지표명', '정의', '산식', '유형', '비고', 'BSC 관점', '전략방향', '전략과제']].copy() return visible_df, kpi_df, kpi_list def format_df_html(df): html = "" for i, row in df.iterrows(): html += f"""
[{i+1}] {row['지표명']}
{row['비고']}
{row['산식']}
""" return html def search_kpi_all_models(kpi_query, kpi_count): print("함수 호출, 테이블 생성 시작") # 각 모델별 결과 visible_bge, kpi_bge, list_bge = search_kpi_one(kpi_query, kpi_count, "BGE") visible_sn, kpi_sn, list_sn = search_kpi_one(kpi_query, kpi_count, "SBERT-snunlp") visible_jh, kpi_jh, list_jh = search_kpi_one(kpi_query, kpi_count, "SBERT-jhgan") print("함수 종료") visible_df = [visible_bge, visible_sn, visible_jh] visible_df_text = [format_df_html(df) for df in visible_df] #gr.update(visible=True), gr.update(choices=kpi_list), visible_df, kpi_df, kpi_list, gr.update(visible=False) return ( gr.update(visible=True), gr.update(choices=list_bge), #체크박스리스트 gr.update(choices=list_sn), gr.update(choices=list_jh), visible_df_text[0], # kpi_table1 visible_df_text[1], # kpi_table2 visible_df_text[2], # kpi_table3 kpi_bge, kpi_sn, kpi_jh, list_bge, list_sn, list_jh, gr.update(visible=False) ) # 셀 주소와 값을 매핑한 딕셔너리 생성 def make_excel_table(dataframe, start_cell): table_dict = {} # 시작 셀 좌표 계산 start_row = int(''.join(filter(str.isdigit, start_cell))) # 시작 행 (숫자) start_col = ord(start_cell[0].upper()) - ord('A') + 1 # 시작 열 (문자 -> 숫자) # 데이터프레임 반복 처리 for row_index, row in enumerate(dataframe.itertuples(index=False), start=start_row): for col_index, value in enumerate(row, start=start_col): # 셀 주소 계산 (예: B5, C5, ...) cell = f"{chr(ord('A') + col_index - 1)}{row_index}" table_dict[cell] = value return table_dict # 다운로드 파일 생성 함수 def generate_excel(df1, df2, df3, kpi_list1, kpi_list2, kpi_list3, kpi_query): #각 모델별 filtered_df 생성 def get_filtered(df, kpi_list, model_name): if kpi_list: indices = [int(i) - 1 for i in kpi_list] # -1 보정 filtered = df.iloc[indices].copy() filtered["출처"] = model_name return filtered else: # 선택된 KPI 없을 때: 빈 DataFrame 반환 return pd.DataFrame(columns=list(df.columns) + ["출처"]) # 인덱스(-1 보정)로 DataFrame 필터링 #filtered_df = df.iloc[[int(i) - 1 for i in kpi_list]] if kpi_list else pd.DataFrame(columns=df.columns) filtered_df1 = get_filtered(df1, kpi_list1, "BGE3") filtered_df2 = get_filtered(df2, kpi_list2, "snunlp") filtered_df3 = get_filtered(df3, kpi_list3, "jhgan") filtered_df = pd.concat([filtered_df1, filtered_df2, filtered_df3], ignore_index=True) #필터링내용 병합 filtered_df = filtered_df.drop_duplicates(subset='산식') # '산식' 기준 중복 제거 # 엑셀 파일 열기 file_path = "./template.xlsx" workbook = load_workbook(file_path) sheet = workbook.active update_values = make_excel_table(filtered_df, 'B4') for cell, value in update_values.items(): sheet[cell].value = value # 워크시트 기본 확대 수준 설정(%) sheet.sheet_view.zoomScaleNormal = 85 # 파일 저장 filename = f"KPI_POOL_{kpi_query}.xlsx" safe_filename = re.sub(r'\s*/\s*', '_', filename) safe_filename = re.sub(r'\s+', ' ', safe_filename) output_file = safe_filename.strip() workbook.save(output_file) return gr.update(value=output_file, visible=True) def toggle_selection(current_selection, kpi_list): if set(current_selection) == set(kpi_list): # 이미 전체 선택된 경우 return [] else: # 전체 선택 안 된 경우 return kpi_list def toggle_all_selections(sel1, list1, sel2, list2, sel3, list3): def toggle(current, full_list): return [] if set(current) == set(full_list) else full_list return ( toggle(sel1, list1), toggle(sel2, list2), toggle(sel3, list3) ) css = """ /* 데이터프레임 스타일 */ .gradio-container table { table-layout: fixed; width: 100%; } .gradio-container td{ white-space: nowrap !important; overflow-x: auto !important; text-align: left; font-size: 13px; letter-spacing: -1px !important; } /* 헤더 기본 스타일 */ .gradio-container th[aria-sort]::after { visibility: hidden !important; /* 아이콘만 감춤 */ } .gradio-container th .header-content { justify-content: center !important; text-align: center; font-size: 13px; letter-spacing: -1px !important; } .gradio-container th span { text-align: center !important; display: block !important; width: 100%; } .v_check { padding-top: 39px !important; margin-right: 0px !important; padding-right: 0px !important; margin-left: 0px !important; padding-left: 0px !important; } .v_check div { display: block !important; } .v_check label { max-width: 80%; /* 전체 너비 유지 */ padding: 2px; /* 내부 여백 조정 */ margin-bottom: 52px; /* 라벨 간 간격 설정 */ border: 1px solid transparent !important; letter-spacing: -1px !important; /* 자간 좁게 설정*/ justify-content: center; } div.svelte-1nguped { background: transparent !important; border: none !important; } .left-padding { padding-left: 43px !important; /* 왼쪽 패딩 추가 */ } .custom-markdown h3 { font-size: 18px; /* 본문 및 목록 글자 크기 */ } .custom-markdown blockquote { margin-bottom: 8px !important; } .custom-markdown p, .custom-markdown li { margin-top: 8px !important; font-size: 15px; /* 본문 및 목록 글자 크기 */ line-height: 1.5; } .custom-markdown a { font-size: 15px; color: #000000; } .no-margine { margin-bottom: 0px !important; padding-bottom: 0px !important; margin-top: 0px !important; padding-top: 0px !important; gap: 0px !important; } """ guide = """> ### 저작권 및 유의사항 안내 - 본 앱은 시앤피컨설팅이 개발한 KPI POOL 검색 도구로, AI 기반 추천 결과는 참고용으로 제공됩니다. - AI 기반 추천 알고리즘은 전문 컨설팅을 대체할 수 없으며, 반드시 조직의 전략, 평가 목적, 데이터 수집 가능성 등과의 적합성 검토가 필요합니다. - KPI 설정과 적용에 대한 개별 맞춤 검토는 시앤피컨설팅의 전문 컨설턴트에게 문의해 주세요.

> ### Contact Us 시앤피컨설팅그룹 일터혁신본부  |  Tel. 02-6257-1448  |  http://www.cnp.re.kr  |  hpw@cnp.re.kr """ empty_df = pd.DataFrame(columns=["지표명", "산식", "비고", "카테고리"]) with gr.Blocks(css=css, fill_width=True) as demo: df_state1 = gr.State() df_state2 = gr.State() df_state3 = gr.State() check_state1 = gr.State() check_state2 = gr.State() check_state3 = gr.State() with gr.Row(): #mode = gr.Dropdown(choices={"BGE","SBERT-snunlp","SBERT-jhgan"}, label="모델을 선택하세요") gr.Markdown(" ") with gr.Tab("KPI Pool 검색"): with gr.Column(elem_classes="left-padding"): with gr.Row(equal_height=True): kpi_query = gr.Textbox(scale=30, submit_btn=True, label= "성과평가를 진행할 [핵심업무 or 핵심성공요인]을 입력해주세요😊! (검색 키워드는 직무기술서 또는 NCS 능력단위 참고)", placeholder="예: 자금 → 자금조달 / 재무위험관리 / 자금운용") kpi_count = gr.Slider(label="KPI 출력 개수", value = 7, minimum=5, maximum=10, step=1, scale=7) copyright = gr.Markdown(guide, visible=True, elem_classes="custom-markdown") with gr.Column(visible=False) as output_area: with gr.Group(): with gr.Row(): with gr.Group(): with gr.Tab("BAAI"): with gr.Row(): kpi_checkbox1 = gr.CheckboxGroup(choices=[], interactive=True, elem_classes="v_check", container=False, min_width=5, scale=1) with gr.Column(scale=11): kpi_table1 = gr.HTML(label="BGE 결과") with gr.Group(): with gr.Tab("snunlp"): with gr.Row(): kpi_checkbox2 = gr.CheckboxGroup(choices=[], interactive=True, elem_classes="v_check", container=False, min_width=5, scale=1) with gr.Column(scale=11): kpi_table2 = gr.HTML(label="SBERT-snunlp 결과") with gr.Group(): with gr.Tab("jhgan"): with gr.Row(): kpi_checkbox3 = gr.CheckboxGroup(choices=[], interactive=True, elem_classes="v_check", container=False, min_width=5, scale=1) with gr.Column(scale=11): kpi_table3 = gr.HTML(label="SBERT-jhgan 결과") with gr.Row(): gr.Column(scale=2) select_button = gr.Button("All", scale=1) download_button = gr.Button("Download",scale=1) clear_button = gr.Button("Clear",scale=1) gr.Column(scale=2) file_download = gr.Files(label="Download", interactive=False, visible=False) #kpi_query.submit(search_kpi, inputs = [kpi_query, kpi_count, mode], outputs = [output_area, kpi_checkbox, kpi_table, df_state, check_state, copyright]) kpi_query.submit( search_kpi_all_models, inputs = [kpi_query, kpi_count], outputs = [ output_area, kpi_checkbox1, kpi_checkbox2, kpi_checkbox3, kpi_table1, kpi_table2, kpi_table3, df_state1, df_state2, df_state3, check_state1,check_state2,check_state3, copyright ] ) #select_button.click(fn=toggle_selection, inputs=[kpi_checkbox, check_state], outputs=kpi_checkbox, show_progress='hidden') select_button.click( fn=toggle_all_selections, inputs=[ kpi_checkbox1, check_state1, kpi_checkbox2, check_state2, kpi_checkbox3, check_state3 ], outputs=[ kpi_checkbox1, kpi_checkbox2, kpi_checkbox3 ], show_progress='hidden' ) download_button.click( generate_excel, inputs=[df_state1, df_state2, df_state3, kpi_checkbox1, kpi_checkbox2, kpi_checkbox3, kpi_query], outputs=[file_download] ) clear_button.click( fn=lambda: (None, None, None, None, gr.update(visible=False), gr.update(choices=[], value=[]),gr.update(choices=[], value=[]),gr.update(choices=[], value=[]), gr.update(value=""),gr.update(value=""),gr.update(value=""), gr.update(value=None, visible=False), gr.update(visible=True)), outputs=[df_state1, df_state2, df_state3, kpi_query, output_area, kpi_checkbox1, kpi_checkbox2, kpi_checkbox3, kpi_table1, kpi_table2, kpi_table3, file_download, copyright], show_progress='hidden' ) with gr.Tab("[참고] NCS 능력단위"): unit_query = gr.Textbox(label="업종 or 직종 + 직무명을 입력하세요😊", scale=1, submit_btn=True, placeholder="예: 의약품 법률자문, 공공행정 경영기획, 재무회계 자금") with gr.Row(): with gr.Group(): unit_result = gr.Markdown() #with gr.Group(): # with gr.Tab("SBERT-snunlp"): # unit_result2 = gr.Markdown() unit_query.submit(search_unit, inputs=[unit_query], outputs=[unit_result]) #unit_query.submit(search_unit_all, inputs=unit_query, outputs=[unit_result1, unit_result2]) demo.launch(debug=True)