import streamlit as st
import pandas as pd
import plotly.express as px
from datetime import datetime, timedelta
from simple_salesforce import Salesforce
from transformers import pipeline
from utils import fetch_salesforce_data, detect_anomalies, generate_pdf_report
# Streamlit app configuration
st.set_page_config(page_title="LabOps Dashboard", layout="wide")
# Cache Salesforce connection
@st.cache_resource
def init_salesforce():
    try:
        return Salesforce(
            username=st.secrets["sf_username"],
            password=st.secrets["sf_password"],
            security_token=st.secrets["sf_security_token"],
        )
    except Exception as e:
        st.error(f"Failed to connect to Salesforce: {e}")
        return None
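
# Note: @st.cache_resource also caches a None returned after a failed login,
# so the connection error persists across reruns until the cache is cleared
# (e.g. via st.cache_resource.clear() or an app restart).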
# Cache Hugging Face model
@st.cache_resource
def init_anomaly_detector():
    return pipeline("text-classification", model="distilbert-base-uncased", tokenizer="distilbert-base-uncased")
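
# Note: distilbert-base-uncased ships without a fine-tuned classification
# head, so its "POSITIVE"/"NEGATIVE" outputs are effectively untrained. If
# trained sentiment-style classification is intended, a fine-tuned checkpoint
# is a drop-in replacement, e.g. (assumed substitution, not the original
# design):
#
#     pipeline("text-classification",
#              model="distilbert-base-uncased-finetuned-sst-2-english")
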
# Initialize connections
sf = init_salesforce()
anomaly_detector = init_anomaly_detector()
# Cache data fetching
@st.cache_data(ttl=10)  # Cache for 10 seconds to meet the refresh requirement
def get_filtered_data(lab_site, equipment_type, date_start, date_end):
    # SOQL DateTime literals are unquoted ISO-8601 values; a bare YYYY-MM-DD
    # would be rejected when compared against a DateTime field.
    start_literal = date_start.strftime("%Y-%m-%dT00:00:00Z")
    end_literal = date_end.strftime("%Y-%m-%dT23:59:59Z")
    query = f"""
        SELECT Equipment__c, Log_Timestamp__c, Status__c, Usage_Count__c, Lab__c, Equipment_Type__c
        FROM SmartLog__c
        WHERE Log_Timestamp__c >= {start_literal}
          AND Log_Timestamp__c <= {end_literal}
    """
    if lab_site != "All":
        query += f" AND Lab__c = '{lab_site}'"
    if equipment_type != "All":
        query += f" AND Equipment_Type__c = '{equipment_type}'"
    query += " LIMIT 1000"  # Mitigate data overload
    return fetch_salesforce_data(sf, query)
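
# Illustrative example (assumed values) of the SOQL the function above builds
# for lab_site="Lab1", equipment_type="Cell Analyzer" over one week:
#
#     SELECT Equipment__c, Log_Timestamp__c, Status__c, Usage_Count__c, Lab__c, Equipment_Type__c
#     FROM SmartLog__c
#     WHERE Log_Timestamp__c >= 2025-01-01T00:00:00Z
#       AND Log_Timestamp__c <= 2025-01-08T23:59:59Z
#     AND Lab__c = 'Lab1' AND Equipment_Type__c = 'Cell Analyzer' LIMIT 1000
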
def main():
    if sf is None:
        st.error("Cannot proceed without Salesforce connection.")
        return
    st.title("Multi-Device LabOps Dashboard")

    # Filters
    col1, col2, col3 = st.columns(3)
    with col1:
        lab_site = st.selectbox("Select Lab Site", ["All", "Lab1", "Lab2", "Lab3"])
    with col2:
        equipment_type = st.selectbox("Equipment Type", ["All", "Cell Analyzer", "Weight Log", "UV Verification"])
    with col3:
        default_start = datetime.now() - timedelta(days=7)
        default_end = datetime.now()
        date_range = st.date_input("Date Range", [default_start, default_end])

    # Validate date range (st.date_input returns a single date until the
    # user has picked both ends of the range)
    if len(date_range) != 2:
        st.warning("Please select a valid date range.")
        return
    date_start, date_end = date_range

    # Fetch and process data
    data = get_filtered_data(lab_site, equipment_type, date_start, date_end)
    if not data:
        st.warning("No data available for the selected filters.")
        return
    df = pd.DataFrame(data)
    df["Log_Timestamp__c"] = pd.to_datetime(df["Log_Timestamp__c"])
    df["Anomaly"] = df["Status__c"].apply(lambda x: detect_anomalies(str(x), anomaly_detector))

    # Pagination (bound the page number so users cannot page past the data)
    page_size = 10
    total_pages = max(1, -(-len(df) // page_size))  # Ceiling division
    page = st.number_input("Page", min_value=1, max_value=total_pages, value=1, step=1)
    start_idx = (page - 1) * page_size
    paginated_df = df.iloc[start_idx:start_idx + page_size]

    # Device Cards (built as single-line f-strings: an indented triple-quoted
    # string would render in Markdown as a code block)
    st.subheader("Device Status")
    for _, row in paginated_df.iterrows():
        anomaly = "⚠️ Anomaly" if row["Anomaly"] == "POSITIVE" else "✅ Normal"
        st.markdown(
            f"**{row['Equipment__c']}** | Lab: {row['Lab__c']} | Health: {row['Status__c']} | "
            f"Usage: {row['Usage_Count__c']} | Last Log: {row['Log_Timestamp__c'].strftime('%Y-%m-%d %H:%M:%S')} | {anomaly}"
        )

    # Usage Chart (the labels mapping also sets the axis titles)
    st.subheader("Usage Trends")
    fig = px.line(
        df,
        x="Log_Timestamp__c",
        y="Usage_Count__c",
        color="Equipment__c",
        title="Daily Usage Trends",
        labels={"Log_Timestamp__c": "Timestamp", "Usage_Count__c": "Usage Count"},
    )
    st.plotly_chart(fig, use_container_width=True)

    # Downtime Chart
    st.subheader("Downtime Patterns")
    downtime_df = df[df["Status__c"] == "Down"]
    if not downtime_df.empty:
        fig_downtime = px.histogram(
            downtime_df,
            x="Log_Timestamp__c",
            color="Equipment__c",
            title="Downtime Patterns",
            labels={"Log_Timestamp__c": "Timestamp"},
        )
        fig_downtime.update_layout(yaxis_title="Downtime Count")
        st.plotly_chart(fig_downtime, use_container_width=True)
    else:
        st.info("No downtime events found for the selected filters.")

    # AMC Reminders (NEXT_N_DAYS:14 is a built-in SOQL date literal covering
    # today through 14 days out; "<=" also matches already-expired records)
    st.subheader("AMC Reminders")
    amc_query = (
        "SELECT Equipment__c, AMC_Expiry_Date__c FROM Equipment__c "
        "WHERE AMC_Expiry_Date__c <= NEXT_N_DAYS:14"
    )
    amc_data = fetch_salesforce_data(sf, amc_query, retries=3)
    if amc_data:
        for record in amc_data:
            st.write(f"Equipment {record['Equipment__c']} - AMC Expiry: {record['AMC_Expiry_Date__c']}")
    else:
        st.info("No AMC expiries within the next 14 days.")

    # Export PDF
    if st.button("Export PDF Report"):
        try:
            pdf_file = generate_pdf_report(df, lab_site, equipment_type, [date_start, date_end])
            with open(pdf_file, "rb") as f:
                st.download_button("Download PDF", f, file_name="LabOps_Report.pdf", mime="application/pdf")
        except Exception as e:
            st.error(f"Failed to generate PDF: {e}")

if __name__ == "__main__":
    main()