Spaces:
Running
Running
File size: 4,875 Bytes
67a227e 086625b dc58aee 67a227e 086625b dc58aee 67a227e dc58aee 086625b dc58aee 086625b dc58aee 4420abf 086625b dc58aee 086625b dc58aee 086625b dc58aee 086625b dc58aee 086625b dc58aee 086625b dc58aee 4420abf 086625b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
import json
import os
import datetime
import threading
from collections import Counter, defaultdict
from typing import List, Dict, Any, Optional
from fastapi import Request
class UsageTracker:
    """Thread-safe, file-backed recorder of API request metadata.

    Every recorded request is kept in memory under ``self.data['requests']``
    and the whole structure is periodically flushed to ``data_file`` as JSON
    by a background timer.

    Attributes:
        data_file: Path of the JSON file used for persistence.
        lock: Guards every read and write of ``self.data``.
        data: ``{'requests': [record, ...]}`` where each record is a dict with
            the keys ``timestamp``, ``model``, ``endpoint``, ``ip_address``
            and ``user_agent``.
    """

    # Seconds between two automatic flushes to disk.
    SAVE_INTERVAL_SECONDS = 60.0

    def __init__(self, data_file="usage_data.json"):
        """Load existing data from *data_file* and start the periodic save.

        Note that construction performs an immediate save, so *data_file* is
        created (or rewritten) right away.
        """
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """Load usage data from the JSON file, ensuring data integrity.

        Returns:
            The stored data when it has the current format (a dict whose
            ``'requests'`` value is a list); otherwise a fresh, empty
            structure. A missing, unreadable, undecodable or unrecognised
            file never raises.
        """
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # First run: nothing has been persisted yet.
            return self._initialize_empty_data()
        except (json.JSONDecodeError, UnicodeDecodeError, TypeError):
            print(f"Warning: Could not decode JSON from {self.data_file}. Starting fresh.")
            return self._initialize_empty_data()
        except OSError as e:
            print(f"Warning: Could not read {self.data_file}: {e}. Starting fresh.")
            return self._initialize_empty_data()

        # Valid JSON may still have any top-level type (list, number, ...),
        # so only dicts are inspected further.
        if isinstance(data, dict):
            if isinstance(data.get('requests'), list):
                return data
            # If data is old format, try to convert it
            if isinstance(data.get('total_requests'), int):
                return self._convert_old_format(data)
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, List]:
        """Return a new, empty data structure for usage tracking."""
        return {'requests': []}

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, List]:
        """Replace data in the old aggregate format with an empty new structure.

        No values from *old_data* are migrated: the old format does not hold
        per-request details, so tracking simply starts fresh.
        """
        print("Converting old usage data format to new format.")
        return self._initialize_empty_data()

    def save_data(self):
        """Write the current usage data to the JSON file.

        The data is first written to a sibling ``<data_file>.tmp`` and then
        moved over the real file, so an interrupted write cannot leave a
        truncated file behind. I/O errors are reported on stdout and otherwise
        ignored (best-effort persistence).
        """
        tmp_path = f"{self.data_file}.tmp"
        with self.lock:
            try:
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(self.data, f, indent=4)
                os.replace(tmp_path, self.data_file)
            except OSError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")
                try:
                    os.remove(tmp_path)
                except OSError:
                    # Best-effort cleanup; the temp file may not exist at all.
                    pass

    def _schedule_save(self):
        """Save now and schedule the next save ``SAVE_INTERVAL_SECONDS`` later.

        The timer runs as a daemon thread so that the endless re-scheduling
        does not keep the interpreter alive at shutdown. The pending timer is
        kept in ``self._save_timer`` so it can be cancelled.
        """
        timer = threading.Timer(self.SAVE_INTERVAL_SECONDS, self._schedule_save)
        timer.daemon = True
        self._save_timer = timer
        timer.start()
        self.save_data()

    def record_request(self, request: Optional["Request"], model: str = "unknown", endpoint: str = "unknown"):
        """Record a single API request with detailed information.

        Args:
            request: The incoming request, or ``None`` when no request object
                is available. Its client host and ``user-agent`` header are
                stored; ``"N/A"`` is used for whatever is missing.
            model: Name of the model that served the request.
            endpoint: Name or path of the endpoint that was called.
        """
        ip_address = "N/A"
        user_agent = "N/A"
        if request is not None:
            # The client address is not always known, in which case
            # ``request.client`` is None.
            if request.client is not None:
                ip_address = request.client.host
            user_agent = request.headers.get("user-agent", "N/A")

        record = {
            'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat(),
            'model': model,
            'endpoint': endpoint,
            'ip_address': ip_address,
            'user_agent': user_agent,
        }
        with self.lock:
            self.data['requests'].append(record)

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime.datetime]:
        """Parse an ISO-8601 string into an aware UTC datetime, or return None.

        Timestamps without an offset are assumed to already be in UTC.
        """
        try:
            parsed = datetime.datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=datetime.timezone.utc)
        return parsed.astimezone(datetime.timezone.utc)

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """Generate a comprehensive summary of usage data.

        Args:
            days: Size of the reporting window, counted back from now.

        Returns:
            A dict of plain (JSON-serialisable) values with the keys:

            * ``total_requests`` - number of records, over all time.
            * ``model_usage`` / ``endpoint_usage`` - request counts per model
              and per endpoint inside the window.
            * ``daily_usage`` - ``{"YYYY-MM-DD": {"requests": n}}`` inside the
              window, sorted by date (UTC).
            * ``recent_requests`` - copies of up to 20 of the newest records
              inside the window, newest first.
            * ``unique_ip_count`` - distinct ``ip_address`` values, over all
              time.

            Entries that are not dicts are ignored. Records whose timestamp
            cannot be parsed count towards the all-time figures only.
        """
        cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)

        # Records are never mutated after being appended, so a shallow copy
        # taken under the lock is a consistent snapshot; the aggregation can
        # then run without blocking writers.
        with self.lock:
            snapshot = list(self.data['requests'])

        total_requests = 0
        unique_ips = set()
        model_usage = Counter()
        endpoint_usage = Counter()
        daily_counts = Counter()
        recent_requests = []

        # Iterate backwards so the newest requests come first.
        for req in reversed(snapshot):
            if not isinstance(req, dict):
                continue

            # All-time figures.
            total_requests += 1
            unique_ips.add(req.get('ip_address', 'N/A'))

            req_time = self._parse_timestamp(req.get('timestamp'))
            if req_time is None or req_time < cutoff_date:
                continue

            # Windowed figures for charts and tables.
            model_usage[req.get('model', 'unknown')] += 1
            endpoint_usage[req.get('endpoint', 'unknown')] += 1
            daily_counts[req_time.strftime("%Y-%m-%d")] += 1
            if len(recent_requests) < 20:
                # Hand out a copy so callers cannot alter stored records.
                recent_requests.append(dict(req))

        return {
            'total_requests': total_requests,
            'model_usage': dict(model_usage),
            'endpoint_usage': dict(endpoint_usage),
            'daily_usage': {
                day: {'requests': count}
                for day, count in sorted(daily_counts.items())
            },
            'recent_requests': recent_requests,
            # Only the count is returned; there is no need to send the whole set.
            'unique_ip_count': len(unique_ips),
        }
|