lokiai / usage_tracker.py
ParthSadaria's picture
Update usage_tracker.py
dc58aee verified
raw
history blame
4.88 kB
import json
import os
import datetime
import threading
from collections import Counter, defaultdict
from typing import List, Dict, Any, Optional
from fastapi import Request
class UsageTracker:
    """Tracks API request usage and periodically persists it to a JSON file.

    Thread-safe: every read/write of ``self.data`` happens under ``self.lock``.
    """

    def __init__(self, data_file: str = "usage_data.json"):
        """Load existing usage data (if any) and start the periodic saver.

        Args:
            data_file: Path of the JSON file used for persistence.
        """
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """Loads usage data from the JSON file, ensuring data integrity."""
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                # Guard with isinstance before .get(): a valid-JSON list or
                # scalar previously raised an uncaught AttributeError here.
                if isinstance(data, dict):
                    if 'requests' in data:
                        return data
                    # If data is old format, try to convert it
                    if isinstance(data.get('total_requests'), int):
                        return self._convert_old_format(data)
            except (json.JSONDecodeError, TypeError, OSError):
                # OSError added: a read failure should also fall back to a
                # fresh structure instead of crashing startup.
                print(f"Warning: Could not decode JSON from {self.data_file}. Starting fresh.")
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, List]:
        """Initializes a new data structure for usage tracking."""
        return {'requests': []}

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, List]:
        """Converts data from the old format to the new detailed format.

        The old flat-counter format lacks per-request detail, so a faithful
        conversion is impossible; we start fresh with the new structure.
        """
        print("Converting old usage data format to new format.")
        return self._initialize_empty_data()

    def save_data(self):
        """Saves current usage data to the JSON file."""
        with self.lock:
            try:
                with open(self.data_file, 'w') as f:
                    json.dump(self.data, f, indent=4)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Saves now, then reschedules another save in 60 seconds.

        The timer is marked daemon so it never blocks interpreter shutdown —
        the original non-daemon, self-rescheduling timer kept the process
        alive forever.
        """
        timer = threading.Timer(60.0, self._schedule_save)
        timer.daemon = True  # do not prevent process exit
        timer.start()
        self.save_data()

    def record_request(self, request: Optional["Request"] = None, model: str = "unknown", endpoint: str = "unknown"):
        """Records a single API request with detailed information.

        Args:
            request: The incoming FastAPI request, or None when unavailable.
            model: Model name associated with the request.
            endpoint: Endpoint path that handled the request.
        """
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = "N/A"
            user_agent = "N/A"
            if request:
                # request.client can be None (e.g. some ASGI/test clients);
                # keep the "N/A" placeholder rather than raising.
                if request.client:
                    ip_address = request.client.host
                user_agent = request.headers.get("user-agent", "N/A")
            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """Generates a comprehensive summary of usage data.

        Args:
            days: Rolling-window size (in days) for the per-model,
                per-endpoint, daily, and recent-request aggregates.
                ``total_requests`` and ``unique_ip_count`` cover all time.

        Returns:
            A JSON-serializable dict: total_requests, model_usage,
            endpoint_usage, daily_usage (sorted by date), recent_requests
            (up to 20 newest within the window), unique_ip_count.
        """
        with self.lock:
            summary = {
                'total_requests': 0,
                'model_usage': defaultdict(int),
                'endpoint_usage': defaultdict(int),
                'daily_usage': defaultdict(lambda: defaultdict(int)),
                'unique_ips': set(),
                'recent_requests': []
            }
            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
            # Iterate newest-first so recent_requests fills with the latest entries.
            for req in reversed(self.data['requests']):
                req_time = datetime.datetime.fromisoformat(req['timestamp'])
                # Totals and unique IPs are all-time, not window-limited.
                summary['total_requests'] += 1
                summary['unique_ips'].add(req['ip_address'])
                if req_time >= cutoff_date:
                    date_str = req_time.strftime("%Y-%m-%d")
                    # Aggregate data for charts and tables
                    summary['model_usage'][req['model']] += 1
                    summary['endpoint_usage'][req['endpoint']] += 1
                    summary['daily_usage'][date_str]['requests'] += 1
                    if len(summary['recent_requests']) < 20:
                        summary['recent_requests'].append(req)
            # Convert defaultdicts to plain dicts (and sort daily usage by
            # date) so the summary is directly JSON-serializable — the
            # nested defaultdict(lambda: ...) was not.
            summary['model_usage'] = dict(summary['model_usage'])
            summary['endpoint_usage'] = dict(summary['endpoint_usage'])
            summary['daily_usage'] = {d: dict(c) for d, c in sorted(summary['daily_usage'].items())}
            summary['unique_ip_count'] = len(summary['unique_ips'])
            del summary['unique_ips']  # No need to send the whole set
            return summary