Skip to content

Commit 434d9f5

Browse files
author
Juan Marín
authored
Merge pull request #51 from reactive-commons/feat/dynamic-event-handler
feat: dynamic events handlers
2 parents 97f8ae3 + ebff49f commit 434d9f5

File tree

11 files changed

+251
-86
lines changed

11 files changed

+251
-86
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/DynamicRegistry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,12 @@
44
import reactor.core.publisher.Mono;
55

66
public interface DynamicRegistry {
7+
8+
@Deprecated
79
<T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass);
10+
11+
Mono<Void> startListeningEvent(String eventName);
12+
13+
Mono<Void> stopListeningEvent(String eventName);
14+
815
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1111
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1212
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
13-
import reactor.core.publisher.Mono;
1413

1514
import java.lang.reflect.ParameterizedType;
1615
import java.util.List;
@@ -20,30 +19,40 @@
2019
@NoArgsConstructor(access = AccessLevel.PACKAGE)
2120
public class HandlerRegistry {
2221

23-
private final List<RegisteredQueryHandler> handlers = new CopyOnWriteArrayList<>();
24-
private final List<RegisteredEventListener> eventListeners = new CopyOnWriteArrayList<>();
25-
private final List<RegisteredCommandHandler> commandHandlers = new CopyOnWriteArrayList<>();
26-
private final List<RegisteredEventListener> eventNotificationListener = new CopyOnWriteArrayList<>();
22+
private final List<RegisteredEventListener<?>> eventListeners = new CopyOnWriteArrayList<>();
23+
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
24+
private final List<RegisteredEventListener<?>> dynamicEventsHandlers = new CopyOnWriteArrayList<>();
25+
26+
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
27+
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
2728

2829
public static HandlerRegistry register() {
2930
return new HandlerRegistry();
3031
}
3132

32-
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass) {
33-
eventListeners.add(new RegisteredEventListener<>(eventName, fn, eventClass));
33+
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler, Class<T> eventClass) {
34+
eventListeners.add(new RegisteredEventListener<>(eventName, handler, eventClass));
3435
return this;
3536
}
3637

3738
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler) {
38-
eventListeners.add(new RegisteredEventListener<>(eventName, handler, inferGenericParameterType(handler)));
39+
return listenEvent(eventName, handler, inferGenericParameterType(handler));
40+
}
41+
42+
public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandler<T> handler, Class<T> eventClass) {
43+
eventNotificationListener.add(new RegisteredEventListener<>(eventName, handler, eventClass));
3944
return this;
4045
}
4146

42-
public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandler<T> fn, Class<T> eventClass) {
43-
eventNotificationListener.add(new RegisteredEventListener<>(eventName, fn, eventClass));
47+
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler, Class<T> eventClass) {
48+
dynamicEventsHandlers.add(new RegisteredEventListener<>(eventNamePattern, handler, eventClass));
4449
return this;
4550
}
4651

52+
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, EventHandler<T> handler) {
53+
return handleDynamicEvents(eventNamePattern, handler, inferGenericParameterType(handler));
54+
}
55+
4756
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn, Class<T> commandClass) {
4857
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, commandClass));
4958
return this;

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,48 @@ public void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {
3535
}).hasSize(1);
3636
}
3737

38+
@Test
39+
void shouldRegisterDynamicEventsHandlerWithTypeInference() {
40+
SomeEventHandler eventHandler = new SomeEventHandler();
41+
42+
String eventNamePattern = "a.*";
43+
44+
HandlerRegistry resultRegistry = registry.handleDynamicEvents(eventNamePattern, eventHandler);
45+
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
46+
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
47+
48+
assertThat(registry.getDynamicEventsHandlers())
49+
.anySatisfy(registeredEventListener -> {
50+
assertThat(registeredEventListener)
51+
.usingRecursiveComparison()
52+
.isEqualTo(expectedRegisteredEventListener);
53+
});
54+
55+
assertThat(resultRegistry)
56+
.isSameAs(registry);
57+
}
58+
59+
@Test
60+
void shouldRegisterDynamicEventsHandler() {
61+
SomeEventHandler eventHandler = new SomeEventHandler();
62+
63+
String eventNamePattern = "a.*";
64+
65+
HandlerRegistry resultRegistry = registry.handleDynamicEvents(eventNamePattern, eventHandler, SomeDataClass.class);
66+
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
67+
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
68+
69+
assertThat(registry.getDynamicEventsHandlers())
70+
.anySatisfy(registeredEventListener -> {
71+
assertThat(registeredEventListener)
72+
.usingRecursiveComparison()
73+
.isEqualTo(expectedRegisteredEventListener);
74+
});
75+
76+
assertThat(resultRegistry)
77+
.isSameAs(registry);
78+
}
79+
3880
@Test
3981
@SuppressWarnings("unchecked")
4082
public void listenEvent() {

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,17 @@ public SenderOptions reactiveCommonsSenderOptions(ConnectionFactoryProvider prov
7575
final PropertyMapper map = PropertyMapper.get();
7676

7777
map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull()
78-
.to(channelPoolOptions::maxCacheSize);
78+
.to(channelPoolOptions::maxCacheSize);
7979

8080
final ChannelPool channelPool = ChannelPoolFactory.createChannelPool(
81-
senderConnection,
82-
channelPoolOptions
81+
senderConnection,
82+
channelPoolOptions
8383
);
8484

8585
return new SenderOptions()
86-
.channelPool(channelPool)
87-
.resourceManagementChannelMono(channelPool.getChannelMono()
88-
.transform(Utils::cache));
86+
.channelPool(channelPool)
87+
.resourceManagementChannelMono(channelPool.getChannelMono()
88+
.transform(Utils::cache));
8989
}
9090

9191
@Bean
@@ -180,32 +180,39 @@ Mono<Connection> createConnectionMono(ConnectionFactory factory, String connecti
180180
public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) {
181181
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
182182

183-
final ConcurrentMap<String, RegisteredQueryHandler> handlers = registries
183+
final ConcurrentMap<String, RegisteredQueryHandler<?, ?>> queryHandlers = registries
184184
.values().stream()
185185
.flatMap(r -> r.getHandlers().stream())
186186
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
187187
ConcurrentHashMap::putAll);
188188

189-
final ConcurrentMap<String, RegisteredEventListener> eventListeners = registries
189+
final ConcurrentMap<String, RegisteredEventListener<?>> eventListeners = registries
190190
.values().stream()
191191
.flatMap(r -> r.getEventListeners().stream())
192192
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
193193
ConcurrentHashMap::putAll);
194194

195-
final ConcurrentMap<String, RegisteredCommandHandler> commandHandlers = registries
195+
final ConcurrentMap<String, RegisteredEventListener<?>> dynamicEventHandlers = registries
196+
.values().stream()
197+
.flatMap(r -> r.getDynamicEventsHandlers().stream())
198+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
199+
ConcurrentHashMap::putAll);
200+
201+
final ConcurrentMap<String, RegisteredCommandHandler<?>> commandHandlers = registries
196202
.values().stream()
197203
.flatMap(r -> r.getCommandHandlers().stream())
198204
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
199205
ConcurrentHashMap::putAll);
200206

201-
final ConcurrentMap<String, RegisteredEventListener> eventNotificationListener = registries
207+
final ConcurrentMap<String, RegisteredEventListener<?>> eventNotificationListener = registries
202208
.values()
203209
.stream()
204210
.flatMap(r -> r.getEventNotificationListener().stream())
205211
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
206212
ConcurrentHashMap::putAll);
207213

208-
return new HandlerResolver(handlers, eventListeners, commandHandlers, eventNotificationListener) {
214+
return new HandlerResolver(queryHandlers, eventListeners, eventNotificationListener,
215+
dynamicEventHandlers, commandHandlers) {
209216
@Override
210217
@SuppressWarnings("unchecked")
211218
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {

async/async-commons/src/main/java/org/reactivecommons/async/impl/DynamicRegistryImp.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivecommons.async.impl;
22

3-
import com.rabbitmq.client.AMQP;
43
import lombok.RequiredArgsConstructor;
54
import org.reactivecommons.async.api.DynamicRegistry;
65
import org.reactivecommons.async.api.handlers.EventHandler;
@@ -17,11 +16,29 @@ public class DynamicRegistryImp implements DynamicRegistry {
1716
private final TopologyCreator topologyCreator;
1817
private final IBrokerConfigProps props;
1918

19+
2020
@Override
21-
public <T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
21+
public <T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass) {
2222
resolver.addEventListener(new RegisteredEventListener<>(eventName, fn, eventClass));
23-
final Mono<AMQP.Queue.BindOk> bind = topologyCreator.bind(BindingSpecification.binding(props.getDomainEventsExchangeName(), eventName, props.getEventsQueue()));
24-
return bind.then();
23+
24+
return topologyCreator.bind(buildBindingSpecification(eventName))
25+
.then();
26+
}
27+
28+
@Override
29+
public Mono<Void> startListeningEvent(String eventName) {
30+
return topologyCreator.bind(buildBindingSpecification(eventName))
31+
.then();
32+
}
33+
34+
@Override
35+
public Mono<Void> stopListeningEvent(String eventName) {
36+
return topologyCreator.unbind(buildBindingSpecification(eventName))
37+
.then();
38+
}
39+
40+
private BindingSpecification buildBindingSpecification(String eventName) {
41+
return BindingSpecification.binding(props.getDomainEventsExchangeName(), eventName, props.getEventsQueue());
2542
}
2643

2744
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/HandlerResolver.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@
66
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
77

88
import java.util.Collection;
9+
import java.util.HashSet;
910
import java.util.Map;
11+
import java.util.Set;
1012

1113
@RequiredArgsConstructor
1214
public class HandlerResolver {
1315

14-
private final Map<String, RegisteredQueryHandler> queryHandlers;
15-
private final Map<String, RegisteredEventListener> eventListeners;
16-
private final Map<String, RegisteredCommandHandler> commandHandlers;
17-
private final Map<String, RegisteredEventListener> eventNotificationListeners;
16+
private final Map<String, RegisteredQueryHandler<?, ?>> queryHandlers;
17+
private final Map<String, RegisteredEventListener<?>> eventListeners;
18+
private final Map<String, RegisteredEventListener<?>> eventNotificationListeners;
19+
private final Map<String, RegisteredEventListener<?>> dynamicEventsHandlers;
20+
private final Map<String, RegisteredCommandHandler<?>> commandHandlers;
1821

1922
@SuppressWarnings("unchecked")
2023
public <T, M> RegisteredQueryHandler<T, M> getQueryHandler(String path) {
@@ -23,26 +26,42 @@ public <T, M> RegisteredQueryHandler<T, M> getQueryHandler(String path) {
2326

2427
@SuppressWarnings("unchecked")
2528
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
26-
return commandHandlers.get(path);
29+
return (RegisteredCommandHandler<T>) commandHandlers.get(path);
2730
}
2831

2932
@SuppressWarnings("unchecked")
3033
public <T> RegisteredEventListener<T> getEventListener(String path) {
31-
return eventListeners.get(path);
34+
return (RegisteredEventListener<T>) eventListeners.get(path);
3235
}
3336

34-
public Collection<RegisteredEventListener> getNotificationListeners() {
37+
@SuppressWarnings("unchecked")
38+
public <T> RegisteredEventListener<T> getDynamicEventsHandler(String path) {
39+
return (RegisteredEventListener<T>) dynamicEventsHandlers.get(path);
40+
}
41+
42+
public Collection<RegisteredEventListener<?>> getNotificationListeners() {
3543
return eventNotificationListeners.values();
3644
}
3745

46+
@SuppressWarnings("unchecked")
3847
public <T> RegisteredEventListener<T> getNotificationListener(String path) {
39-
return eventNotificationListeners.get(path);
48+
return (RegisteredEventListener<T>) eventNotificationListeners.get(path);
4049
}
4150

42-
public Collection<RegisteredEventListener> getEventListeners() {
51+
public Collection<RegisteredEventListener<?>> getEventListeners() {
4352
return eventListeners.values();
4453
}
4554

55+
public Set<String> getToListenEventNames() {
56+
Set<String> toListenEventNames = new HashSet<>(eventListeners.size() +
57+
dynamicEventsHandlers.size());
58+
59+
toListenEventNames.addAll(eventListeners.keySet());
60+
toListenEventNames.addAll(dynamicEventsHandlers.keySet());
61+
62+
return toListenEventNames;
63+
}
64+
4665
void addEventListener(RegisteredEventListener listener) {
4766
eventListeners.put(listener.getPath(), listener);
4867
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationEventListener.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import reactor.rabbitmq.BindingSpecification;
2121
import reactor.rabbitmq.ExchangeSpecification;
2222

23+
import java.util.Objects;
2324
import java.util.Optional;
2425
import java.util.Set;
2526
import java.util.function.Function;
@@ -39,16 +40,14 @@ public class ApplicationEventListener extends GenericMessageListener {
3940
private final Matcher keyMatcher;
4041

4142

42-
43-
44-
public ApplicationEventListener(ReactiveMessageListener receiver,
45-
String queueName,
46-
HandlerResolver resolver,
47-
String eventsExchange,
48-
MessageConverter messageConverter,
49-
boolean withDLQRetry,
50-
long maxRetries, int retryDelay,
51-
Optional<Integer> maxLengthBytes,
43+
public ApplicationEventListener(ReactiveMessageListener receiver,
44+
String queueName,
45+
HandlerResolver resolver,
46+
String eventsExchange,
47+
MessageConverter messageConverter,
48+
boolean withDLQRetry,
49+
long maxRetries, int retryDelay,
50+
Optional<Integer> maxLengthBytes,
5251
DiscardNotifier discardNotifier,
5352
CustomErrorReporter errorReporter) {
5453
super(queueName, receiver, withDLQRetry, maxRetries, discardNotifier, "event", errorReporter);
@@ -64,11 +63,11 @@ public ApplicationEventListener(ReactiveMessageListener receiver,
6463
protected Mono<Void> setUpBindings(TopologyCreator creator) {
6564
if (withDLQRetry) {
6665
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic"));
67-
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(eventsExchange+".DLQ").durable(true).type("topic"));
66+
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(eventsExchange + ".DLQ").durable(true).type("topic"));
6867
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, eventsExchange, retryDelay, maxLengthBytes);
69-
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, eventsExchange+".DLQ", maxLengthBytes);
68+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, eventsExchange + ".DLQ", maxLengthBytes);
7069
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));
71-
final Flux<AMQP.Queue.BindOk> bindingDLQ = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange+".DLQ", listener.getPath(), queueName + ".DLQ")));
70+
final Flux<AMQP.Queue.BindOk> bindingDLQ = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange + ".DLQ", listener.getPath(), queueName + ".DLQ")));
7271
return declareExchange.then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).thenMany(bindings).thenMany(bindingDLQ).then();
7372
} else {
7473
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners())
@@ -82,20 +81,29 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
8281

8382
@Override
8483
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
85-
final Set<String> listenerKeys = resolver.getEventListeners()
86-
.stream()
87-
.map(RegisteredEventListener::getPath)
88-
.collect(Collectors.toSet());
89-
final String matchedKey = keyMatcher.match(listenerKeys, executorPath);
90-
final RegisteredEventListener<Object> handler = resolver.getEventListener(matchedKey);
84+
final String matchedKey = keyMatcher.match(resolver.getToListenEventNames(), executorPath);
85+
final RegisteredEventListener<Object> handler = getEventListener(matchedKey);
86+
9187
final Class<Object> eventClass = handler.getInputClass();
9288
Function<Message, DomainEvent<Object>> converter = msj -> messageConverter.readDomainEvent(msj, eventClass);
89+
9390
final EventExecutor<Object> executor = new EventExecutor<>(handler.getHandler(), converter);
91+
9492
return msj -> executor
9593
.execute(msj)
9694
.cast(Object.class);
9795
}
9896

97+
private RegisteredEventListener<Object> getEventListener(String matchedKey) {
98+
RegisteredEventListener<Object> eventListener = resolver.getEventListener(matchedKey);
99+
100+
if (eventListener == null) {
101+
return resolver.getDynamicEventsHandler(matchedKey);
102+
}
103+
104+
return eventListener;
105+
}
106+
99107
protected String getExecutorPath(AcknowledgableDelivery msj) {
100108
return msj.getEnvelope().getRoutingKey();
101109
}

0 commit comments

Comments
 (0)