@@ -77,9 +77,7 @@ protected void releaseResources() throws IOException {
7777 declareQueue (TEST_QUEUE_NAME , DLX , "routing_key" , null );
7878 }
7979
80- @ Test public void declareQueueWithInvalidDeadLetterExchangeArg ()
81- throws IOException
82- {
80+ @ Test public void declareQueueWithInvalidDeadLetterExchangeArg () {
8381 try {
8482 declareQueue (133 );
8583 fail ("x-dead-letter-exchange must be a valid exchange name" );
@@ -101,9 +99,7 @@ protected void releaseResources() throws IOException {
10199 }
102100 }
103101
104- @ Test public void declareQueueWithInvalidDeadLetterRoutingKeyArg ()
105- throws IOException
106- {
102+ @ Test public void declareQueueWithInvalidDeadLetterRoutingKeyArg () {
107103 try {
108104 declareQueue ("foo" , "amq.direct" , 144 , null );
109105 fail ("x-dead-letter-routing-key must be a string" );
@@ -125,11 +121,9 @@ protected void releaseResources() throws IOException {
125121 }
126122 }
127123
128- @ Test public void declareQueueWithRoutingKeyButNoDeadLetterExchange ()
129- throws IOException
130- {
124+ @ Test public void declareQueueWithRoutingKeyButNoDeadLetterExchange () {
131125 try {
132- Map <String , Object > args = new HashMap <String , Object >();
126+ Map <String , Object > args = new HashMap <>();
133127 args .put (DLX_RK_ARG , "foo" );
134128
135129 channel .queueDeclare (randomQueueName (), false , true , false , args );
@@ -139,11 +133,10 @@ protected void releaseResources() throws IOException {
139133 }
140134 }
141135
142- @ Test public void redeclareQueueWithRoutingKeyButNoDeadLetterExchange ()
143- throws IOException , InterruptedException {
136+ @ Test public void redeclareQueueWithRoutingKeyButNoDeadLetterExchange () {
144137 try {
145138 String queueName = randomQueueName ();
146- Map <String , Object > args = new HashMap <String , Object >();
139+ Map <String , Object > args = new HashMap <>();
147140 channel .queueDeclare (queueName , false , true , false , args );
148141
149142 args .put (DLX_RK_ARG , "foo" );
@@ -165,7 +158,7 @@ protected void releaseResources() throws IOException {
165158 }
166159
167160 @ Test public void deadLetterQueueTTLPromptExpiry () throws Exception {
168- Map <String , Object > args = new HashMap <String , Object >();
161+ Map <String , Object > args = new HashMap <>();
169162 args .put ("x-message-ttl" , TTL );
170163 declareQueue (TEST_QUEUE_NAME , DLX , null , args );
171164 channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
@@ -205,7 +198,7 @@ protected void releaseResources() throws IOException {
205198 publishAt (start );
206199 basicGet (TEST_QUEUE_NAME );
207200 // publish a 2nd message and immediately fetch it in ack mode
208- publishAt (start + TTL * 1 / 2 );
201+ publishAt (start + TTL / 2 );
209202 GetResponse r = channel .basicGet (TEST_QUEUE_NAME , false );
210203 // publish a 3rd message
211204 publishAt (start + TTL * 3 / 4 );
@@ -250,20 +243,17 @@ protected void releaseResources() throws IOException {
250243 // the DLQ *AND* should remain there, not getting removed after a subsequent
251244 // wait time > 100ms
252245 sleep (500 );
253- consumeN (DLQ , 1 , new WithResponse () {
254- @ SuppressWarnings ("unchecked" )
255- public void process (GetResponse getResponse ) {
256- assertNull (getResponse .getProps ().getExpiration ());
257- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
258- assertNotNull (headers );
259- ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
260- assertNotNull (death );
261- assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
262- final Map <String , Object > deathHeader =
263- (Map <String , Object >)death .get (0 );
264- assertEquals ("100" , deathHeader .get ("original-expiration" ).toString ());
265- }
266- });
246+ consumeN (DLQ , 1 , getResponse -> {
247+ assertNull (getResponse .getProps ().getExpiration ());
248+ Map <String , Object > headers = getResponse .getProps ().getHeaders ();
249+ assertNotNull (headers );
250+ ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
251+ assertNotNull (death );
252+ assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
253+ final Map <String , Object > deathHeader =
254+ (Map <String , Object >)death .get (0 );
255+ assertEquals ("100" , deathHeader .get ("original-expiration" ).toString ());
256+ });
267257 }
268258
269259 @ Test public void deadLetterOnReject () throws Exception {
@@ -315,23 +305,20 @@ public void process(GetResponse getResponse) {
315305
316306 // There should now be two copies of each message on DLQ2: one
317307 // with one set of death headers, and another with two sets.
318- consumeN (DLQ2 , MSG_COUNT *2 , new WithResponse () {
319- @ SuppressWarnings ("unchecked" )
320- public void process (GetResponse getResponse ) {
321- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
322- assertNotNull (headers );
323- ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
324- assertNotNull (death );
325- if (death .size () == 1 ) {
326- assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
327- } else if (death .size () == 2 ) {
328- assertDeathReason (death , 0 , DLQ , "expired" );
329- assertDeathReason (death , 1 , TEST_QUEUE_NAME , "expired" );
330- } else {
331- fail ("message was dead-lettered more times than expected" );
332- }
333- }
334- });
308+ consumeN (DLQ2 , MSG_COUNT *2 , getResponse -> {
309+ Map <String , Object > headers = getResponse .getProps ().getHeaders ();
310+ assertNotNull (headers );
311+ ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
312+ assertNotNull (death );
313+ if (death .size () == 1 ) {
314+ assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
315+ } else if (death .size () == 2 ) {
316+ assertDeathReason (death , 0 , DLQ , "expired" );
317+ assertDeathReason (death , 1 , TEST_QUEUE_NAME , "expired" );
318+ } else {
319+ fail ("message was dead-lettered more times than expected" );
320+ }
321+ });
335322 }
336323
337324 @ Test public void deadLetterSelf () throws Exception {
@@ -379,9 +366,9 @@ public void handleDelivery(String consumerTag, Envelope envelope,
379366 channel .queueBind (DLQ , DLX , "test" );
380367 channel .queueBind (DLQ2 , DLX , "test-other" );
381368
382- Map <String , Object > headers = new HashMap <String , Object >();
383- headers .put ("CC" , Arrays . asList ("foo" ));
384- headers .put ("BCC" , Arrays . asList ("bar" ));
369+ Map <String , Object > headers = new HashMap <>();
370+ headers .put ("CC" , Collections . singletonList ("foo" ));
371+ headers .put ("BCC" , Collections . singletonList ("bar" ));
385372
386373 publishN (MSG_COUNT , (new AMQP .BasicProperties .Builder ())
387374 .headers (headers )
@@ -390,27 +377,24 @@ public void handleDelivery(String consumerTag, Envelope envelope,
390377 sleep (100 );
391378
392379 consumeN (DLQ , 0 , WithResponse .NULL );
393- consumeN (DLQ2 , MSG_COUNT , new WithResponse () {
394- @ SuppressWarnings ("unchecked" )
395- public void process (GetResponse getResponse ) {
396- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
397- assertNotNull (headers );
398- assertNull (headers .get ("CC" ));
399- assertNull (headers .get ("BCC" ));
400-
401- ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
402- assertNotNull (death );
403- assertEquals (1 , death .size ());
404- assertDeathReason (death , 0 , TEST_QUEUE_NAME ,
405- "expired" , "amq.direct" ,
406- Arrays .asList ("test" , "foo" ));
407- }
408- });
380+ consumeN (DLQ2 , MSG_COUNT , getResponse -> {
381+ Map <String , Object > headers1 = getResponse .getProps ().getHeaders ();
382+ assertNotNull (headers1 );
383+ assertNull (headers1 .get ("CC" ));
384+ assertNull (headers1 .get ("BCC" ));
385+
386+ ArrayList <Object > death = (ArrayList <Object >) headers1 .get ("x-death" );
387+ assertNotNull (death );
388+ assertEquals (1 , death .size ());
389+ assertDeathReason (death , 0 , TEST_QUEUE_NAME ,
390+ "expired" , "amq.direct" ,
391+ Arrays .asList ("test" , "foo" ));
392+ });
409393 }
410394
411395 @ SuppressWarnings ("unchecked" )
412396 @ Test public void republish () throws Exception {
413- Map <String , Object > args = new HashMap <String , Object >();
397+ Map <String , Object > args = new HashMap <>();
414398 args .put ("x-message-ttl" , 100 );
415399 declareQueue (TEST_QUEUE_NAME , DLX , null , args );
416400 channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
@@ -430,10 +414,10 @@ public void process(GetResponse getResponse) {
430414 assertNotNull (death );
431415 assertEquals (1 , death .size ());
432416 assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" , "amq.direct" ,
433- Arrays . asList ("test" ));
417+ Collections . singletonList ("test" ));
434418
435419 // Make queue zero length
436- args = new HashMap <String , Object >();
420+ args = new HashMap <>();
437421 args .put ("x-max-length" , 0 );
438422 channel .queueDelete (TEST_QUEUE_NAME );
439423 declareQueue (TEST_QUEUE_NAME , DLX , null , args );
@@ -457,9 +441,9 @@ public void process(GetResponse getResponse) {
457441 assertNotNull (death );
458442 assertEquals (2 , death .size ());
459443 assertDeathReason (death , 0 , TEST_QUEUE_NAME , "maxlen" , "amq.direct" ,
460- Arrays . asList ("test" ));
444+ Collections . singletonList ("test" ));
461445 assertDeathReason (death , 1 , TEST_QUEUE_NAME , "expired" , "amq.direct" ,
462- Arrays . asList ("test" ));
446+ Collections . singletonList ("test" ));
463447
464448 //Set invalid headers
465449 headers .put ("x-death" , "[I, am, not, array]" );
@@ -478,39 +462,35 @@ public void process(GetResponse getResponse) {
478462 assertNotNull (death );
479463 assertEquals (1 , death .size ());
480464 assertDeathReason (death , 0 , TEST_QUEUE_NAME , "maxlen" , "amq.direct" ,
481- Arrays .asList ("test" ));
482-
483- }
484-
485- public void rejectionTest (final boolean useNack ) throws Exception {
486- deadLetterTest (new Callable <Void >() {
487- public Void call () throws Exception {
488- for (int x = 0 ; x < MSG_COUNT ; x ++) {
489- GetResponse getResponse =
490- channel .basicGet (TEST_QUEUE_NAME , false );
491- long tag = getResponse .getEnvelope ().getDeliveryTag ();
492- if (useNack ) {
493- channel .basicNack (tag , false , false );
494- } else {
495- channel .basicReject (tag , false );
496- }
497- }
498- return null ;
465+ Collections .singletonList ("test" ));
466+
467+ }
468+
469+ private void rejectionTest (final boolean useNack ) throws Exception {
470+ deadLetterTest ((Callable <Void >) () -> {
471+ for (int x = 0 ; x < MSG_COUNT ; x ++) {
472+ GetResponse getResponse =
473+ channel .basicGet (TEST_QUEUE_NAME , false );
474+ long tag = getResponse .getEnvelope ().getDeliveryTag ();
475+ if (useNack ) {
476+ channel .basicNack (tag , false , false );
477+ } else {
478+ channel .basicReject (tag , false );
499479 }
500- }, null , "rejected" );
480+ }
481+ return null ;
482+ }, null , "rejected" );
501483 }
502484
503485 private void deadLetterTest (final Runnable deathTrigger ,
504486 Map <String , Object > queueDeclareArgs ,
505487 String reason )
506488 throws Exception
507489 {
508- deadLetterTest (new Callable <Object >() {
509- public Object call () throws Exception {
510- deathTrigger .run ();
511- return null ;
512- }
513- }, queueDeclareArgs , reason );
490+ deadLetterTest (() -> {
491+ deathTrigger .run ();
492+ return null ;
493+ }, queueDeclareArgs , reason );
514494 }
515495
516496 private void deadLetterTest (Callable <?> deathTrigger ,
@@ -531,35 +511,30 @@ private void deadLetterTest(Callable<?> deathTrigger,
531511 }
532512
533513 public static void consume (final Channel channel , final String reason ) throws IOException {
534- consumeN (channel , DLQ , MSG_COUNT , new WithResponse () {
535- @ SuppressWarnings ("unchecked" )
536- public void process (GetResponse getResponse ) {
537- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
538- assertNotNull (headers );
539- ArrayList <Object > death = (ArrayList <Object >) headers .get ("x-death" );
540- assertNotNull (death );
541- // the following assertions shouldn't be checked on version lower than 3.7
542- // as the headers are new in 3.7
543- // see https://github.com/rabbitmq/rabbitmq-server/issues/1332
544- if (TestUtils .isVersion37orLater (channel .getConnection ())) {
545- assertNotNull (headers .get ("x-first-death-queue" ));
546- assertNotNull (headers .get ("x-first-death-reason" ));
547- assertNotNull (headers .get ("x-first-death-exchange" ));
548- }
549- assertEquals (1 , death .size ());
550- assertDeathReason (death , 0 , TEST_QUEUE_NAME , reason ,
551- "amq.direct" ,
552- Arrays .asList ("test" ));
514+ consumeN (channel , DLQ , MSG_COUNT , getResponse -> {
515+ Map <String , Object > headers = getResponse .getProps ().getHeaders ();
516+ assertNotNull (headers );
517+ ArrayList <Object > death = (ArrayList <Object >) headers .get ("x-death" );
518+ assertNotNull (death );
519+ // the following assertions shouldn't be checked on version lower than 3.7
520+ // as the headers are new in 3.7
521+ // see https://github.com/rabbitmq/rabbitmq-server/issues/1332
522+ if (TestUtils .isVersion37orLater (channel .getConnection ())) {
523+ assertNotNull (headers .get ("x-first-death-queue" ));
524+ assertNotNull (headers .get ("x-first-death-reason" ));
525+ assertNotNull (headers .get ("x-first-death-exchange" ));
553526 }
527+ assertEquals (1 , death .size ());
528+ assertDeathReason (death , 0 , TEST_QUEUE_NAME , reason ,
529+ "amq.direct" ,
530+ Collections .singletonList ("test" ));
554531 });
555532 }
556533
557534 private void ttlTest (final long ttl ) throws Exception {
558535 Map <String , Object > args = new HashMap <String , Object >();
559536 args .put ("x-message-ttl" , ttl );
560- deadLetterTest (new Runnable () {
561- public void run () { sleep (ttl + 1500 ); }
562- }, args , "expired" );
537+ deadLetterTest (() -> sleep (ttl + 1500 ), args , "expired" );
563538 }
564539
565540 private void sleep (long millis ) {
@@ -604,7 +579,7 @@ private void declareQueue(String queue, Object deadLetterExchange,
604579 throws IOException
605580 {
606581 if (args == null ) {
607- args = new HashMap <String , Object >();
582+ args = new HashMap <>();
608583 }
609584
610585 if (ttl > 0 ){
@@ -674,7 +649,7 @@ private static void assertDeathReason(List<Object> death, int num,
674649 (Map <String , Object >)death .get (num );
675650 assertEquals (exchange , deathHeader .get ("exchange" ).toString ());
676651
677- List <String > deathRKs = new ArrayList <String >();
652+ List <String > deathRKs = new ArrayList <>();
678653 for (Object rk : (ArrayList <?>)deathHeader .get ("routing-keys" )) {
679654 deathRKs .add (rk .toString ());
680655 }
@@ -694,13 +669,11 @@ private static void assertDeathReason(List<Object> death, int num,
694669 assertEquals (reason , deathHeader .get ("reason" ).toString ());
695670 }
696671
697- private static interface WithResponse {
698- static final WithResponse NULL = new WithResponse () {
699- public void process (GetResponse getResponse ) {
700- }
701- };
672+ private interface WithResponse {
673+ WithResponse NULL = getResponse -> {
674+ };
702675
703- public void process (GetResponse response );
676+ void process (GetResponse response );
704677 }
705678
706679 private static String randomQueueName () {
@@ -709,9 +682,9 @@ private static String randomQueueName() {
709682
710683 class AccumulatingMessageConsumer extends DefaultConsumer {
711684
712- BlockingQueue <byte []> messages = new LinkedBlockingQueue <byte [] >();
685+ BlockingQueue <byte []> messages = new LinkedBlockingQueue <>();
713686
714- public AccumulatingMessageConsumer (Channel channel ) {
687+ AccumulatingMessageConsumer (Channel channel ) {
715688 super (channel );
716689 }
717690
0 commit comments