Skip to content

Commit c6fff65

Browse files
author
Simon MacMullen
committed
Tests for consumer priorities.
1 parent 938f34c commit c6fff65

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.MessageProperties;
6+
import com.rabbitmq.client.QueueingConsumer;
7+
import com.rabbitmq.client.test.BrokerTestCase;
8+
9+
import java.io.IOException;
10+
import java.util.Arrays;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
14+
public class ConsumerPriorities extends BrokerTestCase {
15+
public void testValidation() throws IOException {
16+
assertFailValidation(args("banana"));
17+
assertFailValidation(args(new HashMap()));
18+
assertFailValidation(args(null));
19+
assertFailValidation(args(Arrays.asList(1, 2, 3)));
20+
}
21+
22+
private void assertFailValidation(Map<String, Object> args) throws IOException {
23+
Channel ch = connection.createChannel();
24+
String queue = ch.queueDeclare().getQueue();
25+
try {
26+
ch.basicConsume(queue, true, "", false, false, args, new QueueingConsumer(ch));
27+
fail("Validation should fail for " + args);
28+
} catch (IOException ioe) {
29+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ioe);
30+
}
31+
}
32+
33+
private static final int COUNT = 10;
34+
35+
public void testConsumerPriorities() throws Exception {
36+
String queue = channel.queueDeclare().getQueue();
37+
QueueingConsumer highConsumer = new QueueingConsumer(channel);
38+
QueueingConsumer medConsumer = new QueueingConsumer(channel);
39+
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
40+
channel.basicConsume(queue, true, "high", false, false, args(1), highConsumer);
41+
channel.basicConsume(queue, true, "med", false, false, null, medConsumer);
42+
channel.basicConsume(queue, true, "low", false, false, args(-1), lowConsumer);
43+
44+
publish(queue, COUNT, "high");
45+
channel.basicCancel("high");
46+
publish(queue, COUNT, "med");
47+
channel.basicCancel("med");
48+
publish(queue, COUNT, "low");
49+
50+
assertContents(highConsumer, COUNT, "high");
51+
assertContents(medConsumer, COUNT, "med");
52+
assertContents(lowConsumer, COUNT, "low");
53+
}
54+
55+
private Map<String, Object> args(Object o) {
56+
Map<String, Object> map = new HashMap<String, Object>();
57+
map.put("x-priority", o);
58+
return map;
59+
}
60+
61+
private void assertContents(QueueingConsumer qc, int count, String msg) throws InterruptedException {
62+
for (int i = 0; i < count; i++) {
63+
QueueingConsumer.Delivery d = qc.nextDelivery();
64+
assertEquals(msg, new String(d.getBody()));
65+
}
66+
assertEquals(null, qc.nextDelivery(0));
67+
}
68+
69+
private void publish(String queue, int count, String msg) throws IOException {
70+
for (int i = 0; i < count; i++) {
71+
channel.basicPublish("", queue, MessageProperties.MINIMAL_BASIC, msg.getBytes());
72+
}
73+
}
74+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,6 @@ public static void add(TestSuite suite) {
7373
suite.addTestSuite(CcRoutes.class);
7474
suite.addTestSuite(WorkPoolTests.class);
7575
suite.addTestSuite(HeadersExchangeValidation.class);
76+
suite.addTestSuite(ConsumerPriorities.class);
7677
}
7778
}

0 commit comments

Comments
 (0)