Skip to content

Commit e3c1c11

Browse files
authored
Merge pull request #424 from rabbitmq/force-replica
Add option to force consuming from a replica
2 parents 0c01873 + 0754483 commit e3c1c11

File tree

13 files changed

+349
-67
lines changed

13 files changed

+349
-67
lines changed

src/docs/asciidoc/api.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,16 @@ a new connection is open. The value must be between 1 and 255.
204204
|To delay the connection opening until necessary.
205205
|false
206206

207+
|`requestedHeartbeat`
208+
|Heartbeat requested by the client.
209+
|60 seconds
210+
211+
|`forceReplicaForConsumers`
212+
|Retry connecting until a replica is available for consumers.
213+
The client retries 5 times before falling back to the stream leader node.
214+
Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available.
215+
|`false`
216+
207217
|`id`
208218
|Informational ID for the environment instance.
209219
Used as a prefix for connection names.

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,8 @@ public interface EnvironmentBuilder {
6969
* <p><i>The default implementation is overridden automatically if the following conditions are
7070
* met: the host to connect to is <code>localhost</code>, the user is <code>guest</code>, and no
7171
* address resolver has been provided. The client will then always try to connect to <code>
72-
* localhost</code> to facilitate local development.
73-
* Just provide a pass-through address resolver to avoid this
74-
* behavior, e.g.:</i>
72+
* localhost</code> to facilitate local development. Just provide a pass-through address resolver
73+
* to avoid this behavior, e.g.:</i>
7574
*
7675
* <pre>
7776
* Environment.builder()
@@ -184,7 +183,7 @@ public interface EnvironmentBuilder {
184183
EnvironmentBuilder virtualHost(String virtualHost);
185184

186185
/**
187-
* The hearbeat to request.
186+
* The heartbeat to request.
188187
*
189188
* <p>Default is 60 seconds.
190189
*
@@ -323,6 +322,34 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
323322
*/
324323
EnvironmentBuilder lazyInitialization(boolean lazy);
325324

325+
/**
326+
* Flag to force the connection to a stream replica for consumers.
327+
*
328+
* <p>The library will always prefer to connect to a stream replica to consume from, but it will
329+
* fall back to the stream leader if no replica is available. This is the default behavior. Set
330+
* this flag to <code>true</code> to make the library wait for a replica to become available if
331+
* only the stream leader is available. This can lead to longer recovery time but help to offload
332+
* a stream leader and let it deal only with write requests.
333+
*
334+
* <p>Note the library performs only 5 attempts to locate a replica before falling back to the
335+
* leader when the flag is set to <code>true</code>.
336+
*
337+
* <p>The {@link #recoveryBackOffDelayPolicy(BackOffDelayPolicy)} and {@link
338+
* #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)} policies control the time between
339+
* attempts.
340+
*
341+
* <p><b>Do not set this flag to <code>true</code> when streams have only 1 member (the leader),
342+
* e.g. for local development.</b>
343+
*
344+
* <p>Default is false.
345+
*
346+
* @param forceReplica whether to force the connection to a replica or not
347+
* @return this builder instance
348+
* @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy)
349+
* @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)
350+
*/
351+
EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica);
352+
326353
/**
327354
* Create the {@link Environment} instance.
328355
*

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public long applyAsLong(Object value) {
185185
FlushConsolidationHandler.class.getSimpleName();
186186
private final String NETTY_HANDLER_STREAM = StreamHandler.class.getSimpleName();
187187
private final String host;
188+
private final String clientConnectionName;
188189
private final int port;
189190
private final Map<String, String> serverProperties;
190191
private final Map<String, String> connectionProperties;
@@ -313,6 +314,7 @@ public void initChannel(SocketChannel ch) {
313314
f = b.connect(parameters.host, parameters.port).sync();
314315
this.host = parameters.host;
315316
this.port = parameters.port;
317+
this.clientConnectionName = clientConnectionName;
316318
} catch (Exception e) {
317319
String message =
318320
format(
@@ -2696,7 +2698,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
26962698
if (evt instanceof IdleStateEvent) {
26972699
IdleStateEvent e = (IdleStateEvent) evt;
26982700
if (e.state() == IdleState.READER_IDLE) {
2699-
LOGGER.info("Closing connection because it's been idle for too long");
2701+
LOGGER.info(
2702+
"Closing connection {} on {}:{} because it's been idle for too long",
2703+
clientConnectionName,
2704+
host,
2705+
port);
27002706
closing.set(true);
27012707
closingSequence(ShutdownContext.ShutdownReason.HEARTBEAT_FAILURE);
27022708
} else if (e.state() == IdleState.WRITER_IDLE) {

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.rabbitmq.stream.impl.Utils.namedFunction;
2121
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
2222
import static com.rabbitmq.stream.impl.Utils.quote;
23+
import static java.lang.String.format;
2324

2425
import com.rabbitmq.stream.*;
2526
import com.rabbitmq.stream.Consumer;
@@ -41,10 +42,12 @@
4142
import java.util.Objects;
4243
import java.util.Random;
4344
import java.util.Set;
45+
import java.util.concurrent.Callable;
4446
import java.util.concurrent.ConcurrentHashMap;
4547
import java.util.concurrent.ConcurrentSkipListSet;
4648
import java.util.concurrent.CopyOnWriteArrayList;
4749
import java.util.concurrent.atomic.AtomicBoolean;
50+
import java.util.concurrent.atomic.AtomicInteger;
4851
import java.util.concurrent.atomic.AtomicLong;
4952
import java.util.concurrent.atomic.AtomicReference;
5053
import java.util.function.*;
@@ -56,6 +59,7 @@
5659
class ConsumersCoordinator {
5760

5861
static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256;
62+
static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5;
5963

6064
static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next();
6165

@@ -74,16 +78,19 @@ class ConsumersCoordinator {
7478
private final ExecutorServiceFactory executorServiceFactory =
7579
new DefaultExecutorServiceFactory(
7680
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
81+
private final boolean forceReplica;
7782

7883
ConsumersCoordinator(
7984
StreamEnvironment environment,
8085
int maxConsumersByConnection,
8186
Function<ClientConnectionType, String> connectionNamingStrategy,
82-
ClientFactory clientFactory) {
87+
ClientFactory clientFactory,
88+
boolean forceReplica) {
8389
this.environment = environment;
8490
this.clientFactory = clientFactory;
8591
this.maxConsumersByConnection = maxConsumersByConnection;
8692
this.connectionNamingStrategy = connectionNamingStrategy;
93+
this.forceReplica = forceReplica;
8794
}
8895

8996
private static String keyForClientSubscription(Client.Broker broker) {
@@ -108,7 +115,7 @@ Runnable subscribe(
108115
MessageHandler messageHandler,
109116
Map<String, String> subscriptionProperties,
110117
ConsumerFlowStrategy flowStrategy) {
111-
List<Client.Broker> candidates = findBrokersForStream(stream);
118+
List<Client.Broker> candidates = findBrokersForStream(stream, forceReplica);
112119
Client.Broker newNode = pickBroker(candidates);
113120
if (newNode == null) {
114121
throw new IllegalStateException("No available node to subscribe to");
@@ -201,11 +208,8 @@ private void addToManager(
201208
// manager connection is dead or stream not available
202209
// scheduling manager closing if necessary in another thread to avoid blocking this one
203210
if (pickedManager.isEmpty()) {
204-
ClientSubscriptionsManager manager = pickedManager;
205211
ConsumersCoordinator.this.environment.execute(
206-
() -> {
207-
manager.closeIfEmpty();
208-
},
212+
pickedManager::closeIfEmpty,
209213
"Consumer manager closing after timeout, consumer %d on stream '%s'",
210214
tracker.consumer.id(),
211215
tracker.stream);
@@ -225,12 +229,14 @@ int managerCount() {
225229
}
226230

227231
// package protected for testing
228-
List<Client.Broker> findBrokersForStream(String stream) {
232+
List<Client.Broker> findBrokersForStream(String stream, boolean forceReplica) {
233+
LOGGER.debug(
234+
"Candidate lookup to consumer from '{}', forcing replica? {}", stream, forceReplica);
229235
Map<String, Client.StreamMetadata> metadata =
230236
this.environment.locatorOperation(
231237
namedFunction(
232238
c -> c.metadata(stream), "Candidate lookup to consume from '%s'", stream));
233-
if (metadata.size() == 0 || metadata.get(stream) == null) {
239+
if (metadata.isEmpty() || metadata.get(stream) == null) {
234240
// this is not supposed to happen
235241
throw new StreamDoesNotExistException(stream);
236242
}
@@ -253,8 +259,17 @@ List<Client.Broker> findBrokersForStream(String stream) {
253259

254260
List<Client.Broker> brokers;
255261
if (replicas == null || replicas.isEmpty()) {
256-
brokers = Collections.singletonList(streamMetadata.getLeader());
257-
LOGGER.debug("Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
262+
if (forceReplica) {
263+
throw new IllegalStateException(
264+
format(
265+
"Only the leader node is available for consuming from %s and "
266+
+ "consuming from leader has been deactivated for this consumer",
267+
stream));
268+
} else {
269+
brokers = Collections.singletonList(streamMetadata.getLeader());
270+
LOGGER.debug(
271+
"Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
272+
}
258273
} else {
259274
LOGGER.debug("Replicas for consuming from {}: {}", stream, replicas);
260275
brokers = new ArrayList<>(replicas);
@@ -265,6 +280,22 @@ List<Client.Broker> findBrokersForStream(String stream) {
265280
return brokers;
266281
}
267282

283+
private Callable<List<Broker>> findBrokersForStream(String stream) {
284+
AtomicInteger attemptNumber = new AtomicInteger();
285+
return () -> {
286+
boolean mustUseReplica;
287+
if (forceReplica) {
288+
mustUseReplica =
289+
attemptNumber.incrementAndGet() <= MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER;
290+
} else {
291+
mustUseReplica = false;
292+
}
293+
LOGGER.debug(
294+
"Looking for broker(s) for stream {}, forcing replica {}", stream, mustUseReplica);
295+
return findBrokersForStream(stream, mustUseReplica);
296+
};
297+
}
298+
268299
private Client.Broker pickBroker(List<Client.Broker> brokers) {
269300
if (brokers.isEmpty()) {
270301
return null;
@@ -792,7 +823,7 @@ private void assignConsumersToStream(
792823
}
793824
};
794825

795-
AsyncRetry.asyncRetry(() -> findBrokersForStream(stream))
826+
AsyncRetry.asyncRetry(findBrokersForStream(stream))
796827
.description("Candidate lookup to consume from '%s'", stream)
797828
.scheduler(environment.scheduledExecutorService())
798829
.retry(ex -> !(ex instanceof StreamDoesNotExistException))
@@ -885,9 +916,9 @@ private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tr
885916
// maybe not a good candidate, let's refresh and retry for this one
886917
candidates =
887918
Utils.callAndMaybeRetry(
888-
() -> findBrokersForStream(tracker.stream),
919+
findBrokersForStream(tracker.stream),
889920
ex -> !(ex instanceof StreamDoesNotExistException),
890-
environment.recoveryBackOffDelayPolicy(),
921+
recoveryBackOffDelayPolicy(),
891922
"Candidate lookup to consume from '%s' (subscription recovery)",
892923
tracker.stream);
893924
} catch (Exception e) {

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private void addToManager(Broker node, AgentTracker tracker) {
162162
try {
163163
pickedManager.register(tracker);
164164
LOGGER.debug(
165-
"Assigned {} tracker {} (stream '{}') to manager {} (node {}), subscription ID {}",
165+
"Assigned {} tracker {} (stream '{}') to manager {} (node {}), publisher ID {}",
166166
tracker.type(),
167167
tracker.uniqueId(),
168168
tracker.stream(),
@@ -605,8 +605,9 @@ private ClientProducersManager(
605605
}
606606
if (shutdownContext.isShutdownUnexpected()) {
607607
LOGGER.debug(
608-
"Recovering {} producer(s) after unexpected connection termination",
609-
producers.size());
608+
"Recovering {} producer(s) and {} tracking consumer(s) after unexpected connection termination",
609+
producers.size(),
610+
trackingConsumerTrackers.size());
610611
producers.forEach((publishingId, tracker) -> tracker.unavailable());
611612
trackingConsumerTrackers.forEach(AgentTracker::unavailable);
612613
// execute in thread pool to free the IO thread
@@ -701,7 +702,8 @@ private void assignProducersToNewManagers(
701702
.thenAccept(
702703
broker -> {
703704
String key = keyForNode(broker);
704-
LOGGER.debug("Assigning {} producer(s) to {}", trackers.size(), key);
705+
LOGGER.debug(
706+
"Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
705707
trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
706708
})
707709
.exceptionally(
@@ -767,10 +769,10 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
767769
| ClientClosedException
768770
| StreamNotAvailableException e) {
769771
LOGGER.debug(
770-
"{} re-assignment on stream {} timed out or connection closed or stream not available, "
772+
"{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, "
771773
+ "refreshing candidate leader and retrying",
772774
tracker.type(),
773-
tracker.id(),
775+
tracker.identifiable() ? tracker.id() : "N/A",
774776
tracker.stream());
775777
// maybe not a good candidate, let's refresh and retry for this one
776778
node =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ class StreamEnvironment implements Environment {
101101
boolean lazyInit,
102102
Function<ClientConnectionType, String> connectionNamingStrategy,
103103
Function<Client.ClientParameters, Client> clientFactory,
104-
ObservationCollector<?> observationCollector) {
104+
ObservationCollector<?> observationCollector,
105+
boolean forceReplicaForConsumers) {
105106
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
106107
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
107108
this.byteBufAllocator = byteBufAllocator;
@@ -208,7 +209,8 @@ class StreamEnvironment implements Environment {
208209
this,
209210
maxConsumersByConnection,
210211
connectionNamingStrategy,
211-
Utils.coordinatorClientFactory(this));
212+
Utils.coordinatorClientFactory(this),
213+
forceReplicaForConsumers);
212214
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
213215
ClientParameters clientParametersForInit = locatorParametersCopy();
214216
Runnable locatorInitSequence =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
6363
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
6464
private CompressionCodecFactory compressionCodecFactory;
6565
private boolean lazyInit = false;
66+
private boolean forceReplicaForConsumers = false;
6667
private Function<Client.ClientParameters, Client> clientFactory = Client::new;
6768
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
6869

@@ -266,6 +267,12 @@ public EnvironmentBuilder lazyInitialization(boolean lazy) {
266267
return this;
267268
}
268269

270+
@Override
271+
public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) {
272+
this.forceReplicaForConsumers = forceReplica;
273+
return this;
274+
}
275+
269276
@Override
270277
public TlsConfiguration tls() {
271278
this.tls.enable();
@@ -318,7 +325,8 @@ public Environment build() {
318325
lazyInit,
319326
connectionNamingStrategy,
320327
this.clientFactory,
321-
this.observationCollector);
328+
this.observationCollector,
329+
this.forceReplicaForConsumers);
322330
}
323331

324332
static final class DefaultTlsConfiguration implements TlsConfiguration {

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ static <T, R> Function<T, R> namedFunction(Function<T, R> task, String format, O
207207
}
208208

209209
static <T> T callAndMaybeRetry(
210-
Supplier<T> operation, Predicate<Exception> retryCondition, String format, Object... args) {
210+
Callable<T> operation, Predicate<Exception> retryCondition, String format, Object... args) {
211211
return callAndMaybeRetry(
212212
operation,
213213
retryCondition,
@@ -217,7 +217,7 @@ static <T> T callAndMaybeRetry(
217217
}
218218

219219
static <T> T callAndMaybeRetry(
220-
Supplier<T> operation,
220+
Callable<T> operation,
221221
Predicate<Exception> retryCondition,
222222
BackOffDelayPolicy delayPolicy,
223223
String format,
@@ -230,7 +230,7 @@ static <T> T callAndMaybeRetry(
230230
while (keepTrying) {
231231
try {
232232
attempt++;
233-
T result = operation.get();
233+
T result = operation.call();
234234
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
235235
LOGGER.debug(
236236
"Operation '{}' completed in {} ms after {} attempt(s)",

0 commit comments

Comments
 (0)