File size: 5,841 Bytes
67a227e
086625b
 
 
 
67a227e
 
086625b
 
 
 
67a227e
086625b
 
 
 
 
 
 
 
 
 
 
4420abf
 
 
 
 
 
 
086625b
 
 
 
 
 
 
 
67a227e
086625b
 
 
4420abf
67a227e
 
4420abf
 
 
 
 
 
 
 
 
 
086625b
 
 
4420abf
 
086625b
 
4420abf
086625b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67a227e
086625b
 
 
 
 
 
 
 
 
67a227e
086625b
 
 
 
4420abf
086625b
 
4420abf
 
 
 
086625b
 
4420abf
 
 
67a227e
 
086625b
 
 
 
 
 
 
 
 
 
4420abf
086625b
 
 
 
4420abf
 
 
086625b
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import json
import os
import datetime
import threading
from collections import defaultdict

class UsageTracker:
    """Request counter persisted to a JSON file.

    The in-memory state (``self.data``) is a dict with four top-level keys:

    * ``total_requests`` -- running count of every recorded request.
    * ``models`` / ``api_endpoints`` -- per-name entries, each holding
      ``total_requests``, ``first_used`` and ``last_used``.
    * ``recent_daily_usage`` -- maps ``"YYYY-MM-DD"`` to a ``defaultdict(int)``
      of per-name counts for that day.

    ``record_request``, ``save_data`` and ``get_usage_summary`` all take
    ``self.lock`` while they touch ``self.data``.
    """

    def __init__(self, data_file="usage_data.json"):
        """Create a tracker backed by *data_file*, loading any existing data."""
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()

    def _load_data(self) -> dict:
        """Load usage data from the JSON file.

        Returns the empty structure when the file is missing, cannot be
        decoded, or does not contain a JSON object at the top level. Missing
        or malformed sections are reset so later updates cannot fail on them.
        """
        # Open directly instead of checking os.path.exists first: this avoids
        # a race where the file disappears between the check and the open.
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return self._initialize_empty_data()
        except (json.JSONDecodeError, UnicodeDecodeError):
            print(f"Warning: Could not decode JSON from {self.data_file}. Starting with empty data.")
            return self._initialize_empty_data()

        # Valid JSON is not necessarily an object (e.g. "[]" or "null").
        if not isinstance(data, dict):
            print(f"Warning: Unexpected data format in {self.data_file}. Starting with empty data.")
            return self._initialize_empty_data()

        if not isinstance(data.get('total_requests'), int):
            data['total_requests'] = 0
        for section in ('models', 'api_endpoints', 'recent_daily_usage'):
            if not isinstance(data.get(section), dict):
                data[section] = {}

        # Per-day counters are defaultdicts internally so unseen names start
        # at zero; entries that are not JSON objects are dropped.
        data['recent_daily_usage'] = {
            date: defaultdict(int, counts)
            for date, counts in data['recent_daily_usage'].items()
            if isinstance(counts, dict)
        }
        return data

    def _initialize_empty_data(self) -> dict:
        """Return a fresh, empty usage-data structure."""
        return {
            'total_requests': 0,
            'models': {},
            'api_endpoints': {},
            'recent_daily_usage': {},  # date string -> defaultdict(int)
        }

    def _convert_defaultdicts_to_dicts(self, obj):
        """Recursively rebuild *obj* using plain dicts and lists.

        ``defaultdict`` is a ``dict`` subclass, so one ``isinstance`` check
        covers both. Dicts and lists are copied at every level; any other
        value is returned as-is.
        """
        if isinstance(obj, dict):
            return {k: self._convert_defaultdicts_to_dicts(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._convert_defaultdicts_to_dicts(elem) for elem in obj]
        return obj

    def save_data(self):
        """Save current usage data to the JSON file.

        The data is written to a sibling ``<data_file>.tmp`` file and then
        moved over the target with ``os.replace``, so an interrupted write
        does not leave a truncated file at ``data_file``. I/O errors are
        reported on stdout rather than raised.
        """
        with self.lock:
            data_to_save = self._convert_defaultdicts_to_dicts(self.data)
            tmp_path = f"{self.data_file}.tmp"
            try:
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(data_to_save, f, indent=4)
                os.replace(tmp_path, self.data_file)
            except OSError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _bump_entry(self, section: str, name: str, timestamp: str) -> None:
        """Increment the counter for *name* in ``self.data[section]``.

        Creates the entry on first use. The caller must hold ``self.lock``.
        """
        entry = self.data[section].setdefault(name, {
            'total_requests': 0,
            'first_used': timestamp,
            'last_used': timestamp,
        })
        entry['total_requests'] += 1
        entry['last_used'] = timestamp

    def record_request(self, model: str = "unknown", endpoint: str = "unknown"):
        """Record a single API request, updating model, endpoint, and daily usage.

        Args:
            model: Name of the model that served the request.
            endpoint: Name of the API endpoint that was called.
        """
        with self.lock:
            now = datetime.datetime.now()
            current_date = now.strftime("%Y-%m-%d")
            current_time = now.strftime("%Y-%m-%d %I:%M:%S %p")

            self.data['total_requests'] += 1
            self._bump_entry('models', model, current_time)
            self._bump_entry('api_endpoints', endpoint, current_time)

            daily = self.data['recent_daily_usage']
            counts = daily.get(current_date)
            if not isinstance(counts, defaultdict):
                # Missing, or a plain dict that was assigned from outside.
                counts = defaultdict(int, counts or {})
                daily[current_date] = counts

            # NOTE: model and endpoint names share one flat namespace here, so
            # a request whose model and endpoint are equal (including the
            # "unknown" defaults) adds 2 to that key. Kept as-is to preserve
            # the persisted data format.
            counts[model] += 1
            counts[endpoint] += 1

    def get_usage_summary(self, days: int = 7) -> dict:
        """Build a summary of usage data covering the last *days* days.

        Args:
            days: Number of calendar days to include, counting today.

        Returns:
            A dict with ``total_requests``, ``models``, ``api_endpoints`` and
            ``recent_daily_usage`` (most recent date first, only dates that
            have data). Every nested dict is a copy, so callers may modify
            the result without affecting the tracker's state.
        """
        with self.lock:
            today = datetime.date.today()
            recent = {}
            for offset in range(days):
                date = (today - datetime.timedelta(days=offset)).strftime("%Y-%m-%d")
                counts = self.data['recent_daily_usage'].get(date)
                if counts is not None:
                    recent[date] = dict(counts)

            return {
                'total_requests': self.data['total_requests'],
                # Copy so live, lock-protected state is not handed out.
                'models': self._convert_defaultdicts_to_dicts(self.data['models']),
                'api_endpoints': self._convert_defaultdicts_to_dicts(self.data['api_endpoints']),
                'recent_daily_usage': recent,
            }