|
import os
import re

import chromadb
import google.generativeai as genai
import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
|
|
|
|
|
# The Gemini API key is a secret and must not live in source control, so it is
# read from the GOOGLE_API_KEY environment variable instead.
# NOTE(review): the key that used to be hard-coded on this line should be
# treated as compromised and rotated.
_gemini_api_key = os.environ.get("GOOGLE_API_KEY")
if not _gemini_api_key:
    # Fail early with an actionable message instead of a confusing API error
    # on the first question.
    st.error(
        "The GOOGLE_API_KEY environment variable is not set. "
        "Set it to your Gemini API key and restart the app."
    )
    st.stop()
genai.configure(api_key=_gemini_api_key)
|
|
|
|
|
# Path handed to Chroma's persistent client.
CHROMA_PATH = "chroma_db"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)

# Seed the session flags the rest of the script reads, without overwriting
# values that are already present in the session:
#   scraped          - whether a collection is ready to be queried
#   collection_name  - name of the collection currently being queried
for _state_key, _initial_value in (("scraped", False), ("collection_name", "")):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
|
|
|
|
|
@st.cache_resource
def _load_embedding_model():
    """Load the sentence-embedding model, cached by Streamlit.

    Streamlit re-executes this script on every interaction; caching the model
    as a resource avoids constructing it again on each rerun.
    """
    return SentenceTransformer("all-MiniLM-L6-v2")


# Shared embedder used both when indexing chunks and when embedding queries.
embedding_model = _load_embedding_model()
|
|
|
# Matches http:// and https:// URLs only. The scheme separator is required so
# ordinary words that merely start with "http" (e.g. "httpd", "http/2") are
# left intact.
_URL_PATTERN = re.compile(r'https?://\S+')
_WHITESPACE_PATTERN = re.compile(r'\s+')


def clean_text(text):
    """Return ``text`` with URLs removed and whitespace normalised.

    Args:
        text: Raw text, e.g. the text extracted from an HTML page.

    Returns:
        The text with every ``http://`` / ``https://`` URL deleted, each run
        of whitespace collapsed to a single space, and leading/trailing
        whitespace stripped.
    """
    without_urls = _URL_PATTERN.sub('', text)
    return _WHITESPACE_PATTERN.sub(' ', without_urls).strip()
|
|
|
def split_content_into_chunks(content, chunk_size=1000, chunk_overlap=200):
    """Split ``content`` into overlapping chunks for embedding.

    Args:
        content: The full text to split.
        chunk_size: Maximum chunk length, measured with ``len`` (characters).
        chunk_overlap: Number of characters shared between adjacent chunks.

    Returns:
        Whatever ``RecursiveCharacterTextSplitter.split_documents`` returns
        for a single ``Document`` wrapping ``content``; callers in this file
        read ``page_content`` from each item.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return text_splitter.split_documents([Document(page_content=content)])
|
|
|
def add_chunks_to_db(chunks, collection_name):
    """Embed ``chunks`` and upsert them into the named Chroma collection.

    The collection is created if it does not exist yet, even when ``chunks``
    is empty, so that later lookups by name still succeed.

    Args:
        chunks: Iterable of objects exposing a ``page_content`` string.
        collection_name: Name of the Chroma collection to write to.
    """
    collection = chroma_client.get_or_create_collection(name=collection_name)

    documents = [chunk.page_content for chunk in chunks]
    if not documents:
        # Nothing to index: skip the embedder and avoid sending an empty
        # batch to the collection.
        return

    # NOTE(review): IDs are positional, so writing a second page into the same
    # collection overwrites the first page's leading chunks and can leave its
    # trailing chunks behind. Confirm whether replace or accumulate semantics
    # are intended before changing the ID scheme.
    ids = [f"ID{i}" for i in range(len(documents))]

    # encode() has no ``convert_to_list`` option; request a NumPy array and
    # convert it to plain nested lists explicitly.
    embeddings = embedding_model.encode(documents, convert_to_numpy=True).tolist()

    collection.upsert(documents=documents, ids=ids, embeddings=embeddings)
|
|
|
def scrape_text(url, collection_name, timeout=30):
    """Download ``url``, index its text, and mark the session as ready.

    Args:
        url: Address of the page to fetch.
        collection_name: Chroma collection that receives the page's chunks.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A human-readable status string. Failures are reported as a string
        starting with ``"Error scraping"`` rather than raised; on failure the
        session state is left untouched.
    """
    # Only the network call is guarded: it is the only step expected to raise
    # RequestException, and errors from later steps should not be hidden.
    try:
        # Without a timeout an unresponsive server would block the app forever.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"Error scraping {url}: {e}"

    soup = BeautifulSoup(response.text, 'html.parser')
    text = clean_text(soup.get_text())
    if not text:
        # An empty page would produce no chunks to store; report it instead of
        # announcing a collection that has nothing to answer from.
        return f"Error scraping {url}: no text content found on the page"

    chunks = split_content_into_chunks(text)
    add_chunks_to_db(chunks, collection_name)

    # Remember which collection the Q&A step should query.
    st.session_state.collection_name = collection_name
    st.session_state.scraped = True

    return "Scraping and processing complete. You can now ask questions!"
|
|
|
def ask_question(query, collection_name, n_results=2):
    """Answer ``query`` using only chunks retrieved from a Chroma collection.

    Args:
        query: The user's question.
        collection_name: Name of an existing Chroma collection to search.
        n_results: How many of the closest chunks to pass to the model.

    Returns:
        The model's answer as a string, or a fixed fallback message when no
        context could be retrieved or the model returned no usable text.
    """
    no_context_answer = (
        "I don't have enough information to answer that question "
        "based on the scraped content."
    )

    collection = chroma_client.get_collection(name=collection_name)

    # encode() has no ``convert_to_list`` option; convert the returned NumPy
    # vector to a plain list of floats explicitly.
    query_embedding = embedding_model.encode(query, convert_to_numpy=True).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=n_results)
    # One query was sent, so the documents for it are the first (only) entry.
    top_chunks = (results.get("documents") or [[]])[0]

    if not top_chunks:
        # With no context the prompt instructs the model to give exactly this
        # reply, so skip the API call.
        return no_context_answer

    system_prompt = f"""
    You are a helpful assistant. You answer questions based on the provided context.
    Only answer based on the knowledge I'm providing you. Don't use your internal
    knowledge and don't make things up.
    If you don't know the answer based on the provided context, just say: "I don't have enough information to answer that question based on the scraped content."

    Context information:
    {top_chunks}
    """

    full_prompt = system_prompt + "\nUser Query: " + query
    model = genai.GenerativeModel('gemini-2.0-flash')
    response = model.generate_content(full_prompt)

    try:
        return response.text
    except ValueError:
        # NOTE(review): the ``text`` accessor is understood to raise ValueError
        # when the response carries no usable text part (for example a blocked
        # reply) - confirm against the google-generativeai docs.
        return "The model did not return an answer for that question. Please try rephrasing it."
|
|
|
|
|
st.title("Web Scraper & Q&A Chatbot")

# Step 1: collect a collection name and a URL, then scrape on demand.
with st.container():
    st.subheader("Step 1: Scrape a Website")

    collection_name = st.text_input(
        "Enter a name for this data collection:",
        value="my_collection",
        help="This will create a new database or use an existing one with this name",
    )

    url = st.text_input("Enter the URL to scrape:")

    # The button is only offered once both inputs are filled in.
    if url and collection_name:
        if st.button("Scrape & Process"):
            with st.spinner("Scraping and processing content..."):
                result = scrape_text(url, collection_name)

            # scrape_text reports failures as a status string beginning with
            # "Error scraping"; show those as errors, not as success.
            if result.startswith("Error scraping"):
                st.error(result)
            else:
                st.success(result)
|
|
|
|
|
# Step 2: chat over the active collection. Only shown once a collection has
# been scraped or loaded.
if st.session_state.scraped:
    with st.container():
        st.subheader("Step 2: Ask Questions About the Scraped Content")
        st.write(f"The database '{st.session_state.collection_name}' contains information scraped from the website. Ask a question:")

        if 'chat_history' not in st.session_state:
            st.session_state.chat_history = []

        # Replay the conversation so far; the script reruns on every
        # interaction, so earlier messages must be redrawn each time.
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.write(message["content"])

        user_query = st.chat_input("Ask your question here")

        if user_query:
            st.session_state.chat_history.append({"role": "user", "content": user_query})

            with st.chat_message("user"):
                st.write(user_query)

            with st.chat_message("assistant"):
                answer_failed = False
                with st.spinner("Searching database..."):
                    # UI boundary: a failed lookup or model call is turned into
                    # a visible reply so the question recorded above is not
                    # left without an answer in the history.
                    try:
                        answer = ask_question(user_query, st.session_state.collection_name)
                    except Exception as exc:
                        answer_failed = True
                        answer = f"Sorry, something went wrong while answering: {exc}"

                if answer_failed:
                    st.error(answer)
                else:
                    st.write(answer)

            st.session_state.chat_history.append({"role": "assistant", "content": answer})
|
|
|
|
|
# Sidebar: switch between existing collections and reset the conversation.
with st.sidebar:
    st.header("Database Management")

    # Only the Chroma call is guarded, so unrelated UI errors are not
    # mislabelled as collection-loading failures.
    try:
        all_collections = chroma_client.list_collections()
        # NOTE(review): some Chroma releases are understood to return plain
        # name strings here instead of objects with ``.name``; accept both.
        collection_names = [
            getattr(collection, "name", collection) for collection in all_collections
        ]
    except Exception as e:
        st.error(f"Error loading collections: {e}")
        collection_names = []

    if collection_names:
        st.write("Available data collections:")
        selected_collection = st.selectbox("Select a collection to query:", collection_names)

        if selected_collection and st.button("Load Selected Collection"):
            # Point the Q&A step at the chosen collection and redraw the page.
            st.session_state.collection_name = selected_collection
            st.session_state.scraped = True
            st.success(f"Loaded collection: {selected_collection}")
            st.rerun()

    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
        st.rerun()