File size: 6,611 Bytes
c22f035
 
 
 
 
 
 
 
 
 
d78024f
c22f035
 
d78024f
c22f035
 
 
bd118ce
d78024f
 
bd118ce
 
 
 
 
d78024f
c22f035
 
 
 
 
 
 
 
 
 
bd118ce
 
 
 
c22f035
 
 
 
 
bd118ce
c22f035
 
 
 
 
 
bd118ce
 
 
 
d78024f
bd118ce
c22f035
 
 
 
bd118ce
 
 
 
c22f035
 
 
d78024f
bd118ce
 
 
 
 
 
 
 
 
d78024f
c22f035
 
 
 
 
d78024f
bd118ce
d78024f
 
 
bd118ce
 
 
 
 
 
d78024f
bd118ce
 
 
d78024f
 
bd118ce
d78024f
 
 
 
 
bd118ce
 
d78024f
 
 
 
 
 
 
 
 
 
 
bd118ce
d78024f
 
 
 
 
 
 
 
 
 
 
bd118ce
 
d78024f
 
 
 
c22f035
bd118ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import os
import re

import chromadb
import google.generativeai as genai
import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

# Initialize Gemini API
# The key is read from the environment so that no credential is stored in the
# source file. Any key that was ever committed to the repository should be
# treated as leaked and rotated.
_gemini_api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not _gemini_api_key:
    st.error(
        "Gemini API key is not configured. Set the GEMINI_API_KEY "
        "(or GOOGLE_API_KEY) environment variable and restart the app."
    )
    # Halt this script run: nothing below can work without the key.
    st.stop()
genai.configure(api_key=_gemini_api_key)

# Initialize ChromaDB (persisted on disk under CHROMA_PATH)
CHROMA_PATH = "chroma_db"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)

# Initialize session state to track if scraping is complete and collection name
if 'scraped' not in st.session_state:
    st.session_state.scraped = False
if 'collection_name' not in st.session_state:
    st.session_state.collection_name = ""


# Initialize embedding model
@st.cache_resource
def _load_embedding_model():
    """Load the sentence-embedding model once and reuse it across reruns.

    Streamlit re-executes this script on every interaction; without caching
    the model would be constructed again each time.
    """
    return SentenceTransformer("all-MiniLM-L6-v2")


embedding_model = _load_embedding_model()

def clean_text(text):
    """Return *text* with URLs removed and whitespace normalised.

    Every run of non-whitespace characters that starts with ``http`` is
    dropped, then each run of whitespace is collapsed to a single space and
    leading/trailing whitespace is trimmed.
    """
    without_urls = re.sub(r'http\S+', '', text)
    return re.sub(r'\s+', ' ', without_urls).strip()

def split_content_into_chunks(content):
    """Split *content* into overlapping chunks.

    The text is wrapped in a single ``Document`` and handed to a
    ``RecursiveCharacterTextSplitter`` configured with a chunk size of 1000
    and an overlap of 200, both measured with ``len``.

    Returns whatever ``split_documents`` produces for that one document.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return splitter.split_documents([Document(page_content=content)])

def add_chunks_to_db(chunks, collection_name):
    """Embed *chunks* and upsert them into the named Chroma collection.

    Args:
        chunks: Sequence of objects exposing a ``page_content`` string.
        collection_name: Name of the collection to create or reuse.

    Returns:
        None. When *chunks* is empty nothing is written and no collection is
        created.
    """
    if not chunks:
        # Nothing to store; avoid sending an empty batch to the database.
        return

    # Create or get collection
    collection = chroma_client.get_or_create_collection(name=collection_name)

    documents = [chunk.page_content for chunk in chunks]
    # IDs are positional, so upserting into an existing collection overwrites
    # the entries that share the same index.
    # NOTE(review): chunks from an earlier, longer scrape beyond the new
    # length are not removed - confirm whether that is intended.
    ids = [f"ID{i}" for i in range(len(documents))]
    # `encode` has no `convert_to_list` option; convert its array result to
    # plain Python lists explicitly.
    embeddings = embedding_model.encode(documents).tolist()
    collection.upsert(documents=documents, ids=ids, embeddings=embeddings)

def scrape_text(url, collection_name, timeout=30):
    """Fetch *url*, extract its text, and index it in *collection_name*.

    On success the collection name is stored in ``st.session_state`` and the
    ``scraped`` flag is set so the Q&A section becomes available.

    Args:
        url: Address of the page to download.
        collection_name: Chroma collection that receives the chunks.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled server cannot hang the app indefinitely.

    Returns:
        A status message. Failure messages start with ``"Error"``.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"Error scraping {url}: {e}"

    soup = BeautifulSoup(response.text, 'html.parser')
    text = clean_text(soup.get_text())
    if not text:
        # An empty page yields nothing to index; report it instead of
        # claiming success.
        return f"Error scraping {url}: no text content found on the page."

    chunks = split_content_into_chunks(text)
    add_chunks_to_db(chunks, collection_name)

    # Store collection name and set scraped state to True
    st.session_state.collection_name = collection_name
    st.session_state.scraped = True

    return "Scraping and processing complete. You can now ask questions!"

def ask_question(query, collection_name):
    """Answer *query* using the most similar chunks stored in a collection.

    The query is embedded, the two nearest documents are retrieved from the
    Chroma collection, and they are passed as context to the Gemini model.

    Args:
        query: The user's question.
        collection_name: Name of an existing Chroma collection to search.

    Returns:
        The text of the model's response.

    Raises:
        Whatever ``chroma_client.get_collection`` raises when the collection
        does not exist, and any error raised by the Gemini client.
    """
    # Get the collection
    collection = chroma_client.get_collection(name=collection_name)

    # `encode` has no `convert_to_list` option; convert the resulting vector
    # to a plain list of floats explicitly.
    query_embedding = embedding_model.encode(query).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=2)
    # Results are grouped per query embedding; only one query was sent.
    top_chunks = results.get("documents", [[]])[0]
    
    system_prompt = f"""
    You are a helpful assistant. You answer questions based on the provided context.
    Only answer based on the knowledge I'm providing you. Don't use your internal
    knowledge and don't make things up.
    If you don't know the answer based on the provided context, just say: "I don't have enough information to answer that question based on the scraped content."
    
    Context information:
    {str(top_chunks)}
    """
    
    full_prompt = system_prompt + "\nUser Query: " + query
    model = genai.GenerativeModel('gemini-2.0-flash')
    response = model.generate_content(full_prompt)
    return response.text

# Main UI
st.title("Web Scraper & Q&A Chatbot")

# Scraping section
with st.container():
    st.subheader("Step 1: Scrape a Website")
    
    # Let user create a new database or use existing one
    collection_name = st.text_input("Enter a name for this data collection:", 
                                    value="my_collection", 
                                    help="This will create a new database or use an existing one with this name")
    
    url = st.text_input("Enter the URL to scrape:")
    
    # The button is only offered once both inputs are filled in.
    if url and collection_name:
        if st.button("Scrape & Process"):
            with st.spinner("Scraping and processing content..."):
                try:
                    result = scrape_text(url, collection_name)
                except Exception as e:
                    # UI boundary: failures outside the HTTP request (for
                    # example while writing to the database) are shown as a
                    # message rather than a raw traceback.
                    result = f"Error processing {url}: {e}"

                # scrape_text reports failures as strings starting with
                # "Error"; do not present those as a success.
                if result.startswith("Error"):
                    st.error(result)
                else:
                    st.success(result)

# Q&A section - only appears after scraping is complete
if st.session_state.scraped:
    with st.container():
        st.subheader("Step 2: Ask Questions About the Scraped Content")
        st.write(f"The database '{st.session_state.collection_name}' contains information scraped from the website. Ask a question:")
        
        # Chat history
        if 'chat_history' not in st.session_state:
            st.session_state.chat_history = []
            
        # Display chat history (replayed on every rerun of the script)
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.write(message["content"])
        
        # Input for new question
        user_query = st.chat_input("Ask your question here")
        
        if user_query:
            # Add user question to chat history
            st.session_state.chat_history.append({"role": "user", "content": user_query})
            
            # Display user question
            with st.chat_message("user"):
                st.write(user_query)
            
            # Get and display answer
            with st.chat_message("assistant"):
                with st.spinner("Searching database..."):
                    try:
                        answer = ask_question(user_query, st.session_state.collection_name)
                    except Exception as e:
                        # UI boundary: a missing collection or a failed model
                        # call becomes a visible message instead of aborting
                        # the turn with a traceback.
                        answer = f"Error answering question: {e}"
                        st.error(answer)
                    else:
                        st.write(answer)
                    
            # Add answer (or the error message) to chat history so every
            # user turn keeps a matching assistant turn.
            st.session_state.chat_history.append({"role": "assistant", "content": answer})

# Selection of existing collections
with st.sidebar:
    st.header("Database Management")
    
    # List available collections
    try:
        all_collections = chroma_client.list_collections()
        # Depending on the installed Chroma version, list_collections()
        # yields either collection objects or plain collection names;
        # accept both.
        collection_names = [
            collection if isinstance(collection, str) else collection.name
            for collection in all_collections
        ]
        
        if collection_names:
            st.write("Available data collections:")
            selected_collection = st.selectbox("Select a collection to query:", collection_names)
            
            if selected_collection and st.button("Load Selected Collection"):
                # Point the Q&A section at the chosen collection and unlock it.
                st.session_state.collection_name = selected_collection
                st.session_state.scraped = True
                st.success(f"Loaded collection: {selected_collection}")
                st.rerun()  # Updated from experimental_rerun()
    except Exception as e:
        st.error(f"Error loading collections: {e}")
    
    # Add a button to clear the session and start over
    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
        st.rerun()  # Updated from experimental_rerun()