bsnl-chatboot / app.py
samim2024's picture
Update app.py
3cc4dcf verified
raw
history blame
7.63 kB
# app.py
import streamlit as st
import os
import zipfile
import shutil
from io import BytesIO
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.llms import HuggingFaceHub
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import faiss
import uuid
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
RAG_ACCESS_KEY = os.getenv("RAG_ACCESS_KEY")
# Initialize session state
if "vectorstore" not in st.session_state:
st.session_state.vectorstore = None
if "history" not in st.session_state:
st.session_state.history = []
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
# Sidebar
with st.sidebar:
# BSNL Logo (local file)
st.image(
"bsnl_logo.png",
width=150
)
st.header("RAG Control Panel")
api_key_input = st.text_input("Enter RAG Access Key", type="password")
# Authentication
if st.button("Authenticate"):
if api_key_input == RAG_ACCESS_KEY:
st.session_state.authenticated = True
st.success("Authentication successful!")
else:
st.error("Invalid API key.")
# File uploader
if st.session_state.authenticated:
input_type = st.selectbox("Select Input Type", ["Single PDF", "Folder/Zip of PDFs"])
input_data = None
if input_type == "Single PDF":
input_data = st.file_uploader("Upload a PDF file", type=["pdf"])
else:
input_data = st.file_uploader("Upload a folder or zip of PDFs", type=["zip"])
if st.button("Process Files") and input_data is not None:
with st.spinner("Processing files..."):
vector_store = process_input(input_type, input_data)
st.session_state.vectorstore = vector_store
st.success("Files processed successfully. You can now ask questions.")
# Display chat history
st.subheader("Chat History")
for i, (q, a) in enumerate(st.session_state.history):
st.write(f"**Q{i+1}:** {q}")
st.write(f"**A{i+1}:** {a}")
st.markdown("---")
# Main app
def main():
# Inject CSS for mint theme and attractive styling
st.markdown("""
<style>
@import url('https://fonts.googleapis.com/css2?family=Roboto:wght@400;700&display=swap');
.stApp {
background-color: #98FF98; /* Mint color */
font-family: 'Roboto', sans-serif;
color: #ffffff;
}
.stTextInput > div > div > input {
background-color: #ffffff;
color: #333333;
border-radius: 8px;
border: 1px solid #4CAF50;
padding: 10px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.stButton > button {
background-color: #4CAF50;
color: white;
border-radius: 8px;
padding: 10px 20px;
border: none;
transition: all 0.3s ease;
box-shadow: 0 2px 4px rgba(0,0,0,0.2);
}
.stButton > button:hover {
background-color: #45a049;
transform: scale(1.05);
}
.stSidebar {
background-color: #88EE88;
padding: 20px;
border-right: 2px solid #4CAF50;
}
h1, h2, h3 {
color: #ffffff;
text-shadow: 1px 1px 2px rgba(0,0,0,0.2);
}
.stSpinner > div > div {
border-color: #4CAF50 transparent transparent transparent;
}
</style>
""", unsafe_allow_html=True)
st.title("RAG Q&A App with Mistral AI")
st.markdown("Welcome to the BSNL RAG App! Upload your PDFs and ask questions with ease.", unsafe_allow_html=True)
if not st.session_state.authenticated:
st.warning("Please authenticate with your API key in the sidebar.")
return
if st.session_state.vectorstore is None:
st.info("Please upload and process a PDF or folder/zip of PDFs in the sidebar.")
return
query = st.text_input("Enter your question:")
if st.button("Submit") and query:
with st.spinner("Generating answer..."):
answer = answer_question(st.session_state.vectorstore, query)
st.session_state.history.append((query, answer))
st.write("**Answer:**", answer)
def process_input(input_type, input_data):
# Create uploads directory
os.makedirs("uploads", exist_ok=True)
documents = ""
if input_type == "Single PDF":
pdf_reader = PdfReader(input_data)
for page in pdf_reader.pages:
documents += page.extract_text() or ""
else:
# Handle zip file
zip_path = "uploads/uploaded.zip"
with open(zip_path, "wb") as f:
f.write(input_data.getvalue())
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall("uploads/extracted")
# Process all PDFs in extracted folder
for root, _, files in os.walk("uploads/extracted"):
for file in files:
if file.endswith(".pdf"):
pdf_path = os.path.join(root, file)
pdf_reader = PdfReader(pdf_path)
for page in pdf_reader.pages:
documents += page.extract_text() or ""
# Clean up extracted files
shutil.rmtree("uploads/extracted", ignore_errors=True)
os.remove(zip_path)
# Split text
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
texts = text_splitter.split_text(documents)
# Create embeddings
hf_embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-mpnet-base-v2",
model_kwargs={'device': 'cpu'}
)
# Initialize FAISS
dimension = len(hf_embeddings.embed_query("sample text"))
index = faiss.IndexFlatL2(dimension)
vector_store = FAISS(
embedding_function=hf_embeddings,
index=index,
docstore=InMemoryDocstore({}),
index_to_docstore_id={}
)
# Add texts to vector store
uuids = [str(uuid.uuid4()) for _ in range(len(texts))]
vector_store.add_texts(texts, ids=uuids)
# Save vector store locally
vector_store.save_local("vectorstore/faiss_index")
return vector_store
def answer_question(vectorstore, query):
llm = HuggingFaceHub(
repo_id="mistralai/Mistral-7B-Instruct-v0.1",
model_kwargs={"temperature": 0.7, "max_length": 512},
huggingfacehub_api_token=HUGGINGFACEHUB_API_TOKEN
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
prompt_template = PromptTemplate(
template="Use the provided context to answer the question concisely:\n\nContext: {context}\n\nQuestion: {question}\n\nAnswer:",
input_variables=["context", "question"]
)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=False,
chain_type_kwargs={"prompt": prompt_template}
)
result = qa_chain({"query": query})
return result["result"].split("Answer:")[-1].strip()
if __name__ == "__main__":
main()