2626import com .rabbitmq .client .ShutdownSignalException ;
2727import com .rabbitmq .client .test .BrokerTestCase ;
2828
29-
3029import java .io .IOException ;
3130import java .util .Collections ;
3231import java .util .Map ;
3332import java .util .SortedSet ;
3433import java .util .TreeSet ;
34+ import java .util .concurrent .ExecutionException ;
35+ import java .util .concurrent .Executors ;
36+ import java .util .concurrent .FutureTask ;
37+ import java .util .concurrent .TimeoutException ;
38+ import java .util .concurrent .TimeUnit ;
3539
3640public class Confirm extends BrokerTestCase
3741{
@@ -65,48 +69,49 @@ protected void setUp() throws IOException {
6569 }
6670
6771 public void testTransient ()
68- throws IOException , InterruptedException {
72+ throws IOException , InterruptedException , TimeoutException
73+ {
6974 confirmTest ("" , "confirm-test" , false , false , false );
7075 }
7176
7277 public void testPersistentSimple ()
73- throws IOException , InterruptedException
78+ throws IOException , InterruptedException , TimeoutException
7479 {
7580 confirmTest ("" , "confirm-test" , true , false , false );
7681 }
7782
7883 public void testNonDurable ()
79- throws IOException , InterruptedException
84+ throws IOException , InterruptedException , TimeoutException
8085 {
8186 confirmTest ("" , "confirm-test-nondurable" , true , false , false );
8287 }
8388
8489 public void testPersistentImmediate ()
85- throws IOException , InterruptedException
90+ throws IOException , InterruptedException , TimeoutException
8691 {
8792 confirmTest ("" , "confirm-test" , true , false , true );
8893 }
8994
9095 public void testPersistentImmediateNoConsumer ()
91- throws IOException , InterruptedException
96+ throws IOException , InterruptedException , TimeoutException
9297 {
9398 confirmTest ("" , "confirm-test-noconsumer" , true , false , true );
9499 }
95100
96101 public void testPersistentMandatory ()
97- throws IOException , InterruptedException
102+ throws IOException , InterruptedException , TimeoutException
98103 {
99104 confirmTest ("" , "confirm-test" , true , true , false );
100105 }
101106
102107 public void testPersistentMandatoryReturn ()
103- throws IOException , InterruptedException
108+ throws IOException , InterruptedException , TimeoutException
104109 {
105110 confirmTest ("" , "confirm-test-doesnotexist" , true , true , false );
106111 }
107112
108113 public void testMultipleQueues ()
109- throws IOException , InterruptedException
114+ throws IOException , InterruptedException , TimeoutException
110115 {
111116 confirmTest ("amq.direct" , "confirm-multiple-queues" ,
112117 true , false , false );
@@ -118,43 +123,43 @@ public void testMultipleQueues()
118123 * internal_sync that notifies the clients. */
119124
120125 public void testQueueDelete ()
121- throws IOException , InterruptedException
126+ throws IOException , InterruptedException , TimeoutException
122127 {
123128 publishN ("" ,"confirm-test-noconsumer" , true , false , false );
124129
125130 channel .queueDelete ("confirm-test-noconsumer" );
126131
127- channel . waitForConfirmsOrDie ();
132+ waitForConfirms ();
128133 }
129134
130135 public void testQueuePurge ()
131- throws IOException , InterruptedException
136+ throws IOException , InterruptedException , TimeoutException
132137 {
133138 publishN ("" , "confirm-test-noconsumer" , true , false , false );
134139
135140 channel .queuePurge ("confirm-test-noconsumer" );
136141
137- channel . waitForConfirmsOrDie ();
142+ waitForConfirms ();
138143 }
139144
140145 public void testBasicReject ()
141- throws IOException , InterruptedException
146+ throws IOException , InterruptedException , TimeoutException
142147 {
143148 basicRejectCommon (false );
144149
145- channel . waitForConfirmsOrDie ();
150+ waitForConfirms ();
146151 }
147152
148153 public void testQueueTTL ()
149- throws IOException , InterruptedException
154+ throws IOException , InterruptedException , TimeoutException
150155 {
151156 publishN ("" , "confirm-ttl" , true , false , false );
152157
153- channel . waitForConfirmsOrDie ();
158+ waitForConfirms ();
154159 }
155160
156161 public void testBasicRejectRequeue ()
157- throws IOException , InterruptedException
162+ throws IOException , InterruptedException , TimeoutException
158163 {
159164 basicRejectCommon (true );
160165
@@ -164,11 +169,11 @@ public void testBasicRejectRequeue()
164169 channel .basicConsume ("confirm-test-noconsumer" , true ,
165170 new DefaultConsumer (channel ));
166171
167- channel . waitForConfirmsOrDie ();
172+ waitForConfirms ();
168173 }
169174
170175 public void testBasicRecover ()
171- throws IOException , InterruptedException
176+ throws IOException , InterruptedException , TimeoutException
172177 {
173178 publishN ("" , "confirm-test-noconsumer" , true , false , false );
174179
@@ -186,7 +191,7 @@ public void testBasicRecover()
186191 channel .basicConsume ("confirm-test-noconsumer" , true ,
187192 new DefaultConsumer (channel ));
188193
189- channel . waitForConfirmsOrDie ();
194+ waitForConfirms ();
190195 }
191196
192197 public void testSelect ()
@@ -212,7 +217,7 @@ public void testSelect()
212217 }
213218
214219 public void testWaitForConfirms ()
215- throws IOException , InterruptedException
220+ throws IOException , InterruptedException , TimeoutException
216221 {
217222 final SortedSet <Long > unconfirmedSet =
218223 Collections .synchronizedSortedSet (new TreeSet <Long >());
@@ -238,28 +243,28 @@ public void handleNack(long seqNo, boolean multiple) {
238243 publish ("" , "confirm-test" , true , false , false );
239244 }
240245
241- channel . waitForConfirmsOrDie ();
246+ waitForConfirms ();
242247 if (!unconfirmedSet .isEmpty ()) {
243248 fail ("waitForConfirms returned with unconfirmed messages" );
244249 }
245250 }
246251
247252 public void testWaitForConfirmsNoOp ()
248- throws IOException , InterruptedException
253+ throws IOException , InterruptedException , TimeoutException
249254 {
250255 channel = connection .createChannel ();
251256 // Don't enable Confirm mode
252257 publish ("" , "confirm-test" , true , false , false );
253- channel . waitForConfirmsOrDie (); // Nop
258+ waitForConfirms (); // Nop
254259 }
255260
256261 public void testWaitForConfirmsException ()
257- throws IOException , InterruptedException
262+ throws IOException , InterruptedException , TimeoutException
258263 {
259264 publishN ("" , "confirm-test" , true , false , false );
260265 channel .close ();
261266 try {
262- channel . waitForConfirmsOrDie ();
267+ waitForConfirms ();
263268 fail ("waitAcks worked on a closed channel" );
264269 } catch (ShutdownSignalException sse ) {
265270 if (!(sse .getReason () instanceof AMQP .Channel .Close ))
@@ -274,11 +279,11 @@ public void testWaitForConfirmsException()
274279 public void confirmTest (String exchange , String queueName ,
275280 boolean persistent , boolean mandatory ,
276281 boolean immediate )
277- throws IOException , InterruptedException
282+ throws IOException , InterruptedException , TimeoutException
278283 {
279284 publishN (exchange , queueName , persistent , mandatory , immediate );
280285
281- channel . waitForConfirmsOrDie ();
286+ waitForConfirms ();
282287 }
283288
284289 private void publishN (String exchangeName , String queueName ,
@@ -313,4 +318,26 @@ protected void publish(String exchangeName, String queueName,
313318 : MessageProperties .BASIC ,
314319 "nop" .getBytes ());
315320 }
321+
322+ protected void waitForConfirms ()
323+ throws InterruptedException , TimeoutException
324+ {
325+ try {
326+ FutureTask <?> waiter = new FutureTask <Object >(new Runnable () {
327+ public void run () {
328+ try {
329+ channel .waitForConfirmsOrDie ();
330+ } catch (IOException e ) {
331+ throw (ShutdownSignalException )e .getCause ();
332+ } catch (InterruptedException e ) {
333+ fail ("test interrupted" );
334+ }
335+ }
336+ }, null );
337+ (Executors .newSingleThreadExecutor ()).execute (waiter );
338+ waiter .get (10 , TimeUnit .SECONDS );
339+ } catch (ExecutionException e ) {
340+ throw (ShutdownSignalException )e .getCause ();
341+ }
342+ }
316343}
0 commit comments