ParthSadaria committed on
Commit
d5a05cc
·
verified ·
1 Parent(s): 925b0de

Update usage_tracker.py

Browse files
Files changed (1) hide show
  1. usage_tracker.py +161 -35
usage_tracker.py CHANGED
@@ -2,7 +2,7 @@ import json
2
  import os
3
  import datetime
4
  import threading
5
- from collections import Counter, defaultdict
6
  from typing import List, Dict, Any, Optional
7
  from fastapi import Request
8
 
@@ -14,56 +14,121 @@ class UsageTracker:
14
  self._schedule_save()
15
 
16
  def _load_data(self) -> Dict[str, Any]:
17
- """Loads usage data from the JSON file, ensuring data integrity."""
 
 
 
18
  if os.path.exists(self.data_file):
19
  try:
20
  with open(self.data_file, 'r') as f:
21
  data = json.load(f)
22
- if isinstance(data, dict) and 'requests' in data:
 
23
  return data
24
- # If data is old format, try to convert it
25
- if isinstance(data.get('total_requests'), int):
26
  return self._convert_old_format(data)
27
- except (json.JSONDecodeError, TypeError):
28
- print(f"Warning: Could not decode JSON from {self.data_file}. Starting fresh.")
29
  return self._initialize_empty_data()
30
 
31
- def _initialize_empty_data(self) -> Dict[str, List]:
32
- """Initializes a new data structure for usage tracking."""
33
- return {'requests': []}
34
-
35
- def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, List]:
36
- """Converts data from the old format to the new detailed format."""
 
 
 
 
 
 
 
 
 
 
 
 
 
37
  print("Converting old usage data format to new format.")
38
  new_data = self._initialize_empty_data()
39
- # This is a simplification; a more robust conversion would be needed for full data recovery
40
- # For now, we are just starting fresh with the new structure to avoid complexity.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  return new_data
42
 
43
  def save_data(self):
44
  """Saves current usage data to the JSON file periodically."""
45
  with self.lock:
46
  try:
 
 
 
 
 
 
47
  with open(self.data_file, 'w') as f:
48
- json.dump(self.data, f, indent=4)
49
  except IOError as e:
50
  print(f"Error saving usage data to {self.data_file}: {e}")
51
 
52
  def _schedule_save(self):
53
  """Schedules the data to be saved every 60 seconds."""
 
 
 
54
  threading.Timer(60.0, self._schedule_save).start()
55
  self.save_data()
56
 
57
- def record_request(self, request: Optional[Request], model: str = "unknown", endpoint: str = "unknown"):
58
- """Records a single API request with detailed information."""
 
 
 
59
  with self.lock:
60
  now = datetime.datetime.now(datetime.timezone.utc)
61
- ip_address = "N/A"
62
- user_agent = "N/A"
63
- if request:
64
- ip_address = request.client.host
65
- user_agent = request.headers.get("user-agent", "N/A")
66
 
 
67
  self.data['requests'].append({
68
  'timestamp': now.isoformat(),
69
  'model': model,
@@ -72,45 +137,106 @@ class UsageTracker:
72
  'user_agent': user_agent,
73
  })
74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
76
- """Generates a comprehensive summary of usage data."""
 
 
 
77
  with self.lock:
78
  summary = {
79
  'total_requests': 0,
80
- 'model_usage': defaultdict(int),
81
- 'endpoint_usage': defaultdict(int),
82
- 'daily_usage': defaultdict(lambda: defaultdict(int)),
83
- 'unique_ips': set(),
84
  'recent_requests': []
85
  }
86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
  cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
88
 
89
- # Iterate backwards for recent requests
 
90
  for req in reversed(self.data['requests']):
91
  req_time = datetime.datetime.fromisoformat(req['timestamp'])
92
 
93
- # Update total requests (for all time)
94
  summary['total_requests'] += 1
95
- summary['unique_ips'].add(req['ip_address'])
96
 
97
  if req_time >= cutoff_date:
 
98
  date_str = req_time.strftime("%Y-%m-%d")
99
 
100
- # Aggregate data for charts and tables
101
  summary['model_usage'][req['model']] += 1
102
  summary['endpoint_usage'][req['endpoint']] += 1
 
103
  summary['daily_usage'][date_str]['requests'] += 1
 
104
 
105
- # Add to recent requests list
106
  if len(summary['recent_requests']) < 20:
107
  summary['recent_requests'].append(req)
108
 
 
 
 
 
109
 
110
  # Sort daily usage by date
111
  summary['daily_usage'] = dict(sorted(summary['daily_usage'].items()))
112
- summary['unique_ip_count'] = len(summary['unique_ips'])
113
- del summary['unique_ips'] # No need to send the whole set
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
 
115
  return summary
116
 
 
2
  import os
3
  import datetime
4
  import threading
5
+ from collections import defaultdict
6
  from typing import List, Dict, Any, Optional
7
  from fastapi import Request
8
 
 
14
  self._schedule_save()
15
 
16
  def _load_data(self) -> Dict[str, Any]:
17
+ """
18
+ Loads usage data from the JSON file, ensuring data integrity.
19
+ Handles cases where the file might be corrupted or in an old format.
20
+ """
21
  if os.path.exists(self.data_file):
22
  try:
23
  with open(self.data_file, 'r') as f:
24
  data = json.load(f)
25
+ # Check if data is in the expected new format
26
+ if isinstance(data, dict) and 'requests' in data and 'models' in data and 'api_endpoints' in data:
27
  return data
28
+ # If data is in an older, simpler format, convert it
29
+ elif isinstance(data, dict) and 'total_requests' in data: # Heuristic for old format
30
  return self._convert_old_format(data)
31
+ except (json.JSONDecodeError, TypeError) as e:
32
+ print(f"Warning: Could not decode JSON from {self.data_file} ({e}). Starting fresh.")
33
  return self._initialize_empty_data()
34
 
35
+ def _initialize_empty_data(self) -> Dict[str, Any]:
36
+ """
37
+ Initializes a new, empty data structure for usage tracking.
38
+ This structure includes a list for all requests, and dictionaries
39
+ to store aggregated data for models and API endpoints.
40
+ """
41
+ return {
42
+ 'requests': [],
43
+ 'models': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}),
44
+ 'api_endpoints': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None})
45
+ }
46
+
47
+ def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, Any]:
48
+ """
49
+ Converts data from the old format to the new detailed format.
50
+ This is a crucial step to avoid data loss on updates.
51
+ It iterates through old 'requests' (if any) and re-records them
52
+ into the new structured format.
53
+ """
54
  print("Converting old usage data format to new format.")
55
  new_data = self._initialize_empty_data()
56
+
57
+ # Preserve existing requests if they follow a basic structure
58
+ if 'requests' in old_data and isinstance(old_data['requests'], list):
59
+ for req in old_data['requests']:
60
+ # Attempt to extract relevant fields from old request entry
61
+ timestamp_str = req.get('timestamp')
62
+ model_name = req.get('model', 'unknown_model')
63
+ endpoint_name = req.get('endpoint', 'unknown_endpoint')
64
+ ip_address = req.get('ip_address', 'N/A')
65
+ user_agent = req.get('user_agent', 'N/A')
66
+
67
+ # Ensure timestamp is valid and parseable
68
+ try:
69
+ timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp_str else datetime.datetime.now(datetime.timezone.utc)
70
+ except ValueError:
71
+ timestamp = datetime.datetime.now(datetime.timezone.utc) # Fallback if timestamp is malformed
72
+
73
+ new_data['requests'].append({
74
+ 'timestamp': timestamp.isoformat(),
75
+ 'model': model_name,
76
+ 'endpoint': endpoint_name,
77
+ 'ip_address': ip_address,
78
+ 'user_agent': user_agent,
79
+ })
80
+
81
+ # Update aggregated stats for models and endpoints
82
+ # This ensures that even old data contributes to the new summary
83
+ if not new_data['models'][model_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['models'][model_name]['first_used']):
84
+ new_data['models'][model_name]['first_used'] = timestamp.isoformat()
85
+ if not new_data['models'][model_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['models'][model_name]['last_used']):
86
+ new_data['models'][model_name]['last_used'] = timestamp.isoformat()
87
+ new_data['models'][model_name]['total_requests'] += 1
88
+
89
+ if not new_data['api_endpoints'][endpoint_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['first_used']):
90
+ new_data['api_endpoints'][endpoint_name]['first_used'] = timestamp.isoformat()
91
+ if not new_data['api_endpoints'][endpoint_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['last_used']):
92
+ new_data['api_endpoints'][endpoint_name]['last_used'] = timestamp.isoformat()
93
+ new_data['api_endpoints'][endpoint_name]['total_requests'] += 1
94
+
95
+ print("Data conversion complete.")
96
  return new_data
97
 
98
  def save_data(self):
99
  """Saves current usage data to the JSON file periodically."""
100
  with self.lock:
101
  try:
102
+ # Convert defaultdicts to regular dicts for JSON serialization
103
+ serializable_data = {
104
+ 'requests': self.data['requests'],
105
+ 'models': dict(self.data['models']),
106
+ 'api_endpoints': dict(self.data['api_endpoints'])
107
+ }
108
  with open(self.data_file, 'w') as f:
109
+ json.dump(serializable_data, f, indent=4)
110
  except IOError as e:
111
  print(f"Error saving usage data to {self.data_file}: {e}")
112
 
113
  def _schedule_save(self):
114
  """Schedules the data to be saved every 60 seconds."""
115
+ # Use a non-daemon thread for saving to ensure it runs even if main thread exits
116
+ # if using daemon threads, ensure proper shutdown hook is in place.
117
+ # For simplicity in this context, a direct Timer call is fine.
118
  threading.Timer(60.0, self._schedule_save).start()
119
  self.save_data()
120
 
121
+ def record_request(self, request: Optional[Request] = None, model: str = "unknown", endpoint: str = "unknown"):
122
+ """
123
+ Records a single API request with detailed information.
124
+ Updates both the raw request list and aggregated statistics.
125
+ """
126
  with self.lock:
127
  now = datetime.datetime.now(datetime.timezone.utc)
128
+ ip_address = request.client.host if request and request.client else "N/A"
129
+ user_agent = request.headers.get("user-agent", "N/A") if request else "N/A"
 
 
 
130
 
131
+ # Append to raw requests list
132
  self.data['requests'].append({
133
  'timestamp': now.isoformat(),
134
  'model': model,
 
137
  'user_agent': user_agent,
138
  })
139
 
140
+ # Update model specific stats
141
+ model_stats = self.data['models'][model]
142
+ model_stats['total_requests'] += 1
143
+ if model_stats['first_used'] is None or now < datetime.datetime.fromisoformat(model_stats['first_used']):
144
+ model_stats['first_used'] = now.isoformat()
145
+ if model_stats['last_used'] is None or now > datetime.datetime.fromisoformat(model_stats['last_used']):
146
+ model_stats['last_used'] = now.isoformat()
147
+
148
+ # Update endpoint specific stats
149
+ endpoint_stats = self.data['api_endpoints'][endpoint]
150
+ endpoint_stats['total_requests'] += 1
151
+ if endpoint_stats['first_used'] is None or now < datetime.datetime.fromisoformat(endpoint_stats['first_used']):
152
+ endpoint_stats['first_used'] = now.isoformat()
153
+ if endpoint_stats['last_used'] is None or now > datetime.datetime.fromisoformat(endpoint_stats['last_used']):
154
+ endpoint_stats['last_used'] = now.isoformat()
155
+
156
  def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
157
+ """
158
+ Generates a comprehensive summary of usage data for the specified number of days.
159
+ Includes total requests, model usage, endpoint usage, daily usage, and unique IPs.
160
+ """
161
  with self.lock:
162
  summary = {
163
  'total_requests': 0,
164
+ 'model_usage': defaultdict(int), # Requests per model for the period
165
+ 'endpoint_usage': defaultdict(int), # Requests per endpoint for the period
166
+ 'daily_usage': defaultdict(lambda: {'requests': 0, 'unique_ips': set()}), # Daily stats
167
+ 'unique_ips_total': set(), # Unique IPs across all requests
168
  'recent_requests': []
169
  }
170
 
171
+ # Prepare data for model and API endpoint charts
172
+ # These are based on the aggregated 'self.data' which covers all time,
173
+ # but the summary 'model_usage' and 'endpoint_usage' below are for the given 'days' period.
174
+ all_time_model_data = {
175
+ model: {
176
+ 'total_requests': stats['total_requests'],
177
+ 'first_used': stats['first_used'],
178
+ 'last_used': stats['last_used']
179
+ } for model, stats in self.data['models'].items()
180
+ }
181
+ all_time_endpoint_data = {
182
+ endpoint: {
183
+ 'total_requests': stats['total_requests'],
184
+ 'first_used': stats['first_used'],
185
+ 'last_used': stats['last_used']
186
+ } for endpoint, stats in self.data['api_endpoints'].items()
187
+ }
188
+
189
+
190
  cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
191
 
192
+ # Iterate backwards for recent requests and aggregate data for the specified period
193
+ requests_for_period = []
194
  for req in reversed(self.data['requests']):
195
  req_time = datetime.datetime.fromisoformat(req['timestamp'])
196
 
197
+ # Always update total requests and unique IPs for all time
198
  summary['total_requests'] += 1
199
+ summary['unique_ips_total'].add(req['ip_address'])
200
 
201
  if req_time >= cutoff_date:
202
+ requests_for_period.append(req)
203
  date_str = req_time.strftime("%Y-%m-%d")
204
 
205
+ # Aggregate data for charts and tables for the given period
206
  summary['model_usage'][req['model']] += 1
207
  summary['endpoint_usage'][req['endpoint']] += 1
208
+
209
  summary['daily_usage'][date_str]['requests'] += 1
210
+ summary['daily_usage'][date_str]['unique_ips'].add(req['ip_address'])
211
 
212
+ # Add to recent requests list (up to 20)
213
  if len(summary['recent_requests']) < 20:
214
  summary['recent_requests'].append(req)
215
 
216
+ # Convert daily unique IPs set to count
217
+ for date_str, daily_stats in summary['daily_usage'].items():
218
+ daily_stats['unique_ips_count'] = len(daily_stats['unique_ips'])
219
+ del daily_stats['unique_ips'] # Remove the set before returning
220
 
221
  # Sort daily usage by date
222
  summary['daily_usage'] = dict(sorted(summary['daily_usage'].items()))
223
+
224
+ # Convert defaultdicts to regular dicts for final summary
225
+ summary['model_usage_period'] = dict(summary['model_usage'])
226
+ summary['endpoint_usage_period'] = dict(summary['endpoint_usage'])
227
+ summary['daily_usage_period'] = dict(summary['daily_usage'])
228
+
229
+ # Add all-time data
230
+ summary['all_time_model_usage'] = all_time_model_data
231
+ summary['all_time_endpoint_usage'] = all_time_endpoint_data
232
+
233
+ summary['unique_ips_total_count'] = len(summary['unique_ips_total'])
234
+ del summary['unique_ips_total'] # No need to send the whole set
235
+
236
+ # Clean up defaultdicts that are not needed in the final output structure
237
+ del summary['model_usage']
238
+ del summary['endpoint_usage']
239
+ del summary['daily_usage']
240
 
241
  return summary
242