import numpy as np
import pandas as pd
from typing import List, Dict
from itertools import combinations
from sklearn.metrics.pairwise import cosine_similarity
import torch
from .semantic_embedding import generate_embeddings
from .tokenize import tokenize_texts
import logging
from sklearn.feature_extraction.text import TfidfVectorizer
from .stopwords_bo import TIBETAN_STOPWORDS, TIBETAN_STOPWORDS_SET
from .stopwords_lite_bo import TIBETAN_STOPWORDS_LITE, TIBETAN_STOPWORDS_LITE_SET

# Attempt to import the Cython-compiled fast_lcs module
try:
    from .fast_lcs import compute_lcs_fast

    USE_CYTHON_LCS = True
except ImportError:
    # print("Cython fast_lcs not found, using Python LCS. For better performance, compile the Cython module.")
    USE_CYTHON_LCS = False

logger = logging.getLogger(__name__)

MAX_TOKENS_PER_CHUNK = 500  # Max tokens (words via botok) per chunk
CHUNK_OVERLAP = 50  # Number of tokens to overlap between chunks
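# With these defaults, consecutive chunks produced by _chunk_text below start
# MAX_TOKENS_PER_CHUNK - CHUNK_OVERLAP = 450 botok tokens apart.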


def _chunk_text(
    original_text_content: str,
    tokens: List[str],
    max_chunk_tokens: int,
    overlap_tokens: int,
) -> List[str]:
    """
    Splits a list of tokens into chunks and reconstructs text segments from these token chunks.

    The reconstructed text segments are intended for embedding models.

    Args:
        original_text_content (str): The original raw text string. Used if no chunking is needed.
        tokens (List[str]): The list of botok tokens for original_text_content.
        max_chunk_tokens (int): Maximum number of botok tokens per chunk.
        overlap_tokens (int): Number of botok tokens to overlap between chunks.

    Returns:
        List[str]: A list of text strings, where each string is a chunk.
    """
    if not tokens:
        # Handles empty or whitespace-only original text that produced no tokens.
        return [original_text_content] if original_text_content.strip() else []

    if len(tokens) <= max_chunk_tokens:
        # If no chunking is needed, return the original text content directly:
        # raw text segments are passed unmodified to the model, and joining
        # tokens here would alter spacing.
        return [original_text_content]
    reconstructed_text_chunks = []
    start_idx = 0
    while start_idx < len(tokens):
        end_idx = min(start_idx + max_chunk_tokens, len(tokens))
        current_chunk_botok_tokens = tokens[start_idx:end_idx]
        # Reconstruct the text chunk by joining the botok tokens. This is an approximation;
        # the semantic model's internal tokenizer will handle this string.
        reconstructed_text_chunks.append(" ".join(current_chunk_botok_tokens))
        if end_idx == len(tokens):
            break
        next_start_idx = start_idx + max_chunk_tokens - overlap_tokens
        if next_start_idx <= start_idx:
            next_start_idx = start_idx + 1
        start_idx = next_start_idx
    return reconstructed_text_chunks
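

# Example (illustrative): a 1,200-token input with the defaults above is split into
# chunks built from the token slices [0:500], [450:950], and [900:1200], each returned
# as a space-joined string; inputs of 500 tokens or fewer are returned as the original
# raw string, and an empty or whitespace-only input yields an empty list.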


def compute_normalized_lcs(words1: List[str], words2: List[str]) -> float:
    # Calculate m and n (lengths) here so they are available for normalization
    # regardless of which LCS implementation is used.
    m, n = len(words1), len(words2)
    if USE_CYTHON_LCS:
        # Use the Cython-compiled version if available
        lcs_length = compute_lcs_fast(words1, words2)
    else:
        # Fallback to the pure Python implementation.
        # A numpy dp table is slightly faster than a list of lists for large inputs,
        # but the primary bottleneck remains the Python loop compared to Cython.
        dp = np.zeros((m + 1, n + 1), dtype=np.int32)
        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if words1[i - 1] == words2[j - 1]:
                    dp[i, j] = dp[i - 1, j - 1] + 1
                else:
                    dp[i, j] = max(dp[i - 1, j], dp[i, j - 1])
        lcs_length = int(dp[m, n])
    avg_length = (m + n) / 2
    return lcs_length / avg_length if avg_length > 0 else 0.0
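

# Worked example (illustrative): for words1 = ["ka", "kha", "ga"] and words2 = ["ka", "ga"],
# the longest common subsequence is ["ka", "ga"] (length 2), the average length is
# (3 + 2) / 2 = 2.5, and compute_normalized_lcs returns 2 / 2.5 = 0.8.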


def compute_semantic_similarity(
    text1_segment: str,
    text2_segment: str,
    tokens1: List[str],
    tokens2: List[str],
    model,
    device,
    model_type: str = "sentence_transformer",
    use_stopwords: bool = True,
    use_lite_stopwords: bool = False,
) -> float:
    """Computes semantic similarity using a sentence transformer model, with chunking for long texts."""
    if model is None or device is None:
        logger.warning(
            "Semantic similarity model or device not available. Skipping calculation."
        )
        return np.nan  # Return NaN if the model isn't loaded

    if not text1_segment or not text2_segment:
        logger.info(
            "One or both texts are empty for semantic similarity. Returning 0.0."
        )
        return 0.0  # Or np.nan, depending on desired behavior for empty inputs

    def _get_aggregated_embedding(
        raw_text_segment: str,
        botok_tokens: List[str],
        model_obj,
        device_str,
        model_type: str = "sentence_transformer",
        use_stopwords: bool = True,
        use_lite_stopwords: bool = False,
    ) -> torch.Tensor | None:
        """Helper to get a single embedding for a text, chunking if necessary for transformer models."""
        if not botok_tokens and not raw_text_segment.strip():
            # Effectively empty input
            logger.info(
                f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
            )
            return None

        # For FastText, no chunking is needed because it processes tokens directly.
        if model_type == "fasttext":
            if not raw_text_segment.strip():
                logger.info(
                    f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
                )
                return None
            # Pass the raw text, the pre-computed botok tokens, and the stopword preferences.
            # The tokens are wrapped in a list because generate_embeddings expects a list of token lists.
            embedding = generate_embeddings(
                [raw_text_segment],
                model_obj,
                device_str,
                model_type,
                tokenize_fn=[botok_tokens],
                use_stopwords=use_stopwords,
                use_lite_stopwords=use_lite_stopwords,
            )
            if embedding is None or embedding.nelement() == 0:
                logger.error(
                    f"Failed to generate FastText embedding for text: {raw_text_segment[:100]}..."
                )
                return None
            return embedding  # Already [1, embed_dim]

        # For transformer models with stopword filtering enabled, check whether any
        # content remains after removing stopwords.
        if use_stopwords:
            stopword_set = (
                TIBETAN_STOPWORDS_LITE_SET if use_lite_stopwords else TIBETAN_STOPWORDS_SET
            )
            filtered_tokens = [token for token in botok_tokens if token not in stopword_set]
            if not filtered_tokens:
                # All tokens were filtered out as stopwords, so return a zero embedding.
                logger.info("All tokens in text are stopwords. Returning zero embedding.")
                embedding_dim = 384  # Default dimension for MiniLM-style models
                return torch.zeros(1, embedding_dim)

        # Normal processing: chunk long texts, otherwise embed the raw text directly.
        if len(botok_tokens) > MAX_TOKENS_PER_CHUNK:
            logger.info(
                f"Text segment with ~{len(botok_tokens)} tokens exceeds {MAX_TOKENS_PER_CHUNK}, chunking {raw_text_segment[:30]}..."
            )
            # Pass the original raw text and its pre-computed botok tokens to _chunk_text.
            text_chunks = _chunk_text(
                raw_text_segment, botok_tokens, MAX_TOKENS_PER_CHUNK, CHUNK_OVERLAP
            )
            if not text_chunks:
                logger.warning(
                    f"Chunking resulted in no chunks for segment: {raw_text_segment[:100]}..."
                )
                return None
            logger.info(
                f"Generated {len(text_chunks)} chunks for segment: {raw_text_segment[:30]}..."
            )
            # Generate embeddings for each chunk, then mean-pool them into a single vector.
            chunk_embeddings = generate_embeddings(text_chunks, model_obj, device_str, model_type)
            if chunk_embeddings is None or chunk_embeddings.nelement() == 0:
                logger.error(
                    f"Failed to generate embeddings for chunks of text: {raw_text_segment[:100]}..."
                )
                return None
            aggregated_embedding = torch.mean(chunk_embeddings, dim=0, keepdim=True)
            return aggregated_embedding
        else:
            # Text is short enough for the transformer model; embed the raw text directly.
            if not raw_text_segment.strip():
                logger.info(
                    f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
                )
                return None
            embedding = generate_embeddings([raw_text_segment], model_obj, device_str, model_type)
            if embedding is None or embedding.nelement() == 0:
                logger.error(
                    f"Failed to generate embedding for text: {raw_text_segment[:100]}..."
                )
                return None
            return embedding  # Already [1, embed_dim]

    try:
        # Pass raw text and its pre-computed botok tokens with the stopword preference.
        embedding1 = _get_aggregated_embedding(
            text1_segment, tokens1, model, device, model_type, use_stopwords, use_lite_stopwords
        )
        embedding2 = _get_aggregated_embedding(
            text2_segment, tokens2, model, device, model_type, use_stopwords, use_lite_stopwords
        )
        if (
            embedding1 is None
            or embedding2 is None
            or embedding1.nelement() == 0
            or embedding2.nelement() == 0
        ):
            logger.error(
                "Failed to obtain one or both aggregated embeddings for semantic similarity."
            )
            return np.nan

        # Both embeddings are zero vectors when every token is a stopword; treat that as 0 similarity.
        if np.all(embedding1.numpy() == 0) and np.all(embedding2.numpy() == 0):
            return 0.0

        # cosine_similarity expects 2D arrays; the embeddings are [1, embed_dim] and on CPU.
        similarity = cosine_similarity(embedding1.numpy(), embedding2.numpy())
        return float(similarity[0][0])
    except Exception as e:
        logger.error(
            f"Error computing semantic similarity with chunking:\nText1: '{text1_segment[:100]}...'\nText2: '{text2_segment[:100]}...'\nError: {e}",
            exc_info=True,
        )
        return np.nan
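

# Usage sketch (illustrative; the model and device objects are assumed to come from the
# project's own model-loading code, e.g. a sentence-transformers model on CPU or GPU):
#
#     sim = compute_semantic_similarity(
#         text1_segment=raw_text_1,
#         text2_segment=raw_text_2,
#         tokens1=botok_tokens_1,
#         tokens2=botok_tokens_2,
#         model=model,
#         device=device,
#         model_type="sentence_transformer",
#     )
#
# The result is a cosine similarity (roughly in [-1, 1]), or np.nan when the model is
# unavailable or embedding fails.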


def compute_all_metrics(
    texts: Dict[str, str],
    model=None,
    device=None,
    enable_semantic: bool = True,
    model_type: str = "sentence_transformer",
    use_stopwords: bool = True,
    use_lite_stopwords: bool = False,
) -> pd.DataFrame:
    """
    Computes all selected similarity metrics between pairs of texts.

    Args:
        texts (Dict[str, str]): A dictionary mapping text identifiers (e.g., filenames or
            segment IDs) to the text content strings.
        model (SentenceTransformer, optional): The pre-loaded sentence transformer model.
            Defaults to None.
        device (str, optional): The device the model is on ('cuda' or 'cpu').
            Defaults to None.
        enable_semantic (bool, optional): Whether to compute semantic similarity.
            Defaults to True.
        model_type (str, optional): Embedding model type, e.g. "sentence_transformer"
            or "fasttext". Defaults to "sentence_transformer".
        use_stopwords (bool, optional): Whether to filter Tibetan stopwords.
            Defaults to True.
        use_lite_stopwords (bool, optional): Whether to use the lite stopword list
            instead of the full one. Defaults to False.

    Returns:
        pd.DataFrame: A DataFrame with one row per text pair, containing 'Text Pair',
            'Jaccard Similarity (%)', 'Normalized LCS', 'Semantic Similarity',
            and 'TF-IDF Cosine Sim'.
    """
    files = list(texts.keys())
    results = []
    # Prepare token lists (always use tokenize_texts for raw Unicode)
    token_lists = {}
    corpus_for_tfidf = []  # Space-joined tokens per document, for TF-IDF
    for fname, content in texts.items():
        tokenized_content = tokenize_texts([content])  # Returns a list of lists
        if tokenized_content and tokenized_content[0]:
            token_lists[fname] = tokenized_content[0]
        else:
            token_lists[fname] = []
        # Regardless of whether tokenization produced anything, add an entry to the
        # TF-IDF corpus: the space-joined tokens, or an empty string for that document.
        corpus_for_tfidf.append(
            " ".join(token_lists[fname]) if token_lists[fname] else ""
        )

    # TF-IDF vectorization and cosine similarity calculation
    if corpus_for_tfidf:
        try:
            # Use identity preprocessing and whitespace splitting, since the input is
            # already tokenized (as space-separated strings) and Tibetan should not be
            # case-folded or otherwise modified.
            # Select the stopword list based on user preference.
            if use_stopwords:
                if use_lite_stopwords:
                    stopwords_to_use = TIBETAN_STOPWORDS_LITE
                else:
                    stopwords_to_use = TIBETAN_STOPWORDS
            else:
                # If stopwords are disabled, use an empty list
                stopwords_to_use = []
            vectorizer = TfidfVectorizer(
                tokenizer=lambda x: x.split(),
                preprocessor=lambda x: x,
                token_pattern=None,
                stop_words=stopwords_to_use,
            )
            tfidf_matrix = vectorizer.fit_transform(corpus_for_tfidf)
            # Pairwise cosine similarity on the TF-IDF matrix: cosine_sim_matrix[i, j]
            # is the similarity between document i and document j.
            cosine_sim_matrix = cosine_similarity(tfidf_matrix)
        except ValueError as e:
            if "empty vocabulary" in str(e):
                # If the vocabulary is empty after stopword removal, fall back to a zero matrix.
                n = len(corpus_for_tfidf)
                cosine_sim_matrix = np.zeros((n, n))
            else:
                # Re-raise other ValueErrors
                raise
    else:
        # Handle the case with no texts or all empty texts
        n = len(files) if files else 0
        cosine_sim_matrix = np.zeros((n, n))
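
    # cosine_sim_matrix is a symmetric N x N matrix aligned with the order of `files`;
    # the pair loop below reads cosine_sim_matrix[i, j] for each (files[i], files[j]) pair.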

    for i, j in combinations(range(len(files)), 2):
        f1, f2 = files[i], files[j]
        words1_raw, words2_raw = token_lists[f1], token_lists[f2]

        # Select the stopword set based on user preference.
        if use_stopwords:
            if use_lite_stopwords:
                stopwords_set_to_use = TIBETAN_STOPWORDS_LITE_SET
            else:
                stopwords_set_to_use = TIBETAN_STOPWORDS_SET
        else:
            # If stopwords are disabled, use an empty set
            stopwords_set_to_use = set()

        # Filter stopwords for the Jaccard calculation.
        words1_jaccard = [word for word in words1_raw if word not in stopwords_set_to_use]
        words2_jaccard = [word for word in words2_raw if word not in stopwords_set_to_use]
        # Check whether both texts contain only stopwords.
        both_only_stopwords = len(words1_jaccard) == 0 and len(words2_jaccard) == 0

        jaccard = (
            len(set(words1_jaccard) & set(words2_jaccard))
            / len(set(words1_jaccard) | set(words2_jaccard))
            if set(words1_jaccard) | set(words2_jaccard)  # Ensure denominator is not zero
            else 0.0
        )
        jaccard_percent = jaccard * 100.0

        # LCS uses the raw tokens (words1_raw, words2_raw) to provide a complementary metric.
        # Semantic similarity also uses the raw text and its botok tokens for chunking decisions.
        norm_lcs = compute_normalized_lcs(words1_raw, words2_raw)

        # Semantic similarity calculation
        if enable_semantic:
            # Pass the raw texts and their pre-computed botok tokens.
            semantic_sim = compute_semantic_similarity(
                texts[f1],
                texts[f2],
                words1_raw,
                words2_raw,
                model,
                device,
                model_type,
                use_stopwords,
                use_lite_stopwords,
            )
        else:
            semantic_sim = np.nan
        results.append(
            {
                "Text Pair": f"{f1} vs {f2}",
                "Jaccard Similarity (%)": jaccard_percent,
                "Normalized LCS": norm_lcs,
                "Semantic Similarity": semantic_sim,
                "TF-IDF Cosine Sim": (
                    0.0
                    if both_only_stopwords
                    else cosine_sim_matrix[i, j]
                    if cosine_sim_matrix.size > 0
                    and i < cosine_sim_matrix.shape[0]
                    and j < cosine_sim_matrix.shape[1]
                    else np.nan
                ),
            }
        )

    return pd.DataFrame(results)
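

# Minimal usage sketch (illustrative; assumes botok's tokenizer data is available and the
# package is imported so that the relative imports above resolve):
#
#     texts = {
#         "witness_a.txt": "...",  # raw Tibetan text
#         "witness_b.txt": "...",
#     }
#     df = compute_all_metrics(texts, enable_semantic=False)  # lexical metrics only, no model needed
#     print(df[["Text Pair", "Jaccard Similarity (%)", "Normalized LCS", "TF-IDF Cosine Sim"]])
#
# Passing a loaded embedding model and device (with enable_semantic=True) additionally
# fills the 'Semantic Similarity' column.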