Skip to content

Commit fac60a6

Browse files
committed
Merged 19219 into default
2 parents 0ecb708 + 587a8fb commit fac60a6

File tree

1 file changed

+179
-0
lines changed

1 file changed

+179
-0
lines changed
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd.,
14+
// Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
15+
//
16+
// Portions created by LShift Ltd., Cohesive Financial Technologies
17+
// LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
18+
// LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
19+
// Technologies Ltd.;
20+
//
21+
// All Rights Reserved.
22+
//
23+
// Contributor(s): ______________________________________.
24+
//
25+
package com.rabbitmq.client.test;
26+
27+
import java.io.IOException;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.Semaphore;
30+
31+
import junit.framework.TestCase;
32+
import junit.framework.TestSuite;
33+
34+
import com.rabbitmq.client.AMQP;
35+
import com.rabbitmq.client.Channel;
36+
import com.rabbitmq.client.Connection;
37+
import com.rabbitmq.client.Consumer;
38+
import com.rabbitmq.client.DefaultConsumer;
39+
import com.rabbitmq.client.MessageProperties;
40+
import com.rabbitmq.client.ShutdownSignalException;
41+
42+
import com.rabbitmq.client.test.functional.BrokerTestCase;
43+
44+
/**
45+
* Test for bug 19219 - timeouts due to task parallelism in channel
46+
* closure code.
47+
*/
48+
public class Bug19219Test extends BrokerTestCase {
49+
50+
/*
51+
These settings require careful tuning. Depending on them we get
52+
one of three outcomes:
53+
1) this program terminates normally
54+
2) the broker runs out of memory and eventually dies, and the
55+
this program barfs and/or dies
56+
3) there are lots of timeout errors in the broker log
57+
58+
It's the last case we are interested in.
59+
60+
The settings below work on tanto against default.
61+
*/
62+
private static final int Q_COUNT = 1500;
63+
private static final int PUB_THREAD_COUNT = 100;
64+
private static final int CLOSE_DELAY = 2000;
65+
66+
private static final Semaphore init = new Semaphore(0);
67+
private static final CountDownLatch resume = new CountDownLatch(1);
68+
69+
@Override protected void setUp() throws Exception {
70+
super.setUp();
71+
openConnection();
72+
openChannel();
73+
}
74+
75+
@Override protected void tearDown() throws Exception {
76+
closeChannel();
77+
closeConnection();
78+
super.tearDown();
79+
}
80+
81+
public static TestSuite suite() {
82+
TestSuite suite = new TestSuite("Bug19219");
83+
suite.addTestSuite(Bug19219Test.class);
84+
return suite;
85+
}
86+
87+
private static void publish(final Channel ch,
88+
final int ticket)
89+
throws IOException {
90+
ch.basicPublish(ticket, "amq.fanout", "",
91+
MessageProperties.PERSISTENT_TEXT_PLAIN,
92+
new byte[0]);
93+
}
94+
95+
public void testIt() throws IOException, InterruptedException {
96+
97+
final Consumer c = new DefaultConsumer(channel);
98+
99+
//1. create lots of auto-delete queues, bind them to the
100+
//amq.fanout exchange, and set up a non-auto-ack consumer for
101+
//each.
102+
for (int i = 0; i < Q_COUNT; i++) {
103+
String qName = channel.queueDeclare(ticket).getQueue();
104+
channel.queueBind(ticket, qName, "amq.fanout", "");
105+
channel.basicConsume(ticket, qName, false, c);
106+
}
107+
108+
//2. send lots of messages in background, to keep the server,
109+
//and especially the queues, busy
110+
final Runnable r = new Runnable() {
111+
public void run() {
112+
try {
113+
startPublisher();
114+
} catch (IOException e) {
115+
} catch (InterruptedException e) {
116+
}
117+
}
118+
};
119+
120+
for (int i = 0; i < PUB_THREAD_COUNT; i++) {
121+
final Thread t = new Thread(r);
122+
t.start();
123+
//wait for thread to finish initialisation
124+
init.acquire();
125+
}
126+
127+
//tell all threads to resume
128+
resume.countDown();
129+
130+
//wait for threads to get into full swing
131+
Thread.sleep(CLOSE_DELAY);
132+
133+
//3. close channel. This will result in the server notifying
134+
//all the queues in parallel, which in turn will requeue all
135+
//the messages. The combined workload may result in some
136+
//notifications timing out.
137+
boolean success = false;
138+
try {
139+
channel.close(AMQP.REPLY_SUCCESS, "bye");
140+
success = true;
141+
} catch (ShutdownSignalException e) {
142+
} finally {
143+
//We deliberately do not perform a clean shutdown of all
144+
//the connections. This test is pushing the server really
145+
//hard, so we chose the quickest way to end things.
146+
channel = null;
147+
connection = null;
148+
149+
assertTrue(success);
150+
}
151+
}
152+
153+
private void startPublisher() throws IOException, InterruptedException {
154+
155+
final Connection conn = connectionFactory.newConnection("localhost");
156+
final Channel pubCh = conn.createChannel();
157+
final int pubTicket = pubCh.accessRequest("/data");
158+
159+
//This forces the initialisation of the guid generation, which
160+
//is an interaction with the persister and not something we
161+
//want to see delay things.
162+
publish(pubCh, pubTicket);
163+
164+
//a synchronous request, to make sure the publish is done
165+
pubCh.accessRequest("/data");
166+
167+
//signal the main thread
168+
init.release();
169+
//wait for main thread to let us resume
170+
resume.await();
171+
172+
//publish lots of messages
173+
while(true) {
174+
publish(pubCh, pubTicket);
175+
}
176+
177+
}
178+
179+
}

0 commit comments

Comments
 (0)