Skip to content

Commit 69573ea

Browse files
author
Hubert Plociniczak
committed
Always pass the right ShutdownSignalException to
any outstanding rpc when performing the shutdown.
1 parent b7b2084 commit 69573ea

File tree

3 files changed

+22
-26
lines changed

3 files changed

+22
-26
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,14 @@ public synchronized void quiescingRpc(Method m, RpcContinuation k)
221221
* @param signal the signal to handle
222222
*/
223223
public void processShutdownSignal(ShutdownSignalException signal) {
224-
synchronized (this) {
225-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
226-
_shutdownCause = signal;
224+
try {
225+
synchronized (this) {
226+
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
227+
_shutdownCause = signal;
228+
}
229+
} finally {
230+
notifyOutstandingRpc(signal);
227231
}
228-
notifyOutstandingRpc(signal);
229232
}
230233

231234
public void notifyOutstandingRpc(ShutdownSignalException signal) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -457,26 +457,15 @@ public MainLoop() {
457457
}
458458
}
459459
} catch (EOFException ex) {
460-
if (isOpen()) {
461-
System.err.println("AMQConnection.mainLoop: connection close");
462-
shutdown(ex, false, ex);
463-
}
460+
shutdown(ex, false, ex);
464461
} catch (Throwable ex) {
465462
_exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this,
466463
ex);
467-
if (isOpen()) {
468-
shutdown(ex, false, ex);
469-
}
464+
shutdown(ex, false, ex);
470465
} finally {
471466
// Finally, shut down our underlying data connection.
472467
_frameHandler.close();
473468

474-
// Set shutdown exception for any outstanding rpc,
475-
// so that it does not wait infinitely for Connection.CloseOk.
476-
// This can only happen when the broker closed the socket
477-
// unexpectedly.
478-
_channel0.notifyOutstandingRpc(_shutdownCause);
479-
480469
_appContinuation.set(null);
481470
notifyListeners();
482471
}
@@ -599,22 +588,27 @@ public void shutdown(Object reason,
599588
boolean initiatedByApplication,
600589
Throwable cause)
601590
{
591+
592+
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
593+
reason, this);
594+
sse.initCause(cause);
602595
try {
603596
synchronized (this) {
604597
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
605-
ShutdownSignalException sse = new ShutdownSignalException(true,
606-
initiatedByApplication,
607-
reason, this);
608-
sse.initCause(cause);
609598
_shutdownCause = sse;
610599
}
611-
612-
_channel0.processShutdownSignal(_shutdownCause);
613600
} catch (AlreadyClosedException ace) {
614601
if (initiatedByApplication)
615602
throw ace;
603+
} finally {
604+
try {
605+
_channel0.processShutdownSignal(sse);
606+
} catch (AlreadyClosedException ace) {
607+
if (initiatedByApplication)
608+
throw ace;
609+
}
616610
}
617-
_channelManager.handleSignal(_shutdownCause);
611+
_channelManager.handleSignal(sse);
618612
}
619613

620614
/**

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ public void handleSignal(ShutdownSignalException signal) {
7171
try {
7272
channel.processShutdownSignal(signal);
7373
} catch (ShutdownSignalException sse) {
74-
// Notify channels that are waiting for rpc
75-
channel.notifyOutstandingRpc(signal);
74+
// Ignore
7675
}
7776
}
7877
}

0 commit comments

Comments
 (0)