Skip to content

Commit 04b8d1e

Browse files
committed
添加分布式ID Sequence的插件实现
1 parent fee0673 commit 04b8d1e

File tree

4 files changed

+319
-0
lines changed

4 files changed

+319
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>api-boot-plugins</artifactId>
7+
<groupId>org.minbox.framework</groupId>
8+
<version>${revision}</version>
9+
</parent>
10+
<artifactId>api-boot-plugin-sequence</artifactId>
11+
<packaging>jar</packaging>
12+
<description>
13+
分布式高效ID算法集成,基于Twitter的Snowflake("雪花算法")算法实现分布式高效有序ID生产黑科技
14+
15+
Distributed efficient ID algorithm integration,
16+
based on Twitter's Snowflake algorithm to achieve distributed efficient and orderly ID production black
17+
technology.
18+
19+
See more details:https://gitee.com/yu120/sequence
20+
</description>
21+
<modelVersion>4.0.0</modelVersion>
22+
<properties>
23+
<main.basedir>${basedir}/../../..</main.basedir>
24+
</properties>
25+
26+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
package org.minbox.framework.api.boot.plugin.sequence;
2+
3+
import java.net.InetAddress;
4+
import java.util.concurrent.ThreadLocalRandom;
5+
6+
/**
7+
* 基于Twitter的Snowflake算法实现分布式高效有序ID生产黑科技(sequence)——升级版Snowflake
8+
* <p>
9+
* 开源框架地址:https://gitee.com/yu120/sequence
10+
* <p>
11+
* 特性:
12+
* 1.支持自定义允许时间回拨的范围<p>
13+
* 2.解决跨毫秒起始值每次为0开始的情况(避免末尾必定为偶数,而不便于取余使用问题)<p>
14+
* 3.解决高并发场景中获取时间戳性能问题<p>
15+
* 4.支撑根据IP末尾数据作为workerId
16+
* 5.时间回拨方案思考:1024个节点中分配10个点作为时间回拨序号(连续10次时间回拨的概率较小)
17+
*
18+
* @author lry
19+
* @version 3.0
20+
*/
21+
public final class Sequence {
22+
23+
/**
24+
* 起始时间戳
25+
**/
26+
private final static long START_TIME = 1519740777809L;
27+
28+
/**
29+
* dataCenterId占用的位数:2
30+
**/
31+
private final static long DATA_CENTER_ID_BITS = 2L;
32+
/**
33+
* workerId占用的位数:8
34+
**/
35+
private final static long WORKER_ID_BITS = 8L;
36+
/**
37+
* 序列号占用的位数:12(表示只允许workId的范围为:0-4095)
38+
**/
39+
private final static long SEQUENCE_BITS = 12L;
40+
41+
/**
42+
* workerId可以使用范围:0-255
43+
**/
44+
private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
45+
/**
46+
* dataCenterId可以使用范围:0-3
47+
**/
48+
private final static long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);
49+
50+
private final static long WORKER_ID_SHIFT = SEQUENCE_BITS;
51+
private final static long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
52+
private final static long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;
53+
54+
/**
55+
* 用mask防止溢出:位与运算保证计算的结果范围始终是 0-4095
56+
**/
57+
private final static long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);
58+
59+
private final long workerId;
60+
private final long dataCenterId;
61+
private long sequence = 0L;
62+
private long lastTimestamp = -1L;
63+
64+
private static byte LAST_IP = 0;
65+
private final boolean clock;
66+
private final long timeOffset;
67+
private final boolean randomSequence;
68+
private final ThreadLocalRandom tlr = ThreadLocalRandom.current();
69+
70+
public Sequence(long dataCenterId) {
71+
this(dataCenterId, 0x000000FF & getLastIPAddress(), false, 5L, false);
72+
}
73+
74+
public Sequence(long dataCenterId, boolean clock, boolean randomSequence) {
75+
this(dataCenterId, 0x000000FF & getLastIPAddress(), clock, 5L, randomSequence);
76+
}
77+
78+
/**
79+
* 基于Snowflake创建分布式ID生成器
80+
*
81+
* @param dataCenterId 数据中心ID,数据范围为0~255
82+
* @param workerId 工作机器ID,数据范围为0~3
83+
* @param clock true表示解决高并发下获取时间戳的性能问题
84+
* @param timeOffset 允许时间回拨的毫秒量,建议5ms
85+
* @param randomSequence true表示使用毫秒内的随机序列(超过范围则取余)
86+
*/
87+
public Sequence(long dataCenterId, long workerId, boolean clock, long timeOffset, boolean randomSequence) {
88+
if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0) {
89+
throw new IllegalArgumentException("Data Center Id can't be greater than " + MAX_DATA_CENTER_ID + " or less than 0");
90+
}
91+
if (workerId > MAX_WORKER_ID || workerId < 0) {
92+
throw new IllegalArgumentException("Worker Id can't be greater than " + MAX_WORKER_ID + " or less than 0");
93+
}
94+
95+
this.workerId = workerId;
96+
this.dataCenterId = dataCenterId;
97+
this.clock = clock;
98+
this.timeOffset = timeOffset;
99+
this.randomSequence = randomSequence;
100+
}
101+
102+
/**
103+
* 获取ID
104+
*
105+
* @return long
106+
*/
107+
public synchronized Long nextId() {
108+
long currentTimestamp = this.timeGen();
109+
110+
// 闰秒:如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,这个时候应当抛出异常
111+
if (currentTimestamp < lastTimestamp) {
112+
// 校验时间偏移回拨量
113+
long offset = lastTimestamp - currentTimestamp;
114+
if (offset > timeOffset) {
115+
throw new RuntimeException("Clock moved backwards, refusing to generate id for [" + offset + "ms]");
116+
}
117+
118+
try {
119+
// 时间回退timeOffset毫秒内,则允许等待2倍的偏移量后重新获取,解决小范围的时间回拨问题
120+
this.wait(offset << 1);
121+
} catch (Exception e) {
122+
throw new RuntimeException(e);
123+
}
124+
// 再次获取
125+
currentTimestamp = this.timeGen();
126+
// 再次校验
127+
if (currentTimestamp < lastTimestamp) {
128+
throw new RuntimeException("Clock moved backwards, refusing to generate id for [" + offset + "ms]");
129+
}
130+
}
131+
132+
// 同一毫秒内序列直接自增
133+
if (lastTimestamp == currentTimestamp) {
134+
// randomSequence为true表示随机生成允许范围内的序列起始值并取余数,否则毫秒内起始值为0L开始自增
135+
long tempSequence = sequence + 1;
136+
if (randomSequence && tempSequence > SEQUENCE_MASK) {
137+
tempSequence = tempSequence % SEQUENCE_MASK;
138+
}
139+
140+
// 通过位与运算保证计算的结果范围始终是 0-4095
141+
sequence = tempSequence & SEQUENCE_MASK;
142+
if (sequence == 0) {
143+
currentTimestamp = this.tilNextMillis(lastTimestamp);
144+
}
145+
} else {
146+
// randomSequence为true表示随机生成允许范围内的序列起始值,否则毫秒内起始值为0L开始自增
147+
sequence = randomSequence ? tlr.nextLong(SEQUENCE_MASK + 1) : 0L;
148+
}
149+
150+
lastTimestamp = currentTimestamp;
151+
long currentOffsetTime = currentTimestamp - START_TIME;
152+
153+
/*
154+
* 1.左移运算是为了将数值移动到对应的段(41、5、5,12那段因为本来就在最右,因此不用左移)
155+
* 2.然后对每个左移后的值(la、lb、lc、sequence)做位或运算,是为了把各个短的数据合并起来,合并成一个二进制数
156+
* 3.最后转换成10进制,就是最终生成的id
157+
*/
158+
return (currentOffsetTime << TIMESTAMP_LEFT_SHIFT) |
159+
// 数据中心位
160+
(dataCenterId << DATA_CENTER_ID_SHIFT) |
161+
// 工作ID位
162+
(workerId << WORKER_ID_SHIFT) |
163+
// 毫秒序列化位
164+
sequence;
165+
}
166+
167+
/**
168+
* 保证返回的毫秒数在参数之后(阻塞到下一个毫秒,直到获得新的时间戳)——CAS
169+
*
170+
* @param lastTimestamp last timestamp
171+
* @return next millis
172+
*/
173+
private long tilNextMillis(long lastTimestamp) {
174+
long timestamp = this.timeGen();
175+
while (timestamp <= lastTimestamp) {
176+
// 如果发现时间回拨,则自动重新获取(可能会处于无限循环中)
177+
timestamp = this.timeGen();
178+
}
179+
180+
return timestamp;
181+
}
182+
183+
/**
184+
* 获得系统当前毫秒时间戳
185+
*
186+
* @return timestamp 毫秒时间戳
187+
*/
188+
private long timeGen() {
189+
return clock ? SystemClock.INSTANCE.currentTimeMillis() : System.currentTimeMillis();
190+
}
191+
192+
/**
193+
* 用IP地址最后几个字节标示
194+
*
195+
* @return last IP
196+
*/
197+
public static byte getLastIPAddress() {
198+
if (LAST_IP != 0) {
199+
return LAST_IP;
200+
}
201+
202+
try {
203+
InetAddress inetAddress = InetAddress.getLocalHost();
204+
byte[] addressByte = inetAddress.getAddress();
205+
LAST_IP = addressByte[addressByte.length - 1];
206+
} catch (Exception e) {
207+
throw new RuntimeException("Unknown Host Exception", e);
208+
}
209+
210+
return LAST_IP;
211+
}
212+
213+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.minbox.framework.api.boot.plugin.sequence;
2+
3+
4+
import java.sql.Timestamp;
5+
import java.util.concurrent.ScheduledExecutorService;
6+
import java.util.concurrent.ScheduledThreadPoolExecutor;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.atomic.AtomicLong;
9+
10+
/**
11+
* System Clock
12+
* <p>
13+
* 利用ScheduledExecutorService实现高并发场景下System.curentTimeMillis()的性能问题的优化.
14+
*
15+
* @author lry
16+
*/
17+
public enum SystemClock {
18+
19+
// ====
20+
21+
INSTANCE(1);
22+
23+
private final long period;
24+
private final AtomicLong nowTime;
25+
private boolean started = false;
26+
private ScheduledExecutorService executorService;
27+
28+
SystemClock(long period) {
29+
this.period = period;
30+
this.nowTime = new AtomicLong(System.currentTimeMillis());
31+
}
32+
33+
/**
34+
* The initialize scheduled executor service
35+
*/
36+
public void initialize() {
37+
if (started) {
38+
return;
39+
}
40+
41+
this.executorService = new ScheduledThreadPoolExecutor(1, r -> {
42+
Thread thread = new Thread(r, "system-clock");
43+
thread.setDaemon(true);
44+
return thread;
45+
});
46+
executorService.scheduleAtFixedRate(() -> nowTime.set(System.currentTimeMillis()),
47+
this.period, this.period, TimeUnit.MILLISECONDS);
48+
Runtime.getRuntime().addShutdownHook(new Thread(this::destroy));
49+
started = true;
50+
}
51+
52+
/**
53+
* The get current time milliseconds
54+
*
55+
* @return long time
56+
*/
57+
public long currentTimeMillis() {
58+
return started ? nowTime.get() : System.currentTimeMillis();
59+
}
60+
61+
/**
62+
* The get string current time
63+
*
64+
* @return string time
65+
*/
66+
public String currentTime() {
67+
return new Timestamp(currentTimeMillis()).toString();
68+
}
69+
70+
/**
71+
* The destroy of executor service
72+
*/
73+
public void destroy() {
74+
if (executorService != null) {
75+
executorService.shutdown();
76+
}
77+
}
78+
79+
}

api-boot-project/api-boot-plugins/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@
3030
<module>api-boot-plugin-message-push</module>
3131
<module>api-boot-plugin-rate-limiter</module>
3232
<module>api-boot-plugin-mail</module>
33+
<module>api-boot-plugin-sequence</module>
3334
</modules>
3435
</project>

0 commit comments

Comments
 (0)