# Usage tracking module: records API requests and periodically persists them to a JSON file.
import json | |
import os | |
import datetime | |
import threading | |
from collections import Counter, defaultdict | |
from typing import List, Dict, Any, Optional | |
from fastapi import Request | |
class UsageTracker:
    """Thread-safe tracker that records API requests and persists them to JSON.

    Requests are appended in memory under a lock and flushed to ``data_file``
    once at construction and then every 60 seconds by a background timer.
    """

    def __init__(self, data_file: str = "usage_data.json"):
        """Load existing usage data (if any) and start the periodic save loop.

        Args:
            data_file: Path of the JSON file used for persistence.
        """
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """Loads usage data from the JSON file, ensuring data integrity.

        Returns:
            A dict containing a ``requests`` list. Falls back to an empty
            structure when the file is missing, unreadable, or has an
            unrecognized shape.
        """
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                # FIX: guard the whole dict-shaped path. The original called
                # data.get(...) on arbitrary JSON, so a top-level list raised
                # an uncaught AttributeError.
                if isinstance(data, dict):
                    if 'requests' in data:
                        return data
                    # If data is old format, try to convert it
                    if isinstance(data.get('total_requests'), int):
                        return self._convert_old_format(data)
            # FIX: also catch IOError — open()/read can fail even after the
            # exists() check (race, permissions).
            except (json.JSONDecodeError, TypeError, IOError):
                print(f"Warning: Could not decode JSON from {self.data_file}. Starting fresh.")
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, List]:
        """Initializes a new data structure for usage tracking."""
        return {'requests': []}

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, List]:
        """Converts data from the old format to the new detailed format.

        NOTE: per-request detail cannot be reconstructed from aggregate
        counts, so this intentionally starts fresh with the new structure.
        """
        print("Converting old usage data format to new format.")
        return self._initialize_empty_data()

    def save_data(self):
        """Saves current usage data to the JSON file (thread-safe)."""
        with self.lock:
            try:
                with open(self.data_file, 'w') as f:
                    json.dump(self.data, f, indent=4)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Saves now, then re-schedules another save in 60 seconds.

        FIX: the timer thread is marked as a daemon. The original started
        non-daemon timers, so a pending 60s timer kept the interpreter alive
        and the process could never exit cleanly.
        """
        timer = threading.Timer(60.0, self._schedule_save)
        timer.daemon = True
        timer.start()
        self.save_data()

    def record_request(self, request: "Optional[Request]", model: str = "unknown", endpoint: str = "unknown"):
        """Records a single API request with detailed information.

        Args:
            request: Incoming FastAPI request, or None when unavailable.
            model: Model name associated with the call.
            endpoint: Endpoint path that was hit.
        """
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = "N/A"
            user_agent = "N/A"
            if request:
                # FIX: request.client can be None (e.g. some test clients /
                # proxy setups) — fall back to "N/A" instead of raising.
                if request.client is not None:
                    ip_address = request.client.host
                user_agent = request.headers.get("user-agent", "N/A")
            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """Generates a comprehensive summary of usage data.

        Args:
            days: Window (in days) for the per-model/endpoint/daily breakdowns
                and the recent-requests sample. total_requests and the
                unique-IP count cover all recorded history.

        Returns:
            A JSON-serializable dict with total_requests, model_usage,
            endpoint_usage, daily_usage (sorted by date), unique_ip_count,
            and up to 20 of the most recent requests inside the window.
        """
        with self.lock:
            summary: Dict[str, Any] = {
                'total_requests': 0,
                'model_usage': defaultdict(int),
                'endpoint_usage': defaultdict(int),
                'daily_usage': defaultdict(lambda: defaultdict(int)),
                'unique_ips': set(),
                'recent_requests': []
            }
            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
            # Iterate newest-first so recent_requests fills with the latest entries.
            for req in reversed(self.data['requests']):
                req_time = datetime.datetime.fromisoformat(req['timestamp'])
                # All-time aggregates (not limited by the cutoff).
                summary['total_requests'] += 1
                summary['unique_ips'].add(req['ip_address'])
                if req_time >= cutoff_date:
                    date_str = req_time.strftime("%Y-%m-%d")
                    # Aggregate data for charts and tables
                    summary['model_usage'][req['model']] += 1
                    summary['endpoint_usage'][req['endpoint']] += 1
                    summary['daily_usage'][date_str]['requests'] += 1
                    # Add to recent requests list
                    if len(summary['recent_requests']) < 20:
                        summary['recent_requests'].append(req)
            # FIX: convert defaultdicts to plain dicts so the summary is a
            # predictable, JSON-serializable payload and lookups on missing
            # keys don't silently auto-create entries.
            summary['model_usage'] = dict(summary['model_usage'])
            summary['endpoint_usage'] = dict(summary['endpoint_usage'])
            # Sort daily usage by date
            summary['daily_usage'] = {d: dict(c) for d, c in sorted(summary['daily_usage'].items())}
            summary['unique_ip_count'] = len(summary['unique_ips'])
            del summary['unique_ips']  # Sets aren't JSON-serializable; only the count is needed.
            return summary