Skip to content

Commit dae95e0

Browse files
author
Simon MacMullen
committed
Merged bug23675
2 parents 9947c93 + 650745c commit dae95e0

File tree

5 files changed

+273
-52
lines changed

5 files changed

+273
-52
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,23 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
534534
*/
535535
void basicAck(long deliveryTag, boolean multiple) throws IOException;
536536

537+
/**
538+
* Reject one or several received messages.
539+
*
540+
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
541+
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
542+
* @see com.rabbitmq.client.AMQP.Basic.Nack
543+
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
544+
* @param multiple true to reject all messages up to and including
545+
* the supplied delivery tag; false to reject just the supplied
546+
* delivery tag.
547+
* @param requeue true if the rejected message(s) should be requeued rather
548+
* than discarded/dead-lettered
549+
* @throws java.io.IOException if an error is encountered
550+
*/
551+
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
552+
throws IOException;
553+
537554
/**
538555
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
539556
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,13 @@ public void basicAck(long deliveryTag, boolean multiple)
719719
transmit(new Basic.Ack(deliveryTag, multiple));
720720
}
721721

722+
/** Public API - {@inheritDoc} */
723+
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
724+
throws IOException
725+
{
726+
transmit(new Basic.Nack(deliveryTag, multiple, requeue));
727+
}
728+
722729
/** Public API - {@inheritDoc} */
723730
public void basicReject(long deliveryTag, boolean requeue)
724731
throws IOException
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
31+
package com.rabbitmq.client.test.functional;
32+
33+
import com.rabbitmq.client.Channel;
34+
import com.rabbitmq.client.Envelope;
35+
import com.rabbitmq.client.GetResponse;
36+
import com.rabbitmq.client.QueueingConsumer;
37+
import com.rabbitmq.client.test.BrokerTestCase;
38+
39+
import java.io.IOException;
40+
import java.util.Arrays;
41+
42+
abstract class AbstractRejectTest extends BrokerTestCase {
43+
44+
protected Channel secondaryChannel;
45+
46+
@Override
47+
protected void setUp()
48+
throws IOException
49+
{
50+
super.setUp();
51+
secondaryChannel = connection.createChannel();
52+
53+
}
54+
55+
@Override
56+
protected void tearDown()
57+
throws IOException
58+
{
59+
if (secondaryChannel != null) {
60+
secondaryChannel.abort();
61+
secondaryChannel = null;
62+
}
63+
super.tearDown();
64+
}
65+
66+
protected long checkDelivery(QueueingConsumer.Delivery d,
67+
byte[] msg, boolean redelivered)
68+
{
69+
assertNotNull(d);
70+
return checkDelivery(d.getEnvelope(), d.getBody(), msg, redelivered);
71+
}
72+
73+
protected long checkDelivery(GetResponse r, byte[] msg, boolean redelivered)
74+
{
75+
assertNotNull(r);
76+
return checkDelivery(r.getEnvelope(), r.getBody(), msg, redelivered);
77+
}
78+
79+
protected long checkDelivery(Envelope e, byte[] m,
80+
byte[] msg, boolean redelivered)
81+
{
82+
assertNotNull(e);
83+
assertTrue(Arrays.equals(m, msg));
84+
assertEquals(e.isRedeliver(), redelivered);
85+
return e.getDeliveryTag();
86+
}
87+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.AMQP;
35+
import com.rabbitmq.client.GetResponse;
36+
import com.rabbitmq.client.QueueingConsumer;
37+
38+
import java.util.HashSet;
39+
import java.util.Set;
40+
41+
public class Nack extends AbstractRejectTest {
42+
43+
public void testSingleNack() throws Exception {
44+
String q =
45+
channel.queueDeclare("", false, true, false, null).getQueue();
46+
47+
byte[] m1 = "1".getBytes();
48+
byte[] m2 = "2".getBytes();
49+
50+
basicPublishVolatile(m1, q);
51+
basicPublishVolatile(m2, q);
52+
53+
long tag1 = checkDelivery(channel.basicGet(q, false), m1, false);
54+
long tag2 = checkDelivery(channel.basicGet(q, false), m2, false);
55+
56+
QueueingConsumer c = new QueueingConsumer(secondaryChannel);
57+
String consumerTag = secondaryChannel.basicConsume(q, false, c);
58+
59+
// requeue
60+
channel.basicNack(tag2, false, true);
61+
62+
long tag3 = checkDelivery(c.nextDelivery(), m2, true);
63+
secondaryChannel.basicCancel(consumerTag);
64+
65+
// no requeue
66+
secondaryChannel.basicNack(tag3, false, false);
67+
68+
assertNull(channel.basicGet(q, false));
69+
channel.basicAck(tag1, false);
70+
channel.basicNack(tag3, false, true);
71+
72+
expectError(AMQP.PRECONDITION_FAILED);
73+
}
74+
75+
public void testMultiNack() throws Exception {
76+
String q =
77+
channel.queueDeclare("", false, true, false, null).getQueue();
78+
79+
byte[] m1 = "1".getBytes();
80+
byte[] m2 = "2".getBytes();
81+
byte[] m3 = "3".getBytes();
82+
byte[] m4 = "4".getBytes();
83+
84+
basicPublishVolatile(m1, q);
85+
basicPublishVolatile(m2, q);
86+
basicPublishVolatile(m3, q);
87+
basicPublishVolatile(m4, q);
88+
89+
checkDelivery(channel.basicGet(q, false), m1, false);
90+
long tag1 = checkDelivery(channel.basicGet(q, false), m2, false);
91+
checkDelivery(channel.basicGet(q, false), m3, false);
92+
long tag2 = checkDelivery(channel.basicGet(q, false), m4, false);
93+
94+
// ack, leaving a gap in un-acked sequence
95+
channel.basicAck(tag1, false);
96+
97+
QueueingConsumer c = new QueueingConsumer(secondaryChannel);
98+
String consumerTag = secondaryChannel.basicConsume(q, false, c);
99+
100+
// requeue multi
101+
channel.basicNack(tag2, true, true);
102+
103+
long tag3 = checkDeliveries(c, m1, m3, m4);
104+
105+
secondaryChannel.basicCancel(consumerTag);
106+
107+
// no requeue
108+
secondaryChannel.basicNack(tag3, true, false);
109+
110+
assertNull(channel.basicGet(q, false));
111+
112+
channel.basicNack(tag3, true, true);
113+
114+
expectError(AMQP.PRECONDITION_FAILED);
115+
}
116+
117+
public void testNackAll() throws Exception {
118+
String q =
119+
channel.queueDeclare("", false, true, false, null).getQueue();
120+
121+
byte[] m1 = "1".getBytes();
122+
byte[] m2 = "2".getBytes();
123+
124+
basicPublishVolatile(m1, q);
125+
basicPublishVolatile(m2, q);
126+
127+
checkDelivery(channel.basicGet(q, false), m1, false);
128+
checkDelivery(channel.basicGet(q, false), m2, false);
129+
130+
// nack all
131+
channel.basicNack(0, true, true);
132+
133+
QueueingConsumer c = new QueueingConsumer(secondaryChannel);
134+
String consumerTag = secondaryChannel.basicConsume(q, true, c);
135+
136+
checkDeliveries(c, m1, m2);
137+
138+
secondaryChannel.basicCancel(consumerTag);
139+
}
140+
141+
private long checkDeliveries(QueueingConsumer c, byte[]... messages)
142+
throws InterruptedException {
143+
144+
Set<String> msgSet = new HashSet<String>();
145+
for (byte[] message : messages) {
146+
msgSet.add(new String(message));
147+
}
148+
149+
long lastTag = -1;
150+
for(int x = 0; x < messages.length; x++) {
151+
QueueingConsumer.Delivery delivery = c.nextDelivery();
152+
String m = new String(delivery.getBody());
153+
assertTrue("Unexpected message", msgSet.remove(m));
154+
checkDelivery(delivery, m.getBytes(), true);
155+
lastTag = delivery.getEnvelope().getDeliveryTag();
156+
}
157+
158+
assertTrue(msgSet.isEmpty());
159+
return lastTag;
160+
}
161+
}

test/src/com/rabbitmq/client/test/functional/Reject.java

Lines changed: 1 addition & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -31,64 +31,13 @@
3131

3232
package com.rabbitmq.client.test.functional;
3333

34-
import com.rabbitmq.client.test.BrokerTestCase;
35-
3634
import com.rabbitmq.client.AMQP;
37-
import com.rabbitmq.client.Channel;
38-
import com.rabbitmq.client.Envelope;
39-
import com.rabbitmq.client.GetResponse;
4035
import com.rabbitmq.client.QueueingConsumer;
41-
import com.rabbitmq.client.QueueingConsumer.Delivery;
4236

4337
import java.io.IOException;
44-
import java.util.Arrays;
4538

46-
public class Reject extends BrokerTestCase
39+
public class Reject extends AbstractRejectTest
4740
{
48-
49-
protected Channel secondaryChannel;
50-
51-
@Override
52-
protected void setUp()
53-
throws IOException
54-
{
55-
super.setUp();
56-
secondaryChannel = connection.createChannel();
57-
58-
}
59-
60-
@Override
61-
protected void tearDown()
62-
throws IOException
63-
{
64-
if (secondaryChannel != null) {
65-
secondaryChannel.abort();
66-
secondaryChannel = null;
67-
}
68-
super.tearDown();
69-
}
70-
71-
protected long checkDelivery(Delivery d, byte[] msg, boolean redelivered)
72-
{
73-
assertNotNull(d);
74-
return checkDelivery(d.getEnvelope(), d.getBody(), msg, redelivered);
75-
}
76-
77-
protected long checkDelivery(GetResponse r, byte[] msg, boolean redelivered)
78-
{
79-
assertNotNull(r);
80-
return checkDelivery(r.getEnvelope(), r.getBody(), msg, redelivered);
81-
}
82-
83-
protected long checkDelivery(Envelope e, byte[] m,
84-
byte[] msg, boolean redelivered)
85-
{
86-
assertNotNull(e);
87-
assertTrue(Arrays.equals(m, msg));
88-
assertEquals(e.isRedeliver(), redelivered);
89-
return e.getDeliveryTag();
90-
}
91-
9241
public void testReject()
9342
throws IOException, InterruptedException
9443
{

0 commit comments

Comments
 (0)