Skip to content

Commit 6f0bf36

Browse files
authored
Merge pull request #133 from openmessaging/statemachine
Add Statemachine feature for DLedger
2 parents ac3f801 + 6775159 commit 6f0bf36

File tree

9 files changed

+712
-70
lines changed

9 files changed

+712
-70
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java

Lines changed: 149 additions & 64 deletions
Large diffs are not rendered by default.

src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
3838
import io.openmessaging.storage.dledger.protocol.VoteRequest;
3939
import io.openmessaging.storage.dledger.protocol.VoteResponse;
40+
import io.openmessaging.storage.dledger.statemachine.StateMachine;
41+
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
4042
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
4143
import io.openmessaging.storage.dledger.store.DLedgerStore;
4244
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
@@ -49,12 +51,12 @@
4951
import java.util.Collections;
5052
import java.util.Iterator;
5153
import java.util.List;
54+
import java.util.Optional;
5255
import java.util.concurrent.Executors;
5356
import java.util.concurrent.ScheduledExecutorService;
5457
import java.util.concurrent.TimeUnit;
5558
import java.util.concurrent.CompletableFuture;
5659

57-
5860
import org.slf4j.Logger;
5961
import org.slf4j.LoggerFactory;
6062

@@ -71,6 +73,7 @@ public class DLedgerServer implements DLedgerProtocolHandler {
7173
private DLedgerLeaderElector dLedgerLeaderElector;
7274

7375
private ScheduledExecutorService executorService;
76+
private Optional<StateMachineCaller> fsmCaller;
7477

7578
public DLedgerServer(DLedgerConfig dLedgerConfig) {
7679
this.dLedgerConfig = dLedgerConfig;
@@ -87,7 +90,6 @@ public DLedgerServer(DLedgerConfig dLedgerConfig) {
8790
});
8891
}
8992

90-
9193
public void startup() {
9294
this.dLedgerStore.startup();
9395
this.dLedgerRpcService.startup();
@@ -102,6 +104,7 @@ public void shutdown() {
102104
this.dLedgerRpcService.shutdown();
103105
this.dLedgerStore.shutdown();
104106
executorService.shutdown();
107+
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
105108
}
106109

107110
private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
@@ -116,6 +119,18 @@ public MemberState getMemberState() {
116119
return memberState;
117120
}
118121

122+
public void registerStateMachine(final StateMachine fsm) {
123+
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
124+
fsmCaller.start();
125+
this.fsmCaller = Optional.of(fsmCaller);
126+
// Register state machine caller to entry pusher
127+
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
128+
}
129+
130+
public StateMachine getStateMachine() {
131+
return this.fsmCaller.map(StateMachineCaller::getStateMachine).orElse(null);
132+
}
133+
119134
@Override public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
120135
try {
121136

@@ -190,12 +205,12 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
190205
}
191206
// only wait last entry ack is ok
192207
BatchAppendFuture<AppendEntryResponse> batchAppendFuture =
193-
(BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
208+
(BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
194209
batchAppendFuture.setPositions(positions);
195210
return batchAppendFuture;
196211
}
197212
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
198-
" with empty bodys");
213+
" with empty bodys");
199214
} else {
200215
DLedgerEntry dLedgerEntry = new DLedgerEntry();
201216
dLedgerEntry.setBody(request.getBody());
@@ -278,7 +293,8 @@ public CompletableFuture<PullEntriesResponse> handlePull(PullEntriesRequest requ
278293
}
279294

280295
@Override
281-
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest request) throws Exception {
296+
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
297+
LeadershipTransferRequest request) throws Exception {
282298
try {
283299
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
284300
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
@@ -385,7 +401,7 @@ private void checkPreferredLeader() {
385401
}
386402
}
387403
logger.info("preferredLeaderId = {}, which has the smallest fall behind index = {} and is decided to be transferee.", preferredLeaderId, minFallBehind);
388-
404+
389405
if (minFallBehind < dLedgerConfig.getMaxLeadershipTransferWaitIndex()) {
390406
LeadershipTransferRequest request = new LeadershipTransferRequest();
391407
request.setTerm(memberState.currTerm());
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.snapshot;
18+
19+
/**
20+
* Reader for snapshot
21+
*/
22+
public interface SnapshotReader {
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.snapshot;
18+
19+
/**
20+
* Writer for snapshot
21+
*/
22+
public interface SnapshotWriter {
23+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.statemachine;
18+
19+
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
20+
import io.openmessaging.storage.dledger.store.DLedgerStore;
21+
import java.util.Iterator;
22+
import java.util.concurrent.atomic.AtomicLong;
23+
import java.util.function.Function;
24+
25+
/**
26+
* The iterator implementation of committed entries.
27+
*/
28+
public class CommittedEntryIterator implements Iterator<DLedgerEntry> {
29+
30+
private final Function<Long, Boolean> completeEntryCallback;
31+
private final DLedgerStore dLedgerStore;
32+
private final long committedIndex;
33+
private final long firstApplyingIndex;
34+
private final AtomicLong applyingIndex;
35+
private long currentIndex;
36+
private int completeAckNums = 0;
37+
38+
public CommittedEntryIterator(final DLedgerStore dLedgerStore, final long committedIndex,
39+
final AtomicLong applyingIndex, final long lastAppliedIndex,
40+
final Function<Long, Boolean> completeEntryCallback) {
41+
this.dLedgerStore = dLedgerStore;
42+
this.committedIndex = committedIndex;
43+
this.applyingIndex = applyingIndex;
44+
this.firstApplyingIndex = lastAppliedIndex + 1;
45+
this.currentIndex = lastAppliedIndex;
46+
this.completeEntryCallback = completeEntryCallback;
47+
}
48+
49+
@Override
50+
public boolean hasNext() {
51+
if (this.currentIndex >= this.firstApplyingIndex && this.currentIndex <= this.committedIndex) {
52+
completeApplyingEntry();
53+
}
54+
return this.currentIndex < this.committedIndex;
55+
}
56+
57+
@Override
58+
public DLedgerEntry next() {
59+
++this.currentIndex;
60+
if (this.currentIndex <= this.committedIndex) {
61+
final DLedgerEntry dLedgerEntry = this.dLedgerStore.get(this.currentIndex);
62+
this.applyingIndex.set(this.currentIndex);
63+
return dLedgerEntry;
64+
}
65+
return null;
66+
}
67+
68+
private void completeApplyingEntry() {
69+
if (this.completeEntryCallback.apply(this.currentIndex)) {
70+
this.completeAckNums++;
71+
}
72+
}
73+
74+
public long getIndex() {
75+
return this.currentIndex;
76+
}
77+
78+
public int getCompleteAckNums() {
79+
return completeAckNums;
80+
}
81+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.statemachine;
18+
19+
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
20+
import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
21+
import java.util.concurrent.CompletableFuture;
22+
23+
/**
24+
* Finite state machine, which should be implemented by user.
25+
*/
26+
public interface StateMachine {
27+
28+
/**
29+
* Update the user statemachine with a batch a tasks that can be accessed
30+
* through |iterator|.
31+
*
32+
* @param iter iterator of committed entry
33+
*/
34+
void onApply(final CommittedEntryIterator iter);
35+
36+
/**
37+
* User defined snapshot generate function, this method will block StateMachine#onApply(Iterator).
38+
* Call done.run(status) when snapshot finished.
39+
*
40+
* @param writer snapshot writer
41+
* @param done callback
42+
*/
43+
void onSnapshotSave(final SnapshotWriter writer, final CompletableFuture<Boolean> done);
44+
45+
/**
46+
* User defined snapshot load function.
47+
*
48+
* @param reader snapshot reader
49+
* @return true on success
50+
*/
51+
boolean onSnapshotLoad(final SnapshotReader reader);
52+
53+
/**
54+
* Invoked once when the raft node was shut down.
55+
* Default do nothing
56+
*/
57+
void onShutdown();
58+
}

0 commit comments

Comments
 (0)