Spaces:
Configuration error
Configuration error
| import shutil | |
| import streamlit as st | |
| st.set_page_config( | |
| page_title="RAG Configuration", | |
| page_icon="🤖", | |
| layout="wide", | |
| initial_sidebar_state="collapsed" | |
| ) | |
| import re | |
| import os | |
| import spire.pdf | |
| import fitz | |
| from src.Databases import * | |
| from langchain.text_splitter import * | |
| from sentence_transformers import SentenceTransformer, CrossEncoder | |
| from langchain_community.llms import HuggingFaceHub | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from transformers import (AutoFeatureExtractor, AutoModel, AutoImageProcessor) | |
| from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
| import PyPDF2 | |
| class SentenceTransformerEmbeddings: | |
| """ | |
| Wrapper Class for SentenceTransformer Class | |
| """ | |
| def __init__(self, model_name: str): | |
| """ | |
| Initiliases a Sentence Transformer | |
| """ | |
| self.model = SentenceTransformer(model_name) | |
| def embed_documents(self, texts): | |
| """ | |
| Returns a list of embeddings for the given texts. | |
| """ | |
| return self.model.encode(texts, convert_to_tensor=True).tolist() | |
| def embed_query(self, text): | |
| """ | |
| Returns a list of embeddings for the given text. | |
| """ | |
| return self.model.encode(text, convert_to_tensor=True).tolist() | |
| def settings(): | |
| return HuggingFaceEmbedding(model_name="BAAI/bge-base-en") | |
| def pine_embedding_model(): | |
| return SentenceTransformerEmbeddings(model_name="all-mpnet-base-v2") # 784 dimension + euclidean | |
| def weaviate_embedding_model(): | |
| return SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") | |
| def load_image_model(model): | |
| extractor = AutoFeatureExtractor.from_pretrained(model) | |
| im_model = AutoModel.from_pretrained(model) | |
| return extractor, im_model | |
| def load_bi_encoder(): | |
| return HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L12-v2", model_kwargs={"device": "cpu"}) | |
| def pine_embedding_model(): | |
| return SentenceTransformerEmbeddings(model_name="all-mpnet-base-v2") # 784 dimension + euclidean | |
| def weaviate_embedding_model(): | |
| return SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") | |
| def load_cross(): | |
| return CrossEncoder("cross-encoder/ms-marco-TinyBERT-L-2-v2", max_length=512, device="cpu") | |
| def pine_cross_encoder(): | |
| return CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2", max_length=512, device="cpu") | |
| def weaviate_cross_encoder(): | |
| return CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", max_length=512, device="cpu") | |
| def load_chat_model(): | |
| template = ''' | |
| You are an assistant for question-answering tasks. | |
| Use the following pieces of retrieved context to answer the question accurately. | |
| If the question is not related to the context, just answer 'I don't know'. | |
| Question: {question} | |
| Context: {context} | |
| Answer: | |
| ''' | |
| return HuggingFaceHub( | |
| repo_id="mistralai/Mistral-7B-Instruct-v0.1", | |
| model_kwargs={"temperature": 0.5, "max_length": 64, "max_new_tokens": 512, "query_wrapper_prompt": template} | |
| ) | |
| def load_q_model(): | |
| return HuggingFaceHub( | |
| repo_id="mistralai/Mistral-7B-Instruct-v0.3", | |
| model_kwargs={"temperature": 0.5, "max_length": 64, "max_new_tokens": 512} | |
| ) | |
| def load_image_model(model): | |
| extractor = AutoFeatureExtractor.from_pretrained(model) | |
| im_model = AutoModel.from_pretrained(model) | |
| return extractor, im_model | |
| def load_nomic_model(): | |
| return AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5"), AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", | |
| trust_remote_code=True) | |
| def vector_database_prep(file): | |
| def data_prep(file): | |
| def findWholeWord(w): | |
| return re.compile(r'\b{0}\b'.format(re.escape(w)), flags=re.IGNORECASE).search | |
| file_name = file.name | |
| pdf_file_path = os.path.join(os.getcwd(), 'pdfs', file_name) | |
| image_folder = os.path.join(os.getcwd(), f'figures_{file_name}') | |
| if not os.path.exists(image_folder): | |
| os.makedirs(image_folder) | |
| # everything down here is wrt pages dir | |
| print('1. folder made') | |
| with spire.pdf.PdfDocument() as doc: | |
| doc.LoadFromFile(pdf_file_path) | |
| images = [] | |
| for page_num in range(doc.Pages.Count): | |
| page = doc.Pages[page_num] | |
| for image_num in range(len(page.ImagesInfo)): | |
| imageFileName = os.path.join(image_folder, f'figure-{page_num}-{image_num}.png') | |
| image = page.ImagesInfo[image_num] #This retrieve the image from the current pdf | |
| image.Image.Save(imageFileName) #This line save the image at spcified location for the further use in hadr disk | |
| os.chmod(imageFileName, 0o777) | |
| print("os.chmod(imageFileName, 0o777)") #This provide permission for the current image to edit in the another process | |
| images.append({ | |
| "image_file_name": imageFileName, | |
| "image": image | |
| }) #Image object and name of the iamge save in the lsit | |
| print('2. image extraction done') | |
| image_info = [] | |
| for image_file in os.listdir(image_folder): | |
| if image_file.endswith('.png'): #This confirm all the images are are in png form | |
| image_info.append({ | |
| "image_file_name": image_file[:-4], #image name without .png | |
| "image": Image.open(os.path.join(image_folder, image_file)), #This is location where that image is stored | |
| "pg_no": int(image_file.split('-')[1]) #Image page number where it is present | |
| }) | |
| print('3. temporary') | |
| figures = [] | |
| with fitz.open(pdf_file_path) as pdf_file: | |
| data = "" | |
| for page in pdf_file: | |
| text = page.get_text() | |
| if not (findWholeWord('table of contents')(text) or findWholeWord('index')(text)): | |
| data += text | |
| data = data.replace('}', '-') | |
| data = data.replace('{', '-') | |
| print('4. Data extraction done') | |
| hs = [] | |
| for i in image_info: #here three things are stored | |
| src = i['image_file_name'] + '.png' | |
| headers = {'_': []} | |
| header = '_' | |
| page = pdf_file[i['pg_no']] | |
| texts = page.get_text('dict') | |
| for block in texts['blocks']: | |
| if block['type'] == 0: | |
| for line in block['lines']: | |
| for span in line['spans']: | |
| if 'bol' in span['font'].lower() and not span['text'].isnumeric(): | |
| header = span['text'] | |
| print("header: ", header) | |
| headers[header] = [header] | |
| else: | |
| headers[header].append(span['text']) | |
| try: | |
| if findWholeWord('fig')(span['text']): | |
| i['image_file_name'] = span['text'] | |
| figures.append(span['text'].split('fig')[-1]) | |
| elif findWholeWord('figure')(span['text']): | |
| i['image_file_name'] = span['text'] | |
| figures.append(span['text'].lower().split('figure')[-1]) | |
| else: | |
| pass | |
| except re.error: | |
| pass | |
| if not i['image_file_name'].endswith('.png'): | |
| s = i['image_file_name'] + '.png' | |
| i['image_file_name'] = s | |
| # os.rename(os.path.join(image_folder, src), os.path.join(image_folder, i['image_file_name'])) | |
| hs.append({"image": i, "header": headers}) | |
| print('5. header and figures done') | |
| figure_contexts = {} | |
| for fig in figures: | |
| figure_contexts[fig] = [] | |
| for page_num in range(len(pdf_file)): | |
| page = pdf_file[page_num] | |
| texts = page.get_text('dict') | |
| for block in texts['blocks']: | |
| if block['type'] == 0: | |
| for line in block['lines']: | |
| for span in line['spans']: | |
| if findWholeWord(fig)(span['text']): | |
| print('figure mention: ', span['text']) | |
| figure_contexts[fig].append(span['text']) | |
| print('6. Figure context collected') | |
| contexts = [] | |
| for h in hs: | |
| context = "" | |
| for q in h['header'].values(): | |
| context += "".join(q) | |
| s = pytesseract.image_to_string(h['image']['image']) | |
| qwea = context + '\n' + s if len(s) != 0 else context | |
| contexts.append(( | |
| h['image']['image_file_name'], | |
| qwea, | |
| h['image']['image'] | |
| )) | |
| print('7. Overall context collected') | |
| image_content = [] | |
| for fig in figure_contexts: | |
| for c in contexts: | |
| if findWholeWord(fig)(c[0]): | |
| s = c[1] + '\n' + "\n".join(figure_contexts[fig]) | |
| s = str("\n".join( | |
| [ | |
| "".join([h for h in i.strip() if h.isprintable()]) | |
| for i in s.split('\n') | |
| if len(i.strip()) != 0 | |
| ] | |
| )) | |
| image_content.append(( | |
| c[0], | |
| s, | |
| c[2] | |
| )) | |
| print('8. Figure context added') | |
| return data, image_content | |
| # Vector Database objects | |
| extractor, i_model = st.session_state['extractor'], st.session_state['image_model'] | |
| pinecone_embed = st.session_state['pinecone_embed'] | |
| weaviate_embed = st.session_state['weaviate_embed'] | |
| vb1 = UnifiedDatabase('vb1', 'lancedb/rag') | |
| vb1.model_prep(extractor, i_model, weaviate_embed, | |
| RecursiveCharacterTextSplitter(chunk_size=1330, chunk_overlap=35)) | |
| vb2 = UnifiedDatabase('vb2', 'lancedb/rag') | |
| vb2.model_prep(extractor, i_model, pinecone_embed, | |
| RecursiveCharacterTextSplitter(chunk_size=1330, chunk_overlap=35)) | |
| vb_list = [vb1, vb2] | |
| data, image_content = data_prep(file) | |
| for vb in vb_list: | |
| vb.upsert(data) | |
| vb.upsert(image_content) # image_cont = dict[image_file_path, context, PIL] | |
| return vb_list | |
| # Function to extract text from PDF | |
| # def read_pdf(pdf_file): #this is the one change i have done here | |
| # try: | |
| # # Open the PDF file | |
| # with open(pdf_file, 'rb') as file: | |
| # reader = PyPDF2.PdfReader(file) | |
| # pdf_text = "" | |
| # # Extract text from each page | |
| # for page in reader.pages: | |
| # pdf_text += page.extract_text() | |
| # # Assuming vb_list contains tuples of (vb, sp) | |
| # for vb, sp in vb_list: | |
| # # Ensure `data` is defined properly (in this case, it could be the extracted text) | |
| # data = pdf_text | |
| # vb.upsert(data, sp) | |
| # return vb_list | |
| # except Exception as e: | |
| # print(f"Error reading or processing the PDF: {e}") | |
| # return None | |
| os.environ["HUGGINGFACEHUB_API_TOKEN"] = st.secrets["HUGGINGFACEHUB_API_TOKEN"] | |
| os.environ["LANGCHAIN_PROJECT"] = st.secrets["LANGCHAIN_PROJECT"] | |
| os.environ["OPENAI_API_KEY"] = st.secrets["GPT_KEY"] | |
| st.session_state['pdf_file'] = [] | |
| st.session_state['vb_list'] = [] | |
| st.session_state['Settings.embed_model'] = settings() | |
| st.session_state['processor'], st.session_state['vision_model'] = load_nomic_model() | |
| st.session_state['bi_encoder'] = load_bi_encoder() | |
| st.session_state['chat_model'] = load_chat_model() | |
| st.session_state['cross_model'] = load_cross() | |
| st.session_state['q_model'] = load_q_model() | |
| st.session_state['extractor'], st.session_state['image_model'] = load_image_model("google/vit-base-patch16-224-in21k") | |
| st.session_state['pinecone_embed'] = pine_embedding_model() | |
| st.session_state['weaviate_embed'] = weaviate_embedding_model() | |
| st.title('Multi-modal RAG based LLM for Information Retrieval') | |
| st.subheader('Converse with our Chatbot') | |
| st.markdown('Enter a pdf file as a source.') | |
| uploaded_file = st.file_uploader("Choose an pdf document...", type=["pdf"], accept_multiple_files=False) | |
| if uploaded_file is not None: | |
| with open(uploaded_file.name, mode='wb') as w: | |
| w.write(uploaded_file.getvalue()) | |
| if not os.path.exists(os.path.join(os.getcwd(), 'pdfs')): | |
| print("i ma here") | |
| os.makedirs(os.path.join(os.getcwd(), 'pdfs')) | |
| shutil.move(uploaded_file.name, os.path.join(os.getcwd(), 'pdfs')) | |
| st.session_state['pdf_file'] = uploaded_file.name | |
| def data_prep(file): | |
| def findWholeWord(w): | |
| return re.compile(r'\b{0}\b'.format(re.escape(w)), flags=re.IGNORECASE).search | |
| file_name = uploaded_file.name | |
| pdf_file_path = os.path.join(os.getcwd(), 'pdfs', file_name) | |
| image_folder = os.path.join(os.getcwd(), f'figures_{file_name}') #name the image folder | |
| if not os.path.exists(image_folder): | |
| os.makedirs(image_folder) #make the image folder if folder is not presnt | |
| print('1. folder made') | |
| with spire.pdf.PdfDocument() as doc: | |
| doc.LoadFromFile(pdf_file_path) | |
| images = [] | |
| for page_num in range(doc.Pages.Count): | |
| page = doc.Pages[page_num] | |
| for image_num in range(len(page.ImagesInfo)): | |
| imageFileName = os.path.join(image_folder, f'figure-{page_num}-{image_num}.png') #name the fir page number and image numer on that image | |
| # print(imageFileName) | |
| image = page.ImagesInfo[image_num] | |
| image.Image.Save(imageFileName) | |
| os.chmod(imageFileName, 0o777) | |
| images.append({ | |
| "image_file_name": imageFileName, | |
| "image": image | |
| }) | |
| return images | |
| file_path = os.path.join('pdfs', uploaded_file.name) # Define the full file path | |
| with open(file_path, mode='wb') as f: | |
| f.write(uploaded_file.getvalue()) # Save the uploaded file to disk | |
| img=data_prep(uploaded_file) | |
| st.session_state['file_path'] = file_path | |
| st.success(f"File uploaded and saved as: {file_path}") | |
| if len(img)>0: | |
| with st.spinner('Extracting'): | |
| vb_list = vector_database_prep(uploaded_file) | |
| st.session_state['vb_list'] = vb_list | |
| st.switch_page('pages/rag.py') | |
| st.experimental_rerun() | |
| else: | |
| st.switch_page('pages/b.py') | |
| # vb_list = read_pdf(uploaded_file) # Corrected to use session state | |
| # st.session_state['vb_list'] = vb_list | |
| # st.write("vb list is implemtnted") | |
| # # Ask the user for a question | |
| # question = st.text_input("Enter your question:", "How are names present in the context?") | |
| # if st.button("Submit Question"): | |
| # # Display the answer to the question | |
| # with st.spinner('Fetching the answer...'): | |
| # # Assuming query is a function that takes the question as input | |
| # answer = req.query(question) | |
| # print(answer) | |
| # st.success(f"Answer: {answer}") | |