from typing import Dict, List

import torch
from langchain_text_splitters import SpacyTextSplitter
from qdrant_client import QdrantClient, http, models
from sentence_transformers import SentenceTransformer
from transformers import (
    AutoModelForMaskedLM,
    AutoTokenizer,
    T5ForConditionalGeneration,
    T5Tokenizer,
)
class HybridVectorSearch:
    """Hybrid (dense + sparse) retrieval over earnings-call transcripts stored
    in Qdrant, with T5 summarization for inputs that exceed the encoder limit."""

    # Single device for all models; switch to torch.device("cuda") on a GPU box.
    device = torch.device("cpu")

    # SPLADE masked-LM head produces sparse lexical (weighted-term) vectors.
    sparse_model = "naver/splade-v3"
    tokenizer = AutoTokenizer.from_pretrained(sparse_model)
    model = AutoModelForMaskedLM.from_pretrained(sparse_model).to(device)

    text_splitter = SpacyTextSplitter(chunk_size=1000)
    dense_encoder = SentenceTransformer("all-MiniLM-L6-v2", device="cpu")

    # Small T5 model used to summarize over-long inputs before sparse encoding.
    model_name_t5 = "Falconsai/text_summarization"  # alternative: "t5-small"
    tokenizer_t5 = T5Tokenizer.from_pretrained(model_name_t5)
    model_t5 = T5ForConditionalGeneration.from_pretrained(model_name_t5).to(device)

    client = QdrantClient(url="http://localhost:6333")
    earnings_collection = "earnings_calls"
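
    # --- Assumed collection layout (sketch, not part of the original file) ---
    # `search` below queries named vectors "dense_vector" and "sparse_vector",
    # so the Qdrant collection is presumed to have been created along these
    # lines. `ensure_collection` is a hypothetical helper added for clarity.
    @staticmethod
    def ensure_collection():
        if not HybridVectorSearch.client.collection_exists(
            HybridVectorSearch.earnings_collection
        ):
            HybridVectorSearch.client.create_collection(
                collection_name=HybridVectorSearch.earnings_collection,
                vectors_config={
                    # all-MiniLM-L6-v2 produces 384-dimensional embeddings.
                    "dense_vector": models.VectorParams(
                        size=384, distance=models.Distance.COSINE
                    )
                },
                sparse_vectors_config={"sparse_vector": models.SparseVectorParams()},
            )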

    @staticmethod
    def reciprocal_rank_fusion(
        responses: List[List[http.models.ScoredPoint]], limit: int = 10
    ) -> List[http.models.ScoredPoint]:
        """Fuse several ranked result lists into one by scoring each point
        with the sum of 1 / (k + rank) over every list it appears in."""

        def compute_score(pos: int) -> float:
            # The constant mitigates the impact of high rankings by outlier systems.
            ranking_constant = 2
            return 1 / (ranking_constant + pos)

        scores: Dict[http.models.ExtendedPointId, float] = {}
        point_pile = {}
        for response in responses:
            for i, scored_point in enumerate(response):
                if scored_point.id in scores:
                    scores[scored_point.id] += compute_score(i)
                else:
                    # First sighting of this point: remember it for the output.
                    point_pile[scored_point.id] = scored_point
                    scores[scored_point.id] = compute_score(i)

        sorted_scores = sorted(scores.items(), key=lambda item: item[1], reverse=True)
        sorted_points = []
        for point_id, score in sorted_scores[:limit]:
            point = point_pile[point_id]
            point.score = score  # replace the engine score with the fused score
            sorted_points.append(point)
        return sorted_points
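
    # Worked example of the fused score: with ranking_constant = 2, a point
    # ranked first (pos 0) in the dense list and third (pos 2) in the sparse
    # list scores 1/(2+0) + 1/(2+2) = 0.75, beating a point that is second
    # (pos 1) in a single list, which scores 1/(2+1) ≈ 0.33.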

    @staticmethod
    def summary(text: str):
        """Summarize `text` with T5; used to shrink inputs that exceed the
        512-token SPLADE limit before sparse encoding."""
        inputs = HybridVectorSearch.tokenizer_t5.encode(
            f"summarize: {text}", return_tensors="pt", max_length=1024, truncation=True
        ).to(HybridVectorSearch.device)
        summary_ids = HybridVectorSearch.model_t5.generate(
            inputs,
            max_length=512,
            min_length=100,
            length_penalty=2.0,
            num_beams=4,
            early_stopping=True,
        )
        summary = HybridVectorSearch.tokenizer_t5.decode(
            summary_ids[0], skip_special_tokens=True
        )
        return summary

    @staticmethod
    def compute_vector(text):
        """Compute a SPLADE sparse vector for `text`. Inputs at or above the
        512-token encoder limit are summarized first; the original text is
        also split into chunks, which are returned alongside the vector."""
        tokens = HybridVectorSearch.tokenizer(text, return_tensors="pt").to(
            HybridVectorSearch.device
        )
        split_texts = []
        if len(tokens["input_ids"][0]) >= 512:
            # Too long for SPLADE: encode a T5 summary instead, and keep the
            # chunked original around for callers that want it.
            summary = HybridVectorSearch.summary(text)
            split_texts = HybridVectorSearch.text_splitter.split_text(text)
            tokens = HybridVectorSearch.tokenizer(summary, return_tensors="pt").to(
                HybridVectorSearch.device
            )
        output = HybridVectorSearch.model(**tokens)
        logits, attention_mask = output.logits, tokens.attention_mask
        # SPLADE pooling: log(1 + ReLU(logits)), masked by attention, then
        # max-pooled over the sequence to yield one weight per vocabulary term.
        relu_log = torch.log(1 + torch.relu(logits))
        weighted_log = relu_log * attention_mask.unsqueeze(-1)
        max_val, _ = torch.max(weighted_log, dim=1)
        vec = max_val.squeeze()
        return vec, tokens, split_texts
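
    # Hypothetical ingestion sketch (not in the original file): shows how a
    # transcript passage could be upserted so that `search` below can find it.
    # Assumes the collection layout sketched in `ensure_collection` above and a
    # payload with a "conversation" field, as read by `search`/`chat_search`.
    @staticmethod
    def index_conversation(point_id: int, text: str):
        vec, _tokens, _splits = HybridVectorSearch.compute_vector(text)
        indices = vec.cpu().detach().nonzero().numpy().flatten()
        values = vec.cpu().detach().numpy()[indices]
        HybridVectorSearch.client.upsert(
            collection_name=HybridVectorSearch.earnings_collection,
            points=[
                models.PointStruct(
                    id=point_id,
                    vector={
                        "dense_vector": HybridVectorSearch.dense_encoder.encode(
                            text
                        ).tolist(),
                        "sparse_vector": models.SparseVector(
                            indices=indices.tolist(), values=values.tolist()
                        ),
                    },
                    payload={"conversation": text},
                )
            ],
        )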

    @staticmethod
    def search(query_text: str, symbol="AMD"):
        """Run dense and sparse searches against Qdrant in a single batch and
        fuse the two ranked lists with reciprocal rank fusion. (`symbol` is
        accepted for API compatibility but currently unused.)"""
        vectors, _tokens, _split_texts = HybridVectorSearch.compute_vector(query_text)
        # Keep only the nonzero SPLADE weights, in Qdrant's index/value format.
        indices = vectors.cpu().nonzero().numpy().flatten()
        values = vectors.cpu().detach().numpy()[indices]
        sparse_query_vector = models.SparseVector(indices=indices, values=values)

        query_vector = HybridVectorSearch.dense_encoder.encode(query_text).tolist()

        limit = 3
        dense_request = models.SearchRequest(
            vector=models.NamedVector(name="dense_vector", vector=query_vector),
            limit=limit,
            with_payload=True,
        )
        sparse_request = models.SearchRequest(
            vector=models.NamedSparseVector(
                name="sparse_vector", vector=sparse_query_vector
            ),
            limit=limit,
            with_payload=True,
        )
        dense_request_response, sparse_request_response = (
            HybridVectorSearch.client.search_batch(
                collection_name=HybridVectorSearch.earnings_collection,
                requests=[dense_request, sparse_request],
            )
        )
        ranked_search_response = HybridVectorSearch.reciprocal_rank_fusion(
            [dense_request_response, sparse_request_response], limit=10
        )
        return ranked_search_response

    @staticmethod
    def chat_search(query: str, chat_history):
        """Chat handler (Gradio-style history of (user, bot) tuples): run the
        hybrid search, then post a T5 summary of each top hit followed by the
        full passage in a fenced block."""
        result = HybridVectorSearch.search(query)
        chat_history.append((query, "Search Results"))
        for search_result in result[:3]:
            text = search_result.payload["conversation"]
            summary = HybridVectorSearch.summary(text) + f"\n```\n{text}\n```"
            chat_history.append((None, summary))
        return "", chat_history