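"""Streamlit app: scrape a web page, chunk and embed its text into a local
ChromaDB collection, then answer questions over the stored chunks with
Gemini (a minimal retrieval-augmented generation loop)."""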
import os
import streamlit as st
import requests
import re
from bs4 import BeautifulSoup
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
import chromadb
from sentence_transformers import SentenceTransformer
import google.generativeai as genai
import uuid
# Page configuration
st.set_page_config(layout="wide")
# Initialize Gemini API. Read the key from an environment variable (GEMINI_API_KEY
# here is a placeholder name) instead of hardcoding a secret in the source.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
# Initialize ChromaDB
CHROMA_PATH = "chroma_db"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
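# PersistentClient stores the index on disk under ./chroma_db, so scraped
# collections survive restarts and reappear in the sidebar's collection list.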
# Initialize session state: scraping status, active collection, and chat history
if 'scraped' not in st.session_state:
    st.session_state.scraped = False
if 'collection_name' not in st.session_state:
    st.session_state.collection_name = "default_collection"
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []
# Initialize embedding model
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
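# all-MiniLM-L6-v2 yields 384-dimensional sentence embeddings; the same model
# must embed both the stored chunks and the queries so distances are comparable.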
def clean_text(text):
    """Drop URLs and collapse runs of whitespace in scraped page text."""
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
def split_content_into_chunks(content):
    """Split the page text into overlapping chunks sized for embedding."""
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200, length_function=len)
    documents = [Document(page_content=content)]
    return text_splitter.split_documents(documents)
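# Note on the splitter above: with chunk_size=1000 and chunk_overlap=200,
# consecutive chunks share roughly 200 characters, so a sentence cut at a
# boundary still appears intact in at least one retrievable chunk.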
def add_chunks_to_db(chunks, collection_name):
    # Create or get the target collection
    collection = chroma_client.get_or_create_collection(name=collection_name)
    documents = [chunk.page_content for chunk in chunks]
    # Random UUIDs keep IDs unique, so re-scraping adds chunks instead of
    # silently overwriting earlier ones that reused the same sequential ID
    ids = [str(uuid.uuid4()) for _ in chunks]
    # encode() returns a numpy array; convert to plain lists for Chroma
    embeddings = embedding_model.encode(documents).tolist()
    collection.upsert(documents=documents, ids=ids, embeddings=embeddings)
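# Chroma's upsert() inserts new IDs and updates existing ones, so repeated
# scrapes with fresh UUIDs simply grow the collection rather than replacing it.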
def scrape_text(url):
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Store the chunks in whichever collection is active in session state
        collection_name = st.session_state.collection_name
        text = clean_text(soup.get_text())
        chunks = split_content_into_chunks(text)
        add_chunks_to_db(chunks, collection_name)
        # Mark scraping as complete so the Q&A panel is shown
        st.session_state.scraped = True
        return "Scraping and processing complete. You can now ask questions!"
    except requests.exceptions.RequestException as e:
        return f"Error scraping {url}: {e}"
def ask_question(query, collection_name):
    # Get the collection
    collection = chroma_client.get_or_create_collection(name=collection_name)
    query_embedding = embedding_model.encode(query).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=2)
    top_chunks = results.get("documents", [[]])[0]
    system_prompt = f"""
You are a helpful assistant. You answer questions based on the provided context.
Only answer based on the knowledge I'm providing you. Don't use your internal
knowledge and don't make things up.
If you don't know the answer based on the provided context, just say: "I don't have enough information to answer that question based on the scraped content."

Context information:
{str(top_chunks)}
"""
    full_prompt = system_prompt + "\nUser Query: " + query
    model = genai.GenerativeModel('gemini-2.0-flash')
    response = model.generate_content(full_prompt)
    return response.text
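# The function above is the retrieval-augmented generation step: embed the
# query with the same model used for the chunks, fetch the 2 nearest chunks
# from Chroma, and pass only those to Gemini as context. Raising n_results
# widens the available context at the cost of a longer prompt.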
# Create two columns: sidebar for database and main content
col1, main_col = st.columns([1, 3])
# Database management sidebar
with col1:
    st.header("Database Management")
    # List available collections
    try:
        # ChromaDB v0.6.0+: list_collections() returns collection names
        collection_names = chroma_client.list_collections()
        if collection_names:
            st.write("Available data collections:")
            selected_collection = st.selectbox("Select a collection to query:", collection_names)
            if selected_collection and st.button("Load Selected Collection"):
                st.session_state.collection_name = selected_collection
                st.session_state.scraped = True
                st.success(f"Loaded collection: {selected_collection}")
                st.rerun()
    except Exception as e:
        st.error(f"Error: {str(e)}")
    # Button to clear the chat history and start over
    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
        st.rerun()
    # Scraping section
    st.header("Step 1: Scrape a Website")
    url = st.text_input("Enter the URL to scrape:")
    if url:
        if st.button("Scrape & Process"):
            with st.spinner("Scraping and processing content..."):
                result = scrape_text(url)
            st.success(result)
# Main content area
with main_col:
    st.title("Web Scraper & Q&A Chatbot")
    # Use a container with custom CSS for the scrollable chat area
    chat_container = st.container()
    # Apply custom CSS for the chat container
    st.markdown("""
    <style>
    .chat-container {
        height: 500px;
        overflow-y: auto;
        border: 1px solid #ddd;
        border-radius: 5px;
        padding: 15px;
        margin-bottom: 10px;
        background-color: #f9f9f9;
    }
    .stChatInputContainer {
        position: sticky;
        bottom: 0;
        background-color: white;
        padding-top: 10px;
        z-index: 100;
    }
    </style>
    """, unsafe_allow_html=True)
    # Q&A section - only appears after scraping is complete
    if st.session_state.scraped:
        st.subheader("Step 2: Ask Questions About the Scraped Content")
        # Open the scrollable area. Note: Streamlit renders later widgets as
        # siblings, not children, of this div, so the styling is best-effort.
        st.markdown('<div class="chat-container">', unsafe_allow_html=True)
        # Display chat history
        for message in st.session_state.chat_history:
            with chat_container.chat_message(message["role"]):
                st.write(message["content"])
        st.markdown('</div>', unsafe_allow_html=True)
        # Input for new question - always at the bottom
        user_query = st.chat_input("Ask your question here")
        if user_query:
            # Add the user question to chat history
            st.session_state.chat_history.append({"role": "user", "content": user_query})
            # Get the answer
            with st.spinner("Searching database..."):
                answer = ask_question(user_query, st.session_state.collection_name)
            # Add the answer to chat history
            st.session_state.chat_history.append({"role": "assistant", "content": answer})
            # Rerun to update the UI with the new messages
            st.rerun()
    else:
        st.info("Please scrape a website or load a collection to start chatting.")
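# To run locally (GEMINI_API_KEY is the placeholder variable name used above):
#   GEMINI_API_KEY=<your-key> streamlit run app.py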