Skip to content

Commit 89eca76

Browse files
author
Emile Joubert
committed
Merged bug25749 into default
2 parents d9cd708 + e6a4e1a commit 89eca76

File tree

2 files changed

+224
-0
lines changed

2 files changed

+224
-0
lines changed

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,6 @@ public static void add(TestSuite suite) {
7474
suite.addTestSuite(WorkPoolTests.class);
7575
suite.addTestSuite(HeadersExchangeValidation.class);
7676
suite.addTestSuite(ConsumerPriorities.class);
77+
suite.addTestSuite(Policies.class);
7778
}
7879
}
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client.test.functional;
18+
19+
import com.rabbitmq.client.Channel;
20+
import com.rabbitmq.client.GetResponse;
21+
import com.rabbitmq.client.test.BrokerTestCase;
22+
import com.rabbitmq.tools.Host;
23+
24+
import java.io.IOException;
25+
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.Map;
28+
import java.util.Set;
29+
30+
public class Policies extends BrokerTestCase {
31+
private static final int DELAY = 10; // MILLIS
32+
33+
@Override protected void createResources() throws IOException {
34+
setPolicy("AE", "^has-ae", "{\\\"alternate-exchange\\\":\\\"ae\\\"}");
35+
setPolicy("DLX", "^has-dlx", "{\\\"dead-letter-exchange\\\":\\\"dlx\\\"\\,\\\"dead-letter-routing-key\\\":\\\"rk\\\"}");
36+
setPolicy("TTL", "^has-ttl", "{\\\"message-ttl\\\":" + DELAY + "}");
37+
setPolicy("Expires", "^has-expires", "{\\\"expires\\\":" + DELAY + "}");
38+
setPolicy("MaxLength", "^has-max-length", "{\\\"max-length\\\":1}");
39+
channel.exchangeDeclare("has-ae", "fanout");
40+
Map<String, Object> args = new HashMap<String, Object>();
41+
args.put("alternate-exchange", "ae2");
42+
channel.exchangeDeclare("has-ae-args", "fanout", false, false, args);
43+
}
44+
45+
public void testAlternateExchange() throws IOException {
46+
String q = declareQueue();
47+
channel.exchangeDeclare("ae", "fanout", false, true, null);
48+
channel.queueBind(q, "ae", "");
49+
basicPublishVolatile("has-ae", "");
50+
assertDelivered(q, 1);
51+
clearPolicies();
52+
53+
basicPublishVolatile("has-ae", "");
54+
assertDelivered(q, 0);
55+
}
56+
57+
// i.e. the argument takes priority over the policy
58+
public void testAlternateExchangeArgs() throws IOException {
59+
String q = declareQueue();
60+
channel.exchangeDeclare("ae2", "fanout", false, true, null);
61+
channel.queueBind(q, "ae2", "");
62+
basicPublishVolatile("has-ae-args", "");
63+
assertDelivered(q, 1);
64+
}
65+
66+
public void testDeadLetterExchange() throws IOException, InterruptedException {
67+
Map<String, Object> args = ttlArgs(0);
68+
String src = declareQueue("has-dlx", args);
69+
String dest = declareQueue();
70+
channel.exchangeDeclare("dlx", "fanout", false, true, null);
71+
channel.queueBind(dest, "dlx", "");
72+
basicPublishVolatile(src);
73+
Thread.sleep(DELAY);
74+
GetResponse resp = channel.basicGet(dest, true);
75+
assertEquals("rk", resp.getEnvelope().getRoutingKey());
76+
clearPolicies();
77+
78+
basicPublishVolatile(src);
79+
Thread.sleep(DELAY);
80+
assertDelivered(dest, 0);
81+
}
82+
83+
// again the argument takes priority over the policy
84+
public void testDeadLetterExchangeArgs() throws IOException, InterruptedException {
85+
Map<String, Object> args = ttlArgs(0);
86+
args.put("x-dead-letter-exchange", "dlx2");
87+
args.put("x-dead-letter-routing-key", "rk2");
88+
String src = declareQueue("has-dlx-args", args);
89+
String dest = declareQueue();
90+
channel.exchangeDeclare("dlx2", "fanout", false, true, null);
91+
channel.queueBind(dest, "dlx2", "");
92+
basicPublishVolatile(src);
93+
Thread.sleep(DELAY);
94+
GetResponse resp = channel.basicGet(dest, true);
95+
assertEquals("rk2", resp.getEnvelope().getRoutingKey());
96+
}
97+
98+
public void testTTL() throws IOException, InterruptedException {
99+
String q = declareQueue("has-ttl", null);
100+
basicPublishVolatile(q);
101+
Thread.sleep(2 * DELAY);
102+
assertDelivered(q, 0);
103+
clearPolicies();
104+
105+
basicPublishVolatile(q);
106+
Thread.sleep(2 * DELAY);
107+
assertDelivered(q, 1);
108+
}
109+
110+
// Test that we get lower of args and policy
111+
public void testTTLArgs() throws IOException, InterruptedException {
112+
String q = declareQueue("has-ttl", ttlArgs(3 * DELAY));
113+
basicPublishVolatile(q);
114+
Thread.sleep(2 * DELAY);
115+
assertDelivered(q, 0);
116+
clearPolicies();
117+
118+
basicPublishVolatile(q);
119+
Thread.sleep(2 * DELAY);
120+
assertDelivered(q, 1);
121+
basicPublishVolatile(q);
122+
Thread.sleep(4 * DELAY);
123+
assertDelivered(q, 0);
124+
}
125+
126+
public void testExpires() throws IOException, InterruptedException {
127+
String q = declareQueue("has-expires", null);
128+
Thread.sleep(2 * DELAY);
129+
assertFalse(queueExists(q));
130+
clearPolicies();
131+
132+
q = declareQueue("has-expires", null);
133+
Thread.sleep(2 * DELAY);
134+
assertTrue(queueExists(q));
135+
}
136+
137+
// Test that we get lower of args and policy
138+
public void testExpiresArgs() throws IOException, InterruptedException {
139+
String q = declareQueue("has-expires", args("x-expires", 3 * DELAY));
140+
Thread.sleep(2 * DELAY);
141+
assertFalse(queueExists(q));
142+
clearPolicies();
143+
144+
q = declareQueue("has-expires", args("x-expires", 3 * DELAY));
145+
Thread.sleep(2 * DELAY);
146+
assertTrue(queueExists(q));
147+
}
148+
149+
public void testMaxLength() throws IOException, InterruptedException {
150+
String q = declareQueue("has-max-length", null);
151+
basicPublishVolatileN(q, 3);
152+
assertDelivered(q, 1);
153+
clearPolicies();
154+
155+
basicPublishVolatileN(q, 3);
156+
assertDelivered(q, 3);
157+
}
158+
159+
public void testMaxLengthArgs() throws IOException, InterruptedException {
160+
String q = declareQueue("has-max-length", args("x-max-length", 2));
161+
basicPublishVolatileN(q, 3);
162+
assertDelivered(q, 1);
163+
clearPolicies();
164+
165+
basicPublishVolatileN(q, 3);
166+
assertDelivered(q, 2);
167+
}
168+
169+
@Override protected void releaseResources() throws IOException {
170+
clearPolicies();
171+
channel.exchangeDelete("has-ae");
172+
channel.exchangeDelete("has-ae-args");
173+
}
174+
175+
private Set<String> policies = new HashSet<String>();
176+
177+
private void setPolicy(String name, String pattern, String definition) throws IOException {
178+
Host.rabbitmqctl("set_policy " + name + " " + pattern + " " + definition);
179+
policies.add(name);
180+
}
181+
182+
private void clearPolicies() throws IOException {
183+
for (String policy : policies) {
184+
Host.rabbitmqctl("clear_policy " + policy);
185+
}
186+
policies.clear();
187+
}
188+
189+
private Map<String, Object> ttlArgs(int ttl) {
190+
return args("x-message-ttl", ttl);
191+
}
192+
193+
private Map<String, Object> args(String name, Object value) {
194+
Map<String, Object> args = new HashMap<String, Object>();
195+
args.put(name, value);
196+
return args;
197+
}
198+
199+
private String declareQueue() throws IOException {
200+
return channel.queueDeclare().getQueue();
201+
}
202+
203+
private String declareQueue(String name, Map<String, Object> args) throws IOException {
204+
return channel.queueDeclare(name, false, true, false, args).getQueue();
205+
}
206+
207+
private boolean queueExists(String name) throws IOException {
208+
Channel ch2 = connection.createChannel();
209+
try {
210+
ch2.queueDeclarePassive(name);
211+
return true;
212+
} catch (IOException ioe) {
213+
return false;
214+
}
215+
}
216+
217+
private void basicPublishVolatileN(String q, int count) throws IOException {
218+
for (int i = 0; i < count; i++) {
219+
basicPublishVolatile(q);
220+
}
221+
}
222+
223+
}

0 commit comments

Comments
 (0)