import os
import json
import fitz  # PyMuPDF
import nltk
import chromadb
from tqdm import tqdm
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer, util
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import pytesseract
from PIL import Image
import io
import gradio as gr

# ---------------------------
# ⚙️ Configuration
# ---------------------------
pdf_folder = r"./Manuals"  # Path relative to the app.py file in the Space
output_jsonl_pages = "manual_pages_with_ocr.jsonl"
output_jsonl_chunks = "manual_chunks_with_ocr.jsonl"
chroma_path = "./chroma_store"
collection_name = "manual_chunks"
chunk_size = 750
chunk_overlap = 100
MAX_CONTEXT_CHUNKS = 3  # Max chunks to send to the LLM

# Hugging Face Model Configuration
HF_MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
# Read HF Token from environment variable for security
HF_TOKEN = os.environ.get("HF_TOKEN")  # Hugging Face Space secret name

# ---------------------------
# Ensure NLTK resources are available
# ---------------------------
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:  # raised by nltk.data.find when the resource is missing
    nltk.download('punkt')

try:
    nltk.data.find('tokenizers/punkt_tab')  # newer NLTK releases need this for sent_tokenize
except LookupError:
    nltk.download('punkt_tab')

# ---------------------------
# 📄 Utility: Read PDF to text (with OCR fallback)
# ---------------------------
# This combines the logic of extract_text_from_pdf and extract_text_from_page
def extract_text_from_page_with_ocr(page):
    text = page.get_text().strip()
    if text:
        return text, False  # native text found, no OCR needed

    # If native text is missing, try OCR
    try:
        pix = page.get_pixmap(dpi=300)
        img_data = pix.tobytes("png")
        img = Image.open(io.BytesIO(img_data))
        ocr_text = pytesseract.image_to_string(img).strip()
        return ocr_text, True
    except Exception as e:
        print(f"OCR failed for a page: {e}")
        return "", False  # Return empty text and indicate OCR was not used

# ---------------------------
# 🧹 Clean up lines (from original notebook)
# ---------------------------
def clean_text(text):
    lines = text.splitlines()
    lines = [line.strip() for line in lines if line.strip()]
    return "\n".join(lines)

# ---------------------------
# ✂️ Sentence Tokenizer (from original notebook)
# ---------------------------
def tokenize_sentences(text):
    return sent_tokenize(text)

# ---------------------------
# 📦 Chunk into fixed size blocks (from original notebook)
# ---------------------------
def split_into_chunks(sentences, max_tokens=750, overlap=100):
    chunks = []
    current_chunk = []
    current_len = 0
    for sentence in sentences:
        token_count = len(sentence.split())
        # If adding the next sentence would exceed max_tokens and the current chunk
        # is not empty, close the current chunk.
        if current_len + token_count > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            # Start the next chunk with roughly `overlap` words taken from the end
            # of the chunk we just closed.
            overlap_sentences = []
            overlap_len = 0
            for prev in reversed(current_chunk):
                prev_len = len(prev.split())
                if overlap_len + prev_len > overlap:
                    break
                overlap_sentences.insert(0, prev)
                overlap_len += prev_len
            current_chunk = overlap_sentences
            current_len = overlap_len
        # Add the current sentence and update the running length
        current_chunk.append(sentence)
        current_len += token_count
    # Add the last chunk if it's not empty
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
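# Illustrative sketch of the chunker's behavior (toy values, not used by the app): a chunk is
# closed once the running word count would exceed max_tokens, and roughly the last `overlap`
# words of the closed chunk are carried into the next one. Uncomment to experiment locally:
#
# _demo_sentences = ["one two three four", "five six seven", "eight nine ten eleven"]
# print(split_into_chunks(_demo_sentences, max_tokens=6, overlap=3))
# # -> ['one two three four', 'five six seven', 'five six seven eight nine ten eleven']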
# ---------------------------
# 🧠 Extract Metadata from Filename (from original notebook)
# ---------------------------
def extract_metadata_from_filename(filename):
    name = filename.lower().replace("_", " ").replace("-", " ")
    metadata = {
        "model": "unknown",
        "doc_type": "unknown",
        "brand": "life fitness"  # Assuming 'life fitness' is constant, as in the original notebook
    }

    if "om" in name or "owner" in name:
        metadata["doc_type"] = "owner manual"
    elif "sm" in name or "service" in name:
        metadata["doc_type"] = "service manual"
    elif "assembly" in name:
        metadata["doc_type"] = "assembly instructions"
    elif "alert" in name:
        metadata["doc_type"] = "installer alert"
    elif "parts" in name:
        metadata["doc_type"] = "parts manual"
    elif "bulletin" in name:
        metadata["doc_type"] = "service bulletin"

    known_models = [
        "se3hd", "se3", "se4", "symbio", "explore", "integrity x", "integrity sl",
        "everest", "engage", "inspire", "discover", "95t", "95x", "95c", "95r", "97c"
    ]
    for model in known_models:
        # A regex would be more robust; the simple 'in' check mirrors the notebook
        if model.replace(" ", "") in name.replace(" ", ""):
            metadata["model"] = model
            break

    return metadata

# ---------------------------
# 🚀 Step 1: Process PDFs, Extract Pages with OCR
# ---------------------------
def process_pdfs_for_pages(pdf_folder, output_jsonl):
    print("Starting PDF processing and OCR...")
    all_pages = []
    if not os.path.exists(pdf_folder):
        print(f"Error: PDF folder not found at {pdf_folder}")
        return []  # Return an empty list if the folder doesn't exist

    pdf_files = [f for f in os.listdir(pdf_folder) if f.lower().endswith(".pdf")]
    if not pdf_files:
        print(f"No PDF files found in {pdf_folder}")
        return []

    for pdf_file in tqdm(pdf_files, desc="Scanning PDFs"):
        path = os.path.join(pdf_folder, pdf_file)
        try:
            doc = fitz.open(path)
            for page_num, page in enumerate(doc, start=1):
                text, used_ocr = extract_text_from_page_with_ocr(page)
                if text:  # Only save pages with extracted text
                    all_pages.append({
                        "source_file": pdf_file,
                        "page": page_num,
                        "text": text,
                        "ocr_used": used_ocr
                    })
            doc.close()  # Close the document
        except Exception as e:
            print(f"Error processing {pdf_file}: {e}")
            continue  # Skip to the next file

    with open(output_jsonl, "w", encoding="utf-8") as f:
        for page in all_pages:
            json.dump(page, f)
            f.write("\n")

    print(f"✅ Saved {len(all_pages)} pages to {output_jsonl} (with OCR fallback)")
    return all_pages  # Return the list of pages
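# Example of a page record written by Step 1 (illustrative filename and text):
# {"source_file": "95t_treadmill_sm.pdf", "page": 12, "text": "...", "ocr_used": false}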
# ---------------------------
# 🚀 Step 2: Chunk the Pages
# ---------------------------
def chunk_pages(input_jsonl, output_jsonl, chunk_size, chunk_overlap):
    print("Starting page chunking...")
    all_chunks = []
    if not os.path.exists(input_jsonl):
        print(f"Error: Input JSONL file not found at {input_jsonl}. Run PDF processing first.")
        return []

    try:
        with open(input_jsonl, "r", encoding="utf-8") as f:
            # Count lines for the tqdm progress bar
            total_lines = sum(1 for _ in f)
            f.seek(0)  # Reset file pointer to the beginning
            for line in tqdm(f, total=total_lines, desc="Chunking pages"):
                try:
                    page = json.loads(line)
                    source_file = page["source_file"]
                    page_number = page["page"]
                    text = page["text"]

                    metadata = extract_metadata_from_filename(source_file)
                    sentences = tokenize_sentences(clean_text(text))  # Clean and tokenize the page text
                    chunks = split_into_chunks(sentences, max_tokens=chunk_size, overlap=chunk_overlap)

                    for i, chunk in enumerate(chunks):
                        # Ensure chunk text is not empty
                        if chunk.strip():
                            all_chunks.append({
                                "source_file": source_file,
                                "chunk_id": f"{source_file}::page_{page_number}::chunk_{i+1}",
                                "page": page_number,
                                "ocr_used": page.get("ocr_used", False),  # Use .get for safety
                                "model": metadata.get("model", "unknown"),
                                "doc_type": metadata.get("doc_type", "unknown"),
                                "brand": metadata.get("brand", "life fitness"),
                                "text": chunk.strip()  # Ensure no leading/trailing whitespace
                            })
                except json.JSONDecodeError:
                    print(f"Skipping invalid JSON line: {line}")
                except Exception as e:
                    print(f"Error processing page from {line}: {e}")
                    continue  # Continue with the next line
    except Exception as e:
        print(f"Error opening or reading input JSONL file: {e}")
        return []

    if not all_chunks:
        print("No chunks were created.")

    with open(output_jsonl, "w", encoding="utf-8") as f:
        for chunk in all_chunks:
            json.dump(chunk, f)
            f.write("\n")

    print(f"✅ Done! {len(all_chunks)} chunks saved to {output_jsonl}")
    return all_chunks  # Return the list of chunks

# ---------------------------
# 🚀 Step 3: Embed Chunks into Chroma
# ---------------------------
def embed_chunks_into_chroma(jsonl_path, chroma_path, collection_name):
    # The embedder is shared with query_manuals() via the module-level global below,
    # so it must be assigned here rather than kept function-local.
    global embedder

    print("Starting ChromaDB embedding...")
    try:
        embedder = SentenceTransformer("all-MiniLM-L6-v2")
        embedder.eval()
        print("✅ SentenceTransformer model loaded.")
    except Exception as e:
        print(f"❌ Error loading SentenceTransformer model: {e}")
        return None, "Error loading SentenceTransformer model."

    try:
        # Use a persistent client
        client = chromadb.PersistentClient(path=chroma_path)
        # If the collection already exists, delete it so the index is rebuilt from scratch
        try:
            client.get_collection(name=collection_name)
            client.delete_collection(collection_name)
            print(f"Deleted existing collection: {collection_name}")
        except Exception:
            # Collection does not exist, which is fine
            pass
        collection = client.create_collection(name=collection_name)
        print(f"✅ ChromaDB collection '{collection_name}' created.")
    except Exception as e:
        print(f"❌ Error initializing ChromaDB: {e}")
        return None, "Error initializing ChromaDB."

    texts, metadatas, ids = [], [], []
    batch_size = 16  # Batch size for embedding

    if not os.path.exists(jsonl_path):
        print(f"Error: Input JSONL file not found at {jsonl_path}. Run chunking first.")
        return None, "Input chunk file not found."
    try:
        with open(jsonl_path, "r", encoding="utf-8") as f:
            # Count lines for the tqdm progress bar
            total_lines = sum(1 for _ in f)
            f.seek(0)  # Reset file pointer to the beginning
            for line in tqdm(f, total=total_lines, desc="Embedding chunks"):
                try:
                    item = json.loads(line)
                    texts.append(item.get("text", ""))  # Use .get for safety
                    ids.append(item.get("chunk_id", f"unknown_{len(ids)}"))  # Ensure a chunk_id exists
                    # Prepare metadata, ensuring all keys/values are strings and tolerating missing keys
                    metadata = {str(k): str(v) for k, v in item.items() if k != "text"}
                    metadatas.append(metadata)

                    if len(texts) >= batch_size:
                        embeddings = embedder.encode(texts).tolist()
                        collection.add(documents=texts, metadatas=metadatas, ids=ids, embeddings=embeddings)
                        texts, metadatas, ids = [], [], []  # Reset the batch
                except json.JSONDecodeError:
                    print(f"Skipping invalid JSON line during embedding: {line}")
                except Exception as e:
                    print(f"Error processing chunk line {line} during embedding: {e}")
                    continue  # Continue with the next line

        # Add any remaining items in the last batch
        if texts:
            embeddings = embedder.encode(texts).tolist()
            collection.add(documents=texts, metadatas=metadatas, ids=ids, embeddings=embeddings)

        print("✅ All OCR-enhanced chunks embedded in Chroma!")
        return collection, None  # Return the collection and no error
    except Exception as e:
        print(f"❌ Error reading input JSONL file for embedding: {e}")
        return None, "Error reading input file for embedding."

# ---------------------------
# 🧠 Load Hugging Face Model and Tokenizer
# ---------------------------
# This needs to happen after imports but before the Gradio interface
tokenizer = None
model = None
pipe = None

print(f"Attempting to load Hugging Face model: {HF_MODEL_ID}")
print(f"Using HF_TOKEN (present: {HF_TOKEN is not None})")

if not HF_TOKEN:
    print("❌ HF_TOKEN environment variable not set. Cannot load Hugging Face model.")
else:
    try:
        # Check if CUDA is available
        device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {device}")

        # Load tokenizer and model
        tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_ID, token=HF_TOKEN)
        model = AutoModelForCausalLM.from_pretrained(
            HF_MODEL_ID,
            token=HF_TOKEN,
            torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,  # bfloat16 on GPU
            device_map="auto" if torch.cuda.is_available() else None  # Automatic device mapping on GPU
        )
        if not torch.cuda.is_available():
            model = model.to(device)  # Only move the model manually when device_map isn't handling placement

        # Create a pipeline for easy inference; the model is already placed on its device(s),
        # so no explicit device argument is needed here.
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=512,
            temperature=0.1,
            top_p=0.9,
            do_sample=True
        )
        print(f"✅ Successfully loaded Hugging Face model: {HF_MODEL_ID} on {device}")
    except Exception as e:
        print(f"❌ Error loading Hugging Face model: {e}")
        print("Please ensure:")
        print("- The HF_TOKEN secret is set in your Hugging Face Space settings.")
        print("- Your Space has sufficient resources (GPU, RAM) for the model.")
        print("- You have accepted the model's terms on Hugging Face (if required).")
        tokenizer, model, pipe = None, None, None  # Reset to None if loading fails

# ---------------------------
# 🔎 Query Function (Uses Embedder and Chroma)
# ---------------------------
# The embedder is assigned by embed_chunks_into_chroma() during initial setup.
embedder = None

def query_manuals(question, model_filter=None, doc_type_filter=None, top_k=5, rerank_keywords=None):
    global embedder  # Access the global embedder variable

    if collection is None or embedder is None:
        print("⚠️ ChromaDB or Embedder not loaded. Cannot perform vector search.")
        return []  # Return empty if Chroma or the embedder is not loaded

    where_filter = {}
    if model_filter:
        where_filter["model"] = model_filter.lower()
    if doc_type_filter:
        where_filter["doc_type"] = doc_type_filter.lower()

    # ChromaDB expects None for no filter, a single-condition dict, or an explicit $and
    # when filtering on more than one field.
    if not where_filter:
        where_clause = None
    elif len(where_filter) == 1:
        where_clause = where_filter
    else:
        where_clause = {"$and": [{k: v} for k, v in where_filter.items()]}

    # Embed the question with the same model used to index the chunks, so query and
    # document vectors come from the same embedding space.
    query_embedding = embedder.encode([question]).tolist()

    results = collection.query(
        query_embeddings=query_embedding,
        n_results=top_k * 5,  # fetch more than needed so there is room to rerank
        where=where_clause
    )

    if not results or not results.get("documents") or not results["documents"][0]:
        return []  # No matches

    try:
        question_embedding = embedder.encode(question, convert_to_tensor=True)
    except Exception as e:
        print(f"Error encoding question: {e}")
        return []  # Return empty if embedding fails

    # Compute a combined semantic + keyword score for reranking
    reranked = []
    # Ensure results["documents"] and results["metadatas"] are not empty before iterating
    if results.get("documents") and results["documents"][0]:
        for i, text in enumerate(results["documents"][0]):
            meta = results["metadatas"][0][i]
            # Handle potential encoding errors during text embedding
            try:
                embedding = embedder.encode(text, convert_to_tensor=True)
                # Semantic similarity
                similarity_score = float(util.cos_sim(question_embedding, embedding))
            except Exception as e:
                print(f"Error encoding chunk text for reranking: {e}. Skipping chunk.")
                continue  # Skip this chunk if encoding fails

            # Keyword score
            keyword_score = 0
            if rerank_keywords and text:  # Ensure text is not None or empty
                for kw in rerank_keywords:
                    if kw.lower() in text.lower():
                        keyword_score += 1

            # Combine semantic similarity with a keyword-match bonus (tunable weights)
            final_score = (0.8 * similarity_score) + (0.2 * keyword_score)

            reranked.append({
                "score": final_score,
                "text": text,
                "metadata": meta
            })

    # Sort by combined score
    reranked.sort(key=lambda x: x["score"], reverse=True)
    return reranked[:top_k]

# ---------------------------
# 💬 Ask Hugging Face Model
# ---------------------------
def ask_hf_model(prompt):
    if pipe is None:
        return "Hugging Face model not loaded. Cannot generate response."

    try:
        # Use the Llama 3.1 chat template
        messages = [
            {"role": "system", "content": "You are a technical assistant trained to answer questions using equipment manuals. Use only the provided context to answer the question. If the answer is not clearly in the context, reply: 'I don't know.'"},
            {"role": "user", "content": prompt}
        ]

        # Apply the chat template and generate text
        prompt_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

        outputs = pipe(
            prompt_text,
            do_sample=True,
            temperature=0.1,  # Keep temperature low for more factual answers
            top_p=0.9,
            max_new_tokens=512,
            pad_token_id=tokenizer.eos_token_id,  # Set pad_token_id for generation
            return_full_text=False  # Return only the newly generated text, not the prompt
        )

        # With return_full_text=False the pipeline output is just the assistant's reply
        response = outputs[0]["generated_text"].strip()

        # Remove any trailing EOS tokens or similar artifacts
        if tokenizer.eos_token and response.endswith(tokenizer.eos_token):
            response = response[:-len(tokenizer.eos_token)].strip()

        return response
    except Exception as e:
        return f"❌ Error generating response from Hugging Face model: {str(e)}"

# ---------------------------
# 🎯 Full RAG Pipeline
# ---------------------------
def run_rag_qa(user_question, model_filter=None, doc_type_filter=None):  # Filters are optional inputs
    # Ensure ChromaDB and the HF model pipeline are loaded before proceeding
    if collection is None:
        return "ChromaDB is not loaded. Ensure PDFs are in ./Manuals and the app started correctly."
    if pipe is None:
        return "Hugging Face model pipeline is not loaded. Ensure HF_TOKEN is set and the model loaded successfully."
    results = query_manuals(
        question=user_question,
        model_filter=model_filter,  # Use the optional filter inputs
        doc_type_filter=doc_type_filter,
        top_k=MAX_CONTEXT_CHUNKS,
        rerank_keywords=["diagnostic", "immobilize", "system", "screen", "service", "error"]  # Example keywords
    )

    if not results:
        # Attempt a broader search if the initial filters yield no results
        if model_filter or doc_type_filter:
            print("No results with specified filters, trying broader search...")
            results = query_manuals(
                question=user_question,
                model_filter=None,  # Remove filters for the broader search
                doc_type_filter=None,
                top_k=MAX_CONTEXT_CHUNKS,
                rerank_keywords=["diagnostic", "immobilize", "system", "screen", "service", "error"]
            )
            if not results:
                return "No relevant documents found for the query, even with broader search."
        else:
            return "No relevant documents found for the query."

    context = "\n\n".join([
        f"Source File: {r['metadata'].get('source_file', 'N/A')}, Page: {r['metadata'].get('page', 'N/A')}\nText: {r['text'].strip()}"
        for r in results
    ])

    prompt = f"""
Context:
{context}

Question: {user_question}
"""

    return ask_hf_model(prompt)

# ---------------------------
# --- Initial Setup ---
# This code runs when the app starts on Hugging Face Spaces.
# It processes PDFs, chunks them, and builds the ChromaDB index.
# ---------------------------
print("Starting initial setup...")

# Tesseract must be available on the system for pytesseract to work. Shelling out with `!`
# from app.py is not an option; the environment itself has to provide the binary. On HF Spaces
# you may need a Dockerfile or an explicit system dependency rather than relying on the default
# environment. If Tesseract isn't found, the OCR fallback will fail.
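# One lightweight option (an assumption about the Space setup, not something this script
# requires): Gradio-SDK Spaces install apt packages listed one per line in a `packages.txt`
# file at the repo root, so a single entry there is usually enough for pytesseract to find
# the binary on PATH:
#
#   tesseract-ocr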
# Process PDFs and extract pages
all_pages = process_pdfs_for_pages(pdf_folder, output_jsonl_pages)

# Chunk the pages
all_chunks = []
if all_pages:  # Only chunk if pages were processed
    all_chunks = chunk_pages(output_jsonl_pages, output_jsonl_chunks, chunk_size, chunk_overlap)

# Embed chunks into ChromaDB
collection = None  # Initialize collection
if all_chunks:  # Only embed if chunks were created
    collection, embed_error = embed_chunks_into_chroma(output_jsonl_chunks, chroma_path, collection_name)
    if embed_error:
        print(f"Error during embedding: {embed_error}")

print("Initial setup complete.")

# ---------------------------
# 🖥️ Gradio Interface
# ---------------------------
# Only define the interface if the necessary components loaded
if collection is not None and pipe is not None:
    with gr.Blocks() as demo:
        gr.Markdown("""# 🧠 Manual QA via Hugging Face Llama 3.1
Ask a technical question and get answers using your own PDF manual database and a Hugging Face model.

**Note:** Initial startup might take time to process manuals and build the search index.
Ensure your `Manuals` folder is uploaded and the `HF_TOKEN` secret is set in Space settings.
""")
        with gr.Row():
            question = gr.Textbox(label="Your Question", placeholder="e.g. How do I access diagnostics on the SE3 console?")
        with gr.Row():
            model_filter_input = gr.Textbox(label="Filter by Model (Optional)", placeholder="e.g. se3hd")
            doc_type_filter_input = gr.Dropdown(
                label="Filter by Document Type (Optional)",
                choices=["owner manual", "service manual", "assembly instructions",
                         "installer alert", "parts manual", "service bulletin", "unknown"],
                value=None,
                allow_custom_value=True
            )
        submit = gr.Button("🔍 Ask")
        answer = gr.Textbox(label="Answer", lines=10)  # More lines for better readability

        # Call the run_rag_qa function when the button is clicked
        submit.click(
            fn=run_rag_qa,
            inputs=[question, model_filter_input, doc_type_filter_input],
            outputs=[answer]
        )

    # In Hugging Face Spaces, the app is launched automatically,
    # so the demo.launch() call is omitted.
    # demo.launch()
else:
    print("Gradio demo will not launch because RAG components (ChromaDB or HF Model) failed to load during setup.")
    # You could add a simple Gradio interface here to show an error message
    # if you wanted to provide user feedback in the Space UI even on failure.
    # Example:
    # with gr.Blocks() as error_demo:
    #     gr.Markdown("## Application Failed to Load")
    #     gr.Textbox(label="Error Details",
    #                value="RAG components (ChromaDB or HF Model) failed to initialize. "
    #                      "Check logs and Space settings (HF_TOKEN, resources).",
    #                interactive=False)
    # error_demo.launch()
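# --- Optional local smoke test (hypothetical, not part of the Space startup) ---
# With a few PDFs in ./Manuals and HF_TOKEN exported, the full pipeline can be exercised
# from a shell without the UI. The question and filter below are illustrative; uncomment
# to try it locally:
#
# if __name__ == "__main__":
#     print(run_rag_qa("How do I access diagnostics on the SE3 console?", model_filter="se3"))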