Skip to content

Commit bad5a41

Browse files
committed
Merge bug21844 into default
2 parents 0c3254d + 3f26e08 commit bad5a41

File tree

4 files changed

+100
-0
lines changed

4 files changed

+100
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,18 @@ Queue.DeclareOk queueDeclare(String queue, boolean passive, boolean durable, boo
468468
*/
469469
void basicCancel(String consumerTag) throws IOException;
470470

471+
/**
472+
* Ask the broker to resend unacknowledged messages. In 0-8
473+
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
474+
* the new, deprecated method basic.recover_async is asynchronous.
475+
* To avoid this API changing, this is named for the latter, and
476+
* will be deprecated.
477+
* @param requeue If true, messages will be requeued and possibly
478+
* delivered to a different consumer. If false, messages will be
479+
* redelivered to the same consumer.
480+
*/
481+
void basicRecoverAsync(boolean requeue) throws IOException;
482+
471483
/**
472484
* Enables TX mode on this channel.
473485
* @see com.rabbitmq.client.AMQP.Tx.Select

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,13 @@ public Consumer transformReply(AMQCommand replyCommand) {
668668
}
669669
}
670670

671+
/** Public API - {@inheritDoc} */
672+
public void basicRecoverAsync(boolean requeue)
673+
throws IOException
674+
{
675+
transmit(new Basic.Recover(requeue));
676+
}
677+
671678
/** Public API - {@inheritDoc} */
672679
public Tx.SelectOk txSelect()
673680
throws IOException

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public static TestSuite suite() {
4141
suite.addTestSuite(DoubleDeletion.class);
4242
suite.addTestSuite(Routing.class);
4343
suite.addTestSuite(BindingLifecycle.class);
44+
suite.addTestSuite(Recover.class);
4445
suite.addTestSuite(Transactions.class);
4546
suite.addTestSuite(PersistentTransactions.class);
4647
suite.addTestSuite(RequeueOnConnectionClose.class);
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2009 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import java.util.Arrays;
35+
import java.io.IOException;
36+
37+
import com.rabbitmq.client.AMQP;
38+
import com.rabbitmq.client.QueueingConsumer;
39+
40+
public class Recover extends BrokerTestCase {
41+
42+
String queue;
43+
byte[] body = "message".getBytes();
44+
45+
public void createResources() throws IOException {
46+
AMQP.Queue.DeclareOk ok = channel.queueDeclare();
47+
queue = ok.getQueue();
48+
}
49+
50+
public void testRedeliverOnRecover() throws IOException, InterruptedException {
51+
QueueingConsumer consumer = new QueueingConsumer(channel);
52+
channel.basicConsume(queue, false, consumer); // require acks.
53+
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
54+
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
55+
assertTrue("consumed message body not as sent",
56+
Arrays.equals(body, delivery.getBody()));
57+
// Don't ack it, and get it redelivered to the same consumer
58+
channel.basicRecoverAsync(false);
59+
QueueingConsumer.Delivery secondDelivery = consumer.nextDelivery(5000);
60+
assertNotNull("timed out waiting for redelivered message", secondDelivery);
61+
assertTrue("consumed (redelivered) message body not as sent",
62+
Arrays.equals(body, delivery.getBody()));
63+
}
64+
65+
public void testNoRedeliveryWithAutoAck() throws IOException, InterruptedException {
66+
QueueingConsumer consumer = new QueueingConsumer(channel);
67+
channel.basicConsume(queue, true, consumer); // auto ack.
68+
channel.basicPublish("", queue, new AMQP.BasicProperties(), body);
69+
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
70+
assertTrue("consumed message body not as sent",
71+
Arrays.equals(body, delivery.getBody()));
72+
channel.basicRecoverAsync(false);
73+
// there's a race here between our recover finishing and the basic.get;
74+
Thread.sleep(500);
75+
assertNull("should be no message available", channel.basicGet(queue, true));
76+
}
77+
78+
// The AMQP specification under-specifies the behaviour when
79+
// requeue=false. So we can't really test anything.
80+
}

0 commit comments

Comments
 (0)