import streamlit as st from docx import Document import os from langchain_core.prompts import PromptTemplate from transformers import AutoTokenizer, AutoModelForCausalLM import torch import time from sentence_transformers import SentenceTransformer from langchain.vectorstores import Chroma from langchain.docstore.document import Document as Document2 from langchain_community.embeddings import HuggingFaceEmbeddings docs_folder = "./converted_docs" # Function to load .docx files from Google Drive folder def load_docx_files_from_drive(drive_folder): docx_files = [f for f in os.listdir(drive_folder) if f.endswith(".docx")] documents = [] for file_name in docx_files: file_path = os.path.join(drive_folder, file_name) doc = Document(file_path) content = "\n".join([p.text for p in doc.paragraphs if p.text.strip()]) documents.append(content) return documents # Load .docx files from Google Drive folder documents = load_docx_files_from_drive(docs_folder) def split_extracted_text_into_chunks(documents): # List to hold all chunks chunks = [] for doc_text in documents: # Split the document text into lines lines = doc_text.splitlines() # Initialize variables for splitting current_chunk = [] for line in lines: # Check if the line starts with "File Name:" if line.startswith("File Name:"): # If there's a current chunk, save it before starting a new one if current_chunk: chunks.append("\n".join(current_chunk)) current_chunk = [] # Reset the current chunk # Add the line to the current chunk current_chunk.append(line) # Add the last chunk for the current document if current_chunk: chunks.append("\n".join(current_chunk)) return chunks # Split the extracted documents into chunks chunks = split_extracted_text_into_chunks(documents) def save_chunks_to_file(chunks, output_file_path): # Open the file in write mode with open(output_file_path, "w", encoding="utf-8") as file: for i, chunk in enumerate(chunks, start=1): # Write each chunk with a header for easy identification file.write(f"Chunk {i}:\n") file.write(chunk) file.write("\n" + "=" * 50 + "\n") # Path to save the chunks file output_file_path = "./chunks_output.txt" # Split the extracted documents into chunks chunks = split_extracted_text_into_chunks(documents) # Save the chunks to the file save_chunks_to_file(chunks, output_file_path) # Step 1: Load the model through LangChain's wrapper embedding_model = HuggingFaceEmbeddings( model_name="Omartificial-Intelligence-Space/Arabic-Triplet-Matryoshka-V2" ) # Step 2: Embed the chunks (now simplified) def embed_chunks(chunks): return [ {"chunk": chunk, "embedding": embedding_model.embed_query(chunk)} for chunk in chunks ] embeddings = embed_chunks(chunks) # Step 3: Prepare documents (unchanged) def prepare_documents_for_chroma(embeddings): return [ Document2(page_content=entry["chunk"], metadata={"chunk_index": i}) for i, entry in enumerate(embeddings, start=1) ] documents = prepare_documents_for_chroma(embeddings) # Step 4: Create Chroma store (fixed) vectorstore = Chroma.from_documents( documents=documents, embedding=embedding_model, # Proper embedding object persist_directory="./chroma_db", # Optional persistence ) class RAGPipeline: def __init__(self, vectorstore, model_name="CohereForAI/aya-expanse-8b", k=6): self.vectorstore = vectorstore self.model_name = model_name self.k = k self.retriever = self.vectorstore.as_retriever( search_type="mmr", search_kwargs={"k": self.k} ) self.prompt_template = PromptTemplate.from_template(self._get_template()) # Load model and tokenizer self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) self.model = AutoModelForCausalLM.from_pretrained( self.model_name, torch_dtype=torch.bfloat16, device_map="auto" ) def _get_template(self): return """\ [INST] <> أنت مساعد مفيد يقدم إجابات باللغة العربية بناءً على السياق المقدم. - أجب فقط باللغة العربية - إذا لم تجد إجابة في السياق، قل أنك لا تعرف - كن دقيقاً وواضحاً في إجاباتك <> السياق: {context} السؤال: {question} الإجابة: [/INST]\ """ def generate_response(self, question): retrieved_docs = self._retrieve_documents(question) prompt = self._create_prompt(retrieved_docs, question) response = self._generate_response(prompt) return response def _retrieve_documents(self, question): start = time.time() retrieved_docs = self.retriever.invoke(question) result = {f"doc_{i}": doc.page_content for i, doc in enumerate(retrieved_docs)} end = time.time() time_lapsed = end - start print(f"Time lapsed in Retreival: {time_lapsed}") return result def _create_prompt(self, docs, question): return self.prompt_template.format(context=docs, question=question) def _generate_response(self, prompt): inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device) start = time.time() outputs = self.model.generate( inputs.input_ids, max_new_tokens=1024, temperature=0.7, top_p=0.9, do_sample=True, pad_token_id=self.tokenizer.eos_token_id, ) end = time.time() time_lapsed = end - start print(f"Time lapsed in Generation: {time_lapsed}") response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) # Extract only the assistant's response after [/INST] return response.split("[/INST]")[-1].strip() rag_pipeline = RAGPipeline(vectorstore) question = st.text_area("أدخل سؤالك هنا") if st.button("Generate Answer"): response = rag_pipeline.generate_response(question) st.write(response) print("Question: ", question) print("Response: ", response)