"""Module that defines the TimeSeriesAnalyzer object."""
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any
import numpy as np
import polars as pl
from loguru import logger
from sklearn.ensemble import IsolationForest
from sqlalchemy import Engine, create_engine, text
from data.get_mock import get_df
@dataclass
class TimeSeriesConfig:
"""Object with the database connections details.
Attributes:
host: address of the database
port: port of the database
database: name of the database
username: username of the database
password: password of the database
"""
host: str
port: int
database: str
username: str
password: str
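
# Illustrative example only (these mirror the fallback defaults read from the
# environment in TimeSeriesAnalyzer.__init__, not real credentials):
#   TimeSeriesConfig(
#       host="localhost", port=5432, database="data",
#       username="postgres", password="secretpassword",
#   )
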
class TimeSeriesAnalyzer:
"""Handle connections details, and how to compute insights.
Attributes:
use_mock_db: if True, databased if mocked.
connection: connection engine
"""
def __init__(self):
        # os.getenv returns strings, so parse the flag explicitly: anything other
        # than "1"/"true"/"yes" (case-insensitive) disables the mock database.
        self.use_mock_db = os.getenv("USE_MOCK_DB", "true").lower() in {"1", "true", "yes"}
if not self.use_mock_db:
self.config = TimeSeriesConfig(
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", 5432)),
database=os.getenv("DB_NAME", "data"),
username=os.getenv("DB_USER", "postgres"),
password=os.getenv("DB_PASS", "secretpassword"),
)
        # Initialised to None so that _ensure_connected() can test the attribute safely.
        self.connection: Engine | None = None
def connect(self):
"""Instantiate the database engine."""
if self.use_mock_db:
logger.info("Connecting to mock SQLite database")
self.connection = create_engine("sqlite:///mock.db")
self._setup_mock_db()
else:
logger.info(
f"Connecting to TimescaleDB at {self.config.host}:{self.config.port}"
)
self.connection = create_engine(
f"postgresql+psycopg2://{self.config.username}:{self.config.password}@{self.config.host}:{self.config.port}/{self.config.database}"
)
logger.info("Connected to database!")
    def _setup_mock_db(self):
        """Populate the SQLite mock database from the mock dataset."""
        # Skip the (potentially expensive) data load if the mock DB already exists.
        if os.path.exists("./mock.db"):
            return
        df = get_df()
logger.info(
f"""df shape: {df.shape}, size: {df.estimated_size("kb"):,.3f}kb"""
)
logger.debug(df.head(5))
with self.connection.connect() as conn:
df.write_database(
"timeseries_data",
conn,
if_table_exists="replace",
engine_options={},
)
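
    # The mock dataset written by _setup_mock_db is assumed to expose at least the
    # columns the queries below rely on: sensor_id, timestamp and value.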
    def _ensure_connected(self):
        if self.connection is None:
            self.connect()
def get_available_metrics(self) -> list[str]:
"""Get the list of sensor_id.
Returns:
list of sensors
"""
self._ensure_connected()
sql = "SELECT DISTINCT sensor_id FROM timeseries_data ORDER BY sensor_id ASC"
with self.connection.connect() as conn:
rows = conn.execute(text(sql))
return [r[0] for r in rows]
def query_metrics(
self,
sensor_id: str,
start_time: str,
end_time: str,
) -> pl.DataFrame:
"""Run a select query of 1 time serie.
Args:
sensor_id: id of the sensor
start_time: iso datetime
end_time: iso datetime
Returns:
The expected time serie as a polar DataFrame.
"""
self._ensure_connected()
start_dt = datetime.fromisoformat(start_time)
end_dt = datetime.fromisoformat(end_time)
query = f"""SELECT timestamp, value FROM timeseries_data
WHERE sensor_id = '{sensor_id}' AND timestamp >= '{start_dt}' AND timestamp <= '{end_dt}'
ORDER BY timestamp ASC"""
with self.connection.connect() as conn:
df = pl.read_database(query, conn)
return df
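
    # Example call (sensor id and time window are placeholders, depending on what
    # the dataset contains):
    #   analyzer.query_metrics("sensor_1", "2024-01-01T00:00:00", "2024-01-02T00:00:00")
    # returns a DataFrame with columns "timestamp" and "value", ordered by timestamp.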
def detect_anomalies(
self, data: pl.DataFrame, threshold: float = 1.0
) -> dict[str, Any]:
"""Detect anomalies in the time series data for a specific sensor.
Args:
data: expect only 1 timeserie with columns datetime and value
threshold: default to 1.0
Returns:
{
"anomalies_found": int,
"anomalies": list[dict[str, int]],
"statistics": {
"mean": float,
"std": float,
"min": float,
"max": float
},
"""
mean_val = data["value"].mean()
std_val = data["value"].std()
data = data.with_columns(
((data["value"] - mean_val).abs() / std_val).alias("z_score")
)
        anomalies = (
            data.filter(pl.col("z_score") > threshold)
            .select(
                pl.col("timestamp").cast(pl.Utf8),
                pl.col("value").cast(pl.Float64),
                pl.col("z_score").cast(pl.Float64),
                # Points more than two standard deviations away are labelled "high".
                pl.when(pl.col("z_score") > 2.0)
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity"),
            )
            .to_dicts()
        )
return {
"anomalies_found": len(anomalies),
"anomalies": anomalies,
"statistics": {
"mean": mean_val,
"std": std_val,
"min": data["value"].min(),
"max": data["value"].max(),
},
}
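
    # Worked example (hypothetical data): for values [10, 10, 10, 30] the mean is 15
    # and the sample standard deviation is 10, giving z-scores [0.5, 0.5, 0.5, 1.5].
    # With the default threshold of 1.0 only the last point is flagged, and because
    # its z-score is below 2.0 its severity is "medium".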
def calculate_trends(self, data: pl.DataFrame) -> dict[str, Any]:
"""Calculate trend information such as trend direction and percentage change.
Args:
            data: a single time series with columns ``timestamp`` and ``value``
Returns:
{
"trend_direction": Literal["increasing", "decreasing", "stable"],
"trend_slope": float,
"percentage_change": float,
"start_value": float,
"end_value": float,
"time_period": {
"start": datetime,
"end": datetime,
},
}
"""
values = data["value"]
timestamps = data["timestamp"]
x = np.arange(len(values))
coeffs = np.polyfit(x, values, 1)
trend_slope = coeffs[0]
pct_change = (
((values[-1] - values[0]) / values[0]) * 100
if len(values) > 1
else 0
)
return {
"trend_direction": "increasing"
if trend_slope > 0
else "decreasing"
if trend_slope < 0
else "stable",
"trend_slope": float(trend_slope),
"percentage_change": float(pct_change),
"start_value": float(values[0]) if len(values) > 0 else 0,
"end_value": float(values[-1]) if len(values) > 0 else 0,
"time_period": {
"start": timestamps[0] if len(timestamps) > 0 else None,
"end": timestamps[-1] if len(timestamps) > 0 else None,
},
}
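
    # Worked example (hypothetical data): for values [10, 12, 14] sampled at
    # x = [0, 1, 2], the least-squares fit gives a slope of 2.0, the trend is
    # "increasing", and percentage_change = (14 - 10) / 10 * 100 = 40.0.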
def detect_anomalies_isolation_forest(
self, data: pl.DataFrame, contamination: float = 0.1
) -> dict[str, Any]:
"""Detect anomalies in the time series data using Isolation Forest algorithm.
Args:
data: expect only 1 timeserie with columns datetime and value
contamination: expected proportion of anomalies in the data (default: 0.1)
Returns:
{
"anomalies_found": int,
"anomalies": list[dict[str, int]],
"statistics": {
"mean": float,
"std": float,
"min": float,
"max": float
}
"""
values = data["value"].to_numpy().reshape(-1, 1)
iso_forest = IsolationForest(
contamination=contamination, random_state=42, n_estimators=100
)
# Predict anomalies (-1 for anomalies, 1 for normal)
predictions = iso_forest.fit_predict(values)
anomaly_scores = -iso_forest.score_samples(values)
anomaly_mask = predictions == -1
mean_val = data["value"].mean()
std_val = data["value"].std()
logger.debug(f"anaomaly_mask: {anomaly_mask}")
logger.debug(f"anomaly_scores: {anomaly_scores}")
        logger.debug(
            pl.Series("anomaly_score", anomaly_scores[anomaly_mask])
        )
        # Prepare anomalies data: attach scores, keep only the rows flagged by the
        # forest, and label the top decile of scores as "high" severity.
        anomalies = (
            data.with_columns(
                pl.Series("anomaly_score", anomaly_scores),
                pl.Series("is_anomaly", anomaly_mask),
                pl.Series("is_high", anomaly_scores > np.percentile(anomaly_scores, 90)),
            )
            .filter(pl.col("is_anomaly"))
            .select(
                pl.col("timestamp").cast(pl.Utf8),
                pl.col("value").cast(pl.Float64),
                pl.col("anomaly_score").cast(pl.Float64),
                pl.when(pl.col("is_high"))
                .then(pl.lit("high"))
                .otherwise(pl.lit("medium"))
                .alias("severity"),
            )
            .to_dicts()
        )
logger.debug(f"anomalies: {anomalies}")
return {
"anomalies_found": len(anomalies),
"anomalies": anomalies,
"statistics": {
"mean": mean_val,
"std": std_val,
"min": data["value"].min(),
"max": data["value"].max(),
},
}
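

if __name__ == "__main__":
    # Minimal smoke-test sketch (assumes USE_MOCK_DB is enabled and that the mock
    # dataset contains at least one sensor; the time window below is a placeholder).
    analyzer = TimeSeriesAnalyzer()
    analyzer.connect()
    sensors = analyzer.get_available_metrics()
    logger.info(f"available sensors: {sensors}")
    if sensors:
        series = analyzer.query_metrics(
            sensors[0], "2024-01-01T00:00:00", "2025-01-01T00:00:00"
        )
        if not series.is_empty():
            logger.info(analyzer.detect_anomalies(series, threshold=2.0))
            logger.info(analyzer.calculate_trends(series))
            logger.info(analyzer.detect_anomalies_isolation_forest(series))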