import numpy as np
import pandas as pd
from typing import List, Dict
from itertools import combinations
from sklearn.metrics.pairwise import cosine_similarity
import torch
from .semantic_embedding import generate_embeddings
from .tokenize import tokenize_texts
import logging
from sklearn.feature_extraction.text import TfidfVectorizer
from .stopwords_bo import TIBETAN_STOPWORDS, TIBETAN_STOPWORDS_SET
from .stopwords_lite_bo import TIBETAN_STOPWORDS_LITE, TIBETAN_STOPWORDS_LITE_SET
# Attempt to import the Cython-compiled fast_lcs module
try:
from .fast_lcs import compute_lcs_fast
USE_CYTHON_LCS = True
except ImportError:
# print("Cython fast_lcs not found, using Python LCS. For better performance, compile the Cython module.")
USE_CYTHON_LCS = False
logger = logging.getLogger(__name__)
MAX_TOKENS_PER_CHUNK = 500 # Max tokens (words via botok) per chunk
CHUNK_OVERLAP = 50 # Number of tokens to overlap between chunks
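# For illustration: with these defaults, a 1,200-token text is split into three
# overlapping chunks covering tokens 0-499, 450-949, and 900-1199.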
def _chunk_text(
original_text_content: str,
tokens: List[str],
max_chunk_tokens: int,
overlap_tokens: int,
) -> List[str]:
"""
Splits a list of tokens into chunks and reconstructs text segments from these token chunks.
The reconstructed text segments are intended for embedding models.
Args:
original_text_content (str): The original raw text string. Used if no chunking is needed.
tokens (List[str]): The list of botok tokens for the original_text_content.
max_chunk_tokens (int): Maximum number of botok tokens per chunk.
overlap_tokens (int): Number of botok tokens to overlap between chunks.
Returns:
List[str]: A list of text strings, where each string is a chunk.
"""
if (
not tokens
): # Handles empty or whitespace-only original text that led to no tokens
return [original_text_content] if original_text_content.strip() else []
if len(tokens) <= max_chunk_tokens:
# If no chunking is needed, return the original raw text unchanged: the embedding
# model's own tokenizer expects raw text, and re-joining botok tokens here would
# alter spacing and punctuation.
return [original_text_content]
reconstructed_text_chunks = []
start_idx = 0
while start_idx < len(tokens):
end_idx = min(start_idx + max_chunk_tokens, len(tokens))
current_chunk_botok_tokens = tokens[start_idx:end_idx]
# Reconstruct the text chunk by joining the botok tokens. This is an approximation.
# The semantic model's internal tokenizer will handle this string.
reconstructed_text_chunks.append(" ".join(current_chunk_botok_tokens))
if end_idx == len(tokens):
break
next_start_idx = start_idx + max_chunk_tokens - overlap_tokens
if next_start_idx <= start_idx:
next_start_idx = start_idx + 1
start_idx = next_start_idx
return reconstructed_text_chunks
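# Worked example (hypothetical toy values, not the module defaults): for a list of
# 12 tokens with max_chunk_tokens=5 and overlap_tokens=2, the window starts at
# indices 0, 3, 6 and 9, producing chunks built from tokens [0:5], [3:8], [6:11]
# and [9:12].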
def compute_normalized_lcs(words1: List[str], words2: List[str]) -> float:
# Calculate m and n (lengths) here, so they are available for normalization
# regardless of which LCS implementation is used.
m, n = len(words1), len(words2)
if USE_CYTHON_LCS:
# Use the Cython-compiled version if available
lcs_length = compute_lcs_fast(words1, words2)
else:
# Fall back to the pure-Python implementation.
# A NumPy dp table is slightly faster than a list of lists for large inputs,
# but the main bottleneck is the Python loop itself compared to Cython.
dp = np.zeros((m + 1, n + 1), dtype=np.int32)
for i in range(1, m + 1):
for j in range(1, n + 1):
if words1[i - 1] == words2[j - 1]:
dp[i, j] = dp[i - 1, j - 1] + 1
else:
dp[i, j] = max(dp[i - 1, j], dp[i, j - 1])
lcs_length = int(dp[m, n])
avg_length = (m + n) / 2
return lcs_length / avg_length if avg_length > 0 else 0.0
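# Worked example: for words1 = ["ka", "kha", "ga", "nga"] and
# words2 = ["ka", "ga", "nga", "ca"], the longest common subsequence is
# ["ka", "ga", "nga"] (length 3), the average length is 4, and the
# normalized LCS is therefore 3 / 4 = 0.75.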
def compute_semantic_similarity(
text1_segment: str,
text2_segment: str,
tokens1: List[str],
tokens2: List[str],
model,
device,
model_type: str = "sentence_transformer",
use_stopwords: bool = True,
use_lite_stopwords: bool = False,
) -> float:
"""Computes semantic similarity using a sentence transformer model, with chunking for long texts."""
if model is None or device is None:
logger.warning(
"Semantic similarity model or device not available. Skipping calculation."
)
return np.nan # Return NaN if model isn't loaded
if not text1_segment or not text2_segment:
logger.info(
"One or both texts are empty for semantic similarity. Returning 0.0."
)
return 0.0 # Or np.nan, depending on desired behavior for empty inputs
def _get_aggregated_embedding(
raw_text_segment: str, botok_tokens: List[str], model_obj, device_str, model_type: str = "sentence_transformer", use_stopwords: bool = True, use_lite_stopwords: bool = False
) -> torch.Tensor | None:
"""Helper to get a single embedding for a text, chunking if necessary for transformer models."""
if (
not botok_tokens and not raw_text_segment.strip()
): # Check if effectively empty
logger.info(
f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
)
return None
# For FastText, we don't need chunking as it processes tokens directly
if model_type == "fasttext":
if not raw_text_segment.strip():
logger.info(
f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
)
return None
# Pass the raw text, pre-tokenized tokens, and stopword parameters
# Wrap the tokens in a list since generate_embeddings expects a list of token lists
embedding = generate_embeddings(
[raw_text_segment],
model_obj,
device_str,
model_type,
tokenize_fn=[botok_tokens], # Wrap in list since we're passing tokens for one text
use_stopwords=use_stopwords,
use_lite_stopwords=use_lite_stopwords
)
if embedding is None or embedding.nelement() == 0:
logger.error(
f"Failed to generate FastText embedding for text: {raw_text_segment[:100]}..."
)
return None
return embedding # Already [1, embed_dim]
# For transformer models, check if all tokens are stopwords when filtering is enabled
elif use_stopwords:
# Filter stopwords to check whether any content remains
# (stopword sets are already imported at module level).
stopword_set = TIBETAN_STOPWORDS_LITE_SET if use_lite_stopwords else TIBETAN_STOPWORDS_SET
filtered_tokens = [token for token in botok_tokens if token not in stopword_set]
# If all tokens were filtered out as stopwords, return zero embedding
if not filtered_tokens:
logger.info("All tokens in text are stopwords. Returning zero embedding.")
# Create a zero tensor matching the model's output dimension.
# SentenceTransformer models expose it via get_sentence_embedding_dimension();
# fall back to 384 (the MiniLM-class default) when that is not available.
embedding_dim = 384
if hasattr(model_obj, "get_sentence_embedding_dimension"):
    embedding_dim = model_obj.get_sentence_embedding_dimension() or embedding_dim
return torch.zeros(1, embedding_dim)
# Continue with normal processing if content remains after filtering
if len(botok_tokens) > MAX_TOKENS_PER_CHUNK:
logger.info(
f"Text segment with ~{len(botok_tokens)} tokens exceeds {MAX_TOKENS_PER_CHUNK}, chunking {raw_text_segment[:30]}..."
)
# Pass the original raw text and its pre-computed botok tokens to _chunk_text
text_chunks = _chunk_text(
raw_text_segment, botok_tokens, MAX_TOKENS_PER_CHUNK, CHUNK_OVERLAP
)
if not text_chunks:
logger.warning(
f"Chunking resulted in no chunks for segment: {raw_text_segment[:100]}..."
)
return None
logger.info(
f"Generated {len(text_chunks)} chunks for segment: {raw_text_segment[:30]}..."
)
# Generate embeddings for each chunk using the model
chunk_embeddings = generate_embeddings(text_chunks, model_obj, device_str, model_type)
if chunk_embeddings is None or chunk_embeddings.nelement() == 0:
logger.error(
f"Failed to generate embeddings for chunks of text: {raw_text_segment[:100]}..."
)
return None
# Mean pooling of chunk embeddings
aggregated_embedding = torch.mean(chunk_embeddings, dim=0, keepdim=True)
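# chunk_embeddings has shape [num_chunks, embed_dim]; averaging over dim=0 with
# keepdim=True yields a single [1, embed_dim] vector for the whole segment.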
return aggregated_embedding
else:
# Text is short enough for transformer model, embed raw text directly
if not raw_text_segment.strip():
logger.info(
f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
)
return None
embedding = generate_embeddings([raw_text_segment], model_obj, device_str, model_type)
if embedding is None or embedding.nelement() == 0:
logger.error(
f"Failed to generate embedding for text: {raw_text_segment[:100]}..."
)
return None
return embedding # Already [1, embed_dim]
else:
# No stopword filtering, proceed with normal processing
if len(botok_tokens) > MAX_TOKENS_PER_CHUNK:
logger.info(
f"Text segment with ~{len(botok_tokens)} tokens exceeds {MAX_TOKENS_PER_CHUNK}, chunking {raw_text_segment[:30]}..."
)
# Pass the original raw text and its pre-computed botok tokens to _chunk_text
text_chunks = _chunk_text(
raw_text_segment, botok_tokens, MAX_TOKENS_PER_CHUNK, CHUNK_OVERLAP
)
if not text_chunks:
logger.warning(
f"Chunking resulted in no chunks for segment: {raw_text_segment[:100]}..."
)
return None
logger.info(
f"Generated {len(text_chunks)} chunks for segment: {raw_text_segment[:30]}..."
)
# Generate embeddings for each chunk using the model
chunk_embeddings = generate_embeddings(text_chunks, model_obj, device_str, model_type)
if chunk_embeddings is None or chunk_embeddings.nelement() == 0:
logger.error(
f"Failed to generate embeddings for chunks of text: {raw_text_segment[:100]}..."
)
return None
# Mean pooling of chunk embeddings
aggregated_embedding = torch.mean(chunk_embeddings, dim=0, keepdim=True)
return aggregated_embedding
else:
# Text is short enough for transformer model, embed raw text directly
if not raw_text_segment.strip():
logger.info(
f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
)
return None
embedding = generate_embeddings([raw_text_segment], model_obj, device_str, model_type)
if embedding is None or embedding.nelement() == 0:
logger.error(
f"Failed to generate embedding for text: {raw_text_segment[:100]}..."
)
return None
return embedding # Already [1, embed_dim]
try:
# Pass raw text and its pre-computed botok tokens with stopword preference
embedding1 = _get_aggregated_embedding(text1_segment, tokens1, model, device, model_type, use_stopwords, use_lite_stopwords)
embedding2 = _get_aggregated_embedding(text2_segment, tokens2, model, device, model_type, use_stopwords, use_lite_stopwords)
if (
embedding1 is None
or embedding2 is None
or embedding1.nelement() == 0
or embedding2.nelement() == 0
):
logger.error(
"Failed to obtain one or both aggregated embeddings for semantic similarity."
)
return np.nan
# Check if both embeddings are zero vectors (which happens when all tokens are stopwords)
if np.all(embedding1.numpy() == 0) and np.all(embedding2.numpy() == 0):
# If both texts contain only stopwords, return 0 similarity
return 0.0
# Cosine similarity expects 2D arrays, embeddings are [1, embed_dim] and on CPU
similarity = cosine_similarity(embedding1.numpy(), embedding2.numpy())
return float(similarity[0][0])
except Exception as e:
logger.error(
f"Error computing semantic similarity with chunking:\nText1: '{text1_segment[:100]}...'\nText2: '{text2_segment[:100]}...'\nError: {e}",
exc_info=True,
)
return np.nan
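# Minimal usage sketch (an assumption, not part of the application's wiring): in the
# real app the model/device pair comes from the project's own model-loading code.
# Here we guess that a plain SentenceTransformer instance is accepted as `model` for
# model_type="sentence_transformer"; the model name and sample strings are placeholders.
if __name__ == "__main__":  # pragma: no cover
    from sentence_transformers import SentenceTransformer

    _demo_model = SentenceTransformer(
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    )
    _text_a = "བཀྲ་ཤིས་བདེ་ལེགས།"
    _text_b = "བཀྲ་ཤིས་བདེ་ལེགས་ཞུ།"
    _tokens_a = tokenize_texts([_text_a])[0]
    _tokens_b = tokenize_texts([_text_b])[0]
    _score = compute_semantic_similarity(
        _text_a, _text_b, _tokens_a, _tokens_b, _demo_model, "cpu"
    )
    print(f"Semantic similarity: {_score:.3f}")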
def compute_all_metrics(
texts: Dict[str, str], model=None, device=None, enable_semantic: bool = True,
model_type: str = "sentence_transformer", use_stopwords: bool = True,
use_lite_stopwords: bool = False
) -> pd.DataFrame:
"""
Computes all selected similarity metrics between pairs of texts.
Args:
    texts (Dict[str, str]): A dictionary where keys are text identifiers (e.g., filenames
        or segment IDs) and values are the text content strings.
    model (optional): The pre-loaded embedding model (sentence transformer or FastText).
        Defaults to None.
    device (str, optional): The device the model is on ('cuda' or 'cpu'). Defaults to None.
    enable_semantic (bool): Whether to compute semantic similarity. Defaults to True.
    model_type (str): Embedding model type, "sentence_transformer" or "fasttext".
        Defaults to "sentence_transformer".
    use_stopwords (bool): Whether to filter Tibetan stopwords when computing the metrics.
        Defaults to True.
    use_lite_stopwords (bool): Whether to use the lighter stopword list instead of the
        full one. Defaults to False.
Returns:
    pd.DataFrame: A DataFrame where each row contains the metrics for a pair of texts,
        including 'Text Pair', 'Jaccard Similarity (%)', 'Normalized LCS',
        'Semantic Similarity', and 'TF-IDF Cosine Sim'.
"""
files = list(texts.keys())
results = []
# Prepare token lists (always use tokenize_texts for raw Unicode)
token_lists = {}
corpus_for_tfidf = [] # For storing space-joined tokens for TF-IDF
for fname, content in texts.items():
tokenized_content = tokenize_texts([content]) # Returns a list of lists
if tokenized_content and tokenized_content[0]:
token_lists[fname] = tokenized_content[0]
else:
token_lists[fname] = []
# Regardless of whether tokenized_content[0] exists, prepare entry for TF-IDF corpus
# If tokens exist, join them; otherwise, use an empty string for that document
corpus_for_tfidf.append(
" ".join(token_lists[fname])
if fname in token_lists and token_lists[fname]
else ""
)
# TF-IDF Vectorization and Cosine Similarity Calculation
if corpus_for_tfidf:
try:
# The corpus is already botok-tokenized and space-joined, so use an identity
# preprocessor and a whitespace tokenizer; no lowercasing or other token
# modification should be applied to the Tibetan text.
# Select appropriate stopwords list based on user preference
if use_stopwords:
# Choose between regular and lite stopwords list
if use_lite_stopwords:
stopwords_to_use = TIBETAN_STOPWORDS_LITE
else:
stopwords_to_use = TIBETAN_STOPWORDS
else:
# If stopwords are disabled, use an empty list
stopwords_to_use = []
vectorizer = TfidfVectorizer(
tokenizer=lambda x: x.split(),
preprocessor=lambda x: x,
token_pattern=None,
stop_words=stopwords_to_use
)
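# For example, a document whose botok tokens were ["tok1", "tok2"] (hypothetical
# placeholders) enters the corpus as the single string "tok1 tok2"; the lambda
# tokenizer above simply splits it back on spaces, leaving the tokens untouched.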
tfidf_matrix = vectorizer.fit_transform(corpus_for_tfidf)
# Calculate pairwise cosine similarity on the TF-IDF matrix
# This gives a square matrix where cosine_sim_matrix[i, j] is the similarity between doc i and doc j
cosine_sim_matrix = cosine_similarity(tfidf_matrix)
except ValueError as e:
if "empty vocabulary" in str(e):
# If vocabulary is empty after stopword removal, create a zero matrix
n = len(corpus_for_tfidf)
cosine_sim_matrix = np.zeros((n, n))
else:
# Re-raise other ValueError
raise
else:
# Handle case with no texts or all empty texts
n = len(files) if files else 0
cosine_sim_matrix = np.zeros((n, n))
for i, j in combinations(range(len(files)), 2):
f1, f2 = files[i], files[j]
words1_raw, words2_raw = token_lists[f1], token_lists[f2]
# Select appropriate stopwords set based on user preference
if use_stopwords:
# Choose between regular and lite stopwords sets
if use_lite_stopwords:
stopwords_set_to_use = TIBETAN_STOPWORDS_LITE_SET
else:
stopwords_set_to_use = TIBETAN_STOPWORDS_SET
else:
# If stopwords are disabled, use an empty set
stopwords_set_to_use = set()
# Filter stopwords for Jaccard calculation
words1_jaccard = [word for word in words1_raw if word not in stopwords_set_to_use]
words2_jaccard = [word for word in words2_raw if word not in stopwords_set_to_use]
# Check if both texts only contain stopwords
both_only_stopwords = len(words1_jaccard) == 0 and len(words2_jaccard) == 0
jaccard = (
len(set(words1_jaccard) & set(words2_jaccard)) / len(set(words1_jaccard) | set(words2_jaccard))
if set(words1_jaccard) | set(words2_jaccard) # Ensure denominator is not zero
else 0.0
)
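# Worked example: if the filtered token sets are {A, B, C} and {A, C, D}, the
# intersection has 2 members and the union has 4, so Jaccard = 2/4 = 0.5 (50%).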
# LCS uses raw tokens (words1_raw, words2_raw) to provide a complementary metric.
# Semantic similarity also uses raw text and its botok tokens for chunking decisions.
jaccard_percent = jaccard * 100.0
norm_lcs = compute_normalized_lcs(words1_raw, words2_raw)
# Semantic Similarity Calculation
if enable_semantic:
# Pass raw texts and their pre-computed botok tokens
semantic_sim = compute_semantic_similarity(
texts[f1], texts[f2], words1_raw, words2_raw, model, device, model_type, use_stopwords, use_lite_stopwords
)
else:
semantic_sim = np.nan
results.append(
{
"Text Pair": f"{f1} vs {f2}",
"Jaccard Similarity (%)": jaccard_percent,
"Normalized LCS": norm_lcs,
"Semantic Similarity": semantic_sim,
"TF-IDF Cosine Sim": (
0.0 if both_only_stopwords else
cosine_sim_matrix[i, j]
if cosine_sim_matrix.size > 0
and i < cosine_sim_matrix.shape[0]
and j < cosine_sim_matrix.shape[1]
else np.nan
),
}
)
return pd.DataFrame(results)
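# Minimal usage sketch (hypothetical file names and sample strings): lexical metrics
# only, so no embedding model is needed and 'Semantic Similarity' will be NaN.
if __name__ == "__main__":  # pragma: no cover
    _sample_texts = {
        "witness_A.txt": "བཀྲ་ཤིས་བདེ་ལེགས། ཞེས་པ་འདི་ཡིན།",
        "witness_B.txt": "བཀྲ་ཤིས་བདེ་ལེགས། ཞེས་པ་དེ་ཡིན།",
    }
    _metrics_df = compute_all_metrics(_sample_texts, enable_semantic=False)
    print(
        _metrics_df[
            ["Text Pair", "Jaccard Similarity (%)", "Normalized LCS", "TF-IDF Cosine Sim"]
        ]
    )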