ParthSadaria commited on
Commit
dc58aee
·
verified ·
1 Parent(s): 6166ca2

Update usage_tracker.py

Browse files
Files changed (1) hide show
  1. usage_tracker.py +83 -98
usage_tracker.py CHANGED
@@ -2,130 +2,115 @@ import json
2
  import os
3
  import datetime
4
  import threading
5
- from collections import defaultdict
 
 
6
 
7
  class UsageTracker:
8
  def __init__(self, data_file="usage_data.json"):
9
  self.data_file = data_file
10
  self.lock = threading.Lock()
11
  self.data = self._load_data()
 
12
 
13
- def _load_data(self):
14
- """Loads usage data from the JSON file."""
15
  if os.path.exists(self.data_file):
16
  try:
17
  with open(self.data_file, 'r') as f:
18
  data = json.load(f)
19
- # Ensure all necessary keys exist, initialize if not
20
- data.setdefault('total_requests', 0)
21
- data.setdefault('models', {})
22
- data.setdefault('api_endpoints', {})
23
- data.setdefault('recent_daily_usage', {})
24
-
25
- # Convert loaded daily usage dictionaries back to defaultdicts for internal use
26
- loaded_daily_usage = {}
27
- for date, entities in data['recent_daily_usage'].items():
28
- loaded_daily_usage[date] = defaultdict(int, entities)
29
- data['recent_daily_usage'] = loaded_daily_usage
30
-
31
- return data
32
- except json.JSONDecodeError:
33
- print(f"Warning: Could not decode JSON from {self.data_file}. Starting with empty data.")
34
- return self._initialize_empty_data()
35
  return self._initialize_empty_data()
36
 
37
- def _initialize_empty_data(self):
38
- """Initializes an empty data structure for usage tracking."""
39
- return {
40
- 'total_requests': 0,
41
- 'models': {},
42
- 'api_endpoints': {},
43
- 'recent_daily_usage': {} # This will hold defaultdicts for each date
44
- }
45
-
46
- def _convert_defaultdicts_to_dicts(self, obj):
47
- """Recursively converts defaultdicts to dicts for JSON serialization."""
48
- if isinstance(obj, defaultdict):
49
- return {k: self._convert_defaultdicts_to_dicts(v) for k, v in obj.items()}
50
- elif isinstance(obj, dict):
51
- return {k: self._convert_defaultdicts_to_dicts(v) for k, v in obj.items()}
52
- elif isinstance(obj, list):
53
- return [self._convert_defaultdicts_to_dicts(elem) for elem in obj]
54
- return obj
55
 
56
  def save_data(self):
57
- """Saves current usage data to the JSON file."""
58
  with self.lock:
59
- # Convert defaultdicts to regular dicts before saving
60
- data_to_save = self._convert_defaultdicts_to_dicts(self.data)
61
  try:
62
  with open(self.data_file, 'w') as f:
63
- json.dump(data_to_save, f, indent=4)
64
  except IOError as e:
65
  print(f"Error saving usage data to {self.data_file}: {e}")
66
 
67
- def record_request(self, model: str = "unknown", endpoint: str = "unknown"):
68
- """Records a single API request, updating model, endpoint, and daily usage."""
 
 
 
 
 
69
  with self.lock:
70
- now = datetime.datetime.now()
71
- current_date = now.strftime("%Y-%m-%d")
72
- current_time = now.strftime("%Y-%m-%d %I:%M:%S %p")
73
-
74
- # Update total requests
75
- self.data['total_requests'] += 1
76
-
77
- # Update model usage
78
- if model not in self.data['models']:
79
- self.data['models'][model] = {
80
- 'total_requests': 0,
81
- 'first_used': current_time,
82
- 'last_used': current_time
83
- }
84
- self.data['models'][model]['total_requests'] += 1
85
- self.data['models'][model]['last_used'] = current_time
86
-
87
- # Update API endpoint usage
88
- if endpoint not in self.data['api_endpoints']:
89
- self.data['api_endpoints'][endpoint] = {
90
- 'total_requests': 0,
91
- 'first_used': current_time,
92
- 'last_used': current_time
93
- }
94
- self.data['api_endpoints'][endpoint]['total_requests'] += 1
95
- self.data['api_endpoints'][endpoint]['last_used'] = current_time
96
-
97
- # Update daily usage
98
- # Ensure the inner dictionary for the current_date is a defaultdict
99
- if current_date not in self.data['recent_daily_usage']:
100
- self.data['recent_daily_usage'][current_date] = defaultdict(int)
101
- elif not isinstance(self.data['recent_daily_usage'][current_date], defaultdict):
102
- # This handles cases where data was loaded as a plain dict
103
- self.data['recent_daily_usage'][current_date] = defaultdict(int, self.data['recent_daily_usage'][current_date])
104
-
105
- self.data['recent_daily_usage'][current_date][model] += 1
106
- self.data['recent_daily_usage'][current_date][endpoint] += 1
107
-
108
- # Removed the line that converts defaultdict back to dict here.
109
- # Conversion will now happen only during save_data.
110
-
111
-
112
- def get_usage_summary(self, days: int = 7):
113
- """Generates a summary of usage data for the last 'days'."""
114
  with self.lock:
115
  summary = {
116
- 'total_requests': self.data['total_requests'],
117
- 'models': self.data['models'],
118
- 'api_endpoints': self.data['api_endpoints'],
119
- 'recent_daily_usage': {}
 
 
120
  }
121
 
122
- # Filter daily usage for the last 'days' and ensure it's a plain dict for the summary
123
- today = datetime.date.today()
124
- for i in range(days):
125
- date = (today - datetime.timedelta(days=i)).strftime("%Y-%m-%d")
126
- if date in self.data['recent_daily_usage']:
127
- # Convert the defaultdict for this date to a plain dict for the summary
128
- summary['recent_daily_usage'][date] = dict(self.data['recent_daily_usage'][date])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
129
 
130
  return summary
131
 
 
2
  import os
3
  import datetime
4
  import threading
5
+ from collections import Counter, defaultdict
6
+ from typing import List, Dict, Any, Optional
7
+ from fastapi import Request
8
 
9
  class UsageTracker:
10
  def __init__(self, data_file="usage_data.json"):
11
  self.data_file = data_file
12
  self.lock = threading.Lock()
13
  self.data = self._load_data()
14
+ self._schedule_save()
15
 
16
+ def _load_data(self) -> Dict[str, Any]:
17
+ """Loads usage data from the JSON file, ensuring data integrity."""
18
  if os.path.exists(self.data_file):
19
  try:
20
  with open(self.data_file, 'r') as f:
21
  data = json.load(f)
22
+ if isinstance(data, dict) and 'requests' in data:
23
+ return data
24
+ # If data is old format, try to convert it
25
+ if isinstance(data.get('total_requests'), int):
26
+ return self._convert_old_format(data)
27
+ except (json.JSONDecodeError, TypeError):
28
+ print(f"Warning: Could not decode JSON from {self.data_file}. Starting fresh.")
 
 
 
 
 
 
 
 
 
29
  return self._initialize_empty_data()
30
 
31
+ def _initialize_empty_data(self) -> Dict[str, List]:
32
+ """Initializes a new data structure for usage tracking."""
33
+ return {'requests': []}
34
+
35
+ def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, List]:
36
+ """Converts data from the old format to the new detailed format."""
37
+ print("Converting old usage data format to new format.")
38
+ new_data = self._initialize_empty_data()
39
+ # This is a simplification; a more robust conversion would be needed for full data recovery
40
+ # For now, we are just starting fresh with the new structure to avoid complexity.
41
+ return new_data
 
 
 
 
 
 
 
42
 
43
  def save_data(self):
44
+ """Saves current usage data to the JSON file periodically."""
45
  with self.lock:
 
 
46
  try:
47
  with open(self.data_file, 'w') as f:
48
+ json.dump(self.data, f, indent=4)
49
  except IOError as e:
50
  print(f"Error saving usage data to {self.data_file}: {e}")
51
 
52
+ def _schedule_save(self):
53
+ """Schedules the data to be saved every 60 seconds."""
54
+ threading.Timer(60.0, self._schedule_save).start()
55
+ self.save_data()
56
+
57
+ def record_request(self, request: Optional[Request], model: str = "unknown", endpoint: str = "unknown"):
58
+ """Records a single API request with detailed information."""
59
  with self.lock:
60
+ now = datetime.datetime.now(datetime.timezone.utc)
61
+ ip_address = "N/A"
62
+ user_agent = "N/A"
63
+ if request:
64
+ ip_address = request.client.host
65
+ user_agent = request.headers.get("user-agent", "N/A")
66
+
67
+ self.data['requests'].append({
68
+ 'timestamp': now.isoformat(),
69
+ 'model': model,
70
+ 'endpoint': endpoint,
71
+ 'ip_address': ip_address,
72
+ 'user_agent': user_agent,
73
+ })
74
+
75
+ def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
76
+ """Generates a comprehensive summary of usage data."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  with self.lock:
78
  summary = {
79
+ 'total_requests': 0,
80
+ 'model_usage': defaultdict(int),
81
+ 'endpoint_usage': defaultdict(int),
82
+ 'daily_usage': defaultdict(lambda: defaultdict(int)),
83
+ 'unique_ips': set(),
84
+ 'recent_requests': []
85
  }
86
 
87
+ cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
88
+
89
+ # Iterate backwards for recent requests
90
+ for req in reversed(self.data['requests']):
91
+ req_time = datetime.datetime.fromisoformat(req['timestamp'])
92
+
93
+ # Update total requests (for all time)
94
+ summary['total_requests'] += 1
95
+ summary['unique_ips'].add(req['ip_address'])
96
+
97
+ if req_time >= cutoff_date:
98
+ date_str = req_time.strftime("%Y-%m-%d")
99
+
100
+ # Aggregate data for charts and tables
101
+ summary['model_usage'][req['model']] += 1
102
+ summary['endpoint_usage'][req['endpoint']] += 1
103
+ summary['daily_usage'][date_str]['requests'] += 1
104
+
105
+ # Add to recent requests list
106
+ if len(summary['recent_requests']) < 20:
107
+ summary['recent_requests'].append(req)
108
+
109
+
110
+ # Sort daily usage by date
111
+ summary['daily_usage'] = dict(sorted(summary['daily_usage'].items()))
112
+ summary['unique_ip_count'] = len(summary['unique_ips'])
113
+ del summary['unique_ips'] # No need to send the whole set
114
 
115
  return summary
116