import os
import re
import uuid
from urllib.parse import urlparse

import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
import chromadb
from sentence_transformers import SentenceTransformer
import google.generativeai as genai

# Page configuration
st.set_page_config(layout="wide")

# Initialize Gemini API; the key is read from the GEMINI_API_KEY environment
# variable so it never ends up in source control
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

# Initialize ChromaDB
CHROMA_PATH = "chroma_db"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)

# Initialize session state: whether scraping is complete, the active
# collection name, and the chat history
if 'scraped' not in st.session_state:
    st.session_state.scraped = False
if 'collection_name' not in st.session_state:
    st.session_state.collection_name = "default_collection"
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []

# Initialize embedding model
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")


def clean_text(text):
    """Strip URLs and collapse runs of whitespace."""
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def split_content_into_chunks(content):
    """Split the scraped text into overlapping chunks for embedding."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=200, length_function=len
    )
    documents = [Document(page_content=content)]
    return text_splitter.split_documents(documents)


def add_chunks_to_db(chunks, collection_name):
    """Embed the chunks and upsert them into a ChromaDB collection."""
    collection = chroma_client.get_or_create_collection(name=collection_name)
    documents = [chunk.page_content for chunk in chunks]
    # Random UUIDs (rather than sequential "ID0", "ID1", ...) keep repeated
    # scrapes into the same collection from silently overwriting each other
    ids = [str(uuid.uuid4()) for _ in chunks]
    embeddings = embedding_model.encode(documents).tolist()
    collection.upsert(documents=documents, ids=ids, embeddings=embeddings)


def scrape_text(url):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Extract the domain for the collection name,
        # e.g. "https://example.com/page" -> "example_com"
        domain = urlparse(url).netloc.replace(".", "_").replace("-", "_")
        collection_name = domain or st.session_state.collection_name
        st.session_state.collection_name = collection_name

        text = clean_text(soup.get_text())
        chunks = split_content_into_chunks(text)
        add_chunks_to_db(chunks, collection_name)

        # Mark scraping as complete
        st.session_state.scraped = True
        return "Scraping and processing complete. You can now ask questions!"
    except requests.exceptions.RequestException as e:
        return f"Error scraping {url}: {e}"


def ask_question(query, collection_name):
    # Retrieve the most relevant chunks for the query
    collection = chroma_client.get_or_create_collection(name=collection_name)
    query_embedding = embedding_model.encode(query).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=2)
    top_chunks = results.get("documents", [[]])[0]

    system_prompt = f"""
You are a helpful assistant. You answer questions based on the provided context.
Only answer based on the knowledge I'm providing you. Don't use your internal
knowledge and don't make things up.
If you don't know the answer based on the provided context, just say:
"I don't have enough information to answer that question based on the scraped content."

Context information: {str(top_chunks)}
"""
    full_prompt = system_prompt + "\nUser Query: " + query
    model = genai.GenerativeModel('gemini-2.0-flash')
    response = model.generate_content(full_prompt)
    return response.text
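
# A quick way to sanity-check what landed in the vector store from a separate
# Python shell (a minimal sketch; "example_com" is a hypothetical collection
# name, use whatever the sidebar lists after a scrape):
#
#   import chromadb
#   client = chromadb.PersistentClient(path="chroma_db")
#   col = client.get_collection("example_com")
#   print(col.count())  # number of stored chunks
#   print(col.peek(2))  # first records: ids, documents, embeddings
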
# Create two columns: sidebar for database management and main content
col1, main_col = st.columns([1, 3])

# Database management sidebar
with col1:
    st.header("Database Management")

    # List available collections
    try:
        # ChromaDB v0.6.0: list_collections() returns only the names
        collection_names = chroma_client.list_collections()
        if collection_names:
            st.write("Available data collections:")
            selected_collection = st.selectbox(
                "Select a collection to query:", collection_names
            )
            if selected_collection and st.button("Load Selected Collection"):
                st.session_state.collection_name = selected_collection
                st.session_state.scraped = True
                st.success(f"Loaded collection: {selected_collection}")
                st.rerun()
    except Exception as e:
        st.error(f"Error: {str(e)}")

    # Clear the chat history and start over
    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
        st.rerun()

    # Scraping section
    st.header("Step 1: Scrape a Website")
    url = st.text_input("Enter the URL to scrape:")
    if url:
        if st.button("Scrape & Process"):
            with st.spinner("Scraping and processing content..."):
                result = scrape_text(url)
            # Surface scrape failures as errors instead of success banners
            if result.startswith("Error"):
                st.error(result)
            else:
                st.success(result)
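
# Collections accumulate in chroma_db/ across runs. A stale one can be dropped
# from a Python shell (a minimal sketch; the collection name is hypothetical):
#
#   import chromadb
#   client = chromadb.PersistentClient(path="chroma_db")
#   client.delete_collection(name="example_com")
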
# Main content area
with main_col:
    st.title("Web Scraper & Q&A Chatbot")

    # Container for the scrollable chat area
    chat_container = st.container()

    # Custom CSS for the chat container (a minimal scrollable box)
    st.markdown("""
        <style>
        .chat-container {
            max-height: 500px;
            overflow-y: auto;
        }
        </style>
    """, unsafe_allow_html=True)

    # Q&A section - only appears after scraping is complete
    if st.session_state.scraped:
        st.subheader("Step 2: Ask Questions About the Scraped Content")

        # Open a div with our custom class for the scrollable area
        st.markdown('<div class="chat-container">', unsafe_allow_html=True)

        # Display chat history
        for message in st.session_state.chat_history:
            with chat_container.chat_message(message["role"]):
                st.write(message["content"])

        st.markdown('</div>', unsafe_allow_html=True)
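
        # Each chat_history entry is a plain dict whose "role" maps straight
        # onto st.chat_message, e.g.:
        #   {"role": "user", "content": "What is this page about?"}
        #   {"role": "assistant", "content": "..."}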

        # Input for new question - always at the bottom
        user_query = st.chat_input("Ask your question here")
        if user_query:
            # Add the user question to chat history
            st.session_state.chat_history.append({"role": "user", "content": user_query})

            # Get the answer from the RAG pipeline
            with st.spinner("Searching database..."):
                answer = ask_question(user_query, st.session_state.collection_name)

            # Add the answer to chat history
            st.session_state.chat_history.append({"role": "assistant", "content": answer})

            # Rerun to update the UI with the new messages
            st.rerun()
    else:
        st.info("Please scrape a website or load a collection to start chatting.")
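
# To run the app (assuming this file is saved as app.py; the filename is an
# assumption, not fixed by the code):
#
#   pip install streamlit requests beautifulsoup4 langchain \
#       langchain-text-splitters chromadb sentence-transformers google-generativeai
#   export GEMINI_API_KEY=...   # the key read by genai.configure() at startup
#   streamlit run app.py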