Skip to content

Commit 395e950

Browse files
author
Daniel Bustamante Ospina
committed
Add DynamicRegistry in order to handle new events in runtime and DynamicRegistryTest to demonstrate his usage
Fixes: gh-9
1 parent ed89cef commit 395e950

File tree

20 files changed

+467
-120
lines changed

20 files changed

+467
-120
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package org.reactivecommons.test;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.mockito.Mockito;
6+
import org.reactivecommons.api.domain.DomainEvent;
7+
import org.reactivecommons.api.domain.DomainEventBus;
8+
import org.reactivecommons.async.api.AsyncQuery;
9+
import org.reactivecommons.async.api.DirectAsyncGateway;
10+
import org.reactivecommons.async.api.DynamicRegistry;
11+
import org.reactivecommons.async.api.HandlerRegistry;
12+
import org.reactivecommons.async.api.handlers.EventHandler;
13+
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
14+
import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus;
15+
import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners;
16+
import org.reactivestreams.Publisher;
17+
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.beans.factory.annotation.Value;
19+
import org.springframework.boot.SpringApplication;
20+
import org.springframework.boot.autoconfigure.SpringBootApplication;
21+
import org.springframework.boot.test.context.SpringBootTest;
22+
import org.springframework.context.annotation.Bean;
23+
import org.springframework.test.context.junit4.SpringRunner;
24+
import reactor.core.publisher.Mono;
25+
import reactor.core.publisher.UnicastProcessor;
26+
import reactor.test.StepVerifier;
27+
28+
import java.time.Duration;
29+
30+
import static reactor.core.publisher.Mono.*;
31+
32+
@SpringBootTest
33+
@RunWith(SpringRunner.class)
34+
public class DynamicRegistryTest {
35+
36+
@Autowired
37+
private DomainEventBus eventBus;
38+
39+
@Autowired
40+
private DynamicRegistry dynamicRegistry;
41+
42+
@Value("${spring.application.name}")
43+
private String appName;
44+
45+
@Test
46+
public void shouldReceiveResponse() {
47+
UnicastProcessor<String> result = UnicastProcessor.create();
48+
EventHandler<String> fn = message -> fromRunnable(() -> result.onNext(message.getData()));
49+
50+
dynamicRegistry.listenEvent("test.event", fn, String.class).block();
51+
final Publisher<Void> emit = eventBus.emit(new DomainEvent<>("test.event", "42", "Hello"));
52+
from(emit).block();
53+
54+
StepVerifier.create(result.next().timeout(Duration.ofSeconds(10)))
55+
.expectNext("Hello")
56+
.verifyComplete();
57+
58+
59+
}
60+
61+
62+
@SpringBootApplication
63+
@EnableMessageListeners
64+
@EnableDomainEventBus
65+
static class App{
66+
public static void main(String[] args) {
67+
SpringApplication.run(App.class, args);
68+
}
69+
70+
}
71+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
spring.application.name=testApp
1+
spring.application.name=test-app
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package org.reactivecommons.async.api;
2+
3+
import org.reactivecommons.async.api.handlers.EventHandler;
4+
import reactor.core.publisher.Mono;
5+
6+
public interface DynamicRegistry {
7+
<T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass);
8+
}
Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,32 @@
11
package org.reactivecommons.async.impl.config;
22

3-
import lombok.AllArgsConstructor;
4-
import lombok.NoArgsConstructor;
3+
import lombok.RequiredArgsConstructor;
54
import org.reactivecommons.async.impl.RabbitDirectAsyncGateway;
65
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
76
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
7+
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
88
import org.reactivecommons.async.impl.converters.MessageConverter;
99
import org.reactivecommons.async.impl.listeners.ApplicationReplyListener;
1010
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
11-
import org.springframework.beans.factory.annotation.Value;
1211
import org.springframework.context.annotation.Bean;
1312
import org.springframework.context.annotation.Configuration;
1413
import org.springframework.context.annotation.Import;
15-
import org.springframework.util.Base64Utils;
16-
17-
import java.nio.ByteBuffer;
18-
import java.util.UUID;
1914

2015
@Configuration
2116
@Import(RabbitMqConfig.class)
22-
@AllArgsConstructor
23-
@NoArgsConstructor
17+
@RequiredArgsConstructor
2418
public class DirectAsyncGatewayConfig {
2519

26-
@Value("${app.async.direct.exchange:directMessages}")
27-
private String directMessagesExchangeName;
28-
29-
@Value("${spring.application.name}")
30-
private String appName;
20+
private final BrokerConfigProps props;
3121

3222
@Bean
3323
public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender rSender, MessageConverter converter) throws Exception {
34-
return new RabbitDirectAsyncGateway(config, router, rSender, directMessagesExchangeName, converter);
24+
return new RabbitDirectAsyncGateway(config, router, rSender, props.getDirectMessagesExchangeName(), converter);
3525
}
3626

3727
@Bean
3828
public ApplicationReplyListener msgListener(ReactiveReplyRouter router, BrokerConfig config, ReactiveMessageListener listener) {
39-
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, generateName());
29+
final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, listener, props.getReplyQueue());
4030
replyListener.startListening(config.getRoutingKey());
4131
return replyListener;
4232
}
@@ -53,14 +43,5 @@ public ReactiveReplyRouter router() {
5343
return new ReactiveReplyRouter();
5444
}
5545

56-
public String generateName() {
57-
UUID uuid = UUID.randomUUID();
58-
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
59-
bb.putLong(uuid.getMostSignificantBits())
60-
.putLong(uuid.getLeastSignificantBits());
61-
// Convert to base64 and remove trailing =
62-
return this.appName + Base64Utils.encodeToUrlSafeString(bb.array())
63-
.replaceAll("=", "");
64-
}
6546

6647
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/EventBusConfig.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import org.reactivecommons.api.domain.DomainEventBus;
44
import org.reactivecommons.async.impl.RabbitDomainEventBus;
55
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
6-
import org.springframework.beans.factory.annotation.Value;
6+
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
77
import org.springframework.context.annotation.Bean;
88
import org.springframework.context.annotation.Configuration;
99
import org.springframework.context.annotation.Import;
@@ -14,12 +14,10 @@
1414
@Import(RabbitMqConfig.class)
1515
public class EventBusConfig {
1616

17-
@Value("${app.async.domain.events.exchange:domainEvents}")
18-
private String domainEventsExchangeName;
19-
2017
@Bean
21-
public DomainEventBus domainEventBus(ReactiveMessageSender sender) {
22-
sender.getTopologyCreator().declare(exchange(domainEventsExchangeName).durable(true).type("topic")).subscribe();
23-
return new RabbitDomainEventBus(sender, domainEventsExchangeName);
18+
public DomainEventBus domainEventBus(ReactiveMessageSender sender, BrokerConfigProps props) {
19+
final String exchangeName = props.getDomainEventsExchangeName();
20+
sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe();
21+
return new RabbitDomainEventBus(sender, exchangeName);
2422
}
2523
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageConfig.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@
22

33
import org.reactivecommons.async.api.DefaultCommandHandler;
44
import org.reactivecommons.async.api.DefaultQueryHandler;
5+
import org.reactivecommons.async.api.DynamicRegistry;
56
import org.reactivecommons.async.api.HandlerRegistry;
67
import org.reactivecommons.async.api.handlers.QueryHandler;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
89
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
910
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
11+
import org.reactivecommons.async.impl.DynamicRegistryImp;
1012
import org.reactivecommons.async.impl.HandlerResolver;
1113
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1214
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
15+
import org.reactivecommons.async.impl.communications.TopologyCreator;
16+
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
1317
import org.reactivecommons.async.impl.converters.MessageConverter;
1418
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1519
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
@@ -25,6 +29,7 @@
2529

2630
import java.util.Map;
2731
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentMap;
2833

2934
@Configuration
3035
@Import(RabbitMqConfig.class)
@@ -60,23 +65,28 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList
6065
return commandListener;
6166
}
6267

68+
@Bean
69+
public DynamicRegistry dynamicRegistry(HandlerResolver resolver, ReactiveMessageListener listener, IBrokerConfigProps props) {
70+
return new DynamicRegistryImp(resolver, listener.getTopologyCreator(), props);
71+
}
72+
6373
@Bean
6474
public HandlerResolver resolver(ApplicationContext context, DefaultQueryHandler defaultHandler, Environment env, DefaultCommandHandler defaultCommandHandler) {
6575
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
6676

67-
final ConcurrentHashMap<String, RegisteredQueryHandler> handlers = registries
77+
final ConcurrentMap<String, RegisteredQueryHandler> handlers = registries
6878
.values().stream()
6979
.flatMap(r -> r.getHandlers().stream())
7080
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
7181
ConcurrentHashMap::putAll);
7282

73-
final Map<String, RegisteredEventListener> eventListeners = registries
83+
final ConcurrentMap<String, RegisteredEventListener> eventListeners = registries
7484
.values().stream()
7585
.flatMap(r -> r.getEventListeners().stream())
7686
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
7787
ConcurrentHashMap::putAll);
7888

79-
final Map<String, RegisteredCommandHandler> commandHandlers = registries
89+
final ConcurrentMap<String, RegisteredCommandHandler> commandHandlers = registries
8090
.values().stream()
8191
.flatMap(r -> r.getCommandHandlers().stream())
8292
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,19 @@
22

33
import com.rabbitmq.client.Connection;
44
import com.rabbitmq.client.ConnectionFactory;
5-
import lombok.AllArgsConstructor;
6-
import lombok.NoArgsConstructor;
75
import lombok.extern.java.Log;
86
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
97
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
108
import org.reactivecommons.async.impl.communications.TopologyCreator;
9+
import org.reactivecommons.async.impl.config.props.BrokerConfigProps;
1110
import org.reactivecommons.async.impl.converters.JacksonMessageConverter;
1211
import org.reactivecommons.async.impl.converters.MessageConverter;
13-
import org.springframework.beans.factory.annotation.Value;
1412
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1513
import org.springframework.boot.context.properties.EnableConfigurationProperties;
1614
import org.springframework.boot.context.properties.PropertyMapper;
1715
import org.springframework.context.annotation.Bean;
1816
import org.springframework.context.annotation.Configuration;
17+
import org.springframework.context.annotation.Import;
1918
import reactor.core.publisher.Mono;
2019
import reactor.core.scheduler.Scheduler;
2120
import reactor.core.scheduler.Schedulers;
@@ -26,19 +25,15 @@
2625

2726
@Log
2827
@Configuration
29-
@NoArgsConstructor
30-
@AllArgsConstructor
3128
@EnableConfigurationProperties(RabbitProperties.class)
29+
@Import(BrokerConfigProps.class)
3230
public class RabbitMqConfig {
3331

34-
@Value("${spring.application.name}")
35-
private String appName;
36-
3732
@Bean
38-
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter){
33+
public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, MessageConverter converter, BrokerConfigProps props){
3934
final Mono<Connection> senderConnection = createSenderConnectionMono(provider.getConnectionFactory(), "sender");
4035
final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(senderConnection));
41-
return new ReactiveMessageSender(sender, appName, converter, new TopologyCreator(senderConnection));
36+
return new ReactiveMessageSender(sender, props.getAppName(), converter, new TopologyCreator(senderConnection));
4237
}
4338

4439
@Bean
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import org.reactivecommons.async.impl.config.IBrokerConfigProps;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.context.annotation.Configuration;
7+
import org.springframework.util.Base64Utils;
8+
9+
import java.nio.ByteBuffer;
10+
import java.util.UUID;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
13+
14+
@Getter
15+
@Configuration
16+
public class BrokerConfigProps implements IBrokerConfigProps {
17+
18+
@Value("${spring.application.name}")
19+
private String appName;
20+
21+
@Value("${app.async.domain.events.exchange:domainEvents}")
22+
private String domainEventsExchangeName;
23+
24+
@Value("${app.async.direct.exchange:directMessages}")
25+
private String directMessagesExchangeName;
26+
27+
private final AtomicReference<String> replyQueueName = new AtomicReference<>();
28+
29+
@Override
30+
public String getEventsQueue() {
31+
return appName + ".subsEvents";
32+
}
33+
34+
@Override
35+
public String getQueriesQueue() {
36+
return appName + ".query";
37+
}
38+
39+
@Override
40+
public String getCommandsQueue() {
41+
return appName;
42+
}
43+
44+
@Override
45+
public String getReplyQueue() {
46+
final String name = replyQueueName.get();
47+
if (name == null) {
48+
final String replyName = newRandomQueueName();
49+
if (replyQueueName.compareAndSet(null, replyName)) {
50+
return replyName;
51+
} else {
52+
return replyQueueName.get();
53+
}
54+
}
55+
return name;
56+
}
57+
58+
private String newRandomQueueName() {
59+
UUID uuid = UUID.randomUUID();
60+
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
61+
bb.putLong(uuid.getMostSignificantBits())
62+
.putLong(uuid.getLeastSignificantBits());
63+
return appName + Base64Utils.encodeToUrlSafeString(bb.array())
64+
.replaceAll("=", "");
65+
}
66+
67+
}

0 commit comments

Comments
 (0)