Skip to content

Commit 7ed40da

Browse files
committed
Rename "StreamInfo" to "StreamStats"
And use "committed chunk ID" instead of "committed offset". References rabbitmq/rabbitmq-server#5412
1 parent 789bb1d commit 7ed40da

File tree

13 files changed

+80
-81
lines changed

13 files changed

+80
-81
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public final class Constants {
6868
public static final short COMMAND_ROUTE = 24;
6969
public static final short COMMAND_PARTITIONS = 25;
7070
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
71-
public static final short COMMAND_STREAM_INFO = 28;
71+
public static final short COMMAND_STREAM_STATS = 28;
7272

7373
public static final short VERSION_1 = 1;
7474
public static final short VERSION_2 = 2;

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ static EnvironmentBuilder builder() {
7070
* @return information on the stream
7171
* @throws UnsupportedOperationException if the broker does not support this command
7272
*/
73-
StreamInfo queryStreamInfo(String stream);
73+
StreamStats queryStreamInfo(String stream);
7474

7575
/**
7676
* Create a {@link ProducerBuilder} to configure and create a {@link Producer}.

src/main/java/com/rabbitmq/stream/MessageHandler.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,21 @@ interface Context {
5454
long timestamp();
5555

5656
/**
57-
* The committed offset in this stream.
57+
* The ID (offset) of the committed chunk (block of messages) in the stream.
5858
*
59-
* <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
60-
* (leader and replicas).
59+
* <p>It is the offset of the first message in the last chunk confirmed by a quorum of the
60+
* stream cluster members (leader and replicas).
6161
*
62-
* <p>The committed offset is a good indication of what the last offset of a stream is at a
63-
* given time. The value can be stale as soon as the application reads it though, as the
64-
* committed offset for a stream that is published to changes all the time.
62+
* <p>The committed chunk ID is a good indication of what the last offset of a stream can be at
63+
* a given time. The value can be stale as soon as the application reads it though, as the
64+
* committed chunk ID for a stream that is published to changes all the time.
6565
*
6666
* <p>This requires RabbitMQ 3.11 or more. The method always returns 0 otherwise.
6767
*
68-
* @return committed offset in this stream
69-
* @see StreamInfo#committedOffset()
68+
* @return committed chunk ID in this stream
69+
* @see StreamStats#committedChunkId()
7070
*/
71-
long committedOffset();
71+
long committedChunkId();
7272

7373
/**
7474
* The consumer that receives the message.

src/main/java/com/rabbitmq/stream/StreamInfo.java renamed to src/main/java/com/rabbitmq/stream/StreamStats.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
*
2121
* @see Environment#queryStreamInfo(String)
2222
*/
23-
public interface StreamInfo {
23+
public interface StreamStats {
2424

2525
/**
2626
* The first offset in the stream.
@@ -31,18 +31,18 @@ public interface StreamInfo {
3131
long firstOffset();
3232

3333
/**
34-
* The committed offset in the stream.
34+
* The ID (offset) of the committed chunk (block of messages) in the stream.
3535
*
36-
* <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
37-
* (leader and replicas).
36+
* <p>It is the offset of the first message in the last chunk confirmed by a quorum of the stream
37+
* cluster members (leader and replicas).
3838
*
39-
* <p>The committed offset is a good indication of what the last offset of a stream is at a given
40-
* time. The value can be stale as soon as the application reads it though, as the committed
41-
* offset for a stream that is published to changes all the time.
39+
* <p>The committed chunk ID is a good indication of what the last offset of a stream can be at a
40+
* given time. The value can be stale as soon as the application reads it though, as the committed
41+
* chunk ID for a stream that is published to changes all the time.
4242
*
4343
* @return committed offset in this stream
44-
* @see Context#committedOffset()
45-
* @throws NoOffsetException if there is no committed offset yet
44+
* @see Context#committedChunkId()
45+
* @throws NoOffsetException if there is no committed chunk yet
4646
*/
47-
long committedOffset();
47+
long committedChunkId();
4848
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
3333
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
3434
import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET;
35-
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_INFO;
35+
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
3636
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
3737
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
3838
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE;
@@ -1355,7 +1355,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
13551355
}
13561356
}
13571357

1358-
StreamInfoResponse streamInfo(String stream) {
1358+
StreamInfoResponse streamStats(String stream) {
13591359
if (stream == null) {
13601360
throw new IllegalArgumentException("stream must not be null");
13611361
}
@@ -1364,7 +1364,7 @@ StreamInfoResponse streamInfo(String stream) {
13641364
try {
13651365
ByteBuf bb = allocate(length + 4);
13661366
bb.writeInt(length);
1367-
bb.writeShort(encodeRequestCode(COMMAND_STREAM_INFO));
1367+
bb.writeShort(encodeRequestCode(COMMAND_STREAM_STATS));
13681368
bb.writeShort(VERSION_1);
13691369
bb.writeInt(correlationId);
13701370
bb.writeShort(stream.length());
@@ -1473,7 +1473,7 @@ void handle(
14731473
byte subscriptionId,
14741474
long offset,
14751475
long chunkTimestamp,
1476-
long committedOffset,
1476+
long committedChunkId,
14771477
Message message);
14781478
}
14791479

@@ -1890,14 +1890,14 @@ public long getSequence() {
18901890

18911891
static class StreamInfoResponse extends Response {
18921892

1893-
private final Map<String, String> info;
1893+
private final Map<String, Long> info;
18941894

1895-
StreamInfoResponse(short responseCode, Map<String, String> info) {
1895+
StreamInfoResponse(short responseCode, Map<String, Long> info) {
18961896
super(responseCode);
18971897
this.info = Collections.unmodifiableMap(new HashMap<>(info));
18981898
}
18991899

1900-
public Map<String, String> getInfo() {
1900+
public Map<String, Long> getInfo() {
19011901
return info;
19021902
}
19031903
}

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public long timestamp() {
310310
}
311311

312312
@Override
313-
public long committedOffset() {
313+
public long committedChunkId() {
314314
return committedOffset;
315315
}
316316

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import static com.rabbitmq.stream.Constants.COMMAND_ROUTE;
3535
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
3636
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
37-
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_INFO;
37+
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
3838
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
3939
import static com.rabbitmq.stream.Constants.COMMAND_TUNE;
4040
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
@@ -134,7 +134,7 @@ class ServerFrameHandler {
134134
handlers.put(COMMAND_ROUTE, new RouteFrameHandler());
135135
handlers.put(COMMAND_PARTITIONS, new PartitionsFrameHandler());
136136
handlers.put(COMMAND_EXCHANGE_COMMAND_VERSIONS, new ExchangeCommandVersionsFrameHandler());
137-
handlers.put(COMMAND_STREAM_INFO, new StreamInfoFrameHandler());
137+
handlers.put(COMMAND_STREAM_STATS, new StreamStatsFrameHandler());
138138
HANDLERS = new FrameHandler[maxCommandKey + 1][];
139139
handlers
140140
.entrySet()
@@ -328,7 +328,7 @@ static int handleMessage(
328328
long offset,
329329
long offsetLimit,
330330
long chunkTimestamp,
331-
long committedOffset,
331+
long committedChunkId,
332332
Codec codec,
333333
MessageListener messageListener,
334334
byte subscriptionId) {
@@ -342,7 +342,7 @@ static int handleMessage(
342342
messageFiltered.set(true);
343343
} else {
344344
Message message = codec.decode(data);
345-
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedOffset, message);
345+
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedChunkId, message);
346346
}
347347
return read;
348348
}
@@ -600,7 +600,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
600600
client.chunkChecksum,
601601
client.metricsCollector,
602602
message.readByte(), // subscription ID
603-
message.readLong(), // last committed offset, unsigned long
603+
message.readLong(), // committed chunk ID, unsigned long
604604
9 // byte read count, 1 + 9
605605
);
606606
}
@@ -1150,7 +1150,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11501150
}
11511151
}
11521152

1153-
private static class StreamInfoFrameHandler extends BaseFrameHandler {
1153+
private static class StreamStatsFrameHandler extends BaseFrameHandler {
11541154

11551155
@Override
11561156
int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
@@ -1162,14 +1162,14 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11621162

11631163
int infoCount = message.readInt();
11641164
read += 4;
1165-
Map<String, String> info = new LinkedHashMap<>(infoCount);
1165+
Map<String, Long> info = new LinkedHashMap<>(infoCount);
11661166

11671167
for (int i = 0; i < infoCount; i++) {
11681168
String key = readString(message);
11691169
read += 2 + key.length();
1170-
String value = readString(message);
1171-
read += 2 + value.length();
1170+
long value = message.readLong();
11721171
info.put(key, value);
1172+
read += 8;
11731173
}
11741174

11751175
OutstandingRequest<StreamInfoResponse> outstandingRequest =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import com.rabbitmq.stream.ProducerBuilder;
3131
import com.rabbitmq.stream.StreamCreator;
3232
import com.rabbitmq.stream.StreamException;
33-
import com.rabbitmq.stream.StreamInfo;
33+
import com.rabbitmq.stream.StreamStats;
3434
import com.rabbitmq.stream.SubscriptionListener;
3535
import com.rabbitmq.stream.compression.CompressionCodecFactory;
3636
import com.rabbitmq.stream.impl.Client.ClientParameters;
@@ -390,29 +390,29 @@ public void deleteStream(String stream) {
390390
}
391391

392392
@Override
393-
public StreamInfo queryStreamInfo(String stream) {
393+
public StreamStats queryStreamInfo(String stream) {
394394
StreamInfoResponse response =
395395
locatorOperation(
396396
client -> {
397397
if (Utils.is3_11_OrMore(client.brokerVersion())) {
398-
return client.streamInfo(stream);
398+
return client.streamStats(stream);
399399
} else {
400400
throw new UnsupportedOperationException(
401401
"QueryStringInfo is available only for RabbitMQ 3.11 or more.");
402402
}
403403
});
404404
if (response.isOk()) {
405-
Map<String, String> info = response.getInfo();
405+
Map<String, Long> info = response.getInfo();
406406
BiFunction<String, String, LongSupplier> offsetSupplierLogic =
407407
(key, message) -> {
408-
if (!info.containsKey(key) || "-1".equals(info.get(key))) {
408+
if (!info.containsKey(key) || info.get(key) == -1) {
409409
return () -> {
410410
throw new NoOffsetException(message);
411411
};
412412
} else {
413413
try {
414-
long firstOffset = Long.parseUnsignedLong(info.get(key));
415-
return () -> firstOffset;
414+
long offset = info.get(key);
415+
return () -> offset;
416416
} catch (NumberFormatException e) {
417417
return () -> {
418418
throw new NoOffsetException(message);
@@ -421,20 +421,21 @@ public StreamInfo queryStreamInfo(String stream) {
421421
}
422422
};
423423
LongSupplier firstOffsetSupplier =
424-
offsetSupplierLogic.apply("first_offset", "No first offset for stream " + stream);
424+
offsetSupplierLogic.apply("first_chunk_id", "No first offset for stream " + stream);
425425
LongSupplier committedOffsetSupplier =
426-
offsetSupplierLogic.apply("committed_offset", "No committed offset for stream " + stream);
427-
return new DefaultStreamInfo(firstOffsetSupplier, committedOffsetSupplier);
426+
offsetSupplierLogic.apply(
427+
"committed_chunk_id", "No committed chunk ID for stream " + stream);
428+
return new DefaultStreamStats(firstOffsetSupplier, committedOffsetSupplier);
428429
} else {
429430
throw propagateException(response.getResponseCode(), stream);
430431
}
431432
}
432433

433-
private static class DefaultStreamInfo implements StreamInfo {
434+
private static class DefaultStreamStats implements StreamStats {
434435

435436
private final LongSupplier firstOffsetSupplier, committedOffsetSupplier;
436437

437-
private DefaultStreamInfo(
438+
private DefaultStreamStats(
438439
LongSupplier firstOffsetSupplier, LongSupplier committedOffsetSupplier) {
439440
this.firstOffsetSupplier = firstOffsetSupplier;
440441
this.committedOffsetSupplier = committedOffsetSupplier;
@@ -446,7 +447,7 @@ public long firstOffset() {
446447
}
447448

448449
@Override
449-
public long committedOffset() {
450+
public long committedChunkId() {
450451
return committedOffsetSupplier.getAsLong();
451452
}
452453
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ public long timestamp() {
116116
}
117117

118118
@Override
119-
public long committedOffset() {
120-
return context.committedOffset();
119+
public long committedChunkId() {
120+
return context.committedChunkId();
121121
}
122122

123123
@Override

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import java.util.concurrent.atomic.AtomicReference;
6666
import java.util.function.Function;
6767
import java.util.function.LongConsumer;
68-
import java.util.function.ToLongBiFunction;
6968
import java.util.stream.Collectors;
7069
import java.util.stream.IntStream;
7170
import org.junit.jupiter.api.Test;
@@ -902,13 +901,13 @@ void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
902901
new Client.ClientParameters()
903902
.chunkListener(chunkListener)
904903
.messageListener(messageListener));
905-
StreamInfoResponse response = client.streamInfo(stream);
906-
assertThat(response.getInfo()).containsEntry("first_offset", "0");
907-
assertThat(response.getInfo()).containsEntry("committed_offset", "-1");
904+
StreamInfoResponse response = client.streamStats(stream);
905+
assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L);
906+
assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L);
908907
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
909-
response = client.streamInfo(stream);
910-
assertThat(response.getInfo()).containsEntry("first_offset", "0");
911-
assertThat(response.getInfo().get("committed_offset")).isNotEqualTo("-1");
908+
response = client.streamStats(stream);
909+
assertThat(response.getInfo()).containsEntry("first_chunk_id", 0L);
910+
assertThat(response.getInfo().get("committed_chunk_id")).isNotEqualTo(-1L);
912911

913912
client.exchangeCommandVersions();
914913

@@ -918,13 +917,13 @@ void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
918917

919918
assertThat(latch.await(10, SECONDS)).isTrue();
920919
assertThat(committedOffset.get()).isPositive();
921-
assertThat(committedOffset.toString()).isEqualTo(response.getInfo().get("committed_offset"));
920+
assertThat(committedOffset).hasValue(response.getInfo().get("committed_chunk_id"));
922921
}
923922

924923
@Test
925924
@BrokerVersionAtLeast("3.11.0")
926925
void streamInfoShouldReturnErrorWhenStreamDoesNotExist() {
927-
assertThat(cf.get().streamInfo("does not exist").getResponseCode())
926+
assertThat(cf.get().streamStats("does not exist").getResponseCode())
928927
.isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
929928
}
930929

@@ -947,20 +946,19 @@ void streamInfoFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) {
947946
.isOk())
948947
.isTrue();
949948

950-
StreamInfoResponse response = client.streamInfo(s);
951-
assertThat(response.getInfo()).containsEntry("first_offset", "0");
952-
assertThat(response.getInfo()).containsEntry("committed_offset", "-1");
949+
StreamInfoResponse response = client.streamStats(s);
950+
assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L);
951+
assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L);
953952

954953
byte[] payload = new byte[payloadSize];
955954
Function<MessageBuilder, Message> messageCreation = mb -> mb.addData(payload).build();
956955

957956
TestUtils.publishAndWaitForConfirms(cf, messageCreation, messageCount, s);
958957
// publishing again, to make sure new segments trigger retention strategy
959958
TestUtils.publishAndWaitForConfirms(cf, messageCreation, messageCount, s);
960-
response = client.streamInfo(s);
961-
ToLongBiFunction<Map<String, String>, String> toOffset = (m, k) -> Long.parseLong(m.get(k));
962-
assertThat(toOffset.applyAsLong(response.getInfo(), "first_offset")).isPositive();
963-
assertThat(toOffset.applyAsLong(response.getInfo(), "committed_offset")).isPositive();
959+
response = client.streamStats(s);
960+
assertThat(response.getInfo().get("first_chunk_id")).isPositive();
961+
assertThat(response.getInfo().get("committed_chunk_id")).isPositive();
964962

965963
} finally {
966964
assertThat(client.delete(s).isOk()).isTrue();

0 commit comments

Comments
 (0)