File size: 4,875 Bytes
67a227e
086625b
 
 
dc58aee
 
 
67a227e
 
086625b
 
 
 
dc58aee
67a227e
dc58aee
 
086625b
 
 
 
dc58aee
 
 
 
 
 
 
086625b
 
dc58aee
 
 
 
 
 
 
 
 
 
 
4420abf
086625b
dc58aee
086625b
 
 
dc58aee
086625b
 
 
dc58aee
 
 
 
 
 
 
086625b
dc58aee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
086625b
 
dc58aee
 
 
 
 
 
086625b
 
dc58aee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4420abf
086625b
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
import os
import datetime
import threading
from collections import Counter, defaultdict
from typing import List, Dict, Any, Optional
from fastapi import Request

class UsageTracker:
    def __init__(self, data_file="usage_data.json"):
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """Loads usage data from the JSON file, ensuring data integrity."""
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                    if isinstance(data, dict) and 'requests' in data:
                        return data
                    # If data is old format, try to convert it
                    if isinstance(data.get('total_requests'), int):
                        return self._convert_old_format(data)
            except (json.JSONDecodeError, TypeError):
                print(f"Warning: Could not decode JSON from {self.data_file}. Starting fresh.")
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, List]:
        """Initializes a new data structure for usage tracking."""
        return {'requests': []}

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, List]:
        """Converts data from the old format to the new detailed format."""
        print("Converting old usage data format to new format.")
        new_data = self._initialize_empty_data()
        # This is a simplification; a more robust conversion would be needed for full data recovery
        # For now, we are just starting fresh with the new structure to avoid complexity.
        return new_data

    def save_data(self):
        """Saves current usage data to the JSON file periodically."""
        with self.lock:
            try:
                with open(self.data_file, 'w') as f:
                    json.dump(self.data, f, indent=4)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Schedules the data to be saved every 60 seconds."""
        threading.Timer(60.0, self._schedule_save).start()
        self.save_data()

    def record_request(self, request: Optional[Request], model: str = "unknown", endpoint: str = "unknown"):
        """Records a single API request with detailed information."""
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = "N/A"
            user_agent = "N/A"
            if request:
                ip_address = request.client.host
                user_agent = request.headers.get("user-agent", "N/A")

            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """Generates a comprehensive summary of usage data."""
        with self.lock:
            summary = {
                'total_requests': 0,
                'model_usage': defaultdict(int),
                'endpoint_usage': defaultdict(int),
                'daily_usage': defaultdict(lambda: defaultdict(int)),
                'unique_ips': set(),
                'recent_requests': []
            }

            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)

            # Iterate backwards for recent requests
            for req in reversed(self.data['requests']):
                req_time = datetime.datetime.fromisoformat(req['timestamp'])
                
                # Update total requests (for all time)
                summary['total_requests'] += 1
                summary['unique_ips'].add(req['ip_address'])

                if req_time >= cutoff_date:
                    date_str = req_time.strftime("%Y-%m-%d")
                    
                    # Aggregate data for charts and tables
                    summary['model_usage'][req['model']] += 1
                    summary['endpoint_usage'][req['endpoint']] += 1
                    summary['daily_usage'][date_str]['requests'] += 1
                    
                    # Add to recent requests list
                    if len(summary['recent_requests']) < 20:
                         summary['recent_requests'].append(req)


            # Sort daily usage by date
            summary['daily_usage'] = dict(sorted(summary['daily_usage'].items()))
            summary['unique_ip_count'] = len(summary['unique_ips'])
            del summary['unique_ips'] # No need to send the whole set

            return summary