2121import static com .rabbitmq .client .amqp .Resource .State .CLOSING ;
2222import static com .rabbitmq .client .amqp .Resource .State .OPEN ;
2323import static com .rabbitmq .client .amqp .impl .AmqpConsumerBuilder .*;
24- import static com .rabbitmq .client .amqp .metrics .MetricsCollector .ConsumeDisposition .*;
2524import static java .time .Duration .ofSeconds ;
2625import static java .util .Optional .ofNullable ;
2726
3837import java .util .stream .IntStream ;
3938import org .apache .qpid .protonj2 .client .*;
4039import org .apache .qpid .protonj2 .client .exceptions .*;
41- import org .apache .qpid .protonj2 .client .impl .ClientConversionSupport ;
4240import org .apache .qpid .protonj2 .client .impl .ClientReceiver ;
4341import org .apache .qpid .protonj2 .client .util .DeliveryQueue ;
4442import org .apache .qpid .protonj2 .engine .EventHandler ;
4745import org .apache .qpid .protonj2 .engine .impl .ProtonReceiver ;
4846import org .apache .qpid .protonj2 .engine .impl .ProtonSessionIncomingWindow ;
4947import org .apache .qpid .protonj2 .types .DescribedType ;
50- import org .apache .qpid .protonj2 .types .messaging .Accepted ;
51- import org .apache .qpid .protonj2 .types .messaging .Modified ;
52- import org .apache .qpid .protonj2 .types .messaging .Rejected ;
53- import org .apache .qpid .protonj2 .types .messaging .Released ;
5448import org .slf4j .Logger ;
5549import org .slf4j .LoggerFactory ;
5650
57- final class AmqpConsumer extends ResourceBase implements Consumer {
51+ final class AmqpConsumer extends ResourceBase implements Consumer , ConsumerUtils . CloseableConsumer {
5852
5953 private static final AtomicLong ID_SEQUENCE = new AtomicLong (0 );
6054
@@ -114,7 +108,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
114108 .dispatch (
115109 () -> {
116110 // get result to make spotbugs happy
117- boolean ignored = maybeCloseConsumerOnException (this , e );
111+ boolean ignored = ConsumerUtils . maybeCloseConsumerOnException (this , e );
118112 });
119113 };
120114 this .consumerWorkService = connection .consumerWorkService ();
@@ -301,7 +295,7 @@ void recoverAfterConnectionFailure() {
301295 }
302296 }
303297
304- void close (Throwable cause ) {
298+ public void close (Throwable cause ) {
305299 if (this .closed .compareAndSet (false , true )) {
306300 this .state (CLOSING , cause );
307301 if (this .consumerWorkService != null ) {
@@ -362,7 +356,8 @@ private void initStateFromNativeReceiver(ClientReceiver receiver) {
362356 throw new AmqpException ("Could not initialize consumer internal state" );
363357 }
364358 } catch (InterruptedException e ) {
365- throw new RuntimeException (e );
359+ Thread .currentThread ().interrupt ();
360+ throw new AmqpException (e );
366361 }
367362 }
368363
@@ -400,17 +395,13 @@ enum PauseStatus {
400395 PAUSED
401396 }
402397
403- private static class DeliveryContext implements Consumer . Context {
398+ private static class DeliveryContext extends ConsumerUtils . DeliveryContextBase {
404399
405- private static final DeliveryState REJECTED = DeliveryState .rejected (null , null );
406- private final AtomicBoolean settled = new AtomicBoolean (false );
407- private final Delivery delivery ;
408400 private final Scheduler protonExecutor ;
409401 private final ProtonReceiver protonReceiver ;
410402 private final MetricsCollector metricsCollector ;
411403 private final AtomicLong unsettledMessageCount ;
412404 private final Runnable replenishCreditOperation ;
413- private final AmqpConsumer consumer ;
414405
415406 private DeliveryContext (
416407 Delivery delivery ,
@@ -420,48 +411,17 @@ private DeliveryContext(
420411 AtomicLong unsettledMessageCount ,
421412 Runnable replenishCreditOperation ,
422413 AmqpConsumer consumer ) {
423- this . delivery = delivery ;
414+ super ( delivery , consumer ) ;
424415 this .protonExecutor = protonExecutor ;
425416 this .protonReceiver = protonReceiver ;
426417 this .metricsCollector = metricsCollector ;
427418 this .unsettledMessageCount = unsettledMessageCount ;
428419 this .replenishCreditOperation = replenishCreditOperation ;
429- this .consumer = consumer ;
430420 }
431421
432422 @ Override
433- public void accept () {
434- this .settle (DeliveryState .accepted (), ACCEPTED , "accept" );
435- }
436-
437- @ Override
438- public void discard () {
439- settle (REJECTED , DISCARDED , "discard" );
440- }
441-
442- @ Override
443- public void discard (Map <String , Object > annotations ) {
444- annotations = annotations == null ? Collections .emptyMap () : annotations ;
445- Utils .checkMessageAnnotations (annotations );
446- this .settle (DeliveryState .modified (true , true , annotations ), DISCARDED , "discard (modified)" );
447- }
448-
449- @ Override
450- public void requeue () {
451- settle (DeliveryState .released (), REQUEUED , "requeue" );
452- }
453-
454- @ Override
455- public void requeue (Map <String , Object > annotations ) {
456- annotations = annotations == null ? Collections .emptyMap () : annotations ;
457- Utils .checkMessageAnnotations (annotations );
458- this .settle (
459- DeliveryState .modified (false , false , annotations ), REQUEUED , "requeue (modified)" );
460- }
461-
462- @ Override
463- public BatchContext batch (int batchSizeHint ) {
464- return new BatchDeliveryContext (
423+ public Consumer .BatchContext batch (int batchSizeHint ) {
424+ return new BatchContext (
465425 batchSizeHint ,
466426 protonExecutor ,
467427 protonReceiver ,
@@ -471,18 +431,13 @@ public BatchContext batch(int batchSizeHint) {
471431 consumer );
472432 }
473433
474- private void settle (
475- DeliveryState state , MetricsCollector .ConsumeDisposition disposition , String label ) {
476- if (settled .compareAndSet (false , true )) {
477- try {
478- protonExecutor .execute (replenishCreditOperation );
479- delivery .disposition (state , true );
480- unsettledMessageCount .decrementAndGet ();
481- metricsCollector .consumeDisposition (disposition );
482- } catch (Exception e ) {
483- handleContextException (this .consumer , e , label );
484- }
485- }
434+ @ Override
435+ protected void doSettle (DeliveryState state , MetricsCollector .ConsumeDisposition disposition )
436+ throws Exception {
437+ protonExecutor .execute (replenishCreditOperation );
438+ delivery .disposition (state , true );
439+ unsettledMessageCount .decrementAndGet ();
440+ metricsCollector .consumeDisposition (disposition );
486441 }
487442 }
488443
@@ -491,142 +446,50 @@ public String toString() {
491446 return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}' ;
492447 }
493448
494- private static final class BatchDeliveryContext implements BatchContext {
449+ private static final class BatchContext extends ConsumerUtils . BatchContextBase {
495450
496- private static final org .apache .qpid .protonj2 .types .transport .DeliveryState REJECTED =
497- new Rejected ();
498- private final List <DeliveryContext > contexts ;
499- private final AtomicBoolean settled = new AtomicBoolean (false );
500451 private final Scheduler protonExecutor ;
501452 private final ProtonReceiver protonReceiver ;
502453 private final MetricsCollector metricsCollector ;
503454 private final AtomicLong unsettledMessageCount ;
504455 private final Runnable replenishCreditOperation ;
505- private final AmqpConsumer consumer ;
506456
507- private BatchDeliveryContext (
457+ private BatchContext (
508458 int batchSizeHint ,
509459 Scheduler protonExecutor ,
510460 ProtonReceiver protonReceiver ,
511461 MetricsCollector metricsCollector ,
512462 AtomicLong unsettledMessageCount ,
513463 Runnable replenishCreditOperation ,
514- AmqpConsumer consumer ) {
515- this . contexts = new ArrayList <> (batchSizeHint );
464+ ConsumerUtils . CloseableConsumer consumer ) {
465+ super (batchSizeHint , consumer );
516466 this .protonExecutor = protonExecutor ;
517467 this .protonReceiver = protonReceiver ;
518468 this .metricsCollector = metricsCollector ;
519469 this .unsettledMessageCount = unsettledMessageCount ;
520470 this .replenishCreditOperation = replenishCreditOperation ;
521- this .consumer = consumer ;
522- }
523-
524- @ Override
525- public void add (Consumer .Context context ) {
526- if (this .settled .get ()) {
527- throw new IllegalStateException ("Batch is closed" );
528- } else {
529- if (context instanceof DeliveryContext ) {
530- DeliveryContext dctx = (DeliveryContext ) context ;
531- // marking the context as settled avoids operation on it and deduplicates as well
532- if (dctx .settled .compareAndSet (false , true )) {
533- this .contexts .add (dctx );
534- } else {
535- throw new IllegalStateException ("Message already settled" );
536- }
537- } else {
538- throw new IllegalArgumentException ("Context type not supported: " + context );
539- }
540- }
541- }
542-
543- @ Override
544- public int size () {
545- return this .contexts .size ();
546471 }
547472
548473 @ Override
549- public void accept () {
550- this .settle (Accepted .getInstance (), ACCEPTED , "accept" );
551- }
552-
553- @ Override
554- public void discard () {
555- this .settle (REJECTED , DISCARDED , "discard" );
556- }
557-
558- @ Override
559- public void discard (Map <String , Object > annotations ) {
560- annotations = annotations == null ? Collections .emptyMap () : annotations ;
561- Utils .checkMessageAnnotations (annotations );
562- Modified state =
563- new Modified (false , true , ClientConversionSupport .toSymbolKeyedMap (annotations ));
564- this .settle (state , DISCARDED , "discard (modified)" );
565- }
566-
567- @ Override
568- public void requeue () {
569- this .settle (Released .getInstance (), REQUEUED , "requeue" );
570- }
571-
572- @ Override
573- public void requeue (Map <String , Object > annotations ) {
574- annotations = annotations == null ? Collections .emptyMap () : annotations ;
575- Utils .checkMessageAnnotations (annotations );
576- Modified state =
577- new Modified (false , false , ClientConversionSupport .toSymbolKeyedMap (annotations ));
578- this .settle (state , REQUEUED , "requeue (modified)" );
579- }
580-
581- @ Override
582- public BatchContext batch (int batchSizeHint ) {
583- return this ;
584- }
585-
586- private void settle (
474+ protected void doSettle (
587475 org .apache .qpid .protonj2 .types .transport .DeliveryState state ,
588- MetricsCollector .ConsumeDisposition disposition ,
589- String label ) {
590- if (settled .compareAndSet (false , true )) {
591- int batchSize = this .contexts .size ();
592- try {
593- protonExecutor .execute (replenishCreditOperation );
594- long [][] ranges =
595- SerialNumberUtils .ranges (this .contexts , ctx -> ctx .delivery .getDeliveryId ());
596- this .protonExecutor .execute (
597- () -> {
598- for (long [] range : ranges ) {
599- this .protonReceiver .disposition (state , range );
600- }
476+ MetricsCollector .ConsumeDisposition disposition ) {
477+ int batchSize = this .size ();
478+ protonExecutor .execute (replenishCreditOperation );
479+ long [][] ranges =
480+ SerialNumberUtils .ranges (this .contexts (), ctx -> ctx .delivery .getDeliveryId ());
481+ this .protonExecutor .execute (
482+ () -> {
483+ for (long [] range : ranges ) {
484+ this .protonReceiver .disposition (state , range );
485+ }
486+ });
487+ unsettledMessageCount .addAndGet (-batchSize );
488+ IntStream .range (0 , batchSize )
489+ .forEach (
490+ ignored -> {
491+ metricsCollector .consumeDisposition (disposition );
601492 });
602- unsettledMessageCount .addAndGet (-batchSize );
603- IntStream .range (0 , batchSize )
604- .forEach (
605- ignored -> {
606- metricsCollector .consumeDisposition (disposition );
607- });
608- } catch (Exception e ) {
609- handleContextException (this .consumer , e , label );
610- }
611- }
612- }
613- }
614-
615- private static void handleContextException (
616- AmqpConsumer consumer , Exception ex , String operation ) {
617- if (maybeCloseConsumerOnException (consumer , ex )) {
618- return ;
619493 }
620- if (ex instanceof ClientIllegalStateException
621- || ex instanceof RejectedExecutionException
622- || ex instanceof ClientIOException ) {
623- LOGGER .debug ("message {} failed: {}" , operation , ex .getMessage ());
624- } else if (ex instanceof ClientException ) {
625- throw ExceptionUtils .convert ((ClientException ) ex );
626- }
627- }
628-
629- private static boolean maybeCloseConsumerOnException (AmqpConsumer consumer , Exception ex ) {
630- return ExceptionUtils .maybeCloseOnException (consumer ::close , ex );
631494 }
632495}
0 commit comments