Skip to content

Commit e8d353c

Browse files
author
Simon MacMullen
committed
Merge bug25957
2 parents 21f0997 + 8c49f0e commit e8d353c

File tree

8 files changed

+205
-41
lines changed

8 files changed

+205
-41
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,24 @@ public interface Channel extends ShutdownNotifier {
215215
* @param prefetchCount maximum number of messages that the server
216216
* will deliver, 0 if unlimited
217217
* @param global true if the settings should be applied to the
218-
* entire connection rather than just the current channel
218+
* entire channel rather than each consumer
219219
* @throws java.io.IOException if an error is encountered
220220
*/
221221
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
222222

223+
/**
224+
* Request a specific prefetchCount "quality of service" settings
225+
* for this channel.
226+
*
227+
* @see #basicQos(int, int, boolean)
228+
* @param prefetchCount maximum number of messages that the server
229+
* will deliver, 0 if unlimited
230+
* @param global true if the settings should be applied to the
231+
* entire channel rather than each consumer
232+
* @throws java.io.IOException if an error is encountered
233+
*/
234+
void basicQos(int prefetchCount, boolean global) throws IOException;
235+
223236
/**
224237
* Request a specific prefetchCount "quality of service" settings
225238
* for this channel.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,13 @@ public void basicQos(int prefetchSize, int prefetchCount, boolean global)
599599
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
600600
}
601601

602+
/** Public API - {@inheritDoc} */
603+
public void basicQos(int prefetchCount, boolean global)
604+
throws IOException
605+
{
606+
basicQos(0, prefetchCount, global);
607+
}
608+
602609
/** Public API - {@inheritDoc} */
603610
public void basicQos(int prefetchCount)
604611
throws IOException

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public class AutorecoveringChannel implements Channel, Recoverable {
3434
private List<ReturnListener> returnListeners = new ArrayList<ReturnListener>();
3535
private List<ConfirmListener> confirmListeners = new ArrayList<ConfirmListener>();
3636
private List<FlowListener> flowListeners = new ArrayList<FlowListener>();
37-
private int prefetchCount;
38-
private boolean globalQos;
37+
private int prefetchCountConsumer;
38+
private int prefetchCountGlobal;
3939
private boolean usesPublisherConfirms;
4040
private boolean usesTransactions;
4141

@@ -138,16 +138,23 @@ public void setDefaultConsumer(Consumer consumer) {
138138
}
139139

140140
public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException {
141-
this.prefetchCount = prefetchCount;
142-
this.globalQos = global;
141+
if (global) {
142+
this.prefetchCountGlobal = prefetchCount;
143+
} else {
144+
this.prefetchCountConsumer = prefetchCount;
145+
}
146+
143147
delegate.basicQos(prefetchSize, prefetchCount, global);
144148
}
145149

146150
public void basicQos(int prefetchCount) throws IOException {
147-
148151
basicQos(0, prefetchCount, false);
149152
}
150153

154+
public void basicQos(int prefetchCount, boolean global) throws IOException {
155+
basicQos(0, prefetchCount, global);
156+
}
157+
151158
public void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException {
152159
delegate.basicPublish(exchange, routingKey, props, body);
153160
}
@@ -432,7 +439,12 @@ private void recoverFlowListeners() {
432439
}
433440

434441
private void recoverState() throws IOException {
435-
basicQos(0, this.prefetchCount, this.globalQos);
442+
if (this.prefetchCountConsumer != 0) {
443+
basicQos(this.prefetchCountConsumer, false);
444+
}
445+
if (this.prefetchCountGlobal != 0) {
446+
basicQos(this.prefetchCountGlobal, true);
447+
}
436448
if(this.usesPublisherConfirms) {
437449
this.confirmSelect();
438450
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,6 @@ public static void add(TestSuite suite) {
7676
suite.addTestSuite(ConsumerPriorities.class);
7777
suite.addTestSuite(Policies.class);
7878
suite.addTestSuite(ConnectionRecovery.class);
79+
suite.addTestSuite(PerConsumerPrefetch.class);
7980
}
8081
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.GetResponse;
4+
import com.rabbitmq.client.QueueingConsumer;
5+
import com.rabbitmq.client.QueueingConsumer.Delivery;
6+
import com.rabbitmq.client.test.BrokerTestCase;
7+
8+
import java.io.IOException;
9+
import java.util.Arrays;
10+
import java.util.Deque;
11+
12+
import static com.rabbitmq.client.test.functional.QosTests.drain;
13+
14+
public class PerConsumerPrefetch extends BrokerTestCase {
15+
private String q;
16+
17+
@Override
18+
protected void createResources() throws IOException {
19+
q = channel.queueDeclare().getQueue();
20+
}
21+
22+
private interface Closure {
23+
public void makeMore(Deque<Delivery> deliveries) throws IOException;
24+
}
25+
26+
public void testSingleAck() throws IOException {
27+
testPrefetch(new Closure() {
28+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
29+
for (Delivery del : deliveries) {
30+
ack(del, false);
31+
}
32+
}
33+
});
34+
}
35+
36+
public void testMultiAck() throws IOException {
37+
testPrefetch(new Closure() {
38+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
39+
ack(deliveries.getLast(), true);
40+
}
41+
});
42+
}
43+
44+
public void testSingleNack() throws IOException {
45+
for (final boolean requeue: Arrays.asList(false, true)) {
46+
testPrefetch(new Closure() {
47+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
48+
for (Delivery del : deliveries) {
49+
nack(del, false, requeue);
50+
}
51+
}
52+
});
53+
}
54+
}
55+
56+
public void testMultiNack() throws IOException {
57+
for (final boolean requeue: Arrays.asList(false, true)) {
58+
testPrefetch(new Closure() {
59+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
60+
nack(deliveries.getLast(), true, requeue);
61+
}
62+
});
63+
}
64+
}
65+
66+
public void testRecover() throws IOException {
67+
testPrefetch(new Closure() {
68+
public void makeMore(Deque<Delivery> deliveries) throws IOException {
69+
channel.basicRecover();
70+
}
71+
});
72+
}
73+
74+
private void testPrefetch(Closure closure) throws IOException {
75+
QueueingConsumer c = new QueueingConsumer(channel);
76+
publish(q, 15);
77+
consume(c, 5, false);
78+
Deque<Delivery> deliveries = drain(c, 5);
79+
80+
ack(channel.basicGet(q, false), false);
81+
drain(c, 0);
82+
83+
closure.makeMore(deliveries);
84+
drain(c, 5);
85+
}
86+
87+
public void testPrefetchOnEmpty() throws IOException {
88+
QueueingConsumer c = new QueueingConsumer(channel);
89+
publish(q, 5);
90+
consume(c, 10, false);
91+
drain(c, 5);
92+
publish(q, 10);
93+
drain(c, 5);
94+
}
95+
96+
public void testAutoAckIgnoresPrefetch() throws IOException {
97+
QueueingConsumer c = new QueueingConsumer(channel);
98+
publish(q, 10);
99+
consume(c, 1, true);
100+
drain(c, 10);
101+
}
102+
103+
public void testPrefetchZeroMeansInfinity() throws IOException {
104+
QueueingConsumer c = new QueueingConsumer(channel);
105+
publish(q, 10);
106+
consume(c, 0, false);
107+
drain(c, 10);
108+
}
109+
110+
private void publish(String q, int n) throws IOException {
111+
for (int i = 0; i < n; i++) {
112+
channel.basicPublish("", q, null, "".getBytes());
113+
}
114+
}
115+
116+
private void consume(QueueingConsumer c, int prefetch, boolean autoAck) throws IOException {
117+
channel.basicQos(prefetch);
118+
channel.basicConsume(q, autoAck, c);
119+
}
120+
121+
private void ack(Delivery del, boolean multi) throws IOException {
122+
channel.basicAck(del.getEnvelope().getDeliveryTag(), multi);
123+
}
124+
125+
private void ack(GetResponse get, boolean multi) throws IOException {
126+
channel.basicAck(get.getEnvelope().getDeliveryTag(), multi);
127+
}
128+
129+
private void nack(Delivery del, boolean multi, boolean requeue) throws IOException {
130+
channel.basicNack(del.getEnvelope().getDeliveryTag(), multi, requeue);
131+
}
132+
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
2424
import java.util.Collections;
25+
import java.util.Deque;
2526
import java.util.HashMap;
2627
import java.util.LinkedList;
2728
import java.util.List;
@@ -65,10 +66,10 @@ public void fill(int n)
6566
* receive n messages - check that we receive no fewer and cannot
6667
* receive more
6768
**/
68-
public Queue<Delivery> drain(QueueingConsumer c, int n)
69+
public static Deque<Delivery> drain(QueueingConsumer c, int n)
6970
throws IOException
7071
{
71-
Queue<Delivery> res = new LinkedList<Delivery>();
72+
Deque<Delivery> res = new LinkedList<Delivery>();
7273
try {
7374
long start = System.currentTimeMillis();
7475
for (int i = 0; i < n; i++) {
@@ -85,17 +86,6 @@ public Queue<Delivery> drain(QueueingConsumer c, int n)
8586
return res;
8687
}
8788

88-
public void testMessageLimitGlobalFails()
89-
throws IOException
90-
{
91-
try {
92-
channel.basicQos(0, 1, true);
93-
fail("basic.qos{global=false} should not be supported");
94-
} catch (IOException ioe) {
95-
checkShutdownSignal(AMQP.NOT_IMPLEMENTED, ioe);
96-
}
97-
}
98-
9989
public void testMessageLimitPrefetchSizeFails()
10090
throws IOException
10191
{
@@ -120,15 +110,15 @@ public void testNoAckNoAlterLimit()
120110
{
121111
QueueingConsumer c = new QueueingConsumer(channel);
122112
declareBindConsume(channel, c, true);
123-
channel.basicQos(1);
113+
channel.basicQos(1, true);
124114
fill(2);
125115
drain(c, 2);
126116
}
127117

128118
public void testNoAckObeysLimit()
129119
throws IOException
130120
{
131-
channel.basicQos(1);
121+
channel.basicQos(1, true);
132122
QueueingConsumer c1 = new QueueingConsumer(channel);
133123
declareBindConsume(channel, c1, false);
134124
fill(1);
@@ -201,7 +191,7 @@ public void testSingleChannelAndQueueFairness()
201191
//channel & queue, and a prefetch limit set, that all
202192
//consumers get a fair share of the messages.
203193

204-
channel.basicQos(1);
194+
channel.basicQos(1, true);
205195
String q = channel.queueDeclare().getQueue();
206196
channel.queueBind(q, "amq.fanout", "");
207197

@@ -246,7 +236,7 @@ public void testSingleChannelAndQueueFairness()
246236
public void testConsumerLifecycle()
247237
throws IOException
248238
{
249-
channel.basicQos(1);
239+
channel.basicQos(1, true);
250240
QueueingConsumer c = new QueueingConsumer(channel);
251241
String queue = "qosTest";
252242
channel.queueDeclare(queue, false, false, false, null);
@@ -269,7 +259,7 @@ public void testSetLimitAfterConsume()
269259
{
270260
QueueingConsumer c = new QueueingConsumer(channel);
271261
declareBindConsume(c);
272-
channel.basicQos(1);
262+
channel.basicQos(1, true);
273263
fill(3);
274264
//We actually only guarantee that the limit takes effect
275265
//*eventually*, so this can in fact fail. It's pretty unlikely
@@ -284,7 +274,7 @@ public void testLimitIncrease()
284274
{
285275
QueueingConsumer c = new QueueingConsumer(channel);
286276
configure(c, 1, 3);
287-
channel.basicQos(2);
277+
channel.basicQos(2, true);
288278
drain(c, 1);
289279
}
290280

@@ -293,7 +283,7 @@ public void testLimitDecrease()
293283
{
294284
QueueingConsumer c = new QueueingConsumer(channel);
295285
Queue<Delivery> d = configure(c, 2, 4);
296-
channel.basicQos(1);
286+
channel.basicQos(1, true);
297287
drain(c, 0);
298288
ack(d, true);
299289
drain(c, 1);
@@ -304,7 +294,7 @@ public void testLimitedToUnlimited()
304294
{
305295
QueueingConsumer c = new QueueingConsumer(channel);
306296
configure(c, 1, 3);
307-
channel.basicQos(0);
297+
channel.basicQos(0, true);
308298
drain(c, 2);
309299
}
310300

@@ -319,8 +309,8 @@ public void testLimitingMultipleChannels()
319309
String q2 = declareBindConsume(ch2, c2, false);
320310
ch1.basicConsume(q2, false, c1);
321311
ch2.basicConsume(q1, false, c2);
322-
ch1.basicQos(1);
323-
ch2.basicQos(1);
312+
ch1.basicQos(1, true);
313+
ch2.basicQos(1, true);
324314
fill(5);
325315
Queue<Delivery> d1 = drain(c1, 1);
326316
Queue<Delivery> d2 = drain(c2, 1);
@@ -339,13 +329,13 @@ public void testLimitInheritsUnackedCount()
339329
declareBindConsume(c);
340330
fill(1);
341331
drain(c, 1);
342-
channel.basicQos(2);
332+
channel.basicQos(2, true);
343333
fill(2);
344334
drain(c, 1);
345335
}
346336

347337
public void testRecoverReducesLimit() throws Exception {
348-
channel.basicQos(2);
338+
channel.basicQos(2, true);
349339
QueueingConsumer c = new QueueingConsumer(channel);
350340
declareBindConsume(c);
351341
fill(3);
@@ -426,7 +416,7 @@ protected List<String> configure(QueueingConsumer c,
426416
int messages)
427417
throws IOException
428418
{
429-
channel.basicQos(limit);
419+
channel.basicQos(limit, true);
430420

431421
//declare/bind/consume-from queues
432422
List <String> queues = new ArrayList<String>();
@@ -445,7 +435,7 @@ protected Queue<Delivery> configure(QueueingConsumer c,
445435
int messages)
446436
throws IOException
447437
{
448-
channel.basicQos(limit);
438+
channel.basicQos(limit, true);
449439
declareBindConsume(c);
450440
fill(messages);
451441
return drain(c, limit);

0 commit comments

Comments
 (0)