1616import reactor .rabbitmq .Receiver ;
1717
1818import java .time .Duration ;
19+ import java .time .Instant ;
1920import java .util .HashMap ;
2021import java .util .List ;
2122import java .util .Optional ;
@@ -39,14 +40,17 @@ public abstract class GenericMessageListener {
3940 private final boolean useDLQRetries ;
4041 private final long maxRetries ;
4142 private final DiscardNotifier discardNotifier ;
43+ private final String objectType ;
4244
43- public GenericMessageListener (String queueName , ReactiveMessageListener listener , boolean useDLQRetries , long maxRetries , DiscardNotifier discardNotifier ) {
45+ public GenericMessageListener (String queueName , ReactiveMessageListener listener , boolean useDLQRetries ,
46+ long maxRetries , DiscardNotifier discardNotifier , String objectType ) {
4447 this .receiver = listener .getReceiver ();
4548 this .queueName = queueName ;
4649 this .messageListener = listener ;
4750 this .maxRetries = maxRetries ;
4851 this .useDLQRetries = useDLQRetries ;
4952 this .discardNotifier = discardNotifier ;
53+ this .objectType = objectType ;
5054 }
5155
5256 protected Mono <Void > setUpBindings (TopologyCreator creator ) {
@@ -55,35 +59,52 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
5559
5660 public void startListener () {
5761 log .log (Level .INFO , "Using max concurrency {0}, in queue: {1}" , new Object []{messageListener .getMaxConcurrency (), queueName });
58- if (useDLQRetries ){
62+ if (useDLQRetries ) {
5963 log .log (Level .INFO , "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!" , new Object []{maxRetries });
60- }else {
64+ } else {
6165 log .log (Level .INFO , "ATTENTION! Using infinite fast retries as Retry Strategy" );
6266 }
6367 setUpBindings (messageListener .getTopologyCreator ()).thenMany (
64- receiver .consumeManualAck (queueName )
65- .transform (this ::consumeFaultTolerant )
66- .transform (this ::outerFailureProtection ))
67- .subscribe ();
68+ receiver .consumeManualAck (queueName )
69+ .transform (this ::consumeFaultTolerant )
70+ .transform (this ::outerFailureProtection ))
71+ .subscribe ();
6872 }
6973
7074
7175 private Mono <AcknowledgableDelivery > handle (AcknowledgableDelivery msj ) {
72- final Function <Message , Mono <Object >> handler = getExecutor (getExecutorPath (msj ));
76+ String executorPath = getExecutorPath (msj );
77+ final Function <Message , Mono <Object >> handler = getExecutor (executorPath );
7378 final Message message = RabbitMessage .fromDelivery (msj );
74- return defer (() -> handler .apply (message )).transform (enrichPostProcess (message ))
75- .subscribeOn (scheduler ).thenReturn (msj );
79+
80+ return defer (() -> handler .apply (message ))
81+ .transform (enrichPostProcess (message ))
82+ .transform (logExecution (executorPath ))
83+ .subscribeOn (scheduler ).thenReturn (msj );
7684 }
7785
86+ private Function <Mono <Object >, Mono <Object >> logExecution (String executorPath ) {
87+ return objectMono -> {
88+ Instant beforeExecutionTime = Instant .now ();
89+
90+ return objectMono .doOnTerminate (() -> {
91+ Instant afterExecutionTime = Instant .now ();
92+ long timeElapsed = Duration .between (beforeExecutionTime , afterExecutionTime ).toMillis ();
93+
94+ log .log (Level .FINE , String .format ("%s with path %s handled, took %d ms" ,
95+ objectType , executorPath , timeElapsed ));
96+ });
97+ };
98+ }
7899
79100 private <T > Flux <T > outerFailureProtection (Flux <T > messageFlux ) {
80101 return messageFlux .onErrorContinue (t -> true , (throwable , elem ) -> {
81- if (elem instanceof AcknowledgableDelivery ){
102+ if (elem instanceof AcknowledgableDelivery ) {
82103 try {
83104 String messageID = ((AcknowledgableDelivery ) elem ).getProperties ().getMessageId ();
84105 log .log (Level .SEVERE , format ("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! " , messageID ));
85106 requeueOrAck ((AcknowledgableDelivery ) elem , throwable ).subscribe ();
86- }catch (Exception e ){
107+ } catch (Exception e ) {
87108 log .log (Level .SEVERE , "Error returning message in failure!" , e );
88109 }
89110 }
@@ -92,22 +113,22 @@ private <T> Flux<T> outerFailureProtection(Flux<T> messageFlux) {
92113
93114 private Flux <AcknowledgableDelivery > consumeFaultTolerant (Flux <AcknowledgableDelivery > messageFlux ) {
94115 return messageFlux .flatMap (msj ->
95- handle (msj )
96- .doOnSuccess (AcknowledgableDelivery ::ack )
97- .onErrorResume (err -> requeueOrAck (msj , err ))
98- , messageListener .getMaxConcurrency ());
116+ handle (msj )
117+ .doOnSuccess (AcknowledgableDelivery ::ack )
118+ .onErrorResume (err -> requeueOrAck (msj , err ))
119+ , messageListener .getMaxConcurrency ());
99120 }
100121
101122
102- protected void logError (Throwable err , AcknowledgableDelivery msj , FallbackStrategy strategy ){
123+ protected void logError (Throwable err , AcknowledgableDelivery msj , FallbackStrategy strategy ) {
103124 String messageID = msj .getProperties ().getMessageId ();
104125 try {
105126 log .log (Level .SEVERE , format ("Error encounter while processing message %s: %s" , messageID , err .toString ()));
106127 log .warning (format ("Message %s Headers: %s" , messageID , msj .getProperties ().getHeaders ().toString ()));
107128 log .warning (format ("Message %s Body: %s" , messageID , new String (msj .getBody ())));
108129 } catch (Exception e ) {
109130 log .log (Level .SEVERE , "Error Login message Content!!" , e );
110- }finally {
131+ } finally {
111132 log .warning (format (strategy .message , messageID ));
112133 }
113134 }
@@ -119,14 +140,15 @@ private Function<Message, Mono<Object>> getExecutor(String path) {
119140
120141 private Function <Message , Mono <Object >> computeRawMessageHandler (String commandId ) {
121142 return handlers .computeIfAbsent (commandId , s ->
122- rawMessageHandler (commandId )
143+ rawMessageHandler (commandId )
123144 );
124145 }
125146
126147 protected abstract Function <Message , Mono <Object >> rawMessageHandler (String executorPath );
148+
127149 protected abstract String getExecutorPath (AcknowledgableDelivery msj );
128150
129- protected Function <Mono <Object >, Mono <Object >> enrichPostProcess (Message msg ){
151+ protected Function <Mono <Object >, Mono <Object >> enrichPostProcess (Message msg ) {
130152 return identity ();
131153 }
132154
@@ -143,7 +165,7 @@ private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery msj, Th
143165 msj .nack (false );
144166 }
145167 return Mono .just (msj );
146- }else {
168+ } else {
147169 logError (err , msj , FallbackStrategy .FAST_RETRY );
148170 return Mono .just (msj ).delayElement (Duration .ofMillis (200 )).doOnNext (m -> m .nack (true ));
149171 }
@@ -152,7 +174,7 @@ private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery msj, Th
152174 private Long getRetryNumber (AcknowledgableDelivery delivery ) {
153175 return Optional .ofNullable (delivery .getProperties ())
154176 .map (AMQP .BasicProperties ::getHeaders )
155- .map (x -> (List <HashMap >)x .get ("x-death" ))
177+ .map (x -> (List <HashMap >) x .get ("x-death" ))
156178 .filter (list -> !list .isEmpty ())
157179 .map (list -> list .get (0 ))
158180 .map (hashMap -> (Long ) hashMap .get ("count" ))
0 commit comments