Skip to content

Commit 4ee007d

Browse files
cserwendengzhiwen1
andauthored
[ISSUE #247]revote when leader's term is not the biggest
Co-authored-by: dengzhiwen1 <dengzhiwen1@xiaomi.com>
1 parent 6765275 commit 4ee007d

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerLeaderElector.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,16 +304,24 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
304304
}
305305
});
306306
}
307-
beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
307+
long voteResultWaitTime = 10;
308+
beatLatch.await(heartBeatTimeIntervalMs - voteResultWaitTime, TimeUnit.MILLISECONDS);
309+
Thread.sleep(voteResultWaitTime);
310+
311+
//abnormal case, deal with it immediately
312+
if (maxTerm.get() > term) {
313+
LOGGER.warn("[{}] currentTerm{} is not the biggest={}, deal with it", memberState.getSelfId(), term, maxTerm.get());
314+
changeRoleToCandidate(maxTerm.get());
315+
return;
316+
}
317+
308318
if (memberState.isQuorum(succNum.get())) {
309319
lastSuccHeartBeatTime = System.currentTimeMillis();
310320
} else {
311321
LOGGER.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
312322
memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
313323
if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
314324
lastSendHeartBeatTime = -1;
315-
} else if (maxTerm.get() > term) {
316-
changeRoleToCandidate(maxTerm.get());
317325
} else if (inconsistLeader.get()) {
318326
changeRoleToCandidate(term);
319327
} else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {

dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
367367
@Override
368368
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
369369
LeadershipTransferRequest request) throws Exception {
370+
LOGGER.info("handleLeadershipTransfer: {}", request);
370371
try {
371372
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
372373
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());

0 commit comments

Comments
 (0)