Skip to content

Commit 036b61a

Browse files
dbuosDaniel Bustamante Ospina
authored andcommitted
Improve over 10x on async query throughput and several minor fixes and tests
1 parent b949746 commit 036b61a

File tree

22 files changed

+364
-134
lines changed

22 files changed

+364
-134
lines changed

acceptance/async-tests/async-tests.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ dependencies {
33
compile('org.springframework.boot:spring-boot-starter')
44
}
55

6-
test.onlyIf { false }
6+
test.onlyIf { true }

acceptance/async-tests/src/test/java/org/reactivecommons/test/DirectGatewayPerfTest.java

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import reactor.core.publisher.Flux;
2020
import reactor.core.publisher.Mono;
2121

22+
import java.util.ArrayList;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.UUID;
@@ -36,7 +37,6 @@ public class DirectGatewayPerfTest {
3637
private static final String COMMAND_NAME = "app.command.test";
3738
private static final int messageCount = 40000;
3839
private static final Semaphore semaphore = new Semaphore(0);
39-
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
4040

4141
@Autowired
4242
private RabbitDirectAsyncGateway gateway;
@@ -57,82 +57,56 @@ public void shouldSendInOptimalTime() throws InterruptedException {
5757
semaphore.acquire(messageCount);
5858
final long end = System.currentTimeMillis();
5959

60-
final long total = end - init;
61-
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
62-
System.out.println("Message count: " + messageCount);
63-
System.out.println("Total Execution Time: " + total + "ms");
64-
System.out.println("Microseconds per message: " + microsPerMessage + "us");
65-
System.out.println("Throughput: " + Math.round(messageCount/(total/1000.0)) + " Msg/Seg");
66-
Assertions.assertThat(microsPerMessage).isLessThan(700);
60+
assertMessageThroughput(end - init, messageCount, 200);
6761
}
6862

6963
@Test
70-
public void shouldSendBatchInOptimalTime() throws InterruptedException {
71-
final Flux<Command<DummyMessage>> messages = createMessages(messageCount/4);
72-
final Mono<Void> target = gateway.sendCommands(messages, appName)
73-
.then().doOnSuccess(_v -> semaphore.release());
74-
75-
final Flux<Command<DummyMessage>> messages2 = createMessages(messageCount/4);
76-
final Mono<Void> target2 = gateway.sendCommands(messages2, appName)
77-
.then().doOnSuccess(_v -> semaphore.release());
78-
79-
final Flux<Command<DummyMessage>> messages3 = createMessages(messageCount/4);
80-
final Mono<Void> target3 = gateway.sendCommands(messages3, appName)
81-
.then().doOnSuccess(_v -> semaphore.release());
64+
public void shouldSendBatchInOptimalTime4Channels() throws InterruptedException {
65+
shouldSendBatchInOptimalTimeNChannels(4);
66+
}
8267

83-
final Flux<Command<DummyMessage>> messages4 = createMessages(messageCount/4);
84-
final Mono<Void> target4 = gateway.sendCommands(messages4, appName)
85-
.then().doOnSuccess(_v -> semaphore.release());
68+
@Test
69+
public void shouldSendBatchInOptimalTime2Channels() throws InterruptedException {
70+
shouldSendBatchInOptimalTimeNChannels(2);
71+
}
8672

73+
@Test
74+
public void shouldSendBatchInOptimalTime1Channel() throws InterruptedException {
75+
shouldSendBatchInOptimalTimeNChannels(1);
76+
}
8777

78+
private void shouldSendBatchInOptimalTimeNChannels(int channels) throws InterruptedException {
79+
List<Mono<Void>> subs = new ArrayList<>(channels);
80+
for (int i = 0; i < channels; ++i) {
81+
final Flux<Command<DummyMessage>> messages = createMessages(messageCount/channels);
82+
final Mono<Void> target = gateway.sendCommands(messages, appName).then().doOnSuccess(_v -> semaphore.release());
83+
subs.add(target);
84+
}
8885

8986
final long init = System.currentTimeMillis();
90-
target.subscribe();
91-
target2.subscribe();
92-
target3.subscribe();
93-
target4.subscribe();
87+
subs.forEach(Mono::subscribe);
9488
System.out.println("Wait for publish");
95-
semaphore.acquire(4);
89+
semaphore.acquire(channels);
9690
final long end = System.currentTimeMillis();
9791

9892
final long total = end - init;
99-
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
100-
System.out.println("Message count: " + messageCount);
101-
System.out.println("Total Execution Time: " + total + "ms");
102-
System.out.println("Microseconds per message: " + microsPerMessage + "us");
103-
System.out.println("Throughput: " + Math.round(messageCount/(total/1000.0)) + " Msg/Seg");
104-
Assertions.assertThat(microsPerMessage).isLessThan(700);
93+
assertMessageThroughput(total, messageCount, 230);
10594
}
10695

107-
@Test
108-
public void shouldSendBatchNoConfirmInOptimalTime() throws InterruptedException {
109-
final Flux<Command<DummyMessage>> messages = createMessages(messageCount);
110-
final Mono<Void> target = gateway.sendCommandsNoConfirm(messages, appName)
111-
.doOnSuccess(_v -> semaphore.release());
112-
113-
114-
final long init = System.currentTimeMillis();
115-
target.subscribe();
116-
semaphore.acquire(1);
117-
Thread.sleep(20);
118-
final long end = System.currentTimeMillis();
119-
120-
final long total = end - init;
96+
private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
12197
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
12298
System.out.println("Message count: " + messageCount);
12399
System.out.println("Total Execution Time: " + total + "ms");
124100
System.out.println("Microseconds per message: " + microsPerMessage + "us");
125101
System.out.println("Throughput: " + Math.round(messageCount/(total/1000.0)) + " Msg/Seg");
126-
Assertions.assertThat(microsPerMessage).isLessThan(700);
102+
Assertions.assertThat(microsPerMessage).isLessThan(reqMicrosPerMessage);
127103
}
128104

129105
private Flux<Command<DummyMessage>> createMessages(int count) {
130106
final List<Command<DummyMessage>> commands = IntStream.range(0, count).mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage())).collect(Collectors.toList());
131107
return Flux.fromIterable(commands);
132108
}
133109

134-
135-
136110
@SpringBootApplication
137111
@EnableDirectAsyncGateway
138112
static class App{
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package org.reactivecommons.test;
2+
3+
import org.assertj.core.api.Assertions;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.reactivecommons.api.domain.Command;
7+
import org.reactivecommons.async.api.AsyncQuery;
8+
import org.reactivecommons.async.api.DirectAsyncGateway;
9+
import org.reactivecommons.async.api.HandlerRegistry;
10+
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
11+
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
12+
import org.springframework.beans.factory.annotation.Autowired;
13+
import org.springframework.beans.factory.annotation.Value;
14+
import org.springframework.boot.SpringApplication;
15+
import org.springframework.boot.autoconfigure.SpringBootApplication;
16+
import org.springframework.boot.test.context.SpringBootTest;
17+
import org.springframework.context.annotation.Bean;
18+
import org.springframework.test.context.junit4.SpringRunner;
19+
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.UUID;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.Semaphore;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.IntStream;
30+
31+
import static reactor.core.publisher.Flux.range;
32+
33+
@SpringBootTest
34+
@RunWith(SpringRunner.class)
35+
public class QueryProcessPerfTest {
36+
37+
private static final String QUERY_NAME = "app.command.test";
38+
private static final int messageCount = 40000;
39+
private static final Semaphore semaphore = new Semaphore(0);
40+
private static final AtomicLong atomicLong = new AtomicLong(0);
41+
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
42+
43+
@Autowired
44+
private DirectAsyncGateway gateway;
45+
46+
@Value("${spring.application.name}")
47+
private String appName;
48+
49+
50+
@Test
51+
public void serveQueryPerformanceTest() throws InterruptedException {
52+
final Flux<AsyncQuery<DummyMessage>> messages = createMessages(messageCount);
53+
54+
final long init = System.currentTimeMillis();
55+
messages
56+
.flatMap(dummyMessageAsyncQuery -> gateway.requestReply(dummyMessageAsyncQuery, appName, DummyMessage.class)
57+
.doOnNext(s -> semaphore.release())
58+
)
59+
.subscribe();
60+
semaphore.acquire(messageCount);
61+
final long end = System.currentTimeMillis();
62+
63+
final long total = end - init;
64+
assertMessageThroughput(total, messageCount, 200);
65+
}
66+
67+
private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
68+
final double microsPerMessage = ((total+0.0)/messageCount)*1000;
69+
System.out.println("Message count: " + messageCount);
70+
System.out.println("Total Execution Time: " + total + "ms");
71+
System.out.println("Microseconds per message: " + microsPerMessage + "us");
72+
System.out.println("Throughput: " + Math.round(messageCount/(total/1000.0)) + " Msg/Seg");
73+
Assertions.assertThat(microsPerMessage).isLessThan(reqMicrosPerMessage);
74+
}
75+
76+
77+
private Flux<AsyncQuery<DummyMessage>> createMessages(int count) {
78+
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count).mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage())).collect(Collectors.toList());
79+
return Flux.fromIterable(queryList);
80+
}
81+
82+
83+
@SpringBootApplication
84+
@EnableDirectAsyncGateway
85+
@EnableMessageListeners
86+
static class App{
87+
public static void main(String[] args) {
88+
SpringApplication.run(App.class, args);
89+
}
90+
91+
@Bean
92+
public HandlerRegistry registry() {
93+
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
94+
return registry
95+
.serveQuery(QUERY_NAME, this::handleSimple, DummyMessage.class);
96+
}
97+
98+
private Mono<DummyMessage> handleSimple(DummyMessage message) {
99+
message.setAge(message.getAge() + 12);
100+
return Mono.just(message);
101+
}
102+
103+
104+
}
105+
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/DirectAsyncGatewayConfig.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.reactivecommons.async.impl.converters.MessageConverter;
99
import org.reactivecommons.async.impl.listeners.ApplicationReplyListener;
1010
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
11+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1112
import org.springframework.context.annotation.Bean;
1213
import org.springframework.context.annotation.Configuration;
1314
import org.springframework.context.annotation.Import;
@@ -32,12 +33,6 @@ public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerCo
3233
}
3334

3435

35-
@Bean
36-
public BrokerConfig brokerConfig() {
37-
return new BrokerConfig();
38-
}
39-
40-
4136
@Bean
4237
public ReactiveReplyRouter router() {
4338
return new ReactiveReplyRouter();

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/EventBusConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
public class EventBusConfig {
1616

1717
@Bean
18-
public DomainEventBus domainEventBus(ReactiveMessageSender sender, BrokerConfigProps props) {
18+
public DomainEventBus domainEventBus(ReactiveMessageSender sender, BrokerConfigProps props, BrokerConfig config) {
1919
final String exchangeName = props.getDomainEventsExchangeName();
2020
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
21-
return new RabbitDomainEventBus(sender, exchangeName);
21+
return new RabbitDomainEventBus(sender, exchangeName, config);
2222
}
2323
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ public ReactiveMessageListener messageListener(ConnectionFactoryProvider provide
100100
asyncProps.getPrefetchCount());
101101
}
102102

103+
@Bean
104+
@ConditionalOnMissingBean
105+
public BrokerConfig brokerConfig() {
106+
return new BrokerConfig();
107+
}
108+
103109
@Bean
104110
@ConditionalOnMissingBean
105111
public ConnectionFactoryProvider rabbitRConnectionFactory(RabbitProperties properties) {

async/async-commons-starter/src/test/java/org/reactivecommons/async/impl/config/integration/EventBusConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class EventBusConfigTest {
3333
public void domainEventBus() {
3434
final Mono<Void> mono = from(eventBus.emit(new DomainEvent<>("test1", "23", 11)));
3535
mono.block();
36-
verify(sender).sendWithConfirm(any(), eq("domainEvents"), anyString(), any());
36+
verify(sender).sendWithConfirm(any(), eq("domainEvents"), anyString(), any(), true);
3737
}
3838

3939
@Import(EventBusConfig.class)

async/async-commons/src/main/java/org/reactivecommons/async/impl/RabbitDirectAsyncGateway.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
1010
import reactor.core.publisher.Flux;
1111
import reactor.core.publisher.Mono;
12-
import reactor.core.publisher.UnicastProcessor;
1312
import reactor.rabbitmq.OutboundMessageResult;
1413

1514
import java.time.Duration;
@@ -28,6 +27,9 @@ public class RabbitDirectAsyncGateway implements DirectAsyncGateway {
2827
private final ReactiveMessageSender sender;
2928
private final String exchange;
3029
private final MessageConverter converter;
30+
private final boolean persistentCommands;
31+
private final boolean persistentQueries;
32+
private final Duration replyTimeout;
3133

3234

3335
public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender, String exchange, MessageConverter converter) {
@@ -36,36 +38,35 @@ public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router,
3638
this.sender = sender;
3739
this.exchange = exchange;
3840
this.converter = converter;
39-
41+
this.persistentCommands = config.isPersistentCommands();
42+
this.persistentQueries = config.isPersistentQueries();
43+
this.replyTimeout = config.getReplyTimeout();
4044
}
4145

4246
@Override
4347
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
44-
return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap());
48+
return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands);
4549
}
4650

4751
public <T> Flux<OutboundMessageResult> sendCommands(Flux<Command<T>> commands, String targetName) {
48-
return sender.sendWithConfirm2(commands, exchange, targetName, Collections.emptyMap());
52+
return sender.sendWithConfirmBatch(commands, exchange, targetName, Collections.emptyMap(), persistentCommands);
4953
}
5054

51-
public <T> Mono<Void> sendCommandsNoConfirm(Flux<Command<T>> commands, String targetName) {
52-
return sender.sendNoConfirm(commands, exchange, targetName, Collections.emptyMap());
53-
}
5455

5556
@Override
5657
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
5758
final String correlationID = UUID.randomUUID().toString().replaceAll("-", "");
5859

5960
final Mono<R> replyHolder = router.register(correlationID)
60-
.timeout(Duration.ofSeconds(15))
61-
.flatMap(s -> fromCallable(() -> (R)converter.readValue(s, type)));
61+
.timeout(replyTimeout)
62+
.flatMap(s -> fromCallable(() -> converter.readValue(s, type)));
6263

6364
Map<String, Object> headers = new HashMap<>();
6465
headers.put(REPLY_ID, config.getRoutingKey());
6566
headers.put(SERVED_QUERY_ID, query.getResource());
6667
headers.put(CORRELATION_ID, correlationID);
6768

68-
return sender.sendWithConfirm(query, exchange, targetName + ".query", headers).then(replyHolder);
69+
return sender.sendNoConfirm(query, exchange, targetName + ".query", headers, persistentQueries).then(replyHolder);
6970
}
7071

7172
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/RabbitDomainEventBus.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.reactivecommons.async.impl;
22

33
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
4+
import org.reactivecommons.async.impl.config.BrokerConfig;
45
import reactor.core.publisher.Mono;
56
import org.reactivecommons.api.domain.DomainEvent;
67
import org.reactivecommons.api.domain.DomainEventBus;
@@ -11,15 +12,21 @@ public class RabbitDomainEventBus implements DomainEventBus {
1112

1213
private final ReactiveMessageSender sender;
1314
private final String exchange;
15+
private final boolean persistentEvents;
1416

1517
public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange) {
18+
this(sender, exchange, new BrokerConfig());
19+
}
20+
21+
public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, BrokerConfig config) {
1622
this.sender = sender;
1723
this.exchange = exchange;
24+
persistentEvents = config.isPersistentEvents();
1825
}
1926

2027
@Override
2128
public <T> Mono<Void> emit(DomainEvent<T> event) {
22-
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap())
29+
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
2330
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
2431
}
2532

0 commit comments

Comments
 (0)