5151
5252public class DLedgerEntryPusher {
5353
54- private static Logger logger = LoggerFactory .getLogger (DLedgerEntryPusher .class );
54+ private static final Logger LOGGER = LoggerFactory .getLogger (DLedgerEntryPusher .class );
5555
56- private DLedgerConfig dLedgerConfig ;
57- private DLedgerStore dLedgerStore ;
56+ private final DLedgerConfig dLedgerConfig ;
57+ private final DLedgerStore dLedgerStore ;
5858
5959 private final MemberState memberState ;
6060
61- private DLedgerRpcService dLedgerRpcService ;
61+ private final DLedgerRpcService dLedgerRpcService ;
6262
63- private Map <Long , ConcurrentMap <String , Long >> peerWaterMarksByTerm = new ConcurrentHashMap <>();
64- private Map <Long , ConcurrentMap <Long , TimeoutFuture <AppendEntryResponse >>> pendingAppendResponsesByTerm = new ConcurrentHashMap <>();
63+ private final Map <Long , ConcurrentMap <String , Long >> peerWaterMarksByTerm = new ConcurrentHashMap <>();
64+ private final Map <Long , ConcurrentMap <Long , TimeoutFuture <AppendEntryResponse >>> pendingAppendResponsesByTerm = new ConcurrentHashMap <>();
6565
66- private EntryHandler entryHandler ;
66+ private final EntryHandler entryHandler ;
6767
68- private QuorumAckChecker quorumAckChecker ;
68+ private final QuorumAckChecker quorumAckChecker ;
6969
70- private Map <String , EntryDispatcher > dispatcherMap = new HashMap <>();
70+ private final Map <String , EntryDispatcher > dispatcherMap = new HashMap <>();
7171
7272 private Optional <StateMachineCaller > fsmCaller ;
7373
@@ -79,11 +79,11 @@ public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState,
7979 this .dLedgerRpcService = dLedgerRpcService ;
8080 for (String peer : memberState .getPeerMap ().keySet ()) {
8181 if (!peer .equals (memberState .getSelfId ())) {
82- dispatcherMap .put (peer , new EntryDispatcher (peer , logger ));
82+ dispatcherMap .put (peer , new EntryDispatcher (peer , LOGGER ));
8383 }
8484 }
85- this .entryHandler = new EntryHandler (logger );
86- this .quorumAckChecker = new QuorumAckChecker (logger );
85+ this .entryHandler = new EntryHandler (LOGGER );
86+ this .quorumAckChecker = new QuorumAckChecker (LOGGER );
8787 this .fsmCaller = Optional .empty ();
8888 }
8989
@@ -113,7 +113,7 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
113113
114114 private void checkTermForWaterMark (long term , String env ) {
115115 if (!peerWaterMarksByTerm .containsKey (term )) {
116- logger .info ("Initialize the watermark in {} for term={}" , env , term );
116+ LOGGER .info ("Initialize the watermark in {} for term={}" , env , term );
117117 ConcurrentMap <String , Long > waterMarks = new ConcurrentHashMap <>();
118118 for (String peer : memberState .getPeerMap ().keySet ()) {
119119 waterMarks .put (peer , -1L );
@@ -124,7 +124,7 @@ private void checkTermForWaterMark(long term, String env) {
124124
125125 private void checkTermForPendingMap (long term , String env ) {
126126 if (!pendingAppendResponsesByTerm .containsKey (term )) {
127- logger .info ("Initialize the pending append map in {} for term={}" , env , term );
127+ LOGGER .info ("Initialize the pending append map in {} for term={}" , env , term );
128128 pendingAppendResponsesByTerm .putIfAbsent (term , new ConcurrentHashMap <>());
129129 }
130130 }
@@ -162,7 +162,7 @@ public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry, boolea
162162 future .setPos (entry .getPos ());
163163 CompletableFuture <AppendEntryResponse > old = pendingAppendResponsesByTerm .get (entry .getTerm ()).put (entry .getIndex (), future );
164164 if (old != null ) {
165- logger .warn ("[MONITOR] get old wait at index={}" , entry .getIndex ());
165+ LOGGER .warn ("[MONITOR] get old wait at index={}" , entry .getIndex ());
166166 }
167167 return future ;
168168 }
@@ -185,13 +185,13 @@ public boolean completeResponseFuture(final long index) {
185185 if (responses != null ) {
186186 CompletableFuture <AppendEntryResponse > future = responses .remove (index );
187187 if (future != null && !future .isDone ()) {
188- logger .info ("Complete future, term {}, index {}" , term , index );
188+ LOGGER .info ("Complete future, term {}, index {}" , term , index );
189189 AppendEntryResponse response = new AppendEntryResponse ();
190190 response .setGroup (this .memberState .getGroup ());
191191 response .setTerm (term );
192192 response .setIndex (index );
193193 response .setLeaderId (this .memberState .getSelfId ());
194- response .setPos (((AppendFuture ) future ).getPos ());
194+ response .setPos (((AppendFuture <?> ) future ).getPos ());
195195 future .complete (response );
196196 return true ;
197197 }
@@ -339,7 +339,7 @@ public void doWork() {
339339 ConcurrentMap <Long , TimeoutFuture <AppendEntryResponse >> responses = pendingAppendResponsesByTerm .get (currTerm );
340340 boolean needCheck = false ;
341341 int ackNum = 0 ;
342- for (Long i = quorumIndex ; i > lastQuorumIndex ; i --) {
342+ for (long i = quorumIndex ; i > lastQuorumIndex ; i --) {
343343 try {
344344 CompletableFuture <AppendEntryResponse > future = responses .remove (i );
345345 if (future == null ) {
@@ -373,7 +373,7 @@ public void doWork() {
373373 }
374374 lastQuorumIndex = quorumIndex ;
375375 } catch (Throwable t ) {
376- DLedgerEntryPusher .logger .error ("Error in {}" , getName (), t );
376+ DLedgerEntryPusher .LOGGER .error ("Error in {}" , getName (), t );
377377 DLedgerUtils .sleep (100 );
378378 }
379379 }
@@ -398,41 +398,41 @@ public void doWork() {
398398 */
399399 private class EntryDispatcher extends ShutdownAbleThread {
400400
401- private AtomicReference <PushEntryRequest .Type > type = new AtomicReference <>(PushEntryRequest .Type .COMPARE );
401+ private final AtomicReference <PushEntryRequest .Type > type = new AtomicReference <>(PushEntryRequest .Type .COMPARE );
402402 private long lastPushCommitTimeMs = -1 ;
403- private String peerId ;
403+ private final String peerId ;
404404 private long compareIndex = -1 ;
405405 private long writeIndex = -1 ;
406- private int maxPendingSize = 1000 ;
406+ private final int maxPendingSize = 1000 ;
407407 private long term = -1 ;
408408 private String leaderId = null ;
409409 private long lastCheckLeakTimeMs = System .currentTimeMillis ();
410- private ConcurrentMap <Long , Long > pendingMap = new ConcurrentHashMap <>();
411- private ConcurrentMap <Long , Pair <Long , Integer >> batchPendingMap = new ConcurrentHashMap <>();
412- private PushEntryRequest batchAppendEntryRequest = new PushEntryRequest ();
413- private Quota quota = new Quota (dLedgerConfig .getPeerPushQuota ());
410+ private final ConcurrentMap <Long , Long > pendingMap = new ConcurrentHashMap <>();
411+ private final ConcurrentMap <Long , Pair <Long , Integer >> batchPendingMap = new ConcurrentHashMap <>();
412+ private final PushEntryRequest batchAppendEntryRequest = new PushEntryRequest ();
413+ private final Quota quota = new Quota (dLedgerConfig .getPeerPushQuota ());
414414
415415 public EntryDispatcher (String peerId , Logger logger ) {
416416 super ("EntryDispatcher-" + memberState .getSelfId () + "-" + peerId , logger );
417417 this .peerId = peerId ;
418418 }
419419
420- private boolean checkAndFreshState () {
420+ private boolean checkNotLeaderAndFreshState () {
421421 if (!memberState .isLeader ()) {
422- return false ;
422+ return true ;
423423 }
424424 if (term != memberState .currTerm () || leaderId == null || !leaderId .equals (memberState .getLeaderId ())) {
425425 synchronized (memberState ) {
426426 if (!memberState .isLeader ()) {
427- return false ;
427+ return true ;
428428 }
429429 PreConditions .check (memberState .getSelfId ().equals (memberState .getLeaderId ()), DLedgerResponseCode .UNKNOWN );
430430 term = memberState .currTerm ();
431431 leaderId = memberState .getSelfId ();
432432 changeState (-1 , PushEntryRequest .Type .COMPARE );
433433 }
434434 }
435- return true ;
435+ return false ;
436436 }
437437
438438 private PushEntryRequest buildPushRequest (DLedgerEntry entry , PushEntryRequest .Type target ) {
@@ -548,7 +548,7 @@ private void doCheckAppendResponse() throws Exception {
548548
549549 private void doAppend () throws Exception {
550550 while (true ) {
551- if (! checkAndFreshState ()) {
551+ if (checkNotLeaderAndFreshState ()) {
552552 break ;
553553 }
554554 if (type .get () != PushEntryRequest .Type .APPEND ) {
@@ -639,7 +639,7 @@ private void doCheckBatchAppendResponse() throws Exception {
639639
640640 private void doBatchAppend () throws Exception {
641641 while (true ) {
642- if (! checkAndFreshState ()) {
642+ if (checkNotLeaderAndFreshState ()) {
643643 break ;
644644 }
645645 if (type .get () != PushEntryRequest .Type .APPEND ) {
@@ -675,10 +675,12 @@ private void doTruncate(long truncateIndex) throws Exception {
675675 PreConditions .check (type .get () == PushEntryRequest .Type .TRUNCATE , DLedgerResponseCode .UNKNOWN );
676676 DLedgerEntry truncateEntry = dLedgerStore .get (truncateIndex );
677677 PreConditions .check (truncateEntry != null , DLedgerResponseCode .UNKNOWN );
678+
678679 logger .info ("[Push-{}]Will push data to truncate truncateIndex={} pos={}" , peerId , truncateIndex , truncateEntry .getPos ());
679680 PushEntryRequest truncateRequest = buildPushRequest (truncateEntry , PushEntryRequest .Type .TRUNCATE );
680681 PushEntryResponse truncateResponse = dLedgerRpcService .push (truncateRequest ).get (3 , TimeUnit .SECONDS );
681682 PreConditions .check (truncateResponse != null , DLedgerResponseCode .UNKNOWN , "truncateIndex=%d" , truncateIndex );
683+
682684 PreConditions .check (truncateResponse .getCode () == DLedgerResponseCode .SUCCESS .getCode (), DLedgerResponseCode .valueOf (truncateResponse .getCode ()), "truncateIndex=%d" , truncateIndex );
683685 lastPushCommitTimeMs = System .currentTimeMillis ();
684686 changeState (truncateIndex , PushEntryRequest .Type .APPEND );
@@ -717,7 +719,7 @@ private synchronized void changeState(long index, PushEntryRequest.Type target)
717719
718720 private void doCompare () throws Exception {
719721 while (true ) {
720- if (! checkAndFreshState ()) {
722+ if (checkNotLeaderAndFreshState ()) {
721723 break ;
722724 }
723725 if (type .get () != PushEntryRequest .Type .COMPARE
@@ -742,6 +744,7 @@ private void doCompare() throws Exception {
742744 CompletableFuture <PushEntryResponse > responseFuture = dLedgerRpcService .push (request );
743745 PushEntryResponse response = responseFuture .get (3 , TimeUnit .SECONDS );
744746 PreConditions .check (response != null , DLedgerResponseCode .INTERNAL_ERROR , "compareIndex=%d" , compareIndex );
747+
745748 PreConditions .check (response .getCode () == DLedgerResponseCode .INCONSISTENT_STATE .getCode () || response .getCode () == DLedgerResponseCode .SUCCESS .getCode ()
746749 , DLedgerResponseCode .valueOf (response .getCode ()), "compareIndex=%d" , compareIndex );
747750 long truncateIndex = -1 ;
@@ -805,7 +808,7 @@ private void doCompare() throws Exception {
805808 @ Override
806809 public void doWork () {
807810 try {
808- if (! checkAndFreshState ()) {
811+ if (checkNotLeaderAndFreshState ()) {
809812 waitForRunning (1 );
810813 return ;
811814 }
@@ -821,7 +824,7 @@ public void doWork() {
821824 }
822825 waitForRunning (1 );
823826 } catch (Throwable t ) {
824- DLedgerEntryPusher .logger .error ("[Push-{}]Error in {} writeIndex={} compareIndex={}" , peerId , getName (), writeIndex , compareIndex , t );
827+ DLedgerEntryPusher .LOGGER .error ("[Push-{}]Error in {} writeIndex={} compareIndex={}" , peerId , getName (), writeIndex , compareIndex , t );
825828 changeState (-1 , PushEntryRequest .Type .COMPARE );
826829 DLedgerUtils .sleep (500 );
827830 }
@@ -837,7 +840,7 @@ private class EntryHandler extends ShutdownAbleThread {
837840 private long lastCheckFastForwardTimeMs = System .currentTimeMillis ();
838841
839842 ConcurrentMap <Long , Pair <PushEntryRequest , CompletableFuture <PushEntryResponse >>> writeRequestMap = new ConcurrentHashMap <>();
840- BlockingQueue <Pair <PushEntryRequest , CompletableFuture <PushEntryResponse >>> compareOrTruncateRequests = new ArrayBlockingQueue <Pair < PushEntryRequest , CompletableFuture < PushEntryResponse >> >(100 );
843+ BlockingQueue <Pair <PushEntryRequest , CompletableFuture <PushEntryResponse >>> compareOrTruncateRequests = new ArrayBlockingQueue <>(100 );
841844
842845 public EntryHandler (Logger logger ) {
843846 super ("EntryHandler-" + memberState .getSelfId (), logger );
@@ -1073,7 +1076,7 @@ public void doWork() {
10731076 }
10741077 }
10751078 } catch (Throwable t ) {
1076- DLedgerEntryPusher .logger .error ("Error in {}" , getName (), t );
1079+ DLedgerEntryPusher .LOGGER .error ("Error in {}" , getName (), t );
10771080 DLedgerUtils .sleep (100 );
10781081 }
10791082 }
0 commit comments