Skip to content

Commit 94bccd5

Browse files
committed
add API and test for basic.reject
1 parent 3a7d68b commit 94bccd5

File tree

4 files changed

+95
-0
lines changed

4 files changed

+95
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,17 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
457457
*/
458458
void basicAck(long deliveryTag, boolean multiple) throws IOException;
459459

460+
/**
461+
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
462+
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
463+
* containing the received message being rejected.
464+
* @see com.rabbitmq.client.AMQP.Basic.Reject
465+
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
466+
* @param requeue true if the rejected message should be requeued rather than discarded
467+
* @throws java.io.IOException if an error is encountered
468+
*/
469+
void basicReject(long deliveryTag, boolean requeue) throws IOException;
470+
460471
/**
461472
* Start a non-nolocal, non-exclusive consumer, with
462473
* explicit acknowledgements required and a server-generated consumerTag.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,13 @@ public void basicAck(long deliveryTag, boolean multiple)
646646
transmit(new Basic.Ack(deliveryTag, multiple));
647647
}
648648

649+
/** Public API - {@inheritDoc} */
650+
public void basicReject(long deliveryTag, boolean requeue)
651+
throws IOException
652+
{
653+
transmit(new Basic.Reject(deliveryTag, requeue));
654+
}
655+
649656
/** Public API - {@inheritDoc} */
650657
public String basicConsume(String queue, Consumer callback)
651658
throws IOException

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public static TestSuite suite() {
4343
suite.addTestSuite(Routing.class);
4444
suite.addTestSuite(BindingLifecycle.class);
4545
suite.addTestSuite(Recover.class);
46+
suite.addTestSuite(Reject.class);
4647
suite.addTestSuite(TransactionalRecover.class);
4748
suite.addTestSuite(Transactions.class);
4849
suite.addTestSuite(PersistentTransactions.class);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.test.BrokerTestCase;
35+
36+
import com.rabbitmq.client.QueueingConsumer;
37+
import com.rabbitmq.client.QueueingConsumer.Delivery;
38+
39+
import java.io.IOException;
40+
import java.util.Arrays;
41+
42+
public class Reject extends BrokerTestCase
43+
{
44+
45+
protected long checkDelivery(Delivery d, byte[] msg, boolean redelivered)
46+
{
47+
assertNotNull(d);
48+
assertTrue(Arrays.equals(msg, d.getBody()));
49+
assertEquals(d.getEnvelope().isRedeliver(), redelivered);
50+
return d.getEnvelope().getDeliveryTag();
51+
}
52+
53+
public void testReject()
54+
throws IOException, InterruptedException
55+
{
56+
String q = channel.queueDeclare("", false, true, false, null).getQueue();
57+
58+
QueueingConsumer c = new QueueingConsumer(channel);
59+
String consumerTag = channel.basicConsume(q, false, c);
60+
61+
byte[] m1 = "1".getBytes();
62+
byte[] m2 = "2".getBytes();
63+
64+
basicPublishVolatile(m1, q);
65+
basicPublishVolatile(m2, q);
66+
67+
long tag1 = checkDelivery(c.nextDelivery(), m1, false);
68+
long tag2 = checkDelivery(c.nextDelivery(), m2, false);
69+
channel.basicReject(tag2, true);
70+
long tag3 = checkDelivery(c.nextDelivery(), m2, true);
71+
channel.basicCancel(consumerTag);
72+
channel.basicReject(tag3, false);
73+
assertNull(channel.basicGet(q, false));
74+
channel.basicAck(tag1, false);
75+
}
76+
}

0 commit comments

Comments
 (0)