Skip to content

Commit 03c5ce4

Browse files
committed
feat: implement UnroutableMessageProcessor and refactor message handling
1 parent 874a845 commit 03c5ce4

File tree

15 files changed

+264
-175
lines changed

15 files changed

+264
-175
lines changed

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import reactor.core.publisher.FluxSink;
1111
import reactor.core.publisher.Mono;
1212
import reactor.core.publisher.MonoSink;
13-
import reactor.core.scheduler.Schedulers;
1413
import reactor.rabbitmq.OutboundMessage;
1514
import reactor.rabbitmq.OutboundMessageResult;
1615
import reactor.rabbitmq.SendOptions;
@@ -62,10 +61,6 @@ public ReactiveMessageSender(Sender sender, String sourceApplication,
6261
this.topologyCreator = topologyCreator;
6362
this.isMandatory = isMandatory;
6463
this.unroutableMessageNotifier = unroutableMessageNotifier;
65-
66-
System.out.println("ReactiveMessageSender initialized with mandatory: " + isMandatory);
67-
System.out.println("onReturnedCallback: " + unroutableMessageNotifier);
68-
6964
initializeSenders();
7065
}
7166

@@ -74,9 +69,7 @@ private void initializeSenders() {
7469
final Flux<MyOutboundMessage> messageSource = Flux.create(fluxSinkConfirm::add);
7570
sender.sendWithTypedPublishConfirms(messageSource, new SendOptions().trackReturned(isMandatory))
7671
.doOnNext((OutboundMessageResult<MyOutboundMessage> outboundMessageResult) -> {
77-
System.out.println("MANDATORY: " + isMandatory);
7872
if (outboundMessageResult.isReturned()) {
79-
System.out.println("CALLBACK: " + unroutableMessageNotifier);
8073
this.unroutableMessageNotifier.notifyUnroutableMessage(outboundMessageResult);
8174
}
8275
final Consumer<Boolean> ackNotifier =
@@ -121,12 +114,6 @@ public <T> Flux<OutboundMessageResult> sendWithConfirmBatch(Flux<T> messages, St
121114
);
122115
}
123116

124-
public Mono<Void> sendMessage(Object message, String exchange, String routingKey, Map<String, Object> headers) {
125-
return sendNoConfirm(message, exchange, routingKey, headers, true)
126-
.subscribeOn(Schedulers.boundedElastic())
127-
.doOnError(e -> log.severe("Failed to send unroutable message: " + e.getMessage()));
128-
}
129-
130117
private record AckNotifier(MonoSink<Void> monoSink) implements Consumer<Boolean> {
131118

132119
@Override

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifier.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package org.reactivecommons.async.rabbit.communications;
22

33
import lombok.extern.java.Log;
4+
import reactor.core.Disposable;
45
import reactor.core.publisher.Sinks;
56
import reactor.core.scheduler.Schedulers;
67
import reactor.rabbitmq.OutboundMessageResult;
78

89
@Log
910
public class UnroutableMessageNotifier {
1011
private final Sinks.Many<OutboundMessageResult<MyOutboundMessage>> sink;
12+
private volatile Disposable currentSubscription;
1113

1214
public UnroutableMessageNotifier() {
1315
this.sink = Sinks.many().multicast().onBackpressureBuffer();
@@ -20,13 +22,15 @@ public void notifyUnroutableMessage(OutboundMessageResult<MyOutboundMessage> mes
2022
}
2123

2224
public void listenToUnroutableMessages(UnroutableMessageHandler handler) {
23-
sink.asFlux()
24-
.flatMap(handler::processMessage)
25+
if (currentSubscription != null && !currentSubscription.isDisposed()) {
26+
currentSubscription.dispose();
27+
}
28+
currentSubscription = sink.asFlux()
2529
.subscribeOn(Schedulers.boundedElastic())
30+
.flatMap(handler::processMessage)
2631
.onErrorContinue((throwable, o) ->
2732
log.severe("Error processing unroutable message: " + throwable.getMessage())
2833
)
2934
.subscribe();
30-
3135
}
3236
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import lombok.NoArgsConstructor;
4+
import lombok.extern.java.Log;
5+
import reactor.core.publisher.Mono;
6+
import reactor.rabbitmq.OutboundMessageResult;
7+
8+
import java.nio.charset.StandardCharsets;
9+
10+
@Log
11+
@NoArgsConstructor
12+
public class UnroutableMessageProcessor implements UnroutableMessageHandler {
13+
14+
15+
@Override
16+
public Mono<Void> processMessage(OutboundMessageResult<MyOutboundMessage> result) {
17+
var outboundMessage = result.getOutboundMessage();
18+
log.severe("Unroutable message: exchange=" + outboundMessage.getExchange()
19+
+ ", routingKey=" + outboundMessage.getRoutingKey()
20+
+ ", body=" + new String(outboundMessage.getBody(), StandardCharsets.UTF_8)
21+
+ ", properties=" + outboundMessage.getProperties()
22+
);
23+
return Mono.empty();
24+
}
25+
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSenderTest.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import static org.mockito.Mockito.verify;
2828
import static org.mockito.Mockito.when;
2929

30-
@ExtendWith(MockitoExtension.class)
3130
@SuppressWarnings("unchecked")
31+
@ExtendWith(MockitoExtension.class)
3232
class ReactiveMessageSenderTest {
3333

3434
private ReactiveMessageSender messageSender;
@@ -85,19 +85,6 @@ void shouldCallUnroutableMessageHandlerWhenMessageIsReturned() {
8585
.notifyUnroutableMessage(any(OutboundMessageResult.class));
8686
}
8787

88-
89-
@Test
90-
void shouldSendMessageSuccessfully() {
91-
Object message = new SomeClass("id", "name", new Date());
92-
String exchange = "test.exchange";
93-
String routingKey = "test.routingKey";
94-
Map<String, Object> headers = new HashMap<>();
95-
96-
Mono<Void> result = messageSender.sendMessage(message, exchange, routingKey, headers);
97-
98-
StepVerifier.create(result).verifyComplete();
99-
}
100-
10188
@Test
10289
void shouldSendBatchWithConfirmSuccessfully() {
10390
Flux<SomeClass> messages = Flux.just(
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import org.junit.jupiter.api.BeforeEach;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.ArgumentCaptor;
7+
import org.mockito.Captor;
8+
import org.mockito.Mock;
9+
import org.mockito.Spy;
10+
import org.mockito.junit.jupiter.MockitoExtension;
11+
import reactor.core.Disposable;
12+
import reactor.core.publisher.Flux;
13+
import reactor.core.publisher.Mono;
14+
import reactor.core.publisher.Sinks;
15+
import reactor.rabbitmq.OutboundMessageResult;
16+
17+
import java.lang.reflect.Field;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.any;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.timeout;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
25+
26+
@ExtendWith(MockitoExtension.class)
27+
@SuppressWarnings("unchecked")
28+
class UnroutableMessageNotifierTest {
29+
30+
@Spy
31+
private UnroutableMessageNotifier unroutableMessageNotifier;
32+
33+
@Mock
34+
private Sinks.Many<OutboundMessageResult<MyOutboundMessage>> sink;
35+
36+
@Mock
37+
private OutboundMessageResult<MyOutboundMessage> messageResult;
38+
39+
@Mock
40+
private UnroutableMessageHandler handler;
41+
42+
@Captor
43+
private ArgumentCaptor<OutboundMessageResult<MyOutboundMessage>> messageCaptor;
44+
45+
@BeforeEach
46+
void setUp() {
47+
// Usar el constructor por defecto y espiar el sink interno
48+
unroutableMessageNotifier = new UnroutableMessageNotifier();
49+
// Inyectar el mock del sink usando un spy para poder verificarlo
50+
try {
51+
java.lang.reflect.Field sinkField = UnroutableMessageNotifier.class.getDeclaredField("sink");
52+
sinkField.setAccessible(true);
53+
sinkField.set(unroutableMessageNotifier, sink);
54+
} catch (NoSuchFieldException | IllegalAccessException e) {
55+
throw new RuntimeException(e);
56+
}
57+
}
58+
59+
@Test
60+
void shouldEmitUnroutableMessageSuccessfully() {
61+
when(sink.tryEmitNext(messageResult)).thenReturn(Sinks.EmitResult.OK);
62+
63+
unroutableMessageNotifier.notifyUnroutableMessage(messageResult);
64+
65+
verify(sink).tryEmitNext(messageResult);
66+
}
67+
68+
@Test
69+
void shouldNotThrowWhenEmissionFails() {
70+
when(sink.tryEmitNext(messageResult)).thenReturn(Sinks.EmitResult.FAIL_NON_SERIALIZED);
71+
72+
unroutableMessageNotifier.notifyUnroutableMessage(messageResult);
73+
74+
verify(sink).tryEmitNext(messageResult);
75+
}
76+
77+
@Test
78+
void shouldSubscribeHandlerToFluxOfMessages() {
79+
final Flux<OutboundMessageResult<MyOutboundMessage>> messageResultFlux = Flux.just(messageResult);
80+
when(sink.asFlux()).thenReturn(messageResultFlux);
81+
when(handler.processMessage(any(OutboundMessageResult.class))).thenReturn(Mono.empty());
82+
83+
unroutableMessageNotifier.listenToUnroutableMessages(handler);
84+
85+
verify(handler, timeout(1000)).processMessage(messageCaptor.capture());
86+
assertThat(messageCaptor.getValue()).isEqualTo(messageResult);
87+
}
88+
89+
@Test
90+
void shouldDisposePreviousSubscriptionWhenNewHandlerIsSubscribed() throws Exception {
91+
Disposable firstSubscription = mock(Disposable.class);
92+
when(firstSubscription.isDisposed()).thenReturn(false);
93+
94+
Field field = UnroutableMessageNotifier.class.getDeclaredField("currentSubscription");
95+
field.setAccessible(true);
96+
field.set(unroutableMessageNotifier, firstSubscription);
97+
98+
when(sink.asFlux()).thenReturn(Flux.never());
99+
100+
unroutableMessageNotifier.listenToUnroutableMessages(handler);
101+
102+
verify(firstSubscription).dispose();
103+
}
104+
105+
@Test
106+
void shouldContinueProcessingWhenHandlerFails() {
107+
OutboundMessageResult<MyOutboundMessage> messageResult2 = mock(OutboundMessageResult.class);
108+
when(sink.asFlux()).thenReturn(Flux.just(messageResult, messageResult2));
109+
when(handler.processMessage(messageResult)).thenReturn(Mono.error(new RuntimeException("Processing Error")));
110+
when(handler.processMessage(messageResult2)).thenReturn(Mono.empty());
111+
112+
unroutableMessageNotifier.listenToUnroutableMessages(handler);
113+
114+
verify(handler, timeout(1000).times(1)).processMessage(messageResult);
115+
verify(handler, timeout(1000).times(1)).processMessage(messageResult2);
116+
}
117+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.junit.jupiter.api.extension.ExtendWith;
5+
import org.mockito.InjectMocks;
6+
import org.mockito.Mock;
7+
import org.mockito.junit.jupiter.MockitoExtension;
8+
import reactor.rabbitmq.OutboundMessageResult;
9+
import reactor.test.StepVerifier;
10+
11+
import java.nio.charset.StandardCharsets;
12+
13+
import static org.mockito.Mockito.verify;
14+
import static org.mockito.Mockito.when;
15+
16+
@ExtendWith(MockitoExtension.class)
17+
class UnroutableMessageProcessorTest {
18+
19+
@Mock
20+
private OutboundMessageResult<MyOutboundMessage> messageResult;
21+
22+
@Mock
23+
private MyOutboundMessage myOutboundMessage;
24+
25+
@InjectMocks
26+
private UnroutableMessageProcessor unroutableMessageProcessor;
27+
28+
29+
@Test
30+
void logsUnroutableMessageDetails() {
31+
when(messageResult.getOutboundMessage()).thenReturn(myOutboundMessage);
32+
when(myOutboundMessage.getExchange()).thenReturn("test-exchange");
33+
when(myOutboundMessage.getRoutingKey()).thenReturn("test-routingKey");
34+
when(myOutboundMessage.getBody()).thenReturn("test-body".getBytes(StandardCharsets.UTF_8));
35+
when(myOutboundMessage.getProperties()).thenReturn(null);
36+
37+
StepVerifier.create(unroutableMessageProcessor.processMessage(messageResult))
38+
.verifyComplete();
39+
40+
verify(messageResult).getOutboundMessage();
41+
verify(myOutboundMessage).getExchange();
42+
verify(myOutboundMessage).getRoutingKey();
43+
verify(myOutboundMessage).getBody();
44+
}
45+
}

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/broker/BrokerProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
@SuppressWarnings("rawtypes")
1111
public interface BrokerProvider<T extends GenericAsyncProps> {
12-
T getProps();
12+
T props();
1313

1414
DomainEventBus getDomainBus();
1515

starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/mybroker/MyBrokerProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class MyBrokerProvider implements BrokerProvider<MyBrokerAsyncProps> {
1717
private final DiscardProvider discardProvider;
1818

1919
@Override
20-
public MyBrokerAsyncProps getProps() {
20+
public MyBrokerAsyncProps props() {
2121
return null;
2222
}
2323

starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/KafkaBrokerProvider.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package org.reactivecommons.async.kafka;
22

33
import io.micrometer.core.instrument.MeterRegistry;
4-
import lombok.Getter;
5-
import lombok.RequiredArgsConstructor;
64
import org.reactivecommons.api.domain.DomainEventBus;
75
import org.reactivecommons.async.api.DirectAsyncGateway;
86
import org.reactivecommons.async.commons.DiscardNotifier;
@@ -23,23 +21,19 @@
2321
import org.springframework.boot.ssl.SslBundles;
2422
import reactor.core.publisher.Mono;
2523

26-
@Getter
27-
@RequiredArgsConstructor
28-
public class KafkaBrokerProvider implements BrokerProvider<AsyncKafkaProps> {
29-
private final String domain;
30-
private final AsyncKafkaProps props;
31-
private final ReactiveReplyRouter router;
32-
private final KafkaJacksonMessageConverter converter;
33-
private final MeterRegistry meterRegistry;
34-
private final CustomReporter errorReporter;
35-
private final KafkaReactiveHealthIndicator healthIndicator;
36-
private final ReactiveMessageListener receiver;
37-
private final ReactiveMessageSender sender;
38-
private final DiscardNotifier discardNotifier;
39-
private final TopologyCreator topologyCreator;
40-
private final KafkaCustomizations customizations;
41-
private final SslBundles sslBundles;
42-
24+
public record KafkaBrokerProvider(String domain,
25+
AsyncKafkaProps props,
26+
ReactiveReplyRouter router,
27+
KafkaJacksonMessageConverter converter,
28+
MeterRegistry meterRegistry,
29+
CustomReporter errorReporter,
30+
KafkaReactiveHealthIndicator healthIndicator,
31+
ReactiveMessageListener receiver,
32+
ReactiveMessageSender sender,
33+
DiscardNotifier discardNotifier,
34+
TopologyCreator topologyCreator,
35+
KafkaCustomizations customizations,
36+
SslBundles sslBundles) implements BrokerProvider<AsyncKafkaProps> {
4337
@Override
4438
public DomainEventBus getDomainBus() {
4539
return new KafkaDomainEventBus(sender);

starters/async-kafka-starter/src/test/java/org/reactivecommons/async/starter/impl/common/kafka/KafkaConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,6 @@ void shouldHasManager() {
4141
// Assert
4242
assertThat(manager).isNotNull();
4343
assertThat(manager.getProviders()).isNotEmpty();
44-
assertThat(manager.getProviders().get("app").getProps().getAppName()).isEqualTo("async-kafka-starter");
44+
assertThat(manager.getProviders().get("app").props().getAppName()).isEqualTo("async-kafka-starter");
4545
}
4646
}

0 commit comments

Comments
 (0)