Skip to content

Commit 8cfb410

Browse files
committed
Run SAC tests against RabbitMQ 3.11 or more
Refactor also the JUnit conditions to check the broker version.
1 parent 0e7e9a8 commit 8cfb410

File tree

8 files changed

+101
-35
lines changed

8 files changed

+101
-35
lines changed

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515

1616
import com.rabbitmq.stream.Address;
1717
import com.rabbitmq.stream.Constants;
18+
import com.rabbitmq.stream.ConsumerUpdateListener;
19+
import com.rabbitmq.stream.OffsetSpecification;
1820
import com.rabbitmq.stream.StreamDoesNotExistException;
1921
import com.rabbitmq.stream.StreamException;
2022
import com.rabbitmq.stream.StreamNotAvailableException;
21-
import com.rabbitmq.stream.ConsumerUpdateListener;
22-
import com.rabbitmq.stream.OffsetSpecification;
2323
import com.rabbitmq.stream.impl.Client.ClientParameters;
2424
import java.security.cert.X509Certificate;
2525
import java.time.Duration;

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.rabbitmq.stream.impl.Client.StreamInfoResponse;
4242
import com.rabbitmq.stream.impl.Client.StreamParametersBuilder;
4343
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
44+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
4445
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
4546
import io.netty.buffer.ByteBufAllocator;
4647
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -833,7 +834,7 @@ void closingPublisherWhilePublishingShouldNotCloseConnection(String publisherRef
833834
}
834835

835836
@Test
836-
@BrokerVersionAtLeast("3.11.0")
837+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
837838
void exchangeCommandVersions() {
838839
Client client = cf.get();
839840
List<FrameHandlerInfo> infos = client.exchangeCommandVersions();
@@ -842,7 +843,7 @@ void exchangeCommandVersions() {
842843
}
843844

844845
@Test
845-
@BrokerVersionAtLeast("3.11.0")
846+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
846847
void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception {
847848
int publishCount = 20_000;
848849
byte correlationId = 42;
@@ -881,7 +882,7 @@ void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception {
881882
}
882883

883884
@Test
884-
@BrokerVersionAtLeast("3.11.0")
885+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
885886
void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
886887
int publishCount = 20_000;
887888
CountDownLatch latch = new CountDownLatch(publishCount);
@@ -922,14 +923,14 @@ void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
922923
}
923924

924925
@Test
925-
@BrokerVersionAtLeast("3.11.0")
926+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
926927
void streamInfoShouldReturnErrorWhenStreamDoesNotExist() {
927928
assertThat(cf.get().streamStats("does not exist").getResponseCode())
928929
.isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
929930
}
930931

931932
@Test
932-
@BrokerVersionAtLeast("3.11.0")
933+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
933934
void streamInfoFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) {
934935
int messageCount = 1000;
935936
int payloadSize = 1000;

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.rabbitmq.stream.impl.Client.CreditNotification;
3636
import com.rabbitmq.stream.impl.Client.MessageListener;
3737
import com.rabbitmq.stream.impl.Client.Response;
38+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition;
3839
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
3940
import java.nio.charset.StandardCharsets;
4041
import java.util.Collections;
@@ -53,7 +54,10 @@
5354
import org.junit.jupiter.api.TestInfo;
5455
import org.junit.jupiter.api.extension.ExtendWith;
5556

56-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
57+
@ExtendWith({
58+
TestUtils.StreamTestInfrastructureExtension.class,
59+
BrokerVersionAtLeast311Condition.class
60+
})
5761
@TestUtils.SingleActiveConsumer
5862
public class SacClientTest {
5963

src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.rabbitmq.stream.Environment;
2323
import com.rabbitmq.stream.EnvironmentBuilder;
2424
import com.rabbitmq.stream.OffsetSpecification;
25+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition;
2526
import io.netty.channel.EventLoopGroup;
2627
import java.util.Map;
2728
import java.util.concurrent.ConcurrentHashMap;
@@ -32,7 +33,10 @@
3233
import org.junit.jupiter.api.Test;
3334
import org.junit.jupiter.api.extension.ExtendWith;
3435

35-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
36+
@ExtendWith({
37+
TestUtils.StreamTestInfrastructureExtension.class,
38+
BrokerVersionAtLeast311Condition.class
39+
})
3640
@TestUtils.SingleActiveConsumer
3741
public class SacStreamConsumerTest {
3842

src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.rabbitmq.stream.EnvironmentBuilder;
2929
import com.rabbitmq.stream.NoOffsetException;
3030
import com.rabbitmq.stream.OffsetSpecification;
31+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition;
3132
import com.rabbitmq.stream.impl.TestUtils.CallableBooleanSupplier;
3233
import com.rabbitmq.stream.impl.TestUtils.SingleActiveConsumer;
3334
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
@@ -46,7 +47,10 @@
4647
import org.junit.jupiter.api.TestInfo;
4748
import org.junit.jupiter.api.extension.ExtendWith;
4849

49-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
50+
@ExtendWith({
51+
TestUtils.StreamTestInfrastructureExtension.class,
52+
BrokerVersionAtLeast311Condition.class
53+
})
5054
@SingleActiveConsumer
5155
public class SacSuperStreamConsumerTest {
5256
EventLoopGroup eventLoopGroup;

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.rabbitmq.stream.StreamDoesNotExistException;
3737
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3838
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerInfo;
39+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
3940
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
4041
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
4142
import io.netty.channel.EventLoopGroup;
@@ -141,7 +142,7 @@ void nameShouldBeSetIfTrackingStrategyIsSet() {
141142
}
142143

143144
@Test
144-
@BrokerVersionAtLeast("3.11.0")
145+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
145146
void committedOffsetShouldBeSet() throws Exception {
146147
int messageCount = 20_000;
147148
TestUtils.publishAndWaitForConfirms(cf, messageCount, this.stream);

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.rabbitmq.stream.StreamStats;
4646
import com.rabbitmq.stream.impl.Client.StreamMetadata;
4747
import com.rabbitmq.stream.impl.MonitoringTestUtils.EnvironmentInfo;
48+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
4849
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
4950
import com.rabbitmq.stream.impl.TestUtils.DisabledIfTlsNotEnabled;
5051
import io.netty.channel.EventLoopGroup;
@@ -504,7 +505,7 @@ void createPublishConsumeDelete(boolean lazyInit, TestInfo info) {
504505
}
505506

506507
@Test
507-
@BrokerVersionAtLeast("3.11.0")
508+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
508509
void queryStreamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
509510
try (Environment env = environmentBuilder.build()) {
510511
StreamStats info = env.queryStreamInfo(stream);
@@ -536,7 +537,7 @@ void queryStreamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception
536537
}
537538

538539
@Test
539-
@BrokerVersionAtLeast("3.11.0")
540+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
540541
void queryStreamInfoShouldThrowExceptionWhenStreamDoesNotExist() {
541542
try (Environment env = environmentBuilder.build()) {
542543
assertThatThrownBy(() -> env.queryStreamInfo("does not exist"))

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import java.lang.annotation.Retention;
4545
import java.lang.annotation.RetentionPolicy;
4646
import java.lang.annotation.Target;
47-
import java.lang.reflect.AnnotatedElement;
4847
import java.lang.reflect.Field;
4948
import java.lang.reflect.Method;
5049
import java.nio.charset.StandardCharsets;
@@ -54,7 +53,6 @@
5453
import java.util.Collections;
5554
import java.util.List;
5655
import java.util.Map;
57-
import java.util.Optional;
5856
import java.util.Set;
5957
import java.util.UUID;
6058
import java.util.concurrent.ConcurrentHashMap;
@@ -468,10 +466,10 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) {
468466
@Target({ElementType.TYPE, ElementType.METHOD})
469467
@Retention(RetentionPolicy.RUNTIME)
470468
@Documented
471-
@ExtendWith(BrokerVersionAtLeastCondition.class)
469+
@ExtendWith(AnnotationBrokerVersionAtLeastCondition.class)
472470
public @interface BrokerVersionAtLeast {
473471

474-
String value();
472+
BrokerVersion value();
475473
}
476474

477475
interface TaskWithException {
@@ -746,29 +744,47 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
746744
}
747745
}
748746

749-
static class BrokerVersionAtLeastCondition implements ExecutionCondition {
747+
private static class BaseBrokerVersionAtLeastCondition implements ExecutionCondition {
748+
749+
private final Function<ExtensionContext, String> versionProvider;
750+
751+
private BaseBrokerVersionAtLeastCondition(Function<ExtensionContext, String> versionProvider) {
752+
this.versionProvider = versionProvider;
753+
}
750754

751755
@Override
752756
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
753-
Optional<AnnotatedElement> element = context.getElement();
754-
// String expectedVersion = annotation.get().getAnnotation(BrokerVersionAtLeast.class);
755-
BrokerVersionAtLeast annotation = element.get().getAnnotation(BrokerVersionAtLeast.class);
756-
if (annotation == null) {
757+
if (!context.getTestMethod().isPresent()) {
758+
return ConditionEvaluationResult.enabled("Apply only to methods");
759+
}
760+
String expectedVersion = versionProvider.apply(context);
761+
if (expectedVersion == null) {
757762
return ConditionEvaluationResult.enabled("No broker version requirement");
758763
} else {
759-
EventLoopGroup eventLoopGroup = StreamTestInfrastructureExtension.eventLoopGroup(context);
760-
if (eventLoopGroup == null) {
761-
throw new IllegalStateException(
762-
"The event loop group must be in the test context to use "
763-
+ BrokerVersionAtLeast.class.getSimpleName()
764-
+ ", use the "
765-
+ StreamTestInfrastructureExtension.class.getSimpleName()
766-
+ " extension in the test");
767-
}
768-
Client client = new Client(new ClientParameters().eventLoopGroup(eventLoopGroup));
769-
String expectedVersion = annotation.value();
770-
String brokerVersion = client.brokerVersion();
771-
client.close();
764+
String brokerVersion =
765+
context
766+
.getRoot()
767+
.getStore(Namespace.GLOBAL)
768+
.getOrComputeIfAbsent(
769+
"brokerVersion",
770+
k -> {
771+
EventLoopGroup eventLoopGroup =
772+
StreamTestInfrastructureExtension.eventLoopGroup(context);
773+
if (eventLoopGroup == null) {
774+
throw new IllegalStateException(
775+
"The event loop group must be in the test context to use "
776+
+ BrokerVersionAtLeast.class.getSimpleName()
777+
+ ", use the "
778+
+ StreamTestInfrastructureExtension.class.getSimpleName()
779+
+ " extension in the test");
780+
}
781+
try (Client client =
782+
new Client(new ClientParameters().eventLoopGroup(eventLoopGroup))) {
783+
return client.brokerVersion();
784+
}
785+
},
786+
String.class);
787+
772788
if (atLeastVersion(expectedVersion, brokerVersion)) {
773789
return ConditionEvaluationResult.enabled(
774790
"Broker version requirement met, expected "
@@ -786,6 +802,26 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
786802
}
787803
}
788804

805+
private static class AnnotationBrokerVersionAtLeastCondition
806+
extends BaseBrokerVersionAtLeastCondition {
807+
808+
private AnnotationBrokerVersionAtLeastCondition() {
809+
super(
810+
context -> {
811+
BrokerVersionAtLeast annotation =
812+
context.getElement().get().getAnnotation(BrokerVersionAtLeast.class);
813+
return annotation == null ? null : annotation.value().toString();
814+
});
815+
}
816+
}
817+
818+
static class BrokerVersionAtLeast311Condition extends BaseBrokerVersionAtLeastCondition {
819+
820+
private BrokerVersionAtLeast311Condition() {
821+
super(context -> "3.11.0");
822+
}
823+
}
824+
789825
static class CountDownLatchAssert implements AssertDelegateTarget {
790826

791827
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@@ -851,4 +887,19 @@ static void waitMs(long waitTime) {
851887
@Retention(RetentionPolicy.RUNTIME)
852888
@Tag("single-active-consumer")
853889
public @interface SingleActiveConsumer {}
890+
891+
public enum BrokerVersion {
892+
RABBITMQ_3_11("3.11.0");
893+
894+
final String value;
895+
896+
BrokerVersion(String value) {
897+
this.value = value;
898+
}
899+
900+
@Override
901+
public String toString() {
902+
return this.value;
903+
}
904+
}
854905
}

0 commit comments

Comments
 (0)