"""Functions meant to be exposed.""" | |
import json | |
import threading | |
from loguru import logger | |
from app.analyzer import TimeSeriesAnalyzer | |
analyzer: TimeSeriesAnalyzer | |
analyzer_lock = threading.Lock() | |


def ensure_analyzer_connected() -> bool:
    """Ensure the analyzer is initialized, connecting on first use."""
    if analyzer is None:
        try:
            connect_database()
        except Exception as exc:
            logger.error(f"Database connection failed: {exc}")
            return False
    return True


def connect_database() -> str:
    """Connect to the TimescaleDB and initialize the analyzer."""
    global analyzer
    with analyzer_lock:
        analyzer = TimeSeriesAnalyzer()
        analyzer.connect()
    return "Successfully connected to database"


def list_available_metrics() -> str:
    """List all available metrics from the connected database."""
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    metrics = analyzer.get_available_metrics()
    return json.dumps({"available_metrics": metrics}, indent=2)


def query_timeseries(sensor_id: str, start_time: str, end_time: str) -> str:
    """Query time series data for a specific sensor within a time range."""
    logger.info(f"Querying time series for {sensor_id}")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    result = {
        "sensor_id": sensor_id,
        "data_points": len(data),
        "time_range": {
            "start": data["timestamp"].min() if not data.is_empty() else None,
            "end": data["timestamp"].max() if not data.is_empty() else None,
        },
        "sample_data": data.head(10).to_dicts(),
    }
    return json.dumps(result, indent=2, default=str)
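
# Illustrative shape of the JSON returned by query_timeseries(); the sensor id,
# timestamps, and values are placeholders, and the exact timestamp format
# depends on the analyzer's schema:
#
#   {
#     "sensor_id": "sensor_001",
#     "data_points": 1440,
#     "time_range": {"start": "2024-01-01 00:00:00", "end": "2024-01-01 23:59:00"},
#     "sample_data": [{"timestamp": "2024-01-01 00:00:00", "value": 21.4}, "..."]
#   }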


def detect_anomalies(
    sensor_id: str,
    start_time: str,
    end_time: str,
    threshold: float = 2.0,
    algorithm: str = "zscore",
    contamination: float = 0.1,
) -> str:
    """Detect anomalies in the time series data for a specific sensor.

    Args:
        sensor_id: The identifier of the sensor to analyze
        start_time: The start time of the analysis period in ISO format
        end_time: The end time of the analysis period in ISO format
        threshold: Threshold for z-score based detection (default: 2.0)
        algorithm: Algorithm to use for detection ("zscore" or "isolation_forest")
        contamination: Expected proportion of anomalies for isolation forest (default: 0.1)

    Returns:
        str: JSON string containing anomaly detection results
    """
    logger.info(f"Detecting anomalies with the {algorithm} algorithm")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    if algorithm == "zscore":
        anomalies = analyzer.detect_anomalies(data, threshold)
    elif algorithm == "isolation_forest":
        anomalies = analyzer.detect_anomalies_isolation_forest(data, contamination)
    else:
        return json.dumps({"error": f"Unknown algorithm: {algorithm}"})
    return json.dumps(anomalies, indent=2)
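
# Usage sketch for detect_anomalies(); the sensor id and time range are
# placeholders:
#
#   detect_anomalies("sensor_001", "2024-01-01T00:00:00", "2024-01-02T00:00:00")
#   detect_anomalies(
#       "sensor_001",
#       "2024-01-01T00:00:00",
#       "2024-01-02T00:00:00",
#       algorithm="isolation_forest",
#       contamination=0.05,
#   )
#
# The z-score path flags points more than `threshold` standard deviations from
# the mean; the isolation-forest path treats roughly `contamination` of the
# points as anomalous.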


def analyze_trends(sensor_id: str, start_time: str, end_time: str) -> str:
    """Analyze trends in the time series data for a specific sensor.

    This function retrieves time series data for the specified sensor within
    the given time range and calculates trend information such as trend
    direction and percentage change.

    Args:
        sensor_id (str): The identifier of the sensor to analyze.
        start_time (str): The start time of the analysis period in ISO format.
        end_time (str): The end time of the analysis period in ISO format.

    Returns:
        str: A JSON string containing trend analysis results including trend
            direction, percentage change, and start/end values.

    Raises:
        No direct exceptions; returns a JSON error message if the database is
        not connected.
    """
    logger.info(f"Analyzing trends for {sensor_id}")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    trends = analyzer.calculate_trends(data)
    return json.dumps(trends, indent=2)
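
# Illustrative trends payload from analyze_trends(); the field names match the
# keys generate_analysis_report() reads below, the values are made up:
#
#   {
#     "trend_direction": "increasing",
#     "percentage_change": 12.5,
#     "start_value": 20.1,
#     "end_value": 22.6
#   }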


def generate_analysis_report(
    sensor_id: str,
    start_time: str,
    end_time: str,
    include_anomalies: bool = True,
    include_trends: bool = True,
    user_question: str | None = None,
    anomaly_algorithm: str = "zscore",
    anomaly_threshold: float = 2.0,
    anomaly_contamination: float = 0.1,
) -> str:
    """Generate a comprehensive analysis report for a specific sensor.

    Args:
        sensor_id: The identifier of the sensor to analyze
        start_time: The start time of the analysis period in ISO format
        end_time: The end time of the analysis period in ISO format
        include_anomalies: Whether to include anomaly detection in the report
        include_trends: Whether to include trend analysis in the report
        user_question: A specific question from the user to be included in the report
        anomaly_algorithm: Algorithm to use for anomaly detection ("zscore" or "isolation_forest")
        anomaly_threshold: Threshold for z-score based detection
        anomaly_contamination: Expected proportion of anomalies for isolation forest

    Returns:
        str: A formatted string containing the comprehensive analysis report
    """
    logger.info(f"Generating analysis report for {sensor_id}")
    if not ensure_analyzer_connected():
        return "Error: Database not connected. Please connect first."
    data = analyzer.query_metrics(sensor_id, start_time, end_time)

    report_sections = [
        f"## Analysis Report for {sensor_id}",
        f"**Time Period:** {start_time} to {end_time}",
        f"**Data Points:** {len(data)}",
    ]
    if user_question:
        report_sections.append(f"**User Question:** {user_question}")

    if include_trends:
        trends = analyzer.calculate_trends(data)
        report_sections.append("\n### Trend Analysis")
        report_sections.append(f"- **Direction:** {trends['trend_direction'].title()}")
        report_sections.append(f"- **Change:** {trends['percentage_change']:.2f}%")
        report_sections.append(f"- **Start Value:** {trends['start_value']:.2f}")
        report_sections.append(f"- **End Value:** {trends['end_value']:.2f}")

    if include_anomalies:
        if anomaly_algorithm == "zscore":
            anomalies = analyzer.detect_anomalies(data, anomaly_threshold)
        else:
            anomalies = analyzer.detect_anomalies_isolation_forest(
                data, anomaly_contamination
            )
        report_sections.append("\n### Anomaly Detection")
        report_sections.append(f"- **Algorithm:** {anomaly_algorithm}")
        report_sections.append(f"- **Anomalies Found:** {anomalies['anomalies_found']}")
        if anomalies["anomalies_found"] > 0:
            report_sections.append("- **Notable Anomalies:**")
            for anomaly in anomalies["anomalies"][:5]:
                report_sections.append(
                    f"  - {anomaly['timestamp']}: {anomaly['value']:.2f} "
                    f"(severity: {anomaly['severity']})"
                )
        stats = anomalies["statistics"]
        report_sections.append("\n### Statistical Summary")
        report_sections.append(f"- **Mean:** {stats['mean']:.2f}")
        report_sections.append(f"- **Std Dev:** {stats['std']:.2f}")
        report_sections.append(f"- **Min:** {stats['min']:.2f}")
        report_sections.append(f"- **Max:** {stats['max']:.2f}")

    return "\n".join(report_sections)
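

if __name__ == "__main__":
    # Minimal smoke test, assuming a reachable TimescaleDB instance; the sensor
    # id and time range are placeholders that must exist in your database.
    print(connect_database())
    print(list_available_metrics())
    print(
        generate_analysis_report(
            sensor_id="sensor_001",
            start_time="2024-01-01T00:00:00",
            end_time="2024-01-02T00:00:00",
        )
    )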