"""Functions meant to be exposed."""
import json
import threading
from loguru import logger
from app.analyzer import TimeSeriesAnalyzer
analyzer: TimeSeriesAnalyzer
analyzer_lock = threading.Lock()
def ensure_analyzer_connected() -> bool:
"""Check if the analyzer is connected and log the status."""
global analyzer
try:
if analyzer:
pass
except NameError:
connect_database()
return True
def connect_database() -> str:
    """Connect to TimescaleDB and initialize the shared analyzer."""
    global analyzer
    with analyzer_lock:
        analyzer = TimeSeriesAnalyzer()
        analyzer.connect()
    return "Successfully connected to database"


def list_available_metrics() -> str:
    """List all available metrics from the connected database."""
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    metrics = analyzer.get_available_metrics()
    return json.dumps({"available_metrics": metrics}, indent=2)


def query_timeseries(sensor_id: str, start_time: str, end_time: str) -> str:
    """Query time series data for a specific sensor within a time range."""
    logger.info(f"query timeseries for {sensor_id}")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    result = {
        "sensor_id": sensor_id,
        "data_points": len(data),
        "time_range": {
            "start": data["timestamp"].min() if not data.is_empty() else None,
            "end": data["timestamp"].max() if not data.is_empty() else None,
        },
        "sample_data": data.head(10).to_dicts(),
    }
    return json.dumps(result, indent=2, default=str)


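# A result sketch for query_timeseries (values are illustrative, not real
# output; the sensor id and counts are hypothetical):
#
#     {
#       "sensor_id": "sensor_1",
#       "data_points": 1440,
#       "time_range": {"start": "2024-01-01 00:00:00", "end": "2024-01-01 23:59:00"},
#       "sample_data": [{"timestamp": "...", "value": ...}, ...]
#     }

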
def detect_anomalies(
sensor_id: str,
start_time: str,
end_time: str,
threshold: float = 2.0,
algorithm: str = "zscore",
contamination: float = 0.1,
) -> str:
"""Detect anomalies in the time series data for a specific sensor.
Args:
sensor_id: The identifier of the sensor to analyze
start_time: The start time of the analysis period in ISO format
end_time: The end time of the analysis period in ISO format
threshold: Threshold for z-score based detection (default: 2.0)
algorithm: Algorithm to use for detection ("zscore" or "isolation_forest")
contamination: Expected proportion of anomalies for isolation forest (default: 0.1)
Returns:
str: JSON string containing anomaly detection results
"""
logger.info(f"detect anomalies using {algorithm}")
if not ensure_analyzer_connected():
return json.dumps({"error": "Database not connected"})
data = analyzer.query_metrics(sensor_id, start_time, end_time)
if algorithm == "zscore":
anomalies = analyzer.detect_anomalies(data, threshold)
elif algorithm == "isolation_forest":
anomalies = analyzer.detect_anomalies_isolation_forest(
data, contamination
)
else:
return json.dumps({"error": f"Unknown algorithm: {algorithm}"})
    return json.dumps(anomalies, indent=2)


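# Usage sketch (the sensor id and time range are hypothetical; adjust them to
# match your data):
#
#     # Default z-score detection with a 2.0 threshold:
#     detect_anomalies("sensor_1", "2024-01-01T00:00:00", "2024-01-07T00:00:00")
#
#     # Isolation forest, expecting ~5% of points to be anomalous:
#     detect_anomalies(
#         "sensor_1",
#         "2024-01-01T00:00:00",
#         "2024-01-07T00:00:00",
#         algorithm="isolation_forest",
#         contamination=0.05,
#     )

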
def analyze_trends(sensor_id: str, start_time: str, end_time: str) -> str:
    """Analyze trends in the time series data for a specific sensor.

    Retrieves time series data for the given sensor and time range, then
    calculates trend information such as trend direction and percentage change.

    Args:
        sensor_id (str): The identifier of the sensor to analyze.
        start_time (str): Start of the analysis period, in ISO format.
        end_time (str): End of the analysis period, in ISO format.

    Returns:
        str: A JSON string with trend analysis results, including trend
            direction, percentage change, and start/end values.

    Raises:
        Nothing directly; returns a JSON error message if the database is
        not connected.
    """
logger.info("analyze trends")
if not ensure_analyzer_connected():
return json.dumps({"error": "Database not connected"})
data = analyzer.query_metrics(sensor_id, start_time, end_time)
trends = analyzer.calculate_trends(data)
return json.dumps(trends, indent=2)
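# Result sketch (keys inferred from their use in generate_analysis_report
# below; the values here are illustrative):
#
#     {
#       "trend_direction": "increasing",
#       "percentage_change": 12.5,
#       "start_value": 20.0,
#       "end_value": 22.5
#     }

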
def generate_analysis_report(
sensor_id: str,
start_time: str,
end_time: str,
include_anomalies: bool = True,
include_trends: bool = True,
user_question: str | None = None,
anomaly_algorithm: str = "zscore",
anomaly_threshold: float = 2.0,
anomaly_contamination: float = 0.1,
) -> str:
"""Generate a comprehensive analysis report for a specific sensor.
Args:
sensor_id: The identifier of the sensor to analyze
start_time: The start time of the analysis period in ISO format
end_time: The end time of the analysis period in ISO format
include_anomalies: Whether to include anomaly detection in the report
include_trends: Whether to include trend analysis in the report
user_question: A specific question from the user to be included in the report
anomaly_algorithm: Algorithm to use for anomaly detection ("zscore" or "isolation_forest")
anomaly_threshold: Threshold for z-score based detection
anomaly_contamination: Expected proportion of anomalies for isolation forest
Returns:
str: A formatted string containing the comprehensive analysis report
"""
logger.info("generate report")
if not ensure_analyzer_connected():
return "Error: Database not connected. Please connect first."
data = analyzer.query_metrics(sensor_id, start_time, end_time)
report_sections = []
report_sections.append(f"## Analysis Report for {sensor_id}")
report_sections.append(f"**Time Period:** {start_time} to {end_time}")
report_sections.append(f"**Data Points:** {len(data)}")
if user_question:
report_sections.append(f"**User Question:** {user_question}")
if include_trends:
trends = analyzer.calculate_trends(data)
report_sections.append("\n### Trend Analysis")
report_sections.append(
f"- **Direction:** {trends['trend_direction'].title()}"
)
report_sections.append(
f"- **Change:** {trends['percentage_change']:.2f}%"
)
report_sections.append(
f"- **Start Value:** {trends['start_value']:.2f}"
)
report_sections.append(f"- **End Value:** {trends['end_value']:.2f}")
    if include_anomalies:
        if anomaly_algorithm == "zscore":
            anomalies = analyzer.detect_anomalies(data, anomaly_threshold)
        else:
            # Any value other than "zscore" falls back to isolation forest.
            anomalies = analyzer.detect_anomalies_isolation_forest(
                data, anomaly_contamination
            )
report_sections.append("\n### Anomaly Detection")
report_sections.append(f"- **Algorithm:** {anomaly_algorithm}")
report_sections.append(
f"- **Anomalies Found:** {anomalies['anomalies_found']}"
)
if anomalies["anomalies_found"] > 0:
report_sections.append("- **Notable Anomalies:**")
for anomaly in anomalies["anomalies"][:5]:
report_sections.append(
f" - {anomaly['timestamp']}: {anomaly['value']:.2f} (severity: {anomaly['severity']})"
)
        # Summary statistics are computed as part of the anomaly result, so
        # this section is only emitted when include_anomalies is True.
        stats = anomalies["statistics"]
report_sections.append("\n### Statistical Summary")
report_sections.append(f"- **Mean:** {stats['mean']:.2f}")
report_sections.append(f"- **Std Dev:** {stats['std']:.2f}")
report_sections.append(f"- **Min:** {stats['min']:.2f}")
report_sections.append(f"- **Max:** {stats['max']:.2f}")
    return "\n".join(report_sections)


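if __name__ == "__main__":
    # Smoke-test sketch: the sensor id and time range are hypothetical and
    # assume a reachable TimescaleDB that holds data for that sensor.
    print(connect_database())
    print(list_available_metrics())
    print(
        generate_analysis_report(
            "sensor_1",
            "2024-01-01T00:00:00",
            "2024-01-07T00:00:00",
            anomaly_algorithm="isolation_forest",
        )
    )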