Skip to content

Commit 24638d6

Browse files
committed
se adiciona argumento x-max-length-bytes en la creación de colas, se parametriza nombres de directMessages y globalReply para ser personalizados, se separa CommandListenersConfig, QueryListenerConfig y EventListenersConfig
1 parent 4f068e4 commit 24638d6

File tree

19 files changed

+335
-119
lines changed

19 files changed

+335
-119
lines changed

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,31 @@ app.async.retryDelay=1000
215215
app.async.maxRetries=10
216216
217217
```
218+
### Domain custom Configuration (RabbitMQ)
219+
220+
221+
```
222+
app.async.domain.events.exchange=exchangeCustomName
223+
app.async.domain.events.maxLengthBytes=125000000
224+
225+
```
226+
227+
### Direct custom Configuration (RabbitMQ)
228+
229+
230+
```
231+
app.async.direct.exchange=exchangeCustomName
232+
app.async.direct.maxLengthBytes=125000000
233+
```
234+
235+
### Global custom Configuration (RabbitMQ)
236+
237+
238+
```
239+
app.async.global.exchange=exchangeCustomName
240+
app.async.global.maxLengthBytes=125000000
241+
```
242+
218243
* withDLQRetry: Wheter to enable or not the new Retry DLQ Strategy
219244
* retryDelay: Delay retry value in ms
220245
* maxRetries: Max number of retries in case of error in adition to the one automatic retry per queue.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.reactivecommons.async.impl.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.impl.DiscardNotifier;
5+
import org.reactivecommons.async.impl.HandlerResolver;
6+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.impl.config.props.AsyncProps;
8+
import org.reactivecommons.async.impl.converters.MessageConverter;
9+
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
import org.springframework.context.annotation.Import;
14+
15+
@Configuration
16+
@RequiredArgsConstructor
17+
@Import(RabbitMqConfig.class)
18+
public class CommandListenersConfig {
19+
20+
@Value("${spring.application.name}")
21+
private String appName;
22+
23+
private final AsyncProps asyncProps;
24+
25+
@Bean
26+
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
27+
HandlerResolver resolver, MessageConverter converter,
28+
DiscardNotifier discardNotifier) {
29+
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
30+
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
31+
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier);
32+
33+
commandListener.startListener();
34+
35+
return commandListener;
36+
}
37+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.reactivecommons.async.impl.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.impl.DiscardNotifier;
5+
import org.reactivecommons.async.impl.HandlerResolver;
6+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.impl.config.props.AsyncProps;
8+
import org.reactivecommons.async.impl.converters.MessageConverter;
9+
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
10+
import org.springframework.beans.factory.annotation.Value;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
import org.springframework.context.annotation.Import;
14+
15+
@Configuration
16+
@RequiredArgsConstructor
17+
@Import(RabbitMqConfig.class)
18+
public class EventListenersConfig {
19+
20+
@Value("${spring.application.name}")
21+
private String appName;
22+
23+
private final AsyncProps asyncProps;
24+
25+
@Bean
26+
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
27+
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
28+
29+
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
30+
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
31+
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),asyncProps.getDomain().getEvents().getMaxLengthBytes(),
32+
discardNotifier);
33+
34+
listener.startListener();
35+
36+
return listener;
37+
}
38+
}
Lines changed: 1 addition & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,16 @@
11
package org.reactivecommons.async.impl.config;
22

33
import lombok.RequiredArgsConstructor;
4-
import org.reactivecommons.async.api.DefaultCommandHandler;
5-
import org.reactivecommons.async.api.DefaultQueryHandler;
6-
import org.reactivecommons.async.api.DynamicRegistry;
7-
import org.reactivecommons.async.api.HandlerRegistry;
8-
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
9-
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
10-
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
114
import org.reactivecommons.async.impl.DiscardNotifier;
12-
import org.reactivecommons.async.impl.DynamicRegistryImp;
135
import org.reactivecommons.async.impl.HandlerResolver;
146
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
15-
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
167
import org.reactivecommons.async.impl.config.props.AsyncProps;
178
import org.reactivecommons.async.impl.converters.MessageConverter;
18-
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
199
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
20-
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
2110
import org.springframework.beans.factory.annotation.Value;
22-
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
23-
import org.springframework.context.ApplicationContext;
2411
import org.springframework.context.annotation.Bean;
2512
import org.springframework.context.annotation.Configuration;
2613
import org.springframework.context.annotation.Import;
27-
import reactor.core.publisher.Mono;
28-
29-
import java.util.Map;
30-
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.ConcurrentMap;
3214

3315
@Configuration
3416
@RequiredArgsConstructor
@@ -46,89 +28,11 @@ public ApplicationEventListener eventListener(HandlerResolver resolver, MessageC
4628
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
4729
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
4830
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
49-
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),
31+
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),asyncProps.getDomain().getEvents().getMaxLengthBytes(),
5032
discardNotifier);
5133

5234
listener.startListener();
5335

5436
return listener;
5537
}
56-
57-
@Bean //TODO: move to own config (QueryListenerConfig)
58-
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
59-
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
60-
DiscardNotifier discardNotifier) {
61-
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
62-
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
63-
"globalReply", asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
64-
asyncProps.getRetryDelay(), discardNotifier);
65-
66-
listener.startListener();
67-
68-
return listener;
69-
}
70-
71-
@Bean
72-
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
73-
HandlerResolver resolver, MessageConverter converter,
74-
DiscardNotifier discardNotifier) {
75-
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
76-
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
77-
asyncProps.getRetryDelay(), discardNotifier);
78-
79-
commandListener.startListener();
80-
81-
return commandListener;
82-
}
83-
84-
@Bean
85-
public DynamicRegistry dynamicRegistry(HandlerResolver resolver, ReactiveMessageListener listener, IBrokerConfigProps props) {
86-
return new DynamicRegistryImp(resolver, listener.getTopologyCreator(), props);
87-
}
88-
89-
@Bean
90-
public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) {
91-
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
92-
93-
final ConcurrentMap<String, RegisteredQueryHandler> handlers = registries
94-
.values().stream()
95-
.flatMap(r -> r.getHandlers().stream())
96-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
97-
ConcurrentHashMap::putAll);
98-
99-
final ConcurrentMap<String, RegisteredEventListener> eventListeners = registries
100-
.values().stream()
101-
.flatMap(r -> r.getEventListeners().stream())
102-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
103-
ConcurrentHashMap::putAll);
104-
105-
final ConcurrentMap<String, RegisteredCommandHandler> commandHandlers = registries
106-
.values().stream()
107-
.flatMap(r -> r.getCommandHandlers().stream())
108-
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
109-
ConcurrentHashMap::putAll);
110-
111-
return new HandlerResolver(handlers, eventListeners, commandHandlers) {
112-
@Override
113-
@SuppressWarnings("unchecked")
114-
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
115-
final RegisteredCommandHandler<T> handler = super.getCommandHandler(path);
116-
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
117-
}
118-
};
119-
}
120-
121-
@Bean
122-
@ConditionalOnMissingBean
123-
public DefaultQueryHandler defaultHandler() {
124-
return (DefaultQueryHandler<Object, Object>) command ->
125-
Mono.error(new RuntimeException("No Handler Registered"));
126-
}
127-
128-
129-
@Bean
130-
@ConditionalOnMissingBean
131-
public DefaultCommandHandler defaultCommandHandler() {
132-
return message -> Mono.error(new RuntimeException("No Handler Registered"));
133-
}
13438
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.reactivecommons.async.impl.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.reactivecommons.async.impl.DiscardNotifier;
5+
import org.reactivecommons.async.impl.HandlerResolver;
6+
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
7+
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
8+
import org.reactivecommons.async.impl.config.props.AsyncProps;
9+
import org.reactivecommons.async.impl.converters.MessageConverter;
10+
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
import org.springframework.context.annotation.Import;
15+
16+
@Configuration
17+
@RequiredArgsConstructor
18+
@Import(RabbitMqConfig.class)
19+
public class QueryListenerConfig {
20+
21+
@Value("${spring.application.name}")
22+
private String appName;
23+
24+
private final AsyncProps asyncProps;
25+
26+
@Bean
27+
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
28+
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
29+
DiscardNotifier discardNotifier) {
30+
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
31+
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
32+
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
33+
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier);
34+
35+
listener.startListener();
36+
37+
return listener;
38+
}
39+
}

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
import lombok.RequiredArgsConstructor;
66
import lombok.extern.java.Log;
77
import org.reactivecommons.api.domain.DomainEventBus;
8-
import org.reactivecommons.async.impl.DiscardNotifier;
9-
import org.reactivecommons.async.impl.RabbitDiscardNotifier;
10-
import org.reactivecommons.async.impl.RabbitDomainEventBus;
8+
import org.reactivecommons.async.api.DefaultCommandHandler;
9+
import org.reactivecommons.async.api.DefaultQueryHandler;
10+
import org.reactivecommons.async.api.DynamicRegistry;
11+
import org.reactivecommons.async.api.HandlerRegistry;
12+
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
13+
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
14+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
15+
import org.reactivecommons.async.impl.*;
1116
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1217
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
1318
import org.reactivecommons.async.impl.communications.TopologyCreator;
@@ -21,6 +26,7 @@
2126
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2227
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2328
import org.springframework.boot.context.properties.PropertyMapper;
29+
import org.springframework.context.ApplicationContext;
2430
import org.springframework.context.annotation.Bean;
2531
import org.springframework.context.annotation.Configuration;
2632
import org.springframework.context.annotation.Import;
@@ -29,6 +35,9 @@
2935
import reactor.util.retry.Retry;
3036

3137
import java.time.Duration;
38+
import java.util.Map;
39+
import java.util.concurrent.ConcurrentHashMap;
40+
import java.util.concurrent.ConcurrentMap;
3241
import java.util.logging.Level;
3342

3443
import static reactor.rabbitmq.ExchangeSpecification.exchange;
@@ -137,4 +146,54 @@ Mono<Connection> createConnectionMono(ConnectionFactory factory, String connecti
137146
.cache();
138147
}
139148

149+
@Bean
150+
public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) {
151+
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
152+
153+
final ConcurrentMap<String, RegisteredQueryHandler> handlers = registries
154+
.values().stream()
155+
.flatMap(r -> r.getHandlers().stream())
156+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
157+
ConcurrentHashMap::putAll);
158+
159+
final ConcurrentMap<String, RegisteredEventListener> eventListeners = registries
160+
.values().stream()
161+
.flatMap(r -> r.getEventListeners().stream())
162+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
163+
ConcurrentHashMap::putAll);
164+
165+
final ConcurrentMap<String, RegisteredCommandHandler> commandHandlers = registries
166+
.values().stream()
167+
.flatMap(r -> r.getCommandHandlers().stream())
168+
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
169+
ConcurrentHashMap::putAll);
170+
171+
return new HandlerResolver(handlers, eventListeners, commandHandlers) {
172+
@Override
173+
@SuppressWarnings("unchecked")
174+
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
175+
final RegisteredCommandHandler<T> handler = super.getCommandHandler(path);
176+
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
177+
}
178+
};
179+
}
180+
181+
@Bean
182+
public DynamicRegistry dynamicRegistry(HandlerResolver resolver, ReactiveMessageListener listener, IBrokerConfigProps props) {
183+
return new DynamicRegistryImp(resolver, listener.getTopologyCreator(), props);
184+
}
185+
186+
@Bean
187+
@ConditionalOnMissingBean
188+
public DefaultQueryHandler defaultHandler() {
189+
return (DefaultQueryHandler<Object, Object>) command ->
190+
Mono.error(new RuntimeException("No Handler Registered"));
191+
}
192+
193+
@Bean
194+
@ConditionalOnMissingBean
195+
public DefaultCommandHandler defaultCommandHandler() {
196+
return message -> Mono.error(new RuntimeException("No Handler Registered"));
197+
}
198+
140199
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.impl.config.CommandListenersConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(CommandListenersConfig.class)
14+
@Configuration
15+
public @interface EnableCommandListeners {
16+
}
17+
18+
19+

0 commit comments

Comments
 (0)