import json
import os
import datetime
import threading
from collections import defaultdict
from typing import List, Dict, Any, Optional
from fastapi import Request


class UsageTracker:
    """Thread-safe, JSON-backed tracker of per-model and per-endpoint API usage."""

    def __init__(self, data_file="usage_data.json"):
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """
        Loads usage data from the JSON file, ensuring data integrity.
        Handles cases where the file might be corrupted or in an old format.
        """
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                # Check whether the data is already in the expected new format.
                if isinstance(data, dict) and 'requests' in data and 'models' in data and 'api_endpoints' in data:
                    # json.load returns plain dicts; restore the defaultdict
                    # behavior so lookups for new models/endpoints do not raise.
                    fresh = self._initialize_empty_data()
                    fresh['requests'] = data['requests']
                    fresh['models'].update(data['models'])
                    fresh['api_endpoints'].update(data['api_endpoints'])
                    return fresh
                # If the data is in the older, simpler format, convert it.
                elif isinstance(data, dict) and 'total_requests' in data:  # Heuristic for the old format
                    return self._convert_old_format(data)
            except (json.JSONDecodeError, TypeError) as e:
                print(f"Warning: Could not decode JSON from {self.data_file} ({e}). Starting fresh.")
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, Any]:
        """
        Initializes a new, empty data structure for usage tracking.
        This structure includes a list of all requests, plus dictionaries
        that store aggregated data for models and API endpoints.
        """
        return {
            'requests': [],
            'models': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}),
            'api_endpoints': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None})
        }
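
    # For reference, the structure serialized by save_data() looks roughly like
    # this (an illustrative sketch; model and endpoint names are placeholders):
    #
    #   {
    #       "requests": [{"timestamp": "...", "model": "...", "endpoint": "...",
    #                     "ip_address": "...", "user_agent": "..."}],
    #       "models": {"some-model": {"total_requests": 3,
    #                                 "first_used": "2024-01-01T00:00:00+00:00",
    #                                 "last_used": "2024-01-02T00:00:00+00:00"}},
    #       "api_endpoints": {"/some/endpoint": {"total_requests": 3, ...}}
    #   }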

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Converts data from the old format to the new detailed format.
        This is a crucial step to avoid data loss on updates.
        It iterates through old 'requests' (if any) and re-records them
        into the new structured format.
        """
        print("Converting old usage data format to new format.")
        new_data = self._initialize_empty_data()
        # Preserve existing requests if they follow a basic structure
        if 'requests' in old_data and isinstance(old_data['requests'], list):
            for req in old_data['requests']:
                # Attempt to extract the relevant fields from the old request entry
                timestamp_str = req.get('timestamp')
                model_name = req.get('model', 'unknown_model')
                endpoint_name = req.get('endpoint', 'unknown_endpoint')
                ip_address = req.get('ip_address', 'N/A')
                user_agent = req.get('user_agent', 'N/A')
                # Ensure the timestamp is valid and parseable
                try:
                    timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp_str else datetime.datetime.now(datetime.timezone.utc)
                except ValueError:
                    timestamp = datetime.datetime.now(datetime.timezone.utc)  # Fallback if the timestamp is malformed
                # Old entries may carry naive timestamps; coerce them to UTC so
                # comparisons against timezone-aware datetimes do not raise.
                if timestamp.tzinfo is None:
                    timestamp = timestamp.replace(tzinfo=datetime.timezone.utc)
                new_data['requests'].append({
                    'timestamp': timestamp.isoformat(),
                    'model': model_name,
                    'endpoint': endpoint_name,
                    'ip_address': ip_address,
                    'user_agent': user_agent,
                })
                # Update the aggregated stats for models and endpoints so that
                # even old data contributes to the new summary.
                model_stats = new_data['models'][model_name]
                if not model_stats['first_used'] or timestamp < datetime.datetime.fromisoformat(model_stats['first_used']):
                    model_stats['first_used'] = timestamp.isoformat()
                if not model_stats['last_used'] or timestamp > datetime.datetime.fromisoformat(model_stats['last_used']):
                    model_stats['last_used'] = timestamp.isoformat()
                model_stats['total_requests'] += 1
                endpoint_stats = new_data['api_endpoints'][endpoint_name]
                if not endpoint_stats['first_used'] or timestamp < datetime.datetime.fromisoformat(endpoint_stats['first_used']):
                    endpoint_stats['first_used'] = timestamp.isoformat()
                if not endpoint_stats['last_used'] or timestamp > datetime.datetime.fromisoformat(endpoint_stats['last_used']):
                    endpoint_stats['last_used'] = timestamp.isoformat()
                endpoint_stats['total_requests'] += 1
        print("Data conversion complete.")
        return new_data
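
    # For context, the old format targeted by the converter above is assumed to
    # look roughly like this (inferred from the 'total_requests' heuristic in
    # _load_data and the fields read in the conversion loop; not a verbatim spec):
    #
    #   {"total_requests": 42,
    #    "requests": [{"timestamp": "...", "model": "...", "endpoint": "..."}]}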

    def save_data(self):
        """Saves the current usage data to the JSON file. Called periodically
        by the timer started in _schedule_save; safe to call directly as well."""
        with self.lock:
            try:
                # Convert defaultdicts to regular dicts for JSON serialization
                serializable_data = {
                    'requests': self.data['requests'],
                    'models': dict(self.data['models']),
                    'api_endpoints': dict(self.data['api_endpoints'])
                }
                with open(self.data_file, 'w') as f:
                    json.dump(serializable_data, f, indent=4)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Saves the data now, then schedules another save in 60 seconds."""
        # Timer threads are non-daemon by default, which would keep the process
        # alive after the main thread exits. Mark the timer as a daemon so it
        # cannot block shutdown; a shutdown hook (sketched below) can flush the
        # final state.
        timer = threading.Timer(60.0, self._schedule_save)
        timer.daemon = True
        timer.start()
        self.save_data()
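
    # Note: with a daemon timer, updates made in the final seconds before the
    # process exits may never reach disk. A minimal mitigation (a sketch, not
    # part of this class's contract) is to register the save as an exit hook
    # right after constructing the tracker:
    #
    #   import atexit
    #   tracker = UsageTracker()
    #   atexit.register(tracker.save_data)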

    def record_request(self, request: Optional[Request] = None, model: str = "unknown", endpoint: str = "unknown"):
        """
        Records a single API request with detailed information.
        Updates both the raw request list and the aggregated statistics.
        """
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = request.client.host if request and request.client else "N/A"
            user_agent = request.headers.get("user-agent", "N/A") if request else "N/A"
            # Append to the raw requests list
            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })
            # Update model-specific stats
            model_stats = self.data['models'][model]
            model_stats['total_requests'] += 1
            if model_stats['first_used'] is None or now < datetime.datetime.fromisoformat(model_stats['first_used']):
                model_stats['first_used'] = now.isoformat()
            if model_stats['last_used'] is None or now > datetime.datetime.fromisoformat(model_stats['last_used']):
                model_stats['last_used'] = now.isoformat()
            # Update endpoint-specific stats
            endpoint_stats = self.data['api_endpoints'][endpoint]
            endpoint_stats['total_requests'] += 1
            if endpoint_stats['first_used'] is None or now < datetime.datetime.fromisoformat(endpoint_stats['first_used']):
                endpoint_stats['first_used'] = now.isoformat()
            if endpoint_stats['last_used'] is None or now > datetime.datetime.fromisoformat(endpoint_stats['last_used']):
                endpoint_stats['last_used'] = now.isoformat()

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """
        Generates a comprehensive summary of usage data.
        'total_requests' and 'unique_ips_total_count' cover all recorded
        requests; the model, endpoint, and daily breakdowns cover only the
        last `days` days. Also includes up to 20 of the most recent requests
        within the period.
        """
        with self.lock:
            model_usage = defaultdict(int)     # Requests per model for the period
            endpoint_usage = defaultdict(int)  # Requests per endpoint for the period
            daily_usage = defaultdict(lambda: {'requests': 0, 'unique_ips': set()})
            unique_ips_total = set()           # Unique IPs across all requests
            recent_requests: List[Dict[str, Any]] = []
            total_requests = 0
            # All-time aggregates for the model and endpoint charts; the
            # per-period breakdowns below are restricted to `days`.
            all_time_model_data = {
                model: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for model, stats in self.data['models'].items()
            }
            all_time_endpoint_data = {
                endpoint: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for endpoint, stats in self.data['api_endpoints'].items()
            }
            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
            # Iterate backwards so the first matches are the most recent requests.
            for req in reversed(self.data['requests']):
                req_time = datetime.datetime.fromisoformat(req['timestamp'])
                if req_time.tzinfo is None:  # Legacy entries may be naive; assume UTC
                    req_time = req_time.replace(tzinfo=datetime.timezone.utc)
                # Total requests and unique IPs are tracked across all time
                total_requests += 1
                unique_ips_total.add(req['ip_address'])
                if req_time >= cutoff_date:
                    date_str = req_time.strftime("%Y-%m-%d")
                    # Aggregate data for the charts and tables for the given period
                    model_usage[req['model']] += 1
                    endpoint_usage[req['endpoint']] += 1
                    daily_usage[date_str]['requests'] += 1
                    daily_usage[date_str]['unique_ips'].add(req['ip_address'])
                    # Keep up to 20 of the most recent requests
                    if len(recent_requests) < 20:
                        recent_requests.append(req)
            # Replace each day's unique-IP set with its count (sets are not JSON-serializable)
            for daily_stats in daily_usage.values():
                daily_stats['unique_ips_count'] = len(daily_stats['unique_ips'])
                del daily_stats['unique_ips']
            return {
                'total_requests': total_requests,
                'recent_requests': recent_requests,
                # Per-period breakdowns, converted to plain dicts (daily sorted by date)
                'model_usage_period': dict(model_usage),
                'endpoint_usage_period': dict(endpoint_usage),
                'daily_usage_period': dict(sorted(daily_usage.items())),
                # All-time aggregates
                'all_time_model_usage': all_time_model_data,
                'all_time_endpoint_usage': all_time_endpoint_data,
                'unique_ips_total_count': len(unique_ips_total),
            }
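

if __name__ == "__main__":
    # A minimal smoke test (a sketch; the file name, model, and endpoint below
    # are illustrative placeholders). In a real FastAPI app you would instead
    # call tracker.record_request(request, model=..., endpoint=...) from your
    # route handlers and register tracker.save_data with atexit for a final flush.
    tracker = UsageTracker(data_file="usage_data_demo.json")
    tracker.record_request(model="demo-model", endpoint="/v1/demo")  # request=None -> "N/A" fields
    summary = tracker.get_usage_summary(days=7)
    print(json.dumps(summary, indent=2))
    tracker.save_data()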