File size: 6,719 Bytes
79899c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import time
import threading
from typing import Optional


class SnowflakeIDGenerator:
    """
    雪花ID生成器

    雪花ID结构 (64位):
    - 符号位: 1位,固定为0
    - 时间戳: 41位,毫秒级时间戳
    - 工作机器ID: 10位,包含5位数据中心ID和5位机器ID
    - 序列号: 12位,同一毫秒内的自增序列

    特点:
    - 趋势递增
    - 全局唯一
    - 支持分布式环境
    - 高性能
    """

    def __init__(self, datacenter_id: int = 1, worker_id: int = 1, sequence: int = 0):
        """
        初始化雪花ID生成器

        Args:
            datacenter_id: 数据中心ID (0-31)
            worker_id: 工作机器ID (0-31)
            sequence: 初始序列号
        """
        # 位数分配
        self.TIMESTAMP_BITS = 41
        self.DATACENTER_ID_BITS = 5
        self.WORKER_ID_BITS = 5
        self.SEQUENCE_BITS = 12

        # 最大值
        self.MAX_DATACENTER_ID = -1 ^ (-1 << self.DATACENTER_ID_BITS)
        self.MAX_WORKER_ID = -1 ^ (-1 << self.WORKER_ID_BITS)
        self.MAX_SEQUENCE = -1 ^ (-1 << self.SEQUENCE_BITS)

        # 偏移量
        self.WORKER_ID_SHIFT = self.SEQUENCE_BITS
        self.DATACENTER_ID_SHIFT = self.SEQUENCE_BITS + self.WORKER_ID_BITS
        self.TIMESTAMP_LEFT_SHIFT = (
            self.SEQUENCE_BITS + self.WORKER_ID_BITS + self.DATACENTER_ID_BITS
        )

        # 验证参数
        if datacenter_id > self.MAX_DATACENTER_ID or datacenter_id < 0:
            raise ValueError(
                f"Datacenter ID must be between 0 and {self.MAX_DATACENTER_ID}"
            )
        if worker_id > self.MAX_WORKER_ID or worker_id < 0:
            raise ValueError(f"Worker ID must be between 0 and {self.MAX_WORKER_ID}")

        self.datacenter_id = datacenter_id
        self.worker_id = worker_id
        self.sequence = sequence

        # 时间戳基准点 (2023-01-01 00:00:00 UTC)
        self.EPOCH = 1672531200000

        # 上次生成ID的时间戳
        self.last_timestamp = -1

        # 线程锁
        self.lock = threading.Lock()

    def _get_timestamp(self) -> int:
        """
        获取当前毫秒时间戳

        Returns:
            当前毫秒时间戳
        """
        return int(time.time() * 1000)

    def _wait_for_next_millis(self, last_timestamp: int) -> int:
        """
        等待到下一毫秒

        Args:
            last_timestamp: 上次时间戳

        Returns:
            新的时间戳
        """
        timestamp = self._get_timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._get_timestamp()
        return timestamp

    def generate_id(self) -> int:
        """
        生成雪花ID

        Returns:
            64位雪花ID

        Raises:
            RuntimeError: 时钟回拨时抛出异常
        """
        with self.lock:
            timestamp = self._get_timestamp()

            # 检查时钟回拨
            if timestamp < self.last_timestamp:
                raise RuntimeError(
                    f"Clock moved backwards. Refusing to generate id for {self.last_timestamp - timestamp} milliseconds"
                )

            # 如果是同一毫秒内
            if timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE
                # 如果序列号溢出,等待下一毫秒
                if self.sequence == 0:
                    timestamp = self._wait_for_next_millis(self.last_timestamp)
            else:
                # 不同毫秒,序列号重置
                self.sequence = 0

            self.last_timestamp = timestamp

            # 生成ID
            snowflake_id = (
                ((timestamp - self.EPOCH) << self.TIMESTAMP_LEFT_SHIFT)
                | (self.datacenter_id << self.DATACENTER_ID_SHIFT)
                | (self.worker_id << self.WORKER_ID_SHIFT)
                | self.sequence
            )

            return snowflake_id

    def generate_id_str(self) -> str:
        """
        生成字符串格式的雪花ID

        Returns:
            字符串格式的雪花ID
        """
        return str(self.generate_id())

    def parse_id(self, snowflake_id: int) -> dict:
        """
        解析雪花ID

        Args:
            snowflake_id: 雪花ID

        Returns:
            包含解析结果的字典
        """
        timestamp = (snowflake_id >> self.TIMESTAMP_LEFT_SHIFT) + self.EPOCH
        datacenter_id = (
            snowflake_id >> self.DATACENTER_ID_SHIFT
        ) & self.MAX_DATACENTER_ID
        worker_id = (snowflake_id >> self.WORKER_ID_SHIFT) & self.MAX_WORKER_ID
        sequence = snowflake_id & self.MAX_SEQUENCE

        return {
            "timestamp": timestamp,
            "datacenter_id": datacenter_id,
            "worker_id": worker_id,
            "sequence": sequence,
            "datetime": time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(timestamp / 1000)
            ),
        }


# 全局雪花ID生成器实例
_snowflake_generator: Optional[SnowflakeIDGenerator] = None
_generator_lock = threading.Lock()


def get_snowflake_generator(
    datacenter_id: int = 1, worker_id: int = 1
) -> SnowflakeIDGenerator:
    """
    获取全局雪花ID生成器实例

    Args:
        datacenter_id: 数据中心ID
        worker_id: 工作机器ID

    Returns:
        雪花ID生成器实例
    """
    global _snowflake_generator

    if _snowflake_generator is None:
        with _generator_lock:
            if _snowflake_generator is None:
                _snowflake_generator = SnowflakeIDGenerator(datacenter_id, worker_id)

    return _snowflake_generator


def generate_snowflake_id() -> int:
    """
    生成雪花ID (使用默认配置)

    Returns:
        64位雪花ID
    """
    return get_snowflake_generator().generate_id()


def generate_snowflake_id_str() -> str:
    """
    生成字符串格式的雪花ID (使用默认配置)

    Returns:
        字符串格式的雪花ID
    """
    return get_snowflake_generator().generate_id_str()


def parse_snowflake_id(snowflake_id: int) -> dict:
    """
    解析雪花ID

    Args:
        snowflake_id: 雪花ID

    Returns:
        包含解析结果的字典
    """
    return get_snowflake_generator().parse_id(snowflake_id)


# 便捷函数
def snowflake_id() -> int:
    """
    快速生成雪花ID的便捷函数

    Returns:
        64位雪花ID
    """
    return generate_snowflake_id()


def snowflake_id_str() -> str:
    """
    快速生成字符串格式雪花ID的便捷函数

    Returns:
        字符串格式的雪花ID
    """
    return generate_snowflake_id_str()