Skip to content

Commit 6775159

Browse files
authored
Feature: let dledger complete responseFutures after applying entries to statemachine (#132)
1 parent 4293093 commit 6775159

File tree

6 files changed

+190
-79
lines changed

6 files changed

+190
-79
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java

Lines changed: 132 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
2424
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
2525
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
26-
import io.openmessaging.storage.dledger.statemachine.StateMachine;
2726
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
2827
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
2928
import io.openmessaging.storage.dledger.store.DLedgerStore;
@@ -184,6 +183,79 @@ public void wakeUpDispatchers() {
184183
}
185184
}
186185

186+
/**
187+
*
188+
* Complete the TimeoutFuture in pendingAppendResponsesByTerm (CurrentTerm, index).
189+
* Called by statemachineCaller when a committed entry (CurrentTerm, index) was applying to statemachine done.
190+
*
191+
* @return true if complete success
192+
*/
193+
public boolean completeResponseFuture(final long index) {
194+
final long term = this.memberState.currTerm();
195+
final Map<Long, TimeoutFuture<AppendEntryResponse>> responses = this.pendingAppendResponsesByTerm.get(term);
196+
if (responses != null) {
197+
CompletableFuture<AppendEntryResponse> future = responses.remove(index);
198+
if (future != null && !future.isDone()) {
199+
logger.info("Complete future, term {}, index {}", term, index);
200+
AppendEntryResponse response = new AppendEntryResponse();
201+
response.setGroup(this.memberState.getGroup());
202+
response.setTerm(term);
203+
response.setIndex(index);
204+
response.setLeaderId(this.memberState.getSelfId());
205+
response.setPos(((AppendFuture) future).getPos());
206+
future.complete(response);
207+
return true;
208+
}
209+
}
210+
return false;
211+
}
212+
213+
/**
214+
* Check responseFutures timeout from {beginIndex} in currentTerm
215+
*/
216+
public void checkResponseFuturesTimeout(final long beginIndex) {
217+
final long term = this.memberState.currTerm();
218+
final Map<Long, TimeoutFuture<AppendEntryResponse>> responses = this.pendingAppendResponsesByTerm.get(term);
219+
if (responses != null) {
220+
for (long i = beginIndex; i < Integer.MAX_VALUE; i++) {
221+
TimeoutFuture<AppendEntryResponse> future = responses.get(i);
222+
if (future == null) {
223+
break;
224+
} else if (future.isTimeOut()) {
225+
AppendEntryResponse response = new AppendEntryResponse();
226+
response.setGroup(memberState.getGroup());
227+
response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
228+
response.setTerm(term);
229+
response.setIndex(i);
230+
response.setLeaderId(memberState.getSelfId());
231+
future.complete(response);
232+
} else {
233+
break;
234+
}
235+
}
236+
}
237+
}
238+
239+
/**
240+
* Check responseFutures elapsed before {endIndex} in currentTerm
241+
*/
242+
private void checkResponseFuturesElapsed(final long endIndex) {
243+
final long currTerm = this.memberState.currTerm();
244+
final Map<Long, TimeoutFuture<AppendEntryResponse>> responses = this.pendingAppendResponsesByTerm.get(currTerm);
245+
for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
246+
if (futureEntry.getKey() < endIndex) {
247+
AppendEntryResponse response = new AppendEntryResponse();
248+
response.setGroup(memberState.getGroup());
249+
response.setTerm(currTerm);
250+
response.setIndex(futureEntry.getKey());
251+
response.setLeaderId(memberState.getSelfId());
252+
response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
253+
futureEntry.getValue().complete(response);
254+
responses.remove(futureEntry.getKey());
255+
}
256+
}
257+
}
258+
187259
private void updateCommittedIndex(final long term, final long committedIndex) {
188260
dLedgerStore.updateCommittedIndex(term, committedIndex);
189261
this.fsmCaller.ifPresent(caller -> caller.onCommitted(committedIndex));
@@ -206,8 +278,14 @@ public QuorumAckChecker(Logger logger) {
206278
public void doWork() {
207279
try {
208280
if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
209-
logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
210-
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
281+
if (DLedgerEntryPusher.this.fsmCaller.isPresent()) {
282+
final long lastAppliedIndex = DLedgerEntryPusher.this.fsmCaller.get().getLastAppliedIndex();
283+
logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}",
284+
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), lastAppliedIndex);
285+
} else {
286+
logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
287+
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
288+
}
211289
lastPrintWatermarkTimeMs = System.currentTimeMillis();
212290
}
213291
if (!memberState.isLeader()) {
@@ -243,73 +321,66 @@ public void doWork() {
243321
peerWaterMarksByTerm.remove(term);
244322
}
245323
}
246-
Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
247324

325+
Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
248326
List<Long> sortedWaterMarks = peerWaterMarks.values()
249-
.stream()
250-
.sorted(Comparator.reverseOrder())
251-
.collect(Collectors.toList());
327+
.stream()
328+
.sorted(Comparator.reverseOrder())
329+
.collect(Collectors.toList());
252330
long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
253-
updateCommittedIndex(currTerm, quorumIndex);
254-
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
255-
boolean needCheck = false;
256-
int ackNum = 0;
257-
for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
258-
try {
259-
CompletableFuture<AppendEntryResponse> future = responses.remove(i);
260-
if (future == null) {
261-
needCheck = true;
262-
break;
263-
} else if (!future.isDone()) {
264-
AppendEntryResponse response = new AppendEntryResponse();
265-
response.setGroup(memberState.getGroup());
266-
response.setTerm(currTerm);
267-
response.setIndex(i);
268-
response.setLeaderId(memberState.getSelfId());
269-
response.setPos(((AppendFuture) future).getPos());
270-
future.complete(response);
271-
}
272-
ackNum++;
273-
} catch (Throwable t) {
274-
logger.error("Error in ack to index={} term={}", i, currTerm, t);
331+
final Optional<StateMachineCaller> fsmCaller = DLedgerEntryPusher.this.fsmCaller;
332+
if (fsmCaller.isPresent()) {
333+
// If there exist statemachine
334+
DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
335+
final StateMachineCaller caller = fsmCaller.get();
336+
caller.onCommitted(quorumIndex);
337+
338+
// Check elapsed
339+
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
340+
updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
341+
checkResponseFuturesElapsed(caller.getLastAppliedIndex());
342+
lastCheckLeakTimeMs = System.currentTimeMillis();
275343
}
276-
}
277344

278-
if (ackNum == 0) {
279-
for (long i = quorumIndex + 1; i < Integer.MAX_VALUE; i++) {
280-
TimeoutFuture<AppendEntryResponse> future = responses.get(i);
281-
if (future == null) {
282-
break;
283-
} else if (future.isTimeOut()) {
284-
AppendEntryResponse response = new AppendEntryResponse();
285-
response.setGroup(memberState.getGroup());
286-
response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
287-
response.setTerm(currTerm);
288-
response.setIndex(i);
289-
response.setLeaderId(memberState.getSelfId());
290-
future.complete(response);
291-
} else {
292-
break;
345+
if (quorumIndex == this.lastQuorumIndex) {
346+
waitForRunning(1);
347+
}
348+
} else {
349+
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
350+
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
351+
boolean needCheck = false;
352+
int ackNum = 0;
353+
for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
354+
try {
355+
CompletableFuture<AppendEntryResponse> future = responses.remove(i);
356+
if (future == null) {
357+
needCheck = true;
358+
break;
359+
} else if (!future.isDone()) {
360+
AppendEntryResponse response = new AppendEntryResponse();
361+
response.setGroup(memberState.getGroup());
362+
response.setTerm(currTerm);
363+
response.setIndex(i);
364+
response.setLeaderId(memberState.getSelfId());
365+
response.setPos(((AppendFuture) future).getPos());
366+
future.complete(response);
367+
}
368+
ackNum++;
369+
} catch (Throwable t) {
370+
logger.error("Error in ack to index={} term={}", i, currTerm, t);
293371
}
294372
}
295-
waitForRunning(1);
296-
}
297373

298-
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
299-
updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
300-
for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
301-
if (futureEntry.getKey() < quorumIndex) {
302-
AppendEntryResponse response = new AppendEntryResponse();
303-
response.setGroup(memberState.getGroup());
304-
response.setTerm(currTerm);
305-
response.setIndex(futureEntry.getKey());
306-
response.setLeaderId(memberState.getSelfId());
307-
response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
308-
futureEntry.getValue().complete(response);
309-
responses.remove(futureEntry.getKey());
310-
}
374+
if (ackNum == 0) {
375+
checkResponseFuturesTimeout(quorumIndex);
376+
waitForRunning(1);
377+
}
378+
379+
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
380+
updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
381+
checkResponseFuturesElapsed(quorumIndex);
382+
lastCheckLeakTimeMs = System.currentTimeMillis();
311383
}
312-
lastCheckLeakTimeMs = System.currentTimeMillis();
313384
}
314385
lastQuorumIndex = quorumIndex;
315386
} catch (Throwable t) {

src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public MemberState getMemberState() {
120120
}
121121

122122
public void registerStateMachine(final StateMachine fsm) {
123-
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm);
123+
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
124124
fsmCaller.start();
125125
this.fsmCaller = Optional.of(fsmCaller);
126126
// Register state machine caller to entry pusher

src/main/java/io/openmessaging/storage/dledger/statemachine/CommittedEntryIterator.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,41 @@
1616

1717
package io.openmessaging.storage.dledger.statemachine;
1818

19-
import java.util.Iterator;
20-
import java.util.concurrent.atomic.AtomicLong;
21-
2219
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
2320
import io.openmessaging.storage.dledger.store.DLedgerStore;
21+
import java.util.Iterator;
22+
import java.util.concurrent.atomic.AtomicLong;
23+
import java.util.function.Function;
2424

2525
/**
2626
* The iterator implementation of committed entries.
2727
*/
2828
public class CommittedEntryIterator implements Iterator<DLedgerEntry> {
2929

30+
private final Function<Long, Boolean> completeEntryCallback;
3031
private final DLedgerStore dLedgerStore;
31-
private final long committedIndex;
32-
private final AtomicLong applyingIndex;
33-
private long currentIndex;
32+
private final long committedIndex;
33+
private final long firstApplyingIndex;
34+
private final AtomicLong applyingIndex;
35+
private long currentIndex;
36+
private int completeAckNums = 0;
3437

3538
public CommittedEntryIterator(final DLedgerStore dLedgerStore, final long committedIndex,
36-
final AtomicLong applyingIndex, final long lastAppliedIndex) {
39+
final AtomicLong applyingIndex, final long lastAppliedIndex,
40+
final Function<Long, Boolean> completeEntryCallback) {
3741
this.dLedgerStore = dLedgerStore;
3842
this.committedIndex = committedIndex;
3943
this.applyingIndex = applyingIndex;
44+
this.firstApplyingIndex = lastAppliedIndex + 1;
4045
this.currentIndex = lastAppliedIndex;
46+
this.completeEntryCallback = completeEntryCallback;
4147
}
4248

4349
@Override
4450
public boolean hasNext() {
51+
if (this.currentIndex >= this.firstApplyingIndex && this.currentIndex <= this.committedIndex) {
52+
completeApplyingEntry();
53+
}
4554
return this.currentIndex < this.committedIndex;
4655
}
4756

@@ -56,7 +65,17 @@ public DLedgerEntry next() {
5665
return null;
5766
}
5867

68+
private void completeApplyingEntry() {
69+
if (this.completeEntryCallback.apply(this.currentIndex)) {
70+
this.completeAckNums++;
71+
}
72+
}
73+
5974
public long getIndex() {
6075
return this.currentIndex;
6176
}
77+
78+
public int getCompleteAckNums() {
79+
return completeAckNums;
80+
}
6281
}

src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachine.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package io.openmessaging.storage.dledger.statemachine;
1818

19-
import java.util.concurrent.CompletableFuture;
20-
2119
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
2220
import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
21+
import java.util.concurrent.CompletableFuture;
2322

2423
/**
2524
* Finite state machine, which should be implemented by user.

0 commit comments

Comments
 (0)