|
17 | 17 | import reactor.rabbitmq.AcknowledgableDelivery; |
18 | 18 | import reactor.rabbitmq.ConsumeOptions; |
19 | 19 | import reactor.rabbitmq.Receiver; |
| 20 | +import reactor.util.retry.Retry; |
20 | 21 |
|
21 | 22 | import java.time.Duration; |
22 | 23 | import java.time.Instant; |
@@ -69,6 +70,10 @@ private static long resolveRetries(boolean useDLQRetries, long maxRetries) { |
69 | 70 | return useDLQRetries && maxRetries == -1 ? DEFAULT_RETRIES_DLQ : maxRetries; |
70 | 71 | } |
71 | 72 |
|
| 73 | + private boolean hasLocalRetries() { |
| 74 | + return !useDLQRetries && maxRetries != -1; |
| 75 | + } |
| 76 | + |
72 | 77 | protected Mono<Void> setUpBindings(TopologyCreator creator) { |
73 | 78 | return Mono.empty(); |
74 | 79 | } |
@@ -109,9 +114,12 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan |
109 | 114 | final Function<Message, Mono<Object>> handler = getExecutor(executorPath); |
110 | 115 | final Message message = RabbitMessage.fromDelivery(msj); |
111 | 116 |
|
112 | | - return defer(() -> handler.apply(message)) |
113 | | - .transform(enrichPostProcess(message)) |
114 | | - .doOnSuccess(o -> logExecution(executorPath, initTime, true)) |
| 117 | + Mono<Object> flow = defer(() -> handler.apply(message)) |
| 118 | + .transform(enrichPostProcess(message)); |
| 119 | + if (hasLocalRetries()) { |
| 120 | + flow = flow.retryWhen(Retry.fixedDelay(maxRetries, retryDelay)); |
| 121 | + } |
| 122 | + return flow.doOnSuccess(o -> logExecution(executorPath, initTime, true)) |
115 | 123 | .subscribeOn(scheduler).thenReturn(msj); |
116 | 124 | } catch (Exception e) { |
117 | 125 | log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", msj.getProperties().getMessageId())); |
@@ -194,16 +202,16 @@ private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery msj, Th |
194 | 202 | final boolean redeliver = msj.getEnvelope().isRedeliver(); |
195 | 203 | reportErrorMetric(msj, init); |
196 | 204 | sendErrorToCustomReporter(err, rabbitMessage, redeliver || retryNumber > 0); |
197 | | - if (maxRetries != -1 && retryNumber >= maxRetries) { |
| 205 | + if (hasLocalRetries() || retryNumber >= maxRetries) { // Discard |
198 | 206 | logError(err, msj, FallbackStrategy.DEFINITIVE_DISCARD); |
199 | 207 | return discardNotifier |
200 | 208 | .notifyDiscard(rabbitMessage) |
201 | 209 | .doOnSuccess(_a -> msj.ack()).thenReturn(msj); |
202 | | - } else if (useDLQRetries) { |
| 210 | + } else if (useDLQRetries) { // DLQ retries |
203 | 211 | logError(err, msj, FallbackStrategy.RETRY_DLQ); |
204 | 212 | msj.nack(false); |
205 | 213 | return Mono.just(msj); |
206 | | - } else { |
| 214 | + } else { // infinity fast retries |
207 | 215 | logError(err, msj, FallbackStrategy.FAST_RETRY); |
208 | 216 | return Mono.just(msj).delayElement(retryDelay).doOnNext(m -> m.nack(true)); |
209 | 217 | } |
|
0 commit comments