2626
2727import java .io .IOException ;
2828import java .util .concurrent .CountDownLatch ;
29- import java .util .concurrent .CyclicBarrier ;
30- import java .util .concurrent .BrokenBarrierException ;
29+ import java .util .concurrent .Semaphore ;
3130
3231import junit .framework .TestCase ;
3332import junit .framework .TestSuite ;
3837import com .rabbitmq .client .Consumer ;
3938import com .rabbitmq .client .DefaultConsumer ;
4039import com .rabbitmq .client .MessageProperties ;
40+ import com .rabbitmq .client .ShutdownSignalException ;
4141
4242import com .rabbitmq .client .test .functional .BrokerTestCase ;
4343
@@ -61,7 +61,10 @@ public class Bug19219Test extends BrokerTestCase {
6161 */
6262 private static final int Q_COUNT = 1500 ;
6363 private static final int PUB_THREAD_COUNT = 100 ;
64- private static final int CLOSE_DELAY = 10 ;
64+ private static final int CLOSE_DELAY = 2000 ;
65+
66+ private static final Semaphore init = new Semaphore (0 );
67+ private static final CountDownLatch resume = new CountDownLatch (1 );
6568
6669 @ Override protected void setUp () throws Exception {
6770 super .setUp ();
@@ -89,19 +92,7 @@ private static void publish(final Channel ch,
8992 new byte [0 ]);
9093 }
9194
92- /*
93- public void testIt() {
94-
95- try {
96- helper();
97- } catch (Exception e) {
98- System.out.println("FAILED!!!\n" + e);
99- }
100- }
101- */
102-
103- public void testIt ()
104- throws IOException , InterruptedException , BrokenBarrierException {
95+ public void testIt () throws IOException , InterruptedException {
10596
10697 final Consumer c = new DefaultConsumer (channel );
10798
@@ -116,15 +107,12 @@ public void testIt()
116107
117108 //2. send lots of messages in background, to keep the server,
118109 //and especially the queues, busy
119- final CyclicBarrier barrier = new CyclicBarrier (2 );
120- final CountDownLatch latch = new CountDownLatch (1 );
121110 final Runnable r = new Runnable () {
122111 public void run () {
123112 try {
124- startPublisher (barrier , latch );
113+ startPublisher ();
125114 } catch (IOException e ) {
126115 } catch (InterruptedException e ) {
127- } catch (BrokenBarrierException e ) {
128116 }
129117 }
130118 };
@@ -133,12 +121,11 @@ public void run() {
133121 final Thread t = new Thread (r );
134122 t .start ();
135123 //wait for thread to finish initialisation
136- barrier .await ();
137- barrier .reset ();
124+ init .acquire ();
138125 }
139126
140127 //tell all threads to resume
141- latch .countDown ();
128+ resume .countDown ();
142129
143130 //wait for threads to get into full swing
144131 Thread .sleep (CLOSE_DELAY );
@@ -147,13 +134,23 @@ public void run() {
147134 //all the queues in parallel, which in turn will requeue all
148135 //the messages. The combined workload may result in some
149136 //notifications timing out.
150- channel .close (AMQP .REPLY_SUCCESS , "bye" );
151- channel = null ;
137+ boolean success = false ;
138+ try {
139+ channel .close (AMQP .REPLY_SUCCESS , "bye" );
140+ success = true ;
141+ } catch (ShutdownSignalException e ) {
142+ } finally {
143+ //We deliberately do not perform a clean shutdown of all
144+ //the connections. This test is pushing the server really
145+ //hard, so we chose the quickest way to end things.
146+ channel = null ;
147+ connection = null ;
148+
149+ assertTrue (success );
150+ }
152151 }
153152
154- private void startPublisher (final CyclicBarrier barrier ,
155- final CountDownLatch latch )
156- throws IOException , InterruptedException , BrokenBarrierException {
153+ private void startPublisher () throws IOException , InterruptedException {
157154
158155 final Connection conn = connectionFactory .newConnection ("localhost" );
159156 final Channel pubCh = conn .createChannel ();
@@ -168,14 +165,15 @@ private void startPublisher(final CyclicBarrier barrier,
168165 pubCh .accessRequest ("/data" );
169166
170167 //signal the main thread
171- barrier . await ();
168+ init . release ();
172169 //wait for main thread to let us resume
173- latch .await ();
170+ resume .await ();
174171
175172 //publish lots of messages
176- for (;; ) {
173+ while ( true ) {
177174 publish (pubCh , pubTicket );
178175 }
176+
179177 }
180178
181179}
0 commit comments