Skip to content

Commit 8a3e3de

Browse files
author
Daniel Bustamante Ospina
committed
Performance and resource optimization in message sender.
1 parent 9c4d930 commit 8a3e3de

File tree

22 files changed

+2182
-42
lines changed

22 files changed

+2182
-42
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package org.reactivecommons.test;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.rabbitmq.client.AMQP;
6+
import com.rabbitmq.client.Delivery;
7+
import com.rabbitmq.client.Envelope;
8+
import org.assertj.core.api.Assertions;
9+
import org.junit.Test;
10+
import org.junit.runner.RunWith;
11+
import org.reactivecommons.api.domain.Command;
12+
import org.reactivecommons.async.api.AsyncQuery;
13+
import org.reactivecommons.async.api.DirectAsyncGateway;
14+
import org.reactivecommons.async.api.HandlerRegistry;
15+
import org.reactivecommons.async.api.handlers.CommandHandler;
16+
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
17+
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
18+
import org.springframework.beans.factory.annotation.Autowired;
19+
import org.springframework.beans.factory.annotation.Value;
20+
import org.springframework.boot.SpringApplication;
21+
import org.springframework.boot.autoconfigure.SpringBootApplication;
22+
import org.springframework.boot.test.context.SpringBootTest;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.test.context.junit4.SpringRunner;
25+
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
27+
import reactor.core.publisher.UnicastProcessor;
28+
import reactor.core.scheduler.Schedulers;
29+
import reactor.rabbitmq.AcknowledgableDelivery;
30+
import reactor.test.StepVerifier;
31+
32+
import java.time.Duration;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.UUID;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.Semaphore;
39+
import java.util.concurrent.ThreadLocalRandom;
40+
import java.util.stream.Collectors;
41+
import java.util.stream.IntStream;
42+
43+
import static org.assertj.core.api.Assertions.assertThat;
44+
import static reactor.core.publisher.Flux.range;
45+
import static reactor.core.publisher.Mono.empty;
46+
import static reactor.core.publisher.Mono.just;
47+
48+
@SpringBootTest
49+
@RunWith(SpringRunner.class)
50+
public class CommandsProcessPerfTest {
51+
52+
private static final String COMMAND_NAME = "app.command.test";
53+
private static final int messageCount = 40000;
54+
private static final Semaphore semaphore = new Semaphore(0);
55+
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
56+
57+
@Autowired
58+
private DirectAsyncGateway gateway;
59+
60+
@Value("${spring.application.name}")
61+
private String appName;
62+
63+
64+
@Test
65+
public void commandShouldArrive() throws InterruptedException {
66+
final long init_p = System.currentTimeMillis();
67+
createMessages(messageCount);
68+
final long end_p = System.currentTimeMillis() - init_p;
69+
System.out.println("Total Publication Time: " + end_p + "ms");
70+
71+
latch.countDown();
72+
final long init = System.currentTimeMillis();
73+
semaphore.acquire(messageCount);
74+
final long end = System.currentTimeMillis();
75+
76+
final long total = end - init;
77+
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
78+
System.out.println("Message count: " + messageCount);
79+
System.out.println("Total Execution Time: " + total + "ms");
80+
System.out.println("Microseconds per message: " + microsPerMessage + "us");
81+
Assertions.assertThat(microsPerMessage).isLessThan(140);
82+
}
83+
84+
85+
private void createMessages(int count) throws InterruptedException {
86+
Flux.range(0, count).flatMap(value -> {
87+
Command<DummyMessage> command = new Command<>("app.command.test", UUID.randomUUID().toString(), new DummyMessage());
88+
return gateway.sendCommand(command, appName).doOnSuccess(_v -> semaphore.release()).thenReturn(value);
89+
}).subscribe();
90+
91+
System.out.println("Wait for publish");
92+
semaphore.acquire(count);
93+
}
94+
95+
96+
@SpringBootApplication
97+
@EnableDirectAsyncGateway
98+
@EnableMessageListeners
99+
static class App{
100+
public static void main(String[] args) {
101+
SpringApplication.run(App.class, args);
102+
}
103+
104+
@Bean
105+
public HandlerRegistry registry() {
106+
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
107+
return registry
108+
.handleCommand(COMMAND_NAME, this::handleSimple, DummyMessage.class);
109+
}
110+
111+
private Mono<Void> handleSimple(Command<DummyMessage> message) {
112+
return Mono.fromRunnable(() -> {
113+
if (latch.getCount() > 0) {
114+
latch.countDown();
115+
try {
116+
latch.await();
117+
} catch (InterruptedException e) {
118+
e.printStackTrace();
119+
}
120+
}
121+
semaphore.release();
122+
});
123+
}
124+
125+
}
126+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package org.reactivecommons.test;
2+
3+
import org.assertj.core.api.Assertions;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.reactivecommons.api.domain.Command;
7+
import org.reactivecommons.async.api.DirectAsyncGateway;
8+
import org.reactivecommons.async.api.HandlerRegistry;
9+
import org.reactivecommons.async.impl.RabbitDirectAsyncGateway;
10+
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
11+
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
12+
import org.springframework.beans.factory.annotation.Autowired;
13+
import org.springframework.beans.factory.annotation.Value;
14+
import org.springframework.boot.SpringApplication;
15+
import org.springframework.boot.autoconfigure.SpringBootApplication;
16+
import org.springframework.boot.test.context.SpringBootTest;
17+
import org.springframework.context.annotation.Bean;
18+
import org.springframework.test.context.junit4.SpringRunner;
19+
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.UUID;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.Semaphore;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.IntStream;
29+
30+
import static reactor.core.publisher.Flux.range;
31+
32+
@SpringBootTest
33+
@RunWith(SpringRunner.class)
34+
public class DirectGatewayPerfTest {
35+
36+
private static final String COMMAND_NAME = "app.command.test";
37+
private static final int messageCount = 40000;
38+
private static final Semaphore semaphore = new Semaphore(0);
39+
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
40+
41+
@Autowired
42+
private RabbitDirectAsyncGateway gateway;
43+
44+
@Value("${spring.application.name}")
45+
private String appName;
46+
47+
48+
@Test
49+
public void shouldSendInOptimalTime() throws InterruptedException {
50+
final Flux<Command<DummyMessage>> messages = createMessages(messageCount);
51+
final Flux<Void> target = messages.flatMap(dummyMessageCommand ->
52+
gateway.sendCommand(dummyMessageCommand, appName)
53+
.doOnSuccess(aVoid -> semaphore.release()));
54+
55+
final long init = System.currentTimeMillis();
56+
target.subscribe();
57+
semaphore.acquire(messageCount);
58+
final long end = System.currentTimeMillis();
59+
60+
final long total = end - init;
61+
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
62+
System.out.println("Message count: " + messageCount);
63+
System.out.println("Total Execution Time: " + total + "ms");
64+
System.out.println("Microseconds per message: " + microsPerMessage + "us");
65+
System.out.println("Throughput: " + Math.round(messageCount/(total/1000.0)) + " Msg/Seg");
66+
Assertions.assertThat(microsPerMessage).isLessThan(700);
67+
}
68+
69+
@Test
70+
public void shouldSendBatchInOptimalTime() throws InterruptedException {
71+
final Flux<Command<DummyMessage>> messages = createMessages(messageCount/4);
72+
final Mono<Void> target = gateway.sendCommands(messages, appName)
73+
.then().doOnSuccess(_v -> semaphore.release());
74+
75+
final Flux<Command<DummyMessage>> messages2 = createMessages(messageCount/4);
76+
final Mono<Void> target2 = gateway.sendCommands(messages2, appName)
77+
.then().doOnSuccess(_v -> semaphore.release());
78+
79+
final Flux<Command<DummyMessage>> messages3 = createMessages(messageCount/4);
80+
final Mono<Void> target3 = gateway.sendCommands(messages3, appName)
81+
.then().doOnSuccess(_v -> semaphore.release());
82+
83+
final Flux<Command<DummyMessage>> messages4 = createMessages(messageCount/4);
84+
final Mono<Void> target4 = gateway.sendCommands(messages4, appName)
85+
.then().doOnSuccess(_v -> semaphore.release());
86+
87+
88+
89+
final long init = System.currentTimeMillis();
90+
target.subscribe();
91+
target2.subscribe();
92+
target3.subscribe();
93+
target4.subscribe();
94+
System.out.println("Wait for publish");
95+
semaphore.acquire(4);
96+
final long end = System.currentTimeMillis();
97+
98+
final long total = end - init;
99+
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
100+
System.out.println("Message count: " + messageCount);
101+
System.out.println("Total Execution Time: " + total + "ms");
102+
System.out.println("Microseconds per message: " + microsPerMessage + "us");
103+
System.out.println("Throughput: " + Math.round(messageCount/(total/1000.0)) + " Msg/Seg");
104+
Assertions.assertThat(microsPerMessage).isLessThan(700);
105+
}
106+
107+
@Test
108+
public void shouldSendBatchNoConfirmInOptimalTime() throws InterruptedException {
109+
final Flux<Command<DummyMessage>> messages = createMessages(messageCount);
110+
final Mono<Void> target = gateway.sendCommandsNoConfirm(messages, appName)
111+
.doOnSuccess(_v -> semaphore.release());
112+
113+
114+
final long init = System.currentTimeMillis();
115+
target.subscribe();
116+
semaphore.acquire(1);
117+
Thread.sleep(20);
118+
final long end = System.currentTimeMillis();
119+
120+
final long total = end - init;
121+
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
122+
System.out.println("Message count: " + messageCount);
123+
System.out.println("Total Execution Time: " + total + "ms");
124+
System.out.println("Microseconds per message: " + microsPerMessage + "us");
125+
System.out.println("Throughput: " + Math.round(messageCount/(total/1000.0)) + " Msg/Seg");
126+
Assertions.assertThat(microsPerMessage).isLessThan(700);
127+
}
128+
129+
private Flux<Command<DummyMessage>> createMessages(int count) {
130+
final List<Command<DummyMessage>> commands = IntStream.range(0, count).mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage())).collect(Collectors.toList());
131+
return Flux.fromIterable(commands);
132+
}
133+
134+
135+
136+
@SpringBootApplication
137+
@EnableDirectAsyncGateway
138+
static class App{
139+
public static void main(String[] args) {
140+
SpringApplication.run(App.class, args);
141+
}
142+
143+
}
144+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.reactivecommons.test;
2+
3+
import lombok.Data;
4+
import java.util.concurrent.ThreadLocalRandom;
5+
6+
@Data
7+
class DummyMessage {
8+
private String name = "Daniel" + ThreadLocalRandom.current().nextLong();
9+
private Long age = ThreadLocalRandom.current().nextLong();
10+
private String field1 = "Field Data " + ThreadLocalRandom.current().nextLong();
11+
private String field2 = "Field Data " + ThreadLocalRandom.current().nextLong();
12+
private String field3 = "Field Data " + ThreadLocalRandom.current().nextLong();
13+
private String field4 = "Field Data " + ThreadLocalRandom.current().nextLong();
14+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
spring.application.name=test-app-n5
1+
spring.application.name=test-app-n5
2+
spring.rabbitmq.virtual-host=test

async/async-commons-starter/async-commons-starter.gradle

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,14 @@ dependencies {
7979
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
8080

8181
testImplementation 'io.projectreactor:reactor-test'
82+
}
83+
84+
configurations.all {
85+
86+
resolutionStrategy.eachDependency {DependencyResolveDetails details ->
87+
if (details.requested.group == 'io.projectreactor.rabbitmq'){
88+
details.useVersion('1.5.0-M2')
89+
details.because('Upgrade')
90+
}
91+
}
8292
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,27 +62,29 @@ public class RabbitMqConfig {
6262
private String appName;
6363

6464
@Bean
65-
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter,
66-
BrokerConfigProps brokerConfigProps, RabbitProperties rabbitProperties) {
67-
final Mono<Connection> senderConnection =
68-
createConnectionMono(provider.getConnectionFactory(), appName, SENDER_TYPE);
65+
public ReactiveMessageSender messageSender(MessageConverter converter, BrokerConfigProps brokerConfigProps, SenderOptions senderOptions) {
66+
final Sender sender = RabbitFlux.createSender(senderOptions);
67+
return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(sender));
68+
}
69+
70+
@Bean
71+
public SenderOptions reactiveCommonsSenderOptions(ConnectionFactoryProvider provider, RabbitProperties rabbitProperties) {
72+
final Mono<Connection> senderConnection = createConnectionMono(provider.getConnectionFactory(), appName, SENDER_TYPE);
6973
final ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions();
7074
final PropertyMapper map = PropertyMapper.get();
7175

7276
map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull()
73-
.to(channelPoolOptions::maxCacheSize);
77+
.to(channelPoolOptions::maxCacheSize);
7478

7579
final ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
76-
senderConnection,
77-
channelPoolOptions
80+
senderConnection,
81+
channelPoolOptions
7882
);
7983

80-
final Sender sender = RabbitFlux.createSender(new SenderOptions()
81-
.channelPool(channelPool)
82-
.resourceManagementChannelMono(channelPool.getChannelMono()
83-
.transform(Utils::cache)));
84-
85-
return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(sender));
84+
return new SenderOptions()
85+
.channelPool(channelPool)
86+
.resourceManagementChannelMono(channelPool.getChannelMono()
87+
.transform(Utils::cache));
8688
}
8789

8890
@Bean

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@
1010
import java.lang.annotation.*;
1111

1212
/**
13-
* Actualmente se utiliza EnableMessageListeners para habilitar Comandos, querys y eventos al mismo tiempo,
14-
* se han separado en 3 EnableCommandListeners, EnableQueryListeners y EnableEventListeners, estos se pueden utilizar
15-
* todos juntos o de manera individual segun necesidad
16-
* @deprecated Use EnableCommandListeners, EnableQueryListeners, EnableEventListeners
13+
* This annotation enables all messages listeners (Query, Commands, Events). If you want to enable separately, please use
14+
* EnableCommandListeners, EnableQueryListeners or EnableEventListeners.
15+
*
1716
*/
18-
@Deprecated
1917
@Retention(RetentionPolicy.RUNTIME)
2018
@Target({ElementType.TYPE})
2119
@Documented

async/async-commons/async-commons.gradle

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,19 @@ dependencies {
7676
compile project(":domain-events-api")
7777

7878
api 'io.projectreactor:reactor-core'
79-
api "io.projectreactor.rabbitmq:reactor-rabbitmq"
79+
api ("io.projectreactor.rabbitmq:reactor-rabbitmq")
8080
api 'com.fasterxml.jackson.core:jackson-databind'
8181
testImplementation 'io.projectreactor:reactor-test'
8282
testCompile group: 'org.databene', name: 'contiperf', version: '2.3.4'
8383

84+
}
85+
86+
configurations.all {
87+
88+
resolutionStrategy.eachDependency {DependencyResolveDetails details ->
89+
if (details.requested.group == 'io.projectreactor.rabbitmq'){
90+
details.useVersion('1.5.0-M2')
91+
details.because('Upgrade')
92+
}
93+
}
8494
}

0 commit comments

Comments
 (0)