Skip to content

Commit fbda319

Browse files
author
Simon MacMullen
committed
Revert the changes to QueueingConsumer since they were considered too ugly and the obnly way to fix would be to replace it with a different API.
1 parent 1102614 commit fbda319

File tree

2 files changed

+3
-43
lines changed

2 files changed

+3
-43
lines changed

src/com/rabbitmq/client/QueueingConsumer.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,11 @@ public class QueueingConsumer extends DefaultConsumer {
4949
// throw a shutdown signal exception.
5050
private volatile ShutdownSignalException _shutdown;
5151

52-
// When this is true we have seen recover_ok but not had a delivery to
53-
// attach it to
54-
private boolean _nextDeliveryRecoverOk = false;
55-
5652
// Marker object used to signal the queue is in shutdown mode.
5753
// It is only there to wake up consumers. The canonical representation
5854
// of shutting down is the presence of _shutdown.
5955
// Invariant: This is never on _queue unless _shutdown != null.
60-
private static final Delivery POISON = new Delivery(null, null, null, false);
56+
private static final Delivery POISON = new Delivery(null, null, null);
6157

6258
public QueueingConsumer(Channel ch) {
6359
this(ch, new LinkedBlockingQueue<Delivery>());
@@ -81,15 +77,7 @@ public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
8177
throws IOException
8278
{
8379
checkShutdown();
84-
this._queue.add(new Delivery(envelope, properties, body,
85-
_nextDeliveryRecoverOk));
86-
_nextDeliveryRecoverOk = false;
87-
}
88-
89-
@Override
90-
public void handleRecoverOk() {
91-
checkShutdown();
92-
_nextDeliveryRecoverOk = true;
80+
this._queue.add(new Delivery(envelope, properties, body));
9381
}
9482

9583
/**
@@ -99,14 +87,11 @@ public static class Delivery {
9987
private final Envelope _envelope;
10088
private final AMQP.BasicProperties _properties;
10189
private final byte[] _body;
102-
private final boolean _recoverOk;
10390

104-
public Delivery(Envelope envelope, AMQP.BasicProperties properties,
105-
byte[] body, boolean recoverOk) {
91+
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
10692
_envelope = envelope;
10793
_properties = properties;
10894
_body = body;
109-
_recoverOk = recoverOk;
11095
}
11196

11297
/**
@@ -132,19 +117,6 @@ public BasicProperties getProperties() {
132117
public byte[] getBody() {
133118
return _body;
134119
}
135-
136-
/**
137-
* Have we seen recover_ok immediately before this message?
138-
* If true, then all messages <b>before</b> this one that have not been
139-
* acked will be recovered, but this one and subsequent ones will not.
140-
*
141-
* NB: this method does <b>not</b> tell you whether this message, or
142-
* any other, actually is recovered - for that, see Envelope.isRedeliver().
143-
* @return whether we have seen recover_ok immediately before this message
144-
*/
145-
public boolean getRecoverOk() {
146-
return _recoverOk;
147-
}
148120
}
149121

150122
/**

test/src/com/rabbitmq/client/test/functional/Recover.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public void createResources() throws IOException {
5252

5353
static interface RecoverCallback {
5454
void recover(Channel channel) throws IOException;
55-
void checkRecover(QueueingConsumer.Delivery delivery, boolean expected) throws IOException;
5655
}
5756

5857
// The AMQP specification under-specifies the behaviour when
@@ -65,13 +64,11 @@ void verifyRedeliverOnRecover(RecoverCallback call)
6564
channel.basicConsume(queue, false, consumer); // require acks.
6665
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
6766
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
68-
call.checkRecover(delivery, false);
6967
assertTrue("consumed message body not as sent",
7068
Arrays.equals(body, delivery.getBody()));
7169
// Don't ack it, and get it redelivered to the same consumer
7270
call.recover(channel);
7371
QueueingConsumer.Delivery secondDelivery = consumer.nextDelivery(5000);
74-
call.checkRecover(secondDelivery, true);
7572
assertNotNull("timed out waiting for redelivered message", secondDelivery);
7673
assertTrue("consumed (redelivered) message body not as sent",
7774
Arrays.equals(body, delivery.getBody()));
@@ -83,7 +80,6 @@ void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
8380
channel.basicConsume(queue, true, consumer); // auto ack.
8481
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
8582
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
86-
call.checkRecover(delivery, false);
8783
assertTrue("consumed message body not as sent",
8884
Arrays.equals(body, delivery.getBody()));
8985
call.recover(channel);
@@ -96,20 +92,12 @@ void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
9692
public void recover(Channel channel) throws IOException {
9793
channel.basicRecoverAsync(true);
9894
}
99-
100-
public void checkRecover(QueueingConsumer.Delivery delivery, boolean expected) throws IOException {
101-
// We make no guarantees
102-
}
10395
};
10496

10597
RecoverCallback recoverSync = new RecoverCallback() {
10698
public void recover(Channel channel) throws IOException {
10799
channel.basicRecover(true);
108100
}
109-
110-
public void checkRecover(QueueingConsumer.Delivery delivery, boolean expected) throws IOException {
111-
assertEquals(expected, delivery.getRecoverOk());
112-
}
113101
};
114102

115103
public void testRedeliverOnRecoverAsync() throws IOException, InterruptedException {

0 commit comments

Comments
 (0)