Skip to content

Commit af5d4d9

Browse files
committed
raft算法重构
1 parent 2dffd59 commit af5d4d9

37 files changed

+907
-280
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package code.distribution.raft;
2+
3+
import code.distribution.raft.enums.NodeEventType;
4+
import code.distribution.raft.enums.RoleType;
5+
import code.distribution.raft.model.NodeEvent;
6+
import code.util.NamedThreadFactory;
7+
8+
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.ScheduledThreadPoolExecutor;
10+
import java.util.concurrent.ThreadPoolExecutor;
11+
import java.util.concurrent.TimeUnit;
12+
13+
/**
14+
* 〈节点管理服务〉<p>
15+
* 〈功能详细描述〉
16+
*
17+
* @author beston
18+
* @date 2020/12/08
19+
*/
20+
public class NodeManageService implements IService {
21+
22+
private final RaftNodeServer nodeServer;
23+
24+
private boolean stop;
25+
26+
public NodeManageService(RaftNodeServer nodeServer) {
27+
this.nodeServer = nodeServer;
28+
}
29+
30+
private final ScheduledExecutorService nodeManagerTimer = new ScheduledThreadPoolExecutor(1,
31+
new NamedThreadFactory("NodeManagerTimer-"), new ThreadPoolExecutor.CallerRunsPolicy());
32+
33+
@Override
34+
public void start() {
35+
stop = false;
36+
nodeManagerTimer.scheduleAtFixedRate(() -> {
37+
while (!stop) {
38+
if (nodeServer.getNode().getRole() == RoleType.LEADER) {
39+
try {
40+
NodeEvent nodeEvent = RaftClusterManager.newNodeQueue().take();
41+
System.out.println("Handle node event:" + nodeEvent);
42+
if (nodeEvent.getEventType() == NodeEventType.ADD) {
43+
nodeServer.addNode(nodeEvent.getNodeId());
44+
} else if (nodeEvent.getEventType() == NodeEventType.REMOVE) {
45+
nodeServer.removeNode(nodeEvent.getNodeId());
46+
}
47+
} catch (InterruptedException e) {
48+
Thread.currentThread().interrupt();
49+
}
50+
}
51+
}
52+
}, RaftConst.SCHEDULER_DELAY_MS, RaftConst.SLEEP_DEVIATION_MS, TimeUnit.MILLISECONDS);
53+
}
54+
55+
@Override
56+
public void close() {
57+
stop = true;
58+
nodeManagerTimer.shutdownNow();
59+
}
60+
61+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package code.distribution.raft;
2+
3+
import code.distribution.raft.model.NodeCommand;
4+
import code.distribution.raft.model.NodeEvent;
5+
6+
import java.util.ArrayList;
7+
import java.util.HashSet;
8+
import java.util.List;
9+
import java.util.Set;
10+
import java.util.concurrent.BlockingQueue;
11+
import java.util.concurrent.LinkedBlockingQueue;
12+
import java.util.stream.Collectors;
13+
14+
/**
15+
* 〈集群管理〉<p>
16+
* 〈维护集群节点的上下线〉
17+
*
18+
* @author zixiao
19+
* @date 2019/3/7
20+
*/
21+
public class RaftClusterManager {
22+
23+
/**
24+
* 所有节点
25+
*/
26+
private static Set<String> allNodes = new HashSet<>();
27+
28+
/**
29+
* 除自身外的索引节点
30+
*/
31+
private static Set<String> otherNodes = new HashSet<>();
32+
33+
/**
34+
* 新变更节点队列
35+
*/
36+
private static BlockingQueue<NodeEvent> newNodeQueue = new LinkedBlockingQueue<>();
37+
38+
/**
39+
* 自身节点
40+
* 如果有值,则本节点是raft服务端
41+
* 如果为null,则本节点是raft客户端
42+
*/
43+
private static String selfId;
44+
45+
public static void config(String[] nodeIds){
46+
for (String nodeId : nodeIds) {
47+
allNodes.add(nodeId);
48+
}
49+
}
50+
51+
public static void config(RaftConfig raftConfig){
52+
config(raftConfig.parseClusterNodes());
53+
if(raftConfig.getNodeId() != null){
54+
selfId = raftConfig.getNodeId();
55+
otherNodes = allNodes.stream().filter(nodeId -> !nodeId.equals(selfId)).collect(Collectors.toSet());
56+
}
57+
}
58+
59+
public static List<String> allNodes(){
60+
return new ArrayList<>(allNodes);
61+
}
62+
63+
public static Set<String> otherNodes(){
64+
return otherNodes;
65+
}
66+
67+
public static BlockingQueue<NodeEvent> newNodeQueue(){
68+
return newNodeQueue;
69+
}
70+
71+
public static int nodeNum(){
72+
return allNodes.size();
73+
}
74+
75+
public static boolean exist(String nodeId){
76+
return allNodes().contains(nodeId);
77+
}
78+
79+
public static boolean addNode(String nodeId){
80+
synchronized (allNodes) {
81+
//是否为raft服务端
82+
if (selfId != null && !nodeId.equals(selfId)) {
83+
otherNodes.add(nodeId);
84+
}
85+
boolean success = allNodes.add(nodeId);
86+
if (success) {
87+
newNodeQueue.add(NodeEvent.buildAdd(nodeId));
88+
}
89+
return success;
90+
}
91+
}
92+
93+
public static boolean removeNode(String nodeId){
94+
synchronized (allNodes) {
95+
//是否为raft服务端
96+
if (selfId != null) {
97+
otherNodes.remove(nodeId);
98+
}
99+
boolean success = allNodes.remove(nodeId);
100+
if (success) {
101+
newNodeQueue.add(NodeEvent.buildRemove(nodeId));
102+
}
103+
return success;
104+
}
105+
}
106+
107+
}

src/main/java/code/distribution/raft/RaftConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
@Data
1414
public class RaftConfig {
1515

16+
/**
17+
* 机器节点Id
18+
*/
19+
private String nodeId;
20+
1621
/**
1722
* 集群节点配置,以英文逗号,或分号;分隔
1823
*/

src/main/java/code/distribution/raft/RaftConst.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,23 @@ public interface RaftConst {
1111
/**
1212
* 心跳定时器
1313
*/
14-
long HEARTBEAT_MS = 100;
14+
long HEARTBEAT_MS = 150;
1515

1616
/**
1717
* 选举定时器
1818
* 必须要大于5倍心跳定时,建议是10倍关系
1919
*/
20-
long ELECTION_TIMEOUT_MS = 1000;
20+
long ELECTION_TIMEOUT_MS = 1500;
21+
22+
/**
23+
* 休眠时间
24+
*/
25+
long SLEEP_DEVIATION_MS = 50;
26+
27+
/**
28+
* 定时器启动延时
29+
*/
30+
long SCHEDULER_DELAY_MS = 3000;
2131

2232
/**
2333
* 空对象的term值
@@ -27,7 +37,7 @@ public interface RaftConst {
2737
/**
2838
* 重试日志复制
2939
*/
30-
long RETRY_APPEND_MS = 3000;
40+
long RETRY_APPEND_MS = 1000;
3141

3242
/**
3343
* 换行符

src/main/java/code/distribution/raft/RaftNetwork.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)