|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
Python helper function to customize json sentences locally. |
|
""" |
|
|
|
__author__ = "Vagner Santana, Melina Alberio, Cassia Sanctos and Tiago Machado" |
|
__copyright__ = "IBM Corporation 2024" |
|
__credits__ = ["Vagner Santana, Melina Alberio, Cassia Sanctos, Tiago Machado"] |
|
__license__ = "Apache 2.0" |
|
__version__ = "0.0.1" |
|
|
|
import os |
|
import json |
|
import pandas as pd |
|
import numpy as np |
|
import math |
|
from sentence_transformers import SentenceTransformer |
|
|
|
|
|
def query_model(texts, model_path): |
|
out = [] |
|
model = SentenceTransformer(model_path) |
|
input_embedding = model.encode(texts) |
|
out.append(input_embedding) |
|
if( out != [] ): |
|
return out[0] |
|
else: |
|
return out |
|
|
|
|
|
def get_distance(embedding1, embedding2): |
|
total = 0 |
|
if( len(embedding1) != len(embedding2)): |
|
return math.inf |
|
|
|
for i, obj in enumerate(embedding1): |
|
total += math.pow(embedding2[0][i] - embedding1[0][i], 2) |
|
return(math.sqrt(total)) |
|
|
|
|
|
def get_centroid(v, dimension = 384, k = 10): |
|
centroid = [0] * dimension |
|
count = 0 |
|
for p in v['prompts']: |
|
i = 0 |
|
while i < len(p['embedding']): |
|
centroid[i] += p['embedding'][i] |
|
i += 1 |
|
count += 1 |
|
i = 0 |
|
while i < len(centroid): |
|
centroid[i] /= count |
|
i += 1 |
|
|
|
|
|
if(len(v['prompts']) <= k): |
|
return centroid |
|
else: |
|
k_items = pd.DataFrame(columns=['embedding', 'distance']) |
|
for p in v['prompts']: |
|
dist = get_distance(pd.DataFrame(centroid), pd.DataFrame(p['embedding'])) |
|
k_items = pd.concat([pd.DataFrame([[p['embedding'], dist]], columns=k_items.columns), k_items], ignore_index=True) |
|
|
|
k_items = k_items.sort_values(by='distance') |
|
k_items = k_items.head(k) |
|
|
|
|
|
centroid = [0] * dimension |
|
for i, embedding in enumerate(k_items['embedding']): |
|
for j, dimension in enumerate(embedding): |
|
centroid[j] += embedding[j] |
|
i = 0 |
|
while i < len(centroid): |
|
centroid[i] /= k |
|
i += 1 |
|
return centroid |
|
|
|
def populate_embeddings(prompt_json, model_path): |
|
errors, successess = 0, 0 |
|
for v in prompt_json['positive_values']: |
|
for p in v['prompts']: |
|
if( p['text'] != '' and p['embedding'] == []): |
|
embedding = query_model(p['text'], model_path) |
|
if( 'error' in embedding ): |
|
p['embedding'] = [] |
|
errors += 1 |
|
else: |
|
p['embedding'] = embedding.tolist() |
|
|
|
|
|
for v in prompt_json['negative_values']: |
|
for p in v['prompts']: |
|
if(p['text'] != '' and p['embedding'] == []): |
|
embedding = query_model(p['text'], model_path) |
|
if('error' in embedding): |
|
p['embedding'] = [] |
|
errors += 1 |
|
else: |
|
p['embedding'] = embedding.tolist() |
|
|
|
return prompt_json |
|
|
|
def populate_centroids(prompt_json): |
|
for v in prompt_json['positive_values']: |
|
v['centroid'] = get_centroid(v, dimension = 384, k = 10) |
|
for v in prompt_json['negative_values']: |
|
v['centroid'] = get_centroid(v, dimension = 384, k = 10) |
|
return prompt_json |
|
|
|
|
|
def save_json(prompt_json, json_out_file_name): |
|
with open(json_out_file_name, 'w') as outfile: |
|
json.dump(prompt_json, outfile) |