Skip to content

Commit d0d055f

Browse files
author
Matthias Radestock
committed
merge bug19487 into default
2 parents 74ca20d + 9f7994a commit d0d055f

File tree

4 files changed

+57
-50
lines changed

4 files changed

+57
-50
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,17 +215,29 @@ public synchronized void quiescingRpc(Method m, RpcContinuation k)
215215
@Override public String toString() {
216216
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
217217
}
218-
218+
219219
/**
220220
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
221221
* @param signal the signal to handle
222+
* @param ignoreClosed the flag indicating whether to ignore the AlreadyClosedException
223+
* thrown when the channel is already closed
224+
* @param notifyRpc the flag indicating whether any remaining rpc continuation should be
225+
* notified with the given signal
222226
*/
223-
public void processShutdownSignal(ShutdownSignalException signal) {
224-
synchronized (this) {
225-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
226-
_shutdownCause = signal;
227+
public void processShutdownSignal(ShutdownSignalException signal,
228+
boolean ignoreClosed,
229+
boolean notifyRpc) {
230+
try {
231+
synchronized (this) {
232+
if (!ignoreClosed)
233+
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
234+
if (isOpen())
235+
_shutdownCause = signal;
236+
}
237+
} finally {
238+
if (notifyRpc)
239+
notifyOutstandingRpc(signal);
227240
}
228-
notifyOutstandingRpc(signal);
229241
}
230242

231243
public void notifyOutstandingRpc(ShutdownSignalException signal) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
103103
*/
104104
public BlockingCell<Object> _appContinuation = new BlockingCell<Object>();
105105

106+
/** Flag indicating whether the client received Connection.Close message from the broker */
107+
public boolean _brokerInitiatedShutdown = false;
108+
106109
/**
107110
* Protected API - respond, in the driver thread, to a ShutdownSignal.
108111
* @param channelNumber the number of the channel to disconnect
@@ -191,6 +194,7 @@ public AMQConnection(ConnectionParameters params,
191194
_missedHeartbeats = 0;
192195
_heartbeat = 0;
193196
_exceptionHandler = exceptionHandler;
197+
_brokerInitiatedShutdown = false;
194198

195199
new MainLoop(); // start the main loop going
196200

@@ -457,26 +461,15 @@ public MainLoop() {
457461
}
458462
}
459463
} catch (EOFException ex) {
460-
if (isOpen()) {
461-
System.err.println("AMQConnection.mainLoop: connection close");
462-
shutdown(ex, false, ex);
463-
}
464+
if (!_brokerInitiatedShutdown)
465+
shutdown(ex, false, ex, true);
464466
} catch (Throwable ex) {
465467
_exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this,
466468
ex);
467-
if (isOpen()) {
468-
shutdown(ex, false, ex);
469-
}
469+
shutdown(ex, false, ex, true);
470470
} finally {
471471
// Finally, shut down our underlying data connection.
472472
_frameHandler.close();
473-
474-
// Set shutdown exception for any outstanding rpc,
475-
// so that it does not wait infinitely for Connection.CloseOk.
476-
// This can only happen when the broker closed the socket
477-
// unexpectedly.
478-
_channel0.notifyOutstandingRpc(_shutdownCause);
479-
480473
_appContinuation.set(null);
481474
notifyListeners();
482475
}
@@ -562,18 +555,22 @@ public boolean processControlCommand(Command c)
562555
}
563556

564557
public void handleConnectionClose(Command closeCommand) {
565-
shutdown(closeCommand, false, null);
558+
ShutdownSignalException sse = shutdown(closeCommand, false, null, false);
566559
try {
567560
_channel0.quiescingTransmit(new AMQImpl.Connection.CloseOk());
568561
} catch (IOException ioe) {
569562
Utility.emptyStatement();
570563
}
571-
_heartbeat = 0; // Do not try to send heartbeats after CloseOk
572-
new SocketCloseWait();
564+
_heartbeat = 0; // Do not try to send heartbeats after CloseOk
565+
_brokerInitiatedShutdown = true;
566+
new SocketCloseWait(sse);
573567
}
574568

575569
private class SocketCloseWait extends Thread {
576-
public SocketCloseWait() {
570+
private ShutdownSignalException cause;
571+
572+
public SocketCloseWait(ShutdownSignalException sse) {
573+
cause = sse;
577574
start();
578575
}
579576

@@ -586,6 +583,7 @@ public SocketCloseWait() {
586583
_frameHandler.close();
587584
} finally {
588585
_running = false;
586+
_channel0.notifyOutstandingRpc(cause);
589587
}
590588
}
591589
}
@@ -594,27 +592,26 @@ public SocketCloseWait() {
594592
* Protected API - causes all attached channels to terminate with
595593
* a ShutdownSignal built from the argument, and stops this
596594
* connection from accepting further work from the application.
595+
*
596+
* @return a shutdown signal built using the given arguments
597597
*/
598-
public void shutdown(Object reason,
598+
public ShutdownSignalException shutdown(Object reason,
599599
boolean initiatedByApplication,
600-
Throwable cause)
600+
Throwable cause,
601+
boolean notifyRpc)
601602
{
602-
try {
603-
synchronized (this) {
603+
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
604+
reason, this);
605+
sse.initCause(cause);
606+
synchronized (this) {
607+
if (initiatedByApplication)
604608
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
605-
ShutdownSignalException sse = new ShutdownSignalException(true,
606-
initiatedByApplication,
607-
reason, this);
608-
sse.initCause(cause);
609+
if (isOpen())
609610
_shutdownCause = sse;
610-
}
611-
612-
_channel0.processShutdownSignal(_shutdownCause);
613-
} catch (AlreadyClosedException ace) {
614-
if (initiatedByApplication)
615-
throw ace;
616611
}
617-
_channelManager.handleSignal(_shutdownCause);
612+
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
613+
_channelManager.handleSignal(sse);
614+
return sse;
618615
}
619616

620617
public void close()
@@ -690,7 +687,7 @@ public void close(int closeCode,
690687
try {
691688
AMQImpl.Connection.Close reason =
692689
new AMQImpl.Connection.Close(closeCode, closeMessage, 0, 0);
693-
shutdown(reason, initiatedByApplication, cause);
690+
shutdown(reason, initiatedByApplication, cause, true);
694691
AMQChannel.SimpleBlockingRpcContinuation k =
695692
new AMQChannel.SimpleBlockingRpcContinuation();
696693
_channel0.quiescingRpc(reason, k);

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,7 @@ public void handleSignal(ShutdownSignalException signal) {
6868
}
6969
for (AMQChannel channel : channels) {
7070
disconnectChannel(channel.getChannelNumber());
71-
try {
72-
channel.processShutdownSignal(signal);
73-
} catch (ShutdownSignalException sse) {
74-
// Notify channels that are waiting for rpc
75-
channel.notifyOutstandingRpc(signal);
76-
}
71+
channel.processShutdownSignal(signal, true, true);
7772
}
7873
}
7974

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,11 @@ public void broadcastShutdownSignal(ShutdownSignalException signal) {
146146
* Protected API - overridden to broadcast the signal to all
147147
* consumers before calling the superclass's method.
148148
*/
149-
@Override public void processShutdownSignal(ShutdownSignalException signal) {
150-
super.processShutdownSignal(signal);
149+
@Override public void processShutdownSignal(ShutdownSignalException signal,
150+
boolean ignoreClosed,
151+
boolean notifyRpc)
152+
{
153+
super.processShutdownSignal(signal, ignoreClosed, notifyRpc);
151154
broadcastShutdownSignal(signal);
152155
}
153156

@@ -223,7 +226,7 @@ public void releaseChannelNumber() {
223226
command,
224227
this);
225228
synchronized(this) {
226-
processShutdownSignal(signal);
229+
processShutdownSignal(signal, true, true);
227230
quiescingTransmit(new Channel.CloseOk());
228231
}
229232
notifyListeners();
@@ -289,7 +292,7 @@ public void close(int closeCode,
289292
// Synchronize the block below to avoid race conditions in case
290293
// connnection wants to send Connection-CloseOK
291294
synchronized(this) {
292-
processShutdownSignal(signal);
295+
processShutdownSignal(signal, !initiatedByApplication, true);
293296
quiescingRpc(reason, k);
294297
}
295298

0 commit comments

Comments
 (0)