MaheshP98's picture
Update app.py
83ce7a6 verified
raw
history blame
13.6 kB
import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.neighbors import LocalOutlierFactor
from datetime import datetime, timedelta
import os
import logging
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import tempfile
# Configure logging to match the log format
logging.basicConfig(level=logging.INFO, format='%(asctime)s,%(msecs)03d - %(levelname)s - %(message)s')
def validate_csv(df):
"""
Validate that the CSV has the required columns.
Returns True if valid, False otherwise with an error message.
"""
required_columns = ['equipment', 'usage_count', 'status', 'amc_expiry']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
return False, f"Missing required columns: {', '.join(missing_columns)}"
# Validate data types
try:
df['usage_count'] = pd.to_numeric(df['usage_count'], errors='raise')
df['amc_expiry'] = pd.to_datetime(df['amc_expiry'], errors='raise')
except Exception as e:
return False, f"Invalid data types: {str(e)}"
return True, ""
def generate_summary(combined_df, anomaly_df, amc_df, plot_path, pdf_path):
"""
Generate a detailed summary of the processing results.
Returns a markdown string for display in the Gradio interface.
"""
summary = ["## Processing Summary\n"]
# Total records and devices
total_records = len(combined_df)
unique_devices = combined_df['equipment'].unique()
summary.append(f"- **Total Records Processed**: {total_records}")
summary.append(f"- **Unique Devices**: {len(unique_devices)} ({', '.join(unique_devices)})\n")
# Anomalies
if anomaly_df is not None:
num_anomalies = sum(anomaly_df['anomaly'] == -1)
summary.append(f"- **Anomalies Detected**: {num_anomalies}")
if num_anomalies > 0:
anomaly_records = anomaly_df[anomaly_df['anomaly'] == -1][['equipment', 'usage_count', 'status']]
summary.append(" **Anomalous Devices**:")
for _, row in anomaly_records.iterrows():
summary.append(f" - {row['equipment']} (Usage: {row['usage_count']}, Status: {row['status']})")
else:
summary.append(" No anomalies detected.")
else:
summary.append("- **Anomalies Detected**: Failed to detect anomalies.")
summary.append("\n")
# AMC Expiries
if amc_df is not None and not amc_df.empty:
unique_devices_amc = amc_df['equipment'].unique()
summary.append(f"- **Devices with Upcoming AMC Expiries (within 7 days)**: {len(unique_devices_amc)}")
summary.append(" **Details**:")
for _, row in amc_df.iterrows():
summary.append(f" - {row['equipment']}: {row['amc_expiry'].strftime('%Y-%m-%d')}")
else:
summary.append("- **Devices with Upcoming AMC Expiries**: None")
summary.append("\n")
# Plot and PDF
summary.append("- **Usage Plot**: " + ("Generated successfully." if plot_path else "Failed to generate."))
summary.append("- **PDF Report**: " + ("Available for download." if pdf_path else "Not generated."))
return "\n".join(summary)
def process_files(uploaded_files):
"""
Process uploaded CSV files, generate usage plots, detect anomalies, and process AMC expiries.
Returns a dataframe, plot path, PDF path, AMC expiry message, and summary.
"""
# Log received files
logging.info(f"Received uploaded files: {uploaded_files}")
if not uploaded_files:
logging.warning("No files uploaded.")
return None, None, None, "Please upload at least one valid CSV file.", "No files uploaded."
valid_files = [f for f in uploaded_files if f.name.endswith('.csv')]
logging.info(f"Processing {len(valid_files)} valid files: {valid_files}")
if not valid_files:
logging.warning("No valid CSV files uploaded.")
return None, None, None, "Please upload at least one valid CSV file.", "No valid CSV files uploaded."
logging.info("Loading logs from uploaded files...")
all_data = []
# Load and combine CSV files
for file in valid_files:
try:
df = pd.read_csv(file.name)
logging.info(f"Loaded {len(df)} records from {file.name}")
# Validate CSV structure
is_valid, error_msg = validate_csv(df)
if not is_valid:
logging.error(f"Failed to load {file.name}: {error_msg}")
return None, None, None, f"Error loading {file.name}: {error_msg}", f"Error: {error_msg}"
all_data.append(df)
except Exception as e:
logging.error(f"Failed to load {file.name}: {str(e)}")
return None, None, None, f"Error loading {file.name}: {str(e)}", f"Error: {str(e)}"
if not all_data:
logging.warning("No data loaded from uploaded files.")
return None, None, None, "No valid data found in uploaded files.", "No data loaded."
combined_df = pd.concat(all_data, ignore_index=True)
logging.info(f"Combined {len(combined_df)} total records.")
logging.info(f"Loaded {len(combined_df)} log records from uploaded files.")
# Generate usage plot
logging.info("Generating usage plot...")
plot_path = generate_usage_plot(combined_df)
if plot_path:
logging.info("Usage plot generated successfully.")
else:
logging.error("Failed to generate usage plot.")
return combined_df, None, None, "Failed to generate usage plot.", "Usage plot generation failed."
# Detect anomalies using Local Outlier Factor
logging.info("Detecting anomalies using Local Outlier Factor...")
anomaly_df = detect_anomalies(combined_df)
if anomaly_df is None:
logging.error("Failed to detect anomalies.")
else:
logging.info(f"Detected {sum(anomaly_df['anomaly'] == -1)} anomalies using Local Outlier Factor.")
# Process AMC expiries
logging.info("Processing AMC expiries...")
amc_message, amc_df = process_amc_expiries(combined_df)
# Generate PDF report
pdf_path = generate_pdf_report(combined_df, anomaly_df, amc_df)
# Generate summary
logging.info("Generating summary of results...")
summary = generate_summary(combined_df, anomaly_df, amc_df, plot_path, pdf_path)
logging.info("Summary generated successfully.")
# Prepare output dataframe (combine original data with anomalies)
output_df = combined_df.copy()
if anomaly_df is not None:
output_df['anomaly'] = anomaly_df['anomaly'].map({1: "Normal", -1: "Anomaly"})
return output_df, plot_path, pdf_path, amc_message, summary
def generate_usage_plot(df):
"""
Generate a bar plot of usage_count by equipment and status.
Returns the path to the saved plot.
"""
try:
plt.figure(figsize=(12, 6))
# Define colors for statuses
status_colors = {'Active': '#36A2EB', 'Inactive': '#FF6384', 'Down': '#FFCE56', 'Online': '#4BC0C0'}
for status in df['status'].unique():
subset = df[df['status'] == status]
plt.bar(
subset['equipment'] + f" ({status})",
subset['usage_count'],
label=status,
color=status_colors.get(status, '#999999')
)
plt.xlabel("Equipment (Status)", fontsize=12)
plt.ylabel("Usage Count", fontsize=12)
plt.title("Usage Count by Equipment and Status", fontsize=14)
plt.legend(title="Status")
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
# Save plot to temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
plt.savefig(tmp.name, format='png', dpi=100)
plot_path = tmp.name
plt.close()
return plot_path
except Exception as e:
logging.error(f"Failed to generate usage plot: {str(e)}")
return None
def detect_anomalies(df):
"""
Detect anomalies in usage_count using Local Outlier Factor.
Returns a dataframe with an 'anomaly' column (-1 for anomalies, 1 for normal).
"""
try:
model = LocalOutlierFactor(n_neighbors=5, contamination=0.1)
anomalies = model.fit_predict(df[['usage_count']].values)
anomaly_df = df.copy()
anomaly_df['anomaly'] = anomalies
return anomaly_df
except Exception as e:
logging.error(f"Failed to detect anomalies: {str(e)}")
return None
def process_amc_expiries(df):
"""
Identify devices with AMC expiries within 7 days from 2025-06-05.
Returns a message and a dataframe of devices with upcoming expiries.
"""
try:
current_date = datetime(2025, 6, 5)
threshold = current_date + timedelta(days=7)
df['amc_expiry'] = pd.to_datetime(df['amc_expiry'])
upcoming_expiries = df[df['amc_expiry'] <= threshold]
unique_devices = upcoming_expiries['equipment'].unique()
message = f"Found {len(unique_devices)} devices with upcoming AMC expiries: {', '.join(unique_devices)}. Details: " + "; ".join(
[f"{row['equipment']}: {row['amc_expiry'].strftime('%Y-%m-%d')}" for _, row in upcoming_expiries.iterrows()]
)
logging.info(f"Found {len(unique_devices)} devices with upcoming AMC expiries.")
return message, upcoming_expiries
except Exception as e:
logging.error(f"Failed to process AMC expiries: {str(e)}")
return f"Error processing AMC expiries: {str(e)}", None
def generate_pdf_report(original_df, anomaly_df, amc_df):
"""
Generate a PDF report with data summary, anomalies, and AMC expiries.
Returns the path to the saved PDF.
"""
try:
if original_df is None or original_df.empty:
logging.warning("No data available for PDF generation.")
return None
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
c = canvas.Canvas(tmp.name, pagesize=letter)
c.setFont("Helvetica-Bold", 16)
c.drawString(100, 750, "Equipment Log Analysis Report")
c.setFont("Helvetica", 12)
y = 720
# Report generated timestamp
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
c.drawString(100, y, f"Generated on: {current_time}")
y -= 30
# Summary
c.drawString(100, y, "Summary")
y -= 20
c.drawString(100, y, f"Total Records: {len(original_df)}")
y -= 20
c.drawString(100, y, f"Unique Devices: {', '.join(original_df['equipment'].unique())}")
y -= 40
# Anomalies
c.drawString(100, y, "Anomaly Detection Results (Using Local Outlier Factor)")
y -= 20
if anomaly_df is not None:
num_anomalies = sum(anomaly_df['anomaly'] == -1)
c.drawString(100, y, f"Anomalies Detected: {num_anomalies}")
y -= 20
if num_anomalies > 0:
anomaly_records = anomaly_df[anomaly_df['anomaly'] == -1][['equipment', 'usage_count', 'status']]
c.drawString(100, y, "Anomalous Records:")
y -= 20
for _, row in anomaly_records.iterrows():
c.drawString(100, y, f"{row['equipment']}: Usage Count = {row['usage_count']}, Status = {row['status']}")
y -= 20
if y < 50:
c.showPage()
y = 750
c.setFont("Helvetica", 12)
else:
c.drawString(100, y, "Anomaly detection failed.")
y -= 20
y -= 20
# AMC Expiries
c.drawString(100, y, "AMC Expiries Within 7 Days (as of 2025-06-05)")
y -= 20
if amc_df is not None and not amc_df.empty:
c.drawString(100, y, f"Devices with Upcoming AMC Expiries: {len(amc_df['equipment'].unique())}")
y -= 20
for _, row in amc_df.iterrows():
c.drawString(100, y, f"{row['equipment']}: {row['amc_expiry'].strftime('%Y-%m-%d')}")
y -= 20
if y < 50:
c.showPage()
y = 750
c.setFont("Helvetica", 12)
else:
c.drawString(100, y, "No AMC expiry data available.")
y -= 20
c.showPage()
c.save()
return tmp.name
except Exception as e:
logging.error(f"Failed to generate PDF report: {str(e)}")
return None
# Gradio interface
with gr.Blocks() as demo:
gr.Markdown("# Equipment Log Analysis")
with gr.Row():
file_input = gr.File(file_count="multiple", label="Upload CSV Files")
process_button = gr.Button("Process Files")
with gr.Row():
output_df = gr.Dataframe(label="Processed Data")
output_plot = gr.Image(label="Usage Plot")
with gr.Row():
output_message = gr.Textbox(label="AMC Expiry Status")
output_pdf = gr.File(label="Download PDF Report")
with gr.Row():
output_summary = gr.Markdown(label="Summary of Results")
process_button.click(
fn=process_files,
inputs=[file_input],
outputs=[output_df, output_plot, output_pdf, output_message, output_summary]
)
if __name__ == "__main__":
logging.info("Application starting...")
demo.launch(server_name="0.0.0.0", server_port=7860)