Skip to content

Commit 791f1da

Browse files
authored
Merge pull request #32 from juanpmarin/master
test: connection factory tests
2 parents 8352b44 + 6aedbbf commit 791f1da

File tree

3 files changed

+22
-11
lines changed

3 files changed

+22
-11
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
@Import(BrokerConfigProps.class)
3333
public class RabbitMqConfig {
3434

35+
private static final String LISTENER_TYPE = "listener";
36+
37+
private static final String SENDER_TYPE = "sender";
38+
39+
3540
@Value("${app.async.flux.maxConcurrency:250}")
3641
private Integer maxConcurrency;
3742

@@ -41,14 +46,15 @@ public class RabbitMqConfig {
4146
@Bean
4247
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter,
4348
BrokerConfigProps brokerConfigProps, RabbitProperties rabbitProperties) {
44-
Mono<Connection> senderConnection = createSenderConnectionMono(provider.getConnectionFactory(), "sender");
45-
ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
49+
final Mono<Connection> senderConnection =
50+
createConnectionMono(provider.getConnectionFactory(), appName, SENDER_TYPE);
51+
final ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
52+
final PropertyMapper map = PropertyMapper.get();
4653

47-
PropertyMapper map = PropertyMapper.get();
4854
map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull()
4955
.to(channelPoolOptions::maxCacheSize);
5056

51-
ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
57+
final ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
5258
senderConnection,
5359
channelPoolOptions
5460
);
@@ -60,8 +66,9 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M
6066

6167
@Bean
6268
public ReactiveMessageListener messageListener(ConnectionFactoryProvider provider) {
63-
final Mono<Connection> connection = createSenderConnectionMono(provider.getConnectionFactory(), "listener");
64-
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection));
69+
final Mono<Connection> connection =
70+
createConnectionMono(provider.getConnectionFactory(), appName, LISTENER_TYPE);
71+
final Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection));
6572
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection));
6673

6774
return new ReactiveMessageListener(receiver, new TopologyCreator(sender), maxConcurrency);
@@ -93,9 +100,9 @@ public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSuppli
93100
return new JacksonMessageConverter(objectMapperSupplier.get());
94101
}
95102

96-
Mono<Connection> createSenderConnectionMono(ConnectionFactory factory, String name) {
103+
Mono<Connection> createConnectionMono(ConnectionFactory factory, String connectionPrefix, String connectionType) {
97104
final Scheduler senderScheduler = Schedulers.elastic();
98-
return Mono.fromCallable(() -> factory.newConnection(appName + " " + name))
105+
return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType))
99106
.doOnError(err ->
100107
log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", err)
101108
)

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/RabbitMqConfigTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ public class RabbitMqConfigTest {
1919

2020
@Test
2121
public void retryInitialConnection() throws IOException, TimeoutException {
22+
final String connectionType = "sender";
23+
final String appName = "appName";
24+
final String connectionName = "appName sender";
25+
2226
final AtomicInteger count = new AtomicInteger();
2327
final Connection connection = mock(Connection.class);
2428
ConnectionFactory factory = mock(ConnectionFactory.class);
25-
when(factory.newConnection("test")).thenAnswer(invocation -> {
29+
when(factory.newConnection(connectionName)).thenAnswer(invocation -> {
2630
if(count.incrementAndGet() == 10){
2731
return connection;
2832
}
2933
throw new RuntimeException();
3034
});
31-
StepVerifier.withVirtualTime(() -> config.createSenderConnectionMono(factory, "test"))
35+
StepVerifier.withVirtualTime(() -> config.createConnectionMono(factory, appName, connectionType))
3236
.thenAwait(Duration.ofMinutes(2))
3337
.expectNext(connection).verifyComplete();
3438
}

async/async-commons/async-commons.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ dependencies {
7676
compile project(":domain-events-api")
7777

7878
api 'io.projectreactor:reactor-core'
79-
api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.4.0.RC1"
79+
api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.4.0.RELEASE"
8080
api 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
8181
testImplementation 'io.projectreactor:reactor-test'
8282
}

0 commit comments

Comments
 (0)