From b96a87d49895ffeec7b73e333645b4b0b43525ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 22 Sep 2025 16:45:43 +0200 Subject: [PATCH 1/3] Add resource listeners to track producer/consumer state --- .../com/rabbitmq/stream/ConsumerBuilder.java | 9 ++ .../stream/InvalidStateException.java | 26 ++++ .../com/rabbitmq/stream/ProducerBuilder.java | 9 ++ .../java/com/rabbitmq/stream/Resource.java | 89 ++++++++++++ .../stream/ResourceClosedException.java | 22 +++ .../com/rabbitmq/stream/StreamException.java | 4 +- .../stream/impl/ConsumersCoordinator.java | 14 +- .../rabbitmq/stream/impl/ResourceBase.java | 60 ++++++++ .../stream/impl/StateEventSupport.java | 74 ++++++++++ .../rabbitmq/stream/impl/StreamConsumer.java | 72 ++++------ .../stream/impl/StreamConsumerBuilder.java | 15 +- .../rabbitmq/stream/impl/StreamProducer.java | 42 +++--- .../stream/impl/StreamProducerBuilder.java | 16 +++ .../stream/impl/SuperStreamProducer.java | 1 - src/test/java/com/rabbitmq/stream/Cli.java | 20 +++ .../stream/impl/ResourceListenerTest.java | 133 ++++++++++++++++++ .../impl/StreamEnvironmentOAuth2Test.java | 14 +- .../stream/impl/StreamEnvironmentTest.java | 4 +- .../stream/impl/StreamProducerTest.java | 5 +- .../stream/impl/StreamProducerUnitTest.java | 4 + 20 files changed, 542 insertions(+), 91 deletions(-) create mode 100644 src/main/java/com/rabbitmq/stream/InvalidStateException.java create mode 100644 src/main/java/com/rabbitmq/stream/Resource.java create mode 100644 src/main/java/com/rabbitmq/stream/ResourceClosedException.java create mode 100644 src/main/java/com/rabbitmq/stream/impl/ResourceBase.java create mode 100644 src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java create mode 100644 src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index 29fc8646e9..cf49d5d57d 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -126,6 +126,15 @@ public interface ConsumerBuilder { */ ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener); + /** + * Add {@link Resource.StateListener}s to the consumer. + * + * @param listeners listeners + * @return this builder instance + * @since 1.3.0 + */ + ConsumerBuilder listeners(Resource.StateListener... listeners); + /** * Enable {@link ManualTrackingStrategy}. * diff --git a/src/main/java/com/rabbitmq/stream/InvalidStateException.java b/src/main/java/com/rabbitmq/stream/InvalidStateException.java new file mode 100644 index 0000000000..07dac19696 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/InvalidStateException.java @@ -0,0 +1,26 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream; + +/** + * Exception thrown when a resource is not in an appropriate state. + * + *

An example is a connection that is initializing. + */ +public class InvalidStateException extends StreamException { + public InvalidStateException(String format, Object... args) { + super(format, args); + } +} diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index b89524de96..2862a95ebf 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -192,6 +192,15 @@ public interface ProducerBuilder { */ ProducerBuilder filterValue(Function filterValueExtractor); + /** + * Add {@link Resource.StateListener}s to the producer. + * + * @param listeners listeners + * @return this builder instance + * @since 1.3.0 + */ + ProducerBuilder listeners(Resource.StateListener... listeners); + /** * Create the {@link Producer} instance. * diff --git a/src/main/java/com/rabbitmq/stream/Resource.java b/src/main/java/com/rabbitmq/stream/Resource.java new file mode 100644 index 0000000000..c3df79bd3e --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/Resource.java @@ -0,0 +1,89 @@ +// Copyright (c) 2024-2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream; + +/** + * Marker interface for {@link com.rabbitmq.stream.Resource}-like classes. + * + *

Instances of these classes have different states during their lifecycle: open, recovering, + * closed, etc. Application can be interested in taking some actions for a given state (e.g. + * stopping publishing when a {@link com.rabbitmq.stream.Producer} is recovering after a connection + * problem and resuming publishing when it is open again). + * + * @see com.rabbitmq.stream.Producer + * @see com.rabbitmq.stream.Consumer + */ +public interface Resource { + + /** + * Application listener for a {@link com.rabbitmq.stream.Resource}. + * + *

They are registered at creation time. + * + * @see + * com.rabbitmq.stream.ProducerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...) + * @see + * com.rabbitmq.stream.ConsumerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...) + */ + @FunctionalInterface + interface StateListener { + + /** + * Handle state change. + * + * @param context state change context + */ + void handle(Context context); + } + + /** Context of a resource state change. */ + interface Context { + + /** + * The resource instance. + * + * @return resource instance + */ + Resource resource(); + + /** + * The previous state of the resource. + * + * @return previous state + */ + State previousState(); + + /** + * The current (new) state of the resource. + * + * @return current state + */ + State currentState(); + } + + /** Resource state. */ + enum State { + /** The resource is currently opening. */ + OPENING, + /** The resource is open and functional. */ + OPEN, + /** The resource is recovering. */ + RECOVERING, + /** The resource is closing. */ + CLOSING, + /** The resource is closed. */ + CLOSED + } +} diff --git a/src/main/java/com/rabbitmq/stream/ResourceClosedException.java b/src/main/java/com/rabbitmq/stream/ResourceClosedException.java new file mode 100644 index 0000000000..871153f840 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/ResourceClosedException.java @@ -0,0 +1,22 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream; + +/** Exception thrown when a resource is not usable because it is closed. */ +public class ResourceClosedException extends InvalidStateException { + public ResourceClosedException(String format, Object... args) { + super(format, args); + } +} diff --git a/src/main/java/com/rabbitmq/stream/StreamException.java b/src/main/java/com/rabbitmq/stream/StreamException.java index 4830512009..9d34f49e1f 100644 --- a/src/main/java/com/rabbitmq/stream/StreamException.java +++ b/src/main/java/com/rabbitmq/stream/StreamException.java @@ -25,8 +25,8 @@ public class StreamException extends RuntimeException { private final short code; - public StreamException(String message) { - super(message); + public StreamException(String format, Object... args) { + super(String.format(format, args)); this.code = -1; } diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index fcff1116f3..d6d65c5dc3 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -506,15 +506,15 @@ SubscriptionState state() { return this.state.get(); } - private void markConsuming() { + private void markOpen() { if (this.consumer != null) { - this.consumer.consuming(); + this.consumer.markOpen(); } } - private void markNotConsuming() { + private void markRecovering() { if (this.consumer != null) { - this.consumer.notConsuming(); + this.consumer.markRecovering(); } } @@ -712,7 +712,7 @@ private ClientSubscriptionsManager( "Subscription connection has {} consumer(s) over {} stream(s) to recover", this.subscriptionTrackers.stream().filter(Objects::nonNull).count(), this.streamToStreamSubscriptions.size()); - iterate(this.subscriptionTrackers, SubscriptionTracker::markNotConsuming); + iterate(this.subscriptionTrackers, SubscriptionTracker::markRecovering); environment .scheduledExecutorService() .execute( @@ -787,7 +787,7 @@ private ClientSubscriptionsManager( } if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) { - iterate(affectedSubscriptions, SubscriptionTracker::markNotConsuming); + iterate(affectedSubscriptions, SubscriptionTracker::markRecovering); environment .scheduledExecutorService() .execute( @@ -1146,7 +1146,7 @@ void add( throw e; } subscriptionTracker.state(SubscriptionState.ACTIVE); - subscriptionTracker.markConsuming(); + subscriptionTracker.markOpen(); LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream); } finally { this.subscriptionManagerLock.unlock(); diff --git a/src/main/java/com/rabbitmq/stream/impl/ResourceBase.java b/src/main/java/com/rabbitmq/stream/impl/ResourceBase.java new file mode 100644 index 0000000000..9e86570e89 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ResourceBase.java @@ -0,0 +1,60 @@ +// Copyright (c) 2024-2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.Resource.State.CLOSED; +import static com.rabbitmq.stream.Resource.State.OPEN; +import static com.rabbitmq.stream.Resource.State.OPENING; + +import com.rabbitmq.stream.InvalidStateException; +import com.rabbitmq.stream.Resource; +import com.rabbitmq.stream.ResourceClosedException; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +abstract class ResourceBase implements Resource { + + private final AtomicReference state = new AtomicReference<>(); + private final StateEventSupport stateEventSupport; + + ResourceBase(List listeners) { + this.stateEventSupport = new StateEventSupport(listeners); + this.state(OPENING); + } + + protected void checkOpen() { + State state = this.state.get(); + if (state == CLOSED) { + throw new ResourceClosedException("Resource is closed"); + } else if (state != OPEN) { + throw new InvalidStateException("Resource is not open, current state is %s", state.name()); + } + } + + protected State state() { + return this.state.get(); + } + + protected void state(Resource.State state) { + Resource.State previousState = this.state.getAndSet(state); + if (state != previousState) { + this.dispatch(previousState, state); + } + } + + private void dispatch(State previous, State current) { + this.stateEventSupport.dispatch(this, previous, current); + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java b/src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java new file mode 100644 index 0000000000..4f0e08ce05 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java @@ -0,0 +1,74 @@ +// Copyright (c) 2024-2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.Resource; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class StateEventSupport { + + private static final Logger LOGGER = LoggerFactory.getLogger(StateEventSupport.class); + + private final List listeners; + + StateEventSupport(List listeners) { + this.listeners = List.copyOf(listeners); + } + + void dispatch(Resource resource, Resource.State previousState, Resource.State currentState) { + if (!this.listeners.isEmpty()) { + Resource.Context context = new DefaultContext(resource, previousState, currentState); + this.listeners.forEach( + l -> { + try { + l.handle(context); + } catch (Exception e) { + LOGGER.warn("Error in resource listener", e); + } + }); + } + } + + private static class DefaultContext implements Resource.Context { + + private final Resource resource; + private final Resource.State previousState; + private final Resource.State currentState; + + private DefaultContext( + Resource resource, Resource.State previousState, Resource.State currentState) { + this.resource = resource; + this.previousState = previousState; + this.currentState = currentState; + } + + @Override + public Resource resource() { + return this.resource; + } + + @Override + public Resource.State previousState() { + return this.previousState; + } + + @Override + public Resource.State currentState() { + return this.currentState; + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 1c868002e6..273bf61757 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -15,18 +15,23 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay; +import static com.rabbitmq.stream.Resource.State.CLOSED; +import static com.rabbitmq.stream.Resource.State.CLOSING; +import static com.rabbitmq.stream.Resource.State.OPEN; +import static com.rabbitmq.stream.Resource.State.OPENING; +import static com.rabbitmq.stream.Resource.State.RECOVERING; import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry; import static com.rabbitmq.stream.impl.Utils.offsetBefore; import static java.lang.String.format; import com.rabbitmq.stream.*; -import com.rabbitmq.stream.MessageHandler.Context; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException; import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration; import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.*; @@ -41,7 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StreamConsumer implements Consumer { +final class StreamConsumer extends ResourceBase implements Consumer { private static final AtomicLong ID_SEQUENCE = new AtomicLong(0); @@ -57,14 +62,12 @@ class StreamConsumer implements Consumer { private volatile Runnable closingCallback; private volatile Client trackingClient; private volatile Client subscriptionClient; - private volatile Status status; private volatile long lastRequestedStoredOffset = 0; private final AtomicBoolean nothingStoredYet = new AtomicBoolean(true); private volatile boolean sacActive; private final boolean sac; private final OffsetSpecification initialOffsetSpecification; private final Lock lock = new ReentrantLock(); - private volatile boolean consuming; @SuppressFBWarnings("CT_CONSTRUCTOR_THROW") StreamConsumer( @@ -78,7 +81,9 @@ class StreamConsumer implements Consumer { SubscriptionListener subscriptionListener, Map subscriptionProperties, ConsumerUpdateListener consumerUpdateListener, - ConsumerFlowStrategy flowStrategy) { + ConsumerFlowStrategy flowStrategy, + List listeners) { + super(listeners); if (Utils.filteringEnabled(subscriptionProperties) && !environment.filteringSupported()) { throw new IllegalArgumentException( "Filtering is not supported by the broker " @@ -103,7 +108,7 @@ class StreamConsumer implements Consumer { trackingClosingCallback = trackingConsumerRegistration.closingCallback(); - java.util.function.Consumer postMessageProcessingCallback = + java.util.function.Consumer postMessageProcessingCallback = trackingConsumerRegistration.postMessageProcessingCallback(); if (postMessageProcessingCallback == null) { // no callback, no need to decorate @@ -225,7 +230,7 @@ class StreamConsumer implements Consumer { Runnable init = () -> { - this.status = Status.INITIALIZING; + this.state(State.OPENING); this.closingCallback = environment.registerConsumer( this, @@ -237,8 +242,7 @@ class StreamConsumer implements Consumer { closedAwareMessageHandler, Map.copyOf(subscriptionProperties), flowStrategy); - - this.status = Status.RUNNING; + this.state(OPEN); }; if (lazyInit) { this.initCallback = init; @@ -250,7 +254,6 @@ class StreamConsumer implements Consumer { this.closed.set(true); throw e; } - this.consuming = true; } static OffsetSpecification getStoredOffset( @@ -349,7 +352,7 @@ void start() { @Override public void store(long offset) { - checkNotClosed(); + checkOpen(); trackingCallback.accept(offset); if (canTrack()) { if (offsetBefore(this.lastRequestedStoredOffset, offset) @@ -441,14 +444,16 @@ boolean sacActive() { } boolean canTrack() { - return ((this.status == Status.INITIALIZING || this.status == Status.RUNNING) - || (this.trackingClient == null && this.status == Status.NOT_AVAILABLE)) + // FIXME check the condition to be able to track + return ((this.state() == OPENING || this.state() == OPEN) + || (this.trackingClient == null && this.state() == RECOVERING)) && this.name != null; } @Override public void close() { if (closed.compareAndSet(false, true)) { + this.state(CLOSING); this.environment.removeConsumer(this); closeFromEnvironment(); } @@ -459,7 +464,7 @@ void closeFromEnvironment() { LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream); this.closingCallback.run(); closed.set(true); - this.status = Status.CLOSED; + this.state(CLOSED); LOGGER.debug("Closed consumer successfully"); } @@ -467,7 +472,7 @@ void closeAfterStreamDeletion() { if (closed.compareAndSet(false, true)) { this.maybeNotifyActiveToInactiveSac(); this.environment.removeConsumer(this); - this.status = Status.CLOSED; + this.state(CLOSED); } } @@ -499,11 +504,11 @@ private void maybeNotifyActiveToInactiveSac() { } } - synchronized void unavailable() { + void unavailable() { Utils.lock( this.lock, () -> { - this.status = Status.NOT_AVAILABLE; + this.state(RECOVERING); this.trackingClient = null; }); } @@ -517,11 +522,11 @@ void unlock() { } void running() { - this.status = Status.RUNNING; + this.state(OPEN); } long storedOffset(Supplier clientSupplier) { - checkNotClosed(); + checkOpen(); if (canTrack()) { return OffsetTrackingUtils.storedOffset(clientSupplier, this.name, this.stream); } else if (this.name == null) { @@ -530,8 +535,8 @@ long storedOffset(Supplier clientSupplier) { } else { throw new IllegalStateException( format( - "Not possible to query offset for consumer %s on stream %s for now, consumer status is %s", - this.name, this.stream, this.status.name())); + "Not possible to query offset for consumer %s on stream %s for now, consumer state is %s", + this.name, this.stream, this.state().name())); } } @@ -544,13 +549,6 @@ String stream() { return this.stream; } - enum Status { - INITIALIZING, - RUNNING, - NOT_AVAILABLE, - CLOSED - } - @Override public boolean equals(Object o) { if (this == o) { @@ -589,12 +587,6 @@ public String toString() { + "}"; } - private void checkNotClosed() { - if (this.status == Status.CLOSED) { - throw new IllegalStateException("This producer instance has been closed"); - } - } - long id() { return this.id; } @@ -608,15 +600,11 @@ String subscriptionConnectionName() { } } - void notConsuming() { - this.consuming = false; - } - - void consuming() { - this.consuming = true; + void markRecovering() { + state(RECOVERING); } - boolean isConsuming() { - return this.consuming; + void markOpen() { + state(OPEN); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index ec00136800..8d769f8a46 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -22,6 +22,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -48,6 +49,7 @@ class StreamConsumerBuilder implements ConsumerBuilder { private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this); private ConsumerUpdateListener consumerUpdateListener; private DefaultFilterConfiguration filterConfiguration; + private final List listeners = new ArrayList<>(); public StreamConsumerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -112,6 +114,16 @@ public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionLis return this; } + @Override + public ConsumerBuilder listeners(Resource.StateListener... listeners) { + if (listeners == null || listeners.length == 0) { + this.listeners.clear(); + } else { + this.listeners.addAll(List.of(listeners)); + } + return this; + } + @Override @SuppressFBWarnings("AT_STALE_THREAD_WRITE_OF_PRIMITIVE") public ManualTrackingStrategy manualTrackingStrategy() { @@ -241,7 +253,8 @@ public Consumer build() { this.subscriptionListener, this.subscriptionProperties, this.consumerUpdateListener, - this.flowConfiguration.strategy); + this.flowConfiguration.strategy, + this.listeners); environment.addConsumer((StreamConsumer) consumer); } else { if (Utils.isSac(this.subscriptionProperties)) { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 516afe16f3..38ad4eacf6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -15,6 +15,10 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.Constants.*; +import static com.rabbitmq.stream.Resource.State.CLOSED; +import static com.rabbitmq.stream.Resource.State.CLOSING; +import static com.rabbitmq.stream.Resource.State.OPEN; +import static com.rabbitmq.stream.Resource.State.RECOVERING; import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedRunnable; @@ -50,7 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StreamProducer implements Producer { +final class StreamProducer extends ResourceBase implements Producer { private static final AtomicLong ID_SEQUENCE = new AtomicLong(0); @@ -77,7 +81,6 @@ class StreamProducer implements Producer { private final boolean retryOnRecovery; private volatile Client client; private volatile byte publisherId; - private volatile Status status; private volatile ScheduledFuture confirmTimeoutFuture; private final short publishVersion; private final Lock lock = new ReentrantLock(); @@ -96,7 +99,9 @@ class StreamProducer implements Producer { Duration enqueueTimeout, boolean retryOnRecovery, Function filterValueExtractor, + List listeners, StreamEnvironment environment) { + super(listeners); if (filterValueExtractor != null && !environment.filteringSupported()) { throw new IllegalArgumentException( "Filtering is not supported by the broker " @@ -204,7 +209,7 @@ public int fragmentLength(Object entity) { if (canSend()) { this.accumulator.flush(false); } - if (status != Status.CLOSED) { + if (this.state() != CLOSED) { environment .scheduledExecutorService() .schedule( @@ -242,7 +247,7 @@ public int fragmentLength(Object entity) { LOGGER.info("Error while executing confirm timeout check task: {}", e.getCause()); } - if (this.status != Status.CLOSED) { + if (this.state() != CLOSED) { this.confirmTimeoutFuture = this.environment .scheduledExecutorService() @@ -269,7 +274,7 @@ public int fragmentLength(Object entity) { confirmTimeout.toMillis(), TimeUnit.MILLISECONDS); } - this.status = Status.RUNNING; + this.state(State.OPEN); } private Runnable confirmTimeoutTask(Duration confirmTimeout) { @@ -408,10 +413,10 @@ private void doSend(Message message, ConfirmationHandler confirmationHandler) { } private void failPublishing(Message message, ConfirmationHandler confirmationHandler) { - if (this.status == Status.NOT_AVAILABLE) { + if (this.state() == RECOVERING) { confirmationHandler.handle( new ConfirmationStatus(message, false, CODE_PRODUCER_NOT_AVAILABLE)); - } else if (this.status == Status.CLOSED) { + } else if (this.state() == CLOSED) { confirmationHandler.handle(new ConfirmationStatus(message, false, CODE_PRODUCER_CLOSED)); } else { confirmationHandler.handle( @@ -420,13 +425,14 @@ private void failPublishing(Message message, ConfirmationHandler confirmationHan } boolean canSend() { - return this.status == Status.RUNNING; + return this.state() == OPEN; } @Override public void close() { if (this.closed.compareAndSet(false, true)) { - if (this.status == Status.RUNNING && this.client != null) { + this.state(CLOSING); + if (this.state() == OPEN && this.client != null) { LOGGER.debug("Deleting producer {}", this.publisherId); Response response = this.client.deletePublisher(this.publisherId); if (!response.isOk()) { @@ -449,7 +455,7 @@ void closeFromEnvironment() { this.closingCallback.run(); cancelConfirmTimeoutTask(); this.closed.set(true); - this.status = Status.CLOSED; + this.state(CLOSED); LOGGER.debug("Closed publisher {} successfully", this.publisherId); } @@ -467,7 +473,7 @@ void closeAfterStreamDeletion(short code) { } cancelConfirmTimeoutTask(); this.environment.removeProducer(this); - this.status = Status.CLOSED; + this.state(CLOSED); } } @@ -491,7 +497,7 @@ boolean isOpen() { } void unavailable() { - this.status = Status.NOT_AVAILABLE; + this.state(RECOVERING); } void running() { @@ -560,7 +566,7 @@ void running() { } } }); - this.status = Status.RUNNING; + this.state(OPEN); } void setClient(Client client) { @@ -571,16 +577,6 @@ void setPublisherId(byte publisherId) { this.executeInLock(() -> this.publisherId = publisherId); } - Status status() { - return this.status; - } - - enum Status { - RUNNING, - NOT_AVAILABLE, - CLOSED - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index b44290868c..80e54ff238 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -17,12 +17,15 @@ import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Producer; import com.rabbitmq.stream.ProducerBuilder; +import com.rabbitmq.stream.Resource; import com.rabbitmq.stream.RoutingStrategy; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.compression.Compression; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; import java.util.function.ToIntFunction; @@ -59,6 +62,8 @@ class StreamProducerBuilder implements ProducerBuilder { private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH; + private final List listeners = new ArrayList<>(); + StreamProducerBuilder(StreamEnvironment environment) { this.environment = environment; } @@ -158,6 +163,16 @@ public ProducerBuilder filterValue(Function filterValueExtracto return this; } + @Override + public ProducerBuilder listeners(Resource.StateListener... listeners) { + if (listeners == null || listeners.length == 0) { + this.listeners.clear(); + } else { + this.listeners.addAll(List.of(listeners)); + } + return this; + } + @Override public RoutingConfiguration routing(Function routingKeyExtractor) { this.routingConfiguration = new DefaultRoutingConfiguration(this); @@ -219,6 +234,7 @@ public Producer build() { enqueueTimeout, retryOnRecovery, filterValueExtractor, + listeners, environment); this.environment.addProducer((StreamProducer) producer); } else { diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java index 91cc4722f0..7ce09a246e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java @@ -120,7 +120,6 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { producer(streams.get(0)).send(message, confirmationHandler); } else { for (int i = 0; i < streams.size(); i++) { - Producer producer = producer(streams.get(i)); producer(streams.get(i)).send(messageInterceptor.apply(i, message), confirmationHandler); } } diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index bb5298bb36..279eaa6494 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -154,6 +154,26 @@ public static List listConnections() { return connectionInfoList; } + public static List listLocatorConnections() { + return listConnections().stream() + .filter(c -> c.clientProvidedName() != null && c.clientProvidedName().contains("-locator-")) + .collect(Collectors.toList()); + } + + public static List listConsumerConnections() { + return listConnections().stream() + .filter( + c -> c.clientProvidedName() != null && c.clientProvidedName().contains("-consumer-")) + .collect(Collectors.toList()); + } + + public static List listProducerConnections() { + return listConnections().stream() + .filter( + c -> c.clientProvidedName() != null && c.clientProvidedName().contains("-producer-")) + .collect(Collectors.toList()); + } + private static Map buildClientProperties(String[] fields) { String clientPropertiesString = fields[1]; clientPropertiesString = diff --git a/src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java b/src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java new file mode 100644 index 0000000000..40a2e3d020 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/ResourceListenerTest.java @@ -0,0 +1,133 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.Resource.State.CLOSED; +import static com.rabbitmq.stream.Resource.State.CLOSING; +import static com.rabbitmq.stream.Resource.State.OPEN; +import static com.rabbitmq.stream.Resource.State.OPENING; +import static com.rabbitmq.stream.Resource.State.RECOVERING; +import static com.rabbitmq.stream.impl.Assertions.assertThat; + +import com.rabbitmq.stream.BackOffDelayPolicy; +import com.rabbitmq.stream.Cli; +import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.Resource; +import io.netty.channel.EventLoopGroup; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@StreamTestInfrastructure +public class ResourceListenerTest { + + String stream; + EventLoopGroup eventLoopGroup; + Environment environment; + + @BeforeEach + void init() { + environment = + Environment.builder() + .recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(1))) + .netty() + .eventLoopGroup(eventLoopGroup) + .environmentBuilder() + .build(); + } + + @AfterEach + void tearDown() { + environment.close(); + } + + @Test + void publisherListenersShouldBeCalledDuringLifecycle() { + Queue states = new ConcurrentLinkedQueue<>(); + TestUtils.Sync sync = TestUtils.sync(2); + Producer producer = + environment.producerBuilder().stream(stream) + .listeners( + context -> { + states.add(context.currentState()); + if (context.currentState() == OPEN) { + sync.down(); + } + }) + .build(); + + Cli.listProducerConnections().forEach(c -> Cli.killConnection(c.clientProvidedName())); + + assertThat(sync).completes(); + + producer.close(); + Assertions.assertThat(states).containsExactly(OPENING, OPEN, RECOVERING, OPEN, CLOSING, CLOSED); + } + + @Test + void consumerListenersShouldBeCalledDuringLifecycle() { + Queue states = new ConcurrentLinkedQueue<>(); + TestUtils.Sync sync = TestUtils.sync(2); + Consumer consumer = + environment.consumerBuilder().stream(stream) + .messageHandler((ctx, msg) -> {}) + .listeners( + context -> { + states.add(context.currentState()); + if (context.currentState() == OPEN) { + sync.down(); + } + }) + .build(); + + Cli.listConsumerConnections().forEach(c -> Cli.killConnection(c.clientProvidedName())); + + assertThat(sync).completes(); + + consumer.close(); + Assertions.assertThat(states).containsExactly(OPENING, OPEN, RECOVERING, OPEN, CLOSING, CLOSED); + } + + @Test + void listenersForNamedConsumerShouldBeCalledDuringLifecycle() { + Queue states = new ConcurrentLinkedQueue<>(); + TestUtils.Sync sync = TestUtils.sync(2); + Consumer consumer = + environment.consumerBuilder().stream(stream) + .name("app-1") + .messageHandler((ctx, msg) -> {}) + .listeners( + context -> { + states.add(context.currentState()); + if (context.currentState() == OPEN) { + sync.down(); + } + }) + .build(); + + Cli.listProducerConnections().forEach(c -> Cli.killConnection(c.clientProvidedName())); + + assertThat(sync).completes(); + + consumer.close(); + Assertions.assertThat(states).containsExactly(OPENING, OPEN, RECOVERING, OPEN, CLOSING, CLOSED); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java index b16797169a..98271753a5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentOAuth2Test.java @@ -29,6 +29,7 @@ import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.Resource; import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultOAuth2Configuration; import com.rabbitmq.stream.impl.TestUtils.DisabledIfOauth2AuthBackendNotEnabled; import com.rabbitmq.stream.impl.TestUtils.Sync; @@ -95,14 +96,7 @@ void environmentShouldNotWorkAfterTokenExpires() throws Exception { Sync tokenRefreshedSync = sync(2); HttpHandler httpHandler = oAuth2TokenHttpHandler( - () -> { - // if (serverCallCount.getAndIncrement() == 0) { - return currentTimeMillis() + tokenLifetime.toMillis(); - // } else { - // return currentTimeMillis() - 100; - // } - }, - tokenRefreshedSync::down); + () -> currentTimeMillis() + tokenLifetime.toMillis(), tokenRefreshedSync::down); this.server = start(httpHandler, null); try (Environment env = @@ -124,7 +118,7 @@ void environmentShouldNotWorkAfterTokenExpires() throws Exception { producer.send(producer.messageBuilder().build(), ctx -> {}); assertThat(consumeSync).completes(); assertThat(tokenRefreshedSync).completes(); - org.assertj.core.api.Assertions.assertThat(consumer.isConsuming()); + org.assertj.core.api.Assertions.assertThat(consumer.state() == Resource.State.OPEN); // stopping the token server, there won't be attempts to re-authenticate this.server.stop(0); @@ -157,7 +151,7 @@ void environmentShouldNotWorkAfterTokenExpires() throws Exception { org.assertj.core.api.Assertions.assertThat(lastResponseCode) .hasValue(CODE_PRODUCER_NOT_AVAILABLE); - waitAtMost(() -> !consumer.isConsuming()); + waitAtMost(() -> consumer.state() != Resource.State.OPEN); } } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index f24cccd473..6126a9b303 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -14,6 +14,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.Cli.listLocatorConnections; import static com.rabbitmq.stream.impl.Assertions.assertThat; import static com.rabbitmq.stream.impl.TestUtils.*; import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed; @@ -432,9 +433,8 @@ void shouldHaveSeveralLocatorsWhenSeveralUrisSpecifiedAndShouldRecoverThemIfClos Supplier> locatorConnectionNamesSupplier = () -> - Cli.listConnections().stream() + listLocatorConnections().stream() .map(ConnectionInfo::clientProvidedName) - .filter(name -> name != null && name.contains("-locator-")) .collect(toList()); List locatorConnectionNames = locatorConnectionNamesSupplier.get(); assertThat(locatorConnectionNames).hasSameSizeAs(uris); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 47780c26c0..6d1508fe18 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -23,7 +23,6 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducerInfo; -import com.rabbitmq.stream.impl.StreamProducer.Status; import com.rabbitmq.stream.impl.TestUtils.Sync; import io.netty.channel.ChannelOption; import io.netty.channel.ConnectTimeoutException; @@ -282,7 +281,7 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception { Cli.killConnection("rabbitmq-stream-producer-0"); - waitAtMost(() -> ((StreamProducer) producer).status() == Status.NOT_AVAILABLE); + waitAtMost(() -> ((StreamProducer) producer).state() == Resource.State.RECOVERING); canPublish.set(false); assertThat(confirmed.get()).isPositive(); @@ -302,7 +301,7 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception { (published.get() - (confirmed.get() + errored.get())))); assertThat(confirmed.get() + errored.get()).isEqualTo(published.get()); - waitAtMost(() -> ((StreamProducer) producer).status() == StreamProducer.Status.RUNNING); + waitAtMost(() -> ((StreamProducer) producer).state() == Resource.State.OPEN); int confirmedAfterUnavailability = confirmed.get(); int errorAfterUnavailability = errored.get(); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 51f320a46c..efcc4c0fef 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -15,6 +15,7 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static java.util.Collections.emptyList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -184,6 +185,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( Duration.ofSeconds(10), true, null, + emptyList(), env); range(0, messageCount) @@ -235,6 +237,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry enqueueTimeout, true, null, + emptyList(), env); AtomicBoolean confirmCalled = new AtomicBoolean(false); @@ -276,6 +279,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize) enqueueTimeout, true, null, + emptyList(), env); AtomicBoolean confirmCalled = new AtomicBoolean(false); From 835e7caff0c4d4a68c2d5e6e3860d98ae3d0c2da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 22 Sep 2025 17:19:01 +0200 Subject: [PATCH 2/3] Allow store offset on consumer closing When flushing with auto tracking strategy. --- .../com/rabbitmq/stream/impl/StreamConsumer.java | 15 +++++++++++---- .../rabbitmq/stream/impl/StreamConsumerTest.java | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 273bf61757..b86882807d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -352,7 +352,7 @@ void start() { @Override public void store(long offset) { - checkOpen(); + checkNotClosed(); trackingCallback.accept(offset); if (canTrack()) { if (offsetBefore(this.lastRequestedStoredOffset, offset) @@ -444,8 +444,8 @@ boolean sacActive() { } boolean canTrack() { - // FIXME check the condition to be able to track - return ((this.state() == OPENING || this.state() == OPEN) + // closing is OK e.g. when flushing on closing + return ((this.state() == OPENING || this.state() == OPEN || this.state() == CLOSING) || (this.trackingClient == null && this.state() == RECOVERING)) && this.name != null; } @@ -526,7 +526,7 @@ void running() { } long storedOffset(Supplier clientSupplier) { - checkOpen(); + checkNotClosed(); if (canTrack()) { return OffsetTrackingUtils.storedOffset(clientSupplier, this.name, this.stream); } else if (this.name == null) { @@ -607,4 +607,11 @@ void markRecovering() { void markOpen() { state(OPEN); } + + private void checkNotClosed() { + if (state() == CLOSED) { + // will throw the appropriate exception + checkOpen(); + } + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 583c448e59..153af5fd20 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -971,7 +971,7 @@ void methodsShouldThrowExceptionWhenConsumerIsClosed() { ThrowingCallable[] calls = new ThrowingCallable[] {() -> consumer.store(1), () -> consumer.storedOffset()}; Arrays.stream(calls) - .forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class)); + .forEach(call -> assertThatThrownBy(call).isInstanceOf(ResourceClosedException.class)); } @Test From 0ac78430ede0581a6bee757fb1f6e17d42a231b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 22 Sep 2025 17:40:51 +0200 Subject: [PATCH 3/3] Squash spotbugs warning --- src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java | 2 -- src/main/java/com/rabbitmq/stream/impl/StreamProducer.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index b86882807d..d7d28d3287 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -29,7 +29,6 @@ import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException; import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration; import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; import java.util.List; import java.util.Map; @@ -69,7 +68,6 @@ final class StreamConsumer extends ResourceBase implements Consumer { private final OffsetSpecification initialOffsetSpecification; private final Lock lock = new ReentrantLock(); - @SuppressFBWarnings("CT_CONSTRUCTOR_THROW") StreamConsumer( String stream, OffsetSpecification offsetSpecification, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 38ad4eacf6..e3c3626d07 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -27,7 +27,6 @@ import com.rabbitmq.stream.compression.CompressionCodec; import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBuf; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -85,7 +84,6 @@ final class StreamProducer extends ResourceBase implements Producer { private final short publishVersion; private final Lock lock = new ReentrantLock(); - @SuppressFBWarnings("CT_CONSTRUCTOR_THROW") StreamProducer( String name, String stream,