"""Module that defines the TimeSeriesAnalyzer object.""" import os from dataclasses import dataclass from datetime import datetime from typing import Any import numpy as np import polars as pl from loguru import logger from sklearn.ensemble import IsolationForest from sqlalchemy import Engine, create_engine, text from data.get_mock import get_df @dataclass class TimeSeriesConfig: """Object with the database connections details. Attributes: host: address of the database port: port of the database database: name of the database username: username of the database password: password of the database """ host: str port: int database: str username: str password: str class TimeSeriesAnalyzer: """Handle connections details, and how to compute insights. Attributes: use_mock_db: if True, databased if mocked. connection: connection engine """ def __init__(self): self.use_mock_db = os.getenv("USE_MOCK_DB", True) if not self.use_mock_db: self.config = TimeSeriesConfig( host=os.getenv("DB_HOST", "localhost"), port=int(os.getenv("DB_PORT", 5432)), database=os.getenv("DB_NAME", "data"), username=os.getenv("DB_USER", "postgres"), password=os.getenv("DB_PASS", "secretpassword"), ) self.connection: Engine def connect(self): """Instantiate the database engine.""" if self.use_mock_db: logger.info("Connecting to mock SQLite database") self.connection = create_engine("sqlite:///mock.db") self._setup_mock_db() else: logger.info( f"Connecting to TimescaleDB at {self.config.host}:{self.config.port}" ) self.connection = create_engine( f"postgresql+psycopg2://{self.config.username}:{self.config.password}@{self.config.host}:{self.config.port}/{self.config.database}" ) logger.info("Connected to database!") def _setup_mock_db(self): df = get_df() if os.path.exists("./mock.db"): return logger.info( f"""df shape: {df.shape}, size: {df.estimated_size("kb"):,.3f}kb""" ) logger.debug(df.head(5)) with self.connection.connect() as conn: df.write_database( "timeseries_data", conn, if_table_exists="replace", engine_options={}, ) def _ensure_connected(self): if not self.connection: self.connect() def get_available_metrics(self) -> list[str]: """Get the list of sensor_id. Returns: list of sensors """ self._ensure_connected() sql = "SELECT DISTINCT sensor_id FROM timeseries_data ORDER BY sensor_id ASC" with self.connection.connect() as conn: rows = conn.execute(text(sql)) return [r[0] for r in rows] def query_metrics( self, sensor_id: str, start_time: str, end_time: str, ) -> pl.DataFrame: """Run a select query of 1 time serie. Args: sensor_id: id of the sensor start_time: iso datetime end_time: iso datetime Returns: The expected time serie as a polar DataFrame. """ self._ensure_connected() start_dt = datetime.fromisoformat(start_time) end_dt = datetime.fromisoformat(end_time) query = f"""SELECT timestamp, value FROM timeseries_data WHERE sensor_id = '{sensor_id}' AND timestamp >= '{start_dt}' AND timestamp <= '{end_dt}' ORDER BY timestamp ASC""" with self.connection.connect() as conn: df = pl.read_database(query, conn) return df def detect_anomalies( self, data: pl.DataFrame, threshold: float = 1.0 ) -> dict[str, Any]: """Detect anomalies in the time series data for a specific sensor. 

        Args:
            data: a single time series with columns timestamp and value
            threshold: z-score above which a point is flagged (default: 1.0)

        Returns:
            {
                "anomalies_found": int,
                "anomalies": list[dict[str, Any]],
                "statistics": {
                    "mean": float,
                    "std": float,
                    "min": float,
                    "max": float,
                },
            }
        """
        mean_val = data["value"].mean()
        std_val = data["value"].std()
        data = data.with_columns(
            ((pl.col("value") - mean_val).abs() / std_val).alias("z_score")
        )
        anomalies = (
            data.filter(pl.col("z_score") > threshold)
            .select(
                pl.col("timestamp").cast(pl.Utf8),
                pl.col("value").cast(pl.Float64),
                pl.col("z_score").cast(pl.Float64),
                # Points more than 2 standard deviations away are "high" severity.
                pl.when(pl.col("z_score") > 2.0)
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity"),
            )
            .to_dicts()
        )
        return {
            "anomalies_found": len(anomalies),
            "anomalies": anomalies,
            "statistics": {
                "mean": mean_val,
                "std": std_val,
                "min": data["value"].min(),
                "max": data["value"].max(),
            },
        }

    def calculate_trends(self, data: pl.DataFrame) -> dict[str, Any]:
        """Calculate trend information such as trend direction and percentage change.

        Args:
            data: a single time series with columns timestamp and value

        Returns:
            {
                "trend_direction": Literal["increasing", "decreasing", "stable"],
                "trend_slope": float,
                "percentage_change": float,
                "start_value": float,
                "end_value": float,
                "time_period": {
                    "start": datetime,
                    "end": datetime,
                },
            }
        """
        values = data["value"]
        timestamps = data["timestamp"]
        x = np.arange(len(values))
        # Least-squares fit of a straight line; its slope gives the trend direction.
        coeffs = np.polyfit(x, values, 1)
        trend_slope = coeffs[0]
        pct_change = (
            ((values[-1] - values[0]) / values[0]) * 100 if len(values) > 1 else 0
        )
        return {
            "trend_direction": "increasing"
            if trend_slope > 0
            else "decreasing"
            if trend_slope < 0
            else "stable",
            "trend_slope": float(trend_slope),
            "percentage_change": float(pct_change),
            "start_value": float(values[0]) if len(values) > 0 else 0,
            "end_value": float(values[-1]) if len(values) > 0 else 0,
            "time_period": {
                "start": timestamps[0] if len(timestamps) > 0 else None,
                "end": timestamps[-1] if len(timestamps) > 0 else None,
            },
        }

    def detect_anomalies_isolation_forest(
        self, data: pl.DataFrame, contamination: float = 0.1
    ) -> dict[str, Any]:
        """Detect anomalies in the time series data using the Isolation Forest algorithm.

        Args:
            data: a single time series with columns timestamp and value
            contamination: expected proportion of anomalies in the data (default: 0.1)

        Returns:
            {
                "anomalies_found": int,
                "anomalies": list[dict[str, Any]],
                "statistics": {
                    "mean": float,
                    "std": float,
                    "min": float,
                    "max": float,
                },
            }
        """
        values = data["value"].to_numpy().reshape(-1, 1)
        iso_forest = IsolationForest(
            contamination=contamination, random_state=42, n_estimators=100
        )
        # Predict anomalies (-1 for anomalies, 1 for normal)
        predictions = iso_forest.fit_predict(values)
        anomaly_scores = -iso_forest.score_samples(values)
        anomaly_mask = predictions == -1
        mean_val = data["value"].mean()
        std_val = data["value"].std()
        logger.debug(f"anomaly_mask: {anomaly_mask}")
        logger.debug(f"anomaly_scores: {anomaly_scores}")
        logger.debug(
            pl.Series("anomaly_score", anomaly_scores).filter(pl.Series(anomaly_mask))
        )

        # Prepare anomalies data: scores above the 90th percentile are "high" severity.
        high_cutoff = float(np.percentile(anomaly_scores, 90))
        anomalies = (
            data.with_columns(pl.Series("anomaly_score", anomaly_scores))
            .filter(pl.Series(anomaly_mask))
            .select(
                pl.col("timestamp").cast(pl.Utf8),
                pl.col("value").cast(pl.Float64),
                pl.col("anomaly_score"),
                pl.when(pl.col("anomaly_score") > high_cutoff)
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity"),
            )
            .to_dicts()
        )
        logger.debug(f"anomalies: {anomalies}")
        return {
            "anomalies_found": len(anomalies),
            "anomalies": anomalies,
            "statistics": {
                "mean": mean_val,
                "std": std_val,
                "min": data["value"].min(),
                "max": data["value"].max(),
            },
        }
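

# --- Usage sketch (illustrative, not part of the original module) -------------
# A minimal example of how the analyzer above might be driven end to end with
# the mock SQLite database. The date range is an assumption for illustration;
# adjust it to whatever range get_df() actually produces.
if __name__ == "__main__":
    analyzer = TimeSeriesAnalyzer()
    analyzer.connect()

    # List sensors and pull the first one over an assumed one-week window.
    sensors = analyzer.get_available_metrics()
    logger.info(f"available sensors: {sensors}")
    series = analyzer.query_metrics(
        sensor_id=sensors[0],
        start_time="2024-01-01T00:00:00",
        end_time="2024-01-07T00:00:00",
    )

    # Compare the z-score and Isolation Forest detectors on the same series.
    zscore_report = analyzer.detect_anomalies(series, threshold=2.0)
    forest_report = analyzer.detect_anomalies_isolation_forest(series, contamination=0.05)
    trends = analyzer.calculate_trends(series)
    logger.info(f"z-score anomalies: {zscore_report['anomalies_found']}")
    logger.info(f"isolation forest anomalies: {forest_report['anomalies_found']}")
    logger.info(f"trend: {trends['trend_direction']}")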