Skip to content

Commit d74460a

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #62 from reactive-commons/feature/dynamic-registry
Feature/dynamic registry
2 parents 7020d43 + a9b0655 commit d74460a

File tree

13 files changed

+239
-76
lines changed

13 files changed

+239
-76
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/DynamicRegistry.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package org.reactivecommons.async.api;
22

33
import org.reactivecommons.async.api.handlers.EventHandler;
4+
import org.reactivecommons.async.api.handlers.QueryHandler;
45
import reactor.core.publisher.Mono;
56

67
public interface DynamicRegistry {
78

89
@Deprecated
910
<T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T> eventClass);
1011

12+
<T, R> void serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass);
13+
1114
Mono<Void> startListeningEvent(String eventName);
1215

1316
Mono<Void> stopListeningEvent(String eventName);

async/async-commons/async-commons.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ dependencies {
1010
compileOnly 'io.projectreactor:reactor-core'
1111
api 'io.projectreactor.rabbitmq:reactor-rabbitmq'
1212
api 'com.fasterxml.jackson.core:jackson-databind'
13+
implementation 'commons-io:commons-io:2.11.0'
14+
1315
testImplementation 'io.projectreactor:reactor-test'
1416
}
Lines changed: 30 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,46 @@
11
package org.reactivecommons.async.commons.utils.matcher;
22

3-
import lombok.Data;
4-
import reactor.util.function.Tuple2;
5-
import reactor.util.function.Tuples;
3+
import org.apache.commons.io.FilenameUtils;
64

75
import java.util.Set;
8-
import java.util.stream.IntStream;
96

10-
@Data
117
public class KeyMatcher implements Matcher {
12-
private static final String WILD_CARD = "*";
13-
private static final String SEGMENT_DELIMITER_REGEX = "\\.";
148

15-
private String[] getSegments(String s) {
16-
return s.split(SEGMENT_DELIMITER_REGEX);
17-
}
18-
19-
private boolean isBoundedToTarget(String[] current, String[] target) {
20-
final String lastSegment = current[current.length -1];
21-
return (current.length == target.length ||
22-
(current.length < target.length && WILD_CARD.equals(lastSegment)));
23-
}
24-
25-
private boolean isRootSegmentCoincident(String[] current, String[] target) {
26-
final String currentFirstSegment = current[0];
27-
final String targetFirstSegment = target[0];
28-
return currentFirstSegment.equalsIgnoreCase(targetFirstSegment);
29-
}
30-
31-
private boolean isCandidate(String[] current, String[] target ) {
32-
return isRootSegmentCoincident(current, target) &&
33-
isBoundedToTarget(current,target);
34-
}
9+
public static final String SEPARATOR_REGEX = "\\.";
10+
public static final String WILDCARD_CHAR = "*";
3511

36-
private long calculateMatchingScore(String[] current, String[] target) {
37-
return IntStream.range(0, Math.min(current.length, target.length))
38-
.filter(segment -> current[segment].equals(WILD_CARD) ||
39-
current[segment].equalsIgnoreCase(target[segment]))
40-
.count();
41-
}
42-
43-
private long getMatchingScore(String[] current, String[] target) {
44-
final long totalMatches = calculateMatchingScore(current,target);
45-
final boolean allSegmentsMatched = totalMatches == current.length;
46-
return allSegmentsMatched ? totalMatches: 0;
12+
@Override
13+
public String match(Set<String> sources, String target) {
14+
return sources.contains(target) || sources.isEmpty() ?
15+
target :
16+
matchMissingKey(sources, target);
4717
}
4818

49-
private Candidate getScoredCandidate(Tuple2<String,String[]> candidate, String[] target) {
50-
final long matchingScore = getMatchingScore(candidate.getT2(), target);
51-
return new Candidate(candidate.getT1(), matchingScore);
19+
private String matchMissingKey(Set<String> names, String target) {
20+
return names.stream()
21+
.filter(name -> name.contains(WILDCARD_CHAR))
22+
.sorted(this::compare)
23+
.filter(name -> FilenameUtils.wildcardMatch(target, name))
24+
.findFirst()
25+
.orElse(target);
5226
}
5327

54-
private String matchMissingKey(Set<String> sources, String target ) {
55-
final String[] targetSegments = getSegments(target);
56-
return sources.stream()
57-
.map(source -> Tuples.of(source, getSegments(source)))
58-
.filter(source -> isCandidate(source.getT2(), targetSegments))
59-
.map(candidate -> getScoredCandidate(candidate, targetSegments))
60-
.max(Candidate::compareTo)
61-
.map(Candidate::getKey)
62-
.orElse(target);
28+
private int compare(String firstExpression, String secondExpression) {
29+
String[] firstExpressionArr = getSeparated(firstExpression);
30+
String[] secondExpressionArr = getSeparated(secondExpression);
31+
for (int i = 0; i < firstExpressionArr.length && i < secondExpressionArr.length; i++) {
32+
if (firstExpressionArr[i].equals(secondExpressionArr[i])) {
33+
continue;
34+
}
35+
if (firstExpressionArr[i].equals(WILDCARD_CHAR)) {
36+
return 1;
37+
}
38+
return -1;
39+
}
40+
return secondExpressionArr.length - firstExpressionArr.length;
6341
}
6442

65-
public String match(Set<String> sources, String target ) {
66-
return sources.contains(target) || sources.isEmpty() ?
67-
target :
68-
matchMissingKey(sources,target);
43+
private String[] getSeparated(String expression) {
44+
return expression.split(SEPARATOR_REGEX);
6945
}
7046
}

async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/matcher/KeyMatcherTest.java

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,21 @@
1111

1212

1313
class KeyMatcherTest {
14-
private KeyMatcher keyMatcher;
14+
private Matcher keyMatcher;
1515
private Set<String> listeners;
1616

1717
@BeforeEach
1818
public void init() {
19-
keyMatcher = new KeyMatcher();
19+
keyMatcher = new KeyMatcher();
2020
listeners = new HashSet<>();
2121
listeners.add("A.*");
2222
listeners.add("A.B");
2323
listeners.add("A.B.*");
2424
listeners.add("A.B.C");
2525
listeners.add("A.B.*.D");
2626
listeners.add("A.B.C.D");
27+
listeners.add("W.X.Y");
28+
listeners.add("app.event-prefix.any");
2729
}
2830

2931
@Test
@@ -81,4 +83,63 @@ void matchDefaultForNonExistentSecondLevel() {
8183
final String match = keyMatcher.match(listeners, nonExistentTarget);
8284
assertEquals("A.B.*", match);
8385
}
84-
}
86+
87+
@Test
88+
void shouldNotMatch() {
89+
String nonExistentTarget = "X.Y.Z";
90+
final String match = keyMatcher.match(listeners, nonExistentTarget);
91+
assertEquals(nonExistentTarget, match);
92+
}
93+
94+
@Test
95+
void shouldNotMatchWhenNoWildcard() {
96+
String nonExistentTarget = "W.X.Y.Z";
97+
final String match = keyMatcher.match(listeners, nonExistentTarget);
98+
assertEquals(nonExistentTarget, match);
99+
}
100+
101+
@Test
102+
void shouldNotMatchWhenNoWildcardSameLength() {
103+
String nonExistentTarget = "app.event.test";
104+
final String match = keyMatcher.match(listeners, nonExistentTarget);
105+
assertEquals(nonExistentTarget, match);
106+
}
107+
108+
@Test
109+
void shouldApplyPriority() {
110+
listeners = new HashSet<>();
111+
listeners.add("*.*.*");
112+
listeners.add("prefix.*.*");
113+
listeners.add("*.middle.*");
114+
listeners.add("*.*.suffix");
115+
listeners.add("*.middle.suffix");
116+
listeners.add("prefix.*.suffix");
117+
listeners.add("prefix.middle.*");
118+
listeners.add("prefix.middle.suffix");
119+
listeners.add("prefix.other.other");
120+
listeners.add("other.middle.other");
121+
listeners.add("other.other.suffix");
122+
listeners.add("other.other.other");
123+
listeners.add("in.depend.ent");
124+
assertEquals("in.depend.ent", keyMatcher.match(listeners, "in.depend.ent"));
125+
assertEquals("other.other.other", keyMatcher.match(listeners, "other.other.other"));
126+
assertEquals("other.other.suffix", keyMatcher.match(listeners, "other.other.suffix"));
127+
assertEquals("other.middle.other", keyMatcher.match(listeners, "other.middle.other"));
128+
assertEquals("prefix.other.other", keyMatcher.match(listeners, "prefix.other.other"));
129+
assertEquals("prefix.middle.suffix", keyMatcher.match(listeners, "prefix.middle.suffix"));
130+
assertEquals("prefix.middle.*", keyMatcher.match(listeners, "prefix.middle.any"));
131+
assertEquals("prefix.middle.*", keyMatcher.match(listeners, "prefix.middle.any.any"));
132+
assertEquals("prefix.*.suffix", keyMatcher.match(listeners, "prefix.any.suffix"));
133+
assertEquals("prefix.*.suffix", keyMatcher.match(listeners, "prefix.any.any.suffix"));
134+
assertEquals("*.middle.suffix", keyMatcher.match(listeners, "any.middle.suffix"));
135+
assertEquals("*.middle.suffix", keyMatcher.match(listeners, "any.any.middle.suffix"));
136+
assertEquals("*.*.suffix", keyMatcher.match(listeners, "any.any.suffix"));
137+
assertEquals("*.*.suffix", keyMatcher.match(listeners, "any.any.any.suffix"));
138+
assertEquals("*.middle.*", keyMatcher.match(listeners, "any.middle.any"));
139+
assertEquals("*.middle.*", keyMatcher.match(listeners, "any.any.middle.any.any"));
140+
assertEquals("prefix.*.*", keyMatcher.match(listeners, "prefix.any.any"));
141+
assertEquals("prefix.*.*", keyMatcher.match(listeners, "prefix.any.any.any.any"));
142+
assertEquals("*.*.*", keyMatcher.match(listeners, "any.any.any"));
143+
assertEquals("*.*.*", keyMatcher.match(listeners, "any.any.any.any.any"));
144+
}
145+
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/DynamicRegistryImp.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
import lombok.RequiredArgsConstructor;
55
import org.reactivecommons.async.api.DynamicRegistry;
66
import org.reactivecommons.async.api.handlers.EventHandler;
7+
import org.reactivecommons.async.api.handlers.QueryHandler;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
8-
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
9+
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
910
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
11+
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1012
import reactor.core.publisher.Mono;
1113
import reactor.rabbitmq.BindingSpecification;
1214

@@ -26,6 +28,11 @@ public <T> Mono<Void> listenEvent(String eventName, EventHandler<T> fn, Class<T>
2628
.then();
2729
}
2830

31+
@Override
32+
public <T, R> void serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
33+
resolver.addQueryHandler(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
34+
}
35+
2936
@Override
3037
public Mono<Void> startListeningEvent(String eventName) {
3138
return topologyCreator.bind(buildBindingSpecification(eventName))

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/HandlerResolver.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
package org.reactivecommons.async.rabbit;
22

33
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.java.Log;
45
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
56
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
67
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
8+
import org.reactivecommons.async.commons.utils.matcher.KeyMatcher;
9+
import org.reactivecommons.async.commons.utils.matcher.Matcher;
710

811
import java.util.Collection;
912
import java.util.HashSet;
1013
import java.util.Map;
1114
import java.util.Set;
15+
import java.util.function.Function;
1216

17+
@Log
1318
@RequiredArgsConstructor
1419
public class HandlerResolver {
1520

@@ -18,15 +23,18 @@ public class HandlerResolver {
1823
private final Map<String, RegisteredEventListener<?>> eventNotificationListeners;
1924
private final Map<String, RegisteredEventListener<?>> dynamicEventsHandlers;
2025
private final Map<String, RegisteredCommandHandler<?>> commandHandlers;
26+
private final Matcher matcher = new KeyMatcher();
2127

2228
@SuppressWarnings("unchecked")
2329
public <T, M> RegisteredQueryHandler<T, M> getQueryHandler(String path) {
24-
return (RegisteredQueryHandler<T, M>) queryHandlers.get(path);
30+
return (RegisteredQueryHandler<T, M>) queryHandlers
31+
.computeIfAbsent(path, getMatchHandler(queryHandlers));
2532
}
2633

2734
@SuppressWarnings("unchecked")
2835
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
29-
return (RegisteredCommandHandler<T>) commandHandlers.get(path);
36+
return (RegisteredCommandHandler<T>) commandHandlers
37+
.computeIfAbsent(path, getMatchHandler(commandHandlers));
3038
}
3139

3240
@SuppressWarnings("unchecked")
@@ -45,7 +53,8 @@ public Collection<RegisteredEventListener<?>> getNotificationListeners() {
4553

4654
@SuppressWarnings("unchecked")
4755
public <T> RegisteredEventListener<T> getNotificationListener(String path) {
48-
return (RegisteredEventListener<T>) eventNotificationListeners.get(path);
56+
return (RegisteredEventListener<T>) eventNotificationListeners
57+
.computeIfAbsent(path, getMatchHandler(eventNotificationListeners));
4958
}
5059

5160
public Collection<RegisteredEventListener<?>> getEventListeners() {
@@ -62,7 +71,22 @@ public Set<String> getToListenEventNames() {
6271
return toListenEventNames;
6372
}
6473

65-
void addEventListener(RegisteredEventListener listener) {
74+
void addEventListener(RegisteredEventListener<?> listener) {
6675
eventListeners.put(listener.getPath(), listener);
6776
}
77+
78+
void addQueryHandler(RegisteredQueryHandler<?, ?> handler) {
79+
if (handler.getPath().contains("*")) {
80+
throw new RuntimeException("avoid * in dynamic handlers, make sure you have no conflicts with cached patterns");
81+
}
82+
queryHandlers.put(handler.getPath(), handler);
83+
}
84+
85+
private <T> Function<String, T> getMatchHandler(Map<String, T> handlers) {
86+
return name -> {
87+
String matched = matcher.match(handlers.keySet(), name);
88+
return handlers.get(matched);
89+
};
90+
}
91+
6892
}

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/DynamicRegistryImpTest.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1212
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1313
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
14-
import org.reactivecommons.async.rabbit.DynamicRegistryImp;
15-
import org.reactivecommons.async.rabbit.HandlerResolver;
16-
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1714
import org.reactivecommons.async.commons.config.IBrokerConfigProps;
15+
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
1816
import reactor.core.publisher.Mono;
1917
import reactor.rabbitmq.BindingSpecification;
2018
import reactor.test.StepVerifier;
@@ -52,13 +50,12 @@ void setUp() {
5250
Map<String, RegisteredQueryHandler<?, ?>> queryHandlers = new ConcurrentHashMap<>();
5351
resolver = new HandlerResolver(queryHandlers, eventListeners, notificationEventListeners,
5452
dynamicEventsHandlers, commandHandlers);
55-
when(props.getDomainEventsExchangeName()).thenReturn("domainEx");
56-
when(props.getEventsQueue()).thenReturn("events.queue");
5753
dynamicRegistry = new DynamicRegistryImp(resolver, topologyCreator, props);
5854
}
5955

6056
@Test
6157
void registerEventListener() {
58+
setupMock();
6259
when(topologyCreator.bind(any())).thenReturn(just(mock(BindOk.class)));
6360
dynamicRegistry.listenEvent("event1", message -> Mono.empty(), Long.class);
6461

@@ -68,6 +65,7 @@ void registerEventListener() {
6865

6966
@Test
7067
void declareBindingWhenRegisterEventListener() {
68+
setupMock();
7169
ArgumentCaptor<BindingSpecification> captor = ArgumentCaptor.forClass(BindingSpecification.class);
7270
when(topologyCreator.bind(any())).thenReturn(just(mock(BindOk.class)));
7371

@@ -82,6 +80,7 @@ void declareBindingWhenRegisterEventListener() {
8280

8381
@Test
8482
void subscribeToResultWhenRegisterEventListener() {
83+
setupMock();
8584
PublisherProbe<BindOk> probe = PublisherProbe.of(just(mock(BindOk.class)));
8685
when(topologyCreator.bind(any())).thenReturn(probe.mono());
8786

@@ -94,6 +93,7 @@ void subscribeToResultWhenRegisterEventListener() {
9493

9594
@Test
9695
void shouldBindDomainEventsToEventsQueueUsingEventName() {
96+
setupMock();
9797
ArgumentCaptor<BindingSpecification> bindingSpecificationCaptor =
9898
ArgumentCaptor.forClass(BindingSpecification.class);
9999

@@ -119,6 +119,7 @@ void shouldBindDomainEventsToEventsQueueUsingEventName() {
119119

120120
@Test
121121
void shouldUnbindDomainEventsToEventsQueueUsingEventName() {
122+
setupMock();
122123
ArgumentCaptor<BindingSpecification> bindingSpecificationCaptor =
123124
ArgumentCaptor.forClass(BindingSpecification.class);
124125

@@ -142,5 +143,19 @@ void shouldUnbindDomainEventsToEventsQueueUsingEventName() {
142143
topologyCreatorProbe.assertWasSubscribed();
143144
}
144145

146+
@Test
147+
void serveQueryShouldAddHandler() {
148+
dynamicRegistry.serveQuery("dynamic.query", message -> Mono.just(message * 100), Integer.class);
149+
150+
final RegisteredQueryHandler<Integer, Integer> handler = resolver.getQueryHandler("dynamic.query");
151+
StepVerifier.create(handler.getHandler().handle(null, 50))
152+
.expectNext(5000)
153+
.verifyComplete();
154+
}
155+
156+
private void setupMock(){
157+
when(props.getDomainEventsExchangeName()).thenReturn("domainEx");
158+
when(props.getEventsQueue()).thenReturn("events.queue");
159+
}
145160

146161
}

0 commit comments

Comments
 (0)