3232import io .openmessaging .storage .dledger .protocol .MetadataResponse ;
3333import io .openmessaging .storage .dledger .protocol .PullEntriesRequest ;
3434import io .openmessaging .storage .dledger .protocol .PullEntriesResponse ;
35+ import io .openmessaging .storage .dledger .protocol .PullIndexRequest ;
36+ import io .openmessaging .storage .dledger .protocol .PullIndexResponse ;
3537import io .openmessaging .storage .dledger .protocol .PushEntryRequest ;
3638import io .openmessaging .storage .dledger .protocol .PushEntryResponse ;
3739import io .openmessaging .storage .dledger .protocol .RequestOrResponse ;
@@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
7779 this (dLedgerServer , null , null , null );
7880 }
7981
80- public DLedgerRpcNettyService (DLedgerServer dLedgerServer , NettyServerConfig nettyServerConfig , NettyClientConfig nettyClientConfig ) {
82+ public DLedgerRpcNettyService (DLedgerServer dLedgerServer , NettyServerConfig nettyServerConfig ,
83+ NettyClientConfig nettyClientConfig ) {
8184 this (dLedgerServer , nettyServerConfig , nettyClientConfig , null );
8285 }
8386
84- public DLedgerRpcNettyService (DLedgerServer dLedgerServer , NettyServerConfig nettyServerConfig , NettyClientConfig nettyClientConfig , ChannelEventListener channelEventListener ) {
87+ public DLedgerRpcNettyService (DLedgerServer dLedgerServer , NettyServerConfig nettyServerConfig ,
88+ NettyClientConfig nettyClientConfig , ChannelEventListener channelEventListener ) {
8589 this .dLedgerServer = dLedgerServer ;
8690 this .memberState = dLedgerServer .getMemberState ();
8791 NettyRequestProcessor protocolProcessor = new NettyRequestProcessor () {
@@ -109,6 +113,7 @@ public boolean rejectRequest() {
109113 this .remotingServer .registerProcessor (DLedgerRequestCode .VOTE .getCode (), protocolProcessor , null );
110114 this .remotingServer .registerProcessor (DLedgerRequestCode .HEART_BEAT .getCode (), protocolProcessor , null );
111115 this .remotingServer .registerProcessor (DLedgerRequestCode .LEADERSHIP_TRANSFER .getCode (), protocolProcessor , null );
116+ this .remotingServer .registerProcessor (DLedgerRequestCode .PULL_INDEX .getCode (), protocolProcessor , null );
112117
113118 //start the remoting client
114119 if (nettyClientConfig == null ) {
@@ -252,9 +257,29 @@ public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throw
252257 return future ;
253258 }
254259
260+ @ Override
261+ public CompletableFuture <PullIndexResponse > pullIndex (PullIndexRequest request ) throws Exception {
262+ CompletableFuture <PullIndexResponse > future = new CompletableFuture <>();
263+ RemotingCommand wrapperRequest = RemotingCommand .createRequestCommand (DLedgerRequestCode .PULL_INDEX .getCode (), null );
264+ wrapperRequest .setBody (JSON .toJSONBytes (request ));
265+ remotingClient .invokeAsync (getPeerAddr (request ), wrapperRequest , 3000 , responseFuture -> {
266+ RemotingCommand responseCommand = responseFuture .getResponseCommand ();
267+ PullIndexResponse response ;
268+ if (null != responseCommand ) {
269+ response = JSON .parseObject (responseCommand .getBody (), PullIndexResponse .class );
270+ } else {
271+ response = new PullIndexResponse ();
272+ response .copyBaseInfo (request );
273+ response .setCode (DLedgerResponseCode .NETWORK_ERROR .getCode ());
274+ }
275+ future .complete (response );
276+ });
277+ return future ;
278+ }
279+
255280 @ Override
256281 public CompletableFuture <LeadershipTransferResponse > leadershipTransfer (
257- LeadershipTransferRequest request ) throws Exception {
282+ LeadershipTransferRequest request ) throws Exception {
258283 CompletableFuture <LeadershipTransferResponse > future = new CompletableFuture <>();
259284 try {
260285 RemotingCommand wrapperRequest = RemotingCommand .createRequestCommand (DLedgerRequestCode .LEADERSHIP_TRANSFER .getCode (), null );
@@ -283,7 +308,7 @@ public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
283308 }
284309
285310 private void writeResponse (RequestOrResponse storeResp , Throwable t , RemotingCommand request ,
286- ChannelHandlerContext ctx ) {
311+ ChannelHandlerContext ctx ) {
287312 RemotingCommand response = null ;
288313 try {
289314 if (t != null ) {
@@ -319,57 +344,43 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
319344 case METADATA : {
320345 MetadataRequest metadataRequest = JSON .parseObject (request .getBody (), MetadataRequest .class );
321346 CompletableFuture <MetadataResponse > future = handleMetadata (metadataRequest );
322- future .whenCompleteAsync ((x , y ) -> {
323- writeResponse (x , y , request , ctx );
324- }, futureExecutor );
347+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
325348 break ;
326349 }
327350 case APPEND : {
328351 AppendEntryRequest appendEntryRequest = JSON .parseObject (request .getBody (), AppendEntryRequest .class );
329352 CompletableFuture <AppendEntryResponse > future = handleAppend (appendEntryRequest );
330- future .whenCompleteAsync ((x , y ) -> {
331- writeResponse (x , y , request , ctx );
332- }, futureExecutor );
353+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
333354 break ;
334355 }
335356 case GET : {
336357 GetEntriesRequest getEntriesRequest = JSON .parseObject (request .getBody (), GetEntriesRequest .class );
337358 CompletableFuture <GetEntriesResponse > future = handleGet (getEntriesRequest );
338- future .whenCompleteAsync ((x , y ) -> {
339- writeResponse (x , y , request , ctx );
340- }, futureExecutor );
359+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
341360 break ;
342361 }
343362 case PULL : {
344363 PullEntriesRequest pullEntriesRequest = JSON .parseObject (request .getBody (), PullEntriesRequest .class );
345364 CompletableFuture <PullEntriesResponse > future = handlePull (pullEntriesRequest );
346- future .whenCompleteAsync ((x , y ) -> {
347- writeResponse (x , y , request , ctx );
348- }, futureExecutor );
365+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
349366 break ;
350367 }
351368 case PUSH : {
352369 PushEntryRequest pushEntryRequest = JSON .parseObject (request .getBody (), PushEntryRequest .class );
353370 CompletableFuture <PushEntryResponse > future = handlePush (pushEntryRequest );
354- future .whenCompleteAsync ((x , y ) -> {
355- writeResponse (x , y , request , ctx );
356- }, futureExecutor );
371+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
357372 break ;
358373 }
359374 case VOTE : {
360375 VoteRequest voteRequest = JSON .parseObject (request .getBody (), VoteRequest .class );
361376 CompletableFuture <VoteResponse > future = handleVote (voteRequest );
362- future .whenCompleteAsync ((x , y ) -> {
363- writeResponse (x , y , request , ctx );
364- }, futureExecutor );
377+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
365378 break ;
366379 }
367380 case HEART_BEAT : {
368381 HeartBeatRequest heartBeatRequest = JSON .parseObject (request .getBody (), HeartBeatRequest .class );
369382 CompletableFuture <HeartBeatResponse > future = handleHeartBeat (heartBeatRequest );
370- future .whenCompleteAsync ((x , y ) -> {
371- writeResponse (x , y , request , ctx );
372- }, futureExecutor );
383+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
373384 break ;
374385 }
375386 case LEADERSHIP_TRANSFER : {
@@ -379,10 +390,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
379390 future .whenCompleteAsync ((x , y ) -> {
380391 writeResponse (x , y , request , ctx );
381392 logger .info ("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms" ,
382- request , x , DLedgerUtils .elapsed (start ));
393+ request , x , DLedgerUtils .elapsed (start ));
383394 }, futureExecutor );
384395 break ;
385396 }
397+ case PULL_INDEX : {
398+ PullIndexRequest pullIndexRequest = JSON .parseObject (request .getBody (), PullIndexRequest .class );
399+ CompletableFuture <PullIndexResponse > future = handlePullIndex (pullIndexRequest );
400+ future .whenCompleteAsync ((x , y ) -> writeResponse (x , y , request , ctx ), futureExecutor );
401+ break ;
402+ }
386403 default :
387404 logger .error ("Unknown request code {} from {}" , request .getCode (), request );
388405 break ;
@@ -392,7 +409,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
392409
393410 @ Override
394411 public CompletableFuture <LeadershipTransferResponse > handleLeadershipTransfer (
395- LeadershipTransferRequest leadershipTransferRequest ) throws Exception {
412+ LeadershipTransferRequest leadershipTransferRequest ) throws Exception {
396413 return dLedgerServer .handleLeadershipTransfer (leadershipTransferRequest );
397414 }
398415
@@ -432,6 +449,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
432449 return dLedgerServer .handlePush (request );
433450 }
434451
452+ @ Override
453+ public CompletableFuture <PullIndexResponse > handlePullIndex (PullIndexRequest request ) throws Exception {
454+ return dLedgerServer .handlePullIndex (request );
455+ }
456+
435457 public RemotingCommand handleResponse (RequestOrResponse response , RemotingCommand request ) {
436458 RemotingCommand remotingCommand = RemotingCommand .createResponseCommand (DLedgerResponseCode .SUCCESS .getCode (), null );
437459 remotingCommand .setBody (JSON .toJSONBytes (response ));
0 commit comments