2929import java .util .ArrayList ;
3030import java .util .Arrays ;
3131import java .util .List ;
32+ import java .util .Map ;
3233import java .util .Random ;
3334import java .util .concurrent .CompletableFuture ;
3435import java .util .concurrent .CountDownLatch ;
@@ -111,6 +112,13 @@ public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest req
111112 return CompletableFuture .completedFuture (new HeartBeatResponse ().term (memberState .currTerm ()).code (DLedgerResponseCode .UNEXPECTED_MEMBER .getCode ()));
112113 }
113114
115+ if (memberState .isCandidate () && request .isNeedCheckMemberState ()) {
116+ logger .warn ("[CHECK_MEMBER_STATE] [HandleHeartBeat] remoteId={} need check member state" , request .getLeaderId ());
117+ if (request .getTerm () < memberState .currTerm ()) {
118+ memberState .recoveryToFollower (request .getTerm (), request .getLeaderId ());
119+ }
120+ }
121+
114122 if (request .getTerm () < memberState .currTerm ()) {
115123 return CompletableFuture .completedFuture (new HeartBeatResponse ().term (memberState .currTerm ()).code (DLedgerResponseCode .EXPIRED_TERM .getCode ()));
116124 } else if (request .getTerm () == memberState .currTerm ()) {
@@ -283,10 +291,12 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
283291 break ;
284292 }
285293
286- if (x .getCode () == DLedgerResponseCode .NETWORK_ERROR .getCode ())
294+ if (x .getCode () == DLedgerResponseCode .NETWORK_ERROR .getCode ()) {
287295 memberState .getPeersLiveTable ().put (id , Boolean .FALSE );
288- else
296+ } else {
289297 memberState .getPeersLiveTable ().put (id , Boolean .TRUE );
298+ memberState .getPeersTermTable ().put (id , x .getTerm ());
299+ }
290300
291301 if (memberState .isQuorum (succNum .get ())
292302 || memberState .isQuorum (succNum .get () + notReadyNum .get ())) {
@@ -305,6 +315,7 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
305315 beatLatch .await (heartBeatTimeIntervalMs , TimeUnit .MILLISECONDS );
306316 if (memberState .isQuorum (succNum .get ())) {
307317 lastSuccHeartBeatTime = System .currentTimeMillis ();
318+ checkPeersTermTable ();
308319 } else {
309320 logger .info ("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}" ,
310321 memberState .getSelfId (), DLedgerUtils .elapsed (startHeartbeatTimeMs ), term , allNum .get (), succNum .get (), notReadyNum .get (), inconsistLeader .get (), maxTerm .get (), memberState .peerSize (), new Timestamp (lastSuccHeartBeatTime ));
@@ -320,6 +331,28 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
320331 }
321332 }
322333
334+ private void checkPeersTermTable () throws Exception {
335+ if (memberState .getSelfId ().equals (memberState .getLeaderId ())) {
336+ long leaderTerm = memberState .getPeersTermTable ().getOrDefault (memberState .getLeaderId (), -1L );
337+ for (Map .Entry <String , Long > entryTerm : memberState .getPeersTermTable ().entrySet ()) {
338+ if (entryTerm .getKey ().equals (memberState .getSelfId ())) {
339+ continue ;
340+ }
341+
342+ if (entryTerm .getValue () > leaderTerm ) {
343+ HeartBeatRequest heartBeatRequest = new HeartBeatRequest ();
344+ heartBeatRequest .setGroup (memberState .getGroup ());
345+ heartBeatRequest .setLocalId (memberState .getSelfId ());
346+ heartBeatRequest .setRemoteId (memberState .getSelfId ());
347+ heartBeatRequest .setLeaderId (memberState .getLeaderId ());
348+ heartBeatRequest .setNeedCheckMemberState (true );
349+ heartBeatRequest .setTerm (leaderTerm );
350+ dLedgerRpcService .heartBeat (heartBeatRequest );
351+ }
352+ }
353+ }
354+ }
355+
323356 private void maintainAsLeader () throws Exception {
324357 if (DLedgerUtils .elapsed (lastSendHeartBeatTime ) > heartBeatTimeIntervalMs ) {
325358 long term ;
0 commit comments