@@ -77,7 +77,7 @@ public void run() {
7777 long now = System .currentTimeMillis ();
7878 if ((now - state .getLastActivity ()) > state .getConnection ().getHeartbeat () * 1000 * 2 ) {
7979 try {
80- state . getConnection (). handleHeartbeatFailure ();
80+ handleHeartbeatFailure (state );
8181 } catch (Exception e ) {
8282 LOGGER .warn ("Error after heartbeat failure of connection {}" , state .getConnection ());
8383 } catch (AssertionError e ) {
@@ -270,48 +270,68 @@ protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex)
270270 }
271271 }
272272
273+ protected void handleHeartbeatFailure (final SocketChannelFrameHandlerState state ) {
274+ if (needToDispatchIoError (state )) {
275+ dispatchShutdownToConnection (
276+ new Runnable () {
277+
278+ @ Override
279+ public void run () {
280+ state .getConnection ().handleHeartbeatFailure ();
281+ }
282+ },
283+ state .getConnection ().toString ()
284+ );
285+ } else {
286+ try {
287+ state .close ();
288+ } catch (IOException e ) {
289+
290+ }
291+ }
292+ }
293+
273294 protected boolean needToDispatchIoError (final SocketChannelFrameHandlerState state ) {
274295 return state .getConnection ().isOpen ();
275296 }
276297
277298 protected void dispatchIoErrorToConnection (final SocketChannelFrameHandlerState state , final Throwable ex ) {
278- // In case of recovery after the shutdown,
279- // the new connection shouldn't be initialized in
280- // the NIO thread, to avoid a deadlock.
281- Runnable shutdown = new Runnable () {
299+ dispatchShutdownToConnection (
300+ new Runnable () {
282301
283- @ Override
284- public void run () {
285- try {
302+ @ Override
303+ public void run () {
286304 state .getConnection ().handleIoError (ex );
287- } catch (AssertionError e ) {
288- LOGGER .warn ("Assertion error during error dispatching to connection: " + e .getMessage ());
289305 }
290- }
291- };
292- if (executorService () == null ) {
293- String name = "rabbitmq-connection-shutdown-" + state .getConnection ();
294- Thread shutdownThread = Environment .newThread (threadFactory (), shutdown , name );
295- shutdownThread .start ();
296- } else {
297- executorService ().submit (shutdown );
298- }
306+ },
307+ state .getConnection ().toString ()
308+ );
299309 }
300310
301311 protected void dispatchShutdownToConnection (final SocketChannelFrameHandlerState state ) {
302- Runnable shutdown = new Runnable () {
303- @ Override
304- public void run () {
305- state .getConnection ().doFinalShutdown ();
306- }
307- };
312+ dispatchShutdownToConnection (
313+ new Runnable () {
314+
315+ @ Override
316+ public void run () {
317+ state .getConnection ().doFinalShutdown ();
318+ }
319+ },
320+ state .getConnection ().toString ()
321+ );
322+ }
323+
324+ protected void dispatchShutdownToConnection (Runnable connectionShutdownRunnable , String connectionName ) {
325+ // In case of recovery after the shutdown,
326+ // the new connection shouldn't be initialized in
327+ // the NIO thread, to avoid a deadlock.
308328 if (this .connectionShutdownExecutor != null ) {
309- connectionShutdownExecutor .execute (shutdown );
329+ connectionShutdownExecutor .execute (connectionShutdownRunnable );
310330 } else if (executorService () != null ) {
311- executorService ().execute (shutdown );
331+ executorService ().execute (connectionShutdownRunnable );
312332 } else {
313- String name = "rabbitmq-connection-shutdown-" + state . getConnection () ;
314- Thread shutdownThread = Environment .newThread (threadFactory (), shutdown , name );
333+ String name = "rabbitmq-connection-shutdown-" + connectionName ;
334+ Thread shutdownThread = Environment .newThread (threadFactory (), connectionShutdownRunnable , name );
315335 shutdownThread .start ();
316336 }
317337 }
0 commit comments