import json
import os
import datetime
import threading
from collections import defaultdict
from typing import Dict, Any, Optional
from fastapi import Request

class UsageTracker:
    def __init__(self, data_file="usage_data.json"):
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """
        Loads usage data from the JSON file, ensuring data integrity.
        Handles cases where the file might be corrupted or in an old format.
        """
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                    # Check if data is in the expected new format
                    if isinstance(data, dict) and 'requests' in data and 'models' in data and 'api_endpoints' in data:
                        # json.load yields plain dicts; re-wrap the aggregate
                        # tables as defaultdicts so record_request() can index
                        # unseen models/endpoints without a KeyError.
                        fresh = self._initialize_empty_data()
                        fresh['requests'] = data['requests']
                        fresh['models'].update(data['models'])
                        fresh['api_endpoints'].update(data['api_endpoints'])
                        return fresh
                    # If data is in an older, simpler format, convert it
                    elif isinstance(data, dict) and 'total_requests' in data:  # Heuristic for old format
                        return self._convert_old_format(data)
            except (json.JSONDecodeError, TypeError) as e:
                print(f"Warning: Could not decode JSON from {self.data_file} ({e}). Starting fresh.")
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, Any]:
        """
        Initializes a new, empty data structure for usage tracking.
        This structure includes a list for all requests, and dictionaries
        to store aggregated data for models and API endpoints.
        """
        return {
            'requests': [],
            'models': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}),
            'api_endpoints': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None})
        }

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Converts data from the old format to the new detailed format.
        This is a crucial step to avoid data loss on updates.
        It iterates through old 'requests' (if any) and re-records them
        into the new structured format.
        """
        print("Converting old usage data format to new format.")
        new_data = self._initialize_empty_data()
        
        # Preserve existing requests if they follow a basic structure
        if 'requests' in old_data and isinstance(old_data['requests'], list):
            for req in old_data['requests']:
                # Attempt to extract relevant fields from old request entry
                timestamp_str = req.get('timestamp')
                model_name = req.get('model', 'unknown_model')
                endpoint_name = req.get('endpoint', 'unknown_endpoint')
                ip_address = req.get('ip_address', 'N/A')
                user_agent = req.get('user_agent', 'N/A')

                # Ensure timestamp is valid and parseable
                try:
                    timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp_str else datetime.datetime.now(datetime.timezone.utc)
                except ValueError:
                    timestamp = datetime.datetime.now(datetime.timezone.utc)  # Fallback if timestamp is malformed
                # Old entries may carry naive timestamps; assume UTC so they
                # stay comparable with the aware datetimes used elsewhere.
                if timestamp.tzinfo is None:
                    timestamp = timestamp.replace(tzinfo=datetime.timezone.utc)

                new_data['requests'].append({
                    'timestamp': timestamp.isoformat(),
                    'model': model_name,
                    'endpoint': endpoint_name,
                    'ip_address': ip_address,
                    'user_agent': user_agent,
                })
                
                # Update aggregated stats for models and endpoints
                # This ensures that even old data contributes to the new summary
                if not new_data['models'][model_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['models'][model_name]['first_used']):
                    new_data['models'][model_name]['first_used'] = timestamp.isoformat()
                if not new_data['models'][model_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['models'][model_name]['last_used']):
                    new_data['models'][model_name]['last_used'] = timestamp.isoformat()
                new_data['models'][model_name]['total_requests'] += 1

                if not new_data['api_endpoints'][endpoint_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['first_used']):
                    new_data['api_endpoints'][endpoint_name]['first_used'] = timestamp.isoformat()
                if not new_data['api_endpoints'][endpoint_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['last_used']):
                    new_data['api_endpoints'][endpoint_name]['last_used'] = timestamp.isoformat()
                new_data['api_endpoints'][endpoint_name]['total_requests'] += 1

        print("Data conversion complete.")
        return new_data
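
    # Illustration (assumed example values, not taken from real data files):
    # a legacy payload such as
    #     {"total_requests": 1,
    #      "requests": [{"timestamp": "2024-01-01T00:00:00", "model": "m"}]}
    # is folded by _convert_old_format into the new shape:
    #     {"requests": [{"timestamp": "2024-01-01T00:00:00+00:00", ...}],
    #      "models": {"m": {"total_requests": 1, ...}},
    #      "api_endpoints": {"unknown_endpoint": {"total_requests": 1, ...}}}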

    def save_data(self):
        """Saves current usage data to the JSON file periodically."""
        with self.lock:
            try:
                # Convert defaultdicts to regular dicts for JSON serialization
                serializable_data = {
                    'requests': self.data['requests'],
                    'models': dict(self.data['models']),
                    'api_endpoints': dict(self.data['api_endpoints'])
                }
                with open(self.data_file, 'w') as f:
                    json.dump(serializable_data, f, indent=4)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Saves the data now and schedules another save every 60 seconds."""
        # threading.Timer threads are non-daemon by default and would keep the
        # process alive indefinitely; mark the timer as a daemon so it dies
        # with the interpreter. Anything needing a guaranteed final flush
        # should call save_data() explicitly on shutdown.
        timer = threading.Timer(60.0, self._schedule_save)
        timer.daemon = True
        timer.start()
        self.save_data()
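
    # Note (illustrative, not part of the original design): for a guaranteed
    # final flush on interpreter exit, callers could register the save as a
    # shutdown hook, e.g.:
    #
    #     import atexit
    #     tracker = UsageTracker()
    #     atexit.register(tracker.save_data)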

    def record_request(self, request: Optional[Request] = None, model: str = "unknown", endpoint: str = "unknown"):
        """
        Records a single API request with detailed information.
        Updates both the raw request list and aggregated statistics.
        """
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = request.client.host if request and request.client else "N/A"
            user_agent = request.headers.get("user-agent", "N/A") if request else "N/A"

            # Append to raw requests list
            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })

            # Update model specific stats
            model_stats = self.data['models'][model]
            model_stats['total_requests'] += 1
            if model_stats['first_used'] is None or now < datetime.datetime.fromisoformat(model_stats['first_used']):
                model_stats['first_used'] = now.isoformat()
            if model_stats['last_used'] is None or now > datetime.datetime.fromisoformat(model_stats['last_used']):
                model_stats['last_used'] = now.isoformat()

            # Update endpoint specific stats
            endpoint_stats = self.data['api_endpoints'][endpoint]
            endpoint_stats['total_requests'] += 1
            if endpoint_stats['first_used'] is None or now < datetime.datetime.fromisoformat(endpoint_stats['first_used']):
                endpoint_stats['first_used'] = now.isoformat()
            if endpoint_stats['last_used'] is None or now > datetime.datetime.fromisoformat(endpoint_stats['last_used']):
                endpoint_stats['last_used'] = now.isoformat()

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """
        Generates a usage summary. Per-model, per-endpoint, and daily counts
        cover the last `days` days, while total_requests, the unique-IP count,
        and the all-time tables span the entire request history.
        """
        with self.lock:
            summary = {
                'total_requests': 0,
                'model_usage': defaultdict(int),      # Requests per model for the period
                'endpoint_usage': defaultdict(int),    # Requests per endpoint for the period
                'daily_usage': defaultdict(lambda: {'requests': 0, 'unique_ips': set()}), # Daily stats
                'unique_ips_total': set(),             # Unique IPs across all requests
                'recent_requests': []
            }

            # Prepare data for model and API endpoint charts
            # These are based on the aggregated 'self.data' which covers all time,
            # but the summary 'model_usage' and 'endpoint_usage' below are for the given 'days' period.
            all_time_model_data = {
                model: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for model, stats in self.data['models'].items()
            }
            all_time_endpoint_data = {
                endpoint: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for endpoint, stats in self.data['api_endpoints'].items()
            }

            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)

            # Iterate backwards for recent requests and aggregate data for the specified period
            for req in reversed(self.data['requests']):
                req_time = datetime.datetime.fromisoformat(req['timestamp'])
                
                # Always update total requests and unique IPs for all time
                summary['total_requests'] += 1
                summary['unique_ips_total'].add(req['ip_address'])

                if req_time >= cutoff_date:
                    date_str = req_time.strftime("%Y-%m-%d")
                    
                    # Aggregate data for charts and tables for the given period
                    summary['model_usage'][req['model']] += 1
                    summary['endpoint_usage'][req['endpoint']] += 1
                    
                    summary['daily_usage'][date_str]['requests'] += 1
                    summary['daily_usage'][date_str]['unique_ips'].add(req['ip_address'])
                    
                    # Add to recent requests list (up to 20)
                    if len(summary['recent_requests']) < 20:
                        summary['recent_requests'].append(req)

            # Convert daily unique IPs set to count
            for date_str, daily_stats in summary['daily_usage'].items():
                daily_stats['unique_ips_count'] = len(daily_stats['unique_ips'])
                del daily_stats['unique_ips'] # Remove the set before returning

            # Sort daily usage by date
            summary['daily_usage'] = dict(sorted(summary['daily_usage'].items()))
            
            # Convert defaultdicts to regular dicts for final summary
            summary['model_usage_period'] = dict(summary['model_usage'])
            summary['endpoint_usage_period'] = dict(summary['endpoint_usage'])
            summary['daily_usage_period'] = dict(summary['daily_usage'])
            
            # Add all-time data
            summary['all_time_model_usage'] = all_time_model_data
            summary['all_time_endpoint_usage'] = all_time_endpoint_data
            
            summary['unique_ips_total_count'] = len(summary['unique_ips_total'])
            del summary['unique_ips_total'] # No need to send the whole set

            # Clean up defaultdicts that are not needed in the final output structure
            del summary['model_usage']
            del summary['endpoint_usage']
            del summary['daily_usage']

            return summary
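
# --- Usage sketch (illustrative; not part of the original module) ---
# A minimal example of attaching UsageTracker to a FastAPI app via HTTP
# middleware. The app object, middleware function, and /usage route are
# assumptions for illustration, not an existing integration:
#
#     from fastapi import FastAPI
#
#     app = FastAPI()
#     tracker = UsageTracker()
#
#     @app.middleware("http")
#     async def track_usage(request: Request, call_next):
#         response = await call_next(request)
#         tracker.record_request(request=request, endpoint=request.url.path)
#         return response
#
#     @app.get("/usage")
#     async def usage(days: int = 7):
#         return tracker.get_usage_summary(days=days)

if __name__ == "__main__":
    # Quick smoke test when the module is run directly. Model and endpoint
    # names here are hypothetical; no FastAPI Request object is needed because
    # record_request falls back to "N/A" for client details.
    demo = UsageTracker(data_file="usage_data_demo.json")
    demo.record_request(model="example-model", endpoint="/v1/chat/completions")
    demo.record_request(model="example-model", endpoint="/v1/chat/completions")
    demo.record_request(model="other-model", endpoint="/v1/embeddings")
    print(json.dumps(demo.get_usage_summary(days=1), indent=2, default=str))
    demo.save_data()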