Skip to content

Commit b28aae5

Browse files
author
Emile Joubert
committed
Merged bug22864 into default
2 parents 1f83d67 + 1dc4623 commit b28aae5

File tree

6 files changed

+97
-13
lines changed

6 files changed

+97
-13
lines changed

build.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<pathelement path="${test.javac.out}"/>
2727
</path>
2828

29-
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json"/>
29+
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json ${codegen.dir}/rabbitmq-0.8-extensions.json"/>
3030

3131
<target name="amqp-generate-check" description="check if codegen needs to be run">
3232
<uptodate property="amqp.generate.notRequired">

src/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,12 +495,22 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
495495
* Ask the broker to resend unacknowledged messages. In 0-8
496496
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
497497
* the new, deprecated method basic.recover_async is asynchronous.
498-
* To avoid this API changing, this is named for the latter, and
499-
* will be deprecated.
500498
* @param requeue If true, messages will be requeued and possibly
501499
* delivered to a different consumer. If false, messages will be
502500
* redelivered to the same consumer.
503501
*/
502+
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
503+
504+
/**
505+
* Ask the broker to resend unacknowledged messages. In 0-8
506+
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
507+
* the new, deprecated method basic.recover_async is asynchronous
508+
* and deprecated.
509+
* @param requeue If true, messages will be requeued and possibly
510+
* delivered to a different consumer. If false, messages will be
511+
* redelivered to the same consumer.
512+
*/
513+
@Deprecated
504514
void basicRecoverAsync(boolean requeue) throws IOException;
505515

506516
/**

src/com/rabbitmq/client/Consumer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,19 @@ public interface Consumer {
7070
*/
7171
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
7272

73+
/**
74+
* Called to notify the consumer that we've received a basic.recover-ok
75+
* in reply to a basic.recover some other thread sent. All messages
76+
* received before this is invoked that haven't been ack'ed will be
77+
* redelivered. All messages received afterwards won't be.
78+
*
79+
* This method exists since all the Consumer callbacks are invoked by the
80+
* connection main loop thread - so it's sometimes useful to allow that
81+
* thread to know that the recover-ok has been received, rather than the
82+
* thread which invoked basicRecover().
83+
*/
84+
void handleRecoverOk();
85+
7386
/**
7487
* Called when a delivery appears for this consumer.
7588
* @param consumerTag the defined consumerTag (either client- or server-generated)

src/com/rabbitmq/client/DefaultConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
7474
// no work to do
7575
}
7676

77+
/**
78+
* No-op implementation of {@link Consumer#handleRecoverOk}.
79+
*/
80+
public void handleRecoverOk() {
81+
// no work to do
82+
}
83+
7784
/**
7885
* No-op implementation of {@link Consumer#handleDelivery}.
7986
*/

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,15 @@ public void releaseChannelNumber() {
285285
}
286286
}
287287
return true;
288+
} else if (method instanceof Basic.RecoverOk) {
289+
for (Consumer callback: _consumers.values()) {
290+
callback.handleRecoverOk();
291+
}
292+
293+
// Unlike all the other cases we still want this RecoverOk to
294+
// be handled by whichever RPC continuation invoked Recover,
295+
// so return false
296+
return false;
288297
} else {
289298
return false;
290299
}
@@ -699,11 +708,19 @@ public Consumer transformReply(AMQCommand replyCommand) {
699708
}
700709
}
701710

711+
/** Public API - {@inheritDoc} */
712+
public Basic.RecoverOk basicRecover(boolean requeue)
713+
throws IOException
714+
{
715+
return (Basic.RecoverOk) exnWrappingRpc(new Basic.Recover(requeue)).getMethod();
716+
}
717+
718+
702719
/** Public API - {@inheritDoc} */
703720
public void basicRecoverAsync(boolean requeue)
704721
throws IOException
705722
{
706-
transmit(new Basic.Recover(requeue));
723+
transmit(new Basic.RecoverAsync(requeue));
707724
}
708725

709726
/** Public API - {@inheritDoc} */

test/src/com/rabbitmq/client/test/functional/Recover.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,48 +36,85 @@
3636

3737
import com.rabbitmq.client.AMQP;
3838
import com.rabbitmq.client.QueueingConsumer;
39+
import com.rabbitmq.client.Channel;
3940

4041
import com.rabbitmq.client.test.BrokerTestCase;
4142

4243
public class Recover extends BrokerTestCase {
4344

4445
String queue;
4546
byte[] body = "message".getBytes();
46-
47+
4748
public void createResources() throws IOException {
4849
AMQP.Queue.DeclareOk ok = channel.queueDeclare();
4950
queue = ok.getQueue();
5051
}
5152

52-
public void testRedeliverOnRecover() throws IOException, InterruptedException {
53+
static interface RecoverCallback {
54+
void recover(Channel channel) throws IOException;
55+
}
56+
57+
// The AMQP specification under-specifies the behaviour when
58+
// requeue=false. So we can't really test any scenarios for
59+
// requeue=false.
60+
61+
void verifyRedeliverOnRecover(RecoverCallback call)
62+
throws IOException, InterruptedException {
5363
QueueingConsumer consumer = new QueueingConsumer(channel);
5464
channel.basicConsume(queue, false, consumer); // require acks.
5565
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
5666
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
5767
assertTrue("consumed message body not as sent",
5868
Arrays.equals(body, delivery.getBody()));
5969
// Don't ack it, and get it redelivered to the same consumer
60-
channel.basicRecoverAsync(true);
70+
call.recover(channel);
6171
QueueingConsumer.Delivery secondDelivery = consumer.nextDelivery(5000);
6272
assertNotNull("timed out waiting for redelivered message", secondDelivery);
6373
assertTrue("consumed (redelivered) message body not as sent",
64-
Arrays.equals(body, delivery.getBody()));
74+
Arrays.equals(body, delivery.getBody()));
6575
}
6676

67-
public void testNoRedeliveryWithAutoAck() throws IOException, InterruptedException {
77+
void verifyNoRedeliveryWithAutoAck(RecoverCallback call)
78+
throws IOException, InterruptedException {
6879
QueueingConsumer consumer = new QueueingConsumer(channel);
6980
channel.basicConsume(queue, true, consumer); // auto ack.
7081
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
7182
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
7283
assertTrue("consumed message body not as sent",
7384
Arrays.equals(body, delivery.getBody()));
74-
channel.basicRecoverAsync(true);
85+
call.recover(channel);
7586
// there's a race here between our recover finishing and the basic.get;
7687
Thread.sleep(500);
7788
assertNull("should be no message available", channel.basicGet(queue, true));
7889
}
7990

80-
// The AMQP specification under-specifies the behaviour when
81-
// requeue=false. So we can't really test any scenarios for
82-
// requeue=false.
91+
RecoverCallback recoverAsync = new RecoverCallback() {
92+
public void recover(Channel channel) throws IOException {
93+
channel.basicRecoverAsync(true);
94+
}
95+
};
96+
97+
RecoverCallback recoverSync = new RecoverCallback() {
98+
public void recover(Channel channel) throws IOException {
99+
channel.basicRecover(true);
100+
}
101+
};
102+
103+
public void testRedeliverOnRecoverAsync() throws IOException, InterruptedException {
104+
verifyRedeliverOnRecover(recoverAsync);
105+
}
106+
107+
public void testRedeliveryOnRecover() throws IOException, InterruptedException {
108+
verifyRedeliverOnRecover(recoverSync);
109+
}
110+
111+
public void testNoRedeliveryWithAutoAckAsync()
112+
throws IOException, InterruptedException {
113+
verifyNoRedeliveryWithAutoAck(recoverAsync);
114+
}
115+
116+
public void testNoRedeliveryWithAutoAck()
117+
throws IOException, InterruptedException {
118+
verifyNoRedeliveryWithAutoAck(recoverSync);
119+
}
83120
}

0 commit comments

Comments
 (0)