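# NOTE: the three lines originally here ("Spaces:", "Running", "Running") appear to be
# page-header residue from a web file viewer and are not part of this module.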
import logging
import re
from itertools import combinations
from typing import Any, Callable, Dict, List, Optional, Tuple

import pandas as pd

from .fasttext_embedding import load_fasttext_model  # Added for custom fasttext
from .metrics import compute_all_metrics
from .semantic_embedding import get_model_and_device
from .tokenize import tokenize_texts
# Identifier of the custom Tibetan FastText model. process_texts compares the
# requested model_name against this value to decide whether to use the custom
# FastText loader.
# NOTE(review): defined locally for that conditional; it presumably has to match
# the ID used by the embedding modules - confirm it stays in sync.
FASTTEXT_MODEL_ID = "fasttext-tibetan"
def get_botok_tokens_for_single_text(text: str, mode: str = "syllable") -> list[str]:
    """Tokenize one string with ``tokenize_texts`` and return its token list.

    ``tokenize_texts`` works on a list of texts and returns one token list per
    text; this adapter wraps a single string so it can be used wherever a
    "one string in, tokens out" callable is expected (in this module it is
    used by the FastText tokenizer passed to ``compute_all_metrics``).

    Args:
        text: The text to tokenize. ``None``, empty and whitespace-only input
            yield an empty list without invoking the tokenizer.
        mode: Forwarded unchanged to ``tokenize_texts`` as ``mode``.
            NOTE(review): presumably 'syllable' or 'word'; the accepted values
            are defined by ``tokenize_texts`` - confirm there.

    Returns:
        The tokens for ``text``, or ``[]`` when there is nothing to tokenize
        or the tokenizer returns no tokens.
    """
    # Guard against None as well as blank text so callers never hit
    # AttributeError on `.strip()`.
    if not text or not text.strip():
        return []

    tokens_per_text = tokenize_texts([text], mode=mode)
    if tokens_per_text and tokens_per_text[0]:
        return tokens_per_text[0]
    return []
# Page/line markers: "ln" or "p" (any case), 1-3 digits, optional a/b suffix,
# together with any whitespace around them (e.g. "ln12", "p3a", "P101B").
# There is no word boundary, so the pattern also matches inside longer
# Latin-script runs (e.g. the "p123" of "p1234").
_PAGE_LINE_MARKER_RE = re.compile(r"\s*(?:[lL][nN]|[pP])\d{1,3}[abAB]?\s*")
# Two shad marks (U+0F0D), optionally separated by whitespace.
_DOUBLE_SHAD_RE = re.compile(r"།\s*།")
_WHITESPACE_RUN_RE = re.compile(r"\s+")


def clean_tibetan_text_for_fasttext(text: str) -> str:
    """Clean Tibetan text before tokenization / FastText embedding.

    The steps are applied in this order:

    1. Page/line markers such as ``ln12`` or ``p3a`` (with surrounding
       whitespace) are replaced by a single space.
    2. A double shad ("། །" or "།།") is collapsed to a single shad ("།").
       The character involved is the shad (U+0F0D), not the tsheg (U+0F0B).
    3. Runs of whitespace are collapsed to one space and the result is
       stripped.

    NOTE(review): the original docstring states these steps are similar to the
    cleaning used when the FastText model was trained; that cannot be verified
    from this module.

    Args:
        text: Raw text to clean.

    Returns:
        The cleaned text.
    """
    without_markers = _PAGE_LINE_MARKER_RE.sub(" ", text)
    # Matches are non-overlapping pairs, so a run of three shads becomes two,
    # not one. Kept as-is so output stays identical to the previous behaviour.
    single_shad = _DOUBLE_SHAD_RE.sub("།", without_markers)
    return _WHITESPACE_RUN_RE.sub(" ", single_shad).strip()
logger = logging.getLogger(__name__)


def _report_progress(progress_callback, fraction: float, desc: str) -> None:
    """Call the optional progress callback; log (never raise) if it fails."""
    if progress_callback is None:
        return
    try:
        progress_callback(fraction, desc=desc)
    except Exception as e:
        # Progress reporting is cosmetic; processing must continue regardless.
        logger.warning(f"Progress callback error (non-critical): {e}")


def _load_semantic_model(model_name: str, progress_callback=None):
    """Load the embedding model for *model_name*.

    Returns a ``(model, model_type, warning)`` tuple. On any failure the model
    and model type are ``None`` and ``warning`` explains why.
    """
    try:
        logger.info("Using model: %s", model_name)

        if model_name == FASTTEXT_MODEL_ID:
            logger.info(f"Attempting to load custom FastText model: {model_name}")
            _report_progress(
                progress_callback, 0.25, f"Loading custom FastText model: {model_name}..."
            )
            custom_model = load_fasttext_model(model_id=model_name)
            if custom_model:
                logger.info(f"Custom FastText model '{model_name}' loaded successfully.")
                _report_progress(
                    progress_callback, 0.3, f"Custom FastText model '{model_name}' loaded."
                )
                return custom_model, "fasttext", ""
            warning = (
                f"Custom FastText model ('{model_name}') failed to load. "
                "Semantic similarity will be disabled."
            )
            logger.warning(warning)
            return None, None, warning

        if model_name == "facebook-fasttext-pretrained":
            logger.info(f"Attempting to load Facebook FastText model: {model_name}")
            _report_progress(
                progress_callback, 0.25, f"Loading Facebook FastText model: {model_name}..."
            )
            fb_model, fb_model_type = get_model_and_device(model_id=model_name)
            if fb_model:
                logger.info(
                    f"Facebook FastText model '{model_name}' (type: {fb_model_type}) loaded successfully."
                )
                _report_progress(
                    progress_callback, 0.3, f"Facebook FastText model '{model_name}' loaded."
                )
                return fb_model, fb_model_type, ""
            warning = (
                f"Facebook FastText model ('{model_name}') failed to load. "
                "Semantic similarity will be disabled."
            )
            logger.warning(warning)
            return None, None, warning

        # Any other model name is unsupported.
        warning = (
            f"Unsupported model_name: '{model_name}'. Semantic similarity will be disabled. "
            f"Supported models are '{FASTTEXT_MODEL_ID}' and 'facebook-fasttext-pretrained'."
        )
        logger.warning(warning)
        _report_progress(
            progress_callback, 0.3, "Unsupported model, continuing without semantic similarity."
        )
        return None, None, warning
    except Exception as e:
        # Catch-all: a broken model must not prevent the non-semantic metrics.
        warning = (
            f"An unexpected error occurred while attempting to load model '{model_name}': {e}. "
            "Semantic similarity will be disabled."
        )
        logger.error(warning, exc_info=True)
        _report_progress(
            progress_callback, 0.3, "Error loading model, continuing without semantic similarity."
        )
        return None, None, warning


def process_texts(
    text_data: Dict[str, str],
    filenames: List[str],
    enable_semantic: bool = True,
    model_name: str = "buddhist-nlp/buddhist-sentence-similarity",
    use_stopwords: bool = True,
    use_lite_stopwords: bool = False,
    progress_callback: Optional[Callable[..., Any]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame, str]:
    """Segment uploaded texts into chapters and compare them pairwise.

    Each file is split on the chapter marker "༈" (a file without the marker is
    one segment), every segment is cleaned with
    ``clean_tibetan_text_for_fasttext``, and for every pair of files the
    chapters at the same position are passed to ``compute_all_metrics``.
    A word count is then computed for every segment with ``tokenize_texts``.

    Args:
        text_data: Mapping of filename to file content.
        filenames: Filenames to process, in order. Names missing from
            ``text_data`` (or mapped to ``None``/blank content) are skipped
            with a logged warning.
        enable_semantic: Whether to load an embedding model and request
            semantic similarity. Defaults to True.
        model_name: Embedding model identifier. Only ``FASTTEXT_MODEL_ID``
            ("fasttext-tibetan") and "facebook-fasttext-pretrained" are
            supported; any other value, including the current default,
            disables semantic similarity and adds a warning to the result.
        use_stopwords: Forwarded to ``compute_all_metrics``. Defaults to True.
        use_lite_stopwords: Forwarded to ``compute_all_metrics``.
            Defaults to False.
        progress_callback: Optional callable invoked as
            ``progress_callback(fraction, desc=...)``. Exceptions it raises
            are logged and ignored.

    Returns:
        A ``(metrics_df, word_counts_df, warning)`` tuple:

        - metrics_df: Concatenation of the frames returned by
          ``compute_all_metrics`` for each chapter pair, with 'Text Pair'
          set to "<file1> vs <file2>" and 'Chapter' set to the 1-based
          chapter index. The remaining columns are whatever
          ``compute_all_metrics`` produces. Empty if nothing was computed.
        - word_counts_df: Columns 'Filename', 'ChapterNumber', 'SegmentID',
          'WordCount', sorted by filename and chapter number.
        - warning: Accumulated warnings (model loading, missing chapter
          markers, no metrics), or an explanatory message when there are no
          segments or fewer than two usable files. In those two cases both
          DataFrames are empty.

    Raises:
        Nothing deliberately. Errors during model loading, metric computation
        and word counting are caught and logged; exceptions raised while
        cleaning segment text propagate to the caller.
    """
    model, model_type = None, None
    model_warning = ""

    _report_progress(progress_callback, 0.25, "Preparing for text analysis...")

    # --- Load the semantic model (optional) ---------------------------------
    if enable_semantic:
        logger.info("Semantic similarity enabled. Loading embedding model...")
        model, model_type, model_warning = _load_semantic_model(model_name, progress_callback)
        # Semantic similarity is only possible when a model was actually loaded.
        enable_semantic = model is not None
    else:
        logger.info("Semantic similarity disabled. Skipping model loading.")
        _report_progress(progress_callback, 0.3, "Processing text segments")

    # --- Segment every file by chapter marker -------------------------------
    _report_progress(progress_callback, 0.35, "Segmenting texts by chapters...")
    chapter_marker = "༈"
    fallback = False
    segment_texts: Dict[str, str] = {}
    # seg_id -> (filename, chapter number). Kept explicitly so later steps never
    # have to parse the ID string, which breaks if a filename contains "|".
    segment_origin: Dict[str, Tuple[str, int]] = {}
    total_files = len(filenames)

    for i, fname in enumerate(filenames):
        if total_files > 1:
            _report_progress(
                progress_callback,
                0.35 + (0.05 * (i / total_files)),
                f"Segmenting file {i+1}/{total_files}: {fname}",
            )

        content = text_data.get(fname)
        if content is None:
            logger.warning(f"File '{fname}' has no content available; skipping.")
            continue
        if not content.strip():
            logger.warning(f"File '{fname}' is empty or contains only whitespace.")
            continue

        if chapter_marker in content:
            pieces = [seg.strip() for seg in content.split(chapter_marker) if seg.strip()]
            if not pieces:
                logger.warning(
                    f"File '{fname}' contains chapter markers but no valid text segments."
                )
                continue
        else:
            # No chapter markers: the whole file is a single segment.
            pieces = [content.strip()]
            fallback = True

        for chapter_num, piece in enumerate(pieces, start=1):
            seg_id = f"{fname}|chapter {chapter_num}"
            segment_texts[seg_id] = clean_tibetan_text_for_fasttext(piece)
            segment_origin[seg_id] = (fname, chapter_num)

    # --- Assemble warnings --------------------------------------------------
    warning = model_warning
    if fallback:
        chapter_warning = (
            "No chapter marker found in one or more files. "
            "Each file will be treated as a single segment. "
            "For best results, add a unique marker (e.g., ༈) to separate chapters or sections."
        )
        warning = warning + " " + chapter_warning if warning else chapter_warning

    if not segment_texts:
        logger.error("No valid text segments found in any of the uploaded files.")
        return (
            pd.DataFrame(),
            pd.DataFrame(),
            "No valid text segments found in the uploaded files. Please check your files and try again.",
        )

    # --- Group segment IDs by file (insertion order preserved) --------------
    _report_progress(progress_callback, 0.4, "Organizing text segments...")
    file_to_chapters: Dict[str, List[str]] = {}
    for seg_id in segment_texts:
        file_to_chapters.setdefault(segment_origin[seg_id][0], []).append(seg_id)

    _report_progress(progress_callback, 0.45, "Computing similarity metrics...")
    files = list(file_to_chapters)
    if len(files) < 2:
        logger.warning("Need at least two files to compute similarity metrics.")
        return (
            pd.DataFrame(),
            pd.DataFrame(),
            "Need at least two files to compute similarity metrics.",
        )

    # Chapters are compared by position, so each file pair contributes as many
    # comparisons as the shorter file has chapters.
    file_pairs = list(combinations(files, 2))
    total_comparisons = sum(
        min(len(file_to_chapters[a]), len(file_to_chapters[b])) for a, b in file_pairs
    )

    def metrics_fraction(done: int) -> float:
        # The metrics phase spans 0.45-0.70 of overall progress.
        if total_comparisons > 0:
            return 0.45 + (0.25 * (done / total_comparisons))
        return 0.45

    # The tokenizer depends only on the model type, so build it once rather
    # than once per chapter comparison.
    tokenizer_for_fasttext = None
    if model_type == "fasttext":

        def fasttext_tokenizer_adapter(text_segment: str) -> List[str]:
            cleaned_segment = clean_tibetan_text_for_fasttext(text_segment)
            # FastText models get word-level (not syllable-level) tokens.
            return get_botok_tokens_for_single_text(cleaned_segment, mode="word")

        tokenizer_for_fasttext = fasttext_tokenizer_adapter
        logger.info("Using botok word-level tokenization for FastText model.")

    # --- Compute metrics for each pair of corresponding chapters ------------
    results = []
    comparison_count = 0
    for file1, file2 in file_pairs:
        # Report at the current position so progress never moves backwards.
        _report_progress(
            progress_callback,
            metrics_fraction(comparison_count),
            f"Comparing {file1} with {file2}...",
        )
        chapter_pairs = zip(file_to_chapters[file1], file_to_chapters[file2])
        for idx, (seg1, seg2) in enumerate(chapter_pairs):
            comparison_count += 1
            _report_progress(
                progress_callback,
                metrics_fraction(comparison_count),
                f"Computing metrics for chapter {idx+1} ({comparison_count}/{total_comparisons})",
            )
            try:
                pair_metrics = compute_all_metrics(
                    {seg1: segment_texts[seg1], seg2: segment_texts[seg2]},
                    model=model,
                    enable_semantic=enable_semantic,
                    model_type=model_type,
                    use_stopwords=use_stopwords,
                    use_lite_stopwords=use_lite_stopwords,
                    fasttext_tokenize_fn=tokenizer_for_fasttext,
                )
                # Label rows with the file pair and chapter number.
                pair_metrics.loc[:, "Text Pair"] = f"{file1} vs {file2}"
                pair_metrics.loc[:, "Chapter"] = idx + 1
                results.append(pair_metrics)
            except Exception as e:
                # One failing comparison must not abort the remaining ones.
                logger.error(
                    f"Error computing metrics for {seg1} vs {seg2}: {e}", exc_info=True
                )

    if results:
        metrics_df = pd.concat(results, ignore_index=True)
    else:
        metrics_df = pd.DataFrame()
        # strip() avoids a leading space when there was no earlier warning.
        warning = (
            warning
            + " No valid metrics could be computed. Please check your files and try again."
        ).strip()

    # --- Word counts per segment --------------------------------------------
    _report_progress(progress_callback, 0.75, "Calculating word counts...")
    word_counts_data = []
    total_segments = len(segment_texts)
    for i, (seg_id, text_content) in enumerate(segment_texts.items()):
        _report_progress(
            progress_callback,
            0.75 + (0.15 * (i / total_segments)),
            f"Counting words in segment {i+1}/{total_segments}",
        )
        fname, chapter_num = segment_origin[seg_id]
        try:
            # tokenize_texts returns one token list per input text.
            tokenized_segments = tokenize_texts([text_content])
            if tokenized_segments and tokenized_segments[0]:
                word_count = len(tokenized_segments[0])
            else:
                word_count = 0
        except Exception as e:
            logger.error(f"Error calculating word count for segment {seg_id}: {e}")
            # Record a zero count so every segment still has a row.
            word_count = 0
        word_counts_data.append(
            {
                "Filename": fname.replace(".txt", ""),
                "ChapterNumber": chapter_num,
                "SegmentID": seg_id,
                "WordCount": word_count,
            }
        )

    word_counts_df = pd.DataFrame(word_counts_data)
    if not word_counts_df.empty:
        word_counts_df = word_counts_df.sort_values(
            by=["Filename", "ChapterNumber"]
        ).reset_index(drop=True)

    _report_progress(progress_callback, 0.95, "Analysis complete!")
    return metrics_df, word_counts_df, warning