"""Streamlit app: scrape a web page into ChromaDB and answer questions with Gemini.

Flow:
    1. The user supplies a URL and a collection name; the page text is
       cleaned, split into overlapping chunks, embedded and upserted into a
       persistent ChromaDB collection.
    2. Questions are embedded with the same model, the closest chunks are
       retrieved and passed to Gemini as the only allowed context.

Configuration:
    The Gemini API key is read from the ``GEMINI_API_KEY`` (or
    ``GOOGLE_API_KEY``) environment variable.
"""

import hashlib
import os
import re

import chromadb
import google.generativeai as genai
import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

CHROMA_PATH = "chroma_db"
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
GEMINI_MODEL_NAME = "gemini-2.0-flash"
REQUEST_TIMEOUT_SECONDS = 30

SCRAPE_SUCCESS_MESSAGE = "Scraping and processing complete. You can now ask questions!"
NO_CONTEXT_ANSWER = (
    "I don't have enough information to answer that question "
    "based on the scraped content."
)

# SECURITY: the API key must never live in source control. A key that was
# previously committed here should be treated as leaked and revoked.
_gemini_api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not _gemini_api_key:
    st.error(
        "Gemini API key not found. Set the GEMINI_API_KEY environment "
        "variable and restart the app."
    )
    st.stop()
genai.configure(api_key=_gemini_api_key)

# Initialize ChromaDB
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)

# Session state: whether a collection is ready to query, which one, and the chat log.
if "scraped" not in st.session_state:
    st.session_state.scraped = False
if "collection_name" not in st.session_state:
    st.session_state.collection_name = ""
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []


@st.cache_resource
def _load_embedding_model():
    """Load the sentence-embedding model once per server process.

    Streamlit re-executes this script on every interaction; without caching
    the model would be re-instantiated on each rerun.
    """
    return SentenceTransformer(EMBEDDING_MODEL_NAME)


embedding_model = _load_embedding_model()


def clean_text(text):
    """Return *text* with URLs removed and all whitespace runs collapsed.

    Args:
        text: Raw text extracted from a page.

    Returns:
        The text with every ``http...`` token deleted, consecutive whitespace
        replaced by a single space, and leading/trailing whitespace stripped.
    """
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def split_content_into_chunks(content):
    """Split *content* into overlapping chunks for embedding.

    Args:
        content: The cleaned page text.

    Returns:
        A list of ``Document`` objects of at most 1000 characters each, with a
        200-character overlap between neighbours.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return text_splitter.split_documents([Document(page_content=content)])


def add_chunks_to_db(chunks, collection_name, id_prefix="ID"):
    """Embed *chunks* and upsert them into the named ChromaDB collection.

    Args:
        chunks: Documents whose ``page_content`` is stored and embedded.
        collection_name: Collection to create or reuse.
        id_prefix: Prefix for the generated ids (``f"{id_prefix}{i}"``). Pass
            a source-specific prefix so that chunks from different sources
            stored in the same collection do not overwrite each other.
    """
    if not chunks:
        # Nothing to store; avoid handing empty lists to upsert.
        return

    collection = chroma_client.get_or_create_collection(name=collection_name)
    documents = [chunk.page_content for chunk in chunks]
    ids = [f"{id_prefix}{i}" for i in range(len(documents))]
    # encode() returns a numpy array by default; Chroma is given plain lists.
    embeddings = embedding_model.encode(documents).tolist()
    collection.upsert(documents=documents, ids=ids, embeddings=embeddings)


def _scrape_and_index(url, collection_name):
    """Fetch *url*, index its text, and report the outcome.

    Returns:
        A ``(succeeded, message)`` tuple. On success the session state is
        updated so the Q&A section targets *collection_name*.
    """
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return False, f"Error scraping {url}: {e}"

    soup = BeautifulSoup(response.text, 'html.parser')
    # Script/style bodies are not page content and would pollute the index.
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    # A separator keeps text from adjacent elements from being fused together.
    text = clean_text(soup.get_text(separator=" "))

    chunks = split_content_into_chunks(text)
    if not chunks:
        return False, f"Error scraping {url}: no readable text found on the page."

    # Per-URL id prefix: re-scraping the same URL updates its chunks, while a
    # different URL in the same collection no longer overwrites them.
    url_digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:16]
    add_chunks_to_db(chunks, collection_name, id_prefix=f"{url_digest}-")

    # Store collection name and set scraped state to True
    st.session_state.collection_name = collection_name
    st.session_state.scraped = True
    return True, SCRAPE_SUCCESS_MESSAGE


def scrape_text(url, collection_name):
    """Scrape *url* into *collection_name* and return a status message.

    Args:
        url: Page to download.
        collection_name: ChromaDB collection that receives the chunks.

    Returns:
        A success message, or a string starting with ``"Error scraping"``
        when the download fails or the page contains no text.
    """
    _, message = _scrape_and_index(url, collection_name)
    return message


def ask_question(query, collection_name, n_results=2):
    """Answer *query* using only content retrieved from *collection_name*.

    Args:
        query: The user's question.
        collection_name: Existing ChromaDB collection to search.
        n_results: Number of closest chunks supplied to the model as context.

    Returns:
        The model's answer text, or a fallback message when the model returns
        no usable text.

    Raises:
        Whatever ``chroma_client.get_collection`` raises for an unknown
        collection, and any error raised by the Gemini client call.
    """
    collection = chroma_client.get_collection(name=collection_name)
    query_embedding = embedding_model.encode(query).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=n_results)
    matches = results.get("documents") or [[]]
    top_chunks = matches[0]

    # Plain text context instead of a Python list repr.
    context = "\n\n".join(top_chunks)
    system_prompt = (
        "You are a helpful assistant. You answer questions based on the "
        "provided context.\n"
        "Only answer based on the knowledge I'm providing you. Don't use your "
        "internal knowledge and don't make things up.\n"
        "If you don't know the answer based on the provided context, just say: "
        f'"{NO_CONTEXT_ANSWER}"\n\n'
        f"Context information:\n{context}\n"
    )
    full_prompt = system_prompt + "\nUser Query: " + query

    model = genai.GenerativeModel(GEMINI_MODEL_NAME)
    response = model.generate_content(full_prompt)
    try:
        return response.text
    except ValueError:
        # The .text accessor raises when the response carries no text part
        # (for example when the response was blocked).
        return (
            "The model did not return an answer (the response may have been "
            "blocked). Please try rephrasing your question."
        )


# Main UI
st.title("Web Scraper & Q&A Chatbot")

# Scraping section
with st.container():
    st.subheader("Step 1: Scrape a Website")

    # Let user create a new database or use existing one
    collection_name = st.text_input(
        "Enter a name for this data collection:",
        value="my_collection",
        help="This will create a new database or use an existing one with this name",
    ).strip()
    url = st.text_input("Enter the URL to scrape:").strip()

    if url and collection_name and st.button("Scrape & Process"):
        with st.spinner("Scraping and processing content..."):
            succeeded, message = _scrape_and_index(url, collection_name)
        # Failures must not be rendered as a green success banner.
        if succeeded:
            st.success(message)
        else:
            st.error(message)

# Q&A section - only appears after scraping is complete
if st.session_state.scraped:
    with st.container():
        st.subheader("Step 2: Ask Questions About the Scraped Content")
        st.write(
            f"The database '{st.session_state.collection_name}' contains "
            "information scraped from the website. Ask a question:"
        )

        # Display chat history
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.write(message["content"])

        # Input for new question
        user_query = st.chat_input("Ask your question here")
        if user_query:
            # Add user question to chat history
            st.session_state.chat_history.append({"role": "user", "content": user_query})

            # Display user question
            with st.chat_message("user"):
                st.write(user_query)

            # Get and display answer
            with st.chat_message("assistant"):
                with st.spinner("Searching database..."):
                    try:
                        answer = ask_question(user_query, st.session_state.collection_name)
                    except Exception as e:  # UI boundary: show the failure instead of crashing the page
                        answer = f"Error answering question: {e}"
                st.write(answer)

            # Add answer to chat history
            st.session_state.chat_history.append({"role": "assistant", "content": answer})

# Selection of existing collections
with st.sidebar:
    st.header("Database Management")

    # List available collections; only the listing call is guarded so that
    # widget code and st.rerun() are not wrapped by the handler.
    try:
        all_collections = chroma_client.list_collections()
    except Exception as e:
        st.error(f"Error loading collections: {e}")
        all_collections = []

    # NOTE(review): some ChromaDB releases return plain names here instead of
    # objects with a .name attribute; accept both. Confirm against the pinned version.
    collection_names = [getattr(collection, "name", collection) for collection in all_collections]

    if collection_names:
        st.write("Available data collections:")
        selected_collection = st.selectbox("Select a collection to query:", collection_names)

        if selected_collection and st.button("Load Selected Collection"):
            st.session_state.collection_name = selected_collection
            st.session_state.scraped = True
            st.success(f"Loaded collection: {selected_collection}")
            st.rerun()

    # Add a button to clear the session and start over
    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
        st.rerun()