# app.py
# SmartManuals-AI: Hugging Face Space version
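#
# Pipeline: extract text from PDF/DOCX manuals (OCR fallback for scanned pages),
# chunk and embed into a persistent ChromaDB store, then answer questions by
# retrieving the closest chunks and prompting Llama 3.1 with them.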

import io
import os

import chromadb
import fitz  # PyMuPDF
import gradio as gr
import nltk
import pytesseract
import torch
from docx import Document
from nltk.tokenize import sent_tokenize
from PIL import Image
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

# ----------------------
# Configuration
# ----------------------
MANUALS_FOLDER = "./Manuals"
CHUNKS_JSONL = "chunks.jsonl"
CHROMA_PATH = "./chroma_store"
COLLECTION_NAME = "manual_chunks"
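# Chunk size and overlap are measured in whitespace-separated words, not model tokens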
CHUNK_SIZE = 750
CHUNK_OVERLAP = 100
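# Llama 3.1 is gated on the Hub: HF_TOKEN (e.g. a Space secret) must belong to
# an account that has accepted the model license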
MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"
HF_TOKEN = os.getenv("HF_TOKEN")

# ----------------------
# Ensure NLTK tokenizer data is downloaded
# ----------------------
nltk.download("punkt")
nltk.download("punkt_tab")  # newer NLTK releases resolve sent_tokenize via punkt_tab

# ----------------------
# Utilities
# ----------------------
def extract_text_from_pdf(path):
    """Extract text from a PDF, falling back to OCR for pages with no text layer."""
    text = ""
    with fitz.open(path) as doc:
        for page in doc:
            t = page.get_text()
            if not t.strip():
                # Likely a scanned page: rasterize at 300 DPI and OCR it
                pix = page.get_pixmap(dpi=300)
                img = Image.open(io.BytesIO(pix.tobytes("png")))
                t = pytesseract.image_to_string(img)
            text += t + "\n"
    return text

def extract_text_from_docx(path):
    """Extract the non-empty paragraphs of a Word document."""
    doc = Document(path)
    return "\n".join(p.text for p in doc.paragraphs if p.text.strip())

def clean(text):
    return "\n".join([line.strip() for line in text.splitlines() if line.strip()])

def split_sentences(text):
    return sent_tokenize(text)

def chunk_sentences(sentences, max_tokens=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Greedily pack sentences into chunks of at most ~max_tokens words,
    carrying roughly `overlap` trailing words into the next chunk."""
    chunks, chunk, count = [], [], 0
    for s in sentences:
        words = len(s.split())
        if chunk and count + words > max_tokens:
            chunks.append(" ".join(chunk))
            # Retain trailing sentences until ~overlap words are carried over
            carried, carried_count = [], 0
            for prev in reversed(chunk):
                if carried_count >= overlap:
                    break
                carried.insert(0, prev)
                carried_count += len(prev.split())
            chunk, count = carried, carried_count
        chunk.append(s)
        count += words
    if chunk:
        chunks.append(" ".join(chunk))
    return chunks
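# Rough sizing example: with CHUNK_SIZE=750 and CHUNK_OVERLAP=100, a 2,000-word
# manual yields about three chunks, each repeating ~100 trailing words of its
# predecessor.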

def get_metadata(filename):
    """Infer metadata from the filename: "sm" anywhere in the name means service
    manual, "om" owner manual, "se3hd" maps to model se3hd. For example, a file
    named "SE3HD_SM.pdf" is tagged as a service manual for model se3hd."""
    name = filename.lower()
    return {
        "source_file": filename,
        "doc_type": "service manual" if "sm" in name else "owner manual" if "om" in name else "unknown",
        "model": "se3hd" if "se3hd" in name else "unknown",
    }

# ----------------------
# Embedding
# ----------------------
def embed_all():
    """Chunk every manual, embed the chunks with MiniLM, and store them in Chroma."""
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    client = chromadb.PersistentClient(path=CHROMA_PATH)
    try:
        client.delete_collection(COLLECTION_NAME)  # rebuild the index from scratch
    except Exception:
        pass  # collection did not exist yet
    collection = client.create_collection(COLLECTION_NAME)
    chunks, metadatas, ids = [], [], []
    # Skip anything that is not a PDF or Word document
    files = [f for f in os.listdir(MANUALS_FOLDER) if f.lower().endswith((".pdf", ".docx"))]
    for file in tqdm(files):
        path = os.path.join(MANUALS_FOLDER, file)
        text = extract_text_from_pdf(path) if file.lower().endswith(".pdf") else extract_text_from_docx(path)
        meta = get_metadata(file)
        sents = split_sentences(clean(text))
        for i, chunk in enumerate(chunk_sentences(sents)):
            chunks.append(chunk)
            ids.append(f"{file}::chunk_{i}")
            metadatas.append(meta)
            if len(chunks) >= 16:  # flush in small batches to bound memory use
                emb = embedder.encode(chunks).tolist()
                collection.add(documents=chunks, ids=ids, metadatas=metadatas, embeddings=emb)
                chunks, ids, metadatas = [], [], []
    if chunks:
        emb = embedder.encode(chunks).tolist()
        collection.add(documents=chunks, ids=ids, metadatas=metadatas, embeddings=emb)
    return collection, embedder
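# Note: the returned embedder must also encode queries, so that question vectors
# share the embedding space of the stored chunks.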

# ----------------------
# Model setup
# ----------------------
def load_model():
    """Load the Llama text-generation pipeline; fp16 on GPU keeps the 8B weights in memory."""
    use_cuda = torch.cuda.is_available()
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME, token=HF_TOKEN, torch_dtype=torch.float16 if use_cuda else torch.float32
    )
    return pipeline("text-generation", model=model, tokenizer=tokenizer, device=0 if use_cuda else -1, max_new_tokens=512)

# ----------------------
# RAG Pipeline
# ----------------------
def answer_query(question):
    """Retrieve the closest chunks and have the LLM answer from that context only."""
    # Embed the query with the index-time embedder
    q_emb = embedder.encode([question]).tolist()
    results = db.query(query_embeddings=q_emb, n_results=5)
    context = "\n\n".join(results["documents"][0])
    # Llama 3.1 chat template: every turn must be closed with <|eot_id|>
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>

You are a helpful assistant. Use the provided context to answer questions. If you don't know, say 'I don't know.'
<context>
{context}
</context><|eot_id|><|start_header_id|>user<|end_header_id|>

{question}<|eot_id|><|start_header_id|>assistant<|end_header_id|>

"""
    out = llm(prompt)[0]["generated_text"]
    return out.split("<|start_header_id|>assistant<|end_header_id|>")[-1].strip()
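# Illustrative call (the question text is hypothetical):
#   answer_query("How do I reset the console on the SE3HD?")
# retrieves the five closest chunks and answers strictly from that context.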

# ----------------------
# Startup: build the index and load the model before serving
# ----------------------
db, embedder = embed_all()
llm = load_model()

# ----------------------
# UI
# ----------------------
with gr.Blocks() as demo:
    status = gr.Textbox(label="Status", value="Ready! Ask a question about the manuals.", interactive=False)
    question = gr.Textbox(label="Ask a Question")
    submit = gr.Button("🔍 Ask")
    answer = gr.Textbox(label="Answer", lines=8)

    submit.click(fn=answer_query, inputs=question, outputs=answer)

demo.launch()