From b96a87d49895ffeec7b73e333645b4b0b43525ab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?=
<514737+acogoluegnes@users.noreply.github.com>
Date: Mon, 22 Sep 2025 16:45:43 +0200
Subject: [PATCH 1/4] Add resource listeners to track producer/consumer state
---
.../com/rabbitmq/stream/ConsumerBuilder.java | 9 ++
.../stream/InvalidStateException.java | 26 ++++
.../com/rabbitmq/stream/ProducerBuilder.java | 9 ++
.../java/com/rabbitmq/stream/Resource.java | 89 ++++++++++++
.../stream/ResourceClosedException.java | 22 +++
.../com/rabbitmq/stream/StreamException.java | 4 +-
.../stream/impl/ConsumersCoordinator.java | 14 +-
.../rabbitmq/stream/impl/ResourceBase.java | 60 ++++++++
.../stream/impl/StateEventSupport.java | 74 ++++++++++
.../rabbitmq/stream/impl/StreamConsumer.java | 72 ++++------
.../stream/impl/StreamConsumerBuilder.java | 15 +-
.../rabbitmq/stream/impl/StreamProducer.java | 42 +++---
.../stream/impl/StreamProducerBuilder.java | 16 +++
.../stream/impl/SuperStreamProducer.java | 1 -
src/test/java/com/rabbitmq/stream/Cli.java | 20 +++
.../stream/impl/ResourceListenerTest.java | 133 ++++++++++++++++++
.../impl/StreamEnvironmentOAuth2Test.java | 14 +-
.../stream/impl/StreamEnvironmentTest.java | 4 +-
.../stream/impl/StreamProducerTest.java | 5 +-
.../stream/impl/StreamProducerUnitTest.java | 4 +
20 files changed, 542 insertions(+), 91 deletions(-)
create mode 100644 src/main/java/com/rabbitmq/stream/InvalidStateException.java
create mode 100644 src/main/java/com/rabbitmq/stream/Resource.java
create mode 100644 src/main/java/com/rabbitmq/stream/ResourceClosedException.java
create mode 100644 src/main/java/com/rabbitmq/stream/impl/ResourceBase.java
create mode 100644 src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java
create mode 100644 src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java
diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
index 29fc8646e9..cf49d5d57d 100644
--- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
@@ -126,6 +126,15 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
+ /**
+ * Add {@link Resource.StateListener}s to the consumer.
+ *
+ * @param listeners listeners
+ * @return this builder instance
+ * @since 1.3.0
+ */
+ ConsumerBuilder listeners(Resource.StateListener... listeners);
+
/**
* Enable {@link ManualTrackingStrategy}.
*
diff --git a/src/main/java/com/rabbitmq/stream/InvalidStateException.java b/src/main/java/com/rabbitmq/stream/InvalidStateException.java
new file mode 100644
index 0000000000..07dac19696
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/InvalidStateException.java
@@ -0,0 +1,26 @@
+// Copyright (c) 2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream;
+
+/**
+ * Exception thrown when a resource is not in an appropriate state.
+ *
+ *
An example is a connection that is initializing.
+ */
+public class InvalidStateException extends StreamException {
+ public InvalidStateException(String format, Object... args) {
+ super(format, args);
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
index b89524de96..2862a95ebf 100644
--- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
@@ -192,6 +192,15 @@ public interface ProducerBuilder {
*/
ProducerBuilder filterValue(Function filterValueExtractor);
+ /**
+ * Add {@link Resource.StateListener}s to the producer.
+ *
+ * @param listeners listeners
+ * @return this builder instance
+ * @since 1.3.0
+ */
+ ProducerBuilder listeners(Resource.StateListener... listeners);
+
/**
* Create the {@link Producer} instance.
*
diff --git a/src/main/java/com/rabbitmq/stream/Resource.java b/src/main/java/com/rabbitmq/stream/Resource.java
new file mode 100644
index 0000000000..c3df79bd3e
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/Resource.java
@@ -0,0 +1,89 @@
+// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream;
+
+/**
+ * Marker interface for {@link com.rabbitmq.stream.Resource}-like classes.
+ *
+ * Instances of these classes have different states during their lifecycle: open, recovering,
+ * closed, etc. Application can be interested in taking some actions for a given state (e.g.
+ * stopping publishing when a {@link com.rabbitmq.stream.Producer} is recovering after a connection
+ * problem and resuming publishing when it is open again).
+ *
+ * @see com.rabbitmq.stream.Producer
+ * @see com.rabbitmq.stream.Consumer
+ */
+public interface Resource {
+
+ /**
+ * Application listener for a {@link com.rabbitmq.stream.Resource}.
+ *
+ *
They are registered at creation time.
+ *
+ * @see
+ * com.rabbitmq.stream.ProducerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...)
+ * @see
+ * com.rabbitmq.stream.ConsumerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...)
+ */
+ @FunctionalInterface
+ interface StateListener {
+
+ /**
+ * Handle state change.
+ *
+ * @param context state change context
+ */
+ void handle(Context context);
+ }
+
+ /** Context of a resource state change. */
+ interface Context {
+
+ /**
+ * The resource instance.
+ *
+ * @return resource instance
+ */
+ Resource resource();
+
+ /**
+ * The previous state of the resource.
+ *
+ * @return previous state
+ */
+ State previousState();
+
+ /**
+ * The current (new) state of the resource.
+ *
+ * @return current state
+ */
+ State currentState();
+ }
+
+ /** Resource state. */
+ enum State {
+ /** The resource is currently opening. */
+ OPENING,
+ /** The resource is open and functional. */
+ OPEN,
+ /** The resource is recovering. */
+ RECOVERING,
+ /** The resource is closing. */
+ CLOSING,
+ /** The resource is closed. */
+ CLOSED
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/ResourceClosedException.java b/src/main/java/com/rabbitmq/stream/ResourceClosedException.java
new file mode 100644
index 0000000000..871153f840
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/ResourceClosedException.java
@@ -0,0 +1,22 @@
+// Copyright (c) 2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream;
+
+/** Exception thrown when a resource is not usable because it is closed. */
+public class ResourceClosedException extends InvalidStateException {
+ public ResourceClosedException(String format, Object... args) {
+ super(format, args);
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/StreamException.java b/src/main/java/com/rabbitmq/stream/StreamException.java
index 4830512009..9d34f49e1f 100644
--- a/src/main/java/com/rabbitmq/stream/StreamException.java
+++ b/src/main/java/com/rabbitmq/stream/StreamException.java
@@ -25,8 +25,8 @@ public class StreamException extends RuntimeException {
private final short code;
- public StreamException(String message) {
- super(message);
+ public StreamException(String format, Object... args) {
+ super(String.format(format, args));
this.code = -1;
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
index fcff1116f3..d6d65c5dc3 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
@@ -506,15 +506,15 @@ SubscriptionState state() {
return this.state.get();
}
- private void markConsuming() {
+ private void markOpen() {
if (this.consumer != null) {
- this.consumer.consuming();
+ this.consumer.markOpen();
}
}
- private void markNotConsuming() {
+ private void markRecovering() {
if (this.consumer != null) {
- this.consumer.notConsuming();
+ this.consumer.markRecovering();
}
}
@@ -712,7 +712,7 @@ private ClientSubscriptionsManager(
"Subscription connection has {} consumer(s) over {} stream(s) to recover",
this.subscriptionTrackers.stream().filter(Objects::nonNull).count(),
this.streamToStreamSubscriptions.size());
- iterate(this.subscriptionTrackers, SubscriptionTracker::markNotConsuming);
+ iterate(this.subscriptionTrackers, SubscriptionTracker::markRecovering);
environment
.scheduledExecutorService()
.execute(
@@ -787,7 +787,7 @@ private ClientSubscriptionsManager(
}
if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) {
- iterate(affectedSubscriptions, SubscriptionTracker::markNotConsuming);
+ iterate(affectedSubscriptions, SubscriptionTracker::markRecovering);
environment
.scheduledExecutorService()
.execute(
@@ -1146,7 +1146,7 @@ void add(
throw e;
}
subscriptionTracker.state(SubscriptionState.ACTIVE);
- subscriptionTracker.markConsuming();
+ subscriptionTracker.markOpen();
LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream);
} finally {
this.subscriptionManagerLock.unlock();
diff --git a/src/main/java/com/rabbitmq/stream/impl/ResourceBase.java b/src/main/java/com/rabbitmq/stream/impl/ResourceBase.java
new file mode 100644
index 0000000000..9e86570e89
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/ResourceBase.java
@@ -0,0 +1,60 @@
+// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import static com.rabbitmq.stream.Resource.State.CLOSED;
+import static com.rabbitmq.stream.Resource.State.OPEN;
+import static com.rabbitmq.stream.Resource.State.OPENING;
+
+import com.rabbitmq.stream.InvalidStateException;
+import com.rabbitmq.stream.Resource;
+import com.rabbitmq.stream.ResourceClosedException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+abstract class ResourceBase implements Resource {
+
+ private final AtomicReference state = new AtomicReference<>();
+ private final StateEventSupport stateEventSupport;
+
+ ResourceBase(List listeners) {
+ this.stateEventSupport = new StateEventSupport(listeners);
+ this.state(OPENING);
+ }
+
+ protected void checkOpen() {
+ State state = this.state.get();
+ if (state == CLOSED) {
+ throw new ResourceClosedException("Resource is closed");
+ } else if (state != OPEN) {
+ throw new InvalidStateException("Resource is not open, current state is %s", state.name());
+ }
+ }
+
+ protected State state() {
+ return this.state.get();
+ }
+
+ protected void state(Resource.State state) {
+ Resource.State previousState = this.state.getAndSet(state);
+ if (state != previousState) {
+ this.dispatch(previousState, state);
+ }
+ }
+
+ private void dispatch(State previous, State current) {
+ this.stateEventSupport.dispatch(this, previous, current);
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java b/src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java
new file mode 100644
index 0000000000..4f0e08ce05
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java
@@ -0,0 +1,74 @@
+// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import com.rabbitmq.stream.Resource;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StateEventSupport {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StateEventSupport.class);
+
+ private final List listeners;
+
+ StateEventSupport(List listeners) {
+ this.listeners = List.copyOf(listeners);
+ }
+
+ void dispatch(Resource resource, Resource.State previousState, Resource.State currentState) {
+ if (!this.listeners.isEmpty()) {
+ Resource.Context context = new DefaultContext(resource, previousState, currentState);
+ this.listeners.forEach(
+ l -> {
+ try {
+ l.handle(context);
+ } catch (Exception e) {
+ LOGGER.warn("Error in resource listener", e);
+ }
+ });
+ }
+ }
+
+ private static class DefaultContext implements Resource.Context {
+
+ private final Resource resource;
+ private final Resource.State previousState;
+ private final Resource.State currentState;
+
+ private DefaultContext(
+ Resource resource, Resource.State previousState, Resource.State currentState) {
+ this.resource = resource;
+ this.previousState = previousState;
+ this.currentState = currentState;
+ }
+
+ @Override
+ public Resource resource() {
+ return this.resource;
+ }
+
+ @Override
+ public Resource.State previousState() {
+ return this.previousState;
+ }
+
+ @Override
+ public Resource.State currentState() {
+ return this.currentState;
+ }
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
index 1c868002e6..273bf61757 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
@@ -15,18 +15,23 @@
package com.rabbitmq.stream.impl;
import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
+import static com.rabbitmq.stream.Resource.State.CLOSED;
+import static com.rabbitmq.stream.Resource.State.CLOSING;
+import static com.rabbitmq.stream.Resource.State.OPEN;
+import static com.rabbitmq.stream.Resource.State.OPENING;
+import static com.rabbitmq.stream.Resource.State.RECOVERING;
import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
import static java.lang.String.format;
import com.rabbitmq.stream.*;
-import com.rabbitmq.stream.MessageHandler.Context;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException;
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
@@ -41,7 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class StreamConsumer implements Consumer {
+final class StreamConsumer extends ResourceBase implements Consumer {
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
@@ -57,14 +62,12 @@ class StreamConsumer implements Consumer {
private volatile Runnable closingCallback;
private volatile Client trackingClient;
private volatile Client subscriptionClient;
- private volatile Status status;
private volatile long lastRequestedStoredOffset = 0;
private final AtomicBoolean nothingStoredYet = new AtomicBoolean(true);
private volatile boolean sacActive;
private final boolean sac;
private final OffsetSpecification initialOffsetSpecification;
private final Lock lock = new ReentrantLock();
- private volatile boolean consuming;
@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
StreamConsumer(
@@ -78,7 +81,9 @@ class StreamConsumer implements Consumer {
SubscriptionListener subscriptionListener,
Map subscriptionProperties,
ConsumerUpdateListener consumerUpdateListener,
- ConsumerFlowStrategy flowStrategy) {
+ ConsumerFlowStrategy flowStrategy,
+ List listeners) {
+ super(listeners);
if (Utils.filteringEnabled(subscriptionProperties) && !environment.filteringSupported()) {
throw new IllegalArgumentException(
"Filtering is not supported by the broker "
@@ -103,7 +108,7 @@ class StreamConsumer implements Consumer {
trackingClosingCallback = trackingConsumerRegistration.closingCallback();
- java.util.function.Consumer postMessageProcessingCallback =
+ java.util.function.Consumer postMessageProcessingCallback =
trackingConsumerRegistration.postMessageProcessingCallback();
if (postMessageProcessingCallback == null) {
// no callback, no need to decorate
@@ -225,7 +230,7 @@ class StreamConsumer implements Consumer {
Runnable init =
() -> {
- this.status = Status.INITIALIZING;
+ this.state(State.OPENING);
this.closingCallback =
environment.registerConsumer(
this,
@@ -237,8 +242,7 @@ class StreamConsumer implements Consumer {
closedAwareMessageHandler,
Map.copyOf(subscriptionProperties),
flowStrategy);
-
- this.status = Status.RUNNING;
+ this.state(OPEN);
};
if (lazyInit) {
this.initCallback = init;
@@ -250,7 +254,6 @@ class StreamConsumer implements Consumer {
this.closed.set(true);
throw e;
}
- this.consuming = true;
}
static OffsetSpecification getStoredOffset(
@@ -349,7 +352,7 @@ void start() {
@Override
public void store(long offset) {
- checkNotClosed();
+ checkOpen();
trackingCallback.accept(offset);
if (canTrack()) {
if (offsetBefore(this.lastRequestedStoredOffset, offset)
@@ -441,14 +444,16 @@ boolean sacActive() {
}
boolean canTrack() {
- return ((this.status == Status.INITIALIZING || this.status == Status.RUNNING)
- || (this.trackingClient == null && this.status == Status.NOT_AVAILABLE))
+ // FIXME check the condition to be able to track
+ return ((this.state() == OPENING || this.state() == OPEN)
+ || (this.trackingClient == null && this.state() == RECOVERING))
&& this.name != null;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
+ this.state(CLOSING);
this.environment.removeConsumer(this);
closeFromEnvironment();
}
@@ -459,7 +464,7 @@ void closeFromEnvironment() {
LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream);
this.closingCallback.run();
closed.set(true);
- this.status = Status.CLOSED;
+ this.state(CLOSED);
LOGGER.debug("Closed consumer successfully");
}
@@ -467,7 +472,7 @@ void closeAfterStreamDeletion() {
if (closed.compareAndSet(false, true)) {
this.maybeNotifyActiveToInactiveSac();
this.environment.removeConsumer(this);
- this.status = Status.CLOSED;
+ this.state(CLOSED);
}
}
@@ -499,11 +504,11 @@ private void maybeNotifyActiveToInactiveSac() {
}
}
- synchronized void unavailable() {
+ void unavailable() {
Utils.lock(
this.lock,
() -> {
- this.status = Status.NOT_AVAILABLE;
+ this.state(RECOVERING);
this.trackingClient = null;
});
}
@@ -517,11 +522,11 @@ void unlock() {
}
void running() {
- this.status = Status.RUNNING;
+ this.state(OPEN);
}
long storedOffset(Supplier clientSupplier) {
- checkNotClosed();
+ checkOpen();
if (canTrack()) {
return OffsetTrackingUtils.storedOffset(clientSupplier, this.name, this.stream);
} else if (this.name == null) {
@@ -530,8 +535,8 @@ long storedOffset(Supplier clientSupplier) {
} else {
throw new IllegalStateException(
format(
- "Not possible to query offset for consumer %s on stream %s for now, consumer status is %s",
- this.name, this.stream, this.status.name()));
+ "Not possible to query offset for consumer %s on stream %s for now, consumer state is %s",
+ this.name, this.stream, this.state().name()));
}
}
@@ -544,13 +549,6 @@ String stream() {
return this.stream;
}
- enum Status {
- INITIALIZING,
- RUNNING,
- NOT_AVAILABLE,
- CLOSED
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -589,12 +587,6 @@ public String toString() {
+ "}";
}
- private void checkNotClosed() {
- if (this.status == Status.CLOSED) {
- throw new IllegalStateException("This producer instance has been closed");
- }
- }
-
long id() {
return this.id;
}
@@ -608,15 +600,11 @@ String subscriptionConnectionName() {
}
}
- void notConsuming() {
- this.consuming = false;
- }
-
- void consuming() {
- this.consuming = true;
+ void markRecovering() {
+ state(RECOVERING);
}
- boolean isConsuming() {
- return this.consuming;
+ void markOpen() {
+ state(OPEN);
}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
index ec00136800..8d769f8a46 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
@@ -22,6 +22,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -48,6 +49,7 @@ class StreamConsumerBuilder implements ConsumerBuilder {
private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this);
private ConsumerUpdateListener consumerUpdateListener;
private DefaultFilterConfiguration filterConfiguration;
+ private final List listeners = new ArrayList<>();
public StreamConsumerBuilder(StreamEnvironment environment) {
this.environment = environment;
@@ -112,6 +114,16 @@ public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionLis
return this;
}
+ @Override
+ public ConsumerBuilder listeners(Resource.StateListener... listeners) {
+ if (listeners == null || listeners.length == 0) {
+ this.listeners.clear();
+ } else {
+ this.listeners.addAll(List.of(listeners));
+ }
+ return this;
+ }
+
@Override
@SuppressFBWarnings("AT_STALE_THREAD_WRITE_OF_PRIMITIVE")
public ManualTrackingStrategy manualTrackingStrategy() {
@@ -241,7 +253,8 @@ public Consumer build() {
this.subscriptionListener,
this.subscriptionProperties,
this.consumerUpdateListener,
- this.flowConfiguration.strategy);
+ this.flowConfiguration.strategy,
+ this.listeners);
environment.addConsumer((StreamConsumer) consumer);
} else {
if (Utils.isSac(this.subscriptionProperties)) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
index 516afe16f3..38ad4eacf6 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
@@ -15,6 +15,10 @@
package com.rabbitmq.stream.impl;
import static com.rabbitmq.stream.Constants.*;
+import static com.rabbitmq.stream.Resource.State.CLOSED;
+import static com.rabbitmq.stream.Resource.State.CLOSING;
+import static com.rabbitmq.stream.Resource.State.OPEN;
+import static com.rabbitmq.stream.Resource.State.RECOVERING;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
@@ -50,7 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class StreamProducer implements Producer {
+final class StreamProducer extends ResourceBase implements Producer {
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
@@ -77,7 +81,6 @@ class StreamProducer implements Producer {
private final boolean retryOnRecovery;
private volatile Client client;
private volatile byte publisherId;
- private volatile Status status;
private volatile ScheduledFuture> confirmTimeoutFuture;
private final short publishVersion;
private final Lock lock = new ReentrantLock();
@@ -96,7 +99,9 @@ class StreamProducer implements Producer {
Duration enqueueTimeout,
boolean retryOnRecovery,
Function filterValueExtractor,
+ List listeners,
StreamEnvironment environment) {
+ super(listeners);
if (filterValueExtractor != null && !environment.filteringSupported()) {
throw new IllegalArgumentException(
"Filtering is not supported by the broker "
@@ -204,7 +209,7 @@ public int fragmentLength(Object entity) {
if (canSend()) {
this.accumulator.flush(false);
}
- if (status != Status.CLOSED) {
+ if (this.state() != CLOSED) {
environment
.scheduledExecutorService()
.schedule(
@@ -242,7 +247,7 @@ public int fragmentLength(Object entity) {
LOGGER.info("Error while executing confirm timeout check task: {}", e.getCause());
}
- if (this.status != Status.CLOSED) {
+ if (this.state() != CLOSED) {
this.confirmTimeoutFuture =
this.environment
.scheduledExecutorService()
@@ -269,7 +274,7 @@ public int fragmentLength(Object entity) {
confirmTimeout.toMillis(),
TimeUnit.MILLISECONDS);
}
- this.status = Status.RUNNING;
+ this.state(State.OPEN);
}
private Runnable confirmTimeoutTask(Duration confirmTimeout) {
@@ -408,10 +413,10 @@ private void doSend(Message message, ConfirmationHandler confirmationHandler) {
}
private void failPublishing(Message message, ConfirmationHandler confirmationHandler) {
- if (this.status == Status.NOT_AVAILABLE) {
+ if (this.state() == RECOVERING) {
confirmationHandler.handle(
new ConfirmationStatus(message, false, CODE_PRODUCER_NOT_AVAILABLE));
- } else if (this.status == Status.CLOSED) {
+ } else if (this.state() == CLOSED) {
confirmationHandler.handle(new ConfirmationStatus(message, false, CODE_PRODUCER_CLOSED));
} else {
confirmationHandler.handle(
@@ -420,13 +425,14 @@ private void failPublishing(Message message, ConfirmationHandler confirmationHan
}
boolean canSend() {
- return this.status == Status.RUNNING;
+ return this.state() == OPEN;
}
@Override
public void close() {
if (this.closed.compareAndSet(false, true)) {
- if (this.status == Status.RUNNING && this.client != null) {
+ this.state(CLOSING);
+ if (this.state() == OPEN && this.client != null) {
LOGGER.debug("Deleting producer {}", this.publisherId);
Response response = this.client.deletePublisher(this.publisherId);
if (!response.isOk()) {
@@ -449,7 +455,7 @@ void closeFromEnvironment() {
this.closingCallback.run();
cancelConfirmTimeoutTask();
this.closed.set(true);
- this.status = Status.CLOSED;
+ this.state(CLOSED);
LOGGER.debug("Closed publisher {} successfully", this.publisherId);
}
@@ -467,7 +473,7 @@ void closeAfterStreamDeletion(short code) {
}
cancelConfirmTimeoutTask();
this.environment.removeProducer(this);
- this.status = Status.CLOSED;
+ this.state(CLOSED);
}
}
@@ -491,7 +497,7 @@ boolean isOpen() {
}
void unavailable() {
- this.status = Status.NOT_AVAILABLE;
+ this.state(RECOVERING);
}
void running() {
@@ -560,7 +566,7 @@ void running() {
}
}
});
- this.status = Status.RUNNING;
+ this.state(OPEN);
}
void setClient(Client client) {
@@ -571,16 +577,6 @@ void setPublisherId(byte publisherId) {
this.executeInLock(() -> this.publisherId = publisherId);
}
- Status status() {
- return this.status;
- }
-
- enum Status {
- RUNNING,
- NOT_AVAILABLE,
- CLOSED
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
index b44290868c..80e54ff238 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
@@ -17,12 +17,15 @@
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.ProducerBuilder;
+import com.rabbitmq.stream.Resource;
import com.rabbitmq.stream.RoutingStrategy;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.Compression;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Function;
import java.util.function.ToIntFunction;
@@ -59,6 +62,8 @@ class StreamProducerBuilder implements ProducerBuilder {
private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH;
+ private final List listeners = new ArrayList<>();
+
StreamProducerBuilder(StreamEnvironment environment) {
this.environment = environment;
}
@@ -158,6 +163,16 @@ public ProducerBuilder filterValue(Function filterValueExtracto
return this;
}
+ @Override
+ public ProducerBuilder listeners(Resource.StateListener... listeners) {
+ if (listeners == null || listeners.length == 0) {
+ this.listeners.clear();
+ } else {
+ this.listeners.addAll(List.of(listeners));
+ }
+ return this;
+ }
+
@Override
public RoutingConfiguration routing(Function routingKeyExtractor) {
this.routingConfiguration = new DefaultRoutingConfiguration(this);
@@ -219,6 +234,7 @@ public Producer build() {
enqueueTimeout,
retryOnRecovery,
filterValueExtractor,
+ listeners,
environment);
this.environment.addProducer((StreamProducer) producer);
} else {
diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java
index 91cc4722f0..7ce09a246e 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java
@@ -120,7 +120,6 @@ public void send(Message message, ConfirmationHandler confirmationHandler) {
producer(streams.get(0)).send(message, confirmationHandler);
} else {
for (int i = 0; i < streams.size(); i++) {
- Producer producer = producer(streams.get(i));
producer(streams.get(i)).send(messageInterceptor.apply(i, message), confirmationHandler);
}
}
diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java
index bb5298bb36..279eaa6494 100644
--- a/src/test/java/com/rabbitmq/stream/Cli.java
+++ b/src/test/java/com/rabbitmq/stream/Cli.java
@@ -154,6 +154,26 @@ public static List listConnections() {
return connectionInfoList;
}
+ public static List listLocatorConnections() {
+ return listConnections().stream()
+ .filter(c -> c.clientProvidedName() != null && c.clientProvidedName().contains("-locator-"))
+ .collect(Collectors.toList());
+ }
+
+ public static List listConsumerConnections() {
+ return listConnections().stream()
+ .filter(
+ c -> c.clientProvidedName() != null && c.clientProvidedName().contains("-consumer-"))
+ .collect(Collectors.toList());
+ }
+
+ public static List listProducerConnections() {
+ return listConnections().stream()
+ .filter(
+ c -> c.clientProvidedName() != null && c.clientProvidedName().contains("-producer-"))
+ .collect(Collectors.toList());
+ }
+
private static Map buildClientProperties(String[] fields) {
String clientPropertiesString = fields[1];
clientPropertiesString =
diff --git a/src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java b/src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java
new file mode 100644
index 0000000000..40a2e3d020
--- /dev/null
+++ b/src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java
@@ -0,0 +1,133 @@
+// Copyright (c) 2025 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import static com.rabbitmq.stream.Resource.State.CLOSED;
+import static com.rabbitmq.stream.Resource.State.CLOSING;
+import static com.rabbitmq.stream.Resource.State.OPEN;
+import static com.rabbitmq.stream.Resource.State.OPENING;
+import static com.rabbitmq.stream.Resource.State.RECOVERING;
+import static com.rabbitmq.stream.impl.Assertions.assertThat;
+
+import com.rabbitmq.stream.BackOffDelayPolicy;
+import com.rabbitmq.stream.Cli;
+import com.rabbitmq.stream.Consumer;
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.Producer;
+import com.rabbitmq.stream.Resource;
+import io.netty.channel.EventLoopGroup;
+import java.time.Duration;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@StreamTestInfrastructure
+public class ResourceListenerTest {
+
+ String stream;
+ EventLoopGroup eventLoopGroup;
+ Environment environment;
+
+ @BeforeEach
+ void init() {
+ environment =
+ Environment.builder()
+ .recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(1)))
+ .netty()
+ .eventLoopGroup(eventLoopGroup)
+ .environmentBuilder()
+ .build();
+ }
+
+ @AfterEach
+ void tearDown() {
+ environment.close();
+ }
+
+ @Test
+ void publisherListenersShouldBeCalledDuringLifecycle() {
+ Queue states = new ConcurrentLinkedQueue<>();
+ TestUtils.Sync sync = TestUtils.sync(2);
+ Producer producer =
+ environment.producerBuilder().stream(stream)
+ .listeners(
+ context -> {
+ states.add(context.currentState());
+ if (context.currentState() == OPEN) {
+ sync.down();
+ }
+ })
+ .build();
+
+ Cli.listProducerConnections().forEach(c -> Cli.killConnection(c.clientProvidedName()));
+
+ assertThat(sync).completes();
+
+ producer.close();
+ Assertions.assertThat(states).containsExactly(OPENING, OPEN, RECOVERING, OPEN, CLOSING, CLOSED);
+ }
+
+ @Test
+ void consumerListenersShouldBeCalledDuringLifecycle() {
+ Queue states = new ConcurrentLinkedQueue<>();
+ TestUtils.Sync sync = TestUtils.sync(2);
+ Consumer consumer =
+ environment.consumerBuilder().stream(stream)
+ .messageHandler((ctx, msg) -> {})
+ .listeners(
+ context -> {
+ states.add(context.currentState());
+ if (context.currentState() == OPEN) {
+ sync.down();
+ }
+ })
+ .build();
+
+ Cli.listConsumerConnections().forEach(c -> Cli.killConnection(c.clientProvidedName()));
+
+ assertThat(sync).completes();
+
+ consumer.close();
+ Assertions.assertThat(states).containsExactly(OPENING, OPEN, RECOVERING, OPEN, CLOSING, CLOSED);
+ }
+
+ @Test
+ void listenersForNamedConsumerShouldBeCalledDuringLifecycle() {
+ Queue states = new ConcurrentLinkedQueue<>();
+ TestUtils.Sync sync = TestUtils.sync(2);
+ Consumer consumer =
+ environment.consumerBuilder().stream(stream)
+ .name("app-1")
+ .messageHandler((ctx, msg) -> {})
+ .listeners(
+ context -> {
+ states.add(context.currentState());
+ if (context.currentState() == OPEN) {
+ sync.down();
+ }
+ })
+ .build();
+
+ Cli.listProducerConnections().forEach(c -> Cli.killConnection(c.clientProvidedName()));
+
+ assertThat(sync).completes();
+
+ consumer.close();
+ Assertions.assertThat(states).containsExactly(OPENING, OPEN, RECOVERING, OPEN, CLOSING, CLOSED);
+ }
+}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java
index b16797169a..98271753a5 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java
@@ -29,6 +29,7 @@
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import com.rabbitmq.stream.Producer;
+import com.rabbitmq.stream.Resource;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultOAuth2Configuration;
import com.rabbitmq.stream.impl.TestUtils.DisabledIfOauth2AuthBackendNotEnabled;
import com.rabbitmq.stream.impl.TestUtils.Sync;
@@ -95,14 +96,7 @@ void environmentShouldNotWorkAfterTokenExpires() throws Exception {
Sync tokenRefreshedSync = sync(2);
HttpHandler httpHandler =
oAuth2TokenHttpHandler(
- () -> {
- // if (serverCallCount.getAndIncrement() == 0) {
- return currentTimeMillis() + tokenLifetime.toMillis();
- // } else {
- // return currentTimeMillis() - 100;
- // }
- },
- tokenRefreshedSync::down);
+ () -> currentTimeMillis() + tokenLifetime.toMillis(), tokenRefreshedSync::down);
this.server = start(httpHandler, null);
try (Environment env =
@@ -124,7 +118,7 @@ void environmentShouldNotWorkAfterTokenExpires() throws Exception {
producer.send(producer.messageBuilder().build(), ctx -> {});
assertThat(consumeSync).completes();
assertThat(tokenRefreshedSync).completes();
- org.assertj.core.api.Assertions.assertThat(consumer.isConsuming());
+ org.assertj.core.api.Assertions.assertThat(consumer.state() == Resource.State.OPEN);
// stopping the token server, there won't be attempts to re-authenticate
this.server.stop(0);
@@ -157,7 +151,7 @@ void environmentShouldNotWorkAfterTokenExpires() throws Exception {
org.assertj.core.api.Assertions.assertThat(lastResponseCode)
.hasValue(CODE_PRODUCER_NOT_AVAILABLE);
- waitAtMost(() -> !consumer.isConsuming());
+ waitAtMost(() -> consumer.state() != Resource.State.OPEN);
}
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
index f24cccd473..6126a9b303 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
@@ -14,6 +14,7 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.Cli.listLocatorConnections;
import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static com.rabbitmq.stream.impl.TestUtils.*;
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
@@ -432,9 +433,8 @@ void shouldHaveSeveralLocatorsWhenSeveralUrisSpecifiedAndShouldRecoverThemIfClos
Supplier> locatorConnectionNamesSupplier =
() ->
- Cli.listConnections().stream()
+ listLocatorConnections().stream()
.map(ConnectionInfo::clientProvidedName)
- .filter(name -> name != null && name.contains("-locator-"))
.collect(toList());
List locatorConnectionNames = locatorConnectionNamesSupplier.get();
assertThat(locatorConnectionNames).hasSameSizeAs(uris);
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java
index 47780c26c0..6d1508fe18 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java
@@ -23,7 +23,6 @@
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducerInfo;
-import com.rabbitmq.stream.impl.StreamProducer.Status;
import com.rabbitmq.stream.impl.TestUtils.Sync;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
@@ -282,7 +281,7 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
Cli.killConnection("rabbitmq-stream-producer-0");
- waitAtMost(() -> ((StreamProducer) producer).status() == Status.NOT_AVAILABLE);
+ waitAtMost(() -> ((StreamProducer) producer).state() == Resource.State.RECOVERING);
canPublish.set(false);
assertThat(confirmed.get()).isPositive();
@@ -302,7 +301,7 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
(published.get() - (confirmed.get() + errored.get()))));
assertThat(confirmed.get() + errored.get()).isEqualTo(published.get());
- waitAtMost(() -> ((StreamProducer) producer).status() == StreamProducer.Status.RUNNING);
+ waitAtMost(() -> ((StreamProducer) producer).state() == Resource.State.OPEN);
int confirmedAfterUnavailability = confirmed.get();
int errorAfterUnavailability = errored.get();
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
index 51f320a46c..efcc4c0fef 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
@@ -15,6 +15,7 @@
package com.rabbitmq.stream.impl;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
+import static java.util.Collections.emptyList;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -184,6 +185,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
Duration.ofSeconds(10),
true,
null,
+ emptyList(),
env);
range(0, messageCount)
@@ -235,6 +237,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
enqueueTimeout,
true,
null,
+ emptyList(),
env);
AtomicBoolean confirmCalled = new AtomicBoolean(false);
@@ -276,6 +279,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
enqueueTimeout,
true,
null,
+ emptyList(),
env);
AtomicBoolean confirmCalled = new AtomicBoolean(false);
From 835e7caff0c4d4a68c2d5e6e3860d98ae3d0c2da Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?=
<514737+acogoluegnes@users.noreply.github.com>
Date: Mon, 22 Sep 2025 17:19:01 +0200
Subject: [PATCH 2/4] Allow store offset on consumer closing
When flushing with auto tracking strategy.
---
.../com/rabbitmq/stream/impl/StreamConsumer.java | 15 +++++++++++----
.../rabbitmq/stream/impl/StreamConsumerTest.java | 2 +-
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
index 273bf61757..b86882807d 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
@@ -352,7 +352,7 @@ void start() {
@Override
public void store(long offset) {
- checkOpen();
+ checkNotClosed();
trackingCallback.accept(offset);
if (canTrack()) {
if (offsetBefore(this.lastRequestedStoredOffset, offset)
@@ -444,8 +444,8 @@ boolean sacActive() {
}
boolean canTrack() {
- // FIXME check the condition to be able to track
- return ((this.state() == OPENING || this.state() == OPEN)
+ // closing is OK e.g. when flushing on closing
+ return ((this.state() == OPENING || this.state() == OPEN || this.state() == CLOSING)
|| (this.trackingClient == null && this.state() == RECOVERING))
&& this.name != null;
}
@@ -526,7 +526,7 @@ void running() {
}
long storedOffset(Supplier clientSupplier) {
- checkOpen();
+ checkNotClosed();
if (canTrack()) {
return OffsetTrackingUtils.storedOffset(clientSupplier, this.name, this.stream);
} else if (this.name == null) {
@@ -607,4 +607,11 @@ void markRecovering() {
void markOpen() {
state(OPEN);
}
+
+ private void checkNotClosed() {
+ if (state() == CLOSED) {
+ // will throw the appropriate exception
+ checkOpen();
+ }
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
index 583c448e59..153af5fd20 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
@@ -971,7 +971,7 @@ void methodsShouldThrowExceptionWhenConsumerIsClosed() {
ThrowingCallable[] calls =
new ThrowingCallable[] {() -> consumer.store(1), () -> consumer.storedOffset()};
Arrays.stream(calls)
- .forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class));
+ .forEach(call -> assertThatThrownBy(call).isInstanceOf(ResourceClosedException.class));
}
@Test
From 0ac78430ede0581a6bee757fb1f6e17d42a231b8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?=
<514737+acogoluegnes@users.noreply.github.com>
Date: Mon, 22 Sep 2025 17:40:51 +0200
Subject: [PATCH 3/4] Squash spotbugs warning
---
src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java | 2 --
src/main/java/com/rabbitmq/stream/impl/StreamProducer.java | 2 --
2 files changed, 4 deletions(-)
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
index b86882807d..d7d28d3287 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
@@ -29,7 +29,6 @@
import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException;
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@@ -69,7 +68,6 @@ final class StreamConsumer extends ResourceBase implements Consumer {
private final OffsetSpecification initialOffsetSpecification;
private final Lock lock = new ReentrantLock();
- @SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
StreamConsumer(
String stream,
OffsetSpecification offsetSpecification,
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
index 38ad4eacf6..e3c3626d07 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
@@ -27,7 +27,6 @@
import com.rabbitmq.stream.compression.CompressionCodec;
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -85,7 +84,6 @@ final class StreamProducer extends ResourceBase implements Producer {
private final short publishVersion;
private final Lock lock = new ReentrantLock();
- @SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
StreamProducer(
String name,
String stream,
From 5b0e46322df09e00d1223717a315fb0486069584 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?=
<514737+acogoluegnes@users.noreply.github.com>
Date: Mon, 22 Sep 2025 16:50:11 +0200
Subject: [PATCH 4/4] Fix tracking check
Conflicts:
src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
---
src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
index d7d28d3287..18492aa74c 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
@@ -443,8 +443,8 @@ boolean sacActive() {
boolean canTrack() {
// closing is OK e.g. when flushing on closing
- return ((this.state() == OPENING || this.state() == OPEN || this.state() == CLOSING)
- || (this.trackingClient == null && this.state() == RECOVERING))
+ return (this.state() == OPENING || this.state() == OPEN || this.state() == CLOSING)
+ && (this.trackingClient != null && this.state() != RECOVERING)
&& this.name != null;
}