Skip to content

Commit 9df8f88

Browse files
authored
Merge pull request #29 from juanpmarin/master
perf: use elastic scheduler in reactive message sender
2 parents b0ff7ce + 1d1744b commit 9df8f88

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/ReactiveMessageSender.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import org.reactivecommons.async.impl.converters.MessageConverter;
55
import org.reactivecommons.async.impl.exceptions.SendFailureNoAckException;
66
import reactor.core.publisher.Mono;
7+
import reactor.core.scheduler.Scheduler;
8+
import reactor.core.scheduler.Schedulers;
79
import reactor.rabbitmq.OutboundMessage;
810
import reactor.rabbitmq.Sender;
911

@@ -17,6 +19,8 @@
1719

1820
public class ReactiveMessageSender {
1921

22+
private static final Scheduler senderScheduler = Schedulers.newElastic("reactive-message-sender");
23+
2024
private final Sender sender;
2125
private final String sourceApplication;
2226
private final MessageConverter messageConverter;
@@ -30,10 +34,15 @@ public ReactiveMessageSender(Sender sender, String sourceApplication, MessageCon
3034
}
3135

3236
public <T> Mono<Void> sendWithConfirm(T message, String exchange, String routingKey, Map<String, Object> headers) {
33-
return sender.sendWithPublishConfirms(just(toOutboundMessage(message, exchange, routingKey, headers))).flatMap(result -> result.isAck() ?
34-
Mono.empty() :
35-
Mono.error(new SendFailureNoAckException("Event no ACK in communications"))
36-
).then();
37+
return just(toOutboundMessage(message, exchange, routingKey, headers))
38+
.publishOn(senderScheduler)
39+
.map(Mono::just)
40+
.flatMapMany(sender::sendWithPublishConfirms)
41+
.flatMap(result -> result.isAck() ?
42+
Mono.empty() :
43+
Mono.error(new SendFailureNoAckException("Event no ACK in communications"))
44+
)
45+
.then();
3746
}
3847

3948
private <T> OutboundMessage toOutboundMessage(T object, String exchange, String routingKey, Map<String, Object> headers) {
@@ -48,13 +57,13 @@ private AMQP.BasicProperties buildMessageProperties(Message message, Map<String,
4857
baseHeaders.putAll(headers);
4958
baseHeaders.put(SOURCE_APPLICATION, sourceApplication);
5059
return new AMQP.BasicProperties.Builder()
51-
.contentType(properties.getContentType())
52-
.appId(sourceApplication)
53-
.contentEncoding(properties.getContentEncoding())
54-
.deliveryMode(2)
55-
.timestamp(new Date())
56-
.messageId(UUID.randomUUID().toString())
57-
.headers(baseHeaders).build();
60+
.contentType(properties.getContentType())
61+
.appId(sourceApplication)
62+
.contentEncoding(properties.getContentEncoding())
63+
.deliveryMode(2)
64+
.timestamp(new Date())
65+
.messageId(UUID.randomUUID().toString())
66+
.headers(baseHeaders).build();
5867
}
5968

6069
public Sender getSender() {

0 commit comments

Comments
 (0)