Skip to content

Commit 874a845

Browse files
committed
break circular dependency
1 parent d7b6585 commit 874a845

File tree

9 files changed

+77
-28
lines changed

9 files changed

+77
-28
lines changed

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class ReactiveMessageSender {
3838
@Getter
3939
private final TopologyCreator topologyCreator;
4040
private final boolean isMandatory;
41-
private final UnroutableMessageHandler unroutableMessageHandler;
41+
private final UnroutableMessageNotifier unroutableMessageNotifier;
4242

4343
private static final int NUMBER_OF_SENDER_SUBSCRIPTIONS = 4;
4444
private final CopyOnWriteArrayList<FluxSink<MyOutboundMessage>> fluxSinkConfirm = new CopyOnWriteArrayList<>();
@@ -55,16 +55,16 @@ public class ReactiveMessageSender {
5555

5656
public ReactiveMessageSender(Sender sender, String sourceApplication,
5757
MessageConverter messageConverter, TopologyCreator topologyCreator,
58-
boolean isMandatory, UnroutableMessageHandler unroutableMessageHandler) {
58+
boolean isMandatory, UnroutableMessageNotifier unroutableMessageNotifier) {
5959
this.sender = sender;
6060
this.sourceApplication = sourceApplication;
6161
this.messageConverter = messageConverter;
6262
this.topologyCreator = topologyCreator;
63-
this.isMandatory = isMandatory && unroutableMessageHandler != null;
64-
this.unroutableMessageHandler = unroutableMessageHandler;
63+
this.isMandatory = isMandatory;
64+
this.unroutableMessageNotifier = unroutableMessageNotifier;
6565

6666
System.out.println("ReactiveMessageSender initialized with mandatory: " + isMandatory);
67-
System.out.println("onReturnedCallback: " + unroutableMessageHandler);
67+
System.out.println("onReturnedCallback: " + unroutableMessageNotifier);
6868

6969
initializeSenders();
7070
}
@@ -76,10 +76,11 @@ private void initializeSenders() {
7676
.doOnNext((OutboundMessageResult<MyOutboundMessage> outboundMessageResult) -> {
7777
System.out.println("MANDATORY: " + isMandatory);
7878
if (outboundMessageResult.isReturned()) {
79-
System.out.println("CALLBACK: " + unroutableMessageHandler);
80-
this.unroutableMessageHandler.processMessage(outboundMessageResult);
79+
System.out.println("CALLBACK: " + unroutableMessageNotifier);
80+
this.unroutableMessageNotifier.notifyUnroutableMessage(outboundMessageResult);
8181
}
82-
final Consumer<Boolean> ackNotifier = outboundMessageResult.getOutboundMessage().getAckNotifier();
82+
final Consumer<Boolean> ackNotifier =
83+
outboundMessageResult.getOutboundMessage().getAckNotifier();
8384
executorService.submit(() -> ackNotifier.accept(outboundMessageResult.isAck()));
8485
}).subscribe();
8586
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import lombok.extern.java.Log;
4+
import reactor.core.publisher.Sinks;
5+
import reactor.core.scheduler.Schedulers;
6+
import reactor.rabbitmq.OutboundMessageResult;
7+
8+
@Log
9+
public class UnroutableMessageNotifier {
10+
private final Sinks.Many<OutboundMessageResult<MyOutboundMessage>> sink;
11+
12+
public UnroutableMessageNotifier() {
13+
this.sink = Sinks.many().multicast().onBackpressureBuffer();
14+
}
15+
16+
public void notifyUnroutableMessage(OutboundMessageResult<MyOutboundMessage> message) {
17+
if (sink.tryEmitNext(message).isFailure()) {
18+
log.warning("Failed to emit unroutable message: " + message);
19+
}
20+
}
21+
22+
public void listenToUnroutableMessages(UnroutableMessageHandler handler) {
23+
sink.asFlux()
24+
.flatMap(handler::processMessage)
25+
.subscribeOn(Schedulers.boundedElastic())
26+
.onErrorContinue((throwable, o) ->
27+
log.severe("Error processing unroutable message: " + throwable.getMessage())
28+
)
29+
.subscribe();
30+
31+
}
32+
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGatewayTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
2020
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
2121
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
22-
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
22+
import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
2323
import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter;
2424
import org.reactivestreams.Publisher;
2525
import reactor.core.publisher.Flux;
@@ -68,7 +68,7 @@ class RabbitDirectAsyncGatewayTest {
6868
@Mock
6969
private ReactiveMessageSender senderMock;
7070
@Mock
71-
private UnroutableMessageHandler unroutableMessageHandler;
71+
private UnroutableMessageNotifier unroutableMessageNotifier;
7272

7373
private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
7474
private RabbitDirectAsyncGateway asyncGateway;
@@ -208,7 +208,7 @@ private void mockReply() throws JsonProcessingException {
208208

209209
private ReactiveMessageSender getReactiveMessageSender() {
210210
Sender sender = new StubSender();
211-
return new ReactiveMessageSender(sender, "sourceApplication", converter, null, true, unroutableMessageHandler);
211+
return new ReactiveMessageSender(sender, "sourceApplication", converter, null, true, unroutableMessageNotifier);
212212
}
213213

214214
private Flux<Command<DummyMessage>> createMessagesHot(int count) {

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSenderTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ReactiveMessageSenderTest {
4545
private final SendOptions sendOptions = new SendOptions();
4646

4747
@Mock
48-
private UnroutableMessageHandler unroutableMessageHandler;
48+
private UnroutableMessageNotifier unroutableMessageNotifier;
4949

5050
@BeforeEach
5151
void init() {
@@ -61,7 +61,8 @@ void init() {
6161
});
6262
String sourceApplication = "TestApp";
6363

64-
messageSender = new ReactiveMessageSender(sender, sourceApplication, messageConverter, null, false, unroutableMessageHandler);
64+
messageSender = new ReactiveMessageSender(sender, sourceApplication, messageConverter, null, false,
65+
unroutableMessageNotifier);
6566
}
6667

6768
@Test
@@ -73,15 +74,15 @@ void shouldCallUnroutableMessageHandlerWhenMessageIsReturned() {
7374
});
7475

7576
messageSender = new ReactiveMessageSender(
76-
sender, sourceApplication, messageConverter, null, true, unroutableMessageHandler
77+
sender, sourceApplication, messageConverter, null, true, unroutableMessageNotifier
7778
);
7879
SomeClass messageContent = new SomeClass("id", "name", new Date());
7980

8081
messageSender.sendWithConfirm(messageContent, "exchange", "rkey", new HashMap<>(), true)
8182
.subscribe();
8283

83-
verify(unroutableMessageHandler, timeout(1000).times(1))
84-
.processMessage(any(OutboundMessageResult.class));
84+
verify(unroutableMessageNotifier, timeout(1000).times(1))
85+
.notifyUnroutableMessage(any(OutboundMessageResult.class));
8586
}
8687

8788

@@ -113,7 +114,8 @@ void shouldSendBatchWithConfirmSuccessfully() {
113114
when(sender.sendWithPublishConfirms(any(Publisher.class)))
114115
.thenReturn(Flux.just(result1, result2));
115116

116-
Flux<OutboundMessageResult> resultFlux = messageSender.sendWithConfirmBatch(messages, exchange, routingKey, headers, true);
117+
Flux<OutboundMessageResult> resultFlux = messageSender.sendWithConfirmBatch(messages, exchange, routingKey,
118+
headers, true);
117119

118120
StepVerifier.create(resultFlux).verifyComplete();
119121
}

starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
99
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
1010
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
11-
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
11+
import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
1212
import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider;
1313
import org.reactivecommons.async.rabbit.config.RabbitProperties;
1414
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
@@ -29,7 +29,7 @@ public class RabbitMQBrokerProviderFactory implements BrokerProviderFactory<Asyn
2929
private final MeterRegistry meterRegistry;
3030
private final CustomReporter errorReporter;
3131
private final RabbitMQDiscardProviderFactory discardProvider;
32-
private final UnroutableMessageHandler unroutableMessageHandler;
32+
private final UnroutableMessageNotifier unroutableMessageNotifier;
3333

3434
@Override
3535
public String getBrokerType() {
@@ -47,7 +47,8 @@ public BrokerProvider<AsyncProps> getProvider(String domain, AsyncProps props, D
4747
ConnectionFactoryProvider provider = RabbitMQSetupUtils.connectionFactoryProvider(properties);
4848
RabbitReactiveHealthIndicator healthIndicator =
4949
new RabbitReactiveHealthIndicator(domain, provider.getConnectionFactory());
50-
ReactiveMessageSender sender = RabbitMQSetupUtils.createMessageSender(provider, props, converter, unroutableMessageHandler);
50+
ReactiveMessageSender sender = RabbitMQSetupUtils.createMessageSender(provider, props, converter,
51+
unroutableMessageNotifier);
5152
ReactiveMessageListener listener = RabbitMQSetupUtils.createMessageListener(provider, props);
5253
DiscardNotifier discardNotifier;
5354
if (props.isUseDiscardNotifierPerDomain()) {

starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
1616
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1717
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
18+
import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
1819
import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider;
1920
import org.reactivecommons.async.rabbit.config.RabbitProperties;
2021
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
@@ -95,11 +96,11 @@ public static ConnectionFactoryProvider connectionFactoryProvider(RabbitProperti
9596
public static ReactiveMessageSender createMessageSender(ConnectionFactoryProvider provider,
9697
AsyncProps props,
9798
MessageConverter converter,
98-
UnroutableMessageHandler unroutableMessageHandler) {
99+
UnroutableMessageNotifier unroutableMessageNotifier) {
99100
final Sender sender = RabbitFlux.createSender(reactiveCommonsSenderOptions(props.getAppName(), provider,
100101
props.getConnectionProperties()));
101102
return new ReactiveMessageSender(sender, props.getAppName(), converter, new TopologyCreator(sender),
102-
props.getMandatory(), unroutableMessageHandler
103+
props.getMandatory(), unroutableMessageNotifier
103104
);
104105
}
105106

starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfig.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.reactivecommons.async.rabbit.RabbitMQBrokerProviderFactory;
77
import org.reactivecommons.async.rabbit.RabbitMQFactory;
88
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
9+
import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
910
import org.reactivecommons.async.rabbit.config.RabbitProperties;
1011
import org.reactivecommons.async.rabbit.config.RabbitPropertiesAutoConfig;
1112
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
@@ -51,8 +52,8 @@ public RabbitProperties defaultRabbitProperties(RabbitPropertiesAutoConfig prope
5152

5253
@Bean
5354
@ConditionalOnMissingBean(UnroutableMessageHandler.class)
54-
public UnroutableMessageHandler defaultUnroutableMessageHandler() {
55-
return (result -> {
55+
public UnroutableMessageHandler defaultUnroutableMessageHandler(UnroutableMessageNotifier notifier) {
56+
UnroutableMessageHandler handler = (result -> {
5657
System.out.println("MENSAJE DEVUELTO: " + result.isReturned());
5758
OutboundMessage returned = result.getOutboundMessage();
5859
log.severe("Unroutable message: exchange=" + returned.getExchange()
@@ -62,6 +63,14 @@ public UnroutableMessageHandler defaultUnroutableMessageHandler() {
6263
);
6364
return Mono.empty();
6465
});
66+
notifier.listenToUnroutableMessages(handler);
67+
return handler;
68+
}
69+
70+
@Bean
71+
@ConditionalOnMissingBean(UnroutableMessageNotifier.class)
72+
public UnroutableMessageNotifier defaultUnroutableMessageNotifier() {
73+
return new UnroutableMessageNotifier();
6574
}
6675

6776
@Bean

starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactoryTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
1111
import org.reactivecommons.async.commons.ext.CustomReporter;
1212
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
13-
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
13+
import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
1414
import org.reactivecommons.async.rabbit.config.RabbitProperties;
1515
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
1616
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
@@ -34,14 +34,14 @@ class RabbitMQBrokerProviderFactoryTest {
3434
@Mock
3535
private CustomReporter errorReporter;
3636
@Mock
37-
private UnroutableMessageHandler unroutableMessageHandler;
37+
private UnroutableMessageNotifier unroutableMessageNotifier;
3838

3939
private BrokerProviderFactory<AsyncProps> providerFactory;
4040

4141
@BeforeEach
4242
void setUp() {
4343
providerFactory = new RabbitMQBrokerProviderFactory(config, router, converter, meterRegistry, errorReporter,
44-
RabbitMQDiscardProviderImpl::new, unroutableMessageHandler);
44+
RabbitMQDiscardProviderImpl::new, unroutableMessageNotifier);
4545
}
4646

4747
@Test

starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfigTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.reactivecommons.async.rabbit.RabbitMQFactory;
99
import org.reactivecommons.async.rabbit.communications.MyOutboundMessage;
1010
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
11+
import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
1112
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
1213
import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter;
1314
import org.reactivecommons.async.starter.config.ConnectionManager;
@@ -43,6 +44,8 @@ class RabbitMQConfigTest {
4344

4445
@Mock
4546
private MyOutboundMessage outboundMessageMock;
47+
@Mock
48+
private UnroutableMessageNotifier unroutableMessageNotifier;
4649

4750
private RabbitMQConfig rabbitMQConfig;
4851

@@ -66,7 +69,7 @@ void shouldHasManager() {
6669
@Test
6770
void shouldProcessAndLogWhenMessageIsReturned() {
6871
// Arrange
69-
UnroutableMessageHandler handler = rabbitMQConfig.defaultUnroutableMessageHandler();
72+
UnroutableMessageHandler handler = rabbitMQConfig.defaultUnroutableMessageHandler(unroutableMessageNotifier);
7073
when(resultMock.getOutboundMessage()).thenReturn(outboundMessageMock);
7174
when(outboundMessageMock.getExchange()).thenReturn("test.exchange");
7275
when(outboundMessageMock.getRoutingKey()).thenReturn("test.key");

0 commit comments

Comments
 (0)