""" Utilities for task status management and logging in the ML suite. This module provides functions and classes for: - Managing task status files for long-running operations - Logging both to Flask app logs and status files - Standardized error handling for AI operations - Progress reporting for data preparation and model training These utilities ensure that AI operations have proper status tracking, enabling the frontend to display progress and results to users. """ import json import time import uuid import datetime import traceback import os import logging from typing import Dict, List, Optional, Union, Any, Tuple def get_current_timestamp() -> str: """Returns ISO format timestamp for current time.""" return datetime.datetime.now().isoformat() def get_current_timestamp_log_prefix() -> str: """Returns a formatted timestamp string for log entries.""" return f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]" def initialize_task_status_file(status_file_path: str, task_id: Optional[str] = None) -> Dict[str, Any]: """Initialize a new task status file with default values. Args: status_file_path: Path to the status file task_id: Optional unique ID for the task (UUID generated if None) Returns: Dict containing the initialized status data """ if task_id is None: task_id = str(uuid.uuid4()) status_data = { "status": "pending", "message": "Task initialized and pending execution", "progress": 0.0, "log": [], "task_id": task_id, "start_time": get_current_timestamp() } # Ensure the directory exists dir_path = os.path.dirname(status_file_path) if dir_path: # Only create directory if dirname returns a non-empty string os.makedirs(dir_path, exist_ok=True) with open(status_file_path, 'w') as f: json.dump(status_data, f, indent=2) return status_data def update_task_status( status_file_path: str, status: Optional[str] = None, message: Optional[str] = None, log_entry: Optional[str] = None, log_level: str = "info", progress: Optional[float] = None, result: Optional[Dict[str, Any]] = None, error: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Update a task status file with new information. Args: status_file_path: Path to the status file status: Optional new status ('pending', 'in_progress', 'completed', 'failed') message: Optional status message log_entry: Optional log message to add to the log list log_level: Log level ('info', 'warning', 'error', 'debug') progress: Optional progress value (0.0 to 1.0) result: Optional result data (for completed tasks) error: Optional error data (for failed tasks) Returns: Dict containing the updated status data """ try: with open(status_file_path, 'r') as f: status_data = json.load(f) except (FileNotFoundError, json.JSONDecodeError): # If file doesn't exist or is invalid, initialize it status_data = initialize_task_status_file(status_file_path) # Update fields if provided if status is not None: status_data["status"] = status # If status is completed or failed, set end_time if status in ("completed", "failed"): status_data["end_time"] = get_current_timestamp() if message is not None: status_data["message"] = message if log_entry is not None: if "log" not in status_data: status_data["log"] = [] status_data["log"].append({ "timestamp": get_current_timestamp(), "level": log_level, "message": log_entry }) if progress is not None: status_data["progress"] = max(0.0, min(1.0, float(progress))) # Ensure between 0-1 if result is not None and status_data.get("status") == "completed": status_data["result"] = result if error is not None and status_data.get("status") == "failed": status_data["error"] = error # Write updated status back to file with open(status_file_path, 'w') as f: json.dump(status_data, f, indent=2) return status_data def get_task_status(status_file_path: str) -> Dict[str, Any]: """Read and return the current task status. Args: status_file_path: Path to the status file Returns: Dict containing the current status data """ try: with open(status_file_path, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): # If status file doesn't exist or is invalid, return a default status return { "status": "unknown", "message": "Task status unknown or not initialized", "progress": 0.0, "log": [] } def log_task_error(status_file_path: str, error: Exception, message: str = "Task failed due to an error") -> Dict[str, Any]: """Log an error to the task status file. Args: status_file_path: Path to the status file error: The exception that occurred message: Human-readable error message Returns: Dict containing the updated status data """ error_info = { "type": error.__class__.__name__, "message": str(error), "traceback": traceback.format_exc() } return update_task_status( status_file_path=status_file_path, status="failed", message=message, log_entry=f"ERROR: {message} - {error}", log_level="error", error=error_info ) class AiTaskLogger: """Logger for AI tasks that updates both the application logger and task status file. This logger provides methods for: - Logging info, warning, and error messages - Updating task progress - Marking tasks as started, completed, or failed - Ensuring consistent logging across both Flask app logs and task status files """ def __init__(self, app_logger: logging.Logger, status_file_path: str, task_id: Optional[str] = None): """Initialize the task logger. Args: app_logger: Flask application logger status_file_path: Path to the task status file task_id: Optional unique ID for the task (UUID generated if None) """ self.app_logger = app_logger self.status_file_path = status_file_path self.task_id = task_id or str(uuid.uuid4()) self.short_task_id = self.task_id[:8] # First 8 chars for log readability # Initialize the status file initialize_task_status_file(status_file_path, self.task_id) def info(self, message: str, update_progress: Optional[float] = None) -> None: """Log an info message and optionally update progress. Args: message: The message to log update_progress: Optional progress value (0.0 to 1.0) """ self.app_logger.info(f"[AI Task {self.short_task_id}] {message}") update_task_status( self.status_file_path, log_entry=message, log_level="info", progress=update_progress ) def warning(self, message: str) -> None: """Log a warning message. Args: message: The warning message to log """ self.app_logger.warning(f"[AI Task {self.short_task_id}] {message}") update_task_status( self.status_file_path, log_entry=message, log_level="warning" ) def error(self, message: str, error: Optional[Exception] = None) -> None: """Log an error message and optionally the exception details. Args: message: The error message to log error: Optional exception that caused the error """ if error: self.app_logger.error(f"[AI Task {self.short_task_id}] {message}: {error}", exc_info=True) log_task_error(self.status_file_path, error, message) else: self.app_logger.error(f"[AI Task {self.short_task_id}] {message}") update_task_status( self.status_file_path, log_entry=message, log_level="error" ) def start_task(self, message: str = "Task started") -> None: """Mark the task as started. Args: message: Optional message describing the task start """ self.app_logger.info(f"[AI Task {self.short_task_id}] {message}") update_task_status( self.status_file_path, status="in_progress", message=message, log_entry=message, log_level="info", progress=0.0 ) def complete_task(self, message: str = "Task completed successfully", result: Optional[Dict[str, Any]] = None) -> None: """Mark the task as completed. Args: message: Optional completion message result: Optional result data to store """ self.app_logger.info(f"[AI Task {self.short_task_id}] {message}") update_task_status( self.status_file_path, status="completed", message=message, log_entry=message, log_level="info", progress=1.0, result=result ) def fail_task(self, message: str, error: Optional[Exception] = None) -> None: """Mark the task as failed. Args: message: Failure message error: Optional exception that caused the failure """ if error: self.app_logger.error(f"[AI Task {self.short_task_id}] {message}: {error}", exc_info=True) log_task_error(self.status_file_path, error, message) else: self.app_logger.error(f"[AI Task {self.short_task_id}] {message}") update_task_status( self.status_file_path, status="failed", message=message, log_entry=message, log_level="error" ) def update_progress(self, progress: float, message: Optional[str] = None) -> None: """Update the task progress. Args: progress: Progress value (0.0 to 1.0) message: Optional progress message """ if message: self.app_logger.info(f"[AI Task {self.short_task_id}] {message} (Progress: {progress:.1%})") update_task_status( self.status_file_path, message=message, log_entry=message, log_level="info", progress=progress ) else: update_task_status( self.status_file_path, progress=progress )