2121import static org .junit .jupiter .api .Assertions .assertNull ;
2222import static org .junit .jupiter .api .Assertions .fail ;
2323
24+ import com .rabbitmq .client .test .TestUtils ;
2425import com .rabbitmq .client .test .TestUtils .BrokerVersion ;
2526import com .rabbitmq .client .test .TestUtils .BrokerVersionAtLeast ;
2627import java .io .IOException ;
28+ import java .time .Duration ;
2729import java .util .ArrayList ;
2830import java .util .HashMap ;
2931import java .util .List ;
@@ -47,6 +49,7 @@ public class Routing extends BrokerTestCase
4749 protected final String Q2 = "bar" ;
4850
4951 private volatile BlockingCell <Integer > returnCell ;
52+ private static final int TIMEOUT = (int ) Duration .ofSeconds (10 ).toMillis ();
5053
5154 protected void createResources () throws IOException {
5255 channel .exchangeDeclare (E , "direct" );
@@ -289,13 +292,13 @@ public void headersWithXRouting() throws Exception {
289292 checkGet (Q2 , false );
290293 }
291294
292- @ Test public void basicReturn () throws IOException {
295+ @ Test public void basicReturn () throws Exception {
293296 channel .addReturnListener (makeReturnListener ());
294297 returnCell = new BlockingCell <Integer >();
295298
296299 //returned 'mandatory' publish
297300 channel .basicPublish ("" , "unknown" , true , false , null , "mandatory1" .getBytes ());
298- checkReturn (AMQP . NO_ROUTE );
301+ checkReturn ();
299302
300303 //routed 'mandatory' publish
301304 channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
@@ -313,7 +316,7 @@ public void headersWithXRouting() throws Exception {
313316 }
314317 }
315318
316- @ Test public void basicReturnTransactional () throws IOException {
319+ @ Test public void basicReturnTransactional () throws Exception {
317320 channel .txSelect ();
318321 channel .addReturnListener (makeReturnListener ());
319322 returnCell = new BlockingCell <Integer >();
@@ -325,39 +328,31 @@ public void headersWithXRouting() throws Exception {
325328 fail ("basic.return issued prior to tx.commit" );
326329 } catch (TimeoutException toe ) {}
327330 channel .txCommit ();
328- checkReturn (AMQP . NO_ROUTE );
331+ checkReturn ();
329332
330333 //routed 'mandatory' publish
331334 channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
332335 channel .txCommit ();
333336 assertNotNull (channel .basicGet (Q1 , true ));
334337
335- //returned 'mandatory' publish when message is routable on
336- //publish but not on commit
337- channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
338- channel .queueDelete (Q1 );
339- channel .txCommit ();
340- checkReturn (AMQP .NO_ROUTE );
341- channel .queueDeclare (Q1 , false , false , false , null );
338+ if (TestUtils .atMost312 (connection )) {
339+ //returned 'mandatory' publish when message is routable on
340+ //publish but not on commit
341+ channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
342+ channel .queueDelete (Q1 );
343+ channel .txCommit ();
344+ checkReturn ();
345+ channel .queueDeclare (Q1 , false , false , false , null );
346+ }
342347 }
343348
344349 protected ReturnListener makeReturnListener () {
345- return new ReturnListener () {
346- public void handleReturn (int replyCode ,
347- String replyText ,
348- String exchange ,
349- String routingKey ,
350- AMQP .BasicProperties properties ,
351- byte [] body )
352- throws IOException {
353- Routing .this .returnCell .set (replyCode );
354- }
355- };
350+ return (replyCode , replyText , exchange , routingKey , properties , body ) -> Routing .this .returnCell .set (replyCode );
356351 }
357352
358- protected void checkReturn (int replyCode ) {
359- assertEquals ((int )returnCell .uninterruptibleGet (), AMQP .NO_ROUTE );
360- returnCell = new BlockingCell <Integer >();
353+ protected void checkReturn () throws TimeoutException {
354+ assertEquals ((int )returnCell .uninterruptibleGet (TIMEOUT ), AMQP .NO_ROUTE );
355+ returnCell = new BlockingCell <>();
361356 }
362357
363358}
0 commit comments