import io
import os
import json

import fitz  # PyMuPDF
import docx
import gradio as gr
import pytesseract
from PIL import Image
from tqdm import tqdm
import chromadb
import torch
import nltk
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# ----------------------------
# ✅ Ensure nltk punkt is available
# ----------------------------
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt")

from nltk.tokenize import sent_tokenize

# ----------------------------
# ⚙️ Config
# ----------------------------
MANUAL_DIR = "./Manuals"
CHROMA_DIR = "./chroma_store"
CHUNK_SIZE = 750        # maximum words per chunk
CHUNK_OVERLAP = 100     # trailing sentences carried into the next chunk
MAX_CONTEXT = 3         # number of retrieved chunks per query
DEFAULT_MODEL = "meta-llama/Llama-3-8b-Instruct"
MODEL_OPTIONS = [
    "meta-llama/Llama-3-8b-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-1.1-7b-it",
]
HF_TOKEN = os.environ.get("HF_TOKEN")

# ----------------------------
# 🔍 Utility functions
# ----------------------------
def extract_pdf_text(path):
    """Extract text per page; fall back to OCR for pages with no text layer."""
    text_blocks = []
    doc = fitz.open(path)
    for i, page in enumerate(doc):
        text = page.get_text()
        if not text.strip():
            img = Image.open(io.BytesIO(page.get_pixmap().tobytes("png")))
            text = pytesseract.image_to_string(img)
        text_blocks.append({"page": i + 1, "text": text})
    return text_blocks


def extract_docx_text(path):
    """Extract all paragraphs from a Word document as a single 'page'."""
    doc = docx.Document(path)
    full_text = "\n".join(para.text for para in doc.paragraphs)
    return [{"page": 1, "text": full_text}]


def split_sentences(text):
    try:
        return sent_tokenize(text)
    except Exception:
        return text.split(". ")


def chunk_text(sentences):
    """Group sentences into chunks of at most CHUNK_SIZE words,
    carrying the last CHUNK_OVERLAP sentences over into the next chunk."""
    chunks = []
    current = []
    count = 0
    for sentence in sentences:
        tokens = sentence.split()
        if count + len(tokens) > CHUNK_SIZE:
            chunks.append(" ".join(current))
            current = current[-CHUNK_OVERLAP:]
            count = sum(len(s.split()) for s in current)
        current.append(sentence)
        count += len(tokens)
    if current:
        chunks.append(" ".join(current))
    return chunks


def embed_all():
    """(Re)build the Chroma collection from every PDF/DOCX in MANUAL_DIR."""
    client = chromadb.PersistentClient(path=CHROMA_DIR)
    if "manual_chunks" in [c.name for c in client.list_collections()]:
        client.delete_collection("manual_chunks")
    collection = client.create_collection("manual_chunks")
    # Chroma embeds documents and queries with its default embedding function
    # (all-MiniLM-L6-v2); this explicit SentenceTransformer instance is
    # returned for convenience but is not used for indexing below.
    embedder = SentenceTransformer("all-MiniLM-L6-v2")

    for fname in os.listdir(MANUAL_DIR):
        fpath = os.path.join(MANUAL_DIR, fname)
        if fname.lower().endswith(".pdf"):
            pages = extract_pdf_text(fpath)
        elif fname.lower().endswith(".docx"):
            pages = extract_docx_text(fpath)
        else:
            continue
        for page in pages:
            sents = split_sentences(page["text"])
            chunks = chunk_text(sents)
            for idx, chunk in enumerate(chunks):
                cid = f"{fname}::p{page['page']}::c{idx}"
                collection.add(
                    documents=[chunk],
                    ids=[cid],
                    metadatas=[{"source": fname, "page": page["page"]}],
                )
    return collection, embedder


def get_model(model_id):
    """Load the selected model and return a CPU text-generation pipeline."""
    tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(model_id, token=HF_TOKEN, torch_dtype=torch.float32)
    return pipeline("text-generation", model=model, tokenizer=tokenizer, device=-1)


def run_query(question, model_name):
    """Retrieve the most relevant chunks for the question and generate an answer."""
    results = db.query(query_texts=[question], n_results=MAX_CONTEXT)
    if not results or not results.get("documents"):
        return "No matching information found."
    context = "\n\n".join(results["documents"][0])
    prompt = f"""You are a helpful assistant. Use the following context to answer the question.

Context:
{context}

Question: {question}

Answer:"""
    model = get_model(model_name)
    res = model(prompt, max_new_tokens=300)[0]["generated_text"]
    return res.split("Answer:")[-1].strip()


# ----------------------------
# ✅ Startup: Embed manuals
# ----------------------------
db, embedder = embed_all()

# ----------------------------
# 🎛️ Gradio UI
# ----------------------------
with gr.Blocks() as demo:
    gr.Markdown("""
# 📘 SmartManuals-AI (Docker)
Ask any question from the preloaded manuals (PDF + Word).
""")
    with gr.Row():
        question = gr.Textbox(label="Ask a Question")
        model = gr.Dropdown(choices=MODEL_OPTIONS, value=DEFAULT_MODEL, label="Choose LLM")
    btn = gr.Button("Ask")
    answer = gr.Textbox(label="Answer", lines=10)
    btn.click(fn=run_query, inputs=[question, model], outputs=answer)

demo.launch(server_name="0.0.0.0", server_port=7860)