"""Functions meant to be exposed."""

import json
import threading

from loguru import logger

from app.analyzer import TimeSeriesAnalyzer

# Lazily-created singleton analyzer. None until connect_database() has run
# successfully; creation is guarded by analyzer_lock. (The original left this
# as an annotation-only name and probed it with a NameError catch.)
analyzer: TimeSeriesAnalyzer | None = None
analyzer_lock = threading.Lock()


def ensure_analyzer_connected() -> bool:
    """Ensure the module-level analyzer exists, connecting on first use.

    Returns:
        bool: True if the analyzer is connected, False if the connection
        attempt failed. (The original always returned True and let a
        connection failure raise, which made every caller's
        ``if not ensure_analyzer_connected()`` branch unreachable.)
    """
    if analyzer is None:
        try:
            connect_database()
        except Exception:
            # Report failure to callers so the tool functions can return a
            # structured JSON error instead of propagating an exception.
            logger.exception("Failed to connect to database")
            return False
    return True


def connect_database() -> str:
    """Connect to the TimescaleDB and initialize the analyzer.

    Returns:
        str: A human-readable success message.
    """
    global analyzer
    with analyzer_lock:
        analyzer = TimeSeriesAnalyzer()
        analyzer.connect()
    return "Successfully connected to database"


def list_available_metrics() -> str:
    """List all available metrics from the connected database.

    Returns:
        str: JSON string with an "available_metrics" list, or an "error"
        key if the database is not connected.
    """
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    metrics = analyzer.get_available_metrics()
    return json.dumps({"available_metrics": metrics}, indent=2)


def query_timeseries(sensor_id: str, start_time: str, end_time: str) -> str:
    """Query time series data for a specific sensor within a time range.

    Args:
        sensor_id: The identifier of the sensor to query.
        start_time: Start of the time range in ISO format.
        end_time: End of the time range in ISO format.

    Returns:
        str: JSON string with the point count, observed time range, and up
        to 10 sample rows, or an "error" key if the database is not
        connected.
    """
    logger.info("query timeseries")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    # Evaluate emptiness once. BUG FIX: the original wrote
    # `if not data.is_empty` (no parentheses) in the "end" branch; a bound
    # method is always truthy, so "end" was always reported as None.
    empty = data.is_empty()
    result = {
        "sensor_id": sensor_id,
        "data_points": len(data),
        "time_range": {
            "start": None if empty else data["timestamp"].min(),
            "end": None if empty else data["timestamp"].max(),
        },
        "sample_data": data.head(10).to_dicts(),
    }
    # default=str: timestamps in the sample rows are not JSON-serializable.
    return json.dumps(result, indent=2, default=str)


def detect_anomalies(
    sensor_id: str,
    start_time: str,
    end_time: str,
    threshold: float = 2.0,
    algorithm: str = "zscore",
    contamination: float = 0.1,
) -> str:
    """Detect anomalies in the time series data for a specific sensor.

    Args:
        sensor_id: The identifier of the sensor to analyze
        start_time: The start time of the analysis period in ISO format
        end_time: The end time of the analysis period in ISO format
        threshold: Threshold for z-score based detection (default: 2.0)
        algorithm: Algorithm to use for detection ("zscore" or
            "isolation_forest")
        contamination: Expected proportion of anomalies for isolation
            forest (default: 0.1)

    Returns:
        str: JSON string containing anomaly detection results
    """
    logger.info(f"detect anomalies using {algorithm}")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    if algorithm == "zscore":
        anomalies = analyzer.detect_anomalies(data, threshold)
    elif algorithm == "isolation_forest":
        anomalies = analyzer.detect_anomalies_isolation_forest(
            data, contamination
        )
    else:
        return json.dumps({"error": f"Unknown algorithm: {algorithm}"})
    return json.dumps(anomalies, indent=2)


def analyze_trends(sensor_id: str, start_time: str, end_time: str) -> str:
    """Analyze trends in the time series data for a specific sensor.

    Retrieves time series data for the specified sensor within the given
    time range and calculates trend information such as trend direction
    and percentage change.

    Args:
        sensor_id: The identifier of the sensor to analyze.
        start_time: The start time of the analysis period in ISO format.
        end_time: The end time of the analysis period in ISO format.

    Returns:
        str: A JSON string containing trend analysis results including
        trend direction, percentage change, and start/end values, or a
        JSON error message if the database is not connected.
    """
    logger.info("analyze trends")
    if not ensure_analyzer_connected():
        return json.dumps({"error": "Database not connected"})
    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    trends = analyzer.calculate_trends(data)
    return json.dumps(trends, indent=2)


def _trend_section(trends: dict) -> list[str]:
    """Format the trend-analysis section lines of the report."""
    return [
        "\n### Trend Analysis",
        f"- **Direction:** {trends['trend_direction'].title()}",
        f"- **Change:** {trends['percentage_change']:.2f}%",
        f"- **Start Value:** {trends['start_value']:.2f}",
        f"- **End Value:** {trends['end_value']:.2f}",
    ]


def _anomaly_section(anomalies: dict, algorithm: str) -> list[str]:
    """Format the anomaly-detection and statistical-summary section lines."""
    lines = [
        "\n### Anomaly Detection",
        f"- **Algorithm:** {algorithm}",
        f"- **Anomalies Found:** {anomalies['anomalies_found']}",
    ]
    if anomalies["anomalies_found"] > 0:
        lines.append("- **Notable Anomalies:**")
        # Cap at the five most notable entries to keep the report short.
        for anomaly in anomalies["anomalies"][:5]:
            lines.append(
                f"  - {anomaly['timestamp']}: {anomaly['value']:.2f} "
                f"(severity: {anomaly['severity']})"
            )
    stats = anomalies["statistics"]
    lines.extend(
        [
            "\n### Statistical Summary",
            f"- **Mean:** {stats['mean']:.2f}",
            f"- **Std Dev:** {stats['std']:.2f}",
            f"- **Min:** {stats['min']:.2f}",
            f"- **Max:** {stats['max']:.2f}",
        ]
    )
    return lines


def generate_analysis_report(
    sensor_id: str,
    start_time: str,
    end_time: str,
    include_anomalies: bool = True,
    include_trends: bool = True,
    user_question: str | None = None,
    anomaly_algorithm: str = "zscore",
    anomaly_threshold: float = 2.0,
    anomaly_contamination: float = 0.1,
) -> str:
    """Generate a comprehensive analysis report for a specific sensor.

    Args:
        sensor_id: The identifier of the sensor to analyze
        start_time: The start time of the analysis period in ISO format
        end_time: The end time of the analysis period in ISO format
        include_anomalies: Whether to include anomaly detection in the
            report
        include_trends: Whether to include trend analysis in the report
        user_question: A specific question from the user to be included
            in the report
        anomaly_algorithm: Algorithm to use for anomaly detection
            ("zscore" or "isolation_forest")
        anomaly_threshold: Threshold for z-score based detection
        anomaly_contamination: Expected proportion of anomalies for
            isolation forest
    Returns:
        str: A formatted string containing the comprehensive analysis
        report
    """
    logger.info("generate report")
    if not ensure_analyzer_connected():
        return "Error: Database not connected. Please connect first."

    data = analyzer.query_metrics(sensor_id, start_time, end_time)
    sections = [
        f"## Analysis Report for {sensor_id}",
        f"**Time Period:** {start_time} to {end_time}",
        f"**Data Points:** {len(data)}",
    ]
    if user_question:
        sections.append(f"**User Question:** {user_question}")
    if include_trends:
        sections.extend(_trend_section(analyzer.calculate_trends(data)))
    if include_anomalies:
        # Mirror detect_anomalies(): anything other than "zscore" falls
        # through to isolation forest (original behavior preserved).
        if anomaly_algorithm == "zscore":
            anomalies = analyzer.detect_anomalies(data, anomaly_threshold)
        else:
            anomalies = analyzer.detect_anomalies_isolation_forest(
                data, anomaly_contamination
            )
        sections.extend(_anomaly_section(anomalies, anomaly_algorithm))
    return "\n".join(sections)