damoojeje committed on
Commit
6728736
·
verified ·
1 Parent(s): e6fa21e

Update app.py

Files changed (1):
  1. app.py +149 -118
app.py CHANGED
@@ -1,156 +1,187 @@
- # ✅ SmartManuals-AI app.py (for Hugging Face Spaces)
- # Optimized to support multiple LLMs, Gradio UI, and secure on-device document QA

  import os
  import json
- import io
- import fitz
  import nltk
  import chromadb
  import pytesseract
- import numpy as np
- import torch
  from PIL import Image
  from tqdm import tqdm
  from nltk.tokenize import sent_tokenize
  from sentence_transformers import SentenceTransformer, util
  from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
- import gradio as gr

- # ----------------------
- # 🔧 Configurations
- # ----------------------
- PDF_DIR = "./Manuals"
- CHROMA_PATH = "./chroma_store"
- COLLECTION_NAME = "manual_chunks"
  MAX_CONTEXT_CHUNKS = 3
- CHUNK_SIZE = 750
- CHUNK_OVERLAP = 100
- MODEL_OPTIONS = [
-     "meta-llama/Llama-3.1-8B-Instruct",
-     "meta-llama/Llama-4-Scout-17B-16E-Instruct",
-     "google/gemma-1.1-7b-it",
-     "Qwen/Qwen1.5-14B-Chat",
-     "mistralai/Mistral-7B-Instruct-v0.3"
- ]
  HF_TOKEN = os.environ.get("HF_TOKEN")

- # ----------------------
- # 📚 NLTK Setup
- # ----------------------
- try:
-     nltk.data.find('tokenizers/punkt')
- except LookupError:
-     nltk.download('punkt')
-
- # ----------------------
- # 📄 Utility Functions
- # ----------------------
- def extract_text_or_ocr(page):
-     text = page.get_text().strip()
-     if text:
-         return text, False
-     pix = page.get_pixmap(dpi=300)
-     img_data = pix.tobytes("png")
-     img = Image.open(io.BytesIO(img_data))
-     return pytesseract.image_to_string(img).strip(), True

  def clean_text(text):
      return "\n".join([line.strip() for line in text.splitlines() if line.strip()])

  def tokenize_sentences(text):
      return sent_tokenize(text)

- def split_chunks(sentences, max_tokens=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
-     chunks, chunk, length = [], [], 0
      for sentence in sentences:
-         count = len(sentence.split())
-         if length + count > max_tokens and chunk:
-             chunks.append(" ".join(chunk))
-             chunk = chunk[-overlap:]
-             length = sum(len(s.split()) for s in chunk)
-         chunk.append(sentence)
-         length += count
-     if chunk: chunks.append(" ".join(chunk))
      return chunks

- def extract_metadata(filename):
      name = filename.lower().replace("_", " ").replace("-", " ")
      meta = {"model": "unknown", "doc_type": "unknown", "brand": "life fitness"}
-     if "om" in name or "owner" in name: meta["doc_type"] = "owner manual"
-     elif "sm" in name or "service" in name: meta["doc_type"] = "service manual"
      elif "assembly" in name: meta["doc_type"] = "assembly instructions"
      elif "alert" in name: meta["doc_type"] = "installer alert"
      elif "parts" in name: meta["doc_type"] = "parts manual"
-     elif "bulletin" in name: meta["doc_type"] = "service bulletin"
-     for kw in ["se3hd", "se3", "se4", "symbio", "explore", "integrity x", "integrity sl", "everest", "engage", "inspire", "discover", "95t", "95x", "95c", "95r", "97c"]:
-         if kw.replace(" ", "") in name.replace(" ", ""): meta["model"] = kw
      return meta

- # ----------------------
- # 🧠 Load LLM
- # ----------------------
- def load_llm(model_id):
-     tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
-     model = AutoModelForCausalLM.from_pretrained(model_id, token=HF_TOKEN, torch_dtype=torch.float32)
-     return pipeline("text-generation", model=model, tokenizer=tokenizer, device=-1)
-
- # ----------------------
- # 🧠 Chroma + Embed
- # ----------------------
- def embed_pdfs():
-     os.makedirs(CHROMA_PATH, exist_ok=True)
-     client = chromadb.PersistentClient(path=CHROMA_PATH)
-     if COLLECTION_NAME in [c.name for c in client.list_collections()]:
-         client.delete_collection(COLLECTION_NAME)
-     collection = client.create_collection(COLLECTION_NAME)
-     embedder = SentenceTransformer("all-MiniLM-L6-v2")

-     for file in tqdm(os.listdir(PDF_DIR)):
-         if not file.lower().endswith(".pdf"): continue
-         doc = fitz.open(os.path.join(PDF_DIR, file))
-         meta = extract_metadata(file)
-         for page_num, page in enumerate(doc, 1):
-             text, _ = extract_text_or_ocr(page)
-             if not text.strip(): continue
              sents = tokenize_sentences(clean_text(text))
-             chunks = split_chunks(sents)
              for i, chunk in enumerate(chunks):
-                 chunk_id = f"{file}::p{page_num}::c{i}"
-                 emb = embedder.encode([chunk])[0].tolist()
-                 collection.add(
-                     documents=[chunk],
-                     ids=[chunk_id],
-                     embeddings=[emb],
-                     metadatas=[{**meta, "source_file": file, "page": page_num}]
-                 )
      return collection, embedder

- # ----------------------
- # 🔍 RAG Pipeline
- # ----------------------
- def answer_query(q, model_id):
-     collection, embedder = embed_pdfs()
-     pipe = load_llm(model_id)
-     emb_q = embedder.encode([q])[0].tolist()
-     results = collection.query(query_embeddings=[emb_q], n_results=MAX_CONTEXT_CHUNKS)
-     context = "\n\n".join(results['documents'][0])
-     prompt = f"Use the context below to answer the question.\nContext:\n{context}\n\nQuestion: {q}\nAnswer:"
-     return pipe(prompt)[0]['generated_text'].split("Answer:")[-1].strip()
-
- # ----------------------
- # 🚀 Gradio UI
- # ----------------------
- with gr.Blocks() as app:
-     gr.Markdown("""# SmartManuals-AI
-     **Local-first document QA** powered by OCR, ChromaDB & your choice of LLM (via Hugging Face).
-     """)
      with gr.Row():
-         question = gr.Textbox(placeholder="Ask a question from the manuals...", label="Question")
-         model_choice = gr.Dropdown(label="Choose Model", choices=MODEL_OPTIONS, value=MODEL_OPTIONS[0])
-     output = gr.Textbox(label="Answer", lines=10)
-     run = gr.Button("Run RAG")
-     run.click(fn=answer_query, inputs=[question, model_choice], outputs=output)
-
- if __name__ == "__main__":
-     app.launch()
+ # ✅ app.py (SmartManuals-AI)
+ # Hugging Face Space-ready app with multi-model support, PDF upload, and live progress feedback

  import os
  import json
+ import fitz  # PyMuPDF
  import nltk
  import chromadb
+ import tempfile
+ import shutil
  import pytesseract
+ import gradio as gr
  from PIL import Image
  from tqdm import tqdm
  from nltk.tokenize import sent_tokenize
  from sentence_transformers import SentenceTransformer, util
  from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

+ # ---------------------------
+ # 🔧 CONFIG
+ # ---------------------------
+ pdf_folder = "Manuals"
+ output_jsonl_chunks = "chunks.jsonl"
+ chroma_path = "./chroma_store"
+ collection_name = "manual_chunks"
+ chunk_size = 750
+ chunk_overlap = 100
  MAX_CONTEXT_CHUNKS = 3
  HF_TOKEN = os.environ.get("HF_TOKEN")

+ MODEL_MAP = {
+     "LLaMA 3 (8B)": "meta-llama/Meta-Llama-3-8B-Instruct",
+     "LLaMA 4 Scout (17B)": "meta-llama/Meta-Llama-4-Scout-17B-16E-Instruct",
+     "Gemma 3 (27B)": "google/gemma-3-27b-it",
+     "Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3",
+     "Qwen3 (30B)": "Qwen/Qwen3-30B-A3B"
+ }
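
Several of the repositories in MODEL_MAP are gated on the Hugging Face Hub, so loading them only succeeds when HF_TOKEN has been granted access. A minimal pre-flight sketch (hypothetical helper, not part of the commit) using huggingface_hub, which transformers already depends on:

```python
# Hypothetical access check for the gated repos in MODEL_MAP (illustration only).
from huggingface_hub import model_info
from huggingface_hub.utils import HfHubHTTPError

def check_model_access(model_map, token):
    reachable = {}
    for label, repo_id in model_map.items():
        try:
            model_info(repo_id, token=token)  # raises for gated or missing repos
            reachable[label] = True
        except HfHubHTTPError:
            reachable[label] = False
    return reachable
```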

+ # ---------------------------
+ # 📥 UTILITIES
+ # ---------------------------
  def clean_text(text):
      return "\n".join([line.strip() for line in text.splitlines() if line.strip()])

  def tokenize_sentences(text):
+     nltk.download('punkt', quiet=True)
      return sent_tokenize(text)

+ def split_into_chunks(sentences, max_tokens=750, overlap=100):
+     chunks, current_chunk, current_len = [], [], 0
      for sentence in sentences:
+         token_count = len(sentence.split())
+         if current_len + token_count > max_tokens and current_chunk:
+             chunks.append(" ".join(current_chunk))
+             current_chunk = current_chunk[-overlap:]
+             current_len = sum(len(s.split()) for s in current_chunk)
+         current_chunk.append(sentence)
+         current_len += token_count
+     if current_chunk:
+         chunks.append(" ".join(current_chunk))
      return chunks
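
For reference, a hypothetical usage sketch of the chunker above; note that `overlap` slices the sentence list, so it carries over trailing sentences rather than a token count:

```python
# Toy input for split_into_chunks (illustration only).
sentences = [f"Sentence {i} has exactly six words." for i in range(200)]
chunks = split_into_chunks(sentences, max_tokens=60, overlap=2)
# Each chunk holds about 60 whitespace-split tokens, and consecutive chunks
# repeat the last 2 sentences of the previous chunk.
print(len(chunks), chunks[0])
```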

+ def extract_metadata_from_filename(filename):
      name = filename.lower().replace("_", " ").replace("-", " ")
      meta = {"model": "unknown", "doc_type": "unknown", "brand": "life fitness"}
+     if "om" in name: meta["doc_type"] = "owner manual"
+     elif "sm" in name: meta["doc_type"] = "service manual"
      elif "assembly" in name: meta["doc_type"] = "assembly instructions"
      elif "alert" in name: meta["doc_type"] = "installer alert"
      elif "parts" in name: meta["doc_type"] = "parts manual"
+     known_models = ["se3hd", "se3", "se4", "symbio", "explore", "integrity x", "integrity sl", "everest", "engage"]
+     for model in known_models:
+         if model.replace(" ", "") in name.replace(" ", ""):
+             meta["model"] = model
      return meta

+ def extract_text_with_ocr(page):
+     text = page.get_text().strip()
+     if text:
+         return text
+     pix = page.get_pixmap(dpi=300)
+     img_data = pix.tobytes("png")
+     img = Image.open(tempfile.SpooledTemporaryFile())
+     img.fp.write(img_data)
+     img.fp.seek(0)
+     return pytesseract.image_to_string(img).strip()
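
The OCR fallback above pushes the rendered PNG bytes through a SpooledTemporaryFile via the image's file pointer; the previous revision decoded them in memory with io.BytesIO. A minimal sketch of that in-memory variant (hypothetical name, shown for comparison):

```python
# Sketch: decode the rendered page directly from memory with io.BytesIO.
import io

import pytesseract
from PIL import Image

def extract_text_with_ocr_in_memory(page):
    text = page.get_text().strip()
    if text:
        return text                                   # embedded text layer is enough
    pix = page.get_pixmap(dpi=300)                    # render the page at 300 DPI
    img = Image.open(io.BytesIO(pix.tobytes("png")))  # decode the PNG bytes in memory
    return pytesseract.image_to_string(img).strip()   # OCR the rendered bitmap
```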

+ # ---------------------------
+ # 🧠 EMBEDDING + CHROMA
+ # ---------------------------
+ def embed_pdfs_from_uploaded(files, progress=gr.Progress(track_tqdm=True)):
+     os.makedirs(pdf_folder, exist_ok=True)
+     temp_chunks = []
+     for file in files:
+         filename = os.path.basename(file.name)
+         dst = os.path.join(pdf_folder, filename)
+         shutil.copy(file.name, dst)
+         doc = fitz.open(dst)
+         meta = extract_metadata_from_filename(filename)
+         for page_num, page in enumerate(doc, start=1):
+             text = extract_text_with_ocr(page)
              sents = tokenize_sentences(clean_text(text))
+             chunks = split_into_chunks(sents, chunk_size, chunk_overlap)
              for i, chunk in enumerate(chunks):
+                 temp_chunks.append({
+                     "chunk_id": f"{filename}::page_{page_num}::chunk_{i+1}",
+                     "source_file": filename,
+                     "page": page_num,
+                     "text": chunk,
+                     **meta
+                 })
+
+     with open(output_jsonl_chunks, "w", encoding="utf-8") as f:
+         for c in temp_chunks:
+             json.dump(c, f)
+             f.write("\n")
+
+     embedder = SentenceTransformer("all-MiniLM-L6-v2")
+     client = chromadb.PersistentClient(path=chroma_path)
+     if collection_name in [c.name for c in client.list_collections()]:
+         client.delete_collection(collection_name)
+     collection = client.create_collection(collection_name)
+
+     for i in tqdm(range(0, len(temp_chunks), 16)):
+         batch = temp_chunks[i:i+16]
+         texts = [b["text"] for b in batch]
+         metadatas = [b for b in batch]
+         ids = [b["chunk_id"] for b in batch]
+         embeddings = embedder.encode(texts).tolist()
+         collection.add(documents=texts, ids=ids, metadatas=metadatas, embeddings=embeddings)
+
      return collection, embedder
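
Because the store is created with chromadb.PersistentClient, a later session can reuse the index at chroma_path without re-running OCR and embedding. A minimal sketch (paths and names as configured above; the query text is illustrative):

```python
# Sketch: reopen the persisted Chroma store and query it with the same embedder.
import chromadb
from sentence_transformers import SentenceTransformer

client = chromadb.PersistentClient(path="./chroma_store")
collection = client.get_collection("manual_chunks")
embedder = SentenceTransformer("all-MiniLM-L6-v2")

query_embeddings = embedder.encode(["How do I level the treadmill?"]).tolist()
hits = collection.query(query_embeddings=query_embeddings, n_results=3)
print(hits["ids"][0], hits["metadatas"][0])
```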

+ # ---------------------------
+ # 🤖 LLM INFERENCE
+ # ---------------------------
+ def load_llm(model_key):
+     model_id = MODEL_MAP.get(model_key)
+     if not model_id or not HF_TOKEN:
+         return None, None, None
+     tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
+     model = AutoModelForCausalLM.from_pretrained(model_id, token=HF_TOKEN, device_map="auto")
+     pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=300)
+     return tokenizer, model, pipe
+
+ def generate_answer(pipe, tokenizer, context, query):
+     messages = [
+         {"role": "system", "content": "You are an expert manual assistant. Answer accurately using only the context."},
+         {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
+     ]
+     prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
+     output = pipe(prompt)[0]["generated_text"]
+     return output.split("\n")[-1].strip()
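
generate_answer keeps only the last line of the generated text, which can drop part of a multi-line answer. One alternative sketch (hypothetical helper) is to strip the echoed prompt prefix and return everything the model produced after it:

```python
# Sketch: remove the echoed prompt instead of keeping only the final line.
def extract_completion(full_text, prompt):
    completion = full_text[len(prompt):] if full_text.startswith(prompt) else full_text
    return completion.strip()
```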

+ # ---------------------------
+ # 🎯 FULL PIPELINE
+ # ---------------------------
+ def rag_pipeline(query, model_key, files):
+     collection, embedder = embed_pdfs_from_uploaded(files)
+     query_embedding = embedder.encode(query, convert_to_tensor=True)
+     results = collection.query(query_texts=[query], n_results=MAX_CONTEXT_CHUNKS)
+     if not results["documents"]:
+         return "No matches found."
+
+     context = "\n\n".join(results["documents"][0])
+     tokenizer, model, pipe = load_llm(model_key)
+     if pipe:
+         return generate_answer(pipe, tokenizer, context, query)
+     return "Model could not be loaded."

+ # ---------------------------
+ # 🖥️ GRADIO UI
+ # ---------------------------
+ with gr.Blocks() as demo:
+     gr.Markdown("""# 🧠 SmartManuals-AI with Multi-Model RAG
+     Upload your PDF manuals and ask smart questions. Choose your preferred LLM.""")
      with gr.Row():
+         file_upload = gr.File(file_types=[".pdf"], file_count="multiple", label="Upload Manuals")
+     with gr.Row():
+         query_box = gr.Textbox(label="Question")
+         model_selector = gr.Dropdown(label="Choose Model", choices=list(MODEL_MAP.keys()), value="LLaMA 3 (8B)")
+     submit_btn = gr.Button("Run Query")
+     answer_box = gr.Textbox(label="Answer", lines=8)
+
+     submit_btn.click(fn=rag_pipeline, inputs=[query_box, model_selector, file_upload], outputs=[answer_box])
+
+ demo.launch()
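
The earlier revision guarded the launch with `if __name__ == "__main__":`; a local-run sketch that restores the guard and enables Gradio's request queue (an optional assumption about preferred behaviour, not part of the commit):

```python
# Hypothetical local-run variant of the launch line above.
if __name__ == "__main__":
    demo.queue().launch()
```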