@@ -239,32 +239,51 @@ public void setDefaultConsumer(Consumer consumer) {
239239
240240 /**
241241 * Sends a ShutdownSignal to all active consumers.
242+ * Idempotent.
242243 * @param signal an exception signalling channel shutdown
243244 */
244- private CountDownLatch broadcastShutdownSignal (ShutdownSignalException signal ) {
245+ private void broadcastShutdownSignal (ShutdownSignalException signal ) {
245246 Map <String , Consumer > snapshotConsumers ;
246247 synchronized (_consumers ) {
247248 snapshotConsumers = new HashMap <String , Consumer >(_consumers );
248249 }
249- return this .dispatcher .handleShutdownSignal (snapshotConsumers , signal );
250+ this . finishedShutdownFlag = this .dispatcher .handleShutdownSignal (snapshotConsumers , signal );
250251 }
251252
252253 /**
253- * Protected API - overridden to quiesce consumer work and broadcast the signal
254- * to all consumers after calling the superclass's method.
254+ * Start to shutdown -- defer rest of processing until ready
255255 */
256- @ Override public void processShutdownSignal (ShutdownSignalException signal ,
256+ private void startProcessShutdownSignal (ShutdownSignalException signal ,
257257 boolean ignoreClosed ,
258258 boolean notifyRpc )
259+ { super .processShutdownSignal (signal , ignoreClosed , notifyRpc );
260+ }
261+
262+ /**
263+ * Finish shutdown processing -- idempotent
264+ */
265+ private void finishProcessShutdownSignal ()
259266 {
260267 this .dispatcher .quiesce ();
261- super . processShutdownSignal ( signal , ignoreClosed , notifyRpc );
262- this . finishedShutdownFlag = broadcastShutdownSignal ( signal );
268+ broadcastShutdownSignal ( getCloseReason () );
269+
263270 synchronized (unconfirmedSet ) {
264271 unconfirmedSet .notifyAll ();
265272 }
266273 }
267274
275+ /**
276+ * Protected API - overridden to quiesce consumer work and broadcast the signal
277+ * to all consumers after calling the superclass's method.
278+ */
279+ @ Override public void processShutdownSignal (ShutdownSignalException signal ,
280+ boolean ignoreClosed ,
281+ boolean notifyRpc )
282+ {
283+ startProcessShutdownSignal (signal , ignoreClosed , notifyRpc );
284+ finishProcessShutdownSignal ();
285+ }
286+
268287 CountDownLatch getShutdownLatch () {
269288 return this .finishedShutdownFlag ;
270289 }
@@ -526,13 +545,18 @@ public void close(int closeCode,
526545 signal .initCause (cause );
527546 }
528547
529- BlockingRpcContinuation <AMQCommand > k = new SimpleBlockingRpcContinuation ();
548+ BlockingRpcContinuation <AMQCommand > k = new BlockingRpcContinuation <AMQCommand >(){
549+ @ Override
550+ public AMQCommand transformReply (AMQCommand command ) {
551+ ChannelN .this .finishProcessShutdownSignal ();
552+ return command ;
553+ }};
530554 boolean notify = false ;
531555 try {
532556 // Synchronize the block below to avoid race conditions in case
533557 // connnection wants to send Connection-CloseOK
534558 synchronized (_channelMutex ) {
535- processShutdownSignal (signal , !initiatedByApplication , true );
559+ startProcessShutdownSignal (signal , !initiatedByApplication , true );
536560 quiescingRpc (reason , k );
537561 }
538562
@@ -581,7 +605,16 @@ public void basicPublish(String exchange, String routingKey,
581605 BasicProperties props , byte [] body )
582606 throws IOException
583607 {
584- basicPublish (exchange , routingKey , false , false , props , body );
608+ basicPublish (exchange , routingKey , false , props , body );
609+ }
610+
611+ /** Public API - {@inheritDoc} */
612+ public void basicPublish (String exchange , String routingKey ,
613+ boolean mandatory ,
614+ BasicProperties props , byte [] body )
615+ throws IOException
616+ {
617+ basicPublish (exchange , routingKey , mandatory , false , props , body );
585618 }
586619
587620 /** Public API - {@inheritDoc} */
0 commit comments