# Header captured from the hosted-file viewer (Spaces status: Running; file size: 15,731 bytes).
# The viewer's per-line commit hashes and its 1-346 line-number gutter were page residue, not source code.
import numpy as np
import pandas as pd
from typing import List, Dict, Union
from itertools import combinations
from sklearn.metrics.pairwise import cosine_similarity
from .semantic_embedding import generate_embeddings
from .tokenize import tokenize_texts
import logging
from sklearn.feature_extraction.text import TfidfVectorizer
from .stopwords_bo import TIBETAN_STOPWORDS
from .stopwords_lite_bo import TIBETAN_STOPWORDS_LITE
# Attempt to import the Cython-compiled fast_lcs module. When the extension
# has not been built, compute_normalized_lcs falls back to pure Python.
try:
    from .fast_lcs import compute_lcs_fast
    USE_CYTHON_LCS = True
except ImportError:
    USE_CYTHON_LCS = False

logger = logging.getLogger(__name__)


def compute_normalized_lcs(words1: List[str], words2: List[str]) -> float:
    """Return the LCS length of two token lists, normalized by their mean length.

    The longest-common-subsequence length is divided by ``(len(words1) +
    len(words2)) / 2``, so identical sequences score 1.0 and sequences with
    no token in common score 0.0.

    Args:
        words1: Tokens of the first text.
        words2: Tokens of the second text.

    Returns:
        The normalized LCS score, or 0.0 when either sequence is empty.
    """
    m, n = len(words1), len(words2)
    # An empty side has no common subsequence; returning here also keeps the
    # division below away from a zero denominator.
    if m == 0 or n == 0:
        return 0.0

    if USE_CYTHON_LCS:
        lcs_length = compute_lcs_fast(words1, words2)
    else:
        # Pure-Python fallback. Only the previous DP row is ever read, so two
        # rolling rows replace the full (m+1) x (n+1) table; memory is bounded
        # by the shorter sequence instead of the product of both lengths.
        # LCS length is symmetric, so swapping the operands is safe.
        if n <= m:
            longer, shorter = words1, words2
        else:
            longer, shorter = words2, words1
        previous_row = [0] * (len(shorter) + 1)
        for token in longer:
            current_row = [0]
            for j, other in enumerate(shorter, start=1):
                if token == other:
                    current_row.append(previous_row[j - 1] + 1)
                else:
                    current_row.append(max(previous_row[j], current_row[j - 1]))
            previous_row = current_row
        lcs_length = previous_row[-1]

    return lcs_length / ((m + n) / 2)
def compute_semantic_similarity(
    text1_segment: str,
    text2_segment: str,
    tokens1: List[str],  # botok tokens for text1; unused by the FastText path, kept for signature
    tokens2: List[str],  # botok tokens for text2; unused by the FastText path, kept for signature
    model,  # FastText model object
    model_type: str = "fasttext",  # Should always be 'fasttext' when called
    use_stopwords: bool = True,
    use_lite_stopwords: bool = False,
    fasttext_tokenize_fn=None,
    term_freq_corpus=None,
    doc_freq_map=None,
    total_docs_in_corpus=0
) -> float:
    """Compute the semantic similarity of two text segments with a FastText model.

    Each segment is embedded through ``generate_embeddings`` and the two
    embeddings are compared with cosine similarity.

    Args:
        text1_segment: Raw text of the first segment.
        text2_segment: Raw text of the second segment.
        tokens1: Pre-computed tokens of the first segment (not used here).
        tokens2: Pre-computed tokens of the second segment (not used here).
        model: FastText model object forwarded to ``generate_embeddings``.
        model_type: Must be ``"fasttext"``; any other value is rejected.
        use_stopwords: Forwarded to ``generate_embeddings``.
        use_lite_stopwords: Forwarded to ``generate_embeddings``.
        fasttext_tokenize_fn: Tokenizer forwarded as ``tokenize_fn``.
        term_freq_corpus: Forwarded as ``corpus_token_freq``.
        doc_freq_map: Forwarded to ``generate_embeddings``.
        total_docs_in_corpus: Forwarded to ``generate_embeddings``.

    Returns:
        The cosine similarity clipped below at 0.0. Returns 0.0 when either
        segment is empty or whitespace-only, when an embedding is all zeros,
        or when an embedding contains NaN/Inf. Returns ``np.nan`` when
        ``model_type`` is unexpected, ``model`` is None, an embedding cannot
        be generated, or any exception is raised during the computation.
    """
    if model_type != "fasttext":
        logger.error(
            "compute_semantic_similarity called with unexpected model_type: %s", model_type
        )
        return np.nan

    if model is None:
        logger.warning(
            "FastText model not available for semantic similarity. Skipping calculation."
        )
        return np.nan

    # Whitespace-only segments are handled like empty ones. They used to fall
    # through to embedding generation, which yielded no embedding and turned
    # an "empty text" situation into an ERROR log plus a NaN score.
    if any(
        not segment or not str(segment).strip()
        for segment in (text1_segment, text2_segment)
    ):
        logger.info(
            "One or both texts are empty for semantic similarity. Returning 0.0."
        )
        return 0.0

    def _embed(raw_text_segment: str) -> Union[np.ndarray, None]:
        """Return the FastText embedding for one segment, or None on failure."""
        embedding = generate_embeddings(
            texts=[raw_text_segment],
            model=model,
            tokenize_fn=fasttext_tokenize_fn,
            use_stopwords=use_stopwords,
            use_lite_stopwords=use_lite_stopwords,
            corpus_token_freq=term_freq_corpus,
            doc_freq_map=doc_freq_map,
            total_docs_in_corpus=total_docs_in_corpus
        )
        if embedding is None or embedding.size == 0:
            logger.error(
                "Failed to generate FastText embedding for text: %s...",
                raw_text_segment[:100],
            )
            return None
        return embedding

    try:
        emb1 = _embed(text1_segment)
        emb2 = _embed(text2_segment)

        if emb1 is None or emb2 is None or emb1.size == 0 or emb2.size == 0:
            logger.error(
                "Failed to obtain one or both FastText embeddings for semantic similarity."
            )
            return np.nan

        # Ensure embeddings are numpy arrays (should be, but defensive)
        if not isinstance(emb1, np.ndarray):
            emb1 = np.array(emb1)
        if not isinstance(emb2, np.ndarray):
            emb2 = np.array(emb2)

        # Cosine similarity is undefined for a zero vector; report 0.0 instead.
        emb1_is_zero = bool(np.all(emb1 == 0))
        emb2_is_zero = bool(np.all(emb2 == 0))
        if emb1_is_zero and emb2_is_zero:
            logger.info("Both FastText embeddings are zero. Semantic similarity is 0.0.")
            return 0.0
        if emb1_is_zero or emb2_is_zero:
            logger.info("One of the FastText embeddings is zero. Semantic similarity is 0.0.")
            return 0.0

        # Handle NaN or Inf in embeddings
        if not (np.isfinite(emb1).all() and np.isfinite(emb2).all()):
            logger.warning("NaN or Inf found in FastText embeddings. Semantic similarity set to 0.0.")
            return 0.0

        # cosine_similarity expects 2D inputs shaped [1, dim]
        if emb1.ndim == 1:
            emb1 = emb1.reshape(1, -1)
        if emb2.ndim == 1:
            emb2 = emb2.reshape(1, -1)

        similarity_score = cosine_similarity(emb1, emb2)[0][0]
        # Negative cosine values are clipped so the score stays in [0, 1].
        return max(0.0, float(similarity_score))

    except Exception as e:
        # Deliberate best-effort boundary: one failing pair must not abort the
        # caller's loop, so the failure is logged and reported as NaN.
        safe_text1 = str(text1_segment)[:100] if text1_segment is not None else "N/A"
        safe_text2 = str(text2_segment)[:100] if text2_segment is not None else "N/A"
        logger.error(
            "Error during FastText semantic similarity calculation:\nText1: %s...\nText2: %s...\nError: %s",
            safe_text1,
            safe_text2,
            e,
        )
        logger.exception("Traceback for FastText semantic similarity calculation error:")
        return np.nan
def compute_all_metrics(
    texts: Dict[str, str], model=None, enable_semantic: bool = True,
    model_type: str = "fasttext", use_stopwords: bool = True,
    use_lite_stopwords: bool = False,
    fasttext_tokenize_fn=None
) -> pd.DataFrame:
    """
    Computes all selected similarity metrics between pairs of texts.

    Args:
        texts (Dict[str, str]): A dictionary where keys are text identifiers (e.g., filenames or segment IDs)
            and values are the text content strings.
        model (optional): Model object forwarded to compute_semantic_similarity.
            Defaults to None.
        enable_semantic (bool): When False, semantic similarity is not computed and
            the column is filled with NaN. Defaults to True.
        model_type (str): Embedding backend identifier. Corpus statistics for the
            semantic metric are only gathered when this is "fasttext".
        use_stopwords (bool): Whether stopwords are removed for Jaccard, TF-IDF and
            the FastText corpus statistics. LCS always uses the unfiltered tokens.
        use_lite_stopwords (bool): Selects the lite stopword list instead of the
            full one; only relevant when use_stopwords is True.
        fasttext_tokenize_fn (callable, optional): Tokenizer applied to the raw text
            when gathering FastText corpus statistics, and forwarded to
            compute_semantic_similarity. When None, the tokens produced by
            tokenize_texts are used for the statistics.

    Returns:
        pd.DataFrame: A DataFrame where each row contains the metrics for a pair of texts,
            with columns 'Text Pair', 'Jaccard Similarity (%)', 'Normalized LCS',
            'Semantic Similarity' and 'TF-IDF Cosine Sim'. The columns are present
            even when fewer than two texts are supplied (zero rows).
    """
    result_columns = [
        "Text Pair",
        "Jaccard Similarity (%)",
        "Normalized LCS",
        "Semantic Similarity",
        "TF-IDF Cosine Sim",
    ]
    files = list(texts.keys())
    is_fasttext = model_type == "fasttext"

    # Resolve the stopword list/set once; the choice does not vary per text or
    # per pair, so it is hoisted out of every loop below.
    if use_stopwords:
        if use_lite_stopwords:
            from .stopwords_lite_bo import TIBETAN_STOPWORDS_LITE_SET
            stopword_set = TIBETAN_STOPWORDS_LITE_SET
            stopword_list = TIBETAN_STOPWORDS_LITE
        else:
            from .stopwords_bo import TIBETAN_STOPWORDS_SET
            stopword_set = TIBETAN_STOPWORDS_SET
            stopword_list = TIBETAN_STOPWORDS
    else:
        stopword_set = set()
        stopword_list = []

    token_lists = {}  # text_id -> tokens from tokenize_texts (used for Jaccard and LCS)
    content_words = {}  # text_id -> set of non-stopword tokens (used for Jaccard)
    corpus_for_sklearn_tfidf = []  # space-joined tokens, one entry per text

    # Corpus statistics handed to the FastText semantic similarity
    term_freq_corpus_for_fasttext = {}
    document_frequency_map_for_fasttext = {}
    total_num_documents_for_fasttext = len(texts)

    for fname, content in texts.items():
        tokenized = tokenize_texts([content])
        tokens = tokenized[0] if tokenized and tokenized[0] else []
        token_lists[fname] = tokens
        content_words[fname] = {word for word in tokens if word not in stopword_set}
        corpus_for_sklearn_tfidf.append(" ".join(tokens))

        if not is_fasttext:
            continue

        if fasttext_tokenize_fn is not None:
            stats_tokens = fasttext_tokenize_fn(content)
        else:
            stats_tokens = tokens
        if use_stopwords:
            stats_tokens = [token for token in stats_tokens if token not in stopword_set]

        # Corpus-wide term frequencies (every occurrence counts)
        for token in stats_tokens:
            if token.strip():
                term_freq_corpus_for_fasttext[token] = term_freq_corpus_for_fasttext.get(token, 0) + 1
        # Document frequencies (each token counts once per text)
        for token in set(stats_tokens):
            if token.strip():
                document_frequency_map_for_fasttext[token] = document_frequency_map_for_fasttext.get(token, 0) + 1

    if is_fasttext:
        logger.info(f"Built FastText corpus term frequency map with {len(term_freq_corpus_for_fasttext)} unique tokens.")
        logger.info(f"Built FastText document frequency map with {len(document_frequency_map_for_fasttext)} unique tokens across {total_num_documents_for_fasttext} documents.")

    # TF-IDF vectorization and pairwise cosine similarity
    if corpus_for_sklearn_tfidf:
        try:
            # The input is already tokenized (space-separated), so the tokenizer
            # only splits and the preprocessor is the identity: no case changes
            # or other token modifications are applied to the Tibetan text.
            vectorizer = TfidfVectorizer(
                tokenizer=lambda x: x.split(),
                preprocessor=lambda x: x,
                token_pattern=None,
                stop_words=stopword_list
            )
            tfidf_matrix = vectorizer.fit_transform(corpus_for_sklearn_tfidf)
            # cosine_sim_matrix[i, j] is the similarity between text i and text j
            cosine_sim_matrix = cosine_similarity(tfidf_matrix)
        except ValueError as e:
            if "empty vocabulary" not in str(e):
                raise
            # Nothing left after stopword removal: every pair scores zero.
            n = len(corpus_for_sklearn_tfidf)
            cosine_sim_matrix = np.zeros((n, n))
    else:
        # No texts at all
        cosine_sim_matrix = np.zeros((len(files), len(files)))

    results = []
    for i, j in combinations(range(len(files)), 2):
        f1, f2 = files[i], files[j]
        words1_raw, words2_raw = token_lists[f1], token_lists[f2]
        vocab1, vocab2 = content_words[f1], content_words[f2]

        # True when neither text has any token left after stopword filtering
        both_only_stopwords = not vocab1 and not vocab2

        union = vocab1 | vocab2
        jaccard = len(vocab1 & vocab2) / len(union) if union else 0.0
        jaccard_percent = jaccard * 100.0

        # LCS uses the raw tokens to provide a complementary metric.
        norm_lcs = compute_normalized_lcs(words1_raw, words2_raw)

        if enable_semantic:
            # Pass raw texts and their pre-computed tokens
            semantic_sim = compute_semantic_similarity(
                texts[f1], texts[f2], words1_raw, words2_raw, model, model_type, use_stopwords, use_lite_stopwords,
                fasttext_tokenize_fn=fasttext_tokenize_fn,
                term_freq_corpus=term_freq_corpus_for_fasttext if is_fasttext else None,
                doc_freq_map=document_frequency_map_for_fasttext if is_fasttext else None,
                total_docs_in_corpus=total_num_documents_for_fasttext if is_fasttext else 0
            )
        else:
            semantic_sim = np.nan

        if both_only_stopwords:
            tfidf_sim = 0.0
        elif (
            cosine_sim_matrix.size > 0
            and i < cosine_sim_matrix.shape[0]
            and j < cosine_sim_matrix.shape[1]
        ):
            tfidf_sim = cosine_sim_matrix[i, j]
        else:
            tfidf_sim = np.nan

        results.append(
            {
                "Text Pair": f"{f1} vs {f2}",
                "Jaccard Similarity (%)": jaccard_percent,
                "Normalized LCS": norm_lcs,
                "Semantic Similarity": semantic_sim,
                "TF-IDF Cosine Sim": tfidf_sim,
            }
        )

    # Explicit columns keep the schema stable when there are no pairs to report.
    return pd.DataFrame(results, columns=result_columns)
|