Skip to content

Commit b0ff7ce

Browse files
authored
Merge pull request #27 from juanpmarin/master
fix: avoid publish blocking with channels pooling
2 parents 1853520 + d68c12b commit b0ff7ce

File tree

6 files changed

+32
-9
lines changed

6 files changed

+32
-9
lines changed

async/async-commons-standalone/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,17 @@ public RabbitMqConfig(String appName) {
2626
this.appName = appName;
2727
}
2828

29-
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter) {
29+
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter,
30+
RabbitProperties rabbitProperties) {
3031
final Mono<Connection> senderConnection = createSenderConnectionMono(provider.getConnectionFactory(), "sender");
31-
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(senderConnection));
32+
33+
ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
34+
senderConnection,
35+
new ChannelPoolOptions().maxCacheSize(rabbitProperties.getChannelPoolMaxCacheSize())
36+
);
37+
38+
final Sender sender = RabbitFlux.createSender(new SenderOptions().channelPool(channelPool));
39+
3240
return new ReactiveMessageSender(sender, appName, converter, new TopologyCreator(senderConnection));
3341
}
3442

async/async-commons-standalone/src/main/java/org/reactivecommons/async/impl/config/RabbitProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ public class RabbitProperties {
99
private String username = "guest";
1010
private String password = "guest";
1111
private String virtualHost;
12+
private Integer channelPoolMaxCacheSize;
1213
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,23 @@ public class RabbitMqConfig {
3636
private Integer maxConcurrency;
3737

3838
@Bean
39-
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter, BrokerConfigProps props) {
40-
final Mono<Connection> senderConnection = createSenderConnectionMono(provider.getConnectionFactory(), "sender");
41-
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(senderConnection));
42-
return new ReactiveMessageSender(sender, props.getAppName(), converter, new TopologyCreator(senderConnection));
39+
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter,
40+
BrokerConfigProps brokerConfigProps, RabbitProperties rabbitProperties) {
41+
Mono<Connection> senderConnection = createSenderConnectionMono(provider.getConnectionFactory(), "sender");
42+
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
43+
44+
PropertyMapper map = PropertyMapper.get();
45+
map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull()
46+
.to(channelPoolOptions::maxCacheSize);
47+
48+
ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
49+
senderConnection,
50+
channelPoolOptions
51+
);
52+
53+
final Sender sender = RabbitFlux.createSender(new SenderOptions().channelPool(channelPool));
54+
55+
return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(senderConnection));
4356
}
4457

4558
@Bean

async/async-commons/async-commons.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ dependencies {
7676
compile project(":domain-events-api")
7777

7878
api 'io.projectreactor:reactor-core'
79-
api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.RELEASE"
79+
api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.2.0.RELEASE"
8080
api 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
8181
testImplementation 'io.projectreactor:reactor-test'
8282
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
#Wed Jul 24 11:42:46 COT 2019
12
distributionBase=GRADLE_USER_HOME
23
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-5.0-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-5.0-all.zip

samples/async/simpleConsumer/simple-consumer.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ dependencies {
22
// compile 'org.springframework.boot:spring-boot-starter-amqp'
33
compile('org.springframework.boot:spring-boot-starter')
44
// compile project(":async:libs:reactor-rabbitmq")
5-
compile "io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.RC1"
5+
compile "io.projectreactor.rabbitmq:reactor-rabbitmq:1.2.0.RELEASE"
66
}

0 commit comments

Comments
 (0)