Skip to content

Commit 30b3ba6

Browse files
committed
listener-matcher c69f7ef
author gf7 <godfat7@gmail.com> 1591241642 -0500 committer gf7 <godfat7@gmail.com> 1596560939 -0500 Adds event notification listener and functionality to find most accurate listener for an specific event Adds notification event listener
1 parent c69f7ef commit 30b3ba6

File tree

20 files changed

+1495
-42
lines changed

20 files changed

+1495
-42
lines changed

README.md

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,23 @@ To include all (API and implementation) (Spring boot Starter):
1111
```groovy
1212
1313
dependencies {
14-
compile 'org.reactivecommons:async-commons-starter:0.4.6'
14+
compile 'org.reactivecommons:async-commons-starter:0.5.0'
1515
}
1616
```
1717

1818
To include only domain events API:
1919

2020
```groovy
2121
dependencies {
22-
compile 'org.reactivecommons:domain-events-api:0.4.6'
22+
compile 'org.reactivecommons:domain-events-api:0.5.0'
2323
}
2424
```
2525

2626
To include only async commons API:
2727

2828
```groovy
2929
dependencies {
30-
compile 'org.reactivecommons:async-commons-api:0.4.6'
30+
compile 'org.reactivecommons:async-commons-api:0.5.0'
3131
}
3232
```
3333

@@ -128,6 +128,31 @@ Don't forget to add the implementation dependency to the main spring boot module
128128
}
129129
```
130130

131+
### Domain Event-Listener
132+
Reactive commons has four types of listeners implemented, available to be registered in the application via the **HandlerRegistry**, each of them is designed to tackle
133+
common requirements for listeners in event based applications and abstracts the behavior of event flow in every situation (Varying for example in retrying strategy, dead letter events, sources and so on).
134+
135+
The available event listeners are:
136+
- Domain Event Listener
137+
- Query Event Listener
138+
- Command Listener
139+
- Notification Listener
140+
141+
Example Code:
142+
```java
143+
public HandlerRegistry notificationEvents() {
144+
return HandlerRegistry.register()
145+
.listenNotificationEvent("gatewayRouteAdded", message -> {
146+
System.out.println("Refreshing instance");
147+
return Mono.empty();
148+
},GatewayEvent.class);
149+
}
150+
```
151+
152+
The above code shows how to handle a notification event (Notification event: an event that should be handled by
153+
every running instance of a microservice, e.g: notify to every instance that a configuration setting has changed
154+
and has to do a hot reload from a persistent source of that data).
155+
131156
### Request-Reply
132157
Example Code:
133158

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public static HandlerRegistry register(){
2525
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2626
private final List<RegisteredEventListener> eventListeners = new CopyOnWriteArrayList<>();
2727
private final List<RegisteredCommandHandler> commandHandlers = new CopyOnWriteArrayList<>();
28+
private final List<RegisteredEventListener> eventNotificationListener = new CopyOnWriteArrayList<>();
2829

2930
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
3031
eventListeners.add(new RegisteredEventListener<>(eventName, fn, eventClass));
@@ -36,6 +37,11 @@ public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler
3637
return this;
3738
}
3839

40+
public <T> HandlerRegistry listenNotificationEvent(String eventName, EventHandler<T> fn, Class<T> eventClass){
41+
eventNotificationListener.add(new RegisteredEventListener<>(eventName, fn, eventClass));
42+
return this;
43+
}
44+
3945
public <T> HandlerRegistry handleCommand(String commandName, CommandHandler<T> fn, Class<T> commandClass){
4046
commandHandlers.add(new RegisteredCommandHandler<>(commandName, fn, commandClass));
4147
return this;

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.reactivecommons.async.impl.converters.MessageConverter;
1818
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
1919
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
20+
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
2021
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
2122
import org.springframework.beans.factory.annotation.Value;
2223
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -48,9 +49,21 @@ public ApplicationEventListener eventListener(HandlerResolver resolver, MessageC
4849
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
4950
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),
5051
discardNotifier);
51-
5252
listener.startListener();
53+
return listener;
54+
}
5355

56+
@Bean
57+
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
58+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
59+
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
60+
receiver,
61+
asyncProps.getDomain().getEvents().getExchange(),
62+
asyncProps.getNotificationProps().getQueueName(appName),
63+
resolver,
64+
messageConverter,
65+
discardNotifier);
66+
listener.startListener();
5467
return listener;
5568
}
5669

@@ -62,9 +75,7 @@ public ApplicationQueryListener queryListener(MessageConverter converter, Handle
6275
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
6376
"globalReply", asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
6477
asyncProps.getRetryDelay(), discardNotifier);
65-
6678
listener.startListener();
67-
6879
return listener;
6980
}
7081

@@ -75,9 +86,7 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList
7586
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
7687
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
7788
asyncProps.getRetryDelay(), discardNotifier);
78-
7989
commandListener.startListener();
80-
8190
return commandListener;
8291
}
8392

@@ -108,7 +117,14 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle
108117
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
109118
ConcurrentHashMap::putAll);
110119

111-
return new HandlerResolver(handlers, eventListeners, commandHandlers) {
120+
final ConcurrentMap<String, RegisteredEventListener> eventNotificationListener = registries
121+
.values()
122+
.stream()
123+
.flatMap(r -> r.getEventNotificationListener().stream())
124+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
125+
ConcurrentHashMap::putAll);
126+
127+
return new HandlerResolver(handlers, eventListeners, commandHandlers, eventNotificationListener) {
112128
@Override
113129
@SuppressWarnings("unchecked")
114130
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
@@ -125,7 +141,6 @@ public DefaultQueryHandler defaultHandler() {
125141
Mono.error(new RuntimeException("No Handler Registered"));
126142
}
127143

128-
129144
@Bean
130145
@ConditionalOnMissingBean
131146
public DefaultCommandHandler defaultCommandHandler() {

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/props/AsyncProps.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ public class AsyncProps {
2020
@NestedConfigurationProperty
2121
private DirectProps direct = new DirectProps();
2222

23+
@NestedConfigurationProperty
24+
private NotificationProps notificationProps = new NotificationProps();
25+
2326
private Integer maxRetries = 10;
2427

2528
private Integer prefetchCount = 250;
2629

2730
private Integer retryDelay = 1000;
2831

2932
private Boolean withDLQRetry = false;
30-
3133
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/props/BrokerConfigProps.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import lombok.Getter;
44
import lombok.RequiredArgsConstructor;
55
import org.reactivecommons.async.impl.config.IBrokerConfigProps;
6+
import org.reactivecommons.async.impl.utils.NameGenerator;
67
import org.springframework.beans.factory.annotation.Value;
78
import org.springframework.context.annotation.Configuration;
89
import org.springframework.util.Base64Utils;
@@ -19,9 +20,7 @@ public class BrokerConfigProps implements IBrokerConfigProps {
1920

2021
@Value("${spring.application.name}")
2122
private String appName;
22-
2323
private final AsyncProps asyncProps;
24-
2524
private final AtomicReference<String> replyQueueName = new AtomicReference<>();
2625

2726
@Override
@@ -43,7 +42,7 @@ public String getCommandsQueue() {
4342
public String getReplyQueue() {
4443
final String name = replyQueueName.get();
4544
if (name == null) {
46-
final String replyName = newRandomQueueName();
45+
final String replyName = NameGenerator.generateNameFrom(appName);
4746
if (replyQueueName.compareAndSet(null, replyName)) {
4847
return replyName;
4948
} else {
@@ -62,14 +61,4 @@ public String getDomainEventsExchangeName() {
6261
public String getDirectMessagesExchangeName() {
6362
return asyncProps.getDirect().getExchange();
6463
}
65-
66-
private String newRandomQueueName() {
67-
UUID uuid = UUID.randomUUID();
68-
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
69-
bb.putLong(uuid.getMostSignificantBits())
70-
.putLong(uuid.getLeastSignificantBits());
71-
return appName + Base64Utils.encodeToUrlSafeString(bb.array())
72-
.replaceAll("=", "");
73-
}
74-
7564
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.reactivecommons.async.impl.config.props;
2+
3+
import lombok.Getter;
4+
import lombok.RequiredArgsConstructor;
5+
import org.reactivecommons.async.impl.utils.NameGenerator;
6+
7+
import java.util.concurrent.atomic.AtomicReference;
8+
9+
@Getter
10+
@RequiredArgsConstructor
11+
public class NotificationProps {
12+
13+
private final AtomicReference<String> queueName = new AtomicReference<>();
14+
private final String queueSuffix = "notification";
15+
16+
public String getQueueName(String applicationName) {
17+
final String name = this.queueName.get();
18+
if(name == null) return getGeneratedName(applicationName);
19+
return name;
20+
}
21+
22+
private String getGeneratedName(String applicationName) {
23+
String generatedName = NameGenerator.generateNameFrom(applicationName, queueSuffix);
24+
return this.queueName
25+
.compareAndSet(null, generatedName) ?
26+
generatedName : this.queueName.get();
27+
}
28+
}

async/async-commons/async-commons.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,6 @@ dependencies {
7979
api "io.projectreactor.rabbitmq:reactor-rabbitmq"
8080
api 'com.fasterxml.jackson.core:jackson-databind'
8181
testImplementation 'io.projectreactor:reactor-test'
82+
testCompile group: 'org.databene', name: 'contiperf', version: '2.3.4'
83+
8284
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/HandlerResolver.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class HandlerResolver {
1414
private final Map<String, RegisteredQueryHandler> queryHandlers;
1515
private final Map<String, RegisteredEventListener> eventListeners;
1616
private final Map<String, RegisteredCommandHandler> commandHandlers;
17-
17+
private final Map<String , RegisteredEventListener> eventNotificationListeners;
1818

1919
@SuppressWarnings("unchecked")
2020
public <T, R> RegisteredQueryHandler<T, R> getQueryHandler(String path) {
@@ -31,6 +31,14 @@ public <T> RegisteredEventListener<T> getEventListener(String path) {
3131
return eventListeners.get(path);
3232
}
3333

34+
public Collection<RegisteredEventListener> getNotificationListeners() {
35+
return eventNotificationListeners.values();
36+
}
37+
38+
public <T> RegisteredEventListener<T> getNotificationListener(String path) {
39+
return eventNotificationListeners.get(path);
40+
}
41+
3442
public Collection<RegisteredEventListener> getEventListeners() {
3543
return eventListeners.values();
3644
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationEventListener.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@
1313
import org.reactivecommons.async.impl.HandlerResolver;
1414
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1515
import org.reactivecommons.async.impl.communications.TopologyCreator;
16+
import org.reactivecommons.async.impl.utils.matcher.KeyMatcher;
17+
import org.reactivecommons.async.impl.utils.matcher.Matcher;
1618
import reactor.core.publisher.Flux;
1719
import reactor.core.publisher.Mono;
1820
import reactor.rabbitmq.AcknowledgableDelivery;
1921
import reactor.rabbitmq.BindingSpecification;
2022
import reactor.rabbitmq.ExchangeSpecification;
2123
import reactor.rabbitmq.QueueSpecification;
2224

25+
import java.util.Set;
2326
import java.util.function.Function;
27+
import java.util.stream.Collectors;
2428

2529
import static reactor.core.publisher.Flux.fromIterable;
2630

@@ -32,14 +36,24 @@ public class ApplicationEventListener extends GenericMessageListener {
3236
private final String eventsExchange;
3337
private final boolean withDLQRetry;
3438
private final int retryDelay;
35-
36-
public ApplicationEventListener(ReactiveMessageListener receiver, String queueName, HandlerResolver resolver, String eventsExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay, DiscardNotifier discardNotifier) {
39+
private final Matcher keyMatcher;
40+
41+
public ApplicationEventListener(ReactiveMessageListener receiver,
42+
String queueName,
43+
HandlerResolver resolver,
44+
String eventsExchange,
45+
MessageConverter messageConverter,
46+
boolean withDLQRetry,
47+
long maxRetries,
48+
int retryDelay,
49+
DiscardNotifier discardNotifier) {
3750
super(queueName, receiver, withDLQRetry, maxRetries, discardNotifier, "event");
3851
this.retryDelay = retryDelay;
3952
this.withDLQRetry = withDLQRetry;
4053
this.resolver = resolver;
4154
this.eventsExchange = eventsExchange;
4255
this.messageConverter = messageConverter;
56+
this.keyMatcher = new KeyMatcher();
4357
}
4458

4559
protected Mono<Void> setUpBindings(TopologyCreator creator) {
@@ -48,7 +62,8 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
4862
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(eventsExchange+".DLQ").durable(true).type("topic"));
4963
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, eventsExchange, retryDelay);
5064
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, eventsExchange+".DLQ");
51-
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));
65+
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners())
66+
.flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));
5267
final Flux<AMQP.Queue.BindOk> bindingDLQ = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange+".DLQ", listener.getPath(), queueName + ".DLQ")));
5368
return declareExchange.then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).thenMany(bindings).thenMany(bindingDLQ).then();
5469
} else {
@@ -63,26 +78,24 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
6378

6479
@Override
6580
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
66-
final RegisteredEventListener<Object> handler = resolver.getEventListener(executorPath);
81+
final Set<String> listenerKeys = resolver.getEventListeners()
82+
.stream()
83+
.map(RegisteredEventListener::getPath)
84+
.collect(Collectors.toSet());
85+
final String matchedKey = keyMatcher.match(listenerKeys, executorPath);
86+
final RegisteredEventListener<Object> handler = resolver.getEventListener(matchedKey);
6787
final Class<Object> eventClass = handler.getInputClass();
6888
Function<Message, DomainEvent<Object>> converter = msj -> messageConverter.readDomainEvent(msj, eventClass);
6989
final EventExecutor<Object> executor = new EventExecutor<>(handler.getHandler(), converter);
70-
return msj -> executor.execute(msj).cast(Object.class);
90+
return msj -> executor
91+
.execute(msj)
92+
.cast(Object.class);
7193
}
7294

7395
protected String getExecutorPath(AcknowledgableDelivery msj) {
7496
return msj.getEnvelope().getRoutingKey();
7597
}
7698

77-
78-
@Data
79-
private static class DomainEventInt {
80-
private String name;
81-
private String eventId;
82-
private JsonNode data;
83-
}
84-
85-
8699
}
87100

88101

0 commit comments

Comments
 (0)