"""Module that defines the TimeSeriesAnalyzer object.""" | |
import os | |
from dataclasses import dataclass | |
from datetime import datetime | |
from typing import Any | |
import numpy as np | |
import polars as pl | |
from loguru import logger | |
from sklearn.ensemble import IsolationForest | |
from sqlalchemy import Engine, create_engine, text | |
from data.get_mock import get_df | |
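
# Environment variables read by this module (values shown are examples only; adjust
# to your own deployment):
#   USE_MOCK_DB=true                 -> use the local SQLite mock database
#   DB_HOST=localhost DB_PORT=5432   -> TimescaleDB/PostgreSQL host and port
#   DB_NAME=data DB_USER=postgres DB_PASS=secretpassword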


@dataclass
class TimeSeriesConfig:
    """Object with the database connection details.

    Attributes:
        host: address of the database
        port: port of the database
        database: name of the database
        username: username of the database
        password: password of the database
    """

    host: str
    port: int
    database: str
    username: str
    password: str


class TimeSeriesAnalyzer:
    """Handle connection details and compute insights on the time series data.

    Attributes:
        use_mock_db: if True, the database is mocked.
        connection: connection engine (None until connect() is called)
    """

    def __init__(self):
        # Environment variables are strings, so parse the mock flag explicitly.
        self.use_mock_db = os.getenv("USE_MOCK_DB", "true").lower() in (
            "true",
            "1",
            "yes",
        )
        if not self.use_mock_db:
            self.config = TimeSeriesConfig(
                host=os.getenv("DB_HOST", "localhost"),
                port=int(os.getenv("DB_PORT", "5432")),
                database=os.getenv("DB_NAME", "data"),
                username=os.getenv("DB_USER", "postgres"),
                password=os.getenv("DB_PASS", "secretpassword"),
            )
        self.connection: Engine | None = None

    def connect(self):
        """Instantiate the database engine."""
        if self.use_mock_db:
            logger.info("Connecting to mock SQLite database")
            self.connection = create_engine("sqlite:///mock.db")
            self._setup_mock_db()
        else:
            logger.info(
                f"Connecting to TimescaleDB at {self.config.host}:{self.config.port}"
            )
            self.connection = create_engine(
                f"postgresql+psycopg2://{self.config.username}:{self.config.password}"
                f"@{self.config.host}:{self.config.port}/{self.config.database}"
            )
        logger.info("Connected to database!")

    def _setup_mock_db(self):
        # Skip the (potentially costly) data fetch if the mock database already exists.
        if os.path.exists("./mock.db"):
            return
        df = get_df()
        logger.info(
            f"""df shape: {df.shape}, size: {df.estimated_size("kb"):,.3f}kb"""
        )
        logger.debug(df.head(5))
        with self.connection.connect() as conn:
            df.write_database(
                "timeseries_data",
                conn,
                if_table_exists="replace",
                engine_options={},
            )

    def _ensure_connected(self):
        if self.connection is None:
            self.connect()

    def get_available_metrics(self) -> list[str]:
        """Get the list of sensor ids.

        Returns:
            list of sensor ids
        """
        self._ensure_connected()
        sql = "SELECT DISTINCT sensor_id FROM timeseries_data ORDER BY sensor_id ASC"
        with self.connection.connect() as conn:
            rows = conn.execute(text(sql))
            return [r[0] for r in rows]

    def query_metrics(
        self,
        sensor_id: str,
        start_time: str,
        end_time: str,
    ) -> pl.DataFrame:
        """Run a select query for one time series.

        Args:
            sensor_id: id of the sensor
            start_time: iso datetime
            end_time: iso datetime

        Returns:
            The expected time series as a Polars DataFrame.
        """
        self._ensure_connected()
        start_dt = datetime.fromisoformat(start_time)
        end_dt = datetime.fromisoformat(end_time)
        # NOTE: values are interpolated directly into the query string; acceptable for
        # the mock/demo setup, but not safe against untrusted input.
        query = f"""SELECT timestamp, value FROM timeseries_data
        WHERE sensor_id = '{sensor_id}' AND timestamp >= '{start_dt}' AND timestamp <= '{end_dt}'
        ORDER BY timestamp ASC"""
        with self.connection.connect() as conn:
            df = pl.read_database(query, conn)
        return df

    def detect_anomalies(
        self, data: pl.DataFrame, threshold: float = 1.0
    ) -> dict[str, Any]:
        """Detect anomalies in the time series data for a specific sensor.

        Args:
            data: expects a single time series with columns timestamp and value
            threshold: z-score above which a point is flagged (default: 1.0)

        Returns:
            {
                "anomalies_found": int,
                "anomalies": list[dict[str, Any]],
                "statistics": {
                    "mean": float,
                    "std": float,
                    "min": float,
                    "max": float
                },
            }
        """
        mean_val = data["value"].mean()
        std_val = data["value"].std()
        # Flag points whose absolute deviation from the mean exceeds `threshold`
        # standard deviations.
        data = data.with_columns(
            ((pl.col("value") - mean_val).abs() / std_val).alias("z_score")
        )
        anomalies = (
            data.filter(pl.col("z_score") > threshold)
            .select(
                pl.col("timestamp").cast(pl.Utf8),
                pl.col("value").cast(pl.Float64),
                pl.col("z_score").cast(pl.Float64),
                pl.when(pl.col("z_score") > 2.0)
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity"),
            )
            .to_dicts()
        )
        return {
            "anomalies_found": len(anomalies),
            "anomalies": anomalies,
            "statistics": {
                "mean": mean_val,
                "std": std_val,
                "min": data["value"].min(),
                "max": data["value"].max(),
            },
        }

    def calculate_trends(self, data: pl.DataFrame) -> dict[str, Any]:
        """Calculate trend information such as trend direction and percentage change.

        Args:
            data: expects a single time series with columns timestamp and value

        Returns:
            {
                "trend_direction": Literal["increasing", "decreasing", "stable"],
                "trend_slope": float,
                "percentage_change": float,
                "start_value": float,
                "end_value": float,
                "time_period": {
                    "start": datetime,
                    "end": datetime,
                },
            }
        """
        values = data["value"]
        timestamps = data["timestamp"]
        # Fit a first-degree polynomial to the values; its slope gives the trend.
        x = np.arange(len(values))
        coeffs = np.polyfit(x, values, 1)
        trend_slope = coeffs[0]
        pct_change = (
            ((values[-1] - values[0]) / values[0]) * 100 if len(values) > 1 else 0
        )
        return {
            "trend_direction": "increasing"
            if trend_slope > 0
            else "decreasing"
            if trend_slope < 0
            else "stable",
            "trend_slope": float(trend_slope),
            "percentage_change": float(pct_change),
            "start_value": float(values[0]) if len(values) > 0 else 0,
            "end_value": float(values[-1]) if len(values) > 0 else 0,
            "time_period": {
                "start": timestamps[0] if len(timestamps) > 0 else None,
                "end": timestamps[-1] if len(timestamps) > 0 else None,
            },
        }

    def detect_anomalies_isolation_forest(
        self, data: pl.DataFrame, contamination: float = 0.1
    ) -> dict[str, Any]:
        """Detect anomalies in the time series data using the Isolation Forest algorithm.

        Args:
            data: expects a single time series with columns timestamp and value
            contamination: expected proportion of anomalies in the data (default: 0.1)

        Returns:
            {
                "anomalies_found": int,
                "anomalies": list[dict[str, Any]],
                "statistics": {
                    "mean": float,
                    "std": float,
                    "min": float,
                    "max": float
                },
            }
        """
        values = data["value"].to_numpy().reshape(-1, 1)
        iso_forest = IsolationForest(
            contamination=contamination, random_state=42, n_estimators=100
        )
        # Predict anomalies (-1 for anomalies, 1 for normal)
        predictions = iso_forest.fit_predict(values)
        # score_samples is higher for normal points, so negate it to get a score
        # where higher means more anomalous.
        anomaly_scores = -iso_forest.score_samples(values)
        anomaly_mask = predictions == -1
        mean_val = data["value"].mean()
        std_val = data["value"].std()
        logger.debug(f"anomaly_mask: {anomaly_mask}")
        logger.debug(f"anomaly_scores: {anomaly_scores}")
        logger.debug(
            pl.Series("anomaly_score", anomaly_scores).filter(pl.Series(anomaly_mask))
        )
        # Prepare anomalies data: scores above the 90th percentile are "high" severity.
        high_score = float(np.percentile(anomaly_scores, 90))
        anomalies = (
            data.with_columns(
                pl.Series("anomaly_score", anomaly_scores),
                pl.Series("is_anomaly", anomaly_mask),
            )
            .filter(pl.col("is_anomaly"))
            .select(
                pl.col("timestamp").cast(pl.Utf8),
                pl.col("value").cast(pl.Float64),
                pl.col("anomaly_score"),
                pl.when(pl.col("anomaly_score") > high_score)
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity"),
            )
            .to_dicts()
        )
        logger.debug(f"anomalies: {anomalies}")
        return {
            "anomalies_found": len(anomalies),
            "anomalies": anomalies,
            "statistics": {
                "mean": mean_val,
                "std": std_val,
                "min": data["value"].min(),
                "max": data["value"].max(),
            },
        }
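

# Minimal usage sketch (illustrative only): it assumes USE_MOCK_DB is left enabled so
# the SQLite mock database is built from data/get_mock.py, and the time window below
# is a placeholder to adapt to whatever get_df() returns.
if __name__ == "__main__":
    analyzer = TimeSeriesAnalyzer()
    analyzer.connect()
    sensors = analyzer.get_available_metrics()
    logger.info(f"Available sensors: {sensors}")
    if sensors:
        df = analyzer.query_metrics(
            sensor_id=sensors[0],
            start_time="2024-01-01T00:00:00",
            end_time="2024-12-31T23:59:59",
        )
        logger.info(analyzer.detect_anomalies(df, threshold=2.0))
        logger.info(analyzer.calculate_trends(df))
        logger.info(analyzer.detect_anomalies_isolation_forest(df))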