Skip to content

Commit 3d48a72

Browse files
committed
Add "at least broker version" condition for partition tests
They run only on latest alphas.
1 parent 34eea92 commit 3d48a72

File tree

5 files changed

+149
-1
lines changed

5 files changed

+149
-1
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,6 +1320,10 @@ int serverAdvertisedPort() {
13201320
return Integer.valueOf(this.connectionProperties("advertised_port"));
13211321
}
13221322

1323+
String brokerVersion() {
1324+
return this.serverProperties.get("version");
1325+
}
1326+
13231327
private String connectionProperties(String key) {
13241328
if (this.connectionProperties != null && this.connectionProperties.containsKey(key)) {
13251329
return this.connectionProperties.get(key);

src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.rabbitmq.client.Connection;
2222
import com.rabbitmq.client.ConnectionFactory;
23+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
2324
import java.util.List;
2425
import java.util.UUID;
2526
import java.util.stream.IntStream;
@@ -52,6 +53,7 @@ void tearDown() throws Exception {
5253
}
5354

5455
@Test
56+
@BrokerVersionAtLeast("3.9.6")
5557
void routeShouldReturnEmptyListWhenExchangeDoesNotExist() {
5658
assertThat(cf.get().route("", UUID.randomUUID().toString())).isEmpty();
5759
}
@@ -62,6 +64,7 @@ void partitionsShouldReturnEmptyListWhenExchangeDoesNotExist() {
6264
}
6365

6466
@Test
67+
@BrokerVersionAtLeast("3.9.6")
6568
void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception {
6669
declareSuperStreamTopology(connection, superStream, partitions);
6770

@@ -79,6 +82,7 @@ void partitionsShouldReturnEmptyListWhenThereIsNoBinding() throws Exception {
7982
}
8083

8184
@Test
85+
@BrokerVersionAtLeast("3.9.6")
8286
void routeTopologyWithPartitionCount() throws Exception {
8387
declareSuperStreamTopology(connection, superStream, 3);
8488

@@ -94,6 +98,7 @@ void routeTopologyWithPartitionCount() throws Exception {
9498
}
9599

96100
@Test
101+
@BrokerVersionAtLeast("3.9.6")
97102
void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception {
98103
declareSuperStreamTopology(connection, superStream, 3);
99104
connection.createChannel().queueBind(superStream + "-1", superStream, "0");

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.rabbitmq.stream.OffsetSpecification;
2828
import com.rabbitmq.stream.Producer;
2929
import com.rabbitmq.stream.ProducerBuilder.RoutingType;
30+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
3031
import io.netty.channel.EventLoopGroup;
3132
import java.util.Map;
3233
import java.util.UUID;
@@ -74,6 +75,7 @@ void tearDown() throws Exception {
7475
}
7576

7677
@Test
78+
@BrokerVersionAtLeast("3.9.6")
7779
void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Exception {
7880
int messageCount = 10_000;
7981
declareSuperStreamTopology(connection, superStream, partitions);
@@ -125,6 +127,7 @@ void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Ex
125127
}
126128

127129
@Test
130+
@BrokerVersionAtLeast("3.9.6")
128131
void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception {
129132
int messageCount = 10_000;
130133
routingKeys = new String[] {"amer", "emea", "apac"};
@@ -176,6 +179,7 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr
176179
}
177180

178181
@Test
182+
@BrokerVersionAtLeast("3.9.6")
179183
void getLastPublishingIdShouldReturnLowestValue() throws Exception {
180184
int messageCount = 10_000;
181185
declareSuperStreamTopology(connection, superStream, partitions);

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.rabbitmq.stream.MessageBuilder;
2828
import com.rabbitmq.stream.StreamException;
2929
import com.rabbitmq.stream.impl.Client.Broker;
30+
import com.rabbitmq.stream.impl.Client.ClientParameters;
3031
import com.rabbitmq.stream.impl.Client.StreamMetadata;
3132
import io.netty.channel.EventLoopGroup;
3233
import io.netty.channel.nio.NioEventLoopGroup;
@@ -39,6 +40,7 @@
3940
import java.lang.annotation.Retention;
4041
import java.lang.annotation.RetentionPolicy;
4142
import java.lang.annotation.Target;
43+
import java.lang.reflect.AnnotatedElement;
4244
import java.lang.reflect.Field;
4345
import java.lang.reflect.Method;
4446
import java.nio.charset.StandardCharsets;
@@ -47,6 +49,7 @@
4749
import java.util.Collections;
4850
import java.util.List;
4951
import java.util.Map;
52+
import java.util.Optional;
5053
import java.util.Set;
5154
import java.util.UUID;
5255
import java.util.concurrent.ConcurrentHashMap;
@@ -71,6 +74,7 @@
7174
import org.junit.jupiter.api.extension.ExtensionContext;
7275
import org.mockito.invocation.InvocationOnMock;
7376
import org.mockito.stubbing.Answer;
77+
import org.slf4j.LoggerFactory;
7478

7579
public final class TestUtils {
7680

@@ -363,6 +367,15 @@ static Map<String, StreamMetadata> metadata(Broker leader, List<Broker> replicas
363367
@ExtendWith(DisabledIfTlsNotEnabledCondition.class)
364368
public @interface DisabledIfTlsNotEnabled {}
365369

370+
@Target({ElementType.TYPE, ElementType.METHOD})
371+
@Retention(RetentionPolicy.RUNTIME)
372+
@Documented
373+
@ExtendWith(BrokerVersionAtLeastCondition.class)
374+
public @interface BrokerVersionAtLeast {
375+
376+
String value();
377+
}
378+
366379
interface TaskWithException {
367380

368381
void run(Object context) throws Exception;
@@ -398,7 +411,7 @@ private static ExtensionContext.Store store(ExtensionContext extensionContext) {
398411
return extensionContext.getRoot().getStore(NAMESPACE);
399412
}
400413

401-
private static EventLoopGroup eventLoopGroup(ExtensionContext context) {
414+
static EventLoopGroup eventLoopGroup(ExtensionContext context) {
402415
return (EventLoopGroup) store(context).get("nettyEventLoopGroup");
403416
}
404417

@@ -570,6 +583,96 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
570583
}
571584
}
572585

586+
static class BrokerVersionAtLeastCondition implements ExecutionCondition {
587+
588+
@Override
589+
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
590+
Optional<AnnotatedElement> element = context.getElement();
591+
// String expectedVersion = annotation.get().getAnnotation(BrokerVersionAtLeast.class);
592+
BrokerVersionAtLeast annotation = element.get().getAnnotation(BrokerVersionAtLeast.class);
593+
if (annotation == null) {
594+
return ConditionEvaluationResult.enabled("No broker version requirement");
595+
} else {
596+
EventLoopGroup eventLoopGroup = StreamTestInfrastructureExtension.eventLoopGroup(context);
597+
if (eventLoopGroup == null) {
598+
throw new IllegalStateException(
599+
"The event loop group must be in the test context to use "
600+
+ BrokerVersionAtLeast.class.getSimpleName()
601+
+ ", use the "
602+
+ StreamTestInfrastructureExtension.class.getSimpleName()
603+
+ " extension in the test");
604+
}
605+
Client client = new Client(new ClientParameters().eventLoopGroup(eventLoopGroup));
606+
String expectedVersion = annotation.value();
607+
String brokerVersion = client.brokerVersion();
608+
if (atLeastVersion(expectedVersion, brokerVersion)) {
609+
return ConditionEvaluationResult.enabled(
610+
"Broker version requirement met, expected "
611+
+ expectedVersion
612+
+ ", actual "
613+
+ brokerVersion);
614+
} else {
615+
return ConditionEvaluationResult.disabled(
616+
"Broker version requirement not met, expected "
617+
+ expectedVersion
618+
+ ", actual "
619+
+ brokerVersion);
620+
}
621+
}
622+
}
623+
}
624+
625+
private static String currentVersion(String currentVersion) {
626+
// versions built from source: 3.7.0+rc.1.4.gedc5d96
627+
if (currentVersion.contains("+")) {
628+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
629+
}
630+
// alpha (snapshot) versions: 3.7.0~alpha.449-1
631+
if (currentVersion.contains("~")) {
632+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
633+
}
634+
// alpha (snapshot) versions: 3.7.1-alpha.40
635+
if (currentVersion.contains("-")) {
636+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
637+
}
638+
return currentVersion;
639+
}
640+
641+
static boolean atLeastVersion(String expectedVersion, String currentVersion) {
642+
if (currentVersion.contains("alpha-stream")) {
643+
return true;
644+
}
645+
try {
646+
currentVersion = currentVersion(currentVersion);
647+
return "0.0.0".equals(currentVersion) || versionCompare(currentVersion, expectedVersion) >= 0;
648+
} catch (RuntimeException e) {
649+
LoggerFactory.getLogger(TestUtils.class)
650+
.warn("Unable to parse broker version {}", currentVersion, e);
651+
throw e;
652+
}
653+
}
654+
655+
/**
656+
* https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
657+
*/
658+
static int versionCompare(String str1, String str2) {
659+
String[] vals1 = str1.split("\\.");
660+
String[] vals2 = str2.split("\\.");
661+
int i = 0;
662+
// set index to first non-equal ordinal or length of shortest version string
663+
while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) {
664+
i++;
665+
}
666+
// compare first non-equal ordinal number
667+
if (i < vals1.length && i < vals2.length) {
668+
int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i]));
669+
return Integer.signum(diff);
670+
}
671+
// the strings are equal or one string is a substring of the other
672+
// e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4"
673+
return Integer.signum(vals1.length - vals2.length);
674+
}
675+
573676
static class CountDownLatchAssert implements AssertDelegateTarget {
574677

575678
private static final Duration TIMEOUT = Duration.ofSeconds(10);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
// info@rabbitmq.com.
14+
package com.rabbitmq.stream.impl;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
import org.junit.jupiter.params.ParameterizedTest;
19+
import org.junit.jupiter.params.provider.CsvSource;
20+
21+
public class TestUtilsTest {
22+
23+
@ParameterizedTest
24+
@CsvSource({
25+
"3.9.6,3.9.5,false",
26+
"3.9.6,3.9.0-alpha-stream.232,true",
27+
"3.9.6,3.9.6-alpha.28,true"
28+
})
29+
void atLeastVersion(String expectedVersion, String currentVersion, boolean expected) {
30+
assertThat(TestUtils.atLeastVersion(expectedVersion, currentVersion)).isEqualTo(expected);
31+
}
32+
}

0 commit comments

Comments
 (0)