import json import os import datetime import threading from collections import defaultdict from typing import List, Dict, Any, Optional from fastapi import Request class UsageTracker: def __init__(self, data_file="usage_data.json"): self.data_file = data_file self.lock = threading.Lock() self.data = self._load_data() self._schedule_save() def _load_data(self) -> Dict[str, Any]: """ Loads usage data from the JSON file, ensuring data integrity. Handles cases where the file might be corrupted or in an old format. """ if os.path.exists(self.data_file): try: with open(self.data_file, 'r') as f: data = json.load(f) # Check if data is in the expected new format if isinstance(data, dict) and 'requests' in data and 'models' in data and 'api_endpoints' in data: return data # If data is in an older, simpler format, convert it elif isinstance(data, dict) and 'total_requests' in data: # Heuristic for old format return self._convert_old_format(data) except (json.JSONDecodeError, TypeError) as e: print(f"Warning: Could not decode JSON from {self.data_file} ({e}). Starting fresh.") return self._initialize_empty_data() def _initialize_empty_data(self) -> Dict[str, Any]: """ Initializes a new, empty data structure for usage tracking. This structure includes a list for all requests, and dictionaries to store aggregated data for models and API endpoints. """ return { 'requests': [], 'models': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}), 'api_endpoints': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}) } def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, Any]: """ Converts data from the old format to the new detailed format. This is a crucial step to avoid data loss on updates. It iterates through old 'requests' (if any) and re-records them into the new structured format. """ print("Converting old usage data format to new format.") new_data = self._initialize_empty_data() # Preserve existing requests if they follow a basic structure if 'requests' in old_data and isinstance(old_data['requests'], list): for req in old_data['requests']: # Attempt to extract relevant fields from old request entry timestamp_str = req.get('timestamp') model_name = req.get('model', 'unknown_model') endpoint_name = req.get('endpoint', 'unknown_endpoint') ip_address = req.get('ip_address', 'N/A') user_agent = req.get('user_agent', 'N/A') # Ensure timestamp is valid and parseable try: timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp_str else datetime.datetime.now(datetime.timezone.utc) except ValueError: timestamp = datetime.datetime.now(datetime.timezone.utc) # Fallback if timestamp is malformed new_data['requests'].append({ 'timestamp': timestamp.isoformat(), 'model': model_name, 'endpoint': endpoint_name, 'ip_address': ip_address, 'user_agent': user_agent, }) # Update aggregated stats for models and endpoints # This ensures that even old data contributes to the new summary if not new_data['models'][model_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['models'][model_name]['first_used']): new_data['models'][model_name]['first_used'] = timestamp.isoformat() if not new_data['models'][model_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['models'][model_name]['last_used']): new_data['models'][model_name]['last_used'] = timestamp.isoformat() new_data['models'][model_name]['total_requests'] += 1 if not new_data['api_endpoints'][endpoint_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['first_used']): new_data['api_endpoints'][endpoint_name]['first_used'] = timestamp.isoformat() if not new_data['api_endpoints'][endpoint_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['last_used']): new_data['api_endpoints'][endpoint_name]['last_used'] = timestamp.isoformat() new_data['api_endpoints'][endpoint_name]['total_requests'] += 1 print("Data conversion complete.") return new_data def save_data(self): """Saves current usage data to the JSON file periodically.""" with self.lock: try: # Convert defaultdicts to regular dicts for JSON serialization serializable_data = { 'requests': self.data['requests'], 'models': dict(self.data['models']), 'api_endpoints': dict(self.data['api_endpoints']) } with open(self.data_file, 'w') as f: json.dump(serializable_data, f, indent=4) except IOError as e: print(f"Error saving usage data to {self.data_file}: {e}") def _schedule_save(self): """Schedules the data to be saved every 60 seconds.""" # Use a non-daemon thread for saving to ensure it runs even if main thread exits # if using daemon threads, ensure proper shutdown hook is in place. # For simplicity in this context, a direct Timer call is fine. threading.Timer(60.0, self._schedule_save).start() self.save_data() def record_request(self, request: Optional[Request] = None, model: str = "unknown", endpoint: str = "unknown"): """ Records a single API request with detailed information. Updates both the raw request list and aggregated statistics. """ with self.lock: now = datetime.datetime.now(datetime.timezone.utc) ip_address = request.client.host if request and request.client else "N/A" user_agent = request.headers.get("user-agent", "N/A") if request else "N/A" # Append to raw requests list self.data['requests'].append({ 'timestamp': now.isoformat(), 'model': model, 'endpoint': endpoint, 'ip_address': ip_address, 'user_agent': user_agent, }) # Update model specific stats model_stats = self.data['models'][model] model_stats['total_requests'] += 1 if model_stats['first_used'] is None or now < datetime.datetime.fromisoformat(model_stats['first_used']): model_stats['first_used'] = now.isoformat() if model_stats['last_used'] is None or now > datetime.datetime.fromisoformat(model_stats['last_used']): model_stats['last_used'] = now.isoformat() # Update endpoint specific stats endpoint_stats = self.data['api_endpoints'][endpoint] endpoint_stats['total_requests'] += 1 if endpoint_stats['first_used'] is None or now < datetime.datetime.fromisoformat(endpoint_stats['first_used']): endpoint_stats['first_used'] = now.isoformat() if endpoint_stats['last_used'] is None or now > datetime.datetime.fromisoformat(endpoint_stats['last_used']): endpoint_stats['last_used'] = now.isoformat() def get_usage_summary(self, days: int = 7) -> Dict[str, Any]: """ Generates a comprehensive summary of usage data for the specified number of days. Includes total requests, model usage, endpoint usage, daily usage, and unique IPs. """ with self.lock: summary = { 'total_requests': 0, 'model_usage': defaultdict(int), # Requests per model for the period 'endpoint_usage': defaultdict(int), # Requests per endpoint for the period 'daily_usage': defaultdict(lambda: {'requests': 0, 'unique_ips': set()}), # Daily stats 'unique_ips_total': set(), # Unique IPs across all requests 'recent_requests': [] } # Prepare data for model and API endpoint charts # These are based on the aggregated 'self.data' which covers all time, # but the summary 'model_usage' and 'endpoint_usage' below are for the given 'days' period. all_time_model_data = { model: { 'total_requests': stats['total_requests'], 'first_used': stats['first_used'], 'last_used': stats['last_used'] } for model, stats in self.data['models'].items() } all_time_endpoint_data = { endpoint: { 'total_requests': stats['total_requests'], 'first_used': stats['first_used'], 'last_used': stats['last_used'] } for endpoint, stats in self.data['api_endpoints'].items() } cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days) # Iterate backwards for recent requests and aggregate data for the specified period requests_for_period = [] for req in reversed(self.data['requests']): req_time = datetime.datetime.fromisoformat(req['timestamp']) # Always update total requests and unique IPs for all time summary['total_requests'] += 1 summary['unique_ips_total'].add(req['ip_address']) if req_time >= cutoff_date: requests_for_period.append(req) date_str = req_time.strftime("%Y-%m-%d") # Aggregate data for charts and tables for the given period summary['model_usage'][req['model']] += 1 summary['endpoint_usage'][req['endpoint']] += 1 summary['daily_usage'][date_str]['requests'] += 1 summary['daily_usage'][date_str]['unique_ips'].add(req['ip_address']) # Add to recent requests list (up to 20) if len(summary['recent_requests']) < 20: summary['recent_requests'].append(req) # Convert daily unique IPs set to count for date_str, daily_stats in summary['daily_usage'].items(): daily_stats['unique_ips_count'] = len(daily_stats['unique_ips']) del daily_stats['unique_ips'] # Remove the set before returning # Sort daily usage by date summary['daily_usage'] = dict(sorted(summary['daily_usage'].items())) # Convert defaultdicts to regular dicts for final summary summary['model_usage_period'] = dict(summary['model_usage']) summary['endpoint_usage_period'] = dict(summary['endpoint_usage']) summary['daily_usage_period'] = dict(summary['daily_usage']) # Add all-time data summary['all_time_model_usage'] = all_time_model_data summary['all_time_endpoint_usage'] = all_time_endpoint_data summary['unique_ips_total_count'] = len(summary['unique_ips_total']) del summary['unique_ips_total'] # No need to send the whole set # Clean up defaultdicts that are not needed in the final output structure del summary['model_usage'] del summary['endpoint_usage'] del summary['daily_usage'] return summary