Skip to content

Commit 3891bfb

Browse files
author
Simon MacMullen
committed
Merge in default
2 parents 21f0997 + 9810cce commit 3891bfb

File tree

8 files changed

+218
-41
lines changed

8 files changed

+218
-41
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,24 @@ public interface Channel extends ShutdownNotifier {
215215
* @param prefetchCount maximum number of messages that the server
216216
* will deliver, 0 if unlimited
217217
* @param global true if the settings should be applied to the
218-
* entire connection rather than just the current channel
218+
* entire channel rather than each consumer
219219
* @throws java.io.IOException if an error is encountered
220220
*/
221221
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
222222

223+
/**
224+
* Request a specific prefetchCount "quality of service" settings
225+
* for this channel.
226+
*
227+
* @see #basicQos(int, int, boolean)
228+
* @param prefetchCount maximum number of messages that the server
229+
* will deliver, 0 if unlimited
230+
* @param global true if the settings should be applied to the
231+
* entire channel rather than each consumer
232+
* @throws java.io.IOException if an error is encountered
233+
*/
234+
void basicQos(int prefetchCount, boolean global) throws IOException;
235+
223236
/**
224237
* Request a specific prefetchCount "quality of service" settings
225238
* for this channel.
@@ -615,6 +628,23 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
615628
*/
616629
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
617630

631+
/**
632+
* Start a non-nolocal, non-exclusive consumer, with
633+
* a server-generated consumerTag and specified arguments.
634+
* @param queue the name of the queue
635+
* @param autoAck true if the server should consider messages
636+
* acknowledged once delivered; false if the server should expect
637+
* explicit acknowledgements
638+
* @param arguments a set of arguments for the consume
639+
* @param callback an interface to the consumer object
640+
* @return the consumerTag generated by the server
641+
* @throws java.io.IOException if an error is encountered
642+
* @see com.rabbitmq.client.AMQP.Basic.Consume
643+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
644+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
645+
*/
646+
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
647+
618648
/**
619649
* Start a non-nolocal, non-exclusive consumer.
620650
* @param queue the name of the queue

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,13 @@ public void basicQos(int prefetchSize, int prefetchCount, boolean global)
599599
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
600600
}
601601

602+
/** Public API - {@inheritDoc} */
603+
public void basicQos(int prefetchCount, boolean global)
604+
throws IOException
605+
{
606+
basicQos(0, prefetchCount, global);
607+
}
608+
602609
/** Public API - {@inheritDoc} */
603610
public void basicQos(int prefetchCount)
604611
throws IOException
@@ -939,6 +946,14 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback)
939946
return basicConsume(queue, autoAck, "", callback);
940947
}
941948

949+
/** Public API - {@inheritDoc} */
950+
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments,
951+
Consumer callback)
952+
throws IOException
953+
{
954+
return basicConsume(queue, autoAck, "", false, false, arguments, callback);
955+
}
956+
942957
/** Public API - {@inheritDoc} */
943958
public String basicConsume(String queue, boolean autoAck, String consumerTag,
944959
Consumer callback)

test/src/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
2323
Channel ch = connection.createChannel();
2424
String queue = ch.queueDeclare().getQueue();
2525
try {
26-
ch.basicConsume(queue, true, "", false, false, args, new QueueingConsumer(ch));
26+
ch.basicConsume(queue, true, args, new QueueingConsumer(ch));
2727
fail("Validation should fail for " + args);
2828
} catch (IOException ioe) {
2929
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ioe);
@@ -37,14 +37,14 @@ public void testConsumerPriorities() throws Exception {
3737
QueueingConsumer highConsumer = new QueueingConsumer(channel);
3838
QueueingConsumer medConsumer = new QueueingConsumer(channel);
3939
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
40-
channel.basicConsume(queue, true, "high", false, false, args(1), highConsumer);
41-
channel.basicConsume(queue, true, "med", false, false, null, medConsumer);
42-
channel.basicConsume(queue, true, "low", false, false, args(-1), lowConsumer);
40+
String high = channel.basicConsume(queue, true, args(1), highConsumer);
41+
String med = channel.basicConsume(queue, true, medConsumer);
42+
channel.basicConsume(queue, true, args(-1), lowConsumer);
4343

4444
publish(queue, COUNT, "high");
45-
channel.basicCancel("high");
45+
channel.basicCancel(high);
4646
publish(queue, COUNT, "med");
47-
channel.basicCancel("med");
47+
channel.basicCancel(med);
4848
publish(queue, COUNT, "low");
4949

5050
assertContents(highConsumer, COUNT, "high");

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,6 @@ public static void add(TestSuite suite) {
7676
suite.addTestSuite(ConsumerPriorities.class);
7777
suite.addTestSuite(Policies.class);
7878
suite.addTestSuite(ConnectionRecovery.class);
79+
suite.addTestSuite(PerConsumerPrefetch.class);
7980
}
8081
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.GetResponse;
4+
import com.rabbitmq.client.QueueingConsumer;
5+
import com.rabbitmq.client.QueueingConsumer.Delivery;
6+
import com.rabbitmq.client.test.BrokerTestCase;
7+
8+
import java.io.IOException;
9+
import java.util.Arrays;
10+
import java.util.Deque;
11+
12+
import static com.rabbitmq.client.test.functional.QosTests.drain;
13+
14+
public class PerConsumerPrefetch extends BrokerTestCase {
15+
private String q;
16+
17+
@Override
18+
protected void createResources() throws IOException {
19+
q = channel.queueDeclare().getQueue();
20+
}
21+
22+
private interface Closure {
23+
public void makeMore(Deque<Delivery> deliveries) throws IOException;
24+
}
25+
26+
public void testSingleAck() throws IOException {
27+
testPrefetch(new Closure() {
28+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
29+
for (Delivery del : deliveries) {
30+
ack(del, false);
31+
}
32+
}
33+
});
34+
}
35+
36+
public void testMultiAck() throws IOException {
37+
testPrefetch(new Closure() {
38+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
39+
ack(deliveries.getLast(), true);
40+
}
41+
});
42+
}
43+
44+
public void testSingleNack() throws IOException {
45+
for (final boolean requeue: Arrays.asList(false, true)) {
46+
testPrefetch(new Closure() {
47+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
48+
for (Delivery del : deliveries) {
49+
nack(del, false, requeue);
50+
}
51+
}
52+
});
53+
}
54+
}
55+
56+
public void testMultiNack() throws IOException {
57+
for (final boolean requeue: Arrays.asList(false, true)) {
58+
testPrefetch(new Closure() {
59+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
60+
nack(deliveries.getLast(), true, requeue);
61+
}
62+
});
63+
}
64+
}
65+
66+
public void testRecover() throws IOException {
67+
testPrefetch(new Closure() {
68+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
69+
channel.basicRecover();
70+
}
71+
});
72+
}
73+
74+
private void testPrefetch(Closure closure) throws IOException {
75+
QueueingConsumer c = new QueueingConsumer(channel);
76+
publish(q, 15);
77+
consume(c, 5, false);
78+
Deque<Delivery> deliveries = drain(c, 5);
79+
80+
ack(channel.basicGet(q, false), false);
81+
drain(c, 0);
82+
83+
closure.makeMore(deliveries);
84+
drain(c, 5);
85+
}
86+
87+
public void testPrefetchOnEmpty() throws IOException {
88+
QueueingConsumer c = new QueueingConsumer(channel);
89+
publish(q, 5);
90+
consume(c, 10, false);
91+
drain(c, 5);
92+
publish(q, 10);
93+
drain(c, 5);
94+
}
95+
96+
public void testAutoAckIgnoresPrefetch() throws IOException {
97+
QueueingConsumer c = new QueueingConsumer(channel);
98+
publish(q, 10);
99+
consume(c, 1, true);
100+
drain(c, 10);
101+
}
102+
103+
public void testPrefetchZeroMeansInfinity() throws IOException {
104+
QueueingConsumer c = new QueueingConsumer(channel);
105+
publish(q, 10);
106+
consume(c, 0, false);
107+
drain(c, 10);
108+
}
109+
110+
private void publish(String q, int n) throws IOException {
111+
for (int i = 0; i < n; i++) {
112+
channel.basicPublish("", q, null, "".getBytes());
113+
}
114+
}
115+
116+
private void consume(QueueingConsumer c, int prefetch, boolean autoAck) throws IOException {
117+
channel.basicQos(prefetch);
118+
channel.basicConsume(q, autoAck, c);
119+
}
120+
121+
private void ack(Delivery del, boolean multi) throws IOException {
122+
channel.basicAck(del.getEnvelope().getDeliveryTag(), multi);
123+
}
124+
125+
private void ack(GetResponse get, boolean multi) throws IOException {
126+
channel.basicAck(get.getEnvelope().getDeliveryTag(), multi);
127+
}
128+
129+
private void nack(Delivery del, boolean multi, boolean requeue) throws IOException {
130+
channel.basicNack(del.getEnvelope().getDeliveryTag(), multi, requeue);
131+
}
132+
}

0 commit comments

Comments
 (0)