2525import io .openmessaging .storage .dledger .entry .DLedgerEntry ;
2626import io .openmessaging .storage .dledger .exception .DLedgerException ;
2727import io .openmessaging .storage .dledger .metrics .DLedgerMetricsManager ;
28+ import io .openmessaging .storage .dledger .protocol .AppendEntryResponse ;
2829import io .openmessaging .storage .dledger .protocol .DLedgerResponseCode ;
2930import io .openmessaging .storage .dledger .protocol .InstallSnapshotRequest ;
3031import io .openmessaging .storage .dledger .protocol .InstallSnapshotResponse ;
@@ -168,8 +169,22 @@ public long getPeerWaterMark(long term, String peerId) {
168169 }
169170 }
170171
172+ public int getPendingCount (long currTerm ) {
173+ if (pendingClosure == null ) {
174+ return 0 ;
175+ }
176+ ConcurrentMap <Long , Closure > pendings = pendingClosure .get (currTerm );
177+ if (pendings == null ) {
178+ return 0 ;
179+ }
180+ return pendings .size ();
181+ }
182+
171183 public boolean isPendingFull (long currTerm ) {
172184 checkTermForPendingMap (currTerm , "isPendingFull" );
185+ if (dLedgerStore .isLocalToomuchUncommitted ()) {
186+ return true ;
187+ }
173188 return pendingClosure .get (currTerm ).size () > dLedgerConfig .getMaxPendingRequestsNum ();
174189 }
175190
@@ -227,17 +242,30 @@ public void checkResponseFuturesTimeout(final long beginIndex) {
227242 }
228243 ConcurrentMap <Long , Closure > closureMap = this .pendingClosure .get (term );
229244 if (closureMap != null && closureMap .size () > 0 ) {
245+ boolean anyChecked = false ;
230246 for (long i = beginIndex ; i < maxIndex ; i ++) {
231247 Closure closure = closureMap .get (i );
232248 if (closure == null ) {
233249 // index may be removed for complete, we should continue scan
234250 } else if (closure .isTimeOut ()) {
235251 closure .done (Status .error (DLedgerResponseCode .WAIT_QUORUM_ACK_TIMEOUT ));
236252 closureMap .remove (i );
253+ anyChecked = true ;
237254 } else {
255+ anyChecked = true ;
238256 break ;
239257 }
240258 }
259+ if (!anyChecked ) {
260+ // since the batch append may have index discontinuous, we should check here
261+ for (Map .Entry <Long , Closure > futureEntry : closureMap .entrySet ()) {
262+ Closure closure = futureEntry .getValue ();
263+ if (closure .isTimeOut ()) {
264+ closure .done (Status .error (DLedgerResponseCode .WAIT_QUORUM_ACK_TIMEOUT ));
265+ closureMap .remove (futureEntry .getKey ());
266+ }
267+ }
268+ }
241269 }
242270 }
243271
@@ -334,6 +362,7 @@ public void doWork() {
334362 // advance the commit index
335363 // we can only commit the index whose term is equals to current term (refer to raft paper 5.4.2)
336364 if (DLedgerEntryPusher .this .memberState .leaderUpdateCommittedIndex (currTerm , quorumIndex )) {
365+ dLedgerStore .updateCommittedIndex (quorumIndex );
337366 DLedgerEntryPusher .this .fsmCaller .onCommitted (quorumIndex );
338367 } else {
339368 // If the commit index is not advanced, we should wait for the next round
@@ -941,6 +970,7 @@ private void handleDoAppend(long writeIndex, PushEntryRequest request,
941970 future .complete (buildResponse (request , DLedgerResponseCode .SUCCESS .getCode ()));
942971 long committedIndex = Math .min (dLedgerStore .getLedgerEndIndex (), request .getCommitIndex ());
943972 if (DLedgerEntryPusher .this .memberState .followerUpdateCommittedIndex (committedIndex )) {
973+ dLedgerStore .updateCommittedIndex (committedIndex );
944974 DLedgerEntryPusher .this .fsmCaller .onCommitted (committedIndex );
945975 }
946976 } catch (Throwable t ) {
@@ -1004,6 +1034,7 @@ private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex,
10041034 PreConditions .check (request .getType () == PushEntryRequest .Type .COMMIT , DLedgerResponseCode .UNKNOWN );
10051035 committedIndex = committedIndex <= dLedgerStore .getLedgerEndIndex () ? committedIndex : dLedgerStore .getLedgerEndIndex ();
10061036 if (DLedgerEntryPusher .this .memberState .followerUpdateCommittedIndex (committedIndex )) {
1037+ dLedgerStore .updateCommittedIndex (committedIndex );
10071038 DLedgerEntryPusher .this .fsmCaller .onCommitted (committedIndex );
10081039 }
10091040 future .complete (buildResponse (request , DLedgerResponseCode .SUCCESS .getCode ()));
@@ -1024,6 +1055,7 @@ private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex
10241055 future .complete (buildResponse (request , DLedgerResponseCode .SUCCESS .getCode ()));
10251056 long committedIndex = request .getCommitIndex () <= dLedgerStore .getLedgerEndIndex () ? request .getCommitIndex () : dLedgerStore .getLedgerEndIndex ();
10261057 if (DLedgerEntryPusher .this .memberState .followerUpdateCommittedIndex (committedIndex )) {
1058+ dLedgerStore .updateCommittedIndex (committedIndex );
10271059 DLedgerEntryPusher .this .fsmCaller .onCommitted (committedIndex );
10281060 }
10291061 } catch (Throwable t ) {
0 commit comments