@@ -49,24 +49,24 @@ public class QueueingConsumer extends DefaultConsumer {
4949 // throw a shutdown signal exception.
5050 private volatile ShutdownSignalException _shutdown ;
5151
52- // Marker object used to signal the queue is in shutdown mode.
52+ // Marker object used to signal the queue is in shutdown mode.
5353 // It is only there to wake up consumers. The canonical representation
54- // of shutting down is the presence of _shutdown.
54+ // of shutting down is the presence of _shutdown.
5555 // Invariant: This is never on _queue unless _shutdown != null.
5656 private static final Delivery POISON = new Delivery (null , null , null );
5757
5858 public QueueingConsumer (Channel ch ) {
5959 this (ch , new LinkedBlockingQueue <Delivery >());
6060 }
6161
62- public QueueingConsumer (Channel ch , BlockingQueue <Delivery > q )
63- {
62+ public QueueingConsumer (Channel ch , BlockingQueue <Delivery > q ) {
6463 super (ch );
6564 this ._queue = q ;
6665 }
6766
68- @ Override public void handleShutdownSignal (String consumerTag , ShutdownSignalException sig ) {
69- _shutdown = sig ;
67+ @ Override public void handleShutdownSignal (String consumerTag ,
68+ ShutdownSignalException sig ) {
69+ _shutdown = sig ;
7070 _queue .add (POISON );
7171 }
7272
@@ -122,22 +122,30 @@ public byte[] getBody() {
122122 /**
123123 * Check if we are in shutdown mode and if so throw an exception.
124124 */
125- private void checkShutdown (){
126- if (_shutdown != null ) throw Utility .fixStackTrace (_shutdown );
125+ private void checkShutdown () {
126+ if (_shutdown != null )
127+ throw Utility .fixStackTrace (_shutdown );
127128 }
128129
129130 /**
130131 * If this is a non-POISON non-null delivery simply return it.
131132 * If this is POISON we are in shutdown mode, throw _shutdown
132133 * If this is null, we may be in shutdown mode. Check and see.
133134 */
134- private Delivery handle (Delivery delivery )
135- {
136- if (delivery == POISON || (delivery == null && _shutdown != null )){
137- if (delivery == POISON ) _queue .add (POISON );
138- throw Utility .fixStackTrace (_shutdown );
139- }
140- return delivery ;
135+ private Delivery handle (Delivery delivery ) {
136+ if (delivery == POISON ||
137+ delivery == null && _shutdown != null ) {
138+ if (delivery == POISON ) {
139+ _queue .add (POISON );
140+ if (_shutdown == null ) {
141+ throw new IllegalStateException (
142+ "POISON in queue, but null _shutdown. " +
143+ "This should never happen, please report as a BUG" );
144+ }
145+ }
146+ throw Utility .fixStackTrace (_shutdown );
147+ }
148+ return delivery ;
141149 }
142150
143151 /**
@@ -162,7 +170,6 @@ public Delivery nextDelivery()
162170 public Delivery nextDelivery (long timeout )
163171 throws InterruptedException , ShutdownSignalException
164172 {
165- checkShutdown ();
166173 return handle (_queue .poll (timeout , TimeUnit .MILLISECONDS ));
167174 }
168175}
0 commit comments