Skip to content

Commit 56c1bd3

Browse files
author
Daniel Bustamante Ospina
committed
enhance message processing performance
1 parent 395e950 commit 56c1bd3

File tree

7 files changed

+462
-5
lines changed

7 files changed

+462
-5
lines changed

acceptance/async-tests/src/test/java/org/reactivecommons/test/DirectAsyncGatewayTest.java renamed to acceptance/async-tests/src/test/java/org/reactivecommons/test/SimpleDirectCommunicationTest.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
import org.junit.Test;
44
import org.junit.runner.RunWith;
5+
import org.reactivecommons.api.domain.Command;
56
import org.reactivecommons.async.api.AsyncQuery;
67
import org.reactivecommons.async.api.DirectAsyncGateway;
78
import org.reactivecommons.async.api.HandlerRegistry;
9+
import org.reactivecommons.async.api.handlers.CommandHandler;
810
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
911
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
1012
import org.springframework.beans.factory.annotation.Autowired;
@@ -15,22 +17,44 @@
1517
import org.springframework.context.annotation.Bean;
1618
import org.springframework.test.context.junit4.SpringRunner;
1719
import reactor.core.publisher.Mono;
20+
import reactor.core.publisher.UnicastProcessor;
1821
import reactor.test.StepVerifier;
1922

2023
import java.time.Duration;
24+
import java.util.concurrent.ThreadLocalRandom;
2125

26+
import static org.assertj.core.api.Assertions.assertThat;
2227
import static reactor.core.publisher.Mono.*;
2328

2429
@SpringBootTest
2530
@RunWith(SpringRunner.class)
26-
public class DirectAsyncGatewayTest {
31+
public class SimpleDirectCommunicationTest {
32+
33+
private static final String COMMAND_NAME = "simpleTestCommand";
2734

2835
@Autowired
2936
private DirectAsyncGateway gateway;
3037

3138
@Value("${spring.application.name}")
3239
private String appName;
3340

41+
@Autowired
42+
private UnicastProcessor<Command<Long>> listener;
43+
44+
private String commandId = ThreadLocalRandom.current().nextInt() + "";
45+
private Long data = ThreadLocalRandom.current().nextLong();
46+
47+
@Test
48+
public void commandShouldArrive() {
49+
Command<Long> command = new Command<>(COMMAND_NAME, commandId, data);
50+
gateway.sendCommand(command, appName).subscribe();
51+
52+
StepVerifier.create(listener.next()).assertNext(cmd -> {
53+
assertThat(cmd).extracting(Command::getCommandId, Command::getData, Command::getName)
54+
.containsExactly(commandId, data, COMMAND_NAME);
55+
}).verifyComplete();
56+
}
57+
3458
@Test
3559
public void shouldReceiveResponse() {
3660
final Mono<Integer> reply = gateway.requestReply(new AsyncQuery<>("double", 42), appName, Integer.class);
@@ -49,8 +73,22 @@ public static void main(String[] args) {
4973
}
5074

5175
@Bean
52-
public HandlerRegistry registry() {
53-
return HandlerRegistry.register().serveQuery("double", rqt -> just(rqt*2), Long.class);
76+
public HandlerRegistry registry(UnicastProcessor<Command<Long>> listener) {
77+
return HandlerRegistry.register()
78+
.serveQuery("double", rqt -> just(rqt*2), Long.class)
79+
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
80+
}
81+
82+
@Bean
83+
public UnicastProcessor<Command<Long>> listener() {
84+
return UnicastProcessor.create();
85+
}
86+
87+
private CommandHandler<Long> handle(UnicastProcessor<Command<Long>> listener) {
88+
return command -> {
89+
listener.onNext(command);
90+
return empty();
91+
};
5492
}
5593
}
5694
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package org.reactivecommons.test;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.reactivecommons.api.domain.DomainEvent;
6+
import org.reactivecommons.api.domain.DomainEventBus;
7+
import org.reactivecommons.async.api.HandlerRegistry;
8+
import org.reactivecommons.async.api.handlers.EventHandler;
9+
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
10+
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.boot.SpringApplication;
13+
import org.springframework.boot.autoconfigure.SpringBootApplication;
14+
import org.springframework.boot.test.context.SpringBootTest;
15+
import org.springframework.context.annotation.Bean;
16+
import org.springframework.test.context.junit4.SpringRunner;
17+
import reactor.core.publisher.UnicastProcessor;
18+
import reactor.test.StepVerifier;
19+
20+
import java.util.concurrent.ThreadLocalRandom;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
import static reactor.core.publisher.Mono.*;
25+
26+
@SpringBootTest
27+
@RunWith(SpringRunner.class)
28+
public class SimpleEventNotificationTest {
29+
30+
private static final String EVENT_NAME = "simpleTestEvent";
31+
32+
@Autowired
33+
private DomainEventBus eventBus;
34+
35+
@Autowired
36+
private UnicastProcessor<DomainEvent<Long>> listener;
37+
38+
private String eventId = ThreadLocalRandom.current().nextInt() + "";
39+
private Long data = ThreadLocalRandom.current().nextLong();
40+
41+
@Test
42+
public void shouldReceiveEvent() throws InterruptedException {
43+
DomainEvent<?> event = new DomainEvent<>(EVENT_NAME, eventId, data);
44+
from(eventBus.emit(event)).subscribe();
45+
StepVerifier.create(listener.take(1)).assertNext(evt ->
46+
assertThat(evt).extracting(DomainEvent::getName, DomainEvent::getEventId, DomainEvent::getData)
47+
.containsExactly(EVENT_NAME, eventId, data)
48+
).verifyComplete();
49+
}
50+
51+
52+
53+
@SpringBootApplication
54+
@EnableDomainEventBus
55+
@EnableMessageListeners
56+
static class App{
57+
public static void main(String[] args) {
58+
SpringApplication.run(App.class, args);
59+
}
60+
61+
@Bean
62+
public HandlerRegistry registry(UnicastProcessor<DomainEvent<Long>> listener) {
63+
return HandlerRegistry.register()
64+
.serveQuery("double", rqt -> just(rqt*2), Long.class)
65+
.listenEvent(EVENT_NAME, handle(listener), Long.class);
66+
}
67+
68+
@Bean
69+
public UnicastProcessor<DomainEvent<Long>> listener() {
70+
return UnicastProcessor.create();
71+
}
72+
73+
private EventHandler<Long> handle(UnicastProcessor<DomainEvent<Long>> listener) {
74+
return command -> {
75+
listener.onNext(command);
76+
return empty();
77+
};
78+
}
79+
}
80+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package org.reactivecommons.test.perf;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.reactivecommons.api.domain.Command;
6+
import org.reactivecommons.async.api.DirectAsyncGateway;
7+
import org.reactivecommons.async.api.HandlerRegistry;
8+
import org.reactivecommons.async.api.handlers.CommandHandler;
9+
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
10+
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.boot.SpringApplication;
14+
import org.springframework.boot.autoconfigure.SpringBootApplication;
15+
import org.springframework.boot.test.context.SpringBootTest;
16+
import org.springframework.context.annotation.Bean;
17+
import org.springframework.test.context.junit4.SpringRunner;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
import reactor.core.publisher.UnicastProcessor;
21+
import reactor.core.scheduler.Schedulers;
22+
import reactor.test.StepVerifier;
23+
24+
import java.time.Duration;
25+
import java.util.concurrent.ThreadLocalRandom;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import static java.lang.System.out;
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static reactor.core.publisher.Mono.*;
31+
32+
@SpringBootTest
33+
@RunWith(SpringRunner.class)
34+
public class BlockingCommandHandlePerfTest {
35+
36+
private static final String COMMAND_NAME = "simpleTestCommand1";
37+
38+
@Autowired
39+
private DirectAsyncGateway gateway;
40+
41+
@Value("${spring.application.name}")
42+
private String appName;
43+
44+
@Autowired
45+
private UnicastProcessor<Command<Long>> listener;
46+
47+
private String commandId = ThreadLocalRandom.current().nextInt() + "";
48+
private Long data = ThreadLocalRandom.current().nextLong();
49+
50+
@Test
51+
public void commandShouldBeHandledInParallel() throws InterruptedException {
52+
Flux.range(0, 12).flatMap(i -> {
53+
Command<Long> command = new Command<>(COMMAND_NAME, commandId+1, data+1);
54+
return gateway.sendCommand(command, appName);
55+
}).subscribe();
56+
57+
final long init = System.currentTimeMillis();
58+
59+
final Flux<Command<Long>> results = listener.take(12).collectList()
60+
.timeout(Duration.ofMillis(1500))
61+
.flatMapMany(Flux::fromIterable);
62+
63+
StepVerifier.create(results).assertNext(cmd -> {
64+
assertThat(cmd.getName()).isEqualTo(COMMAND_NAME);
65+
})
66+
.expectNextCount(11)
67+
.verifyComplete();
68+
69+
final long total = System.currentTimeMillis() - init;
70+
out.println("Test duration: " +total);
71+
assertThat(total).isLessThan(1500L);
72+
73+
//Give some time to finish messages ack
74+
Thread.sleep(350);
75+
}
76+
77+
78+
79+
@SpringBootApplication
80+
@EnableDirectAsyncGateway
81+
@EnableMessageListeners
82+
static class App{
83+
public static void main(String[] args) {
84+
SpringApplication.run(App.class, args);
85+
}
86+
87+
@Bean
88+
public HandlerRegistry registry(UnicastProcessor<Command<Long>> listener) {
89+
return HandlerRegistry.register()
90+
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
91+
}
92+
93+
@Bean
94+
public UnicastProcessor<Command<Long>> listener() {
95+
return UnicastProcessor.create();
96+
}
97+
98+
private CommandHandler<Long> handle(UnicastProcessor<Command<Long>> listener) {
99+
return command -> {
100+
return fromRunnable(() -> {
101+
// out.println("Received at: " + System.currentTimeMillis()/1000);
102+
try {
103+
// out.println("internal: " + Thread.currentThread().getName());
104+
TimeUnit.MILLISECONDS.sleep(750);
105+
// out.println("Handled at: " + System.currentTimeMillis()/1000);
106+
listener.onNext(command);
107+
} catch (InterruptedException e) {
108+
}
109+
listener.onNext(command);
110+
});
111+
};
112+
}
113+
}
114+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package org.reactivecommons.test.perf;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.reactivecommons.api.domain.Command;
6+
import org.reactivecommons.async.api.DirectAsyncGateway;
7+
import org.reactivecommons.async.api.HandlerRegistry;
8+
import org.reactivecommons.async.api.handlers.CommandHandler;
9+
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
10+
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.boot.SpringApplication;
14+
import org.springframework.boot.autoconfigure.SpringBootApplication;
15+
import org.springframework.boot.test.context.SpringBootTest;
16+
import org.springframework.context.annotation.Bean;
17+
import org.springframework.test.context.junit4.SpringRunner;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.UnicastProcessor;
20+
import reactor.test.StepVerifier;
21+
22+
import java.time.Duration;
23+
import java.util.List;
24+
import java.util.concurrent.ThreadLocalRandom;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import static java.lang.System.out;
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
import static reactor.core.publisher.Mono.empty;
30+
import static reactor.core.publisher.Mono.fromRunnable;
31+
32+
@SpringBootTest
33+
@RunWith(SpringRunner.class)
34+
public class ParallelOnBlockingInSubscriptionTimeTest {
35+
36+
private static final String COMMAND_NAME = "simpleTestCommand2";
37+
38+
@Autowired
39+
private DirectAsyncGateway gateway;
40+
41+
@Value("${spring.application.name}")
42+
private String appName;
43+
44+
@Autowired
45+
private UnicastProcessor<Command<Long>> listener;
46+
47+
private String commandId = ThreadLocalRandom.current().nextInt() + "";
48+
private Long data = ThreadLocalRandom.current().nextLong();
49+
50+
@Test
51+
public void commandShouldBeHandledInParallel() throws InterruptedException {
52+
Flux.range(0, 12).flatMap(i -> {
53+
Command<Long> command = new Command<>(COMMAND_NAME, commandId+1, data+1);
54+
return gateway.sendCommand(command, appName);
55+
}).subscribe();
56+
57+
final long init = System.currentTimeMillis();
58+
59+
final Flux<Command<Long>> results = listener.take(12).collectList()
60+
.timeout(Duration.ofMillis(1500))
61+
.flatMapMany(Flux::fromIterable);
62+
63+
StepVerifier.create(results).assertNext(cmd -> {
64+
assertThat(cmd.getName()).isEqualTo(COMMAND_NAME);
65+
})
66+
.expectNextCount(11)
67+
.verifyComplete();
68+
69+
final long total = System.currentTimeMillis() - init;
70+
out.println("Test duration: " +total);
71+
assertThat(total).isLessThan(1500);
72+
73+
//Give some time to finish messages ack
74+
Thread.sleep(350);
75+
}
76+
77+
78+
79+
@SpringBootApplication
80+
@EnableDirectAsyncGateway
81+
@EnableMessageListeners
82+
static class App{
83+
public static void main(String[] args) {
84+
SpringApplication.run(App.class, args);
85+
}
86+
87+
@Bean
88+
public HandlerRegistry registry(UnicastProcessor<Command<Long>> listener) {
89+
return HandlerRegistry.register()
90+
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
91+
}
92+
93+
@Bean
94+
public UnicastProcessor<Command<Long>> listener() {
95+
return UnicastProcessor.create();
96+
}
97+
98+
private CommandHandler<Long> handle(UnicastProcessor<Command<Long>> listener) {
99+
return command -> {
100+
// out.println("Received at: " + System.currentTimeMillis()/1000);
101+
try {
102+
// out.println("internal: " + Thread.currentThread().getName());
103+
TimeUnit.MILLISECONDS.sleep(750);
104+
// out.println("Handled at: " + System.currentTimeMillis()/1000);
105+
listener.onNext(command);
106+
} catch (InterruptedException e) {
107+
}
108+
listener.onNext(command);
109+
return empty();
110+
};
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)