Skip to content

Commit 51ffd18

Browse files
committed
feat(handler): add support for queue listeners and enhance HandlerRegistry
1 parent d77f14f commit 51ffd18

File tree

39 files changed

+1098
-50
lines changed

39 files changed

+1098
-50
lines changed

acceptance/async-tests/src/test/java/org/reactivecommons/test/QueryProcessPerfTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ class QueryProcessPerfTest {
3232
private static final String QUERY_NAME = "app.command.test";
3333
private static final int MESSAGE_COUNT = 40000;
3434
private static final Semaphore semaphore = new Semaphore(0);
35-
private static final AtomicLong atomicLong = new AtomicLong(0);
36-
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
35+
// private static final AtomicLong atomicLong = new AtomicLong(0);
36+
// private static final CountDownLatch latch = new CountDownLatch(12 + 1);
3737

3838
@Autowired
3939
private DirectAsyncGateway gateway;
@@ -75,7 +75,7 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro
7575
private Flux<AsyncQuery<DummyMessage>> createMessages(int count) {
7676
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count)
7777
.mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage()))
78-
.collect(Collectors.toList());
78+
.toList();
7979
return Flux.fromIterable(queryList);
8080
}
8181

@@ -95,6 +95,7 @@ public HandlerRegistry registry() {
9595
"app.command.name" + i, message -> Mono.empty(), Map.class
9696
))
9797
.block();
98+
assert registry != null;
9899
return registry
99100
.serveQuery(QUERY_NAME, this::handleSimple, DummyMessage.class);
100101
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
1414
import org.reactivecommons.async.api.handlers.RawCommandHandler;
1515
import org.reactivecommons.async.api.handlers.RawEventHandler;
16+
import org.reactivecommons.async.api.handlers.TopologyHandlerSetup;
1617
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1718
import org.reactivecommons.async.api.handlers.registered.RegisteredDomainHandlers;
1819
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1920
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
21+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
22+
import reactor.core.publisher.Mono;
2023

2124
import java.lang.reflect.ParameterizedType;
2225
import java.util.concurrent.CopyOnWriteArrayList;
@@ -35,6 +38,8 @@ public final class HandlerRegistry {
3538
private final RegisteredDomainHandlers<RegisteredQueryHandler<?, ?>> handlers = new RegisteredDomainHandlers<>();
3639
private final RegisteredDomainHandlers<RegisteredCommandHandler<?, ?>> commandHandlers =
3740
new RegisteredDomainHandlers<>();
41+
private final RegisteredDomainHandlers<RegisteredQueueListener> queueHandlers =
42+
new RegisteredDomainHandlers<>();
3843

3944

4045
public static HandlerRegistry register() {
@@ -176,6 +181,26 @@ public HandlerRegistry serveCloudEventQuery(String resource, QueryHandlerDelegat
176181
return this;
177182
}
178183

184+
// Queues
185+
public HandlerRegistry listenQueue(String queueName, RawEventHandler<RawMessage> handler) {
186+
return listenQueue(queueName, handler, creator -> Mono.empty());
187+
}
188+
189+
public HandlerRegistry listenQueue(String domain, String queueName, RawEventHandler<RawMessage> handler) {
190+
return listenQueue(domain, queueName, handler, creator -> Mono.empty());
191+
}
192+
193+
public HandlerRegistry listenQueue(String queueName, RawEventHandler<RawMessage> handler,
194+
TopologyHandlerSetup topologyCreator) {
195+
queueHandlers.add(DEFAULT_DOMAIN, new RegisteredQueueListener(queueName, handler, topologyCreator));
196+
return this;
197+
}
198+
199+
public HandlerRegistry listenQueue(String domain, String queueName, RawEventHandler<RawMessage> handler,
200+
TopologyHandlerSetup topologyCreator) {
201+
queueHandlers.add(domain, new RegisteredQueueListener(queueName, handler, topologyCreator));
202+
return this;
203+
}
179204

180205
@Deprecated(forRemoval = true)
181206
public <T> HandlerRegistry listenEvent(String eventName, DomainEventHandler<T> handler) {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
/**
6+
* Interface for setting up topology handlers.
7+
*/
8+
@FunctionalInterface
9+
public interface TopologyHandlerSetup {
10+
Mono<Void> setup(Object topologyCreator);
11+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.reactivecommons.async.api.handlers.registered;
2+
3+
import org.reactivecommons.api.domain.RawMessage;
4+
import org.reactivecommons.async.api.handlers.RawEventHandler;
5+
import org.reactivecommons.async.api.handlers.TopologyHandlerSetup;
6+
7+
8+
public record RegisteredQueueListener(String queueName,
9+
RawEventHandler<RawMessage> handler,
10+
TopologyHandlerSetup topologyHandlerSetup) {
11+
}

async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package org.reactivecommons.async.commons;
22

3+
import lombok.Getter;
34
import lombok.RequiredArgsConstructor;
45
import lombok.extern.java.Log;
56
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
67
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
9+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
810
import org.reactivecommons.async.commons.utils.matcher.KeyMatcher;
911
import org.reactivecommons.async.commons.utils.matcher.Matcher;
1012

@@ -23,6 +25,8 @@ public class HandlerResolver {
2325
private final Map<String, RegisteredEventListener<?, ?>> eventsToBind;
2426
private final Map<String, RegisteredEventListener<?, ?>> eventNotificationListeners;
2527
private final Map<String, RegisteredCommandHandler<?, ?>> commandHandlers;
28+
@Getter
29+
private final Map<String, RegisteredQueueListener> queueListeners;
2630

2731
private final Matcher matcher = new KeyMatcher();
2832

async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverBuilder.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
99
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1010
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
11+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
1112
import org.reactivecommons.async.commons.HandlerResolver;
1213

1314
import java.util.List;
@@ -42,7 +43,17 @@ public static HandlerResolver buildResolver(String domain, Map<String, HandlerRe
4243
-> map.put(handler.path(), handler), ConcurrentHashMap::putAll
4344
);
4445

45-
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventNotificationListener = registries
46+
final ConcurrentMap<String, RegisteredQueueListener> queueListeners = registries
47+
.values()
48+
.stream()
49+
.flatMap(r -> r.getQueueHandlers()
50+
.getOrDefault(domain, List.of())
51+
.stream())
52+
.collect(ConcurrentHashMap::new, (map, handler)
53+
-> map.put(handler.queueName(), handler), ConcurrentHashMap::putAll
54+
);
55+
56+
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventNotificationListeners = registries
4657
.values()
4758
.stream()
4859
.flatMap(r -> r.getEventNotificationListener()
@@ -55,11 +66,11 @@ public static HandlerResolver buildResolver(String domain, Map<String, HandlerRe
5566
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain,
5667
registries);
5768

58-
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers =
69+
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventListeners =
5970
getEventHandlersWithDynamics(domain, registries);
6071

61-
return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener,
62-
commandHandlers) {
72+
return new HandlerResolver(queryHandlers, eventListeners, eventsToBind, eventNotificationListeners,
73+
commandHandlers, queueListeners) {
6374
@Override
6475
@SuppressWarnings("unchecked")
6576
public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public ApplicationCommandListener(ReactiveMessageListener listener,
6464
this.maxLengthBytes = maxLengthBytes;
6565
}
6666

67+
@Override
6768
protected Mono<Void> setUpBindings(TopologyCreator creator) {
6869
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(
6970
ExchangeSpecification.exchange(directExchange).durable(true).type("direct")

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public ApplicationEventListener(ReactiveMessageListener receiver,
6161
this.appName = appName;
6262
}
6363

64+
@Override
6465
protected Mono<Void> setUpBindings(TopologyCreator creator) {
6566
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(
6667
ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic")

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationNotificationListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public ApplicationNotificationListener(ReactiveMessageListener receiver,
5050
this.createTopology = createTopology;
5151
}
5252

53+
@Override
5354
protected Mono<Void> setUpBindings(TopologyCreator creator) {
5455

5556
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package org.reactivecommons.async.rabbit.listeners;
2+
3+
import lombok.extern.java.Log;
4+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
5+
import org.reactivecommons.async.commons.DiscardNotifier;
6+
import org.reactivecommons.async.commons.communications.Message;
7+
import org.reactivecommons.async.commons.ext.CustomReporter;
8+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
9+
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
10+
import reactor.core.publisher.Mono;
11+
import reactor.rabbitmq.AcknowledgableDelivery;
12+
13+
import java.util.function.Function;
14+
15+
@Log
16+
public class ApplicationQueueListener extends GenericMessageListener {
17+
18+
private final RegisteredQueueListener registeredListener;
19+
20+
public ApplicationQueueListener(ReactiveMessageListener listener,
21+
boolean withDLQRetry,
22+
long maxRetries,
23+
int retryDelay,
24+
RegisteredQueueListener registeredListener,
25+
DiscardNotifier discardNotifier,
26+
CustomReporter errorReporter) {
27+
super(registeredListener.queueName(), listener, withDLQRetry, true, maxRetries, retryDelay,
28+
discardNotifier, "queue", errorReporter);
29+
this.registeredListener = registeredListener;
30+
}
31+
32+
@Override
33+
protected Mono<Void> setUpBindings(TopologyCreator creator) {
34+
return registeredListener.topologyHandlerSetup().setup(creator);
35+
}
36+
37+
@Override
38+
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
39+
return message -> registeredListener.handler().handle(message).cast(Object.class);
40+
}
41+
42+
protected String getExecutorPath(AcknowledgableDelivery msj) {
43+
return registeredListener.queueName();
44+
}
45+
46+
@Override
47+
protected Object parseMessageForReporter(Message message) {
48+
return message;
49+
}
50+
51+
@Override
52+
protected String getKind() {
53+
return "queue";
54+
}
55+
}
56+
57+
58+

0 commit comments

Comments
 (0)