Skip to content

Commit 9917b24

Browse files
author
Matthias Radestock
committed
Test to try and provoke timeouts in rabbit_channel:notify_queues
This is extremely fragile, but the best I could come up with after a lot of experimentation. It turns out that the server, and in particular the queues, need to be extremely busy.
1 parent d52a1a5 commit 9917b24

File tree

1 file changed

+181
-0
lines changed

1 file changed

+181
-0
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd.,
14+
// Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
15+
//
16+
// Portions created by LShift Ltd., Cohesive Financial Technologies
17+
// LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
18+
// LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
19+
// Technologies Ltd.;
20+
//
21+
// All Rights Reserved.
22+
//
23+
// Contributor(s): ______________________________________.
24+
//
25+
package com.rabbitmq.client.test;
26+
27+
import java.io.IOException;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.CyclicBarrier;
30+
import java.util.concurrent.BrokenBarrierException;
31+
32+
import junit.framework.TestCase;
33+
import junit.framework.TestSuite;
34+
35+
import com.rabbitmq.client.AMQP;
36+
import com.rabbitmq.client.Channel;
37+
import com.rabbitmq.client.Connection;
38+
import com.rabbitmq.client.Consumer;
39+
import com.rabbitmq.client.DefaultConsumer;
40+
import com.rabbitmq.client.MessageProperties;
41+
42+
import com.rabbitmq.client.test.functional.BrokerTestCase;
43+
44+
/**
45+
* Test for bug 19219 - timeouts due to task parallelism in channel
46+
* closure code.
47+
*/
48+
public class Bug19219Test extends BrokerTestCase {
49+
50+
/*
51+
These settings require careful tuning. Depending on them we get
52+
one of three outcomes:
53+
1) this program terminates normally
54+
2) the broker runs out of memory and eventually dies, and the
55+
this program barfs and/or dies
56+
3) there are lots of timeout errors in the broker log
57+
58+
It's the last case we are interested in.
59+
60+
The settings below work on tanto against default.
61+
*/
62+
private static final int Q_COUNT = 1500;
63+
private static final int PUB_THREAD_COUNT = 100;
64+
private static final int CLOSE_DELAY = 10;
65+
66+
@Override protected void setUp() throws Exception {
67+
super.setUp();
68+
openConnection();
69+
openChannel();
70+
}
71+
72+
@Override protected void tearDown() throws Exception {
73+
closeChannel();
74+
closeConnection();
75+
super.tearDown();
76+
}
77+
78+
public static TestSuite suite() {
79+
TestSuite suite = new TestSuite("Bug19219");
80+
suite.addTestSuite(Bug19219Test.class);
81+
return suite;
82+
}
83+
84+
private static void publish(final Channel ch,
85+
final int ticket)
86+
throws IOException {
87+
ch.basicPublish(ticket, "amq.fanout", "",
88+
MessageProperties.PERSISTENT_TEXT_PLAIN,
89+
new byte[0]);
90+
}
91+
92+
/*
93+
public void testIt() {
94+
95+
try {
96+
helper();
97+
} catch (Exception e) {
98+
System.out.println("FAILED!!!\n" + e);
99+
}
100+
}
101+
*/
102+
103+
public void testIt()
104+
throws IOException, InterruptedException, BrokenBarrierException {
105+
106+
final Consumer c = new DefaultConsumer(channel);
107+
108+
//1. create lots of auto-delete queues, bind them to the
109+
//amq.fanout exchange, and set up a non-auto-ack consumer for
110+
//each.
111+
for (int i = 0; i < Q_COUNT; i++) {
112+
String qName = channel.queueDeclare(ticket).getQueue();
113+
channel.queueBind(ticket, qName, "amq.fanout", "");
114+
channel.basicConsume(ticket, qName, false, c);
115+
}
116+
117+
//2. send lots of messages in background, to keep the server,
118+
//and especially the queues, busy
119+
final CyclicBarrier barrier = new CyclicBarrier(2);
120+
final CountDownLatch latch = new CountDownLatch(1);
121+
final Runnable r = new Runnable() {
122+
public void run() {
123+
try {
124+
startPublisher(barrier, latch);
125+
} catch (IOException e) {
126+
} catch (InterruptedException e) {
127+
} catch (BrokenBarrierException e) {
128+
}
129+
}
130+
};
131+
132+
for (int i = 0; i < PUB_THREAD_COUNT; i++) {
133+
final Thread t = new Thread(r);
134+
t.start();
135+
//wait for thread to finish initialisation
136+
barrier.await();
137+
barrier.reset();
138+
}
139+
140+
//tell all threads to resume
141+
latch.countDown();
142+
143+
//wait for threads to get into full swing
144+
Thread.sleep(CLOSE_DELAY);
145+
146+
//3. close channel. This will result in the server notifying
147+
//all the queues in parallel, which in turn will requeue all
148+
//the messages. The combined workload may result in some
149+
//notifications timing out.
150+
channel.close(AMQP.REPLY_SUCCESS, "bye");
151+
channel = null;
152+
}
153+
154+
private void startPublisher(final CyclicBarrier barrier,
155+
final CountDownLatch latch)
156+
throws IOException, InterruptedException, BrokenBarrierException {
157+
158+
final Connection conn = connectionFactory.newConnection("localhost");
159+
final Channel pubCh = conn.createChannel();
160+
final int pubTicket = pubCh.accessRequest("/data");
161+
162+
//This forces the initialisation of the guid generation, which
163+
//is an interaction with the persister and not something we
164+
//want to see delay things.
165+
publish(pubCh, pubTicket);
166+
167+
//a synchronous request, to make sure the publish is done
168+
pubCh.accessRequest("/data");
169+
170+
//signal the main thread
171+
barrier.await();
172+
//wait for main thread to let us resume
173+
latch.await();
174+
175+
//publish lots of messages
176+
for (;;) {
177+
publish(pubCh, pubTicket);
178+
}
179+
}
180+
181+
}

0 commit comments

Comments
 (0)