Skip to content

Commit 9578a70

Browse files
committed
merge bug18776 into default
2 parents b508437 + 0d539e6 commit 9578a70

File tree

8 files changed

+806
-3
lines changed

8 files changed

+806
-3
lines changed

.hgignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
syntax: glob
22
*~
3+
*.class
4+
*.dat
5+
*.ipr
6+
*.iws
7+
*.iml
8+
*.dump
39

410
syntax: regexp
511
^build/
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.GetResponse;
4+
import com.rabbitmq.client.MessageProperties;
5+
import com.rabbitmq.client.QueueingConsumer;
6+
import com.rabbitmq.tools.Host;
7+
8+
import java.io.IOException;
9+
10+
/**
11+
* This tests whether bindings are created and nuked properly.
12+
*
13+
* TODO: Adjust this test when Queue.Unbind is implemented in the
14+
* server
15+
*/
16+
public class BindingLifecycle extends PersisterRestartBase {
17+
18+
protected static final byte[] payload =
19+
(""+ System.currentTimeMillis()).getBytes();
20+
21+
private static final int N = 1;
22+
23+
protected static final String Q = "Q-" + System.currentTimeMillis();
24+
protected static final String X = "X-" + System.currentTimeMillis();
25+
protected static final String K = "K-" + System.currentTimeMillis();
26+
27+
/**
28+
* Tests whether durable bindings are correctly recovered.
29+
*/
30+
public void testDurableBindingRecovery() throws IOException {
31+
declareDurableTopicExchange(X);
32+
declareAndBindDurableQueue(Q, X, K);
33+
34+
restart();
35+
36+
for (int i = 0; i < N; i++){
37+
basicPublishVolatile(X, K);
38+
}
39+
40+
assertDelivered(Q, N);
41+
42+
deleteQueue(Q);
43+
deleteExchange(X);
44+
}
45+
46+
/**
47+
* This tests whether the bindings attached to a durable exchange
48+
* are correctly blown away when the exhange is nuked.
49+
*
50+
* This complements a unit test for testing non-durable exhanges.
51+
* In that case, an exchange is deleted and you expect any
52+
* bindings hanging to it to be deleted as well. To verify this,
53+
* the exchange is deleted and then recreated.
54+
*
55+
* After the recreation, the old bindings should no longer exist
56+
* and hence any messages published to that exchange get routed to
57+
* /dev/null
58+
*
59+
* This test exercises the durable variable of that test, so the
60+
* main difference is that the broker has to be restarted to
61+
* verify that the durable routes have been turfed.
62+
*/
63+
public void testDurableBindingsDeletion() throws IOException {
64+
declareDurableTopicExchange(X);
65+
declareAndBindDurableQueue(Q, X, K);
66+
67+
deleteExchange(X);
68+
69+
restart();
70+
71+
declareDurableTopicExchange(X);
72+
73+
for (int i = 0; i < N; i++){
74+
basicPublishVolatile(X, K);
75+
}
76+
77+
GetResponse response = channel.basicGet(ticket, Q, true);
78+
assertNull("The initial response SHOULD BE null", response);
79+
80+
deleteQueue(Q);
81+
deleteExchange(X);
82+
}
83+
84+
85+
/**
86+
* This tests whether the default bindings for persistent queues
87+
* are recovered properly.
88+
*
89+
* The idea is to create a durable queue, nuke the server and then
90+
* publish a message to it using the queue name as a routing key
91+
*/
92+
public void testDefaultBindingRecovery() throws IOException {
93+
declareDurableQueue(Q);
94+
95+
restart();
96+
97+
basicPublishVolatile("", Q);
98+
99+
GetResponse response = channel.basicGet(ticket, Q, true);
100+
assertNotNull("The initial response SHOULD NOT be null", response);
101+
102+
deleteQueue(Q);
103+
}
104+
105+
/**
106+
* This tests whether when you delete a queue, that its bindings
107+
* are deleted as well.
108+
*/
109+
public void testQueueDelete() throws IOException {
110+
111+
boolean durable = true;
112+
Binding binding = setupExchangeAndRouteMessage(durable);
113+
114+
// Nuke the queue and repeat this test, this time you expect
115+
// nothing to get routed.
116+
//
117+
// TODO: When unbind is implemented, use that instead of
118+
// deleting and re-creating the queue
119+
channel.queueDelete(ticket, binding.q);
120+
channel.queueDeclare(ticket, binding.q, durable);
121+
122+
sendUnroutable(binding);
123+
124+
deleteExchangeAndQueue(binding);
125+
}
126+
127+
/**
128+
* This tests whether when you delete an exchange, that any
129+
* bindings attached to it are deleted as well.
130+
*/
131+
public void testExchangeDelete() throws IOException {
132+
133+
boolean durable = true;
134+
Binding binding = setupExchangeAndRouteMessage(durable);
135+
136+
// Nuke the exchange and repeat this test, this time you
137+
// expect nothing to get routed
138+
139+
channel.exchangeDelete(ticket, binding.x);
140+
channel.exchangeDeclare(ticket, binding.x, "direct");
141+
142+
sendUnroutable(binding);
143+
144+
channel.queueDelete(ticket, binding.q);
145+
}
146+
147+
/**
148+
* This tests whether the server checks that an exchange is
149+
* actually being used when you try to delete it with the ifunused
150+
* flag.
151+
*
152+
* To test this, you try to delete an exchange with a queue still
153+
* bound to it and expect the delete operation to fail.
154+
*/
155+
public void testExchangeIfUnused() throws IOException {
156+
157+
boolean durable = true;
158+
Binding binding = setupExchangeBindings(durable);
159+
160+
try {
161+
channel.exchangeDelete(ticket, binding.x, true);
162+
}
163+
catch (Exception e) {
164+
// do nothing, this is the correct behaviour
165+
openChannel();
166+
deleteExchangeAndQueue(binding);
167+
return;
168+
}
169+
170+
fail("Exchange delete should have failed");
171+
}
172+
173+
/**
174+
* This tests whether the server checks that an auto_delete
175+
* exchange actually deletes the bindings attached to it when it
176+
* is deleted.
177+
*
178+
* To test this, you declare and auto_delete exchange and bind an
179+
* auto_delete queue to it.
180+
*
181+
* Start a consumer on this queue, send a message, let it get
182+
* consumed and then cancel the consumer
183+
*
184+
* The unsubscribe should cause the queue to auto_delete, which in
185+
* turn should cause the exchange to auto_delete.
186+
*
187+
* Then re-declare the queue again and try to rebind it to the same exhange.
188+
*
189+
* Because the exchange has been auto-deleted, the bind operation
190+
* should fail.
191+
*/
192+
public void testExchangeAutoDelete() throws IOException {
193+
doAutoDelete(false, 1);
194+
}
195+
196+
/**
197+
* Runs something similar to testExchangeAutoDelete, but adds
198+
* different queues with the same binding to the same exchange.
199+
*
200+
* The difference should be that the original exchange should not
201+
* get auto-deleted
202+
*/
203+
public void testExchangeAutoDeleteManyBindings() throws IOException {
204+
doAutoDelete(false, 10);
205+
}
206+
207+
/**
208+
* The same thing as testExchangeAutoDelete, but with durable
209+
* queues.
210+
*
211+
* Main difference is restarting the broker to make sure that the
212+
* durable queues are blasted away.
213+
*/
214+
public void testExchangeAutoDeleteDurable() throws IOException {
215+
doAutoDelete(true, 1);
216+
}
217+
218+
/**
219+
* The same thing as testExchangeAutoDeleteManyBindings, but with
220+
* durable queues.
221+
*/
222+
public void testExchangeAutoDeleteDurableManyBindings() throws IOException {
223+
doAutoDelete(true, 10);
224+
}
225+
226+
private void doAutoDelete(boolean durable, int queues) throws IOException {
227+
228+
String[] queueNames = null;
229+
230+
Binding binding = Binding.randomBinding();
231+
232+
channel.exchangeDeclare(ticket, binding.x, "direct",
233+
false, durable, true, null);
234+
channel.queueDeclare(ticket, binding.q,
235+
false, durable, false, true, null);
236+
channel.queueBind(ticket, binding.q, binding.x, binding.k);
237+
238+
239+
if (queues > 1) {
240+
int j = queues - 1;
241+
queueNames = new String[j];
242+
for (int i = 0 ; i < j ; i++) {
243+
queueNames[i] = randomString();
244+
channel.queueDeclare(ticket, queueNames[i],
245+
false, durable, false, false, null);
246+
channel.queueBind(ticket, queueNames[i],
247+
binding.x, binding.k);
248+
channel.basicConsume(ticket, queueNames[i], true,
249+
new QueueingConsumer(channel));
250+
}
251+
}
252+
253+
subscribeSendUnsubscribe(binding);
254+
255+
if (durable) {
256+
restart();
257+
}
258+
259+
if (queues > 1) {
260+
for (String s : queueNames) {
261+
channel.basicConsume(ticket, s, true,
262+
new QueueingConsumer(channel));
263+
Binding tmp = new Binding(binding.x, s, binding.k);
264+
sendUnroutable(tmp);
265+
}
266+
}
267+
268+
channel.queueDeclare(ticket, binding.q,
269+
false, durable, true, true, null);
270+
271+
// if (queues == 1): Because the exchange does not exist, this
272+
// bind should fail
273+
try {
274+
channel.queueBind(ticket, binding.q, binding.x, binding.k);
275+
sendRoutable(binding);
276+
}
277+
catch (Exception e) {
278+
// do nothing, this is the correct behaviour
279+
channel = null;
280+
return;
281+
}
282+
283+
if (queues == 1) {
284+
deleteExchangeAndQueue(binding);
285+
fail("Queue bind should have failed");
286+
}
287+
288+
289+
// Do some cleanup
290+
if (queues > 1) {
291+
for (String q : queueNames) {
292+
channel.queueDelete(ticket, q);
293+
}
294+
}
295+
296+
}
297+
298+
private void subscribeSendUnsubscribe(Binding binding) throws IOException {
299+
String tag = channel.basicConsume(ticket, binding.q,
300+
new QueueingConsumer(channel));
301+
sendUnroutable(binding);
302+
channel.basicCancel(tag);
303+
}
304+
305+
private void sendUnroutable(Binding binding) throws IOException {
306+
channel.basicPublish(ticket, binding.x, binding.k, null, payload);
307+
GetResponse response = channel.basicGet(ticket, binding.q, true);
308+
assertNull("The response SHOULD BE null", response);
309+
}
310+
311+
private void sendRoutable(Binding binding) throws IOException {
312+
channel.basicPublish(ticket, binding.x, binding.k, null, payload);
313+
GetResponse response = channel.basicGet(ticket, binding.q, true);
314+
assertNotNull("The response should not be null", response);
315+
}
316+
317+
private static String randomString() {
318+
return "-" + System.nanoTime();
319+
}
320+
321+
private static class Binding {
322+
323+
String x, q, k;
324+
325+
static Binding randomBinding() {
326+
return new Binding(randomString(), randomString(), randomString());
327+
}
328+
329+
private Binding(String x, String q, String k) {
330+
this.x = x;
331+
this.q = q;
332+
this.k = k;
333+
}
334+
}
335+
336+
private void createQueueAndBindToExchange(Binding binding, boolean durable)
337+
throws IOException {
338+
339+
channel.exchangeDeclare(ticket, binding.x, "direct", durable);
340+
channel.queueDeclare(ticket, binding.q, durable);
341+
channel.queueBind(ticket, binding.q, binding.x, binding.k);
342+
}
343+
344+
private void deleteExchangeAndQueue(Binding binding)
345+
throws IOException {
346+
347+
channel.queueDelete(ticket, binding.q);
348+
channel.exchangeDelete(ticket, binding.x);
349+
}
350+
351+
private Binding setupExchangeBindings(boolean durable)
352+
throws IOException {
353+
354+
Binding binding = Binding.randomBinding();
355+
createQueueAndBindToExchange(binding, durable);
356+
return binding;
357+
}
358+
359+
private Binding setupExchangeAndRouteMessage(boolean durable)
360+
throws IOException {
361+
362+
Binding binding = setupExchangeBindings(durable);
363+
sendRoutable(binding);
364+
return binding;
365+
}
366+
367+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class FunctionalTests extends TestCase {
3232
public static TestSuite suite() {
3333
TestSuite suite = new TestSuite("functional");
3434
suite.addTestSuite(Routing.class);
35+
suite.addTestSuite(BindingLifecycle.class);
3536
suite.addTestSuite(Transactions.class);
3637
suite.addTestSuite(PersistentTransactions.class);
3738
suite.addTestSuite(RequeueOnConnectionClose.class);

0 commit comments

Comments
 (0)