Skip to content

Commit f7a695c

Browse files
author
Daniel Bustamante Ospina
authored
Merge branch 'master' into separate-messagelistenersconfig
2 parents 98a9c39 + 0c8812d commit f7a695c

File tree

11 files changed

+1280
-17
lines changed

11 files changed

+1280
-17
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,23 @@ To include all (API and implementation) (Spring boot Starter):
1111
```groovy
1212
1313
dependencies {
14-
compile 'org.reactivecommons:async-commons-starter:0.4.6'
14+
compile 'org.reactivecommons:async-commons-starter:0.5.0'
1515
}
1616
```
1717

1818
To include only domain events API:
1919

2020
```groovy
2121
dependencies {
22-
compile 'org.reactivecommons:domain-events-api:0.4.6'
22+
compile 'org.reactivecommons:domain-events-api:0.5.0'
2323
}
2424
```
2525

2626
To include only async commons API:
2727

2828
```groovy
2929
dependencies {
30-
compile 'org.reactivecommons:async-commons-api:0.4.6'
30+
compile 'org.reactivecommons:async-commons-api:0.5.0'
3131
}
3232
```
3333

async/async-commons/async-commons.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,6 @@ dependencies {
7979
api "io.projectreactor.rabbitmq:reactor-rabbitmq"
8080
api 'com.fasterxml.jackson.core:jackson-databind'
8181
testImplementation 'io.projectreactor:reactor-test'
82+
testCompile group: 'org.databene', name: 'contiperf', version: '2.3.4'
83+
8284
}

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationEventListener.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.reactivecommons.async.impl.HandlerResolver;
1414
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
1515
import org.reactivecommons.async.impl.communications.TopologyCreator;
16+
import org.reactivecommons.async.impl.utils.matcher.KeyMatcher;
17+
import org.reactivecommons.async.impl.utils.matcher.Matcher;
1618
import reactor.core.publisher.Flux;
1719
import reactor.core.publisher.Mono;
1820
import reactor.rabbitmq.AcknowledgableDelivery;
@@ -21,7 +23,9 @@
2123
import reactor.rabbitmq.QueueSpecification;
2224

2325
import java.util.Optional;
26+
import java.util.Set;
2427
import java.util.function.Function;
28+
import java.util.stream.Collectors;
2529

2630
import static reactor.core.publisher.Flux.fromIterable;
2731

@@ -34,15 +38,28 @@ public class ApplicationEventListener extends GenericMessageListener {
3438
private final boolean withDLQRetry;
3539
private final int retryDelay;
3640
private final Optional<Integer> maxLengthBytes;
41+
private final Matcher keyMatcher;
3742

38-
public ApplicationEventListener(ReactiveMessageListener receiver, String queueName, HandlerResolver resolver, String eventsExchange, MessageConverter messageConverter, boolean withDLQRetry, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier) {
43+
44+
45+
46+
public ApplicationEventListener(ReactiveMessageListener receiver,
47+
String queueName,
48+
HandlerResolver resolver,
49+
String eventsExchange,
50+
MessageConverter messageConverter,
51+
boolean withDLQRetry,
52+
long maxRetries, int retryDelay,
53+
Optional<Integer> maxLengthBytes,
54+
DiscardNotifier discardNotifier) {
3955
super(queueName, receiver, withDLQRetry, maxRetries, discardNotifier, "event");
4056
this.retryDelay = retryDelay;
4157
this.withDLQRetry = withDLQRetry;
4258
this.resolver = resolver;
4359
this.eventsExchange = eventsExchange;
4460
this.messageConverter = messageConverter;
4561
this.maxLengthBytes = maxLengthBytes;
62+
this.keyMatcher = new KeyMatcher();
4663
}
4764

4865
protected Mono<Void> setUpBindings(TopologyCreator creator) {
@@ -66,26 +83,24 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
6683

6784
@Override
6885
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
69-
final RegisteredEventListener<Object> handler = resolver.getEventListener(executorPath);
86+
final Set<String> listenerKeys = resolver.getEventListeners()
87+
.stream()
88+
.map(RegisteredEventListener::getPath)
89+
.collect(Collectors.toSet());
90+
final String matchedKey = keyMatcher.match(listenerKeys, executorPath);
91+
final RegisteredEventListener<Object> handler = resolver.getEventListener(matchedKey);
7092
final Class<Object> eventClass = handler.getInputClass();
7193
Function<Message, DomainEvent<Object>> converter = msj -> messageConverter.readDomainEvent(msj, eventClass);
7294
final EventExecutor<Object> executor = new EventExecutor<>(handler.getHandler(), converter);
73-
return msj -> executor.execute(msj).cast(Object.class);
95+
return msj -> executor
96+
.execute(msj)
97+
.cast(Object.class);
7498
}
7599

76100
protected String getExecutorPath(AcknowledgableDelivery msj) {
77101
return msj.getEnvelope().getRoutingKey();
78102
}
79103

80-
81-
@Data
82-
private static class DomainEventInt {
83-
private String name;
84-
private String eventId;
85-
private JsonNode data;
86-
}
87-
88-
89104
}
90105

91106

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/GenericMessageListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ private <T> Flux<T> outerFailureProtection(Flux<T> messageFlux) {
112112
} catch (Exception e) {
113113
log.log(Level.SEVERE, "Error returning message in failure!", e);
114114
}
115+
} else {
116+
throw new RuntimeException(throwable);
115117
}
116118
});
117119
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.reactivecommons.async.impl.utils.matcher;
2+
3+
import lombok.Data;
4+
5+
import java.util.Comparator;
6+
7+
@Data
8+
public class Candidate implements Comparable<Candidate>, Comparator<Candidate> {
9+
private final String key;
10+
private final long score;
11+
12+
@Override
13+
public int compareTo(Candidate o) {
14+
return (int) (this.score - o.score);
15+
}
16+
17+
@Override
18+
public int compare(Candidate o1, Candidate o2) {
19+
return (int) (o1.score - o2.score);
20+
}
21+
}
22+
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.reactivecommons.async.impl.utils.matcher;
2+
3+
import lombok.Data;
4+
import reactor.util.function.Tuple2;
5+
import reactor.util.function.Tuples;
6+
7+
import java.util.Set;
8+
import java.util.stream.IntStream;
9+
10+
@Data
11+
public class KeyMatcher implements Matcher {
12+
private static final String WILD_CARD = "*";
13+
private static final String SEGMENT_DELIMITER_REGEX = "\\.";
14+
15+
private String[] getSegments(String s) {
16+
return s.split(SEGMENT_DELIMITER_REGEX);
17+
}
18+
19+
private boolean isBoundedToTarget(String[] current, String[] target) {
20+
final String lastSegment = current[current.length -1];
21+
return (current.length == target.length ||
22+
(current.length < target.length && WILD_CARD.equals(lastSegment)));
23+
}
24+
25+
private boolean isRootSegmentCoincident(String[] current, String[] target) {
26+
final String currentFirstSegment = current[0];
27+
final String targetFirstSegment = target[0];
28+
return currentFirstSegment.equalsIgnoreCase(targetFirstSegment);
29+
}
30+
31+
private boolean isCandidate(String[] current, String[] target ) {
32+
return isRootSegmentCoincident(current, target) &&
33+
isBoundedToTarget(current,target);
34+
}
35+
36+
private long calculateMatchingScore(String[] current, String[] target) {
37+
return IntStream.range(0, Math.min(current.length, target.length))
38+
.filter(segment -> current[segment].equals(WILD_CARD) ||
39+
current[segment].equalsIgnoreCase(target[segment]))
40+
.count();
41+
}
42+
43+
private long getMatchingScore(String[] current, String[] target) {
44+
final long totalMatches = calculateMatchingScore(current,target);
45+
final boolean allSegmentsMatched = totalMatches == current.length;
46+
return allSegmentsMatched ? totalMatches: 0;
47+
}
48+
49+
private Candidate getScoredCandidate(Tuple2<String,String[]> candidate, String[] target) {
50+
final long matchingScore = getMatchingScore(candidate.getT2(), target);
51+
return new Candidate(candidate.getT1(), matchingScore);
52+
}
53+
54+
private String matchMissingKey(Set<String> sources, String target ) {
55+
final String[] targetSegments = getSegments(target);
56+
return sources.stream()
57+
.map(source -> Tuples.of(source, getSegments(source)))
58+
.filter(source -> isCandidate(source.getT2(), targetSegments))
59+
.map(candidate -> getScoredCandidate(candidate, targetSegments))
60+
.max(Candidate::compareTo)
61+
.map(Candidate::getKey)
62+
.orElse(target);
63+
}
64+
65+
public String match(Set<String> sources, String target ) {
66+
return sources.contains(target) || sources.isEmpty() ?
67+
target :
68+
matchMissingKey(sources,target);
69+
}
70+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.reactivecommons.async.impl.utils.matcher;
2+
3+
import java.util.Set;
4+
5+
public interface Matcher {
6+
String match(Set<String> sources, String target);
7+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.reactivecommons.async.impl.utils.matcher;
2+
3+
import org.databene.contiperf.PerfTest;
4+
import org.databene.contiperf.Required;
5+
import org.databene.contiperf.junit.ContiPerfRule;
6+
import org.junit.Before;
7+
import org.junit.Rule;
8+
import org.junit.Test;
9+
import org.junit.runner.RunWith;
10+
import org.springframework.test.context.ContextConfiguration;
11+
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
12+
13+
import java.io.File;
14+
import java.io.IOException;
15+
import java.nio.file.Files;
16+
import java.nio.file.Paths;
17+
import java.util.HashMap;
18+
import java.util.HashSet;
19+
import java.util.Map;
20+
import java.util.Set;
21+
import java.util.stream.Collectors;
22+
23+
@ContextConfiguration(classes = KeyMatcher.class)
24+
@RunWith(SpringJUnit4ClassRunner.class)
25+
public class KeyMatcherPerformanceTest {
26+
27+
@Rule
28+
public ContiPerfRule contiPerfRule = new ContiPerfRule();
29+
Map<String, String> candidates = new HashMap<>();
30+
31+
private KeyMatcher keyMatcher = new KeyMatcher();
32+
33+
34+
@Before
35+
public void init() {
36+
ClassLoader classLoader = getClass().getClassLoader();
37+
File file = new File(classLoader.getResource("candidateNamesForMatching.txt").getFile());
38+
try {
39+
Set<String> names = new HashSet<>(Files
40+
.readAllLines(Paths.get(file.getAbsolutePath())));
41+
candidates = names.stream()
42+
.collect(Collectors.toMap(name -> name, name -> name));
43+
} catch (IOException e) {
44+
e.printStackTrace();
45+
}
46+
}
47+
48+
@Test
49+
@PerfTest(threads = 4, duration = 4000)
50+
@Required(average = 200, max = 10000, throughput = 200)
51+
public void getFromMapTest() {
52+
String existentKey = "System.segment.3073.event.action";
53+
candidates.get(existentKey);
54+
}
55+
56+
57+
@Test
58+
@PerfTest(threads = 4, duration = 4000)
59+
@Required(average = 200, max = 10000, throughput = 200)
60+
public void matchNameBeforeTest() {
61+
keyMatcher.match(candidates.keySet(), "System.segment.1.event.action");
62+
}
63+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.reactivecommons.async.impl.utils.matcher;
2+
3+
import org.junit.Before;
4+
import org.junit.Test;
5+
6+
import java.util.HashSet;
7+
import java.util.Set;
8+
9+
import static org.junit.Assert.*;
10+
11+
public class KeyMatcherTest {
12+
private KeyMatcher keyMatcher;
13+
private Set<String> listeners;
14+
15+
@Before
16+
public void init() {
17+
keyMatcher = new KeyMatcher();
18+
listeners = new HashSet<>();
19+
listeners.add("A.*");
20+
listeners.add("A.B");
21+
listeners.add("A.B.*");
22+
listeners.add("A.B.C");
23+
listeners.add("A.B.*.D");
24+
listeners.add("A.B.C.D");
25+
}
26+
27+
@Test
28+
public void matchNonExistentFirstLevel() {
29+
String nonExistentTarget = "A.X";
30+
final String match = keyMatcher.match(listeners, nonExistentTarget);
31+
assertEquals("A.*", match);
32+
}
33+
34+
@Test
35+
public void matchExistentFirstLevel() {
36+
String existentTarget = "A.B";
37+
final String match = keyMatcher.match(listeners, existentTarget);
38+
assertEquals("A.B", match);
39+
}
40+
41+
@Test
42+
public void matchNonExistentSecondLevel() {
43+
String nonExistentTarget = "A.B.X";
44+
final String match = keyMatcher.match(listeners, nonExistentTarget);
45+
assertEquals("A.B.*", match);
46+
}
47+
48+
@Test
49+
public void matchExistentSecondLevel() {
50+
String existentTarget = "A.B.C";
51+
final String match = keyMatcher.match(listeners, existentTarget);
52+
assertEquals("A.B.C", match);
53+
}
54+
55+
@Test
56+
public void matchNonExistentThirdLevel() {
57+
String nonExistentTarget = "A.B.X.D";
58+
final String match = keyMatcher.match(listeners, nonExistentTarget);
59+
assertEquals("A.B.*.D", match);
60+
}
61+
62+
@Test
63+
public void matchExistentThirdLevel() {
64+
String existentTarget = "A.B.C.D";
65+
final String match = keyMatcher.match(listeners, existentTarget);
66+
assertEquals("A.B.C.D", match);
67+
}
68+
69+
@Test
70+
public void matchDefaultForNonExistent() {
71+
String nonExistentTarget = "A.W.X.Y.Z";
72+
final String match = keyMatcher.match(listeners, nonExistentTarget);
73+
assertEquals("A.*", match);
74+
}
75+
76+
@Test
77+
public void matchDefaultForNonExistentSecondLevel() {
78+
String nonExistentTarget = "A.B.X.Y.Z";
79+
final String match = keyMatcher.match(listeners, nonExistentTarget);
80+
assertEquals("A.B.*", match);
81+
}
82+
}

0 commit comments

Comments
 (0)