Skip to content

Commit 83ff8e0

Browse files
committed
Use StreamStats lingo, not StreamInfo
References #165
1 parent d2831d9 commit 83ff8e0

File tree

7 files changed

+32
-32
lines changed

7 files changed

+32
-32
lines changed

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ static EnvironmentBuilder builder() {
6262
void deleteStream(String stream);
6363

6464
/**
65-
* Query information on a stream.
65+
* Query statistics on a stream.
6666
*
6767
* <p>Requires RabbitMQ 3.11 or more.
6868
*
6969
* @param stream
70-
* @return information on the stream
70+
* @return statistics on the stream
7171
* @throws UnsupportedOperationException if the broker does not support this command
7272
*/
73-
StreamStats queryStreamInfo(String stream);
73+
StreamStats queryStreamStats(String stream);
7474

7575
/**
7676
* Create a {@link ProducerBuilder} to configure and create a {@link Producer}.

src/main/java/com/rabbitmq/stream/StreamStats.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import com.rabbitmq.stream.MessageHandler.Context;
1717

1818
/**
19-
* Information on a stream.
19+
* Statistics on a stream.
2020
*
21-
* @see Environment#queryStreamInfo(String)
21+
* @see Environment#queryStreamStats(String)
2222
*/
2323
public interface StreamStats {
2424

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,7 +1360,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
13601360
}
13611361
}
13621362

1363-
StreamInfoResponse streamStats(String stream) {
1363+
StreamStatsResponse streamStats(String stream) {
13641364
if (stream == null) {
13651365
throw new IllegalArgumentException("stream must not be null");
13661366
}
@@ -1374,7 +1374,7 @@ StreamInfoResponse streamStats(String stream) {
13741374
bb.writeInt(correlationId);
13751375
bb.writeShort(stream.length());
13761376
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1377-
OutstandingRequest<StreamInfoResponse> request = new OutstandingRequest<>(this.rpcTimeout);
1377+
OutstandingRequest<StreamStatsResponse> request = new OutstandingRequest<>(this.rpcTimeout);
13781378
outstandingRequests.put(correlationId, request);
13791379
channel.writeAndFlush(bb);
13801380
request.block();
@@ -1921,11 +1921,11 @@ public long getSequence() {
19211921
}
19221922
}
19231923

1924-
static class StreamInfoResponse extends Response {
1924+
static class StreamStatsResponse extends Response {
19251925

19261926
private final Map<String, Long> info;
19271927

1928-
StreamInfoResponse(short responseCode, Map<String, Long> info) {
1928+
StreamStatsResponse(short responseCode, Map<String, Long> info) {
19291929
super(responseCode);
19301930
this.info = Collections.unmodifiableMap(new HashMap<>(info));
19311931
}

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import com.rabbitmq.stream.impl.Client.Response;
6666
import com.rabbitmq.stream.impl.Client.SaslAuthenticateResponse;
6767
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
68-
import com.rabbitmq.stream.impl.Client.StreamInfoResponse;
68+
import com.rabbitmq.stream.impl.Client.StreamStatsResponse;
6969
import com.rabbitmq.stream.impl.Client.StreamMetadata;
7070
import com.rabbitmq.stream.impl.Client.SubscriptionOffset;
7171
import com.rabbitmq.stream.metrics.MetricsCollector;
@@ -1196,12 +1196,12 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11961196
read += 8;
11971197
}
11981198

1199-
OutstandingRequest<StreamInfoResponse> outstandingRequest =
1200-
remove(client.outstandingRequests, correlationId, StreamInfoResponse.class);
1199+
OutstandingRequest<StreamStatsResponse> outstandingRequest =
1200+
remove(client.outstandingRequests, correlationId, StreamStatsResponse.class);
12011201
if (outstandingRequest == null) {
12021202
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
12031203
} else {
1204-
outstandingRequest.response().set(new StreamInfoResponse(responseCode, info));
1204+
outstandingRequest.response().set(new StreamStatsResponse(responseCode, info));
12051205
outstandingRequest.countDown();
12061206
}
12071207
return read;

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import com.rabbitmq.stream.SubscriptionListener;
3535
import com.rabbitmq.stream.compression.CompressionCodecFactory;
3636
import com.rabbitmq.stream.impl.Client.ClientParameters;
37-
import com.rabbitmq.stream.impl.Client.StreamInfoResponse;
37+
import com.rabbitmq.stream.impl.Client.StreamStatsResponse;
3838
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
3939
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
4040
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
@@ -390,8 +390,8 @@ public void deleteStream(String stream) {
390390
}
391391

392392
@Override
393-
public StreamStats queryStreamInfo(String stream) {
394-
StreamInfoResponse response =
393+
public StreamStats queryStreamStats(String stream) {
394+
StreamStatsResponse response =
395395
locatorOperation(
396396
client -> {
397397
if (Utils.is3_11_OrMore(client.brokerVersion())) {

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import com.rabbitmq.stream.codec.SwiftMqCodec;
3939
import com.rabbitmq.stream.impl.Client.ClientParameters;
4040
import com.rabbitmq.stream.impl.Client.Response;
41-
import com.rabbitmq.stream.impl.Client.StreamInfoResponse;
41+
import com.rabbitmq.stream.impl.Client.StreamStatsResponse;
4242
import com.rabbitmq.stream.impl.Client.StreamParametersBuilder;
4343
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
4444
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
@@ -883,7 +883,7 @@ void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception {
883883

884884
@Test
885885
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
886-
void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
886+
void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
887887
int publishCount = 20_000;
888888
CountDownLatch latch = new CountDownLatch(publishCount);
889889

@@ -903,7 +903,7 @@ void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
903903
new Client.ClientParameters()
904904
.chunkListener(chunkListener)
905905
.messageListener(messageListener));
906-
StreamInfoResponse response = client.streamStats(stream);
906+
StreamStatsResponse response = client.streamStats(stream);
907907
assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L);
908908
assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L);
909909
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
@@ -924,14 +924,14 @@ void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
924924

925925
@Test
926926
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
927-
void streamInfoShouldReturnErrorWhenStreamDoesNotExist() {
927+
void streamStatsShouldReturnErrorWhenStreamDoesNotExist() {
928928
assertThat(cf.get().streamStats("does not exist").getResponseCode())
929929
.isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
930930
}
931931

932932
@Test
933933
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
934-
void streamInfoFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) {
934+
void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) {
935935
int messageCount = 1000;
936936
int payloadSize = 1000;
937937
String s = TestUtils.streamName(info);
@@ -948,7 +948,7 @@ void streamInfoFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) {
948948
.isOk())
949949
.isTrue();
950950

951-
StreamInfoResponse response = client.streamStats(s);
951+
StreamStatsResponse response = client.streamStats(s);
952952
assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L);
953953
assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L);
954954

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -506,18 +506,18 @@ void createPublishConsumeDelete(boolean lazyInit, TestInfo info) {
506506

507507
@Test
508508
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
509-
void queryStreamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
509+
void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
510510
try (Environment env = environmentBuilder.build()) {
511-
StreamStats info = env.queryStreamInfo(stream);
512-
assertThatThrownBy(() -> info.firstOffset()).isInstanceOf(NoOffsetException.class);
513-
assertThatThrownBy(() -> info.committedChunkId()).isInstanceOf(NoOffsetException.class);
511+
StreamStats stats = env.queryStreamStats(stream);
512+
assertThatThrownBy(() -> stats.firstOffset()).isInstanceOf(NoOffsetException.class);
513+
assertThatThrownBy(() -> stats.committedChunkId()).isInstanceOf(NoOffsetException.class);
514514

515515
int publishCount = 20_000;
516516
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
517517

518-
StreamStats info2 = env.queryStreamInfo(stream);
519-
assertThat(info2.firstOffset()).isZero();
520-
assertThat(info2.committedChunkId()).isPositive();
518+
StreamStats stats2 = env.queryStreamStats(stream);
519+
assertThat(stats2.firstOffset()).isZero();
520+
assertThat(stats2.committedChunkId()).isPositive();
521521

522522
CountDownLatch latch = new CountDownLatch(publishCount);
523523
AtomicLong committedChunkId = new AtomicLong();
@@ -532,15 +532,15 @@ void queryStreamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception
532532

533533
assertThat(latch.await(10, SECONDS)).isTrue();
534534
assertThat(committedChunkId.get()).isPositive();
535-
assertThat(committedChunkId).hasValue(info2.committedChunkId());
535+
assertThat(committedChunkId).hasValue(stats2.committedChunkId());
536536
}
537537
}
538538

539539
@Test
540540
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
541-
void queryStreamInfoShouldThrowExceptionWhenStreamDoesNotExist() {
541+
void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() {
542542
try (Environment env = environmentBuilder.build()) {
543-
assertThatThrownBy(() -> env.queryStreamInfo("does not exist"))
543+
assertThatThrownBy(() -> env.queryStreamStats("does not exist"))
544544
.isInstanceOf(StreamDoesNotExistException.class);
545545
}
546546
}

0 commit comments

Comments
 (0)