Skip to content

Commit 01c6b67

Browse files
author
Simon MacMullen
committed
Merge in default
2 parents fe069ad + bdb8b31 commit 01c6b67

File tree

8 files changed

+230
-16
lines changed

8 files changed

+230
-16
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,24 @@ public interface Channel extends ShutdownNotifier {
215215
* @param prefetchCount maximum number of messages that the server
216216
* will deliver, 0 if unlimited
217217
* @param global true if the settings should be applied to the
218-
* entire connection rather than just the current channel
218+
* entire channel rather than each consumer
219219
* @throws java.io.IOException if an error is encountered
220220
*/
221221
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
222222

223+
/**
224+
* Request a specific prefetchCount "quality of service" settings
225+
* for this channel.
226+
*
227+
* @see #basicQos(int, int, boolean)
228+
* @param prefetchCount maximum number of messages that the server
229+
* will deliver, 0 if unlimited
230+
* @param global true if the settings should be applied to the
231+
* entire channel rather than each consumer
232+
* @throws java.io.IOException if an error is encountered
233+
*/
234+
void basicQos(int prefetchCount, boolean global) throws IOException;
235+
223236
/**
224237
* Request a specific prefetchCount "quality of service" settings
225238
* for this channel.
@@ -615,6 +628,23 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
615628
*/
616629
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
617630

631+
/**
632+
* Start a non-nolocal, non-exclusive consumer, with
633+
* a server-generated consumerTag and specified arguments.
634+
* @param queue the name of the queue
635+
* @param autoAck true if the server should consider messages
636+
* acknowledged once delivered; false if the server should expect
637+
* explicit acknowledgements
638+
* @param arguments a set of arguments for the consume
639+
* @param callback an interface to the consumer object
640+
* @return the consumerTag generated by the server
641+
* @throws java.io.IOException if an error is encountered
642+
* @see com.rabbitmq.client.AMQP.Basic.Consume
643+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
644+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
645+
*/
646+
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
647+
618648
/**
619649
* Start a non-nolocal, non-exclusive consumer.
620650
* @param queue the name of the queue

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,13 @@ public void basicQos(int prefetchSize, int prefetchCount, boolean global)
595595
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
596596
}
597597

598+
/** Public API - {@inheritDoc} */
599+
public void basicQos(int prefetchCount, boolean global)
600+
throws IOException
601+
{
602+
basicQos(0, prefetchCount, global);
603+
}
604+
598605
/** Public API - {@inheritDoc} */
599606
public void basicQos(int prefetchCount)
600607
throws IOException
@@ -935,6 +942,14 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback)
935942
return basicConsume(queue, autoAck, "", callback);
936943
}
937944

945+
/** Public API - {@inheritDoc} */
946+
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments,
947+
Consumer callback)
948+
throws IOException
949+
{
950+
return basicConsume(queue, autoAck, "", false, false, arguments, callback);
951+
}
952+
938953
/** Public API - {@inheritDoc} */
939954
public String basicConsume(String queue, boolean autoAck, String consumerTag,
940955
Consumer callback)

test/src/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
2323
Channel ch = connection.createChannel();
2424
String queue = ch.queueDeclare().getQueue();
2525
try {
26-
ch.basicConsume(queue, true, "", false, false, args, new QueueingConsumer(ch));
26+
ch.basicConsume(queue, true, args, new QueueingConsumer(ch));
2727
fail("Validation should fail for " + args);
2828
} catch (IOException ioe) {
2929
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ioe);
@@ -37,14 +37,14 @@ public void testConsumerPriorities() throws Exception {
3737
QueueingConsumer highConsumer = new QueueingConsumer(channel);
3838
QueueingConsumer medConsumer = new QueueingConsumer(channel);
3939
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
40-
channel.basicConsume(queue, true, "high", false, false, args(1), highConsumer);
41-
channel.basicConsume(queue, true, "med", false, false, null, medConsumer);
42-
channel.basicConsume(queue, true, "low", false, false, args(-1), lowConsumer);
40+
String high = channel.basicConsume(queue, true, args(1), highConsumer);
41+
String med = channel.basicConsume(queue, true, medConsumer);
42+
channel.basicConsume(queue, true, args(-1), lowConsumer);
4343

4444
publish(queue, COUNT, "high");
45-
channel.basicCancel("high");
45+
channel.basicCancel(high);
4646
publish(queue, COUNT, "med");
47-
channel.basicCancel("med");
47+
channel.basicCancel(med);
4848
publish(queue, COUNT, "low");
4949

5050
assertContents(highConsumer, COUNT, "high");

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,6 @@ public static void add(TestSuite suite) {
7575
suite.addTestSuite(HeadersExchangeValidation.class);
7676
suite.addTestSuite(ConsumerPriorities.class);
7777
suite.addTestSuite(Policies.class);
78+
suite.addTestSuite(PerConsumerPrefetch.class);
7879
}
7980
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.GetResponse;
6+
import com.rabbitmq.client.QueueingConsumer;
7+
import com.rabbitmq.client.QueueingConsumer.Delivery;
8+
import com.rabbitmq.client.test.BrokerTestCase;
9+
10+
import java.io.IOException;
11+
import java.util.Arrays;
12+
import java.util.Deque;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
16+
import static com.rabbitmq.client.test.functional.QosTests.drain;
17+
18+
public class PerConsumerPrefetch extends BrokerTestCase {
19+
private String q;
20+
21+
@Override
22+
protected void createResources() throws IOException {
23+
q = channel.queueDeclare().getQueue();
24+
}
25+
26+
private interface Closure {
27+
public void makeMore(Deque<Delivery> deliveries) throws IOException;
28+
}
29+
30+
public void testSingleAck() throws IOException {
31+
testPrefetch(new Closure() {
32+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
33+
for (Delivery del : deliveries) {
34+
ack(del, false);
35+
}
36+
}
37+
});
38+
}
39+
40+
public void testMultiAck() throws IOException {
41+
testPrefetch(new Closure() {
42+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
43+
ack(deliveries.getLast(), true);
44+
}
45+
});
46+
}
47+
48+
public void testSingleNack() throws IOException {
49+
for (final boolean requeue: Arrays.asList(false, true)) {
50+
testPrefetch(new Closure() {
51+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
52+
for (Delivery del : deliveries) {
53+
nack(del, false, requeue);
54+
}
55+
}
56+
});
57+
}
58+
}
59+
60+
public void testMultiNack() throws IOException {
61+
for (final boolean requeue: Arrays.asList(false, true)) {
62+
testPrefetch(new Closure() {
63+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
64+
nack(deliveries.getLast(), true, requeue);
65+
}
66+
});
67+
}
68+
}
69+
70+
public void testRecover() throws IOException {
71+
testPrefetch(new Closure() {
72+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
73+
channel.basicRecover();
74+
}
75+
});
76+
}
77+
78+
private void testPrefetch(Closure closure) throws IOException {
79+
QueueingConsumer c = new QueueingConsumer(channel);
80+
publish(q, 15);
81+
consume(c, 5, false);
82+
Deque<Delivery> deliveries = drain(c, 5);
83+
84+
ack(channel.basicGet(q, false), false);
85+
drain(c, 0);
86+
87+
closure.makeMore(deliveries);
88+
drain(c, 5);
89+
}
90+
91+
public void testPrefetchOnEmpty() throws IOException {
92+
QueueingConsumer c = new QueueingConsumer(channel);
93+
publish(q, 5);
94+
consume(c, 10, false);
95+
drain(c, 5);
96+
publish(q, 10);
97+
drain(c, 5);
98+
}
99+
100+
public void testAutoAckIgnoresPrefetch() throws IOException {
101+
QueueingConsumer c = new QueueingConsumer(channel);
102+
publish(q, 10);
103+
consume(c, 1, true);
104+
drain(c, 10);
105+
}
106+
107+
public void testPrefetchZeroMeansInfinity() throws IOException {
108+
QueueingConsumer c = new QueueingConsumer(channel);
109+
publish(q, 10);
110+
consume(c, 0, false);
111+
drain(c, 10);
112+
}
113+
114+
public void testPrefetchValidation() throws IOException {
115+
validationFail(-1);
116+
validationFail(new HashMap<String, Object>());
117+
validationFail("banana");
118+
}
119+
120+
private void validationFail(Object badThing) throws IOException {
121+
Channel ch = connection.createChannel();
122+
QueueingConsumer c = new QueueingConsumer(ch);
123+
124+
try {
125+
ch.basicConsume(q, false, args(badThing), c);
126+
} catch (IOException e) {
127+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
128+
}
129+
}
130+
131+
private void publish(String q, int n) throws IOException {
132+
for (int i = 0; i < n; i++) {
133+
channel.basicPublish("", q, null, "".getBytes());
134+
}
135+
}
136+
137+
private void consume(QueueingConsumer c, int prefetch, boolean autoAck) throws IOException {
138+
channel.basicConsume(q, autoAck, args(prefetch), c);
139+
}
140+
141+
private void ack(Delivery del, boolean multi) throws IOException {
142+
channel.basicAck(del.getEnvelope().getDeliveryTag(), multi);
143+
}
144+
145+
private void ack(GetResponse get, boolean multi) throws IOException {
146+
channel.basicAck(get.getEnvelope().getDeliveryTag(), multi);
147+
}
148+
149+
private void nack(Delivery del, boolean multi, boolean requeue) throws IOException {
150+
channel.basicNack(del.getEnvelope().getDeliveryTag(), multi, requeue);
151+
}
152+
153+
private Map<String, Object> args(Object prefetch) {
154+
Map<String, Object> a = new HashMap<String, Object>();
155+
a.put("x-prefetch", prefetch);
156+
return a;
157+
}
158+
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
2424
import java.util.Collections;
25+
import java.util.Deque;
2526
import java.util.HashMap;
2627
import java.util.LinkedList;
2728
import java.util.List;
@@ -65,10 +66,10 @@ public void fill(int n)
6566
* receive n messages - check that we receive no fewer and cannot
6667
* receive more
6768
**/
68-
public Queue<Delivery> drain(QueueingConsumer c, int n)
69+
public static Deque<Delivery> drain(QueueingConsumer c, int n)
6970
throws IOException
7071
{
71-
Queue<Delivery> res = new LinkedList<Delivery>();
72+
Deque<Delivery> res = new LinkedList<Delivery>();
7273
try {
7374
long start = System.currentTimeMillis();
7475
for (int i = 0; i < n; i++) {

test/src/com/rabbitmq/examples/PerfTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public static void main(String[] args) {
5858
long confirm = intArg(cmd, 'c', -1);
5959
boolean autoAck = cmd.hasOption('a');
6060
int multiAckEvery = intArg(cmd, 'A', 0);
61-
int prefetchCount = intArg(cmd, 'q', 0);
61+
int channelPrefetch = intArg(cmd, 'G', 0);
62+
int consumerPrefetch = intArg(cmd, 'q', 0);
6263
int minMsgSize = intArg(cmd, 's', 0);
6364
int timeLimit = intArg(cmd, 'z', 0);
6465
int producerMsgCount = intArg(cmd, 'C', 0);
@@ -97,7 +98,8 @@ public static void main(String[] args) {
9798
p.setMultiAckEvery( multiAckEvery);
9899
p.setMinMsgSize( minMsgSize);
99100
p.setPredeclared( predeclared);
100-
p.setPrefetchCount( prefetchCount);
101+
p.setConsumerPrefetch( consumerPrefetch);
102+
p.setChannelPrefetch( channelPrefetch);
101103
p.setProducerCount( producerCount);
102104
p.setProducerMsgCount( producerMsgCount);
103105
p.setProducerTxSize( producerTxSize);
@@ -143,7 +145,8 @@ private static Options getOptions() {
143145
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
144146
options.addOption(new Option("a", "autoack", false,"auto ack"));
145147
options.addOption(new Option("A", "multiAckEvery", true, "multi ack every"));
146-
options.addOption(new Option("q", "qos", true, "qos prefetch count"));
148+
options.addOption(new Option("q", "qos", true, "consumer prefetch count"));
149+
options.addOption(new Option("G", "globalQos", true, "channel prefetch count"));
147150
options.addOption(new Option("s", "size", true, "message size"));
148151
options.addOption(new Option("z", "time", true, "time limit"));
149152
options.addOption(new Option("C", "pmessages", true, "producer message count"));

test/src/com/rabbitmq/examples/perf/MulticastParams.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public class MulticastParams {
3232
private int producerCount = 1;
3333
private int consumerTxSize = 0;
3434
private int producerTxSize = 0;
35-
private int prefetchCount = 0;
35+
private int channelPrefetch = 0;
36+
private int consumerPrefetch = 0;
3637
private int minMsgSize = 0;
3738

3839
private int timeLimit = 0;
@@ -101,8 +102,12 @@ public void setMultiAckEvery(int multiAckEvery) {
101102
this.multiAckEvery = multiAckEvery;
102103
}
103104

104-
public void setPrefetchCount(int prefetchCount) {
105-
this.prefetchCount = prefetchCount;
105+
public void setChannelPrefetch(int channelPrefetch) {
106+
this.channelPrefetch = channelPrefetch;
107+
}
108+
109+
public void setConsumerPrefetch(int consumerPrefetch) {
110+
this.consumerPrefetch = consumerPrefetch;
106111
}
107112

108113
public void setMinMsgSize(int minMsgSize) {
@@ -175,7 +180,8 @@ public Consumer createConsumer(Connection connection, Stats stats, String id) th
175180
Channel channel = connection.createChannel();
176181
if (consumerTxSize > 0) channel.txSelect();
177182
String qName = configureQueue(connection, id);
178-
if (prefetchCount > 0) channel.basicQos(prefetchCount);
183+
if (consumerPrefetch > 0) channel.basicQos(consumerPrefetch);
184+
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
179185
return new Consumer(channel, id, qName,
180186
consumerTxSize, autoAck, multiAckEvery,
181187
stats, consumerMsgCount, timeLimit);

0 commit comments

Comments
 (0)