Skip to content

Commit 1102614

Browse files
Make available the knowledge that we've just seen a basic.recover_ok to the thread that's handling Consumer callbacks, and in QueueingConsumer.
1 parent d0d6e52 commit 1102614

File tree

5 files changed

+64
-3
lines changed

5 files changed

+64
-3
lines changed

src/com/rabbitmq/client/Consumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public interface Consumer {
7070
*/
7171
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
7272

73+
/**
74+
*
75+
*/
76+
void handleRecoverOk();
77+
7378
/**
7479
* Called when a delivery appears for this consumer.
7580
* @param consumerTag the defined consumerTag (either client- or server-generated)

src/com/rabbitmq/client/DefaultConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
7474
// no work to do
7575
}
7676

77+
/**
78+
* No-op implementation of {@link Consumer#handleRecoverOk}.
79+
*/
80+
public void handleRecoverOk() {
81+
// no work to do
82+
}
83+
7784
/**
7885
* No-op implementation of {@link Consumer#handleDelivery}.
7986
*/

src/com/rabbitmq/client/QueueingConsumer.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,15 @@ public class QueueingConsumer extends DefaultConsumer {
4949
// throw a shutdown signal exception.
5050
private volatile ShutdownSignalException _shutdown;
5151

52+
// When this is true we have seen recover_ok but not had a delivery to
53+
// attach it to
54+
private boolean _nextDeliveryRecoverOk = false;
55+
5256
// Marker object used to signal the queue is in shutdown mode.
5357
// It is only there to wake up consumers. The canonical representation
5458
// of shutting down is the presence of _shutdown.
5559
// Invariant: This is never on _queue unless _shutdown != null.
56-
private static final Delivery POISON = new Delivery(null, null, null);
60+
private static final Delivery POISON = new Delivery(null, null, null, false);
5761

5862
public QueueingConsumer(Channel ch) {
5963
this(ch, new LinkedBlockingQueue<Delivery>());
@@ -77,7 +81,15 @@ public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
7781
throws IOException
7882
{
7983
checkShutdown();
80-
this._queue.add(new Delivery(envelope, properties, body));
84+
this._queue.add(new Delivery(envelope, properties, body,
85+
_nextDeliveryRecoverOk));
86+
_nextDeliveryRecoverOk = false;
87+
}
88+
89+
@Override
90+
public void handleRecoverOk() {
91+
checkShutdown();
92+
_nextDeliveryRecoverOk = true;
8193
}
8294

8395
/**
@@ -87,11 +99,14 @@ public static class Delivery {
8799
private final Envelope _envelope;
88100
private final AMQP.BasicProperties _properties;
89101
private final byte[] _body;
102+
private final boolean _recoverOk;
90103

91-
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
104+
public Delivery(Envelope envelope, AMQP.BasicProperties properties,
105+
byte[] body, boolean recoverOk) {
92106
_envelope = envelope;
93107
_properties = properties;
94108
_body = body;
109+
_recoverOk = recoverOk;
95110
}
96111

97112
/**
@@ -117,6 +132,19 @@ public BasicProperties getProperties() {
117132
public byte[] getBody() {
118133
return _body;
119134
}
135+
136+
/**
137+
* Have we seen recover_ok immediately before this message?
138+
* If true, then all messages <b>before</b> this one that have not been
139+
* acked will be recovered, but this one and subsequent ones will not.
140+
*
141+
* NB: this method does <b>not</b> tell you whether this message, or
142+
* any other, actually is recovered - for that, see Envelope.isRedeliver().
143+
* @return whether we have seen recover_ok immediately before this message
144+
*/
145+
public boolean getRecoverOk() {
146+
return _recoverOk;
147+
}
120148
}
121149

122150
/**

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,15 @@ public void releaseChannelNumber() {
258258
_channelMutex.notifyAll();
259259
}
260260
return true;
261+
} else if (method instanceof Basic.RecoverOk) {
262+
for (Consumer callback: _consumers.values()) {
263+
callback.handleRecoverOk();
264+
}
265+
266+
// Unlike all the other cases we still want this RecoverOk to
267+
// be handled by whichever RPC continuation invoked Recover,
268+
// so return false
269+
return false;
261270
} else {
262271
return false;
263272
}

test/src/com/rabbitmq/client/test/functional/Recover.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public void createResources() throws IOException {
5252

5353
static interface RecoverCallback {
5454
void recover(Channel channel) throws IOException;
55+
void checkRecover(QueueingConsumer.Delivery delivery, boolean expected) throws IOException;
5556
}
5657

5758
// The AMQP specification under-specifies the behaviour when
@@ -64,11 +65,13 @@ void verifyRedeliverOnRecover(RecoverCallback call)
6465
channel.basicConsume(queue, false, consumer); // require acks.
6566
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
6667
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
68+
call.checkRecover(delivery, false);
6769
assertTrue("consumed message body not as sent",
6870
Arrays.equals(body, delivery.getBody()));
6971
// Don't ack it, and get it redelivered to the same consumer
7072
call.recover(channel);
7173
QueueingConsumer.Delivery secondDelivery = consumer.nextDelivery(5000);
74+
call.checkRecover(secondDelivery, true);
7275
assertNotNull("timed out waiting for redelivered message", secondDelivery);
7376
assertTrue("consumed (redelivered) message body not as sent",
7477
Arrays.equals(body, delivery.getBody()));
@@ -80,6 +83,7 @@ void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
8083
channel.basicConsume(queue, true, consumer); // auto ack.
8184
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
8285
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
86+
call.checkRecover(delivery, false);
8387
assertTrue("consumed message body not as sent",
8488
Arrays.equals(body, delivery.getBody()));
8589
call.recover(channel);
@@ -92,12 +96,20 @@ void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
9296
public void recover(Channel channel) throws IOException {
9397
channel.basicRecoverAsync(true);
9498
}
99+
100+
public void checkRecover(QueueingConsumer.Delivery delivery, boolean expected) throws IOException {
101+
// We make no guarantees
102+
}
95103
};
96104

97105
RecoverCallback recoverSync = new RecoverCallback() {
98106
public void recover(Channel channel) throws IOException {
99107
channel.basicRecover(true);
100108
}
109+
110+
public void checkRecover(QueueingConsumer.Delivery delivery, boolean expected) throws IOException {
111+
assertEquals(expected, delivery.getRecoverOk());
112+
}
101113
};
102114

103115
public void testRedeliverOnRecoverAsync() throws IOException, InterruptedException {

0 commit comments

Comments
 (0)