Skip to content

Commit 742c826

Browse files
author
David R. MacIver
committed
use a different mechanism for handling shutdown in QueuingConsumer so that multiple threads consuming will all receive the exception
1 parent 6ccc3ce commit 742c826

File tree

1 file changed

+24
-14
lines changed

1 file changed

+24
-14
lines changed

src/com/rabbitmq/client/QueueingConsumer.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,32 @@
3737
import java.util.concurrent.TimeUnit;
3838

3939
import com.rabbitmq.client.AMQP.BasicProperties;
40-
import com.rabbitmq.utility.ValueOrException;
40+
import com.rabbitmq.utility.Utility;
4141

4242
/**
4343
* Convenience class: an implementation of {@link Consumer} with straightforward blocking semantics
4444
*/
4545
public class QueueingConsumer extends DefaultConsumer {
46-
public BlockingQueue<ValueOrException<Delivery, ShutdownSignalException>> _queue;
46+
private final BlockingQueue<Delivery> _queue;
47+
private volatile ShutdownSignalException _shutdown;
48+
49+
// Marker object used to signal the queue is in shutdown mode.
50+
// Invariant: This is never on _queue unless _shutdown != null.
51+
public static final Delivery POISON = new Delivery(null, null, null);
4752

4853
public QueueingConsumer(Channel ch) {
49-
this(ch,
50-
new LinkedBlockingQueue<ValueOrException<Delivery, ShutdownSignalException>>());
54+
this(ch, new LinkedBlockingQueue<Delivery>());
5155
}
5256

53-
public QueueingConsumer(Channel ch,
54-
BlockingQueue<ValueOrException<Delivery, ShutdownSignalException>> q)
57+
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q)
5558
{
5659
super(ch);
5760
this._queue = q;
5861
}
5962

6063
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
61-
this._queue.add(ValueOrException. <Delivery, ShutdownSignalException> makeException(sig));
64+
_shutdown = sig;
65+
_queue.add(POISON);
6266
}
6367

6468
@Override public void handleDelivery(String consumerTag,
@@ -67,8 +71,7 @@ public QueueingConsumer(Channel ch,
6771
byte[] body)
6872
throws IOException
6973
{
70-
this._queue.add(ValueOrException. <Delivery, ShutdownSignalException> makeValue
71-
(new Delivery(envelope, properties, body)));
74+
this._queue.add(new Delivery(envelope, properties, body));
7275
}
7376

7477
/**
@@ -110,6 +113,15 @@ public byte[] getBody() {
110113
}
111114
}
112115

116+
private Delivery handle(Delivery delivery)
117+
{
118+
if(delivery == POISON || (delivery == null && _shutdown != null)){
119+
_queue.add(POISON);
120+
throw Utility.fixStackTrace(_shutdown);
121+
}
122+
return delivery;
123+
}
124+
113125
/**
114126
* Main application-side API: wait for the next message delivery and return it.
115127
* @return the next message
@@ -119,7 +131,7 @@ public byte[] getBody() {
119131
public Delivery nextDelivery()
120132
throws InterruptedException, ShutdownSignalException
121133
{
122-
return _queue.take().getValue();
134+
return handle(_queue.take());
123135
}
124136

125137
/**
@@ -132,16 +144,14 @@ public Delivery nextDelivery()
132144
public Delivery nextDelivery(long timeout)
133145
throws InterruptedException, ShutdownSignalException
134146
{
135-
ValueOrException<Delivery, ShutdownSignalException> r =
136-
_queue.poll(timeout, TimeUnit.MILLISECONDS);
137-
return r == null ? null : r.getValue();
147+
return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
138148
}
139149

140150
/**
141151
* Retrieve the underlying blocking queue.
142152
* @return the queue where incoming messages are stored
143153
*/
144-
public BlockingQueue<ValueOrException<Delivery, ShutdownSignalException>> getQueue() {
154+
public BlockingQueue<Delivery> getQueue() {
145155
return _queue;
146156
}
147157
}

0 commit comments

Comments
 (0)