Skip to content

Commit f9c1220

Browse files
author
Juan Marín
committed
fix: bump reactor-rabbitmq version and use sender for topology creator
1 parent fcff887 commit f9c1220

File tree

6 files changed

+27
-56
lines changed

6 files changed

+27
-56
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,16 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M
5555

5656
final Sender sender = RabbitFlux.createSender(new SenderOptions().channelPool(channelPool));
5757

58-
return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(senderConnection));
58+
return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(sender));
5959
}
6060

6161
@Bean
6262
public ReactiveMessageListener messageListener(ConnectionFactoryProvider provider) {
6363
final Mono<Connection> connection = createSenderConnectionMono(provider.getConnectionFactory(), "listener");
6464
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection));
65-
return new ReactiveMessageListener(receiver, new TopologyCreator(connection), maxConcurrency);
65+
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection));
66+
67+
return new ReactiveMessageListener(receiver, new TopologyCreator(sender), maxConcurrency);
6668
}
6769

6870
@Bean
@@ -75,9 +77,6 @@ public ConnectionFactoryProvider connectionFactory(RabbitProperties properties)
7577
map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
7678
map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
7779
map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
78-
map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
79-
factory.setAutomaticRecoveryEnabled(true);
80-
factory.setTopologyRecoveryEnabled(true);
8180
factory.useNio();
8281
return () -> factory;
8382
}

async/async-commons/async-commons.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ dependencies {
7676
compile project(":domain-events-api")
7777

7878
api 'io.projectreactor:reactor-core'
79-
api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.2.0.RELEASE"
79+
api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.4.0.RC1"
8080
api 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
8181
testImplementation 'io.projectreactor:reactor-test'
8282
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/ReactiveMessageSender.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
public class ReactiveMessageSender {
2121

22-
private static final Scheduler senderScheduler = Schedulers.newElastic("reactive-message-sender");
23-
2422
private final Sender sender;
2523
private final String sourceApplication;
2624
private final MessageConverter messageConverter;
@@ -35,7 +33,6 @@ public ReactiveMessageSender(Sender sender, String sourceApplication, MessageCon
3533

3634
public <T> Mono<Void> sendWithConfirm(T message, String exchange, String routingKey, Map<String, Object> headers) {
3735
return just(toOutboundMessage(message, exchange, routingKey, headers))
38-
.publishOn(senderScheduler)
3936
.map(Mono::just)
4037
.flatMapMany(sender::sendWithPublishConfirms)
4138
.flatMap(result -> result.isAck() ?

async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/TopologyCreator.java

Lines changed: 17 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import reactor.rabbitmq.BindingSpecification;
99
import reactor.rabbitmq.ExchangeSpecification;
1010
import reactor.rabbitmq.QueueSpecification;
11+
import reactor.rabbitmq.Sender;
1112

1213
import java.io.IOException;
1314
import java.time.Duration;
@@ -19,63 +20,35 @@
1920
*/
2021
public class TopologyCreator {
2122

22-
private final Mono<Channel> channel;
23+
private final Sender sender;
2324

24-
public TopologyCreator(Mono<Connection> connectionMono) {
25-
this.channel = connectionMono.map(connection -> {
26-
try {
27-
return connection.createChannel();
28-
} catch (IOException e) {
29-
throw new TopologyDefException("Fail to create channel", e);
30-
}
31-
}).doOnError(e -> log.log(Level.SEVERE, e.getMessage(), e))
32-
.retryBackoff(5, Duration.ofMillis(500))
33-
.cache();
25+
public TopologyCreator(Sender sender) {
26+
this.sender = sender;
3427
}
3528

36-
public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange){
37-
return channel.map(ch -> {
38-
try {
39-
return ch.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(), exchange.isAutoDelete(), exchange.getArguments());
40-
} catch (IOException e) {
41-
throw new TopologyDefException("Fail to declare exchange: " + exchange.getName(), e);
42-
}
43-
});
29+
public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange) {
30+
return sender.declare(exchange)
31+
.onErrorMap(TopologyDefException::new);
4432
}
4533

46-
public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queue){
47-
return channel.map(ch -> {
48-
try {
49-
return ch.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
50-
} catch (IOException e) {
51-
throw new TopologyDefException("Fail to declare queue: " + queue.getName(), e);
52-
}
53-
});
34+
public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queue) {
35+
return sender.declare(queue)
36+
.onErrorMap(TopologyDefException::new);
5437
}
5538

56-
public Mono<AMQP.Queue.BindOk> bind(BindingSpecification binding){
57-
return channel.map(ch -> {
58-
try {
59-
return ch.queueBind(binding.getQueue(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments());
60-
} catch (IOException e) {
61-
throw new TopologyDefException("Fail to bind queue: " + binding.getQueue(), e);
62-
}
63-
});
39+
public Mono<AMQP.Queue.BindOk> bind(BindingSpecification binding) {
40+
return sender.bind(binding)
41+
.onErrorMap(TopologyDefException::new);
6442
}
6543

6644
public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification binding) {
67-
return channel.map(ch -> {
68-
try {
69-
return ch.queueUnbind(binding.getQueue(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments());
70-
} catch (IOException e) {
71-
throw new TopologyDefException("Fail to unbind queue: " + binding.getQueue(), e);
72-
}
73-
}) ;
45+
return sender.unbind(binding)
46+
.onErrorMap(TopologyDefException::new);
7447
}
7548

7649
public static class TopologyDefException extends RuntimeException {
77-
public TopologyDefException(String message, Throwable cause) {
78-
super(message, cause);
50+
public TopologyDefException(Throwable cause) {
51+
super(cause);
7952
}
8053
}
8154
}

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ buildscript {
66
}
77
dependencies {
88
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
9+
classpath("com.github.ben-manes:gradle-versions-plugin:$gradleVersionsVersion")
910
}
1011
}
1112

@@ -14,5 +15,4 @@ plugins {
1415
}
1516

1617
apply from: './main.gradle'
17-
18-
18+
apply plugin: 'com.github.ben-manes.versions'

gradle.properties

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
version=0.2.1-beta1
2-
springBootVersion=2.1.1.RELEASE
2+
springBootVersion=2.2.1.RELEASE
3+
gradleVersionsVersion=0.27.0
4+

0 commit comments

Comments
 (0)