import numpy as np
import pandas as pd
from typing import List, Dict, Union
from itertools import combinations
from sklearn.metrics.pairwise import cosine_similarity
from .semantic_embedding import generate_embeddings
from .tokenize import tokenize_texts
import logging
from sklearn.feature_extraction.text import TfidfVectorizer
from .stopwords_bo import TIBETAN_STOPWORDS, TIBETAN_STOPWORDS_SET
from .stopwords_lite_bo import TIBETAN_STOPWORDS_LITE, TIBETAN_STOPWORDS_LITE_SET

# Attempt to import the Cython-compiled fast_lcs module.
try:
    from .fast_lcs import compute_lcs_fast
    USE_CYTHON_LCS = True
except ImportError:
    # Cython fast_lcs not found; fall back to the pure-Python LCS.
    # Compile the Cython module for better performance.
    USE_CYTHON_LCS = False

logger = logging.getLogger(__name__)


def compute_normalized_lcs(words1: List[str], words2: List[str]) -> float:
    """Return the longest common subsequence length of two token lists, normalized by their average length."""
    # Compute m and n (lengths) up front so they are available for normalization
    # regardless of which LCS implementation is used.
    m, n = len(words1), len(words2)
    if USE_CYTHON_LCS:
        # Use the Cython-compiled version if available.
        lcs_length = compute_lcs_fast(words1, words2)
    else:
        # Fall back to the pure-Python implementation. A numpy dp table is slightly
        # faster than a list of lists for large inputs, but the Python loops remain
        # the main bottleneck compared to Cython.
        dp = np.zeros((m + 1, n + 1), dtype=np.int32)
        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if words1[i - 1] == words2[j - 1]:
                    dp[i, j] = dp[i - 1, j - 1] + 1
                else:
                    dp[i, j] = max(dp[i - 1, j], dp[i, j - 1])
        lcs_length = int(dp[m, n])
    avg_length = (m + n) / 2
    return lcs_length / avg_length if avg_length > 0 else 0.0
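
# Illustrative check of the normalization (a minimal sketch; the token lists below are
# made-up examples, not project data). The LCS of ["ka", "kha", "ga"] and ["ka", "ga"]
# is ["ka", "ga"], so the score is 2 / ((3 + 2) / 2) = 0.8:
#
#     assert compute_normalized_lcs(["ka", "kha", "ga"], ["ka", "ga"]) == 0.8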


def compute_semantic_similarity(
    text1_segment: str,
    text2_segment: str,
    tokens1: List[str],  # botok tokens for text1; not used directly by the FastText path but kept for the signature
    tokens2: List[str],  # botok tokens for text2; not used directly by the FastText path but kept for the signature
    model,  # FastText model object
    model_type: str = "fasttext",  # should always be "fasttext" when called
    use_stopwords: bool = True,
    use_lite_stopwords: bool = False,
    fasttext_tokenize_fn=None,
    term_freq_corpus=None,
    doc_freq_map=None,
    total_docs_in_corpus=0,
) -> float:
    """Computes semantic similarity using a FastText model."""
    if model_type != "fasttext":
        logger.error(f"compute_semantic_similarity called with unexpected model_type: {model_type}")
        return np.nan

    if model is None:
        logger.warning(
            "FastText model not available for semantic similarity. Skipping calculation."
        )
        return np.nan

    if not text1_segment or not text2_segment:
        logger.info(
            "One or both texts are empty for semantic similarity. Returning 0.0."
        )
        return 0.0

    def _get_aggregated_embedding(
        raw_text_segment: str,
        _botok_tokens: List[str],  # prefixed with _ to indicate it is not used
        model_obj,
        use_stopwords_param: bool,
        use_lite_stopwords_param: bool,
        tokenize_fn_param,
        term_freq_corpus_param,
        doc_freq_map_param,
        total_docs_in_corpus_param,
    ) -> Union[np.ndarray, None]:
        """Helper to get a single embedding for a text using FastText."""
        if not raw_text_segment.strip():
            logger.info(
                f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
            )
            return None
        embedding = generate_embeddings(
            texts=[raw_text_segment],
            model=model_obj,
            tokenize_fn=tokenize_fn_param,
            use_stopwords=use_stopwords_param,
            use_lite_stopwords=use_lite_stopwords_param,
            corpus_token_freq=term_freq_corpus_param,
            doc_freq_map=doc_freq_map_param,
            total_docs_in_corpus=total_docs_in_corpus_param,
        )
        if embedding is None or embedding.size == 0:
            logger.error(
                f"Failed to generate FastText embedding for text: {raw_text_segment[:100]}..."
            )
            return None
        return embedding

    try:
        # Pass all relevant parameters to _get_aggregated_embedding.
        emb1 = _get_aggregated_embedding(
            text1_segment, tokens1, model, use_stopwords, use_lite_stopwords,
            fasttext_tokenize_fn, term_freq_corpus, doc_freq_map, total_docs_in_corpus,
        )
        emb2 = _get_aggregated_embedding(
            text2_segment, tokens2, model, use_stopwords, use_lite_stopwords,
            fasttext_tokenize_fn, term_freq_corpus, doc_freq_map, total_docs_in_corpus,
        )

        if emb1 is None or emb2 is None or emb1.size == 0 or emb2.size == 0:
            logger.error(
                "Failed to obtain one or both FastText embeddings for semantic similarity."
            )
            return np.nan

        # Ensure embeddings are numpy arrays (they should be, but be defensive).
        if not isinstance(emb1, np.ndarray):
            emb1 = np.array(emb1)
        if not isinstance(emb2, np.ndarray):
            emb2 = np.array(emb2)

        # Handle cases where embeddings are all zeros.
        if np.all(emb1 == 0) and np.all(emb2 == 0):
            logger.info("Both FastText embeddings are zero. Semantic similarity is 0.0.")
            return 0.0
        if np.all(emb1 == 0) or np.all(emb2 == 0):
            logger.info("One of the FastText embeddings is zero. Semantic similarity is 0.0.")
            return 0.0

        # Handle NaN or Inf in embeddings.
        if np.isnan(emb1).any() or np.isinf(emb1).any() or \
           np.isnan(emb2).any() or np.isinf(emb2).any():
            logger.warning("NaN or Inf found in FastText embeddings. Semantic similarity set to 0.0.")
            return 0.0

        # Ensure embeddings are 2D for cosine_similarity: [1, dim].
        if emb1.ndim == 1:
            emb1 = emb1.reshape(1, -1)
        if emb2.ndim == 1:
            emb2 = emb2.reshape(1, -1)

        similarity_score = cosine_similarity(emb1, emb2)[0][0]
        return max(0.0, float(similarity_score))
    except Exception as e:
        safe_text1 = str(text1_segment)[:100] if text1_segment is not None else "N/A"
        safe_text2 = str(text2_segment)[:100] if text2_segment is not None else "N/A"
        logger.error(
            f"Error during FastText semantic similarity calculation:\nText1: {safe_text1}...\nText2: {safe_text2}...\nError: {e}"
        )
        logger.exception("Traceback for FastText semantic similarity calculation error:")
        return np.nan
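
# Minimal calling sketch (illustrative only: "cc.bo.300.bin" is a hypothetical local
# FastText model path and the segment strings are placeholders; in the real pipeline
# this function is invoked by compute_all_metrics with corpus statistics attached):
#
#     import fasttext
#     ft_model = fasttext.load_model("cc.bo.300.bin")
#     score = compute_semantic_similarity("<segment 1>", "<segment 2>", [], [], ft_model)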


def compute_all_metrics(
    texts: Dict[str, str],
    model=None,
    enable_semantic: bool = True,
    model_type: str = "fasttext",
    use_stopwords: bool = True,
    use_lite_stopwords: bool = False,
    fasttext_tokenize_fn=None,  # FastText-specific tokenizer
) -> pd.DataFrame:
    """
    Computes all selected similarity metrics between every pair of texts.

    Args:
        texts (Dict[str, str]): A dictionary where keys are text identifiers (e.g., filenames
            or segment IDs) and values are the text content strings.
        model: The pre-loaded FastText model object. Defaults to None.
        enable_semantic (bool): Whether to compute semantic similarity. Defaults to True.
        model_type (str): The embedding model type; currently only "fasttext" is supported.
            Defaults to "fasttext".
        use_stopwords (bool): Whether to filter Tibetan stopwords. Defaults to True.
        use_lite_stopwords (bool): Whether to use the lite stopword list instead of the
            full one. Defaults to False.
        fasttext_tokenize_fn: Optional tokenizer used for FastText corpus statistics.
            Defaults to None (botok tokens are used instead).

    Returns:
        pd.DataFrame: A DataFrame where each row contains the metrics for a pair of texts:
            'Text Pair', 'Jaccard Similarity (%)', 'Normalized LCS', 'Semantic Similarity',
            and 'TF-IDF Cosine Sim'.
    """
    files = list(texts.keys())
    results = []

    # Prepare token lists (always use tokenize_texts for raw Unicode).
    token_lists = {}  # botok tokens per text_id, used for Jaccard, LCS, and semantic similarity
    corpus_for_sklearn_tfidf = []  # space-joined tokens for scikit-learn's TF-IDF

    # TF-IDF-related statistics for the FastText path.
    term_freq_corpus_for_fasttext = {}
    document_frequency_map_for_fasttext = {}
    total_num_documents_for_fasttext = len(texts)

    stopwords_set_for_fasttext_stats_calc = set()
    if use_stopwords:
        stopwords_set_for_fasttext_stats_calc = (
            TIBETAN_STOPWORDS_LITE_SET if use_lite_stopwords else TIBETAN_STOPWORDS_SET
        )

    for fname, content in texts.items():
        current_tokens_for_file = []
        tokenized_content_list_of_lists = tokenize_texts([content])
        if tokenized_content_list_of_lists and tokenized_content_list_of_lists[0]:
            current_tokens_for_file = tokenized_content_list_of_lists[0]
        token_lists[fname] = current_tokens_for_file
        corpus_for_sklearn_tfidf.append(" ".join(current_tokens_for_file) if current_tokens_for_file else "")

        if model_type == "fasttext":
            if fasttext_tokenize_fn is not None:
                tokens_for_fasttext_stats = fasttext_tokenize_fn(content)
            else:
                tokens_for_fasttext_stats = current_tokens_for_file

            filtered_tokens_for_stats = (
                [token for token in tokens_for_fasttext_stats if token not in stopwords_set_for_fasttext_stats_calc]
                if use_stopwords
                else tokens_for_fasttext_stats
            )

            # Update corpus-wide term frequencies.
            for token in filtered_tokens_for_stats:
                if token.strip():
                    term_freq_corpus_for_fasttext[token] = term_freq_corpus_for_fasttext.get(token, 0) + 1

            # Update document frequencies.
            unique_filtered_tokens_in_doc = set(filtered_tokens_for_stats)
            for token in unique_filtered_tokens_in_doc:
                if token.strip():
                    document_frequency_map_for_fasttext[token] = document_frequency_map_for_fasttext.get(token, 0) + 1

    if model_type == "fasttext":
        logger.info(f"Built FastText corpus term frequency map with {len(term_freq_corpus_for_fasttext)} unique tokens.")
        logger.info(
            f"Built FastText document frequency map with {len(document_frequency_map_for_fasttext)} unique tokens "
            f"across {total_num_documents_for_fasttext} documents."
        )

    # TF-IDF vectorization and cosine similarity calculation.
    if corpus_for_sklearn_tfidf:
        try:
            # Use an identity preprocessor and a whitespace tokenizer: the input is already
            # tokenized (as space-separated strings), and we do not want further case changes
            # or token modifications for Tibetan.
            # Select the appropriate stopword list based on user preference.
            if use_stopwords:
                stopwords_to_use = TIBETAN_STOPWORDS_LITE if use_lite_stopwords else TIBETAN_STOPWORDS
            else:
                # If stopwords are disabled, use an empty list.
                stopwords_to_use = []

            vectorizer = TfidfVectorizer(
                tokenizer=lambda x: x.split(),
                preprocessor=lambda x: x,
                token_pattern=None,
                stop_words=stopwords_to_use,
            )
            tfidf_matrix = vectorizer.fit_transform(corpus_for_sklearn_tfidf)
            # Pairwise cosine similarity on the TF-IDF matrix: cosine_sim_matrix[i, j] is the
            # similarity between document i and document j.
            cosine_sim_matrix = cosine_similarity(tfidf_matrix)
        except ValueError as e:
            if "empty vocabulary" in str(e):
                # If the vocabulary is empty after stopword removal, create a zero matrix.
                n = len(corpus_for_sklearn_tfidf)
                cosine_sim_matrix = np.zeros((n, n))
            else:
                # Re-raise other ValueErrors.
                raise
    else:
        # Handle the case with no texts or all empty texts.
        n = len(files) if files else 0
        cosine_sim_matrix = np.zeros((n, n))

    for i, j in combinations(range(len(files)), 2):
        f1, f2 = files[i], files[j]
        words1_raw, words2_raw = token_lists[f1], token_lists[f2]

        # Select the appropriate stopword set based on user preference.
        if use_stopwords:
            stopwords_set_to_use = TIBETAN_STOPWORDS_LITE_SET if use_lite_stopwords else TIBETAN_STOPWORDS_SET
        else:
            # If stopwords are disabled, use an empty set.
            stopwords_set_to_use = set()

        # Filter stopwords for the Jaccard calculation.
        words1_jaccard = [word for word in words1_raw if word not in stopwords_set_to_use]
        words2_jaccard = [word for word in words2_raw if word not in stopwords_set_to_use]

        # Check whether both texts contain only stopwords.
        both_only_stopwords = len(words1_jaccard) == 0 and len(words2_jaccard) == 0

        jaccard = (
            len(set(words1_jaccard) & set(words2_jaccard)) / len(set(words1_jaccard) | set(words2_jaccard))
            if set(words1_jaccard) | set(words2_jaccard)  # ensure the denominator is not zero
            else 0.0
        )
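        # Worked example of the Jaccard step (made-up tokens, not project data): the sets
        # {"ka", "kha"} and {"ka", "ga"} share 1 token out of 3 unique tokens overall, so
        # Jaccard = 1 / 3 ≈ 0.333, reported below as 33.3 %.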

        # LCS uses the raw tokens (words1_raw, words2_raw) to provide a complementary metric.
        # Semantic similarity also uses the raw text and its botok tokens for chunking decisions.
        jaccard_percent = jaccard * 100.0
        norm_lcs = compute_normalized_lcs(words1_raw, words2_raw)

        # Semantic similarity calculation.
        if enable_semantic:
            # Pass the raw texts and their pre-computed botok tokens.
            semantic_sim = compute_semantic_similarity(
                texts[f1],
                texts[f2],
                words1_raw,
                words2_raw,
                model,
                model_type,
                use_stopwords,
                use_lite_stopwords,
                fasttext_tokenize_fn=fasttext_tokenize_fn,
                term_freq_corpus=term_freq_corpus_for_fasttext if model_type == "fasttext" else None,
                doc_freq_map=document_frequency_map_for_fasttext if model_type == "fasttext" else None,
                total_docs_in_corpus=total_num_documents_for_fasttext if model_type == "fasttext" else 0,
            )
        else:
            semantic_sim = np.nan

        results.append(
            {
                "Text Pair": f"{f1} vs {f2}",
                "Jaccard Similarity (%)": jaccard_percent,
                "Normalized LCS": norm_lcs,
                "Semantic Similarity": semantic_sim,
                "TF-IDF Cosine Sim": (
                    0.0
                    if both_only_stopwords
                    else cosine_sim_matrix[i, j]
                    if cosine_sim_matrix.size > 0
                    and i < cosine_sim_matrix.shape[0]
                    and j < cosine_sim_matrix.shape[1]
                    else np.nan
                ),
            }
        )

    return pd.DataFrame(results)
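

# Example usage (a minimal sketch; the file names and contents are illustrative
# assumptions, and the semantic metric is disabled here so no FastText model is needed):
#
#     sample_texts = {
#         "witness_a.txt": "<Tibetan text of witness A>",
#         "witness_b.txt": "<Tibetan text of witness B>",
#     }
#     df = compute_all_metrics(sample_texts, model=None, enable_semantic=False)
#     print(df[["Text Pair", "Jaccard Similarity (%)", "Normalized LCS", "TF-IDF Cosine Sim"]])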