Skip to content

Commit b40739d

Browse files
authored
Merge pull request #165 from rabbitmq/rabbitmq-server-5412-stream-info-command
Support StreamInfo command
2 parents fc183ac + 7ed40da commit b40739d

30 files changed

+437
-41
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public final class Constants {
6868
public static final short COMMAND_ROUTE = 24;
6969
public static final short COMMAND_PARTITIONS = 25;
7070
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
71+
public static final short COMMAND_STREAM_STATS = 28;
7172

7273
public static final short VERSION_1 = 1;
7374
public static final short VERSION_2 = 2;

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -61,6 +61,17 @@ static EnvironmentBuilder builder() {
6161
*/
6262
void deleteStream(String stream);
6363

64+
/**
65+
* Query information on a stream.
66+
*
67+
* <p>Requires RabbitMQ 3.11 or more.
68+
*
69+
* @param stream
70+
* @return information on the stream
71+
* @throws UnsupportedOperationException if the broker does not support this command
72+
*/
73+
StreamStats queryStreamInfo(String stream);
74+
6475
/**
6576
* Create a {@link ProducerBuilder} to configure and create a {@link Producer}.
6677
*

src/main/java/com/rabbitmq/stream/MessageHandler.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -54,18 +54,21 @@ interface Context {
5454
long timestamp();
5555

5656
/**
57-
* The committed offset on this stream.
57+
* The ID (offset) of the committed chunk (block of messages) in the stream.
5858
*
59-
* <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
60-
* (leader and replicas).
59+
* <p>It is the offset of the first message in the last chunk confirmed by a quorum of the
60+
* stream cluster members (leader and replicas).
6161
*
62-
* <p>The committed offset is a good indication of what the last offset of a stream is at a
63-
* given time. The value can be stale as soon as the application reads it though, as the
64-
* committed offset for a stream that is published to changes all the time.
62+
* <p>The committed chunk ID is a good indication of what the last offset of a stream can be at
63+
* a given time. The value can be stale as soon as the application reads it though, as the
64+
* committed chunk ID for a stream that is published to changes all the time.
6565
*
66-
* @return committed offset on this stream
66+
* <p>This requires RabbitMQ 3.11 or more. The method always returns 0 otherwise.
67+
*
68+
* @return committed chunk ID in this stream
69+
* @see StreamStats#committedChunkId()
6770
*/
68-
long committedOffset();
71+
long committedChunkId();
6972

7073
/**
7174
* The consumer that receives the message.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
// info@rabbitmq.com.
14+
package com.rabbitmq.stream;
15+
16+
public class NoOffsetException extends StreamException {
17+
18+
public NoOffsetException(String message) {
19+
super(message, Constants.RESPONSE_CODE_NO_OFFSET);
20+
}
21+
}

src/main/java/com/rabbitmq/stream/StreamDoesNotExistException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -19,7 +19,7 @@ public class StreamDoesNotExistException extends StreamException {
1919
private final String stream;
2020

2121
public StreamDoesNotExistException(String stream) {
22-
super("Stream " + stream + " does not exist");
22+
super("Stream " + stream + " does not exist", Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
2323
this.stream = stream;
2424
}
2525

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
// info@rabbitmq.com.
14+
package com.rabbitmq.stream;
15+
16+
/** Exception to indicate a stream is not available. */
17+
public class StreamNotAvailableException extends StreamException {
18+
19+
public StreamNotAvailableException(String stream) {
20+
super("Stream " + stream + " is not available", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
21+
}
22+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
// info@rabbitmq.com.
14+
package com.rabbitmq.stream;
15+
16+
import com.rabbitmq.stream.MessageHandler.Context;
17+
18+
/**
19+
* Information on a stream.
20+
*
21+
* @see Environment#queryStreamInfo(String)
22+
*/
23+
public interface StreamStats {
24+
25+
/**
26+
* The first offset in the stream.
27+
*
28+
* @return first offset in the stream
29+
* @throws NoOffsetException if there is no first offset yet
30+
*/
31+
long firstOffset();
32+
33+
/**
34+
* The ID (offset) of the committed chunk (block of messages) in the stream.
35+
*
36+
* <p>It is the offset of the first message in the last chunk confirmed by a quorum of the stream
37+
* cluster members (leader and replicas).
38+
*
39+
* <p>The committed chunk ID is a good indication of what the last offset of a stream can be at a
40+
* given time. The value can be stale as soon as the application reads it though, as the committed
41+
* chunk ID for a stream that is published to changes all the time.
42+
*
43+
* @return committed offset in this stream
44+
* @see Context#committedChunkId()
45+
* @throws NoOffsetException if there is no committed chunk yet
46+
*/
47+
long committedChunkId();
48+
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
3333
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
3434
import static com.rabbitmq.stream.Constants.COMMAND_STORE_OFFSET;
35+
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
3536
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
3637
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
3738
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_AUTHENTICATION_FAILURE;
@@ -1354,6 +1355,31 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
13541355
}
13551356
}
13561357

1358+
StreamInfoResponse streamStats(String stream) {
1359+
if (stream == null) {
1360+
throw new IllegalArgumentException("stream must not be null");
1361+
}
1362+
int length = 2 + 2 + 4 + 2 + stream.length(); // API code, version, correlation ID, 1 string
1363+
int correlationId = correlationSequence.incrementAndGet();
1364+
try {
1365+
ByteBuf bb = allocate(length + 4);
1366+
bb.writeInt(length);
1367+
bb.writeShort(encodeRequestCode(COMMAND_STREAM_STATS));
1368+
bb.writeShort(VERSION_1);
1369+
bb.writeInt(correlationId);
1370+
bb.writeShort(stream.length());
1371+
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1372+
OutstandingRequest<StreamInfoResponse> request = new OutstandingRequest<>(this.rpcTimeout);
1373+
outstandingRequests.put(correlationId, request);
1374+
channel.writeAndFlush(bb);
1375+
request.block();
1376+
return request.response.get();
1377+
} catch (RuntimeException e) {
1378+
outstandingRequests.remove(correlationId);
1379+
throw new StreamException(e);
1380+
}
1381+
}
1382+
13571383
void shutdownReason(ShutdownReason reason) {
13581384
this.shutdownReason = reason;
13591385
}
@@ -1447,7 +1473,7 @@ void handle(
14471473
byte subscriptionId,
14481474
long offset,
14491475
long chunkTimestamp,
1450-
long committedOffset,
1476+
long committedChunkId,
14511477
Message message);
14521478
}
14531479

@@ -1852,7 +1878,7 @@ static class QueryPublisherSequenceResponse extends Response {
18521878

18531879
private final long sequence;
18541880

1855-
public QueryPublisherSequenceResponse(short responseCode, long sequence) {
1881+
QueryPublisherSequenceResponse(short responseCode, long sequence) {
18561882
super(responseCode);
18571883
this.sequence = sequence;
18581884
}
@@ -1862,6 +1888,20 @@ public long getSequence() {
18621888
}
18631889
}
18641890

1891+
static class StreamInfoResponse extends Response {
1892+
1893+
private final Map<String, Long> info;
1894+
1895+
StreamInfoResponse(short responseCode, Map<String, Long> info) {
1896+
super(responseCode);
1897+
this.info = Collections.unmodifiableMap(new HashMap<>(info));
1898+
}
1899+
1900+
public Map<String, Long> getInfo() {
1901+
return info;
1902+
}
1903+
}
1904+
18651905
public static class StreamMetadata {
18661906

18671907
private final String stream;

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public long timestamp() {
310310
}
311311

312312
@Override
313-
public long committedOffset() {
313+
public long committedChunkId() {
314314
return committedOffset;
315315
}
316316

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static com.rabbitmq.stream.Constants.COMMAND_ROUTE;
3535
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
3636
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
37+
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
3738
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
3839
import static com.rabbitmq.stream.Constants.COMMAND_TUNE;
3940
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
@@ -62,6 +63,7 @@
6263
import com.rabbitmq.stream.impl.Client.Response;
6364
import com.rabbitmq.stream.impl.Client.SaslAuthenticateResponse;
6465
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
66+
import com.rabbitmq.stream.impl.Client.StreamInfoResponse;
6567
import com.rabbitmq.stream.impl.Client.StreamMetadata;
6668
import com.rabbitmq.stream.impl.Client.SubscriptionOffset;
6769
import com.rabbitmq.stream.metrics.MetricsCollector;
@@ -132,6 +134,7 @@ class ServerFrameHandler {
132134
handlers.put(COMMAND_ROUTE, new RouteFrameHandler());
133135
handlers.put(COMMAND_PARTITIONS, new PartitionsFrameHandler());
134136
handlers.put(COMMAND_EXCHANGE_COMMAND_VERSIONS, new ExchangeCommandVersionsFrameHandler());
137+
handlers.put(COMMAND_STREAM_STATS, new StreamStatsFrameHandler());
135138
HANDLERS = new FrameHandler[maxCommandKey + 1][];
136139
handlers
137140
.entrySet()
@@ -325,7 +328,7 @@ static int handleMessage(
325328
long offset,
326329
long offsetLimit,
327330
long chunkTimestamp,
328-
long committedOffset,
331+
long committedChunkId,
329332
Codec codec,
330333
MessageListener messageListener,
331334
byte subscriptionId) {
@@ -339,7 +342,7 @@ static int handleMessage(
339342
messageFiltered.set(true);
340343
} else {
341344
Message message = codec.decode(data);
342-
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedOffset, message);
345+
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedChunkId, message);
343346
}
344347
return read;
345348
}
@@ -597,7 +600,7 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
597600
client.chunkChecksum,
598601
client.metricsCollector,
599602
message.readByte(), // subscription ID
600-
message.readLong(), // last committed offset, unsigned long
603+
message.readLong(), // committed chunk ID, unsigned long
601604
9 // byte read count, 1 + 9
602605
);
603606
}
@@ -1146,4 +1149,38 @@ int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
11461149
return read;
11471150
}
11481151
}
1152+
1153+
private static class StreamStatsFrameHandler extends BaseFrameHandler {
1154+
1155+
@Override
1156+
int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
1157+
int correlationId = message.readInt();
1158+
int read = 4;
1159+
1160+
short responseCode = message.readShort();
1161+
read += 2;
1162+
1163+
int infoCount = message.readInt();
1164+
read += 4;
1165+
Map<String, Long> info = new LinkedHashMap<>(infoCount);
1166+
1167+
for (int i = 0; i < infoCount; i++) {
1168+
String key = readString(message);
1169+
read += 2 + key.length();
1170+
long value = message.readLong();
1171+
info.put(key, value);
1172+
read += 8;
1173+
}
1174+
1175+
OutstandingRequest<StreamInfoResponse> outstandingRequest =
1176+
remove(client.outstandingRequests, correlationId, StreamInfoResponse.class);
1177+
if (outstandingRequest == null) {
1178+
LOGGER.warn("Could not find outstanding request with correlation ID {}", correlationId);
1179+
} else {
1180+
outstandingRequest.response().set(new StreamInfoResponse(responseCode, info));
1181+
outstandingRequest.countDown();
1182+
}
1183+
return read;
1184+
}
1185+
}
11491186
}

0 commit comments

Comments
 (0)