lokiai / usage_tracker.py
import json
import os
import datetime
import threading
from collections import defaultdict
from typing import Dict, Any, Optional
from fastapi import Request
class UsageTracker:
def __init__(self, data_file="usage_data.json"):
self.data_file = data_file
self.lock = threading.Lock()
self.data = self._load_data()
self._schedule_save()
def _load_data(self) -> Dict[str, Any]:
"""
Loads usage data from the JSON file, ensuring data integrity.
Handles cases where the file might be corrupted or in an old format.
"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r') as f:
data = json.load(f)
                # Check whether the data is already in the expected new format.
                if isinstance(data, dict) and 'requests' in data and 'models' in data and 'api_endpoints' in data:
                    # Re-wrap the loaded plain dicts as defaultdicts so that
                    # record_request() can add new models/endpoints without a KeyError.
                    loaded = self._initialize_empty_data()
                    loaded['requests'] = data['requests']
                    loaded['models'].update(data['models'])
                    loaded['api_endpoints'].update(data['api_endpoints'])
                    return loaded
                # If the data is in an older, simpler format, convert it.
                elif isinstance(data, dict) and 'total_requests' in data:  # Heuristic for the old format
                    return self._convert_old_format(data)
except (json.JSONDecodeError, TypeError) as e:
print(f"Warning: Could not decode JSON from {self.data_file} ({e}). Starting fresh.")
return self._initialize_empty_data()
def _initialize_empty_data(self) -> Dict[str, Any]:
"""
Initializes a new, empty data structure for usage tracking.
This structure includes a list for all requests, and dictionaries
to store aggregated data for models and API endpoints.
"""
return {
'requests': [],
'models': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}),
'api_endpoints': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None})
}
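    # For reference, a rough sketch of what the persisted JSON looks like once a
    # request has been recorded (the field values here are made-up examples):
    #
    # {
    #     "requests": [
    #         {"timestamp": "2024-01-01T12:00:00+00:00", "model": "some-model",
    #          "endpoint": "/chat/completions", "ip_address": "203.0.113.7",
    #          "user_agent": "python-requests/2.31"}
    #     ],
    #     "models": {
    #         "some-model": {"total_requests": 1,
    #                        "first_used": "2024-01-01T12:00:00+00:00",
    #                        "last_used": "2024-01-01T12:00:00+00:00"}
    #     },
    #     "api_endpoints": {
    #         "/chat/completions": {"total_requests": 1,
    #                               "first_used": "2024-01-01T12:00:00+00:00",
    #                               "last_used": "2024-01-01T12:00:00+00:00"}
    #     }
    # }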
def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Converts data from the old format to the new detailed format.
This is a crucial step to avoid data loss on updates.
It iterates through old 'requests' (if any) and re-records them
into the new structured format.
"""
print("Converting old usage data format to new format.")
new_data = self._initialize_empty_data()
# Preserve existing requests if they follow a basic structure
if 'requests' in old_data and isinstance(old_data['requests'], list):
for req in old_data['requests']:
# Attempt to extract relevant fields from old request entry
timestamp_str = req.get('timestamp')
model_name = req.get('model', 'unknown_model')
endpoint_name = req.get('endpoint', 'unknown_endpoint')
ip_address = req.get('ip_address', 'N/A')
user_agent = req.get('user_agent', 'N/A')
# Ensure timestamp is valid and parseable
try:
timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp_str else datetime.datetime.now(datetime.timezone.utc)
except ValueError:
timestamp = datetime.datetime.now(datetime.timezone.utc) # Fallback if timestamp is malformed
new_data['requests'].append({
'timestamp': timestamp.isoformat(),
'model': model_name,
'endpoint': endpoint_name,
'ip_address': ip_address,
'user_agent': user_agent,
})
# Update aggregated stats for models and endpoints
# This ensures that even old data contributes to the new summary
if not new_data['models'][model_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['models'][model_name]['first_used']):
new_data['models'][model_name]['first_used'] = timestamp.isoformat()
if not new_data['models'][model_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['models'][model_name]['last_used']):
new_data['models'][model_name]['last_used'] = timestamp.isoformat()
new_data['models'][model_name]['total_requests'] += 1
if not new_data['api_endpoints'][endpoint_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['first_used']):
new_data['api_endpoints'][endpoint_name]['first_used'] = timestamp.isoformat()
if not new_data['api_endpoints'][endpoint_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['last_used']):
new_data['api_endpoints'][endpoint_name]['last_used'] = timestamp.isoformat()
new_data['api_endpoints'][endpoint_name]['total_requests'] += 1
print("Data conversion complete.")
return new_data
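    # A guess at the shape of the old format this conversion targets, based only
    # on the fields read above (actual historical files may have differed):
    #
    # {
    #     "total_requests": 2,
    #     "requests": [
    #         {"timestamp": "...", "model": "...", "endpoint": "...",
    #          "ip_address": "...", "user_agent": "..."}
    #     ]
    # }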
def save_data(self):
"""Saves current usage data to the JSON file periodically."""
with self.lock:
try:
# Convert defaultdicts to regular dicts for JSON serialization
serializable_data = {
'requests': self.data['requests'],
'models': dict(self.data['models']),
'api_endpoints': dict(self.data['api_endpoints'])
}
with open(self.data_file, 'w') as f:
json.dump(serializable_data, f, indent=4)
except IOError as e:
print(f"Error saving usage data to {self.data_file}: {e}")
def _schedule_save(self):
"""Schedules the data to be saved every 60 seconds."""
# Use a non-daemon thread for saving to ensure it runs even if main thread exits
# if using daemon threads, ensure proper shutdown hook is in place.
# For simplicity in this context, a direct Timer call is fine.
threading.Timer(60.0, self._schedule_save).start()
self.save_data()
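    # A minimal sketch of the shutdown hook mentioned above, assuming the tracker
    # lives inside a FastAPI app; the `app` and `usage_tracker` names are
    # illustrative, not part of this module:
    #
    #     @app.on_event("shutdown")
    #     def _flush_usage_data():
    #         usage_tracker.save_data()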
def record_request(self, request: Optional[Request] = None, model: str = "unknown", endpoint: str = "unknown"):
"""
Records a single API request with detailed information.
Updates both the raw request list and aggregated statistics.
"""
with self.lock:
now = datetime.datetime.now(datetime.timezone.utc)
ip_address = request.client.host if request and request.client else "N/A"
user_agent = request.headers.get("user-agent", "N/A") if request else "N/A"
# Append to raw requests list
self.data['requests'].append({
'timestamp': now.isoformat(),
'model': model,
'endpoint': endpoint,
'ip_address': ip_address,
'user_agent': user_agent,
})
# Update model specific stats
model_stats = self.data['models'][model]
model_stats['total_requests'] += 1
if model_stats['first_used'] is None or now < datetime.datetime.fromisoformat(model_stats['first_used']):
model_stats['first_used'] = now.isoformat()
if model_stats['last_used'] is None or now > datetime.datetime.fromisoformat(model_stats['last_used']):
model_stats['last_used'] = now.isoformat()
# Update endpoint specific stats
endpoint_stats = self.data['api_endpoints'][endpoint]
endpoint_stats['total_requests'] += 1
if endpoint_stats['first_used'] is None or now < datetime.datetime.fromisoformat(endpoint_stats['first_used']):
endpoint_stats['first_used'] = now.isoformat()
if endpoint_stats['last_used'] is None or now > datetime.datetime.fromisoformat(endpoint_stats['last_used']):
endpoint_stats['last_used'] = now.isoformat()
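    # A minimal sketch (hypothetical app, route, and model names) of how a FastAPI
    # handler might feed this method the incoming Request:
    #
    #     @app.post("/chat/completions")
    #     async def chat_completions(request: Request):
    #         usage_tracker.record_request(request=request,
    #                                      model="some-model",
    #                                      endpoint="/chat/completions")
    #         ...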
def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
"""
        Generates a usage summary: all-time totals (request count, unique IPs,
        per-model and per-endpoint aggregates) plus per-model, per-endpoint,
        and daily breakdowns restricted to the last `days` days, along with the
        20 most recent requests in that window.
"""
with self.lock:
summary = {
'total_requests': 0,
'model_usage': defaultdict(int), # Requests per model for the period
'endpoint_usage': defaultdict(int), # Requests per endpoint for the period
'daily_usage': defaultdict(lambda: {'requests': 0, 'unique_ips': set()}), # Daily stats
'unique_ips_total': set(), # Unique IPs across all requests
'recent_requests': []
}
# Prepare data for model and API endpoint charts
# These are based on the aggregated 'self.data' which covers all time,
# but the summary 'model_usage' and 'endpoint_usage' below are for the given 'days' period.
all_time_model_data = {
model: {
'total_requests': stats['total_requests'],
'first_used': stats['first_used'],
'last_used': stats['last_used']
} for model, stats in self.data['models'].items()
}
all_time_endpoint_data = {
endpoint: {
'total_requests': stats['total_requests'],
'first_used': stats['first_used'],
'last_used': stats['last_used']
} for endpoint, stats in self.data['api_endpoints'].items()
}
cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
# Iterate backwards for recent requests and aggregate data for the specified period
requests_for_period = []
for req in reversed(self.data['requests']):
req_time = datetime.datetime.fromisoformat(req['timestamp'])
# Always update total requests and unique IPs for all time
summary['total_requests'] += 1
summary['unique_ips_total'].add(req['ip_address'])
if req_time >= cutoff_date:
requests_for_period.append(req)
date_str = req_time.strftime("%Y-%m-%d")
# Aggregate data for charts and tables for the given period
summary['model_usage'][req['model']] += 1
summary['endpoint_usage'][req['endpoint']] += 1
summary['daily_usage'][date_str]['requests'] += 1
summary['daily_usage'][date_str]['unique_ips'].add(req['ip_address'])
# Add to recent requests list (up to 20)
if len(summary['recent_requests']) < 20:
summary['recent_requests'].append(req)
# Convert daily unique IPs set to count
for date_str, daily_stats in summary['daily_usage'].items():
daily_stats['unique_ips_count'] = len(daily_stats['unique_ips'])
del daily_stats['unique_ips'] # Remove the set before returning
# Sort daily usage by date
summary['daily_usage'] = dict(sorted(summary['daily_usage'].items()))
# Convert defaultdicts to regular dicts for final summary
summary['model_usage_period'] = dict(summary['model_usage'])
summary['endpoint_usage_period'] = dict(summary['endpoint_usage'])
summary['daily_usage_period'] = dict(summary['daily_usage'])
# Add all-time data
summary['all_time_model_usage'] = all_time_model_data
summary['all_time_endpoint_usage'] = all_time_endpoint_data
summary['unique_ips_total_count'] = len(summary['unique_ips_total'])
del summary['unique_ips_total'] # No need to send the whole set
# Clean up defaultdicts that are not needed in the final output structure
del summary['model_usage']
del summary['endpoint_usage']
del summary['daily_usage']
return summary
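
# A small, self-contained usage sketch, not part of the original module; the
# data file name, model name, and endpoints below are placeholder values chosen
# only to exercise the tracker end to end.
if __name__ == "__main__":
    tracker = UsageTracker(data_file="usage_data_demo.json")
    # Record a couple of synthetic requests without a FastAPI Request object;
    # ip_address and user_agent then fall back to "N/A".
    tracker.record_request(model="demo-model", endpoint="/chat/completions")
    tracker.record_request(model="demo-model", endpoint="/models")
    summary = tracker.get_usage_summary(days=7)
    # Print everything except the raw recent request entries.
    print(json.dumps({k: v for k, v in summary.items() if k != "recent_requests"},
                     indent=2, default=str))
    tracker.save_data()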