2929import java .util .ArrayList ;
3030import java .util .Arrays ;
3131import java .util .List ;
32+ import java .util .Map ;
3233import java .util .Random ;
3334import java .util .concurrent .CompletableFuture ;
3435import java .util .concurrent .CountDownLatch ;
@@ -111,9 +112,7 @@ public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest req
111112 return CompletableFuture .completedFuture (new HeartBeatResponse ().term (memberState .currTerm ()).code (DLedgerResponseCode .UNEXPECTED_MEMBER .getCode ()));
112113 }
113114
114- if (request .getTerm () < memberState .currTerm ()) {
115- return CompletableFuture .completedFuture (new HeartBeatResponse ().term (memberState .currTerm ()).code (DLedgerResponseCode .EXPIRED_TERM .getCode ()));
116- } else if (request .getTerm () == memberState .currTerm ()) {
115+ if (request .getTerm () == memberState .currTerm ()) {
117116 if (request .getLeaderId ().equals (memberState .getLeaderId ())) {
118117 lastLeaderHeartBeatTime = System .currentTimeMillis ();
119118 return CompletableFuture .completedFuture (new HeartBeatResponse ());
@@ -124,8 +123,15 @@ public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest req
124123 //hold the lock to get the latest term and leaderId
125124 synchronized (memberState ) {
126125 if (request .getTerm () < memberState .currTerm ()) {
126+ if (memberState .isCandidate () && request .isNeedCheckMemberState ()) {
127+ logger .warn ("[CHECK_MEMBER_STATE] [HandleHeartBeat] remoteId={} need check member state" , request .getLeaderId ());
128+ memberState .recoveryToFollower (request .getTerm (), request .getLeaderId ());
129+ return CompletableFuture .completedFuture (new HeartBeatResponse ());
130+ }
127131 return CompletableFuture .completedFuture (new HeartBeatResponse ().term (memberState .currTerm ()).code (DLedgerResponseCode .EXPIRED_TERM .getCode ()));
128- } else if (request .getTerm () == memberState .currTerm ()) {
132+ }
133+
134+ if (request .getTerm () == memberState .currTerm ()) {
129135 if (memberState .getLeaderId () == null ) {
130136 changeRoleToFollower (request .getTerm (), request .getLeaderId ());
131137 return CompletableFuture .completedFuture (new HeartBeatResponse ());
@@ -283,10 +289,12 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
283289 break ;
284290 }
285291
286- if (x .getCode () == DLedgerResponseCode .NETWORK_ERROR .getCode ())
292+ if (x .getCode () == DLedgerResponseCode .NETWORK_ERROR .getCode ()) {
287293 memberState .getPeersLiveTable ().put (id , Boolean .FALSE );
288- else
294+ } else {
289295 memberState .getPeersLiveTable ().put (id , Boolean .TRUE );
296+ memberState .getPeersTermTable ().put (id , x .getTerm ());
297+ }
290298
291299 if (memberState .isQuorum (succNum .get ())
292300 || memberState .isQuorum (succNum .get () + notReadyNum .get ())) {
@@ -305,6 +313,7 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
305313 beatLatch .await (heartBeatTimeIntervalMs , TimeUnit .MILLISECONDS );
306314 if (memberState .isQuorum (succNum .get ())) {
307315 lastSuccHeartBeatTime = System .currentTimeMillis ();
316+ checkPeersTermTable ();
308317 } else {
309318 logger .info ("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}" ,
310319 memberState .getSelfId (), DLedgerUtils .elapsed (startHeartbeatTimeMs ), term , allNum .get (), succNum .get (), notReadyNum .get (), inconsistLeader .get (), maxTerm .get (), memberState .peerSize (), new Timestamp (lastSuccHeartBeatTime ));
@@ -320,6 +329,28 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
320329 }
321330 }
322331
332+ private void checkPeersTermTable () throws Exception {
333+ if (memberState .getSelfId ().equals (memberState .getLeaderId ())) {
334+ long leaderTerm = memberState .getPeersTermTable ().getOrDefault (memberState .getLeaderId (), -1L );
335+ for (Map .Entry <String , Long > entryTerm : memberState .getPeersTermTable ().entrySet ()) {
336+ if (entryTerm .getKey ().equals (memberState .getSelfId ())) {
337+ continue ;
338+ }
339+
340+ if (entryTerm .getValue () > leaderTerm ) {
341+ HeartBeatRequest heartBeatRequest = new HeartBeatRequest ();
342+ heartBeatRequest .setGroup (memberState .getGroup ());
343+ heartBeatRequest .setLocalId (memberState .getSelfId ());
344+ heartBeatRequest .setRemoteId (entryTerm .getKey ());
345+ heartBeatRequest .setLeaderId (memberState .getLeaderId ());
346+ heartBeatRequest .setNeedCheckMemberState (true );
347+ heartBeatRequest .setTerm (leaderTerm );
348+ dLedgerRpcService .heartBeat (heartBeatRequest );
349+ }
350+ }
351+ }
352+ }
353+
323354 private void maintainAsLeader () throws Exception {
324355 if (DLedgerUtils .elapsed (lastSendHeartBeatTime ) > heartBeatTimeIntervalMs ) {
325356 long term ;
0 commit comments