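"""
SemanticDala – Kazakh semantic platform demo.

Pipeline: extract text from an uploaded PDF/TXT file, repair and chunk it,
transliterate the chunks, deduplicate and embed them, index the embeddings in a
vector store, fit a topic model, and expose semantic search over the indexed
chunks through a Gradio UI.
"""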
import os
import csv
import time
import hashlib
import gradio as gr
from datetime import datetime, timezone
from src.db.vector_store import VectorStore
from src.modelling.embed import DalaEmbedder
from src.modelling.topic_model import TopicModeller
from src.modelling.transliterate import DalaTransliterator
from src.utils.data_utils import (
    extract_text_with_pdfplumber,
    extract_text_with_ocr,
    chunk_text,
    deduplicate_chunks,
    repair_extracted_text
)
from typing import Any, List, Tuple

# Instantiate components
translit = DalaTransliterator()
embedder = DalaEmbedder()
vector_db = VectorStore()
topic_modeller = TopicModeller()
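# These components are module-level singletons shared by all Gradio callbacks
# below; process_file() clears the vector store's index and metadata before
# re-populating them for each new upload.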


def print_recent_logs(n: int = 5) -> None:
    """
    Print the last N log lines to the container logs for developer monitoring.
    """
    log_file = "semanticdala_log.csv"
    if os.path.exists(log_file):
        print(f"\n[SEMANTICDALA USAGE LOG - Last {n} Entries]")
        with open(log_file, "r") as f:
            lines = f.readlines()
        for line in lines[-n:]:
            print(line.strip())
        print("[END LOG SNAPSHOT]\n")


def log_submission(filename: str, num_chunks: int, start_time: float, status: str, session_id: str = "anonymous") -> None:
    """
    Basic logging utility to keep track of app usage.
    """
    log_file = "semanticdala_log.csv"
    end_time = time.time()
    duration = round(end_time - start_time, 2)

    # Anonymise filename for privacy
    anonymized_name = hashlib.sha256(filename.encode()).hexdigest()[:10]

    # Get file size in bytes and convert to MB
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    file_size_mb = round(file_size / (1024 * 1024), 2)

    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "filename_hash": anonymized_name,
        "file_size_mb": file_size_mb,
        "num_chunks": num_chunks,
        "processing_time_sec": duration,
        "status": status,
        "session_id": session_id
    }

    file_exists = os.path.isfile(log_file)
    with open(log_file, mode = 'a', newline = "") as f:
        writer = csv.DictWriter(f, fieldnames = log_entry.keys())
        if not file_exists:
            writer.writeheader()
        writer.writerow(log_entry)
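
# Illustrative contents of semanticdala_log.csv (the values below are examples only):
# timestamp,filename_hash,file_size_mb,num_chunks,processing_time_sec,status,session_id
# 2024-01-01T12:00:00+00:00,ab12cd34ef,1.25,42,3.51,success,anonymous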


def extract_text(file: Any) -> str:
    """
    Try multiple PDF extraction strategies, with fallback to OCR if necessary.
    """
    if file.name.endswith(".pdf"):
        text = extract_text_with_pdfplumber(file)
        if len(text.strip()) > 100:
            return repair_extracted_text(text)
        print("[INFO] Falling back to OCR...")
        return extract_text_with_ocr(file)
    elif file.name.endswith(".txt"):
        return repair_extracted_text(file.read().decode("utf-8", errors = "ignore"))
    return ""


def process_file(file: Any) -> Tuple[List[Tuple[str, int]], Any, List[List[Any]], Any]:
    """
    Main file processing function: chunks, transliterates and clusters the file
    contents, and plots the resulting topic clusters.
    """
    start = time.time()
    try:
        raw_text = extract_text(file)
        chunks = chunk_text(raw_text)

        # Transliterate, deduplicate and embed the chunks
        translits = translit.batch_transliterate(chunks)
        dedup_translits = deduplicate_chunks(translits, embedder)
        embeddings = embedder.embed_batch(dedup_translits)

        # Clear previous entries before adding
        vector_db.index.reset()
        vector_db.metadata = []

        metadata = [{"id": f"{file.name}_chunk{i}", "text": t} for i, t in enumerate(dedup_translits)]
        vector_db.add(embeddings, metadata)

        # Topic modelling on the deduplicated chunks, so the texts stay aligned with their embeddings
        topics, fig, topic_labels, umap_fig = topic_modeller.fit(dedup_translits, embeddings)

        # Get a list of rows for topic labels
        overview_table = [[k, v] for k, v in topic_labels.items()]

        # Zip the transliterated text back together with its topic IDs
        annotated = list(zip(dedup_translits, topics))

        # Log success
        log_submission(file.name, len(chunks), start, status = "success")
        print_recent_logs()

        return annotated, fig, overview_table, umap_fig
    except Exception as e:
        log_submission(file.name, 0, start, status = f"error: {str(e)}")
        print_recent_logs()
        raise e
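
# The four return values of process_file() map, in order, to the Gradio outputs
# bound below: annotated -> translit_output, fig -> plot_output,
# overview_table -> topic_label_table, umap_fig -> umap_output.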


def search_text(query: str) -> str:
    """
    Search for a given query in the vector DB.
    """
    query_emb = embedder.embed_text(query)
    results = vector_db.search(query_emb, top_k = 5)
    return "\n\n".join(f"[{r['id']}]: {r['text']}" for r in results)


# Gradio UI
with gr.Blocks() as demo:
    title_html = gr.HTML("<center><h1>🇰🇿 SemanticDala</h1><h2>Қазақтың семантикалық платформасы</h2><h3>Kazakh Semantic Platform</h3></center>")

    with gr.Tab("📁 Жүктеп салу және өңдеу / Upload and Process"):
        with gr.Row():
            file_input = gr.File(label = "PDF немесе TXT жүктеңіз / Upload PDF or TXT", file_types = [".pdf", ".txt"])
            process_btn = gr.Button("Процесс файлы / Process File", scale = 1)

        translit_output = gr.Dataframe(
            headers = ["Мәтін / Text", "Тақырып идентификаторы / Topic ID"],
            label = "Транслитерацияланған үзінділер + Тақырыптар / Transliterated Chunks + Topics"
        )
        topic_label_table = gr.Dataframe(
            headers = ["Тақырып идентификаторы / Topic ID", "Белгі / Label"],
            label = "Тақырып белгілері / Topic Labels"
        )

        with gr.Row(equal_height = True):
            with gr.Column(scale = 1):
                plot_output = gr.Plot(label = "Негізгі тақырыптар / Top Topics")
            with gr.Column(scale = 1):
                umap_output = gr.Plot(label = "UMAP проекциясы / UMAP Topic Projection")

    with gr.Tab("🔍 Семантикалық іздеу / Semantic Search"):
        with gr.Row():
            search_box = gr.Textbox(label = "Сұрау / Query", placeholder = "мысалы / e.g., Qazaqstan tarihy", lines = 1, scale = 5)
            search_btn = gr.Button("Іздеу / Search", scale = 1)
        search_results = gr.Textbox(label = "Нәтижелер / Top Results", lines = 6, interactive = False)

    # Bind callbacks
    process_btn.click(
        fn = process_file,
        inputs = file_input,
        outputs = [translit_output, plot_output, topic_label_table, umap_output]
    )
    search_btn.click(fn = search_text, inputs = search_box, outputs = search_results)


# Launch
if __name__ == "__main__":
    demo.launch()