Skip to content

Commit 9c4d930

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #41 from jfgomezvelez/separate-messagelistenersconfig
Separate messagelistenersconfig
2 parents 0c8812d + f7a695c commit 9c4d930

20 files changed

+417
-34
lines changed

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,31 @@ app.async.retryDelay=1000
240240
app.async.maxRetries=10
241241
242242
```
243+
### Domain custom Configuration (RabbitMQ)
244+
245+
246+
```
247+
app.async.domain.events.exchange=exchangeCustomName
248+
app.async.domain.events.maxLengthBytes=125000000
249+
250+
```
251+
252+
### Direct custom Configuration (RabbitMQ)
253+
254+
255+
```
256+
app.async.direct.exchange=exchangeCustomName
257+
app.async.direct.maxLengthBytes=125000000
258+
```
259+
260+
### Global custom Configuration (RabbitMQ)
261+
262+
263+
```
264+
app.async.global.exchange=exchangeCustomName
265+
app.async.global.maxLengthBytes=125000000
266+
```
267+
243268
* withDLQRetry: Wheter to enable or not the new Retry DLQ Strategy
244269
* retryDelay: Delay retry value in ms
245270
* maxRetries: Max number of retries in case of error in adition to the one automatic retry per queue.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.reactivecommons.async.impl.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.impl.DiscardNotifier;
5+
import org.reactivecommons.async.impl.HandlerResolver;
6+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.impl.config.props.AsyncProps;
8+
import org.reactivecommons.async.impl.converters.MessageConverter;
9+
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
import org.springframework.context.annotation.Import;
14+
15+
@Configuration
16+
@RequiredArgsConstructor
17+
@Import(RabbitMqConfig.class)
18+
public class CommandListenersConfig {
19+
20+
@Value("${spring.application.name}")
21+
private String appName;
22+
23+
private final AsyncProps asyncProps;
24+
25+
@Bean
26+
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
27+
HandlerResolver resolver, MessageConverter converter,
28+
DiscardNotifier discardNotifier) {
29+
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
30+
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
31+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier);
32+
33+
commandListener.startListener();
34+
35+
return commandListener;
36+
}
37+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.reactivecommons.async.impl.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.impl.DiscardNotifier;
5+
import org.reactivecommons.async.impl.HandlerResolver;
6+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.impl.config.props.AsyncProps;
8+
import org.reactivecommons.async.impl.converters.MessageConverter;
9+
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
import org.springframework.context.annotation.Import;
14+
15+
@Configuration
16+
@RequiredArgsConstructor
17+
@Import(RabbitMqConfig.class)
18+
public class EventListenersConfig {
19+
20+
@Value("${spring.application.name}")
21+
private String appName;
22+
23+
private final AsyncProps asyncProps;
24+
25+
@Bean
26+
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
27+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
28+
29+
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
30+
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
31+
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),asyncProps.getDomain().getEvents().getMaxLengthBytes(),
32+
discardNotifier);
33+
34+
listener.startListener();
35+
36+
return listener;
37+
}
38+
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/MessageListenersConfig.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
@Configuration
3535
@RequiredArgsConstructor
36+
@Deprecated
3637
@Import(RabbitMqConfig.class)
3738
public class MessageListenersConfig {
3839

@@ -48,14 +49,14 @@ public ApplicationEventListener eventListener(HandlerResolver resolver, MessageC
4849
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
4950
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
5051
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),
51-
discardNotifier);
52+
asyncProps.getDomain().getEvents().getMaxLengthBytes(), discardNotifier);
5253
listener.startListener();
5354
return listener;
5455
}
5556

5657
@Bean
5758
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
58-
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
59+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
5960
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
6061
receiver,
6162
asyncProps.getDomain().getEvents().getExchange(),
@@ -73,8 +74,8 @@ public ApplicationQueryListener queryListener(MessageConverter converter, Handle
7374
DiscardNotifier discardNotifier) {
7475
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
7576
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
76-
"globalReply", asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
77-
asyncProps.getRetryDelay(), discardNotifier);
77+
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
78+
asyncProps.getRetryDelay(), asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier);
7879
listener.startListener();
7980
return listener;
8081
}
@@ -85,7 +86,7 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList
8586
DiscardNotifier discardNotifier) {
8687
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
8788
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
88-
asyncProps.getRetryDelay(), discardNotifier);
89+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier);
8990
commandListener.startListener();
9091
return commandListener;
9192
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.reactivecommons.async.impl.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.impl.DiscardNotifier;
5+
import org.reactivecommons.async.impl.HandlerResolver;
6+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.impl.config.props.AsyncProps;
8+
import org.reactivecommons.async.impl.converters.MessageConverter;
9+
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
10+
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
import org.springframework.context.annotation.Import;
15+
16+
@Configuration
17+
@RequiredArgsConstructor
18+
@Import(RabbitMqConfig.class)
19+
public class NotificacionListenersConfig {
20+
21+
@Value("${spring.application.name}")
22+
private String appName;
23+
24+
private final AsyncProps asyncProps;
25+
26+
@Bean
27+
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
28+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
29+
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
30+
receiver,
31+
asyncProps.getDomain().getEvents().getExchange(),
32+
asyncProps.getNotificationProps().getQueueName(appName),
33+
resolver,
34+
messageConverter,
35+
discardNotifier);
36+
listener.startListener();
37+
return listener;
38+
}
39+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.reactivecommons.async.impl.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.impl.DiscardNotifier;
5+
import org.reactivecommons.async.impl.HandlerResolver;
6+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
8+
import org.reactivecommons.async.impl.config.props.AsyncProps;
9+
import org.reactivecommons.async.impl.converters.MessageConverter;
10+
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
import org.springframework.context.annotation.Import;
15+
16+
@Configuration
17+
@RequiredArgsConstructor
18+
@Import(RabbitMqConfig.class)
19+
public class QueryListenerConfig {
20+
21+
@Value("${spring.application.name}")
22+
private String appName;
23+
24+
private final AsyncProps asyncProps;
25+
26+
@Bean
27+
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
28+
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
29+
DiscardNotifier discardNotifier) {
30+
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
31+
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
32+
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
33+
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier);
34+
35+
listener.startListener();
36+
37+
return listener;
38+
}
39+
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
import lombok.RequiredArgsConstructor;
66
import lombok.extern.java.Log;
77
import org.reactivecommons.api.domain.DomainEventBus;
8-
import org.reactivecommons.async.impl.DiscardNotifier;
9-
import org.reactivecommons.async.impl.RabbitDiscardNotifier;
10-
import org.reactivecommons.async.impl.RabbitDomainEventBus;
8+
import org.reactivecommons.async.api.DefaultCommandHandler;
9+
import org.reactivecommons.async.api.DefaultQueryHandler;
10+
import org.reactivecommons.async.api.DynamicRegistry;
11+
import org.reactivecommons.async.api.HandlerRegistry;
12+
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
13+
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
14+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
15+
import org.reactivecommons.async.impl.*;
1116
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1217
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
1318
import org.reactivecommons.async.impl.communications.TopologyCreator;
@@ -21,6 +26,7 @@
2126
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2227
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2328
import org.springframework.boot.context.properties.PropertyMapper;
29+
import org.springframework.context.ApplicationContext;
2430
import org.springframework.context.annotation.Bean;
2531
import org.springframework.context.annotation.Configuration;
2632
import org.springframework.context.annotation.Import;
@@ -29,6 +35,9 @@
2935
import reactor.util.retry.Retry;
3036

3137
import java.time.Duration;
38+
import java.util.Map;
39+
import java.util.concurrent.ConcurrentHashMap;
40+
import java.util.concurrent.ConcurrentMap;
3241
import java.util.logging.Level;
3342

3443
import static reactor.rabbitmq.ExchangeSpecification.exchange;
@@ -137,4 +146,61 @@ Mono<Connection> createConnectionMono(ConnectionFactory factory, String connecti
137146
.cache();
138147
}
139148

149+
@Bean
150+
public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) {
151+
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
152+
153+
final ConcurrentMap<String, RegisteredQueryHandler> handlers = registries
154+
.values().stream()
155+
.flatMap(r -> r.getHandlers().stream())
156+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
157+
ConcurrentHashMap::putAll);
158+
159+
final ConcurrentMap<String, RegisteredEventListener> eventListeners = registries
160+
.values().stream()
161+
.flatMap(r -> r.getEventListeners().stream())
162+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
163+
ConcurrentHashMap::putAll);
164+
165+
final ConcurrentMap<String, RegisteredCommandHandler> commandHandlers = registries
166+
.values().stream()
167+
.flatMap(r -> r.getCommandHandlers().stream())
168+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
169+
ConcurrentHashMap::putAll);
170+
171+
final ConcurrentMap<String, RegisteredEventListener> eventNotificationListener = registries
172+
.values()
173+
.stream()
174+
.flatMap(r -> r.getEventNotificationListener().stream())
175+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
176+
ConcurrentHashMap::putAll);
177+
178+
return new HandlerResolver(handlers, eventListeners, commandHandlers, eventNotificationListener) {
179+
@Override
180+
@SuppressWarnings("unchecked")
181+
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
182+
final RegisteredCommandHandler<T> handler = super.getCommandHandler(path);
183+
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
184+
}
185+
};
186+
}
187+
188+
@Bean
189+
public DynamicRegistry dynamicRegistry(HandlerResolver resolver, ReactiveMessageListener listener, IBrokerConfigProps props) {
190+
return new DynamicRegistryImp(resolver, listener.getTopologyCreator(), props);
191+
}
192+
193+
@Bean
194+
@ConditionalOnMissingBean
195+
public DefaultQueryHandler defaultHandler() {
196+
return (DefaultQueryHandler<Object, Object>) command ->
197+
Mono.error(new RuntimeException("No Handler Registered"));
198+
}
199+
200+
@Bean
201+
@ConditionalOnMissingBean
202+
public DefaultCommandHandler defaultCommandHandler() {
203+
return message -> Mono.error(new RuntimeException("No Handler Registered"));
204+
}
205+
140206
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.impl.config.CommandListenersConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(CommandListenersConfig.class)
14+
@Configuration
15+
public @interface EnableCommandListeners {
16+
}
17+
18+
19+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.impl.config.EventListenersConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
@Retention(RetentionPolicy.RUNTIME)
10+
@Target({ElementType.TYPE})
11+
@Documented
12+
@Import(EventListenersConfig.class)
13+
@Configuration
14+
public @interface EnableEventListeners {
15+
}
16+
17+
18+

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
11
package org.reactivecommons.async.impl.config.annotations;
22

3-
import org.reactivecommons.async.impl.config.MessageListenersConfig;
3+
import org.reactivecommons.async.impl.config.CommandListenersConfig;
4+
import org.reactivecommons.async.impl.config.EventListenersConfig;
5+
import org.reactivecommons.async.impl.config.NotificacionListenersConfig;
6+
import org.reactivecommons.async.impl.config.QueryListenerConfig;
47
import org.springframework.context.annotation.Configuration;
58
import org.springframework.context.annotation.Import;
69

710
import java.lang.annotation.*;
811

9-
12+
/**
13+
* Actualmente se utiliza EnableMessageListeners para habilitar Comandos, querys y eventos al mismo tiempo,
14+
* se han separado en 3 EnableCommandListeners, EnableQueryListeners y EnableEventListeners, estos se pueden utilizar
15+
* todos juntos o de manera individual segun necesidad
16+
* @deprecated Use EnableCommandListeners, EnableQueryListeners, EnableEventListeners
17+
*/
18+
@Deprecated
1019
@Retention(RetentionPolicy.RUNTIME)
1120
@Target({ElementType.TYPE})
1221
@Documented
13-
@Import(MessageListenersConfig.class)
22+
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificacionListenersConfig.class})
1423
@Configuration
1524
public @interface EnableMessageListeners {
1625
}

0 commit comments

Comments
 (0)