Spaces:
Sleeping
Sleeping
import time | |
import threading | |
from typing import Optional | |
class SnowflakeIDGenerator: | |
""" | |
雪花ID生成器 | |
雪花ID结构 (64位): | |
- 符号位: 1位,固定为0 | |
- 时间戳: 41位,毫秒级时间戳 | |
- 工作机器ID: 10位,包含5位数据中心ID和5位机器ID | |
- 序列号: 12位,同一毫秒内的自增序列 | |
特点: | |
- 趋势递增 | |
- 全局唯一 | |
- 支持分布式环境 | |
- 高性能 | |
""" | |
def __init__(self, datacenter_id: int = 1, worker_id: int = 1, sequence: int = 0): | |
""" | |
初始化雪花ID生成器 | |
Args: | |
datacenter_id: 数据中心ID (0-31) | |
worker_id: 工作机器ID (0-31) | |
sequence: 初始序列号 | |
""" | |
# 位数分配 | |
self.TIMESTAMP_BITS = 41 | |
self.DATACENTER_ID_BITS = 5 | |
self.WORKER_ID_BITS = 5 | |
self.SEQUENCE_BITS = 12 | |
# 最大值 | |
self.MAX_DATACENTER_ID = -1 ^ (-1 << self.DATACENTER_ID_BITS) | |
self.MAX_WORKER_ID = -1 ^ (-1 << self.WORKER_ID_BITS) | |
self.MAX_SEQUENCE = -1 ^ (-1 << self.SEQUENCE_BITS) | |
# 偏移量 | |
self.WORKER_ID_SHIFT = self.SEQUENCE_BITS | |
self.DATACENTER_ID_SHIFT = self.SEQUENCE_BITS + self.WORKER_ID_BITS | |
self.TIMESTAMP_LEFT_SHIFT = ( | |
self.SEQUENCE_BITS + self.WORKER_ID_BITS + self.DATACENTER_ID_BITS | |
) | |
# 验证参数 | |
if datacenter_id > self.MAX_DATACENTER_ID or datacenter_id < 0: | |
raise ValueError( | |
f"Datacenter ID must be between 0 and {self.MAX_DATACENTER_ID}" | |
) | |
if worker_id > self.MAX_WORKER_ID or worker_id < 0: | |
raise ValueError(f"Worker ID must be between 0 and {self.MAX_WORKER_ID}") | |
self.datacenter_id = datacenter_id | |
self.worker_id = worker_id | |
self.sequence = sequence | |
# 时间戳基准点 (2023-01-01 00:00:00 UTC) | |
self.EPOCH = 1672531200000 | |
# 上次生成ID的时间戳 | |
self.last_timestamp = -1 | |
# 线程锁 | |
self.lock = threading.Lock() | |
def _get_timestamp(self) -> int: | |
""" | |
获取当前毫秒时间戳 | |
Returns: | |
当前毫秒时间戳 | |
""" | |
return int(time.time() * 1000) | |
def _wait_for_next_millis(self, last_timestamp: int) -> int: | |
""" | |
等待到下一毫秒 | |
Args: | |
last_timestamp: 上次时间戳 | |
Returns: | |
新的时间戳 | |
""" | |
timestamp = self._get_timestamp() | |
while timestamp <= last_timestamp: | |
timestamp = self._get_timestamp() | |
return timestamp | |
def generate_id(self) -> int: | |
""" | |
生成雪花ID | |
Returns: | |
64位雪花ID | |
Raises: | |
RuntimeError: 时钟回拨时抛出异常 | |
""" | |
with self.lock: | |
timestamp = self._get_timestamp() | |
# 检查时钟回拨 | |
if timestamp < self.last_timestamp: | |
raise RuntimeError( | |
f"Clock moved backwards. Refusing to generate id for {self.last_timestamp - timestamp} milliseconds" | |
) | |
# 如果是同一毫秒内 | |
if timestamp == self.last_timestamp: | |
self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE | |
# 如果序列号溢出,等待下一毫秒 | |
if self.sequence == 0: | |
timestamp = self._wait_for_next_millis(self.last_timestamp) | |
else: | |
# 不同毫秒,序列号重置 | |
self.sequence = 0 | |
self.last_timestamp = timestamp | |
# 生成ID | |
snowflake_id = ( | |
((timestamp - self.EPOCH) << self.TIMESTAMP_LEFT_SHIFT) | |
| (self.datacenter_id << self.DATACENTER_ID_SHIFT) | |
| (self.worker_id << self.WORKER_ID_SHIFT) | |
| self.sequence | |
) | |
return snowflake_id | |
def generate_id_str(self) -> str: | |
""" | |
生成字符串格式的雪花ID | |
Returns: | |
字符串格式的雪花ID | |
""" | |
return str(self.generate_id()) | |
def parse_id(self, snowflake_id: int) -> dict: | |
""" | |
解析雪花ID | |
Args: | |
snowflake_id: 雪花ID | |
Returns: | |
包含解析结果的字典 | |
""" | |
timestamp = (snowflake_id >> self.TIMESTAMP_LEFT_SHIFT) + self.EPOCH | |
datacenter_id = ( | |
snowflake_id >> self.DATACENTER_ID_SHIFT | |
) & self.MAX_DATACENTER_ID | |
worker_id = (snowflake_id >> self.WORKER_ID_SHIFT) & self.MAX_WORKER_ID | |
sequence = snowflake_id & self.MAX_SEQUENCE | |
return { | |
"timestamp": timestamp, | |
"datacenter_id": datacenter_id, | |
"worker_id": worker_id, | |
"sequence": sequence, | |
"datetime": time.strftime( | |
"%Y-%m-%d %H:%M:%S", time.localtime(timestamp / 1000) | |
), | |
} | |
# 全局雪花ID生成器实例 | |
_snowflake_generator: Optional[SnowflakeIDGenerator] = None | |
_generator_lock = threading.Lock() | |
def get_snowflake_generator( | |
datacenter_id: int = 1, worker_id: int = 1 | |
) -> SnowflakeIDGenerator: | |
""" | |
获取全局雪花ID生成器实例 | |
Args: | |
datacenter_id: 数据中心ID | |
worker_id: 工作机器ID | |
Returns: | |
雪花ID生成器实例 | |
""" | |
global _snowflake_generator | |
if _snowflake_generator is None: | |
with _generator_lock: | |
if _snowflake_generator is None: | |
_snowflake_generator = SnowflakeIDGenerator(datacenter_id, worker_id) | |
return _snowflake_generator | |
def generate_snowflake_id() -> int: | |
""" | |
生成雪花ID (使用默认配置) | |
Returns: | |
64位雪花ID | |
""" | |
return get_snowflake_generator().generate_id() | |
def generate_snowflake_id_str() -> str: | |
""" | |
生成字符串格式的雪花ID (使用默认配置) | |
Returns: | |
字符串格式的雪花ID | |
""" | |
return get_snowflake_generator().generate_id_str() | |
def parse_snowflake_id(snowflake_id: int) -> dict: | |
""" | |
解析雪花ID | |
Args: | |
snowflake_id: 雪花ID | |
Returns: | |
包含解析结果的字典 | |
""" | |
return get_snowflake_generator().parse_id(snowflake_id) | |
# 便捷函数 | |
def snowflake_id() -> int: | |
""" | |
快速生成雪花ID的便捷函数 | |
Returns: | |
64位雪花ID | |
""" | |
return generate_snowflake_id() | |
def snowflake_id_str() -> str: | |
""" | |
快速生成字符串格式雪花ID的便捷函数 | |
Returns: | |
字符串格式的雪花ID | |
""" | |
return generate_snowflake_id_str() | |