Skip to content

Commit dec7e92

Browse files
authored
feat(rabbitmq): Add direct queue listener which allows compatibility with existing queues (#155)
* fix(handler): simplify handleRawCommand methods and improve RabbitMessage handling * docs(handler): add support for raw command and event handling with RabbitMQ * feat(handler): add support for queue listeners and enhance HandlerRegistry * docs(handler): update queue handling documentation for consistency and clarity
1 parent 832bde1 commit dec7e92

File tree

47 files changed

+3620
-1603
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+3620
-1603
lines changed

acceptance/async-tests/src/test/java/org/reactivecommons/test/QueryProcessPerfTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818

1919
import java.util.List;
2020
import java.util.Map;
21-
import java.util.concurrent.CountDownLatch;
2221
import java.util.concurrent.Semaphore;
23-
import java.util.concurrent.atomic.AtomicLong;
24-
import java.util.stream.Collectors;
2522
import java.util.stream.IntStream;
2623

2724
import static reactor.core.publisher.Flux.range;
@@ -32,8 +29,6 @@ class QueryProcessPerfTest {
3229
private static final String QUERY_NAME = "app.command.test";
3330
private static final int MESSAGE_COUNT = 40000;
3431
private static final Semaphore semaphore = new Semaphore(0);
35-
private static final AtomicLong atomicLong = new AtomicLong(0);
36-
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
3732

3833
@Autowired
3934
private DirectAsyncGateway gateway;
@@ -75,7 +70,7 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro
7570
private Flux<AsyncQuery<DummyMessage>> createMessages(int count) {
7671
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count)
7772
.mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage()))
78-
.collect(Collectors.toList());
73+
.toList();
7974
return Flux.fromIterable(queryList);
8075
}
8176

@@ -95,6 +90,7 @@ public HandlerRegistry registry() {
9590
"app.command.name" + i, message -> Mono.empty(), Map.class
9691
))
9792
.block();
93+
assert registry != null;
9894
return registry
9995
.serveQuery(QUERY_NAME, this::handleSimple, DummyMessage.class);
10096
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
1414
import org.reactivecommons.async.api.handlers.RawCommandHandler;
1515
import org.reactivecommons.async.api.handlers.RawEventHandler;
16+
import org.reactivecommons.async.api.handlers.TopologyHandlerSetup;
1617
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1718
import org.reactivecommons.async.api.handlers.registered.RegisteredDomainHandlers;
1819
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1920
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
21+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
22+
import reactor.core.publisher.Mono;
2023

2124
import java.lang.reflect.ParameterizedType;
2225
import java.util.concurrent.CopyOnWriteArrayList;
@@ -35,6 +38,8 @@ public final class HandlerRegistry {
3538
private final RegisteredDomainHandlers<RegisteredQueryHandler<?, ?>> handlers = new RegisteredDomainHandlers<>();
3639
private final RegisteredDomainHandlers<RegisteredCommandHandler<?, ?>> commandHandlers =
3740
new RegisteredDomainHandlers<>();
41+
private final RegisteredDomainHandlers<RegisteredQueueListener> queueHandlers =
42+
new RegisteredDomainHandlers<>();
3843

3944

4045
public static HandlerRegistry register() {
@@ -142,12 +147,12 @@ public HandlerRegistry handleCloudEventCommand(String domain, String commandName
142147
}
143148

144149
// commands: RawMessage
145-
public HandlerRegistry handleRawCommand(String commandName, RawCommandHandler<?> handler) {
146-
return handleRawCommand(DEFAULT_DOMAIN, commandName, handler);
150+
public HandlerRegistry handleRawCommand(RawCommandHandler<?> handler) {
151+
return handleRawCommand(DEFAULT_DOMAIN, handler);
147152
}
148153

149-
public HandlerRegistry handleRawCommand(String domain, String commandName, RawCommandHandler<?> handler) {
150-
commandHandlers.add(domain, new RegisteredCommandHandler<>(commandName, handler, RawMessage.class));
154+
public HandlerRegistry handleRawCommand(String domain, RawCommandHandler<?> handler) {
155+
commandHandlers.add(domain, new RegisteredCommandHandler<>("", handler, RawMessage.class));
151156
return this;
152157
}
153158

@@ -176,6 +181,26 @@ public HandlerRegistry serveCloudEventQuery(String resource, QueryHandlerDelegat
176181
return this;
177182
}
178183

184+
// Queues
185+
public HandlerRegistry listenQueue(String queueName, RawEventHandler<RawMessage> handler) {
186+
return listenQueue(queueName, handler, creator -> Mono.empty());
187+
}
188+
189+
public HandlerRegistry listenQueue(String domain, String queueName, RawEventHandler<RawMessage> handler) {
190+
return listenQueue(domain, queueName, handler, creator -> Mono.empty());
191+
}
192+
193+
public HandlerRegistry listenQueue(String queueName, RawEventHandler<RawMessage> handler,
194+
TopologyHandlerSetup topologyCreator) {
195+
queueHandlers.add(DEFAULT_DOMAIN, new RegisteredQueueListener(queueName, handler, topologyCreator));
196+
return this;
197+
}
198+
199+
public HandlerRegistry listenQueue(String domain, String queueName, RawEventHandler<RawMessage> handler,
200+
TopologyHandlerSetup topologyCreator) {
201+
queueHandlers.add(domain, new RegisteredQueueListener(queueName, handler, topologyCreator));
202+
return this;
203+
}
179204

180205
@Deprecated(forRemoval = true)
181206
public <T> HandlerRegistry listenEvent(String eventName, DomainEventHandler<T> handler) {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
/**
6+
* Interface for setting up topology handlers.
7+
*/
8+
@FunctionalInterface
9+
public interface TopologyHandlerSetup {
10+
Mono<Void> setup(Object topologyCreator);
11+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.reactivecommons.async.api.handlers.registered;
2+
3+
import org.reactivecommons.api.domain.RawMessage;
4+
import org.reactivecommons.async.api.handlers.RawEventHandler;
5+
import org.reactivecommons.async.api.handlers.TopologyHandlerSetup;
6+
7+
8+
public record RegisteredQueueListener(String queueName,
9+
RawEventHandler<RawMessage> handler,
10+
TopologyHandlerSetup topologyHandlerSetup) {
11+
}

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
class HandlerRegistryTest {
2727
private final HandlerRegistry registry = HandlerRegistry.register();
2828
private final String name = "some.event";
29-
private final String nameRaw = "some.raw.event";
3029
private final String nameRawNotification = "some.raw.notification.event";
3130
private final String domain = "some-domain";
3231

@@ -222,14 +221,14 @@ void handleCloudEventCommand() {
222221
void handleRawCommand() {
223222
SomeRawCommandEventHandler eventHandler = new SomeRawCommandEventHandler();
224223

225-
registry.handleRawCommand(nameRaw, eventHandler);
224+
registry.handleRawCommand(eventHandler);
226225

227226
assertThat(registry.getCommandHandlers().get(DEFAULT_DOMAIN))
228227
.anySatisfy(registered -> assertThat(registered)
229228
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass,
230229
RegisteredCommandHandler::handler
231230
)
232-
.containsExactly(nameRaw, RawMessage.class, eventHandler)).hasSize(1);
231+
.containsExactly("", RawMessage.class, eventHandler)).hasSize(1);
233232
}
234233

235234
@Test

async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package org.reactivecommons.async.commons;
22

3+
import lombok.Getter;
34
import lombok.RequiredArgsConstructor;
45
import lombok.extern.java.Log;
56
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
67
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
9+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
810
import org.reactivecommons.async.commons.utils.matcher.KeyMatcher;
911
import org.reactivecommons.async.commons.utils.matcher.Matcher;
1012

@@ -23,6 +25,8 @@ public class HandlerResolver {
2325
private final Map<String, RegisteredEventListener<?, ?>> eventsToBind;
2426
private final Map<String, RegisteredEventListener<?, ?>> eventNotificationListeners;
2527
private final Map<String, RegisteredCommandHandler<?, ?>> commandHandlers;
28+
@Getter
29+
private final Map<String, RegisteredQueueListener> queueListeners;
2630

2731
private final Matcher matcher = new KeyMatcher();
2832

async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/resolver/HandlerResolverBuilder.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
99
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1010
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
11+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueueListener;
1112
import org.reactivecommons.async.commons.HandlerResolver;
1213

1314
import java.util.List;
@@ -42,7 +43,17 @@ public static HandlerResolver buildResolver(String domain, Map<String, HandlerRe
4243
-> map.put(handler.path(), handler), ConcurrentHashMap::putAll
4344
);
4445

45-
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventNotificationListener = registries
46+
final ConcurrentMap<String, RegisteredQueueListener> queueListeners = registries
47+
.values()
48+
.stream()
49+
.flatMap(r -> r.getQueueHandlers()
50+
.getOrDefault(domain, List.of())
51+
.stream())
52+
.collect(ConcurrentHashMap::new, (map, handler)
53+
-> map.put(handler.queueName(), handler), ConcurrentHashMap::putAll
54+
);
55+
56+
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventNotificationListeners = registries
4657
.values()
4758
.stream()
4859
.flatMap(r -> r.getEventNotificationListener()
@@ -55,11 +66,11 @@ public static HandlerResolver buildResolver(String domain, Map<String, HandlerRe
5566
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain,
5667
registries);
5768

58-
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers =
69+
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventListeners =
5970
getEventHandlersWithDynamics(domain, registries);
6071

61-
return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener,
62-
commandHandlers) {
72+
return new HandlerResolver(queryHandlers, eventListeners, eventsToBind, eventNotificationListeners,
73+
commandHandlers, queueListeners) {
6374
@Override
6475
@SuppressWarnings("unchecked")
6576
public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public static class RabbitMessageProperties implements Properties {
2323
}
2424

2525
public static RabbitMessage fromDelivery(Delivery delivery) {
26-
return fromDelivery(delivery, null);
26+
return fromDelivery(delivery, "");
2727
}
2828

2929
public static RabbitMessage fromDelivery(Delivery delivery, String executorPath) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListener.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import lombok.extern.java.Log;
66
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
77
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
8+
import org.reactivecommons.async.api.handlers.RawCommandHandler;
89
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
910
import org.reactivecommons.async.commons.CommandExecutor;
1011
import org.reactivecommons.async.commons.DiscardNotifier;
@@ -63,6 +64,7 @@ public ApplicationCommandListener(ReactiveMessageListener listener,
6364
this.maxLengthBytes = maxLengthBytes;
6465
}
6566

67+
@Override
6668
protected Mono<Void> setUpBindings(TopologyCreator creator) {
6769
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(
6870
ExchangeSpecification.exchange(directExchange).durable(true).type("direct")
@@ -128,7 +130,10 @@ protected String getExecutorPath(AcknowledgableDelivery msj) {
128130
if (jsonNode.get(COMMAND_ID) != null) {
129131
return jsonNode.get(NAME).asText();
130132
}
131-
return jsonNode.get(TYPE).asText();
133+
if (jsonNode.get(TYPE) != null) {
134+
return jsonNode.get(TYPE).asText();
135+
}
136+
return rabbitMessage.getType();
132137
}
133138

134139
@Override
@@ -142,6 +147,8 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredCommandHandl
142147
return msj -> messageConverter.readCommand(msj, commandClass);
143148
} else if (registeredCommandHandler.handler() instanceof CloudCommandHandler) {
144149
return messageConverter::readCloudEvent;
150+
} else if (registeredCommandHandler.handler() instanceof RawCommandHandler) {
151+
return message -> message;
145152
}
146153
throw new RuntimeException("Unknown handler type");
147154
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public ApplicationEventListener(ReactiveMessageListener receiver,
6161
this.appName = appName;
6262
}
6363

64+
@Override
6465
protected Mono<Void> setUpBindings(TopologyCreator creator) {
6566
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(
6667
ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic")

0 commit comments

Comments
 (0)