@@ -71,21 +71,25 @@ public void startListener() {
7171
7272 setUpBindings (messageListener .getTopologyCreator ()).thenMany (
7373 receiver .consumeManualAck (queueName , consumeOptions )
74- .transform (this ::consumeFaultTolerant )
75- .transform (this ::outerFailureProtection ))
74+ .transform (this ::consumeFaultTolerant ))
7675 .subscribe ();
7776 }
7877
7978
8079 private Mono <AcknowledgableDelivery > handle (AcknowledgableDelivery msj ) {
81- String executorPath = getExecutorPath (msj );
82- final Function <Message , Mono <Object >> handler = getExecutor (executorPath );
83- final Message message = RabbitMessage .fromDelivery (msj );
80+ try {
81+ String executorPath = getExecutorPath (msj );
82+ final Function <Message , Mono <Object >> handler = getExecutor (executorPath );
83+ final Message message = RabbitMessage .fromDelivery (msj );
8484
85- return defer (() -> handler .apply (message ))
85+ return defer (() -> handler .apply (message ))
8686 .transform (enrichPostProcess (message ))
8787 .transform (logExecution (executorPath ))
8888 .subscribeOn (scheduler ).thenReturn (msj );
89+ } catch (Exception e ) {
90+ log .log (Level .SEVERE , format ("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! " , msj .getProperties ().getMessageId ()));
91+ return Mono .error (e );
92+ }
8993 }
9094
9195 private Function <Mono <Object >, Mono <Object >> logExecution (String executorPath ) {
@@ -102,22 +106,6 @@ private Function<Mono<Object>, Mono<Object>> logExecution(String executorPath) {
102106 };
103107 }
104108
105- private <T > Flux <T > outerFailureProtection (Flux <T > messageFlux ) {
106- return messageFlux .onErrorContinue (t -> true , (throwable , elem ) -> {
107- if (elem instanceof AcknowledgableDelivery ) {
108- try {
109- String messageID = ((AcknowledgableDelivery ) elem ).getProperties ().getMessageId ();
110- log .log (Level .SEVERE , format ("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! " , messageID ));
111- requeueOrAck ((AcknowledgableDelivery ) elem , throwable ).subscribe ();
112- } catch (Exception e ) {
113- log .log (Level .SEVERE , "Error returning message in failure!" , e );
114- }
115- } else {
116- throw new RuntimeException (throwable );
117- }
118- });
119- }
120-
121109 private Flux <AcknowledgableDelivery > consumeFaultTolerant (Flux <AcknowledgableDelivery > messageFlux ) {
122110 return messageFlux .flatMap (msj ->
123111 handle (msj )
0 commit comments