"""Functions meant to be exposed."""

import json
import threading

from loguru import logger

from app.analyzer import TimeSeriesAnalyzer

# The analyzer is created lazily on first use (see ensure_analyzer_connected).
analyzer: TimeSeriesAnalyzer | None = None
analyzer_lock = threading.Lock()


def ensure_analyzer_connected() -> bool:
    """Ensure the analyzer is initialized, connecting on first use."""
    if analyzer is None:
        try:
            connect_database()
        except Exception:
            logger.exception("Failed to connect to database")
            return False
    return True


def connect_database() -> str:
    """Connect to the TimescaleDB and initialize the analyzer."""
    global analyzer
    with analyzer_lock:
        # Create and connect under the lock so concurrent callers never
        # observe a half-initialized analyzer.
        analyzer = TimeSeriesAnalyzer()
        analyzer.connect()
    return "Successfully connected to database"


def list_available_metrics() -> str:
    """List all available metrics from the connected database."""
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    metrics = analyzer.get_available_metrics()
    return json.dumps({"available_metrics": metrics}, indent=2)


def query_timeseries(sensor_id: str, start_time: str, end_time: str) -> str:
    """Query time series data for a specific sensor within a time range."""
    logger.info("query timeseries")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    result = {
        "sensor_id": sensor_id,
        "data_points": len(data),
        "time_range": {
            "start": data["timestamp"].min() if not data.is_empty() else None,
            "end": data["timestamp"].max() if not data.is_empty else None,
        },
        "sample_data": data.head(10).to_dicts(),
    }
    return json.dumps(result, indent=2, default=str)


def detect_anomalies(
    sensor_id: str,
    start_time: str,
    end_time: str,
    threshold: float = 2.0,
    algorithm: str = "zscore",
    contamination: float = 0.1,
) -> str:
    """Detect anomalies in the time series data for a specific sensor.

    Args:
        sensor_id: The identifier of the sensor to analyze
        start_time: The start time of the analysis period in ISO format
        end_time: The end time of the analysis period in ISO format
        threshold: Threshold for z-score based detection (default: 2.0)
        algorithm: Algorithm to use for detection ("zscore" or "isolation_forest")
        contamination: Expected proportion of anomalies for isolation forest (default: 0.1)

    Returns:
        str: JSON string containing anomaly detection results
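
    Example (hypothetical sensor id and time range; requires a live DB):
        >>> result = detect_anomalies(
        ...     "sensor_001",
        ...     "2024-01-01T00:00:00",
        ...     "2024-01-02T00:00:00",
        ...     algorithm="isolation_forest",
        ... )  # doctest: +SKIP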

    """
    logger.info(f"detect anomalies using {algorithm}")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)

    if algorithm == "zscore":
        anomalies = analyzer.detect_anomalies(data, threshold)
    elif algorithm == "isolation_forest":
        anomalies = analyzer.detect_anomalies_isolation_forest(
            data, contamination
        )
    else:
        return json.dumps({"error": f"Unknown algorithm: {algorithm}"})

    return json.dumps(anomalies, indent=2)


def analyze_trends(sensor_id: str, start_time: str, end_time: str) -> str:
    """Analyze trends in the time series data for a specific sensor.

    This function retrieves time series data for the specified sensor within the given time range
    and calculates trend information such as trend direction and percentage change.

    Args:
        sensor_id (str): The identifier of the sensor to analyze.
        start_time (str): The start time of the analysis period in ISO format.
        end_time (str): The end time of the analysis period in ISO format.

    Returns:
        str: A JSON string containing trend analysis results including trend direction,
            percentage change, and start/end values.

    Raises:
        No direct exceptions, but returns a JSON error message if the database is not connected.

    """
    logger.info("analyze trends")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    trends = analyzer.calculate_trends(data)
    return json.dumps(trends, indent=2)


def generate_analysis_report(
    sensor_id: str,
    start_time: str,
    end_time: str,
    include_anomalies: bool = True,
    include_trends: bool = True,
    user_question: str | None = None,
    anomaly_algorithm: str = "zscore",
    anomaly_threshold: float = 2.0,
    anomaly_contamination: float = 0.1,
) -> str:
    """Generate a comprehensive analysis report for a specific sensor.

    Args:
        sensor_id: The identifier of the sensor to analyze
        start_time: The start time of the analysis period in ISO format
        end_time: The end time of the analysis period in ISO format
        include_anomalies: Whether to include anomaly detection in the report
        include_trends: Whether to include trend analysis in the report
        user_question: A specific question from the user to be included in the report
        anomaly_algorithm: Algorithm to use for anomaly detection ("zscore" or "isolation_forest")
        anomaly_threshold: Threshold for z-score based detection
        anomaly_contamination: Expected proportion of anomalies for isolation forest

    Returns:
        str: A formatted string containing the comprehensive analysis report
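
    Example (hypothetical arguments; requires a live DB):
        >>> report = generate_analysis_report(
        ...     "sensor_001",
        ...     "2024-01-01T00:00:00",
        ...     "2024-01-02T00:00:00",
        ...     anomaly_algorithm="zscore",
        ...     anomaly_threshold=3.0,
        ... )  # doctest: +SKIP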

    """
    logger.info("generate report")
    if not ensure_analyzer_connected():
        return "Error: Database not connected. Please connect first."
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    report_sections = []
    report_sections.append(f"## Analysis Report for {sensor_id}")
    report_sections.append(f"**Time Period:** {start_time} to {end_time}")
    report_sections.append(f"**Data Points:** {len(data)}")
    if user_question:
        report_sections.append(f"**User Question:** {user_question}")
    if include_trends:
        trends = analyzer.calculate_trends(data)
        report_sections.append("\n### Trend Analysis")
        report_sections.append(
            f"- **Direction:** {trends['trend_direction'].title()}"
        )
        report_sections.append(
            f"- **Change:** {trends['percentage_change']:.2f}%"
        )
        report_sections.append(
            f"- **Start Value:** {trends['start_value']:.2f}"
        )
        report_sections.append(f"- **End Value:** {trends['end_value']:.2f}")
    if include_anomalies:
        if anomaly_algorithm == "zscore":
            anomalies = analyzer.detect_anomalies(data, anomaly_threshold)
        else:
            anomalies = analyzer.detect_anomalies_isolation_forest(
                data, anomaly_contamination
            )

        report_sections.append("\n### Anomaly Detection")
        report_sections.append(f"- **Algorithm:** {anomaly_algorithm}")
        report_sections.append(
            f"- **Anomalies Found:** {anomalies['anomalies_found']}"
        )
        if anomalies["anomalies_found"] > 0:
            report_sections.append("- **Notable Anomalies:**")
            for anomaly in anomalies["anomalies"][:5]:
                report_sections.append(
                    f"  - {anomaly['timestamp']}: {anomaly['value']:.2f} (severity: {anomaly['severity']})"
                )
        stats = anomalies["statistics"]
        report_sections.append("\n### Statistical Summary")
        report_sections.append(f"- **Mean:** {stats['mean']:.2f}")
        report_sections.append(f"- **Std Dev:** {stats['std']:.2f}")
        report_sections.append(f"- **Min:** {stats['min']:.2f}")
        report_sections.append(f"- **Max:** {stats['max']:.2f}")
    full_report = "\n".join(report_sections)
    return full_report
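

if __name__ == "__main__":
    # Minimal end-to-end sketch, assuming a reachable TimescaleDB instance.
    # "sensor_001" and the time range below are hypothetical placeholders;
    # substitute values that exist in your database.
    start, end = "2024-01-01T00:00:00", "2024-01-02T00:00:00"
    print(connect_database())
    print(list_available_metrics())
    print(query_timeseries("sensor_001", start, end))
    print(analyze_trends("sensor_001", start, end))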