import json
import os
import datetime
import threading
from collections import Counter, defaultdict
from typing import List, Dict, Any, Optional

from fastapi import Request


class UsageTracker:
    def __init__(self, data_file: str = "usage_data.json"):
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """Loads usage data from the JSON file, ensuring data integrity."""
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    if 'requests' in data:
                        return data
                    # If the data is in the old format, try to convert it
                    if isinstance(data.get('total_requests'), int):
                        return self._convert_old_format(data)
            except (json.JSONDecodeError, TypeError):
                print(f"Warning: Could not decode JSON from {self.data_file}. Starting fresh.")
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, List]:
        """Initializes a new data structure for usage tracking."""
        return {'requests': []}

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, List]:
        """Converts data from the old format to the new detailed format."""
        print("Converting old usage data format to new format.")
        new_data = self._initialize_empty_data()
        # This is a simplification; a more robust conversion would be needed for
        # full data recovery. For now, we start fresh with the new structure to
        # avoid complexity.
        return new_data

    def save_data(self):
        """Saves the current usage data to the JSON file."""
        with self.lock:
            try:
                with open(self.data_file, 'w') as f:
                    json.dump(self.data, f, indent=4)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Saves the data and schedules the next save in 60 seconds."""
        # Use a daemon timer so the background save loop does not keep the
        # interpreter alive on shutdown.
        timer = threading.Timer(60.0, self._schedule_save)
        timer.daemon = True
        timer.start()
        self.save_data()

    def record_request(self, request: Optional[Request], model: str = "unknown", endpoint: str = "unknown"):
        """Records a single API request with detailed information."""
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = "N/A"
            user_agent = "N/A"
            if request:
                # request.client can be None (e.g. with some test clients)
                if request.client:
                    ip_address = request.client.host
                user_agent = request.headers.get("user-agent", "N/A")
            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """Generates a comprehensive summary of usage data."""
        with self.lock:
            summary = {
                'total_requests': 0,
                'model_usage': defaultdict(int),
                'endpoint_usage': defaultdict(int),
                'daily_usage': defaultdict(lambda: defaultdict(int)),
                'unique_ips': set(),
                'recent_requests': []
            }
            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
            # Iterate backwards so the most recent requests are seen first
            for req in reversed(self.data['requests']):
                req_time = datetime.datetime.fromisoformat(req['timestamp'])
                # Totals and unique IPs are counted over all time
                summary['total_requests'] += 1
                summary['unique_ips'].add(req['ip_address'])
                if req_time >= cutoff_date:
                    date_str = req_time.strftime("%Y-%m-%d")
                    # Aggregate data for charts and tables
                    summary['model_usage'][req['model']] += 1
                    summary['endpoint_usage'][req['endpoint']] += 1
                    summary['daily_usage'][date_str]['requests'] += 1
                    # Keep only the 20 most recent requests
                    if len(summary['recent_requests']) < 20:
                        summary['recent_requests'].append(req)
            # Convert to plain dicts (and sort daily usage by date) for clean JSON output
            summary['model_usage'] = dict(summary['model_usage'])
            summary['endpoint_usage'] = dict(summary['endpoint_usage'])
            summary['daily_usage'] = {date: dict(counts) for date, counts in sorted(summary['daily_usage'].items())}
            summary['unique_ip_count'] = len(summary['unique_ips'])
            del summary['unique_ips']  # No need to send the whole set
            return summary
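

# --- Illustrative usage sketch (not part of the original module) ---
# A minimal demo, assuming the module is run directly: it records a couple of
# requests and prints a summary. Passing request=None exercises the "N/A"
# fallback; in a real FastAPI app the Request object from a route handler would
# be passed instead. The model name and endpoint paths below are hypothetical.
if __name__ == "__main__":
    tracker = UsageTracker(data_file="usage_data.json")
    tracker.record_request(None, model="example-model", endpoint="/v1/chat/completions")
    tracker.record_request(None, model="example-model", endpoint="/v1/embeddings")
    print(json.dumps(tracker.get_usage_summary(days=7), indent=2))
    tracker.save_data()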