Skip to content

Commit d9b2d2f

Browse files
author
Simon MacMullen
committed
Simple tests. The QosTests class is surprisingly unhelpful here, almost all of it is to do with how qos needs to be shared, survive consumers starting and stopping, etc.
1 parent 5afe4ef commit d9b2d2f

File tree

3 files changed

+69
-2
lines changed

3 files changed

+69
-2
lines changed

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,6 @@ public static void add(TestSuite suite) {
7575
suite.addTestSuite(HeadersExchangeValidation.class);
7676
suite.addTestSuite(ConsumerPriorities.class);
7777
suite.addTestSuite(Policies.class);
78+
suite.addTestSuite(PerConsumerPrefetch.class);
7879
}
7980
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.QueueingConsumer;
4+
import com.rabbitmq.client.QueueingConsumer.Delivery;
5+
import com.rabbitmq.client.test.BrokerTestCase;
6+
7+
import java.io.IOException;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.Queue;
11+
12+
import static com.rabbitmq.client.test.functional.QosTests.drain;
13+
14+
public class PerConsumerPrefetch extends BrokerTestCase {
15+
private String q;
16+
17+
@Override
18+
protected void createResources() throws IOException {
19+
q = channel.queueDeclare().getQueue();
20+
}
21+
22+
public void testPrefetch() throws IOException {
23+
QueueingConsumer c = new QueueingConsumer(channel);
24+
publish(q, 5);
25+
consume(c, false);
26+
Queue<Delivery> dels = drain(c, 2);
27+
for (Delivery del : dels) {
28+
ack(del, false);
29+
}
30+
Delivery last = drain(c, 2).getLast();
31+
ack(last, true);
32+
drain(c, 1);
33+
publish(q, 5);
34+
drain(c, 1);
35+
}
36+
37+
public void testAutoAckIgnoresPrefetch() throws IOException {
38+
QueueingConsumer c = new QueueingConsumer(channel);
39+
publish(q, 10);
40+
consume(c, true);
41+
drain(c, 10);
42+
}
43+
44+
private void publish(String q, int n) throws IOException {
45+
for (int i = 0; i < n; i++) {
46+
channel.basicPublish("", q, null, "".getBytes());
47+
}
48+
}
49+
50+
private void consume(QueueingConsumer c, boolean autoAck) throws IOException {
51+
channel.basicConsume(q, autoAck, "", false, false, args(2), c);
52+
}
53+
54+
private void ack(Delivery del, boolean multi) throws IOException {
55+
channel.basicAck(del.getEnvelope().getDeliveryTag(), multi);
56+
}
57+
58+
private Map<String, Object> args(int prefetch) {
59+
Map<String, Object> a = new HashMap<String, Object>();
60+
if (prefetch != 0) {
61+
a.put("x-prefetch", prefetch);
62+
}
63+
return a;
64+
}
65+
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
2424
import java.util.Collections;
25+
import java.util.Deque;
2526
import java.util.HashMap;
2627
import java.util.LinkedList;
2728
import java.util.List;
@@ -65,10 +66,10 @@ public void fill(int n)
6566
* receive n messages - check that we receive no fewer and cannot
6667
* receive more
6768
**/
68-
public Queue<Delivery> drain(QueueingConsumer c, int n)
69+
public static Deque<Delivery> drain(QueueingConsumer c, int n)
6970
throws IOException
7071
{
71-
Queue<Delivery> res = new LinkedList<Delivery>();
72+
Deque<Delivery> res = new LinkedList<Delivery>();
7273
try {
7374
long start = System.currentTimeMillis();
7475
for (int i = 0; i < n; i++) {

0 commit comments

Comments
 (0)