Skip to content

Commit 20fcbe1

Browse files
committed
enable dynamic registry serveQuery handler
1 parent 7466e15 commit 20fcbe1

File tree

4 files changed

+40
-6
lines changed

4 files changed

+40
-6
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/DynamicRegistry.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package org.reactivecommons.async.api;
22

33
import org.reactivecommons.async.api.handlers.EventHandler;
4+
import org.reactivecommons.async.api.handlers.QueryHandler;
45
import reactor.core.publisher.Mono;
56

67
public interface DynamicRegistry {
78

89
@Deprecated
910
<T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass);
1011

12+
<T, R> void serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass);
13+
1114
Mono<Void> startListeningEvent(String eventName);
1215

1316
Mono<Void> stopListeningEvent(String eventName);

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/DynamicRegistryImp.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
import lombok.RequiredArgsConstructor;
55
import org.reactivecommons.async.api.DynamicRegistry;
66
import org.reactivecommons.async.api.handlers.EventHandler;
7+
import org.reactivecommons.async.api.handlers.QueryHandler;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
8-
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
9+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
910
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
11+
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1012
import reactor.core.publisher.Mono;
1113
import reactor.rabbitmq.BindingSpecification;
1214

@@ -26,6 +28,11 @@ public <T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T>
2628
.then();
2729
}
2830

31+
@Override
32+
public <T, R> void serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
33+
resolver.addQueryHandler(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
34+
}
35+
2936
@Override
3037
public Mono<Void> startListeningEvent(String eventName) {
3138
return topologyCreator.bind(buildBindingSpecification(eventName))

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/HandlerResolver.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.reactivecommons.async.rabbit;
22

33
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.java.Log;
45
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
56
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
67
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
@@ -13,6 +14,7 @@
1314
import java.util.Set;
1415
import java.util.function.Function;
1516

17+
@Log
1618
@RequiredArgsConstructor
1719
public class HandlerResolver {
1820

@@ -73,6 +75,13 @@ void addEventListener(RegisteredEventListener<?> listener) {
7375
eventListeners.put(listener.getPath(), listener);
7476
}
7577

78+
void addQueryHandler(RegisteredQueryHandler<?, ?> handler) {
79+
if (handler.getPath().contains("*")) {
80+
log.warning("avoid * in dynamic handlers, make sure you have no conflicts with cached patterns");
81+
}
82+
queryHandlers.put(handler.getPath(), handler);
83+
}
84+
7685
private <T> Function<String, T> getMatchHandler(Map<String, T> handlers) {
7786
return name -> {
7887
String matched = matcher.match(handlers.keySet(), name);

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/DynamicRegistryImpTest.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1212
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1313
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
14-
import org.reactivecommons.async.rabbit.DynamicRegistryImp;
15-
import org.reactivecommons.async.rabbit.HandlerResolver;
16-
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1714
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
15+
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1816
import reactor.core.publisher.Mono;
1917
import reactor.rabbitmq.BindingSpecification;
2018
import reactor.test.StepVerifier;
@@ -52,13 +50,12 @@ void setUp() {
5250
Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = new ConcurrentHashMap<>();
5351
resolver = new HandlerResolver(queryHandlers, eventListeners, notificationEventListeners,
5452
dynamicEventsHandlers, commandHandlers);
55-
when(props.getDomainEventsExchangeName()).thenReturn("domainEx");
56-
when(props.getEventsQueue()).thenReturn("events.queue");
5753
dynamicRegistry = new DynamicRegistryImp(resolver, topologyCreator, props);
5854
}
5955

6056
@Test
6157
void registerEventListener() {
58+
setupMock();
6259
when(topologyCreator.bind(any())).thenReturn(just(mock(BindOk.class)));
6360
dynamicRegistry.listenEvent("event1", message -> Mono.empty(), Long.class);
6461

@@ -68,6 +65,7 @@ void registerEventListener() {
6865

6966
@Test
7067
void declareBindingWhenRegisterEventListener() {
68+
setupMock();
7169
ArgumentCaptor<BindingSpecification> captor = ArgumentCaptor.forClass(BindingSpecification.class);
7270
when(topologyCreator.bind(any())).thenReturn(just(mock(BindOk.class)));
7371

@@ -82,6 +80,7 @@ void declareBindingWhenRegisterEventListener() {
8280

8381
@Test
8482
void subscribeToResultWhenRegisterEventListener() {
83+
setupMock();
8584
PublisherProbe<BindOk> probe = PublisherProbe.of(just(mock(BindOk.class)));
8685
when(topologyCreator.bind(any())).thenReturn(probe.mono());
8786

@@ -94,6 +93,7 @@ void subscribeToResultWhenRegisterEventListener() {
9493

9594
@Test
9695
void shouldBindDomainEventsToEventsQueueUsingEventName() {
96+
setupMock();
9797
ArgumentCaptor<BindingSpecification> bindingSpecificationCaptor =
9898
ArgumentCaptor.forClass(BindingSpecification.class);
9999

@@ -119,6 +119,7 @@ void shouldBindDomainEventsToEventsQueueUsingEventName() {
119119

120120
@Test
121121
void shouldUnbindDomainEventsToEventsQueueUsingEventName() {
122+
setupMock();
122123
ArgumentCaptor<BindingSpecification> bindingSpecificationCaptor =
123124
ArgumentCaptor.forClass(BindingSpecification.class);
124125

@@ -142,5 +143,19 @@ void shouldUnbindDomainEventsToEventsQueueUsingEventName() {
142143
topologyCreatorProbe.assertWasSubscribed();
143144
}
144145

146+
@Test
147+
void serveQueryShouldAddHandler() {
148+
dynamicRegistry.serveQuery("dynamic.query", message -> Mono.just(message * 100), Integer.class);
149+
150+
final RegisteredQueryHandler<Integer, Integer> handler = resolver.getQueryHandler("dynamic.query");
151+
StepVerifier.create(handler.getHandler().handle(null, 50))
152+
.expectNext(5000)
153+
.verifyComplete();
154+
}
155+
156+
private void setupMock(){
157+
when(props.getDomainEventsExchangeName()).thenReturn("domainEx");
158+
when(props.getEventsQueue()).thenReturn("events.queue");
159+
}
145160

146161
}

0 commit comments

Comments
 (0)