# DataScribe: Streamlit app for AI-powered information extraction.
import streamlit as st
from streamlit_option_menu import option_menu
import pandas as pd
import os
from google.oauth2 import service_account
from googleapiclient.discovery import build
from streamlit_chat import message as st_message
from langchain.schema import SystemMessage
from langchain_groq import ChatGroq
from dotenv import load_dotenv
import warnings

# Silence deprecation noise from third-party libraries.
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Load environment variables; ChatGroq reads GROQ_API_KEY from the environment.
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

# Shared LLM client used by get_llm_response below.
llm = ChatGroq(model="llama-3.1-70b-versatile")
# System prompt sent to the LLM; {entity} and {query} are filled in per row
# by get_llm_response.
PROMPT_TEMPLATE = """
You are an expert information extraction assistant designed to obtain specific details from the web and external sources.
You’ll be provided with an entity name and a query that specifies the type of information needed about that entity.
Please follow the instructions carefully and return only the most relevant, accurate information.
#### Entity Name: {entity}
#### Query: {query}
Instructions:
1. Extract the information directly related to the entity.
2. If available, include only verified, publicly accessible data.
3. Provide information in a single sentence or a short, structured response.
4. If the requested information isn’t available or verifiable, respond with "Information not available."
Begin extraction.
"""
def get_llm_response(entity, query):
    """Ask the LLM for the queried information about *entity*.

    Formats PROMPT_TEMPLATE with the entity and query, sends it as a single
    system message, and returns the model's text, or the fallback string when
    the model returns nothing.
    """
    formatted_prompt = PROMPT_TEMPLATE.format(entity=entity, query=query)
    # ChatGroq returns a single AIMessage, not a list; the original
    # `response[0].content` would raise TypeError on a BaseMessage.
    # `invoke` is the non-deprecated call style.
    response = llm.invoke([SystemMessage(content=formatted_prompt)])
    return response.content if response and response.content else "Information not available"
# Streamlit page configuration
st.set_page_config(
    page_title="DataScribe",
    page_icon=":notebook_with_decorative_cover:",
    layout="wide",
)

# Sidebar navigation menu
with st.sidebar:
    selected = option_menu(
        "DataScribe Menu",
        ["Home", "Upload Data", "Define Query", "Extract Information", "View & Download"],
        icons=["house", "cloud-upload", "gear", "search", "table"],
        menu_icon="cast",
        default_index=0,
    )

# Main header
st.title("DataScribe: AI-Powered Information Extractor")

# Seed the session-state slots shared across pages with None defaults.
for _slot in ("data", "results", "column_selection"):
    if _slot not in st.session_state:
        st.session_state[_slot] = None
def get_google_sheet_data(sheet_id, range_name):
    """Fetch *range_name* from the Google Sheet *sheet_id* as a DataFrame.

    Authenticates with a service-account JSON file; assumes its path is in the
    GOOGLE_APPLICATION_CREDENTIALS env var — TODO confirm credential source.
    Treats the first returned row as the header row. Returns an empty
    DataFrame when the range holds no values.

    NOTE(review): this function was called but never defined in the original
    file, which made the Google Sheets path raise NameError at runtime.
    """
    creds = service_account.Credentials.from_service_account_file(
        os.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
        scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"],
    )
    service = build("sheets", "v4", credentials=creds)
    result = (
        service.spreadsheets()
        .values()
        .get(spreadsheetId=sheet_id, range=range_name)
        .execute()
    )
    values = result.get("values", [])
    if not values:
        return pd.DataFrame()
    return pd.DataFrame(values[1:], columns=values[0])


# Upload Data Section
if selected == "Upload Data":
    st.header("Upload or Connect Your Data")
    data_source = st.radio("Choose data source:", ["CSV File", "Google Sheets"])
    if data_source == "CSV File":
        uploaded_file = st.file_uploader("Upload your CSV file", type=["csv"])
        if uploaded_file:
            st.session_state["data"] = pd.read_csv(uploaded_file)
            st.write("### Preview of Uploaded Data")
            st.dataframe(st.session_state["data"].head())
    elif data_source == "Google Sheets":
        sheet_id = st.text_input("Enter Google Sheet ID")
        range_name = st.text_input("Enter the data range (e.g., Sheet1!A1:C100)")
        if st.button("Fetch Data"):
            if sheet_id and range_name:
                st.session_state["data"] = get_google_sheet_data(sheet_id, range_name)
                st.write("### Preview of Google Sheets Data")
                st.dataframe(st.session_state["data"].head())
            else:
                st.warning("Please enter both the Google Sheet ID and range.")
# Define Query Section | |
elif selected == "Define Query": | |
st.header("Define Your Custom Query") | |
if st.session_state["data"] is not None: | |
column_selection = st.selectbox("Select the primary column for entities", options=st.session_state["data"].columns) | |
query_template = st.text_input("Define your query template", "Get me the email for {company}") | |
st.session_state["query_template"] = query_template | |
st.session_state["column_selection"] = column_selection | |
st.write("### Example query preview") | |
if column_selection: | |
sample_entity = str(st.session_state["data"][column_selection].iloc[0]) | |
example_query = query_template.replace("{company}", sample_entity) | |
st.code(example_query) | |
else: | |
st.warning("Please upload data first.") | |
# Extract Information Section with Progress Bar | |
elif selected == "Extract Information": | |
st.header("Extract Information") | |
if st.session_state.get("query_template") and st.session_state["data"] is not None and st.session_state["column_selection"] is not None: | |
st.write("Data extraction is in progress. This may take a few moments.") | |
progress_bar = st.progress(0) | |
column_selection = st.session_state["column_selection"] | |
progress_step = 1.0 / len(st.session_state["data"][column_selection]) | |
results = [] | |
for i, entity in enumerate(st.session_state["data"][column_selection]): | |
user_message = st.session_state["query_template"].replace("{company}", str(entity)) | |
result_text = get_llm_response(entity, user_message) | |
results.append({"Entity": entity, "Extracted Information": result_text}) | |
progress_bar.progress((i + 1) * progress_step) | |
st.session_state["results"] = pd.DataFrame(results) | |
st.write("### Extracted Information") | |
st.dataframe(st.session_state["results"]) | |
# View & Download Section | |
elif selected == "View & Download": | |
st.header("View and Download Results") | |
if st.session_state["results"] is not None: | |
st.write("### Extracted Data Table") | |
st.dataframe(st.session_state["results"]) | |
csv_data = st.session_state["results"].to_csv(index=False) | |
st.download_button("Download as CSV", csv_data, "datascribe_results.csv", "text/csv") | |
else: | |
st.warning("No data available to view or download.") | |