"""Functions meant to be exposed."""
import json
import threading
from loguru import logger
from app.analyzer import TimeSeriesAnalyzer
# Module-level analyzer shared by every exposed function; created lazily on first use.
analyzer: TimeSeriesAnalyzer | None = None
analyzer_lock = threading.Lock()

def ensure_analyzer_connected() -> bool:
    """Ensure the analyzer is initialized, connecting to the database on first use."""
    if analyzer is None:
        connect_database()
    return True
def connect_database() -> str:
"""Connect to the TimescaleDB and initialize the analyzer."""
global analyzer
with analyzer_lock:
analyzer = TimeSeriesAnalyzer()
analyzer.connect()
return "Successfully connected to database"
def list_available_metrics() -> str:
"""List all available metrics from the connected database."""
if not ensure_analyzer_connected():
return json.dumps({"error": "Database not connected"})
metrics = analyzer.get_available_metrics()
return json.dumps({"available_metrics": metrics}, indent=2)
def query_timeseries(sensor_id: str, start_time: str, end_time: str) -> str:
"""Query time series data for a specific sensor within a time range."""
logger.info("query timeseries")
if not ensure_analyzer_connected():
return json.dumps({"error": "Database not connected"})
data = analyzer.query_metrics(sensor_id, start_time, end_time)
    result = {
        "sensor_id": sensor_id,
        "data_points": len(data),
        "time_range": {
            "start": data["timestamp"].min() if not data.is_empty() else None,
            "end": data["timestamp"].max() if not data.is_empty() else None,
        },
        "sample_data": data.head(10).to_dicts(),
    }
return json.dumps(result, indent=2, default=str)
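# Example usage (illustrative sketch; the sensor id and timestamps below are
# hypothetical and must match data actually present in the connected database):
#
#     connect_database()
#     raw = query_timeseries("sensor_001", "2024-01-01T00:00:00", "2024-01-02T00:00:00")
#     summary = json.loads(raw)
#     print(summary["data_points"], summary["time_range"])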
def detect_anomalies(
sensor_id: str,
start_time: str,
end_time: str,
threshold: float = 2.0,
algorithm: str = "zscore",
contamination: float = 0.1,
) -> str:
"""Detect anomalies in the time series data for a specific sensor.
Args:
sensor_id: The identifier of the sensor to analyze
start_time: The start time of the analysis period in ISO format
end_time: The end time of the analysis period in ISO format
threshold: Threshold for z-score based detection (default: 2.0)
algorithm: Algorithm to use for detection ("zscore" or "isolation_forest")
contamination: Expected proportion of anomalies for isolation forest (default: 0.1)
Returns:
str: JSON string containing anomaly detection results
"""
logger.info(f"detect anomalies using {algorithm}")
if not ensure_analyzer_connected():
return json.dumps({"error": "Database not connected"})
data = analyzer.query_metrics(sensor_id, start_time, end_time)
if algorithm == "zscore":
anomalies = analyzer.detect_anomalies(data, threshold)
elif algorithm == "isolation_forest":
anomalies = analyzer.detect_anomalies_isolation_forest(
data, contamination
)
else:
return json.dumps({"error": f"Unknown algorithm: {algorithm}"})
return json.dumps(anomalies, indent=2)
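# Note on the "zscore" path: the detection logic itself lives in app.analyzer and is
# not shown here. A minimal sketch of what z-score detection typically looks like,
# assuming a Polars DataFrame with a "value" column (an assumption about the
# analyzer's schema, not a description of its actual implementation):
#
#     mean, std = data["value"].mean(), data["value"].std()
#     z_scores = (data["value"] - mean) / std
#     flagged = data.filter(z_scores.abs() > threshold)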
def analyze_trends(sensor_id: str, start_time: str, end_time: str) -> str:
"""Analyze trends in the time series data for a specific sensor.
This function retrieves time series data for the specified sensor within the given time range
and calculates trend information such as trend direction and percentage change.
Args:
sensor_id (str): The identifier of the sensor to analyze.
start_time (str): The start time of the analysis period in ISO format.
end_time (str): The end time of the analysis period in ISO format.
Returns:
str: A JSON string containing trend analysis results including trend direction,
percentage change, and start/end values.
Raises:
No direct exceptions, but returns a JSON error message if the database is not connected.
"""
logger.info("analyze trends")
if not ensure_analyzer_connected():
return json.dumps({"error": "Database not connected"})
data = analyzer.query_metrics(sensor_id, start_time, end_time)
trends = analyzer.calculate_trends(data)
return json.dumps(trends, indent=2)
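# Example usage (illustrative; the sensor id and times are hypothetical). The keys
# shown are the ones this module reads back in generate_analysis_report; presumably
# percentage_change is derived from start_value and end_value, but the exact formula
# lives in app.analyzer:
#
#     trends = json.loads(analyze_trends("sensor_001", "2024-01-01T00:00:00", "2024-01-07T00:00:00"))
#     print(trends["trend_direction"], trends["percentage_change"])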
def generate_analysis_report(
sensor_id: str,
start_time: str,
end_time: str,
include_anomalies: bool = True,
include_trends: bool = True,
user_question: str | None = None,
anomaly_algorithm: str = "zscore",
anomaly_threshold: float = 2.0,
anomaly_contamination: float = 0.1,
) -> str:
"""Generate a comprehensive analysis report for a specific sensor.
Args:
sensor_id: The identifier of the sensor to analyze
start_time: The start time of the analysis period in ISO format
end_time: The end time of the analysis period in ISO format
include_anomalies: Whether to include anomaly detection in the report
include_trends: Whether to include trend analysis in the report
user_question: A specific question from the user to be included in the report
anomaly_algorithm: Algorithm to use for anomaly detection ("zscore" or "isolation_forest")
anomaly_threshold: Threshold for z-score based detection
anomaly_contamination: Expected proportion of anomalies for isolation forest
Returns:
str: A formatted string containing the comprehensive analysis report
"""
logger.info("generate report")
if not ensure_analyzer_connected():
return "Error: Database not connected. Please connect first."
data = analyzer.query_metrics(sensor_id, start_time, end_time)
report_sections = []
report_sections.append(f"## Analysis Report for {sensor_id}")
report_sections.append(f"**Time Period:** {start_time} to {end_time}")
report_sections.append(f"**Data Points:** {len(data)}")
if user_question:
report_sections.append(f"**User Question:** {user_question}")
if include_trends:
trends = analyzer.calculate_trends(data)
report_sections.append("\n### Trend Analysis")
report_sections.append(
f"- **Direction:** {trends['trend_direction'].title()}"
)
report_sections.append(
f"- **Change:** {trends['percentage_change']:.2f}%"
)
report_sections.append(
f"- **Start Value:** {trends['start_value']:.2f}"
)
report_sections.append(f"- **End Value:** {trends['end_value']:.2f}")
if include_anomalies:
if anomaly_algorithm == "zscore":
anomalies = analyzer.detect_anomalies(data, anomaly_threshold)
else:
anomalies = analyzer.detect_anomalies_isolation_forest(
data, anomaly_contamination
)
report_sections.append("\n### Anomaly Detection")
report_sections.append(f"- **Algorithm:** {anomaly_algorithm}")
report_sections.append(
f"- **Anomalies Found:** {anomalies['anomalies_found']}"
)
if anomalies["anomalies_found"] > 0:
report_sections.append("- **Notable Anomalies:**")
for anomaly in anomalies["anomalies"][:5]:
report_sections.append(
f" - {anomaly['timestamp']}: {anomaly['value']:.2f} (severity: {anomaly['severity']})"
)
stats = anomalies["statistics"]
report_sections.append("\n### Statistical Summary")
report_sections.append(f"- **Mean:** {stats['mean']:.2f}")
report_sections.append(f"- **Std Dev:** {stats['std']:.2f}")
report_sections.append(f"- **Min:** {stats['min']:.2f}")
report_sections.append(f"- **Max:** {stats['max']:.2f}")
full_report = "\n".join(report_sections)
return full_report
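
if __name__ == "__main__":
    # Minimal manual smoke test (a sketch, not part of the exposed API): the sensor id,
    # time range, and question below are hypothetical and must be replaced with values
    # that exist in the connected TimescaleDB instance.
    connect_database()
    print(list_available_metrics())
    print(
        generate_analysis_report(
            sensor_id="sensor_001",
            start_time="2024-01-01T00:00:00",
            end_time="2024-01-02T00:00:00",
            user_question="Were there any unusual readings overnight?",
        )
    )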