2323import com .rabbitmq .client .Address ;
2424import com .rabbitmq .client .ConnectionFactory ;
2525import com .rabbitmq .client .MalformedFrameException ;
26+ import com .rabbitmq .client .ShutdownSignalException ;
2627import com .rabbitmq .client .SocketConfigurator ;
2728import io .netty .bootstrap .Bootstrap ;
2829import io .netty .buffer .ByteBuf ;
6162import java .util .concurrent .atomic .AtomicReference ;
6263import java .util .function .Consumer ;
6364import java .util .function .Function ;
65+ import java .util .function .Predicate ;
6466import javax .net .ssl .SSLHandshakeException ;
6567import org .slf4j .Logger ;
6668import org .slf4j .LoggerFactory ;
@@ -73,6 +75,7 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
7375 private final Consumer <Channel > channelCustomizer ;
7476 private final Consumer <Bootstrap > bootstrapCustomizer ;
7577 private final Duration enqueuingTimeout ;
78+ private final Predicate <ShutdownSignalException > willRecover ;
7679
7780 public NettyFrameHandlerFactory (
7881 EventLoopGroup eventLoopGroup ,
@@ -82,14 +85,30 @@ public NettyFrameHandlerFactory(
8285 Duration enqueuingTimeout ,
8386 int connectionTimeout ,
8487 SocketConfigurator configurator ,
85- int maxInboundMessageBodySize ) {
88+ int maxInboundMessageBodySize ,
89+ boolean automaticRecovery ,
90+ Predicate <ShutdownSignalException > recoveryCondition ) {
8691 super (connectionTimeout , configurator , sslContextFactory != null , maxInboundMessageBodySize );
8792 this .eventLoopGroup = eventLoopGroup ;
8893 this .sslContextFactory = sslContextFactory == null ? connName -> null : sslContextFactory ;
8994 this .channelCustomizer = channelCustomizer == null ? Utils .noOpConsumer () : channelCustomizer ;
9095 this .bootstrapCustomizer =
9196 bootstrapCustomizer == null ? Utils .noOpConsumer () : bootstrapCustomizer ;
9297 this .enqueuingTimeout = enqueuingTimeout ;
98+ this .willRecover =
99+ sse -> {
100+ if (!automaticRecovery ) {
101+ return false ;
102+ } else {
103+ try {
104+ return recoveryCondition .test (sse );
105+ } catch (Exception e ) {
106+ // we assume it will recover, so we take the safe path to dispatch the closing
107+ // it avoids the risk of deadlock
108+ return true ;
109+ }
110+ }
111+ };
93112 }
94113
95114 private static void closeNettyState (Channel channel , EventLoopGroup eventLoopGroup ) {
@@ -133,6 +152,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
133152 sslContext ,
134153 this .eventLoopGroup ,
135154 this .enqueuingTimeout ,
155+ this .willRecover ,
136156 this .channelCustomizer ,
137157 this .bootstrapCustomizer );
138158 }
@@ -163,6 +183,7 @@ private NettyFrameHandler(
163183 SslContext sslContext ,
164184 EventLoopGroup elg ,
165185 Duration enqueuingTimeout ,
186+ Predicate <ShutdownSignalException > willRecover ,
166187 Consumer <Channel > channelCustomizer ,
167188 Consumer <Bootstrap > bootstrapCustomizer )
168189 throws IOException {
@@ -195,7 +216,8 @@ private NettyFrameHandler(
195216 int lengthFieldOffset = 3 ;
196217 int lengthFieldLength = 4 ;
197218 int lengthAdjustement = 1 ;
198- AmqpHandler amqpHandler = new AmqpHandler (maxInboundMessageBodySize , this ::close );
219+ AmqpHandler amqpHandler =
220+ new AmqpHandler (maxInboundMessageBodySize , this ::close , willRecover );
199221 int port = ConnectionFactory .portOrDefault (addr .getPort (), sslContext != null );
200222 b .handler (
201223 new ChannelInitializer <SocketChannel >() {
@@ -404,14 +426,26 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter {
404426
405427 private final int maxPayloadSize ;
406428 private final Runnable closeSequence ;
429+ private final Predicate <ShutdownSignalException > willRecover ;
407430 private volatile AMQConnection connection ;
431+ private volatile Channel ch ;
408432 private final AtomicBoolean writable = new AtomicBoolean (true );
409433 private final AtomicReference <CountDownLatch > writableLatch =
410434 new AtomicReference <>(new CountDownLatch (1 ));
411435
412- private AmqpHandler (int maxPayloadSize , Runnable closeSequence ) {
436+ private AmqpHandler (
437+ int maxPayloadSize ,
438+ Runnable closeSequence ,
439+ Predicate <ShutdownSignalException > willRecover ) {
413440 this .maxPayloadSize = maxPayloadSize ;
414441 this .closeSequence = closeSequence ;
442+ this .willRecover = willRecover ;
443+ }
444+
445+ @ Override
446+ public void channelActive (ChannelHandlerContext ctx ) throws Exception {
447+ this .ch = ctx .channel ();
448+ super .channelActive (ctx );
415449 }
416450
417451 @ Override
@@ -444,7 +478,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
444478 if (noProblem
445479 && (!this .connection .isRunning () || this .connection .hasBrokerInitiatedShutdown ())) {
446480 // looks like the frame was Close-Ok or Close
447- ctx . executor (). submit (() -> this .connection .doFinalShutdown ());
481+ this . dispatchShutdownToConnection (() -> this .connection .doFinalShutdown ());
448482 }
449483 } finally {
450484 m .release ();
@@ -504,10 +538,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
504538 AMQConnection c = this .connection ;
505539 if (c .isOpen ()) {
506540 // it is likely to be an IO exception
507- c .handleIoError (null );
541+ this . dispatchShutdownToConnection (() -> c .handleIoError (null ) );
508542 } else {
509543 // just in case, the call is idempotent anyway
510- c . doFinalShutdown ( );
544+ this . dispatchShutdownToConnection ( c :: doFinalShutdown );
511545 }
512546 }
513547 }
@@ -533,7 +567,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
533567 this .connection .getAddress ().getHostName (),
534568 this .connection .getPort ());
535569 if (needToDispatchIoError ()) {
536- this .connection .handleHeartbeatFailure ();
570+ this .dispatchShutdownToConnection (() -> this . connection .handleHeartbeatFailure () );
537571 }
538572 } else if (e .state () == IdleState .WRITER_IDLE ) {
539573 this .connection .writeFrame (new Frame (AMQP .FRAME_HEARTBEAT , 0 ));
@@ -545,7 +579,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
545579
546580 private void handleIoError (Throwable cause ) {
547581 if (needToDispatchIoError ()) {
548- this .connection .handleIoError (cause );
582+ this .dispatchShutdownToConnection (() -> this . connection .handleIoError (cause ) );
549583 } else {
550584 this .closeSequence .run ();
551585 }
@@ -563,6 +597,32 @@ private boolean isWritable() {
563597 private CountDownLatch writableLatch () {
564598 return this .writableLatch .get ();
565599 }
600+
601+ protected void dispatchShutdownToConnection (Runnable connectionShutdownRunnable ) {
602+ String name = "rabbitmq-connection-shutdown" ;
603+ AMQConnection c = this .connection ;
604+ if (c == null || ch == null ) {
605+ // not enough information, we dispatch in separate thread
606+ Environment .newThread (connectionShutdownRunnable , name ).start ();
607+ } else {
608+ if (ch .eventLoop ().inEventLoop ()) {
609+ if (this .willRecover .test (c .getCloseReason ())) {
610+ // the connection will recover, we don't want this to happen in the event loop,
611+ // it could cause a deadlock, so using a separate thread
612+ name = name + "-" + c ;
613+ System .out .println ("in separate thread" );
614+ Environment .newThread (connectionShutdownRunnable , name ).start ();
615+ } else {
616+ // no recovery, it is safe to dispatch in the event loop
617+ System .out .println ("in event loop" );
618+ ch .eventLoop ().submit (connectionShutdownRunnable );
619+ }
620+ } else {
621+ // not in the event loop, we can run it in the same thread
622+ connectionShutdownRunnable .run ();
623+ }
624+ }
625+ }
566626 }
567627
568628 private static final class ProtocolVersionMismatchHandler extends ChannelInboundHandlerAdapter {
0 commit comments