Skip to content

Commit d7b6585

Browse files
committed
feat: add mandatory property
1 parent 399449d commit d7b6585

File tree

30 files changed

+3022
-2208
lines changed

30 files changed

+3022
-2208
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
@Getter
2525
@NoArgsConstructor(access = AccessLevel.PACKAGE)
26+
@SuppressWarnings({"unchecked", "unused"})
2627
public final class HandlerRegistry {
2728
public static final String DEFAULT_DOMAIN = "app";
2829
private final RegisteredDomainHandlers<RegisteredEventListener<?, ?>> domainEventListeners =
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import lombok.Getter;
5+
import reactor.rabbitmq.OutboundMessage;
6+
7+
import java.util.function.Consumer;
8+
9+
@Getter
10+
public class MyOutboundMessage extends OutboundMessage {
11+
12+
private final Consumer<Boolean> ackNotifier;
13+
14+
public MyOutboundMessage(
15+
String exchange, String routingKey, AMQP.BasicProperties properties,
16+
byte[] body, Consumer<Boolean> ackNotifier
17+
) {
18+
super(exchange, routingKey, properties, body);
19+
this.ackNotifier = ackNotifier;
20+
}
21+
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
import com.rabbitmq.client.AMQP;
44
import lombok.Getter;
5+
import lombok.extern.java.Log;
56
import org.reactivecommons.async.commons.communications.Message;
67
import org.reactivecommons.async.commons.converters.MessageConverter;
78
import org.reactivecommons.async.commons.exceptions.SendFailureNoAckException;
89
import reactor.core.publisher.Flux;
910
import reactor.core.publisher.FluxSink;
1011
import reactor.core.publisher.Mono;
1112
import reactor.core.publisher.MonoSink;
13+
import reactor.core.scheduler.Schedulers;
1214
import reactor.rabbitmq.OutboundMessage;
1315
import reactor.rabbitmq.OutboundMessageResult;
16+
import reactor.rabbitmq.SendOptions;
1417
import reactor.rabbitmq.Sender;
1518

1619
import java.util.Date;
@@ -26,13 +29,16 @@
2629
import static org.reactivecommons.async.api.DirectAsyncGateway.DELAYED;
2730
import static org.reactivecommons.async.commons.Headers.SOURCE_APPLICATION;
2831

32+
@Log
2933
public class ReactiveMessageSender {
3034
@Getter
3135
private final Sender sender;
3236
private final String sourceApplication;
3337
private final MessageConverter messageConverter;
3438
@Getter
3539
private final TopologyCreator topologyCreator;
40+
private final boolean isMandatory;
41+
private final UnroutableMessageHandler unroutableMessageHandler;
3642

3743
private static final int NUMBER_OF_SENDER_SUBSCRIPTIONS = 4;
3844
private final CopyOnWriteArrayList<FluxSink<MyOutboundMessage>> fluxSinkConfirm = new CopyOnWriteArrayList<>();
@@ -47,18 +53,32 @@ public class ReactiveMessageSender {
4753
13, r -> new Thread(r, "RMessageSender2-" + counter.getAndIncrement())
4854
);
4955

50-
5156
public ReactiveMessageSender(Sender sender, String sourceApplication,
52-
MessageConverter messageConverter, TopologyCreator topologyCreator) {
57+
MessageConverter messageConverter, TopologyCreator topologyCreator,
58+
boolean isMandatory, UnroutableMessageHandler unroutableMessageHandler) {
5359
this.sender = sender;
5460
this.sourceApplication = sourceApplication;
5561
this.messageConverter = messageConverter;
5662
this.topologyCreator = topologyCreator;
63+
this.isMandatory = isMandatory && unroutableMessageHandler != null;
64+
this.unroutableMessageHandler = unroutableMessageHandler;
65+
66+
System.out.println("ReactiveMessageSender initialized with mandatory: " + isMandatory);
67+
System.out.println("onReturnedCallback: " + unroutableMessageHandler);
68+
69+
initializeSenders();
70+
}
5771

72+
private void initializeSenders() {
5873
for (int i = 0; i < NUMBER_OF_SENDER_SUBSCRIPTIONS; ++i) {
5974
final Flux<MyOutboundMessage> messageSource = Flux.create(fluxSinkConfirm::add);
60-
sender.sendWithTypedPublishConfirms(messageSource)
75+
sender.sendWithTypedPublishConfirms(messageSource, new SendOptions().trackReturned(isMandatory))
6176
.doOnNext((OutboundMessageResult<MyOutboundMessage> outboundMessageResult) -> {
77+
System.out.println("MANDATORY: " + isMandatory);
78+
if (outboundMessageResult.isReturned()) {
79+
System.out.println("CALLBACK: " + unroutableMessageHandler);
80+
this.unroutableMessageHandler.processMessage(outboundMessageResult);
81+
}
6282
final Consumer<Boolean> ackNotifier = outboundMessageResult.getOutboundMessage().getAckNotifier();
6383
executorService.submit(() -> ackNotifier.accept(outboundMessageResult.isAck()));
6484
}).subscribe();
@@ -68,7 +88,6 @@ public ReactiveMessageSender(Sender sender, String sourceApplication,
6888
this.fluxSinkNoConfirm = fluxSink
6989
);
7090
sender.send(messageSourceNoConfirm).subscribe();
71-
7291
}
7392

7493
public <T> Mono<Void> sendWithConfirm(T message, String exchange, String routingKey,
@@ -101,12 +120,13 @@ public <T> Flux<OutboundMessageResult> sendWithConfirmBatch(Flux<T> messages, St
101120
);
102121
}
103122

104-
private static class AckNotifier implements Consumer<Boolean> {
105-
private final MonoSink<Void> monoSink;
123+
public Mono<Void> sendMessage(Object message, String exchange, String routingKey, Map<String, Object> headers) {
124+
return sendNoConfirm(message, exchange, routingKey, headers, true)
125+
.subscribeOn(Schedulers.boundedElastic())
126+
.doOnError(e -> log.severe("Failed to send unroutable message: " + e.getMessage()));
127+
}
106128

107-
public AckNotifier(MonoSink<Void> monoSink) {
108-
this.monoSink = monoSink;
109-
}
129+
private record AckNotifier(MonoSink<Void> monoSink) implements Consumer<Boolean> {
110130

111131
@Override
112132
public void accept(Boolean ack) {
@@ -118,21 +138,6 @@ public void accept(Boolean ack) {
118138
}
119139
}
120140

121-
122-
@Getter
123-
static class MyOutboundMessage extends OutboundMessage {
124-
125-
private final Consumer<Boolean> ackNotifier;
126-
127-
public MyOutboundMessage(
128-
String exchange, String routingKey, AMQP.BasicProperties properties,
129-
byte[] body, Consumer<Boolean> ackNotifier
130-
) {
131-
super(exchange, routingKey, properties, body);
132-
this.ackNotifier = ackNotifier;
133-
}
134-
}
135-
136141
private <T> MyOutboundMessage toOutboundMessage(T object, String exchange,
137142
String routingKey, Map<String, Object> headers,
138143
Consumer<Boolean> ackNotifier, boolean persistent) {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import reactor.core.publisher.Mono;
4+
import reactor.rabbitmq.OutboundMessageResult;
5+
6+
@FunctionalInterface
7+
public interface UnroutableMessageHandler {
8+
/**
9+
* Processes a message that was returned by RabbitMQ because it could not be routed.
10+
*
11+
* @param result The result of the outbound message, containing the original message and return details.
12+
*/
13+
Mono<Void> processMessage(OutboundMessageResult<MyOutboundMessage> result);
14+
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGatewayTest.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
2020
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
2121
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
22+
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
2223
import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter;
2324
import org.reactivestreams.Publisher;
2425
import reactor.core.publisher.Flux;
2526
import reactor.core.publisher.Mono;
2627
import reactor.core.publisher.Sinks;
2728
import reactor.rabbitmq.OutboundMessage;
2829
import reactor.rabbitmq.OutboundMessageResult;
30+
import reactor.rabbitmq.SendOptions;
2931
import reactor.rabbitmq.Sender;
3032
import reactor.test.StepVerifier;
3133

@@ -36,7 +38,6 @@
3638
import java.util.concurrent.Semaphore;
3739
import java.util.concurrent.ThreadLocalRandom;
3840
import java.util.concurrent.TimeoutException;
39-
import java.util.stream.Collectors;
4041
import java.util.stream.IntStream;
4142

4243
import static org.assertj.core.api.Assertions.assertThat;
@@ -59,11 +60,15 @@ class RabbitDirectAsyncGatewayTest {
5960

6061
private final BrokerConfig config = new BrokerConfig();
6162
private final Semaphore semaphore = new Semaphore(0);
62-
private final MessageConverter converter = new RabbitJacksonMessageConverter(new DefaultObjectMapperSupplier().get());
63+
private final MessageConverter converter = new RabbitJacksonMessageConverter(
64+
new DefaultObjectMapperSupplier().get()
65+
);
6366
@Mock
6467
private ReactiveReplyRouter router;
6568
@Mock
6669
private ReactiveMessageSender senderMock;
70+
@Mock
71+
private UnroutableMessageHandler unroutableMessageHandler;
6772

6873
private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
6974
private RabbitDirectAsyncGateway asyncGateway;
@@ -101,6 +106,7 @@ void shouldSendInOptimalTime() throws InterruptedException {
101106
messages.flatMap(dummyMessageCommand ->
102107
asyncGateway.sendCommand(dummyMessageCommand, "testTarget")
103108
.doOnSuccess(aVoid -> semaphore.release())
109+
.doOnError(Throwable::printStackTrace)
104110
);
105111

106112
final long init = System.currentTimeMillis();
@@ -137,7 +143,7 @@ void shouldReplyQuery() {
137143
.sendNoConfirm(eq(response), eq("globalReply"), eq("replyId"),
138144
headersCaptor.capture(), anyBoolean()
139145
);
140-
assertThat(headersCaptor.getValue().get(CORRELATION_ID)).isEqualTo("correlationId");
146+
assertThat(headersCaptor.getValue()).containsEntry(CORRELATION_ID, "correlationId");
141147
}
142148

143149
@Test
@@ -156,8 +162,9 @@ void shouldReplyQueryWithout() {
156162
ArgumentCaptor<Map<String, Object>> headersCaptor = ArgumentCaptor.forClass(Map.class);
157163
verify(senderMock, times(1))
158164
.sendNoConfirm(eq(null), eq("globalReply"), eq("replyId"), headersCaptor.capture(), anyBoolean());
159-
assertThat(headersCaptor.getValue().get(CORRELATION_ID)).isEqualTo("correlationId");
160-
assertThat(headersCaptor.getValue().get(COMPLETION_ONLY_SIGNAL)).isEqualTo(Boolean.TRUE.toString());
165+
assertThat(headersCaptor.getValue())
166+
.containsEntry(CORRELATION_ID, "correlationId")
167+
.containsEntry(COMPLETION_ONLY_SIGNAL, Boolean.TRUE.toString());
161168
}
162169

163170
@Test
@@ -180,8 +187,8 @@ void shouldHandleRequestReply() throws JsonProcessingException {
180187
verify(senderMock, times(1))
181188
.sendNoConfirm(eq(query), eq("exchange"), eq("app-target.query"), headersCaptor.capture(),
182189
anyBoolean());
183-
assertThat(headersCaptor.getValue().get(REPLY_ID).toString().length()).isEqualTo(32);
184-
assertThat(headersCaptor.getValue().get(CORRELATION_ID).toString().length()).isEqualTo(32);
190+
assertThat(headersCaptor.getValue().get(REPLY_ID).toString()).hasSize(32);
191+
assertThat(headersCaptor.getValue().get(CORRELATION_ID).toString()).hasSize(32);
185192
}
186193

187194
private void senderMock() {
@@ -201,19 +208,25 @@ private void mockReply() throws JsonProcessingException {
201208

202209
private ReactiveMessageSender getReactiveMessageSender() {
203210
Sender sender = new StubSender();
204-
return new ReactiveMessageSender(sender, "sourceApplication", converter, null);
211+
return new ReactiveMessageSender(sender, "sourceApplication", converter, null, true, unroutableMessageHandler);
205212
}
206213

207214
private Flux<Command<DummyMessage>> createMessagesHot(int count) {
208215
final List<Command<DummyMessage>> commands = IntStream.range(0, count).mapToObj(value -> new Command<>("app" +
209-
".command.test", UUID.randomUUID().toString(), new DummyMessage())).collect(Collectors.toList());
216+
".command.test", UUID.randomUUID().toString(), new DummyMessage())).toList();
210217
return Flux.fromIterable(commands);
211218
}
212219

213220
static class StubSender extends Sender {
214221

215222
@Override
216-
public <OMSG extends OutboundMessage> Flux<OutboundMessageResult<OMSG>> sendWithTypedPublishConfirms(Publisher<OMSG> messages) {
223+
public Mono<Void> send(Publisher<OutboundMessage> messages) {
224+
return Flux.from(messages).then();
225+
}
226+
227+
@Override
228+
public <OMSG extends OutboundMessage> Flux<OutboundMessageResult<OMSG>> sendWithTypedPublishConfirms(
229+
Publisher<OMSG> messages, SendOptions options) {
217230
return Flux.from(messages).map(omsg -> new OutboundMessageResult<>(omsg, true));
218231
}
219232

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.reactivecommons.async.rabbit.communications;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import org.junit.jupiter.api.Test;
5+
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
8+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
9+
import static org.junit.jupiter.api.Assertions.assertEquals;
10+
import static org.junit.jupiter.api.Assertions.assertNotNull;
11+
import static org.junit.jupiter.api.Assertions.assertTrue;
12+
13+
class MyOutboundMessageTest {
14+
15+
@Test
16+
void shouldCreateMessageWithCorrectProperties() {
17+
AMQP.BasicProperties properties = new AMQP.BasicProperties();
18+
byte[] body = "test message".getBytes();
19+
AtomicBoolean ackCalled = new AtomicBoolean(false);
20+
21+
MyOutboundMessage message = new MyOutboundMessage(
22+
"test.exchange", "test.routingKey", properties, body, ackCalled::set
23+
);
24+
25+
assertEquals("test.exchange", message.getExchange());
26+
assertEquals("test.routingKey", message.getRoutingKey());
27+
assertEquals(properties, message.getProperties());
28+
assertArrayEquals(body, message.getBody());
29+
assertNotNull(message.getAckNotifier());
30+
}
31+
32+
@Test
33+
void shouldInvokeAckNotifierWhenCalled() {
34+
AtomicBoolean ackCalled = new AtomicBoolean(false);
35+
36+
MyOutboundMessage message = new MyOutboundMessage(
37+
"test.exchange", "test.routingKey", new AMQP.BasicProperties(),
38+
"test message".getBytes(), ackCalled::set
39+
);
40+
41+
message.getAckNotifier().accept(true);
42+
43+
assertTrue(ackCalled.get());
44+
}
45+
}

0 commit comments

Comments
 (0)