Skip to content

Commit 59765ef

Browse files
author
Simon MacMullen
committed
Update tests
1 parent 01c6b67 commit 59765ef

File tree

2 files changed

+16
-53
lines changed

2 files changed

+16
-53
lines changed

test/src/com/rabbitmq/client/test/functional/PerConsumerPrefetch.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.rabbitmq.client.test.functional;
22

3-
import com.rabbitmq.client.AMQP;
4-
import com.rabbitmq.client.Channel;
53
import com.rabbitmq.client.GetResponse;
64
import com.rabbitmq.client.QueueingConsumer;
75
import com.rabbitmq.client.QueueingConsumer.Delivery;
@@ -10,8 +8,6 @@
108
import java.io.IOException;
119
import java.util.Arrays;
1210
import java.util.Deque;
13-
import java.util.HashMap;
14-
import java.util.Map;
1511

1612
import static com.rabbitmq.client.test.functional.QosTests.drain;
1713

@@ -111,31 +107,15 @@ public void testPrefetchZeroMeansInfinity() throws IOException {
111107
drain(c, 10);
112108
}
113109

114-
public void testPrefetchValidation() throws IOException {
115-
validationFail(-1);
116-
validationFail(new HashMap<String, Object>());
117-
validationFail("banana");
118-
}
119-
120-
private void validationFail(Object badThing) throws IOException {
121-
Channel ch = connection.createChannel();
122-
QueueingConsumer c = new QueueingConsumer(ch);
123-
124-
try {
125-
ch.basicConsume(q, false, args(badThing), c);
126-
} catch (IOException e) {
127-
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
128-
}
129-
}
130-
131110
private void publish(String q, int n) throws IOException {
132111
for (int i = 0; i < n; i++) {
133112
channel.basicPublish("", q, null, "".getBytes());
134113
}
135114
}
136115

137116
private void consume(QueueingConsumer c, int prefetch, boolean autoAck) throws IOException {
138-
channel.basicConsume(q, autoAck, args(prefetch), c);
117+
channel.basicQos(prefetch);
118+
channel.basicConsume(q, autoAck, c);
139119
}
140120

141121
private void ack(Delivery del, boolean multi) throws IOException {
@@ -149,10 +129,4 @@ private void ack(GetResponse get, boolean multi) throws IOException {
149129
private void nack(Delivery del, boolean multi, boolean requeue) throws IOException {
150130
channel.basicNack(del.getEnvelope().getDeliveryTag(), multi, requeue);
151131
}
152-
153-
private Map<String, Object> args(Object prefetch) {
154-
Map<String, Object> a = new HashMap<String, Object>();
155-
a.put("x-prefetch", prefetch);
156-
return a;
157-
}
158132
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,6 @@ public static Deque<Delivery> drain(QueueingConsumer c, int n)
8686
return res;
8787
}
8888

89-
public void testMessageLimitGlobalFails()
90-
throws IOException
91-
{
92-
try {
93-
channel.basicQos(0, 1, true);
94-
fail("basic.qos{global=false} should not be supported");
95-
} catch (IOException ioe) {
96-
checkShutdownSignal(AMQP.NOT_IMPLEMENTED, ioe);
97-
}
98-
}
99-
10089
public void testMessageLimitPrefetchSizeFails()
10190
throws IOException
10291
{
@@ -121,15 +110,15 @@ public void testNoAckNoAlterLimit()
121110
{
122111
QueueingConsumer c = new QueueingConsumer(channel);
123112
declareBindConsume(channel, c, true);
124-
channel.basicQos(1);
113+
channel.basicQos(1, true);
125114
fill(2);
126115
drain(c, 2);
127116
}
128117

129118
public void testNoAckObeysLimit()
130119
throws IOException
131120
{
132-
channel.basicQos(1);
121+
channel.basicQos(1, true);
133122
QueueingConsumer c1 = new QueueingConsumer(channel);
134123
declareBindConsume(channel, c1, false);
135124
fill(1);
@@ -202,7 +191,7 @@ public void testSingleChannelAndQueueFairness()
202191
//channel & queue, and a prefetch limit set, that all
203192
//consumers get a fair share of the messages.
204193

205-
channel.basicQos(1);
194+
channel.basicQos(1, true);
206195
String q = channel.queueDeclare().getQueue();
207196
channel.queueBind(q, "amq.fanout", "");
208197

@@ -247,7 +236,7 @@ public void testSingleChannelAndQueueFairness()
247236
public void testConsumerLifecycle()
248237
throws IOException
249238
{
250-
channel.basicQos(1);
239+
channel.basicQos(1, true);
251240
QueueingConsumer c = new QueueingConsumer(channel);
252241
String queue = "qosTest";
253242
channel.queueDeclare(queue, false, false, false, null);
@@ -270,7 +259,7 @@ public void testSetLimitAfterConsume()
270259
{
271260
QueueingConsumer c = new QueueingConsumer(channel);
272261
declareBindConsume(c);
273-
channel.basicQos(1);
262+
channel.basicQos(1, true);
274263
fill(3);
275264
//We actually only guarantee that the limit takes effect
276265
//*eventually*, so this can in fact fail. It's pretty unlikely
@@ -285,7 +274,7 @@ public void testLimitIncrease()
285274
{
286275
QueueingConsumer c = new QueueingConsumer(channel);
287276
configure(c, 1, 3);
288-
channel.basicQos(2);
277+
channel.basicQos(2, true);
289278
drain(c, 1);
290279
}
291280

@@ -294,7 +283,7 @@ public void testLimitDecrease()
294283
{
295284
QueueingConsumer c = new QueueingConsumer(channel);
296285
Queue<Delivery> d = configure(c, 2, 4);
297-
channel.basicQos(1);
286+
channel.basicQos(1, true);
298287
drain(c, 0);
299288
ack(d, true);
300289
drain(c, 1);
@@ -305,7 +294,7 @@ public void testLimitedToUnlimited()
305294
{
306295
QueueingConsumer c = new QueueingConsumer(channel);
307296
configure(c, 1, 3);
308-
channel.basicQos(0);
297+
channel.basicQos(0, true);
309298
drain(c, 2);
310299
}
311300

@@ -320,8 +309,8 @@ public void testLimitingMultipleChannels()
320309
String q2 = declareBindConsume(ch2, c2, false);
321310
ch1.basicConsume(q2, false, c1);
322311
ch2.basicConsume(q1, false, c2);
323-
ch1.basicQos(1);
324-
ch2.basicQos(1);
312+
ch1.basicQos(1, true);
313+
ch2.basicQos(1, true);
325314
fill(5);
326315
Queue<Delivery> d1 = drain(c1, 1);
327316
Queue<Delivery> d2 = drain(c2, 1);
@@ -340,13 +329,13 @@ public void testLimitInheritsUnackedCount()
340329
declareBindConsume(c);
341330
fill(1);
342331
drain(c, 1);
343-
channel.basicQos(2);
332+
channel.basicQos(2, true);
344333
fill(2);
345334
drain(c, 1);
346335
}
347336

348337
public void testRecoverReducesLimit() throws Exception {
349-
channel.basicQos(2);
338+
channel.basicQos(2, true);
350339
QueueingConsumer c = new QueueingConsumer(channel);
351340
declareBindConsume(c);
352341
fill(3);
@@ -427,7 +416,7 @@ protected List<String> configure(QueueingConsumer c,
427416
int messages)
428417
throws IOException
429418
{
430-
channel.basicQos(limit);
419+
channel.basicQos(limit, true);
431420

432421
//declare/bind/consume-from queues
433422
List <String> queues = new ArrayList<String>();
@@ -446,7 +435,7 @@ protected Queue<Delivery> configure(QueueingConsumer c,
446435
int messages)
447436
throws IOException
448437
{
449-
channel.basicQos(limit);
438+
channel.basicQos(limit, true);
450439
declareBindConsume(c);
451440
fill(messages);
452441
return drain(c, limit);

0 commit comments

Comments
 (0)