import os
import fitz  # PyMuPDF
import docx
import chromadb
import torch
import nltk
import gradio as gr
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# --- Ensure sentence-tokenizer data is available ---
# Recent NLTK releases ship the sentence tokenizer as "punkt_tab" in addition to "punkt".
for resource in ("punkt", "punkt_tab"):
    try:
        nltk.data.find(f"tokenizers/{resource}")
    except LookupError:
        nltk.download(resource)
# --- Configuration ---
MANUALS_FOLDER = "./Manuals"
CHROMA_PATH = "./chroma_store"
COLLECTION_NAME = "manual_chunks"
MODEL_OPTIONS = {
    "LLaMA 3.1 8B": "meta-llama/Llama-3.1-8B-Instruct",
    "Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3",
    "Gemma 7B": "google/gemma-1.1-7b-it",
}
HF_TOKEN = os.environ.get("HF_TOKEN")
MAX_CONTEXT_CHUNKS = 3
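# NOTE: the Llama and Gemma checkpoints above are gated on the Hugging Face Hub,
# so HF_TOKEN must be set (e.g. as a Space secret) and the account must have
# accepted each model's license for those downloads to succeed.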
# --- Utility Functions ---
def extract_text_from_pdf(path):
    """Extract plain text from every page of a PDF; return "" on failure."""
    try:
        with fitz.open(path) as doc:
            return "\n".join(page.get_text().strip() for page in doc)
    except Exception:
        return ""

def extract_text_from_docx(path):
    """Extract plain text from a .docx file; return "" on failure."""
    try:
        doc = docx.Document(path)
        return "\n".join(para.text.strip() for para in doc.paragraphs)
    except Exception:
        return ""

def clean(text):
    """Drop blank lines and strip whitespace from each remaining line."""
    return "\n".join(line.strip() for line in text.splitlines() if line.strip())
def split_sentences(text):
    """Sentence-split with NLTK; fall back to a naive split if that fails."""
    try:
        return sent_tokenize(text)
    except Exception as e:
        print(f"[Tokenizer Error] {e}. Falling back to simple split.")
        return text.split(". ")
def chunk_sentences(sentences, max_tokens=500, overlap=50):
    """Greedily pack sentences into chunks of ~max_tokens words, repeating
    up to ~overlap words of trailing context at the start of each new chunk."""
    chunks = []
    current = []
    total = 0
    for sentence in sentences:
        count = len(sentence.split())
        if current and total + count > max_tokens:
            chunks.append(" ".join(current))
            # Carry over trailing sentences totalling at most `overlap` words.
            carry = []
            carried = 0
            for s in reversed(current):
                carried += len(s.split())
                if carried > overlap:
                    break
                carry.insert(0, s)
            current = carry
            total = sum(len(s.split()) for s in current)
        current.append(sentence)
        total += count
    if current:
        chunks.append(" ".join(current))
    return chunks
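# A quick illustration of the chunker with made-up numbers: with max_tokens=8
# and overlap=3,
#   chunk_sentences(["one two three four", "five six seven", "eight nine"], 8, 3)
# returns ["one two three four five six seven", "five six seven eight nine"];
# "five six seven" (3 words <= overlap) is repeated to preserve context.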
def embed_all():
    """(Re)build the Chroma collection from every manual in MANUALS_FOLDER."""
    db = chromadb.PersistentClient(path=CHROMA_PATH)
    try:
        db.delete_collection(COLLECTION_NAME)
    except Exception:
        pass  # collection did not exist yet
    collection = db.create_collection(COLLECTION_NAME)
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    all_chunks = []
    for fname in sorted(os.listdir(MANUALS_FOLDER)):
        path = os.path.join(MANUALS_FOLDER, fname)
        if fname.lower().endswith(".pdf"):
            text = extract_text_from_pdf(path)
        elif fname.lower().endswith(".docx"):
            text = extract_text_from_docx(path)
        else:
            continue
        sents = split_sentences(clean(text))
        for idx, chunk in enumerate(chunk_sentences(sents)):
            chunk_id = f"{fname}::chunk_{idx}"
            all_chunks.append({"id": chunk_id, "text": chunk, "metadata": {"source": fname}})
    # Embed and insert in small batches to bound memory use.
    for i in range(0, len(all_chunks), 16):
        batch = all_chunks[i:i + 16]
        docs = [x["text"] for x in batch]
        ids = [x["id"] for x in batch]
        metas = [x["metadata"] for x in batch]
        embs = embedder.encode(docs).tolist()
        collection.add(documents=docs, ids=ids, metadatas=metas, embeddings=embs)
    return collection, embedder
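# embed_all() rebuilds the whole index from scratch, which is far too slow to
# repeat on every question. A minimal in-process cache, assuming the manuals do
# not change while the app is running:
_INDEX = None

def get_index():
    global _INDEX
    if _INDEX is None:
        _INDEX = embed_all()
    return _INDEX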
def answer_query(query, model_choice):
    collection, embedder = get_index()  # embed_all() returns (collection, embedder)
    # Query with an embedding from the same model used at indexing time.
    query_emb = embedder.encode(query).tolist()
    results = collection.query(query_embeddings=[query_emb], n_results=MAX_CONTEXT_CHUNKS)
    context = "\n\n".join(results["documents"][0])
    model_id = MODEL_OPTIONS.get(model_choice)
    tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
    # NOTE: the model is reloaded on every request; a production app would cache
    # the pipeline per model_id. device_map="auto" requires `accelerate`.
    model = AutoModelForCausalLM.from_pretrained(
        model_id, token=HF_TOKEN, torch_dtype=torch.float16, device_map="auto"
    )
    pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)
    prompt = f"""Context:
{context}

Question: {query}
Answer:"""
    out = pipe(prompt, max_new_tokens=300, do_sample=False)
    return out[0]["generated_text"].split("Answer:")[-1].strip()
# --- UI ---
with gr.Blocks() as demo:
    gr.Markdown(
        """# 📘 SmartManuals-AI
Ask technical questions about your manuals (PDF & DOCX) with LLM-powered retrieval-augmented generation (RAG)."""
    )
    with gr.Row():
        question = gr.Textbox(label="Your Question", placeholder="e.g., How do I reset the console?")
        model_choice = gr.Dropdown(choices=list(MODEL_OPTIONS.keys()), value="LLaMA 3.1 8B", label="Model")
    answer = gr.Textbox(label="Answer")
    submit = gr.Button("Ask")
    submit.click(fn=answer_query, inputs=[question, model_choice], outputs=answer)

demo.launch()
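# On Hugging Face Spaces, launch() needs no arguments; when running locally you
# can pass e.g. demo.launch(server_name="0.0.0.0") to reach the app from other
# machines on your network.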