Skip to content

Commit 31738a4

Browse files
authored
[ISSUE #134] fix batch push error (#137)
Signed-off-by: zhangyang <Git_Yang@163.com>
1 parent 6f0bf36 commit 31738a4

File tree

1 file changed

+6
-17
lines changed

1 file changed

+6
-17
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ private void doAppend() throws Exception {
589589
private void sendBatchAppendEntryRequest() throws Exception {
590590
batchAppendEntryRequest.setCommitIndex(dLedgerStore.getCommittedIndex());
591591
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
592-
batchPendingMap.put(batchAppendEntryRequest.getFirstEntryIndex(), new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
592+
batchPendingMap.put(batchAppendEntryRequest.getLastEntryIndex(), new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
593593
responseFuture.whenComplete((x, ex) -> {
594594
try {
595595
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
@@ -890,24 +890,13 @@ private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
890890
response.setCode(code);
891891
response.setTerm(request.getTerm());
892892
if (request.getType() != PushEntryRequest.Type.COMMIT) {
893-
response.setIndex(request.getEntry().getIndex());
893+
response.setIndex(request.getLastEntryIndex());
894894
}
895895
response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
896896
response.setEndIndex(dLedgerStore.getLedgerEndIndex());
897897
return response;
898898
}
899899

900-
private PushEntryResponse buildBatchAppendResponse(PushEntryRequest request, int code) {
901-
PushEntryResponse response = new PushEntryResponse();
902-
response.setGroup(request.getGroup());
903-
response.setCode(code);
904-
response.setTerm(request.getTerm());
905-
response.setIndex(request.getLastEntryIndex());
906-
response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
907-
response.setEndIndex(dLedgerStore.getLedgerEndIndex());
908-
return response;
909-
}
910-
911900
private void handleDoAppend(long writeIndex, PushEntryRequest request,
912901
CompletableFuture<PushEntryResponse> future) {
913902
try {
@@ -975,7 +964,7 @@ private void handleDoBatchAppend(long writeIndex, PushEntryRequest request,
975964
for (DLedgerEntry entry : request.getBatchEntry()) {
976965
dLedgerStore.appendAsFollower(entry, request.getTerm(), request.getLeaderId());
977966
}
978-
future.complete(buildBatchAppendResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
967+
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
979968
updateCommittedIndex(request.getTerm(), request.getCommitIndex());
980969
} catch (Throwable t) {
981970
logger.error("[HandleDoBatchAppend]", t);
@@ -999,11 +988,11 @@ private void checkAppendFuture(long endIndex) {
999988
DLedgerEntry dLedgerEntry = pair.getKey().getEntry();
1000989
PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
1001990
}
1002-
pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
991+
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
1003992
logger.warn("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", lastEntryIndex, endIndex);
1004993
} catch (Throwable t) {
1005994
logger.error("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", lastEntryIndex, endIndex, t);
1006-
pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
995+
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
1007996
}
1008997
writeRequestMap.remove(pair.getKey().getFirstEntryIndex());
1009998
continue;
@@ -1027,7 +1016,7 @@ private void checkAppendFuture(long endIndex) {
10271016
return;
10281017
}
10291018
logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
1030-
pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
1019+
pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
10311020
}
10321021
/**
10331022
* The leader does push entries to follower, and record the pushed index. But in the following conditions, the push may get stopped.

0 commit comments

Comments
 (0)