2121import static com .rabbitmq .client .amqp .Resource .State .CLOSING ;
2222import static com .rabbitmq .client .amqp .Resource .State .OPEN ;
2323import static com .rabbitmq .client .amqp .impl .AmqpConsumerBuilder .*;
24- import static com .rabbitmq .client .amqp .metrics .MetricsCollector .ConsumeDisposition .*;
2524import static java .time .Duration .ofSeconds ;
2625import static java .util .Optional .ofNullable ;
2726
3837import java .util .stream .IntStream ;
3938import org .apache .qpid .protonj2 .client .*;
4039import org .apache .qpid .protonj2 .client .exceptions .*;
41- import org .apache .qpid .protonj2 .client .impl .ClientConversionSupport ;
4240import org .apache .qpid .protonj2 .client .impl .ClientReceiver ;
4341import org .apache .qpid .protonj2 .client .util .DeliveryQueue ;
4442import org .apache .qpid .protonj2 .engine .EventHandler ;
4745import org .apache .qpid .protonj2 .engine .impl .ProtonReceiver ;
4846import org .apache .qpid .protonj2 .engine .impl .ProtonSessionIncomingWindow ;
4947import org .apache .qpid .protonj2 .types .DescribedType ;
50- import org .apache .qpid .protonj2 .types .messaging .Accepted ;
51- import org .apache .qpid .protonj2 .types .messaging .Modified ;
52- import org .apache .qpid .protonj2 .types .messaging .Rejected ;
53- import org .apache .qpid .protonj2 .types .messaging .Released ;
5448import org .slf4j .Logger ;
5549import org .slf4j .LoggerFactory ;
5650
57- final class AmqpConsumer extends ResourceBase implements Consumer {
51+ final class AmqpConsumer extends ResourceBase implements Consumer , ConsumerUtils . CloseableConsumer {
5852
5953 private static final AtomicLong ID_SEQUENCE = new AtomicLong (0 );
6054
@@ -120,7 +114,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
120114 .dispatch (
121115 () -> {
122116 // get result to make spotbugs happy
123- boolean ignored = maybeCloseConsumerOnException (this , e );
117+ boolean ignored = ConsumerUtils . maybeCloseConsumerOnException (this , e );
124118 });
125119 };
126120 this .consumerWorkService = connection .consumerWorkService ();
@@ -339,7 +333,7 @@ void recoverAfterConnectionFailure() {
339333 }
340334 }
341335
342- void close (Throwable cause ) {
336+ public void close (Throwable cause ) {
343337 if (this .closed .compareAndSet (false , true )) {
344338 this .state (CLOSING , cause );
345339 if (this .consumerWorkService != null ) {
@@ -400,7 +394,8 @@ private void initStateFromNativeReceiver(ClientReceiver receiver) {
400394 throw new AmqpException ("Could not initialize consumer internal state" );
401395 }
402396 } catch (InterruptedException e ) {
403- throw new RuntimeException (e );
397+ Thread .currentThread ().interrupt ();
398+ throw new AmqpException (e );
404399 }
405400 }
406401
@@ -438,17 +433,13 @@ enum PauseStatus {
438433 PAUSED
439434 }
440435
441- private static class DeliveryContext implements Consumer . Context {
436+ private static class DeliveryContext extends ConsumerUtils . DeliveryContextBase {
442437
443- private static final DeliveryState REJECTED = DeliveryState .rejected (null , null );
444- private final AtomicBoolean settled = new AtomicBoolean (false );
445- private final Delivery delivery ;
446438 private final Scheduler protonExecutor ;
447439 private final ProtonReceiver protonReceiver ;
448440 private final MetricsCollector metricsCollector ;
449441 private final AtomicLong unsettledMessageCount ;
450442 private final Runnable replenishCreditOperation ;
451- private final AmqpConsumer consumer ;
452443
453444 private DeliveryContext (
454445 Delivery delivery ,
@@ -458,48 +449,17 @@ private DeliveryContext(
458449 AtomicLong unsettledMessageCount ,
459450 Runnable replenishCreditOperation ,
460451 AmqpConsumer consumer ) {
461- this . delivery = delivery ;
452+ super ( delivery , consumer ) ;
462453 this .protonExecutor = protonExecutor ;
463454 this .protonReceiver = protonReceiver ;
464455 this .metricsCollector = metricsCollector ;
465456 this .unsettledMessageCount = unsettledMessageCount ;
466457 this .replenishCreditOperation = replenishCreditOperation ;
467- this .consumer = consumer ;
468458 }
469459
470460 @ Override
471- public void accept () {
472- this .settle (DeliveryState .accepted (), ACCEPTED , "accept" );
473- }
474-
475- @ Override
476- public void discard () {
477- settle (REJECTED , DISCARDED , "discard" );
478- }
479-
480- @ Override
481- public void discard (Map <String , Object > annotations ) {
482- annotations = annotations == null ? Collections .emptyMap () : annotations ;
483- Utils .checkMessageAnnotations (annotations );
484- this .settle (DeliveryState .modified (true , true , annotations ), DISCARDED , "discard (modified)" );
485- }
486-
487- @ Override
488- public void requeue () {
489- settle (DeliveryState .released (), REQUEUED , "requeue" );
490- }
491-
492- @ Override
493- public void requeue (Map <String , Object > annotations ) {
494- annotations = annotations == null ? Collections .emptyMap () : annotations ;
495- Utils .checkMessageAnnotations (annotations );
496- this .settle (
497- DeliveryState .modified (false , false , annotations ), REQUEUED , "requeue (modified)" );
498- }
499-
500- @ Override
501- public BatchContext batch (int batchSizeHint ) {
502- return new BatchDeliveryContext (
461+ public Consumer .BatchContext batch (int batchSizeHint ) {
462+ return new BatchContext (
503463 batchSizeHint ,
504464 protonExecutor ,
505465 protonReceiver ,
@@ -509,18 +469,13 @@ public BatchContext batch(int batchSizeHint) {
509469 consumer );
510470 }
511471
512- private void settle (
513- DeliveryState state , MetricsCollector .ConsumeDisposition disposition , String label ) {
514- if (settled .compareAndSet (false , true )) {
515- try {
516- protonExecutor .execute (replenishCreditOperation );
517- delivery .disposition (state , true );
518- unsettledMessageCount .decrementAndGet ();
519- metricsCollector .consumeDisposition (disposition );
520- } catch (Exception e ) {
521- handleContextException (this .consumer , e , label );
522- }
523- }
472+ @ Override
473+ protected void doSettle (DeliveryState state , MetricsCollector .ConsumeDisposition disposition )
474+ throws Exception {
475+ protonExecutor .execute (replenishCreditOperation );
476+ delivery .disposition (state , true );
477+ unsettledMessageCount .decrementAndGet ();
478+ metricsCollector .consumeDisposition (disposition );
524479 }
525480 }
526481
@@ -533,142 +488,50 @@ public String toString() {
533488 return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}' ;
534489 }
535490
536- private static final class BatchDeliveryContext implements BatchContext {
491+ private static final class BatchContext extends ConsumerUtils . BatchContextBase {
537492
538- private static final org .apache .qpid .protonj2 .types .transport .DeliveryState REJECTED =
539- new Rejected ();
540- private final List <DeliveryContext > contexts ;
541- private final AtomicBoolean settled = new AtomicBoolean (false );
542493 private final Scheduler protonExecutor ;
543494 private final ProtonReceiver protonReceiver ;
544495 private final MetricsCollector metricsCollector ;
545496 private final AtomicLong unsettledMessageCount ;
546497 private final Runnable replenishCreditOperation ;
547- private final AmqpConsumer consumer ;
548498
549- private BatchDeliveryContext (
499+ private BatchContext (
550500 int batchSizeHint ,
551501 Scheduler protonExecutor ,
552502 ProtonReceiver protonReceiver ,
553503 MetricsCollector metricsCollector ,
554504 AtomicLong unsettledMessageCount ,
555505 Runnable replenishCreditOperation ,
556- AmqpConsumer consumer ) {
557- this . contexts = new ArrayList <> (batchSizeHint );
506+ ConsumerUtils . CloseableConsumer consumer ) {
507+ super (batchSizeHint , consumer );
558508 this .protonExecutor = protonExecutor ;
559509 this .protonReceiver = protonReceiver ;
560510 this .metricsCollector = metricsCollector ;
561511 this .unsettledMessageCount = unsettledMessageCount ;
562512 this .replenishCreditOperation = replenishCreditOperation ;
563- this .consumer = consumer ;
564- }
565-
566- @ Override
567- public void add (Consumer .Context context ) {
568- if (this .settled .get ()) {
569- throw new IllegalStateException ("Batch is closed" );
570- } else {
571- if (context instanceof DeliveryContext ) {
572- DeliveryContext dctx = (DeliveryContext ) context ;
573- // marking the context as settled avoids operation on it and deduplicates as well
574- if (dctx .settled .compareAndSet (false , true )) {
575- this .contexts .add (dctx );
576- } else {
577- throw new IllegalStateException ("Message already settled" );
578- }
579- } else {
580- throw new IllegalArgumentException ("Context type not supported: " + context );
581- }
582- }
583- }
584-
585- @ Override
586- public int size () {
587- return this .contexts .size ();
588513 }
589514
590515 @ Override
591- public void accept () {
592- this .settle (Accepted .getInstance (), ACCEPTED , "accept" );
593- }
594-
595- @ Override
596- public void discard () {
597- this .settle (REJECTED , DISCARDED , "discard" );
598- }
599-
600- @ Override
601- public void discard (Map <String , Object > annotations ) {
602- annotations = annotations == null ? Collections .emptyMap () : annotations ;
603- Utils .checkMessageAnnotations (annotations );
604- Modified state =
605- new Modified (false , true , ClientConversionSupport .toSymbolKeyedMap (annotations ));
606- this .settle (state , DISCARDED , "discard (modified)" );
607- }
608-
609- @ Override
610- public void requeue () {
611- this .settle (Released .getInstance (), REQUEUED , "requeue" );
612- }
613-
614- @ Override
615- public void requeue (Map <String , Object > annotations ) {
616- annotations = annotations == null ? Collections .emptyMap () : annotations ;
617- Utils .checkMessageAnnotations (annotations );
618- Modified state =
619- new Modified (false , false , ClientConversionSupport .toSymbolKeyedMap (annotations ));
620- this .settle (state , REQUEUED , "requeue (modified)" );
621- }
622-
623- @ Override
624- public BatchContext batch (int batchSizeHint ) {
625- return this ;
626- }
627-
628- private void settle (
516+ protected void doSettle (
629517 org .apache .qpid .protonj2 .types .transport .DeliveryState state ,
630- MetricsCollector .ConsumeDisposition disposition ,
631- String label ) {
632- if (settled .compareAndSet (false , true )) {
633- int batchSize = this .contexts .size ();
634- try {
635- protonExecutor .execute (replenishCreditOperation );
636- long [][] ranges =
637- SerialNumberUtils .ranges (this .contexts , ctx -> ctx .delivery .getDeliveryId ());
638- this .protonExecutor .execute (
639- () -> {
640- for (long [] range : ranges ) {
641- this .protonReceiver .disposition (state , range );
642- }
518+ MetricsCollector .ConsumeDisposition disposition ) {
519+ int batchSize = this .size ();
520+ protonExecutor .execute (replenishCreditOperation );
521+ long [][] ranges =
522+ SerialNumberUtils .ranges (this .contexts (), ctx -> ctx .delivery .getDeliveryId ());
523+ this .protonExecutor .execute (
524+ () -> {
525+ for (long [] range : ranges ) {
526+ this .protonReceiver .disposition (state , range );
527+ }
528+ });
529+ unsettledMessageCount .addAndGet (-batchSize );
530+ IntStream .range (0 , batchSize )
531+ .forEach (
532+ ignored -> {
533+ metricsCollector .consumeDisposition (disposition );
643534 });
644- unsettledMessageCount .addAndGet (-batchSize );
645- IntStream .range (0 , batchSize )
646- .forEach (
647- ignored -> {
648- metricsCollector .consumeDisposition (disposition );
649- });
650- } catch (Exception e ) {
651- handleContextException (this .consumer , e , label );
652- }
653- }
654- }
655- }
656-
657- private static void handleContextException (
658- AmqpConsumer consumer , Exception ex , String operation ) {
659- if (maybeCloseConsumerOnException (consumer , ex )) {
660- return ;
661535 }
662- if (ex instanceof ClientIllegalStateException
663- || ex instanceof RejectedExecutionException
664- || ex instanceof ClientIOException ) {
665- LOGGER .debug ("message {} failed: {}" , operation , ex .getMessage ());
666- } else if (ex instanceof ClientException ) {
667- throw ExceptionUtils .convert ((ClientException ) ex );
668- }
669- }
670-
671- private static boolean maybeCloseConsumerOnException (AmqpConsumer consumer , Exception ex ) {
672- return ExceptionUtils .maybeCloseOnException (consumer ::close , ex );
673536 }
674537}
0 commit comments