4343import java .util .concurrent .Executors ;
4444import java .util .concurrent .ThreadFactory ;
4545import java .util .concurrent .atomic .AtomicInteger ;
46+ import org .apache .rocketmq .remoting .ChannelEventListener ;
4647import org .apache .rocketmq .remoting .netty .NettyClientConfig ;
4748import org .apache .rocketmq .remoting .netty .NettyRemotingClient ;
4849import org .apache .rocketmq .remoting .netty .NettyRemotingServer ;
@@ -96,6 +97,14 @@ public Thread newThread(Runnable r) {
9697 });
9798
9899 public DLedgerRpcNettyService (DLedgerServer dLedgerServer ) {
100+ this (dLedgerServer , null , null , null );
101+ }
102+
103+ public DLedgerRpcNettyService (DLedgerServer dLedgerServer , NettyServerConfig nettyServerConfig , NettyClientConfig nettyClientConfig ) {
104+ this (dLedgerServer , nettyServerConfig , nettyClientConfig , null );
105+ }
106+
107+ public DLedgerRpcNettyService (DLedgerServer dLedgerServer , NettyServerConfig nettyServerConfig , NettyClientConfig nettyClientConfig , ChannelEventListener channelEventListener ) {
99108 this .dLedgerServer = dLedgerServer ;
100109 this .memberState = dLedgerServer .getMemberState ();
101110 NettyRequestProcessor protocolProcessor = new NettyRequestProcessor () {
@@ -110,9 +119,11 @@ public boolean rejectRequest() {
110119 }
111120 };
112121 //start the remoting server
113- NettyServerConfig nettyServerConfig = new NettyServerConfig ();
114- nettyServerConfig .setListenPort (Integer .valueOf (memberState .getSelfAddr ().split (":" )[1 ]));
115- this .remotingServer = new NettyRemotingServer (nettyServerConfig , null );
122+ if (nettyServerConfig == null ) {
123+ nettyServerConfig = new NettyServerConfig ();
124+ }
125+ nettyServerConfig .setListenPort (Integer .parseInt (memberState .getSelfAddr ().split (":" )[1 ]));
126+ this .remotingServer = new NettyRemotingServer (nettyServerConfig , channelEventListener );
116127 this .remotingServer .registerProcessor (DLedgerRequestCode .METADATA .getCode (), protocolProcessor , null );
117128 this .remotingServer .registerProcessor (DLedgerRequestCode .APPEND .getCode (), protocolProcessor , null );
118129 this .remotingServer .registerProcessor (DLedgerRequestCode .GET .getCode (), protocolProcessor , null );
@@ -123,8 +134,10 @@ public boolean rejectRequest() {
123134 this .remotingServer .registerProcessor (DLedgerRequestCode .LEADERSHIP_TRANSFER .getCode (), protocolProcessor , null );
124135
125136 //start the remoting client
126- this .remotingClient = new NettyRemotingClient (new NettyClientConfig (), null );
127-
137+ if (nettyClientConfig == null ) {
138+ nettyClientConfig = new NettyClientConfig ();
139+ }
140+ this .remotingClient = new NettyRemotingClient (nettyClientConfig , null );
128141 }
129142
130143 private String getPeerAddr (RequestOrResponse request ) {
@@ -476,4 +489,8 @@ public DLedgerServer getdLedgerServer() {
476489 public void setdLedgerServer (DLedgerServer dLedgerServer ) {
477490 this .dLedgerServer = dLedgerServer ;
478491 }
492+
493+ public NettyRemotingServer getRemotingServer () {
494+ return remotingServer ;
495+ }
479496}
0 commit comments