File tree Expand file tree Collapse file tree 6 files changed +54
-6
lines changed
main/java/com/rabbitmq/stream
test/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 6 files changed +54
-6
lines changed Original file line number Diff line number Diff line change @@ -61,6 +61,15 @@ static EnvironmentBuilder builder() {
6161 */
6262 void deleteStream (String stream );
6363
64+ /**
65+ * Query information on a stream.
66+ *
67+ * <p>Requires RabbitMQ 3.11 or more.
68+ *
69+ * @param stream
70+ * @return information on the stream
71+ * @throws UnsupportedOperationException if the broker does not support this command
72+ */
6473 StreamInfo queryStreamInfo (String stream );
6574
6675 /**
Original file line number Diff line number Diff line change @@ -54,7 +54,7 @@ interface Context {
5454 long timestamp ();
5555
5656 /**
57- * The committed offset on this stream.
57+ * The committed offset in this stream.
5858 *
5959 * <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
6060 * (leader and replicas).
@@ -63,7 +63,10 @@ interface Context {
6363 * given time. The value can be stale as soon as the application reads it though, as the
6464 * committed offset for a stream that is published to changes all the time.
6565 *
66- * @return committed offset on this stream
66+ * <p>This requires RabbitMQ 3.11 or more. The method always returns 0 otherwise.
67+ *
68+ * @return committed offset in this stream
69+ * @see StreamInfo#committedOffset()
6770 */
6871 long committedOffset ();
6972
Original file line number Diff line number Diff line change 1313// info@rabbitmq.com.
1414package com .rabbitmq .stream ;
1515
16+ import com .rabbitmq .stream .MessageHandler .Context ;
17+
18+ /**
19+ * Information on a stream.
20+ *
21+ * @see Environment#queryStreamInfo(String)
22+ */
1623public interface StreamInfo {
1724
25+ /**
26+ * The first offset in the stream.
27+ *
28+ * @return first offset in the stream
29+ * @throws NoOffsetException if there is no first offset yet
30+ */
1831 long firstOffset ();
1932
33+ /**
34+ * The committed offset in the stream.
35+ *
36+ * <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
37+ * (leader and replicas).
38+ *
39+ * <p>The committed offset is a good indication of what the last offset of a stream is at a given
40+ * time. The value can be stale as soon as the application reads it though, as the committed
41+ * offset for a stream that is published to changes all the time.
42+ *
43+ * @return committed offset in this stream
44+ * @see Context#committedOffset()
45+ * @throws NoOffsetException if there is no committed offset yet
46+ */
2047 long committedOffset ();
2148}
Original file line number Diff line number Diff line change 1313// info@rabbitmq.com.
1414package com .rabbitmq .stream ;
1515
16+ /** Exception to indicate a stream is not available. */
1617public class StreamNotAvailableException extends StreamException {
1718
18- private final String stream ;
19-
2019 public StreamNotAvailableException (String stream ) {
2120 super ("Stream " + stream + " is not available" , Constants .RESPONSE_CODE_STREAM_NOT_AVAILABLE );
22- this .stream = stream ;
2321 }
2422}
Original file line number Diff line number Diff line change @@ -391,7 +391,16 @@ public void deleteStream(String stream) {
391391
392392 @ Override
393393 public StreamInfo queryStreamInfo (String stream ) {
394- StreamInfoResponse response = locatorOperation (client -> client .streamInfo (stream ));
394+ StreamInfoResponse response =
395+ locatorOperation (
396+ client -> {
397+ if (Utils .is3_11_OrMore (client .brokerVersion ())) {
398+ return client .streamInfo (stream );
399+ } else {
400+ throw new UnsupportedOperationException (
401+ "QueryStringInfo is available only for RabbitMQ 3.11 or more." );
402+ }
403+ });
395404 if (response .isOk ()) {
396405 Map <String , String > info = response .getInfo ();
397406 BiFunction <String , String , LongSupplier > offsetSupplierLogic =
Original file line number Diff line number Diff line change @@ -922,12 +922,14 @@ void streamInfoShouldReturnFirstOffsetAndCommittedOffset() throws Exception {
922922 }
923923
924924 @ Test
925+ @ BrokerVersionAtLeast ("3.11.0" )
925926 void streamInfoShouldReturnErrorWhenStreamDoesNotExist () {
926927 assertThat (cf .get ().streamInfo ("does not exist" ).getResponseCode ())
927928 .isEqualTo (Constants .RESPONSE_CODE_STREAM_DOES_NOT_EXIST );
928929 }
929930
930931 @ Test
932+ @ BrokerVersionAtLeast ("3.11.0" )
931933 void streamInfoFirstOffsetShouldChangeAfterRetentionKickedIn (TestInfo info ) {
932934 int messageCount = 1000 ;
933935 int payloadSize = 1000 ;
You can’t perform that action at this time.
0 commit comments