@@ -590,15 +590,15 @@ private void doAppend() throws Exception {
590590 private void sendBatchAppendEntryRequest () throws Exception {
591591 batchAppendEntryRequest .setCommitIndex (dLedgerStore .getCommittedIndex ());
592592 CompletableFuture <PushEntryResponse > responseFuture = dLedgerRpcService .push (batchAppendEntryRequest );
593- batchPendingMap .put (batchAppendEntryRequest .getLastEntryIndex (), new Pair <>(System .currentTimeMillis (), batchAppendEntryRequest .getCount ()));
593+ batchPendingMap .put (batchAppendEntryRequest .getFirstEntryIndex (), new Pair <>(System .currentTimeMillis (), batchAppendEntryRequest .getCount ()));
594594 responseFuture .whenComplete ((x , ex ) -> {
595595 try {
596596 PreConditions .check (ex == null , DLedgerResponseCode .UNKNOWN );
597597 DLedgerResponseCode responseCode = DLedgerResponseCode .valueOf (x .getCode ());
598598 switch (responseCode ) {
599599 case SUCCESS :
600600 batchPendingMap .remove (x .getIndex ());
601- updatePeerWaterMark (x .getTerm (), peerId , x .getIndex ());
601+ updatePeerWaterMark (x .getTerm (), peerId , x .getIndex () + x . getCount () - 1 );
602602 break ;
603603 case INCONSISTENT_STATE :
604604 logger .info ("[Push-{}]Get INCONSISTENT_STATE when batch push index={} term={}" , peerId , x .getIndex (), x .getTerm ());
@@ -891,7 +891,8 @@ private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
891891 response .setCode (code );
892892 response .setTerm (request .getTerm ());
893893 if (request .getType () != PushEntryRequest .Type .COMMIT ) {
894- response .setIndex (request .getLastEntryIndex ());
894+ response .setIndex (request .getFirstEntryIndex ());
895+ response .setCount (request .getCount ());
895896 }
896897 response .setBeginIndex (dLedgerStore .getLedgerBeginIndex ());
897898 response .setEndIndex (dLedgerStore .getLedgerEndIndex ());
0 commit comments