# Hugging Face Spaces page header (Space status: Sleeping)
import streamlit as st | |
from docx import Document | |
import os | |
from langchain_core.prompts import PromptTemplate | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
import torch | |
import time | |
from sentence_transformers import SentenceTransformer | |
from langchain.vectorstores import Chroma | |
from langchain.docstore.document import Document as Document2 | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from huggingface_hub import HfFolder | |
# Read the Hugging Face access token from the environment (None when unset).
token = os.getenv("HF_TOKEN")
# Persist the token for huggingface_hub only when one was actually provided;
# os.getenv returns None for a missing variable and there is nothing to save.
if token:
    HfFolder.save_token(token)
# Local folder that holds the converted .docx source documents.
docs_folder = "./converted_docs"
# Load the text of every .docx file found in a local folder.
def load_docx_files_from_drive(drive_folder):
    """Return the text of each .docx file in ``drive_folder``.

    Args:
        drive_folder: Path of the directory to scan (not recursive).

    Returns:
        A list with one string per .docx file, made of the file's non-blank
        paragraphs joined with newlines. Files are visited in sorted name
        order so the result is reproducible between runs.

    Raises:
        OSError: If ``drive_folder`` cannot be listed.
    """
    documents = []
    # os.listdir gives no ordering guarantee; sorting keeps the later chunk
    # numbering stable from one run to the next.
    for file_name in sorted(os.listdir(drive_folder)):
        if not file_name.endswith(".docx"):
            continue
        # "~$name.docx" files are lock files Word leaves next to an open
        # document; they are not real documents, so skip them.
        if file_name.startswith("~$"):
            continue
        file_path = os.path.join(drive_folder, file_name)
        doc = Document(file_path)
        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
        documents.append("\n".join(paragraphs))
    return documents
# Read the raw text of every source document from the local docs folder.
documents = load_docx_files_from_drive(docs_folder)
def split_extracted_text_into_chunks(documents):
    """Cut each document's text into records that begin at "File Name:" lines.

    Every line starting with "File Name:" opens a new chunk. Lines that come
    before the first such line in a document form their own chunk, and a
    chunk never spans two documents.

    Args:
        documents: Iterable of document texts.

    Returns:
        A flat list of chunk strings, in document order.
    """
    marker = "File Name:"
    pieces = []
    for text in documents:
        pending = []
        for row in text.splitlines():
            # A marker line closes whatever has been collected so far.
            if row.startswith(marker) and pending:
                pieces.append("\n".join(pending))
                pending = []
            pending.append(row)
        # Flush the trailing record of this document, if any.
        if pending:
            pieces.append("\n".join(pending))
    return pieces
# Break the loaded document texts into "File Name:"-delimited chunks.
chunks = split_extracted_text_into_chunks(documents)
def save_chunks_to_file(chunks, output_file_path):
    """Dump the chunks to a UTF-8 text file for inspection.

    Each chunk is preceded by a "Chunk N:" header (N starts at 1) and
    followed by a line of 50 "=" characters. An existing file is overwritten.

    Args:
        chunks: Iterable of chunk strings.
        output_file_path: Destination path of the text file.
    """
    divider = "\n" + "=" * 50 + "\n"
    with open(output_file_path, "w", encoding="utf-8") as out:
        for number, text in enumerate(chunks, start=1):
            # Header, chunk body, then the separator line.
            out.writelines((f"Chunk {number}:\n", text, divider))
# Path of the text file that receives a readable dump of all chunks.
output_file_path = "./chunks_output.txt"
# `chunks` has already been computed from `documents` earlier in this script,
# so it is reused here instead of splitting the same texts a second time.
save_chunks_to_file(chunks, output_file_path)
# Step 1: Load the embedding model through LangChain's wrapper.
embedding_model = HuggingFaceEmbeddings(
    model_name="Omartificial-Intelligence-Space/Arabic-Triplet-Matryoshka-V2"
)
# Step 2: Embed the chunks in one batched call.
def embed_chunks(chunks):
    """Pair every chunk with its embedding vector.

    Args:
        chunks: Iterable of chunk strings.

    Returns:
        A list of ``{"chunk": text, "embedding": vector}`` dicts in input
        order; an empty list when there are no chunks.
    """
    texts = list(chunks)
    if not texts:
        # Nothing to embed; avoid calling the model at all.
        return []
    # embed_documents encodes the whole list in one call instead of invoking
    # the model once per chunk.
    vectors = embedding_model.embed_documents(texts)
    return [
        {"chunk": text, "embedding": vector}
        for text, vector in zip(texts, vectors)
    ]
# Compute the chunk/embedding records consumed by the document-preparation step.
embeddings = embed_chunks(chunks)
# Step 3: Wrap each chunk in a LangChain document.
def prepare_documents_for_chroma(embeddings):
    """Build one LangChain document per chunk/embedding record.

    Only the ``"chunk"`` text of each record is used; the document metadata
    stores the record's 1-based position under ``"chunk_index"``.

    Args:
        embeddings: Iterable of dicts that each have a ``"chunk"`` key.

    Returns:
        A list of ``Document2`` objects in input order.
    """
    prepared = []
    for position, record in enumerate(embeddings, start=1):
        prepared.append(
            Document2(
                page_content=record["chunk"],
                metadata={"chunk_index": position},
            )
        )
    return prepared
# Note: from here on `documents` holds LangChain documents, not raw texts.
documents = prepare_documents_for_chroma(embeddings)
# Step 4: Create the Chroma store.
# Deterministic ids make repeated runs overwrite the same entries in the
# persisted collection instead of appending duplicate copies of every chunk
# each time this script executes.
vectorstore = Chroma.from_documents(
    documents=documents,
    embedding=embedding_model,  # Embedding object used for docs and queries
    ids=[f"chunk-{doc.metadata['chunk_index']}" for doc in documents],
    persist_directory="./chroma_db",  # On-disk location of the collection
)
class RAGPipeline:
    """Answer questions by retrieving context and prompting a causal LM.

    The pipeline retrieves ``k`` documents from the vector store with MMR
    search, fills an Arabic instruction prompt with them, and samples an
    answer from the Hugging Face model named by ``model_name``.
    """

    def __init__(self, vectorstore, model_name="CohereForAI/aya-expanse-8b", k=6):
        """Create the retriever, the prompt template, and load the model.

        Args:
            vectorstore: Vector store exposing ``as_retriever``.
            model_name: Hugging Face model id for tokenizer and model.
            k: Number of documents to retrieve per question.
        """
        self.vectorstore = vectorstore
        self.model_name = model_name
        self.k = k
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr", search_kwargs={"k": self.k}
        )
        self.prompt_template = PromptTemplate.from_template(self._get_template())
        # Load model and tokenizer (the module-level token is passed through).
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name, token=token)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name, torch_dtype=torch.bfloat16, device_map="auto", token=token
        )

    def _get_template(self):
        """Return the prompt text with ``{context}`` and ``{question}`` slots.

        The lines are joined with newlines and there is no trailing newline.
        NOTE(review): the [INST]/<<SYS>> markers are plain text here; confirm
        they match the prompt format the configured model expects.
        """
        return "\n".join(
            (
                "<s>[INST] <<SYS>>",
                "أنت مساعد مفيد يقدم إجابات باللغة العربية بناءً على السياق المقدم.",
                "- أجب فقط باللغة العربية",
                "- إذا لم تجد إجابة في السياق، قل أنك لا تعرف",
                "- كن دقيقاً وواضحاً في إجاباتك",
                "<</SYS>>",
                "السياق: {context}",
                "السؤال: {question}",
                "الإجابة: [/INST]",
            )
        )

    def generate_response(self, question):
        """Retrieve context for ``question`` and return the generated answer."""
        retrieved_docs = self._retrieve_documents(question)
        prompt = self._create_prompt(retrieved_docs, question)
        return self._generate_response(prompt)

    def _retrieve_documents(self, question):
        """Return retrieved page contents keyed ``doc_0``, ``doc_1``, ...

        The elapsed retrieval time is printed to stdout.
        """
        start = time.perf_counter()
        retrieved_docs = self.retriever.invoke(question)
        result = {f"doc_{i}": doc.page_content for i, doc in enumerate(retrieved_docs)}
        time_lapsed = time.perf_counter() - start
        print(f"Time lapsed in Retreival: {time_lapsed}")
        return result

    def _create_prompt(self, docs, question):
        """Fill the prompt template with the context and the question.

        Args:
            docs: Mapping of labels to passage text (as produced by
                ``_retrieve_documents``) or an already formatted string.
            question: The user's question.
        """
        if isinstance(docs, dict):
            # Join the passages as plain text; formatting the dict itself
            # would inject its Python repr (braces, quotes, escaped newlines)
            # into the prompt.
            context = "\n\n".join(str(text) for text in docs.values())
        else:
            context = docs
        return self.prompt_template.format(context=context, question=question)

    def _generate_response(self, prompt):
        """Sample a completion for ``prompt`` and return only the new text.

        The elapsed generation time is printed to stdout.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        prompt_length = inputs["input_ids"].shape[1]
        start = time.perf_counter()
        # Passing the whole encoding forwards the attention mask along with
        # the input ids.
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=1024,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id,
        )
        time_lapsed = time.perf_counter() - start
        print(f"Time lapsed in Generation: {time_lapsed}")
        # Decode only the tokens generated after the prompt, so the answer
        # does not depend on finding a "[/INST]" marker in the decoded text.
        generated_tokens = outputs[0][prompt_length:]
        response = self.tokenizer.decode(generated_tokens, skip_special_tokens=True)
        return response.strip()
@st.cache_resource
def _load_rag_pipeline():
    """Build the RAG pipeline once and reuse it across Streamlit reruns.

    Streamlit re-executes this script on every widget interaction; without
    caching, the tokenizer and model would be loaded again each time.
    """
    return RAGPipeline(vectorstore)


rag_pipeline = _load_rag_pipeline()
question = st.text_area("أدخل سؤالك هنا")
if st.button("Generate Answer"):
    if question.strip():
        response = rag_pipeline.generate_response(question)
        st.write(response)
        print("Question: ", question)
        print("Response: ", response)
    else:
        # Do not run retrieval and generation for a blank question.
        st.warning("الرجاء إدخال سؤال أولاً")