Skip to content

Commit 4293093

Browse files
authored
Feature: add statemachine for dledger (#128)
* feature: add statemachine add statemachine caller * feature: add fsm caller to dledgerEntryPusher * feature: Improve the function of the state machine * style: add apach header * style: add apache header
1 parent 26c15a9 commit 4293093

File tree

9 files changed

+542
-11
lines changed

9 files changed

+542
-11
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
2424
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
2525
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
26+
import io.openmessaging.storage.dledger.statemachine.StateMachine;
27+
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
2628
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
2729
import io.openmessaging.storage.dledger.store.DLedgerStore;
2830
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
@@ -34,6 +36,7 @@
3436
import java.util.HashMap;
3537
import java.util.List;
3638
import java.util.Map;
39+
import java.util.Optional;
3740
import java.util.concurrent.ArrayBlockingQueue;
3841
import java.util.concurrent.BlockingQueue;
3942
import java.util.concurrent.CompletableFuture;
@@ -65,6 +68,8 @@ public class DLedgerEntryPusher {
6568

6669
private Map<String, EntryDispatcher> dispatcherMap = new HashMap<>();
6770

71+
private Optional<StateMachineCaller> fsmCaller;
72+
6873
public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
6974
DLedgerRpcService dLedgerRpcService) {
7075
this.dLedgerConfig = dLedgerConfig;
@@ -96,6 +101,10 @@ public void shutdown() {
96101
}
97102
}
98103

104+
public void registerStateMachine(final Optional<StateMachineCaller> fsmCaller) {
105+
this.fsmCaller = fsmCaller;
106+
}
107+
99108
public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
100109
return entryHandler.handlePush(request);
101110
}
@@ -175,6 +184,11 @@ public void wakeUpDispatchers() {
175184
}
176185
}
177186

187+
private void updateCommittedIndex(final long term, final long committedIndex) {
188+
dLedgerStore.updateCommittedIndex(term, committedIndex);
189+
this.fsmCaller.ifPresent(caller -> caller.onCommitted(committedIndex));
190+
}
191+
178192
/**
179193
* This thread will check the quorum index and complete the pending requests.
180194
*/
@@ -236,7 +250,7 @@ public void doWork() {
236250
.sorted(Comparator.reverseOrder())
237251
.collect(Collectors.toList());
238252
long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
239-
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
253+
updateCommittedIndex(currTerm, quorumIndex);
240254
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
241255
boolean needCheck = false;
242256
int ackNum = 0;
@@ -830,7 +844,7 @@ private void handleDoAppend(long writeIndex, PushEntryRequest request,
830844
DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
831845
PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
832846
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
833-
dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
847+
updateCommittedIndex(request.getTerm(), request.getCommitIndex());
834848
} catch (Throwable t) {
835849
logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
836850
future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
@@ -857,7 +871,7 @@ private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex,
857871
try {
858872
PreConditions.check(committedIndex == request.getCommitIndex(), DLedgerResponseCode.UNKNOWN);
859873
PreConditions.check(request.getType() == PushEntryRequest.Type.COMMIT, DLedgerResponseCode.UNKNOWN);
860-
dLedgerStore.updateCommittedIndex(request.getTerm(), committedIndex);
874+
updateCommittedIndex(request.getTerm(), committedIndex);
861875
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
862876
} catch (Throwable t) {
863877
logger.error("[HandleDoCommit] committedIndex={}", request.getCommitIndex(), t);
@@ -875,7 +889,7 @@ private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex
875889
long index = dLedgerStore.truncate(request.getEntry(), request.getTerm(), request.getLeaderId());
876890
PreConditions.check(index == truncateIndex, DLedgerResponseCode.INCONSISTENT_STATE);
877891
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
878-
dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
892+
updateCommittedIndex(request.getTerm(), request.getCommitIndex());
879893
} catch (Throwable t) {
880894
logger.error("[HandleDoTruncate] truncateIndex={}", truncateIndex, t);
881895
future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
@@ -891,7 +905,7 @@ private void handleDoBatchAppend(long writeIndex, PushEntryRequest request,
891905
dLedgerStore.appendAsFollower(entry, request.getTerm(), request.getLeaderId());
892906
}
893907
future.complete(buildBatchAppendResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
894-
dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
908+
updateCommittedIndex(request.getTerm(), request.getCommitIndex());
895909
} catch (Throwable t) {
896910
logger.error("[HandleDoBatchAppend]", t);
897911
}

src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
3838
import io.openmessaging.storage.dledger.protocol.VoteRequest;
3939
import io.openmessaging.storage.dledger.protocol.VoteResponse;
40+
import io.openmessaging.storage.dledger.statemachine.StateMachine;
41+
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
4042
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
4143
import io.openmessaging.storage.dledger.store.DLedgerStore;
4244
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
@@ -49,12 +51,12 @@
4951
import java.util.Collections;
5052
import java.util.Iterator;
5153
import java.util.List;
54+
import java.util.Optional;
5255
import java.util.concurrent.Executors;
5356
import java.util.concurrent.ScheduledExecutorService;
5457
import java.util.concurrent.TimeUnit;
5558
import java.util.concurrent.CompletableFuture;
5659

57-
5860
import org.slf4j.Logger;
5961
import org.slf4j.LoggerFactory;
6062

@@ -71,6 +73,7 @@ public class DLedgerServer implements DLedgerProtocolHander {
7173
private DLedgerLeaderElector dLedgerLeaderElector;
7274

7375
private ScheduledExecutorService executorService;
76+
private Optional<StateMachineCaller> fsmCaller;
7477

7578
public DLedgerServer(DLedgerConfig dLedgerConfig) {
7679
this.dLedgerConfig = dLedgerConfig;
@@ -87,7 +90,6 @@ public DLedgerServer(DLedgerConfig dLedgerConfig) {
8790
});
8891
}
8992

90-
9193
public void startup() {
9294
this.dLedgerStore.startup();
9395
this.dLedgerRpcService.startup();
@@ -102,6 +104,7 @@ public void shutdown() {
102104
this.dLedgerRpcService.shutdown();
103105
this.dLedgerStore.shutdown();
104106
executorService.shutdown();
107+
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
105108
}
106109

107110
private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
@@ -116,6 +119,18 @@ public MemberState getMemberState() {
116119
return memberState;
117120
}
118121

122+
public void registerStateMachine(final StateMachine fsm) {
123+
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm);
124+
fsmCaller.start();
125+
this.fsmCaller = Optional.of(fsmCaller);
126+
// Register state machine caller to entry pusher
127+
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
128+
}
129+
130+
public StateMachine getStateMachine() {
131+
return this.fsmCaller.map(StateMachineCaller::getStateMachine).orElse(null);
132+
}
133+
119134
@Override public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
120135
try {
121136

@@ -190,12 +205,12 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
190205
}
191206
// only wait last entry ack is ok
192207
BatchAppendFuture<AppendEntryResponse> batchAppendFuture =
193-
(BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
208+
(BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
194209
batchAppendFuture.setPositions(positions);
195210
return batchAppendFuture;
196211
}
197212
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
198-
" with empty bodys");
213+
" with empty bodys");
199214
} else {
200215
DLedgerEntry dLedgerEntry = new DLedgerEntry();
201216
dLedgerEntry.setBody(request.getBody());
@@ -278,7 +293,8 @@ public CompletableFuture<PullEntriesResponse> handlePull(PullEntriesRequest requ
278293
}
279294

280295
@Override
281-
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest request) throws Exception {
296+
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
297+
LeadershipTransferRequest request) throws Exception {
282298
try {
283299
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
284300
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
@@ -385,7 +401,7 @@ private void checkPreferredLeader() {
385401
}
386402
}
387403
logger.info("preferredLeaderId = {}, which has the smallest fall behind index = {} and is decided to be transferee.", preferredLeaderId, minFallBehind);
388-
404+
389405
if (minFallBehind < dLedgerConfig.getMaxLeadershipTransferWaitIndex()) {
390406
LeadershipTransferRequest request = new LeadershipTransferRequest();
391407
request.setTerm(memberState.currTerm());
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.snapshot;
18+
19+
/**
20+
* Reader for snapshot
21+
*/
22+
public interface SnapshotReader {
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.snapshot;
18+
19+
/**
20+
* Writer for snapshot
21+
*/
22+
public interface SnapshotWriter {
23+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.statemachine;
18+
19+
import java.util.Iterator;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
23+
import io.openmessaging.storage.dledger.store.DLedgerStore;
24+
25+
/**
26+
* The iterator implementation of committed entries.
27+
*/
28+
public class CommittedEntryIterator implements Iterator<DLedgerEntry> {
29+
30+
private final DLedgerStore dLedgerStore;
31+
private final long committedIndex;
32+
private final AtomicLong applyingIndex;
33+
private long currentIndex;
34+
35+
public CommittedEntryIterator(final DLedgerStore dLedgerStore, final long committedIndex,
36+
final AtomicLong applyingIndex, final long lastAppliedIndex) {
37+
this.dLedgerStore = dLedgerStore;
38+
this.committedIndex = committedIndex;
39+
this.applyingIndex = applyingIndex;
40+
this.currentIndex = lastAppliedIndex;
41+
}
42+
43+
@Override
44+
public boolean hasNext() {
45+
return this.currentIndex < this.committedIndex;
46+
}
47+
48+
@Override
49+
public DLedgerEntry next() {
50+
++this.currentIndex;
51+
if (this.currentIndex <= this.committedIndex) {
52+
final DLedgerEntry dLedgerEntry = this.dLedgerStore.get(this.currentIndex);
53+
this.applyingIndex.set(this.currentIndex);
54+
return dLedgerEntry;
55+
}
56+
return null;
57+
}
58+
59+
public long getIndex() {
60+
return this.currentIndex;
61+
}
62+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.statemachine;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
22+
import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
23+
24+
/**
25+
* Finite state machine, which should be implemented by user.
26+
*/
27+
public interface StateMachine {
28+
29+
/**
30+
* Update the user statemachine with a batch a tasks that can be accessed
31+
* through |iterator|.
32+
*
33+
* @param iter iterator of committed entry
34+
*/
35+
void onApply(final CommittedEntryIterator iter);
36+
37+
/**
38+
* User defined snapshot generate function, this method will block StateMachine#onApply(Iterator).
39+
* Call done.run(status) when snapshot finished.
40+
*
41+
* @param writer snapshot writer
42+
* @param done callback
43+
*/
44+
void onSnapshotSave(final SnapshotWriter writer, final CompletableFuture<Boolean> done);
45+
46+
/**
47+
* User defined snapshot load function.
48+
*
49+
* @param reader snapshot reader
50+
* @return true on success
51+
*/
52+
boolean onSnapshotLoad(final SnapshotReader reader);
53+
54+
/**
55+
* Invoked once when the raft node was shut down.
56+
* Default do nothing
57+
*/
58+
void onShutdown();
59+
}

0 commit comments

Comments
 (0)