import re
from typing import Any, List

import numpy as np
import pdfplumber
import pytesseract
from pdf2image import convert_from_path
from transformers import AutoTokenizer

from src.utils.config import CHUNK_SIZE, CHUNK_OVERLAP, DALAT5_MODEL

# Load DalaT5's tokeniser
tokeniser = AutoTokenizer.from_pretrained(DALAT5_MODEL)


def extract_text_with_pdfplumber(file: Any) -> str:
    """
    Extract text with pdfplumber, which is particularly useful for PDF files
    containing tabular data.
    """
    if file.name.endswith(".pdf"):
        try:
            with pdfplumber.open(file.name) as pdf:
                texts = [page.extract_text() or "" for page in pdf.pages]
                return "\n".join(texts).strip()
        except Exception as e:
            print(f"[ERROR] PDFPlumber failed: {e}")
            return ""
    return ""


def extract_text_with_ocr(file: Any) -> str:
    """
    Extract text by running Tesseract OCR over rasterised pages.
    """
    if file.name.endswith(".pdf"):
        try:
            images = convert_from_path(file.name, dpi=300)
            page_texts = []
            for img in images:
                raw = pytesseract.image_to_string(img, lang="kaz+eng")
                # Clean page-by-page
                cleaned = repair_extracted_text(raw)
                page_texts.append(cleaned)
            return "\n".join(page_texts).strip()
        except Exception as e:
            print(f"[ERROR] OCR failed: {e}")
            return ""
    # Mirror the pdfplumber path: non-PDF inputs yield an empty string
    return ""


def clean_text(text: str) -> str:
    """
    Pre-clean text before chunking.
    """
    # Replace runs of newlines with a single space
    text = re.sub(r"\n+", " ", text)
    # Strip runs of two or more punctuation/symbol characters
    text = re.sub(r"[^\w\s]{2,}", "", text)
    # Remove bullets, dashes, and curly quotes
    text = re.sub(r"[•–—“”]+", " ", text)
    # Normalize extra spacing
    text = re.sub(r"\s{2,}", " ", text)
    return text.strip()
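
# Worked example of the cleaning rules above:
#
#     clean_text("Qazaq tili\n\ntarihy •• derekter!!")
#     # -> "Qazaq tili tarihy derekter"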


def is_valid_chunk(chunk: str) -> bool:
    """
    Heuristic to filter out low-quality chunks.
    """
    if len(chunk) < 20:
        return False
    symbols = sum(1 for c in chunk if not c.isalnum() and c != " ")
    if symbols / len(chunk) > 0.4:
        return False
    return True
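
# Examples: a short string fails the length check, and a chunk dominated by
# punctuation fails the symbol-ratio check:
#
#     is_valid_chunk("too short")                  # False (< 20 chars)
#     is_valid_chunk("!!!???...///" + "a" * 10)    # False (> 40% symbols)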


def deduplicate_chunks(chunks: List[str], embedder: Any, threshold: float = 0.95) -> List[str]:
    """
    Deduplicate chunks based on cosine similarity.
    Only retains semantically distinct segments.
    """
    unique_chunks = []
    seen_embeddings = []
    for chunk in chunks:
        emb = embedder.embed_text(chunk)
        # Keep the chunk only if its cosine similarity stays below the
        # threshold against every embedding accepted so far
        is_distinct = all(
            np.dot(emb, seen) / (np.linalg.norm(emb) * np.linalg.norm(seen)) < threshold
            for seen in seen_embeddings
        )
        if is_distinct:
            unique_chunks.append(chunk)
            seen_embeddings.append(emb)
    return unique_chunks
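
# Sketch of the expected embedder interface (hypothetical: any object exposing
# `embed_text(str) -> 1-D vector` works; the sentence-transformers model named
# here is an illustrative multilingual choice, not something this repo mandates):
#
#     from sentence_transformers import SentenceTransformer
#
#     class STEmbedder:
#         def __init__(self):
#             self.model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
#
#         def embed_text(self, text: str):
#             return self.model.encode(text)
#
#     kept = deduplicate_chunks(chunks, STEmbedder(), threshold=0.95)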


def chunk_text(text: str) -> List[str]:
    """
    Chunk text into overlapping token-based segments using DalaT5's tokeniser.
    """
    # Clean text before doing anything
    cleaned_text = clean_text(text)
    # Encode with the tokeniser
    tokens = tokeniser.encode(cleaned_text, add_special_tokens=False)
    total_tokens = len(tokens)
    if total_tokens <= CHUNK_SIZE:
        single_chunk = tokeniser.decode(tokens, skip_special_tokens=True).strip()
        return [single_chunk] if is_valid_chunk(single_chunk) else []
    chunks = []
    start = 0
    while start < total_tokens:
        end = min(start + CHUNK_SIZE, total_tokens)
        chunk_tokens = tokens[start:end]
        chunk = tokeniser.decode(chunk_tokens, skip_special_tokens=True).strip()
        if is_valid_chunk(chunk):
            chunks.append(chunk)
        # Step forward by the stride so consecutive chunks overlap
        start += CHUNK_SIZE - CHUNK_OVERLAP
    return chunks
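
# Usage (CHUNK_SIZE and CHUNK_OVERLAP come from src.utils.config; with, say,
# CHUNK_SIZE=512 and CHUNK_OVERLAP=64 -- illustrative values -- each chunk
# holds up to 512 tokens and consecutive chunks share 64; `uploaded_file` is
# a hypothetical upload object):
#
#     chunks = chunk_text(extract_text_with_pdfplumber(uploaded_file))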


def repair_extracted_text(text: str) -> str:
    """
    Additional logic to repair broken line splits, hyphenation, and common
    repetition artifacts.
    """
    # Collapse immediately repeated words of four or more letters
    text = re.sub(r"\b(\w{4,})\s+\1\b", r"\1", text)
    # Re-join words hyphenated across line breaks
    text = re.sub(r"(\w+)-\s+(\w+)", r"\1\2", text)
    # Drop long runs of one/two-letter tokens, a typical OCR noise pattern
    text = re.sub(r"(\b\w{1,2}\b\s+){5,}", "", text)
    # Remove some previously observed junk tokens
    text = re.sub(r"\b(Googsoft|Hoogsoft|biometriialyq|avtorometriia)\b", "", text)
    # Collapse multiple spaces
    text = re.sub(r"\s{2,}", " ", text)
    return text.strip()
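

if __name__ == "__main__":
    # Smoke test of the repair -> chunk path on a synthetic OCR-like string
    # (illustrative only; running it still loads DALAT5_MODEL's tokeniser)
    sample = "Qazaqstan Qazaqstan tarih- y derekteri men mádenieti turaly mátin."
    repaired = repair_extracted_text(sample)
    for chunk in chunk_text(repaired):
        print(chunk)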