import os
import streamlit as st
import requests
import re
from bs4 import BeautifulSoup
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
import chromadb
from sentence_transformers import SentenceTransformer
import google.generativeai as genai

# Initialize Gemini API (read the key from the environment; never commit secrets)
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

# Initialize ChromaDB
CHROMA_PATH = "chroma_db"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
collection = chroma_client.get_or_create_collection(name="formula_1")
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
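# PersistentClient stores the index on disk, so scraped data survives restarts.
# all-MiniLM-L6-v2 yields 384-dimensional embeddings; the same model must be
# used at index time and query time for the distances to be meaningful.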

# Initialize session state to track if scraping is complete
if 'scraped' not in st.session_state:
    st.session_state.scraped = False

def clean_text(text):
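    """Strip URLs and collapse whitespace, e.g.
    'Standings at https://example.com  today' -> 'Standings at today'."""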
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def split_content_into_chunks(content):
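    """Split page text into 1000-character chunks with 200 characters of
    overlap, so sentences that straddle a boundary stay retrievable."""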
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200, length_function=len)
    documents = [Document(page_content=content)]
    return text_splitter.split_documents(documents)

def add_chunks_to_db(chunks):
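    """Embed each chunk and upsert it into the Chroma collection.

    Note: IDs are positional ("ID0", "ID1", ...), so a later scrape
    overwrites earlier chunks that share the same index.
    """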
    documents = [chunk.page_content for chunk in chunks]
    ids = [f"ID{i}" for i in range(len(chunks))]
    # encode() returns a numpy array; convert to plain lists for Chroma
    embeddings = embedding_model.encode(documents).tolist()
    collection.upsert(documents=documents, ids=ids, embeddings=embeddings)

def scrape_text(url):
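    """Fetch a page, clean and chunk its text, and index it in Chroma.

    Returns (ok, message) so the caller can choose success or error styling.
    """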
    try:
        response = requests.get(url, timeout=30)  # don't hang on unresponsive sites
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        text = clean_text(soup.get_text())
        chunks = split_content_into_chunks(text)
        add_chunks_to_db(chunks)
        # Set scraped state to True when complete
        st.session_state.scraped = True
        return True, "Scraping and processing complete. You can now ask questions!"
    except requests.exceptions.RequestException as e:
        return False, f"Error scraping {url}: {e}"

def ask_question(query):
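    """Embed the query, retrieve the two nearest chunks from Chroma, and
    have Gemini answer using only that retrieved context."""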
    query_embedding = embedding_model.encode(query).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=2)
    top_chunks = results.get("documents", [[]])[0]
    
    system_prompt = """
    You are a Formula 1 expert. You answer questions about Formula 1,
    but only from the context provided below. You don't use your internal
    knowledge and you don't make things up.
    If the context doesn't contain the answer, just say: I don't know.

    Context:
    """ + "\n\n".join(top_chunks)
    
    full_prompt = system_prompt + "\nUser Query: " + query
    model = genai.GenerativeModel('gemini-2.0-flash')
    response = model.generate_content(full_prompt)
    return response.text

# Main UI
st.title("Formula 1 Web Scraper & Chatbot")

# Scraping section
with st.container():
    st.subheader("Step 1: Scrape a Formula 1 Website")
    url = st.text_input("Enter a Formula 1 related URL:")
    
    if url:
        if st.button("Scrape & Process"):
            with st.spinner("Scraping and processing content..."):
                ok, result = scrape_text(url)
            # Show failures in red instead of wrapping them in a success box
            if ok:
                st.success(result)
            else:
                st.error(result)

# Q&A section - only appears after scraping is complete
if st.session_state.scraped:
    with st.container():
        st.subheader("Step 2: Ask Questions About Formula 1")
        st.write("The database contains information scraped from the website. Ask a question about Formula 1:")
        
        # Chat history
        if 'chat_history' not in st.session_state:
            st.session_state.chat_history = []
            
        # Display chat history
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.write(message["content"])
        
        # Input for new question
        user_query = st.chat_input("Ask your Formula 1 question here")
        
        if user_query:
            # Add user question to chat history
            st.session_state.chat_history.append({"role": "user", "content": user_query})
            
            # Display user question
            with st.chat_message("user"):
                st.write(user_query)
            
            # Get and display answer
            with st.chat_message("assistant"):
                with st.spinner("Searching Formula 1 database..."):
                    answer = ask_question(user_query)
                    st.write(answer)
                    
            # Add answer to chat history
            st.session_state.chat_history.append({"role": "assistant", "content": answer})

else:
    st.info("Please scrape a Formula 1 website first to populate the database, then you can ask questions!")

# Add a button to clear the session and start over
# (this resets the UI only; the Chroma collection on disk is left intact)
if st.button("Clear Chat History and Data"):
    st.session_state.chat_history = []
    st.session_state.scraped = False
    st.rerun()