|
import gradio as gr |
|
import pandas as pd |
|
import matplotlib.pyplot as plt |
|
from sklearn.neighbors import LocalOutlierFactor |
|
from datetime import datetime, timedelta |
|
import os |
|
import logging |
|
from reportlab.lib.pagesizes import letter |
|
from reportlab.pdfgen import canvas |
|
import tempfile |
|
|
|
|
|
# Configure root logging once at import time.
# The default rendering of %(asctime)s already ends with ",mmm" (milliseconds),
# so the format must not append %(msecs) again or every record shows them twice.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
|
|
|
def validate_csv(df):
    """
    Check that a loaded CSV frame has the expected schema and typed columns.

    The frame must contain the 'equipment', 'usage_count', 'status' and
    'amc_expiry' columns. On success 'usage_count' is converted to a numeric
    dtype and 'amc_expiry' to datetimes, both in place on ``df``.

    Returns a ``(is_valid, error_message)`` tuple; the message is an empty
    string when the frame is valid.
    """
    absent = []
    for name in ('equipment', 'usage_count', 'status', 'amc_expiry'):
        if name not in df.columns:
            absent.append(name)
    if absent:
        return False, "Missing required columns: " + ", ".join(absent)

    try:
        # Coerce in place so later pipeline stages can rely on the dtypes.
        df['usage_count'] = pd.to_numeric(df['usage_count'], errors='raise')
        df['amc_expiry'] = pd.to_datetime(df['amc_expiry'], errors='raise')
    except Exception as exc:
        return False, f"Invalid data types: {exc}"
    else:
        return True, ""
|
|
|
def generate_summary(combined_df, anomaly_df, amc_df, plot_path, pdf_path):
    """
    Build the markdown summary shown in the Gradio interface.

    Args:
        combined_df: All loaded records; must contain an 'equipment' column.
        anomaly_df: Frame with 'equipment', 'usage_count', 'status' and an
            'anomaly' column in which -1 marks an outlier, or None when
            anomaly detection failed.
        amc_df: Rows with upcoming AMC expiries ('equipment' plus a datetime
            'amc_expiry' column), or None/empty when there are none.
        plot_path: Path of the usage plot; a falsy value means it failed.
        pdf_path: Path of the PDF report; a falsy value means none was made.

    Returns:
        The summary as a single newline-joined markdown string.
    """
    summary = ["## Processing Summary\n"]

    # Dataset overview. Names are coerced to str because str.join raises
    # TypeError on non-string values such as numeric equipment IDs.
    device_names = [str(name) for name in combined_df['equipment'].unique()]
    summary.append(f"- **Total Records Processed**: {len(combined_df)}")
    summary.append(f"- **Unique Devices**: {len(device_names)} ({', '.join(device_names)})\n")

    # Anomaly section.
    if anomaly_df is not None:
        flagged = anomaly_df[anomaly_df['anomaly'] == -1]
        summary.append(f"- **Anomalies Detected**: {len(flagged)}")
        if not flagged.empty:
            summary.append(" **Anomalous Devices**:")
            summary.extend(
                f" - {name} (Usage: {usage}, Status: {status})"
                for name, usage, status in zip(
                    flagged['equipment'], flagged['usage_count'], flagged['status']
                )
            )
        else:
            summary.append(" No anomalies detected.")
    else:
        summary.append("- **Anomalies Detected**: Failed to detect anomalies.")
    summary.append("\n")

    # AMC expiry section.
    if amc_df is not None and not amc_df.empty:
        device_count = len(amc_df['equipment'].unique())
        summary.append(f"- **Devices with Upcoming AMC Expiries (within 7 days)**: {device_count}")
        summary.append(" **Details**:")
        summary.extend(
            f" - {name}: {expiry.strftime('%Y-%m-%d')}"
            for name, expiry in zip(amc_df['equipment'], amc_df['amc_expiry'])
        )
    else:
        summary.append("- **Devices with Upcoming AMC Expiries**: None")
    summary.append("\n")

    # Artefact status lines.
    summary.append("- **Usage Plot**: " + ("Generated successfully." if plot_path else "Failed to generate."))
    summary.append("- **PDF Report**: " + ("Available for download." if pdf_path else "Not generated."))

    return "\n".join(summary)
|
|
|
def process_files(uploaded_files):
    """
    Run the full analysis pipeline over the uploaded CSV files.

    Loads and validates every CSV, concatenates the records, then generates
    the usage plot, runs anomaly detection, finds AMC expiries, writes the
    PDF report and builds the markdown summary.

    Args:
        uploaded_files: The uploads handed over by the file input. Each item
            may be a plain path string or an object exposing the path as
            ``.name``. May be None or empty.

    Returns:
        A 5-tuple ``(dataframe, plot_path, pdf_path, amc_message, summary)``.
        When nothing usable was uploaded or a file fails to load or validate,
        the first three items are None and the last two carry the error
        text. When the plot cannot be generated the combined dataframe is
        returned with None for both paths.
    """
    logging.info(f"Received uploaded files: {uploaded_files}")

    if not uploaded_files:
        logging.warning("No files uploaded.")
        return None, None, None, "Please upload at least one valid CSV file.", "No files uploaded."

    # Normalise uploads to path strings so both plain paths and objects with
    # a ``.name`` attribute are handled, and match the extension
    # case-insensitively so files named "*.CSV" are not silently dropped.
    upload_paths = [item if isinstance(item, str) else item.name for item in uploaded_files]
    valid_files = [path for path in upload_paths if path.lower().endswith('.csv')]
    logging.info(f"Processing {len(valid_files)} valid files: {valid_files}")

    if not valid_files:
        logging.warning("No valid CSV files uploaded.")
        return None, None, None, "Please upload at least one valid CSV file.", "No valid CSV files uploaded."

    logging.info("Loading logs from uploaded files...")
    all_data = []
    for path in valid_files:
        # Any single bad file aborts the whole run with its error message.
        try:
            df = pd.read_csv(path)
            logging.info(f"Loaded {len(df)} records from {path}")

            is_valid, error_msg = validate_csv(df)
            if not is_valid:
                logging.error(f"Failed to load {path}: {error_msg}")
                return None, None, None, f"Error loading {path}: {error_msg}", f"Error: {error_msg}"
            all_data.append(df)
        except Exception as e:
            logging.error(f"Failed to load {path}: {str(e)}")
            return None, None, None, f"Error loading {path}: {str(e)}", f"Error: {str(e)}"

    # all_data is non-empty here: every file either appended or returned above.
    combined_df = pd.concat(all_data, ignore_index=True)
    logging.info(f"Combined {len(combined_df)} total records.")

    logging.info("Generating usage plot...")
    plot_path = generate_usage_plot(combined_df)
    if not plot_path:
        logging.error("Failed to generate usage plot.")
        return combined_df, None, None, "Failed to generate usage plot.", "Usage plot generation failed."
    logging.info("Usage plot generated successfully.")

    logging.info("Detecting anomalies using Local Outlier Factor...")
    anomaly_df = detect_anomalies(combined_df)
    if anomaly_df is None:
        logging.error("Failed to detect anomalies.")
    else:
        anomaly_count = int((anomaly_df['anomaly'] == -1).sum())
        logging.info(f"Detected {anomaly_count} anomalies using Local Outlier Factor.")

    logging.info("Processing AMC expiries...")
    amc_message, amc_df = process_amc_expiries(combined_df)

    logging.info("Generating PDF report...")
    pdf_path = generate_pdf_report(combined_df, anomaly_df, amc_df)

    logging.info("Generating summary of results...")
    summary = generate_summary(combined_df, anomaly_df, amc_df, plot_path, pdf_path)
    logging.info("Summary generated successfully.")

    # Show readable labels instead of the raw 1 / -1 detector output.
    output_df = combined_df.copy()
    if anomaly_df is not None:
        output_df['anomaly'] = anomaly_df['anomaly'].map({1: "Normal", -1: "Anomaly"})

    return output_df, plot_path, pdf_path, amc_message, summary
|
|
|
def generate_usage_plot(df):
    """
    Draw a bar chart of usage_count per equipment, grouped by status.

    Args:
        df: Frame with 'equipment', 'usage_count' and 'status' columns.

    Returns:
        Path of the saved PNG (a temporary file that is not deleted
        automatically), or None when plotting failed.
    """
    fig = None
    plot_path = None
    try:
        fig = plt.figure(figsize=(12, 6))

        # Statuses without an entry here fall back to grey.
        status_colors = {'Active': '#36A2EB', 'Inactive': '#FF6384', 'Down': '#FFCE56', 'Online': '#4BC0C0'}
        for status in df['status'].unique():
            subset = df[df['status'] == status]
            plt.bar(
                # astype(str) keeps the label concatenation valid for
                # non-string equipment values such as numeric IDs.
                subset['equipment'].astype(str) + f" ({status})",
                subset['usage_count'],
                label=status,
                color=status_colors.get(status, '#999999'),
            )
        plt.xlabel("Equipment (Status)", fontsize=12)
        plt.ylabel("Usage Count", fontsize=12)
        plt.title("Usage Count by Equipment and Status", fontsize=14)
        plt.legend(title="Status")
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()

        # Reserve a path and close the handle before saving: writing to a
        # still-open NamedTemporaryFile by name is not portable (it fails on
        # Windows).
        with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
            plot_path = tmp.name
        fig.savefig(plot_path, format='png', dpi=100)
        return plot_path
    except Exception as e:
        logging.error(f"Failed to generate usage plot: {str(e)}")
        if plot_path is not None:
            # Best-effort removal of the half-written file; the failure that
            # matters has already been logged above.
            try:
                os.remove(plot_path)
            except OSError:
                pass
        return None
    finally:
        # Always release the figure; pyplot keeps figures alive until they
        # are closed, so skipping this on errors leaks memory per request.
        if fig is not None:
            plt.close(fig)
|
|
|
def detect_anomalies(df):
    """
    Flag unusual usage_count values with a Local Outlier Factor model.

    The model is fitted on the 'usage_count' column alone, using 5
    neighbours and a contamination of 0.1.

    Returns a copy of ``df`` with an added 'anomaly' column (-1 for
    anomalies, 1 for normal records), or None when detection fails.
    """
    try:
        lof = LocalOutlierFactor(n_neighbors=5, contamination=0.1)
        # Double brackets keep the feature two-dimensional (one column).
        usage_matrix = df[['usage_count']].values
        labels = lof.fit_predict(usage_matrix)

        flagged = df.copy()
        flagged['anomaly'] = labels
        return flagged
    except Exception as exc:
        logging.error(f"Failed to detect anomalies: {str(exc)}")
        return None
|
|
|
def process_amc_expiries(df, current_date=None, window_days=7):
    """
    Identify devices whose AMC expires within a window after a reference date.

    Args:
        df: Frame with 'equipment' and 'amc_expiry' columns. It is not
            modified; expiry values are parsed on a copy.
        current_date: Start of the window. Defaults to 2025-06-05, the fixed
            reference date this report is written against.
        window_days: Length of the window in days (default 7). Both ends of
            the window are inclusive.

    Returns:
        A ``(message, dataframe)`` tuple. The dataframe holds the rows whose
        expiry falls inside the window, with 'amc_expiry' as datetimes. On
        failure the message describes the error and the dataframe is None.
    """
    try:
        if current_date is None:
            current_date = datetime(2025, 6, 5)
        threshold = current_date + timedelta(days=window_days)

        expiries = pd.to_datetime(df['amc_expiry'])
        # Bound the window on both sides: without the lower bound every
        # contract that expired long ago would be reported as "upcoming".
        in_window = expiries.between(current_date, threshold)
        # assign() returns a new frame, so the caller's data stays untouched.
        upcoming_expiries = df.assign(amc_expiry=expiries)[in_window]

        # str() keeps the joins valid for non-string equipment identifiers.
        unique_devices = [str(name) for name in upcoming_expiries['equipment'].unique()]
        message = f"Found {len(unique_devices)} devices with upcoming AMC expiries"
        if unique_devices:
            details = "; ".join(
                f"{name}: {expiry.strftime('%Y-%m-%d')}"
                for name, expiry in zip(upcoming_expiries['equipment'], upcoming_expiries['amc_expiry'])
            )
            message += f": {', '.join(unique_devices)}. Details: {details}"
        else:
            # Avoid the dangling ": . Details: " tail when nothing matches.
            message += "."

        logging.info(f"Found {len(unique_devices)} devices with upcoming AMC expiries.")
        return message, upcoming_expiries
    except Exception as e:
        logging.error(f"Failed to process AMC expiries: {str(e)}")
        return f"Error processing AMC expiries: {str(e)}", None
|
|
|
def generate_pdf_report(original_df, anomaly_df, amc_df):
    """
    Write a PDF report with the data summary, anomalies, and AMC expiries.

    Args:
        original_df: Combined log records; must contain an 'equipment' column.
        anomaly_df: Frame with 'equipment', 'usage_count', 'status' and an
            'anomaly' column in which -1 marks an outlier, or None when
            anomaly detection failed.
        amc_df: Rows with upcoming AMC expiries ('equipment' plus a datetime
            'amc_expiry' column), or None/empty when there are none.

    Returns:
        Path of the saved PDF (a temporary file that is not deleted
        automatically), or None when there is no data or generation failed.
    """
    pdf_path = None
    try:
        if original_df is None or original_df.empty:
            logging.warning("No data available for PDF generation.")
            return None

        # Reserve a path and close the handle before the canvas writes to
        # it; writing by name to a still-open temp file fails on Windows.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
            pdf_path = tmp.name

        c = canvas.Canvas(pdf_path, pagesize=letter)
        left, top, bottom, line_gap = 100, 750, 50, 20
        y = top

        def draw_line(text, gap=line_gap, font=("Helvetica", 12)):
            """Draw one text line at the cursor, starting a new page first if needed."""
            nonlocal y
            # Check *before* drawing so no line lands below the bottom margin.
            if y < bottom:
                c.showPage()
                y = top
            # Set the font on every line; the original re-applied it after
            # each page break, so this keeps text consistent across pages.
            c.setFont(*font)
            c.drawString(left, y, text)
            y -= gap

        # Title and generation timestamp.
        draw_line("Equipment Log Analysis Report", gap=30, font=("Helvetica-Bold", 16))
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        draw_line(f"Generated on: {current_time}", gap=30)

        # Data summary. str() keeps the join valid for non-string IDs.
        device_names = ', '.join(str(name) for name in original_df['equipment'].unique())
        draw_line("Summary")
        draw_line(f"Total Records: {len(original_df)}")
        draw_line(f"Unique Devices: {device_names}", gap=40)

        # Anomaly section.
        draw_line("Anomaly Detection Results (Using Local Outlier Factor)")
        if anomaly_df is not None:
            flagged = anomaly_df[anomaly_df['anomaly'] == -1]
            draw_line(f"Anomalies Detected: {len(flagged)}")
            if not flagged.empty:
                draw_line("Anomalous Records:")
                for name, usage, status in zip(
                    flagged['equipment'], flagged['usage_count'], flagged['status']
                ):
                    draw_line(f"{name}: Usage Count = {usage}, Status = {status}")
        else:
            draw_line("Anomaly detection failed.")
        y -= line_gap  # extra spacing before the next section

        # AMC expiry section.
        draw_line("AMC Expiries Within 7 Days (as of 2025-06-05)")
        if amc_df is not None and not amc_df.empty:
            draw_line(f"Devices with Upcoming AMC Expiries: {len(amc_df['equipment'].unique())}")
            for name, expiry in zip(amc_df['equipment'], amc_df['amc_expiry']):
                draw_line(f"{name}: {expiry.strftime('%Y-%m-%d')}")
        else:
            draw_line("No AMC expiry data available.")

        c.showPage()
        c.save()
        return pdf_path
    except Exception as e:
        logging.error(f"Failed to generate PDF report: {str(e)}")
        if pdf_path is not None:
            # Best-effort removal of the unusable file; the failure that
            # matters has already been logged above.
            try:
                os.remove(pdf_path)
            except OSError:
                pass
        return None
|
|
|
|
|
# Gradio interface: upload controls on top, analysis results underneath.
with gr.Blocks() as demo:
    gr.Markdown("# Equipment Log Analysis")

    # Inputs: one or more CSV uploads plus the trigger button.
    with gr.Row():
        file_input = gr.File(label="Upload CSV Files", file_count="multiple")
        process_button = gr.Button("Process Files")

    # Processed records next to the usage chart.
    with gr.Row():
        output_df = gr.Dataframe(label="Processed Data")
        output_plot = gr.Image(label="Usage Plot")

    # AMC expiry message next to the downloadable report.
    with gr.Row():
        output_message = gr.Textbox(label="AMC Expiry Status")
        output_pdf = gr.File(label="Download PDF Report")

    # Markdown summary of the whole run.
    with gr.Row():
        output_summary = gr.Markdown(label="Summary of Results")

    # The outputs list follows the order of process_files' return tuple:
    # dataframe, plot path, PDF path, AMC message, summary.
    process_button.click(
        outputs=[output_df, output_plot, output_pdf, output_message, output_summary],
        inputs=[file_input],
        fn=process_files,
    )
|
|
|
if __name__ == "__main__":
    # Script entry point: log start-up, then serve the UI on port 7860,
    # listening on all network interfaces.
    logging.info("Application starting...")
    demo.launch(server_port=7860, server_name="0.0.0.0")