Skip to content

Commit df244fe

Browse files
author
Hubert Plociniczak
committed
Fixed problems mentioned in QA.
Added notifyRpc argument to processShutdownSignal to enable late notification in the server initiated shutdown. Added _brokerInitiatedShutdown to identify this special case and do not report false exception in the main loop.
1 parent 69573ea commit df244fe

File tree

4 files changed

+50
-30
lines changed

4 files changed

+50
-30
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,22 +215,40 @@ public synchronized void quiescingRpc(Method m, RpcContinuation k)
215215
@Override public String toString() {
216216
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
217217
}
218-
218+
219219
/**
220220
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
221221
* @param signal the signal to handle
222+
* @param ignoreClosed the flag indicating whether to ignore the AlreadyClosedException
223+
* thrown when the channel is already closed
224+
* @param notifyRpc the flag indicating whether any remaining rpc continuation should be
225+
* notified with the given signal
222226
*/
223-
public void processShutdownSignal(ShutdownSignalException signal) {
227+
public void processShutdownSignal(ShutdownSignalException signal, boolean ignoreClosed, boolean notifyRpc) {
224228
try {
225229
synchronized (this) {
226230
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
227231
_shutdownCause = signal;
228232
}
233+
} catch (AlreadyClosedException ace) {
234+
if (!ignoreClosed)
235+
throw ace;
229236
} finally {
230-
notifyOutstandingRpc(signal);
237+
if (notifyRpc)
238+
notifyOutstandingRpc(signal);
231239
}
232240
}
233241

242+
/**
243+
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
244+
* The method behaves in the same way as processShutdownSignal(ShutdownSignalException, boolean, boolean)
245+
* with the only difference that any continuing rpc is always notified of the shutdown signal.
246+
* @param signal the signal to handle
247+
*/
248+
public void processShutdownSignal(ShutdownSignalException signal, boolean ignoreClosed) {
249+
processShutdownSignal(signal, ignoreClosed, true);
250+
}
251+
234252
public void notifyOutstandingRpc(ShutdownSignalException signal) {
235253
RpcContinuation k = nextOutstandingRpc();
236254
if (k != null) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
103103
*/
104104
public BlockingCell<Object> _appContinuation = new BlockingCell<Object>();
105105

106+
/** Flag indicating whether the client received Connection.Close message from the broker */
107+
public boolean _brokerInitiatedShutdown = false;
108+
106109
/**
107110
* Protected API - respond, in the driver thread, to a ShutdownSignal.
108111
* @param channelNumber the number of the channel to disconnect
@@ -191,6 +194,7 @@ public AMQConnection(ConnectionParameters params,
191194
_missedHeartbeats = 0;
192195
_heartbeat = 0;
193196
_exceptionHandler = exceptionHandler;
197+
_brokerInitiatedShutdown = false;
194198

195199
new MainLoop(); // start the main loop going
196200

@@ -457,15 +461,15 @@ public MainLoop() {
457461
}
458462
}
459463
} catch (EOFException ex) {
460-
shutdown(ex, false, ex);
464+
if (!_brokerInitiatedShutdown)
465+
shutdown(ex, false, ex, true);
461466
} catch (Throwable ex) {
462467
_exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this,
463468
ex);
464-
shutdown(ex, false, ex);
469+
shutdown(ex, false, ex, true);
465470
} finally {
466471
// Finally, shut down our underlying data connection.
467472
_frameHandler.close();
468-
469473
_appContinuation.set(null);
470474
notifyListeners();
471475
}
@@ -551,18 +555,22 @@ public boolean processControlCommand(Command c)
551555
}
552556

553557
public void handleConnectionClose(Command closeCommand) {
554-
shutdown(closeCommand, false, null);
558+
ShutdownSignalException sse = shutdown(closeCommand, false, null, false);
555559
try {
556560
_channel0.quiescingTransmit(new AMQImpl.Connection.CloseOk());
557561
} catch (IOException ioe) {
558562
Utility.emptyStatement();
559563
}
560-
_heartbeat = 0; // Do not try to send heartbeats after CloseOk
561-
new SocketCloseWait();
564+
_heartbeat = 0; // Do not try to send heartbeats after CloseOk
565+
_brokerInitiatedShutdown = true;
566+
new SocketCloseWait(sse);
562567
}
563568

564569
private class SocketCloseWait extends Thread {
565-
public SocketCloseWait() {
570+
private ShutdownSignalException cause;
571+
572+
public SocketCloseWait(ShutdownSignalException sse) {
573+
cause = sse;
566574
start();
567575
}
568576

@@ -575,6 +583,7 @@ public SocketCloseWait() {
575583
_frameHandler.close();
576584
} finally {
577585
_running = false;
586+
_channel0.notifyOutstandingRpc(cause);
578587
}
579588
}
580589
}
@@ -583,12 +592,14 @@ public SocketCloseWait() {
583592
* Protected API - causes all attached channels to terminate with
584593
* a ShutdownSignal built from the argument, and stops this
585594
* connection from accepting further work from the application.
595+
*
596+
* @return a shutdown signal built using the given arguments
586597
*/
587-
public void shutdown(Object reason,
598+
public ShutdownSignalException shutdown(Object reason,
588599
boolean initiatedByApplication,
589-
Throwable cause)
600+
Throwable cause,
601+
boolean notifyRpc)
590602
{
591-
592603
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
593604
reason, this);
594605
sse.initCause(cause);
@@ -600,15 +611,10 @@ public void shutdown(Object reason,
600611
} catch (AlreadyClosedException ace) {
601612
if (initiatedByApplication)
602613
throw ace;
603-
} finally {
604-
try {
605-
_channel0.processShutdownSignal(sse);
606-
} catch (AlreadyClosedException ace) {
607-
if (initiatedByApplication)
608-
throw ace;
609-
}
610614
}
615+
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
611616
_channelManager.handleSignal(sse);
617+
return sse;
612618
}
613619

614620
/**
@@ -687,7 +693,7 @@ public void close(int closeCode,
687693
try {
688694
AMQImpl.Connection.Close reason =
689695
new AMQImpl.Connection.Close(closeCode, closeMessage, 0, 0);
690-
shutdown(reason, initiatedByApplication, cause);
696+
shutdown(reason, initiatedByApplication, cause, true);
691697
AMQChannel.SimpleBlockingRpcContinuation k =
692698
new AMQChannel.SimpleBlockingRpcContinuation();
693699
_channel0.quiescingRpc(reason, k);

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,7 @@ public void handleSignal(ShutdownSignalException signal) {
6868
}
6969
for (AMQChannel channel : channels) {
7070
disconnectChannel(channel.getChannelNumber());
71-
try {
72-
channel.processShutdownSignal(signal);
73-
} catch (ShutdownSignalException sse) {
74-
// Ignore
75-
}
71+
channel.processShutdownSignal(signal, true);
7672
}
7773
}
7874

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ public void broadcastShutdownSignal(ShutdownSignalException signal) {
145145
* Protected API - overridden to broadcast the signal to all
146146
* consumers before calling the superclass's method.
147147
*/
148-
@Override public void processShutdownSignal(ShutdownSignalException signal) {
149-
super.processShutdownSignal(signal);
148+
@Override public void processShutdownSignal(ShutdownSignalException signal, boolean ignoreClosed) {
149+
super.processShutdownSignal(signal, ignoreClosed);
150150
broadcastShutdownSignal(signal);
151151
}
152152

@@ -222,7 +222,7 @@ public void releaseChannelNumber() {
222222
command,
223223
this);
224224
synchronized(this) {
225-
processShutdownSignal(signal);
225+
processShutdownSignal(signal, true);
226226
quiescingTransmit(new Channel.CloseOk());
227227
}
228228
notifyListeners();
@@ -285,7 +285,7 @@ public void close(int closeCode,
285285
// Synchronize the block below to avoid race conditions in case
286286
// connnection wants to send Connection-CloseOK
287287
synchronized(this) {
288-
processShutdownSignal(signal);
288+
processShutdownSignal(signal, !initiatedByApplication);
289289
quiescingRpc(reason, k);
290290
}
291291

0 commit comments

Comments
 (0)