Skip to content

Commit 0754483

Browse files
committed
Add forceReplicaForConsumers flag
To force the use of a replica for consumers. The library tries 5 times before falling back to the stream leader when the flag is set to true (default is false, to favor faster recovery). The stream leader is started first and a replica can start a few seconds later, so with bad timing a consumer can see only the leader when it recovers. Setting the flag to true will force the library to retry a few times to use a replica if one becomes available. The performance tool also has the corresponding flag now: --force-replica-for-consumers. This commit also adds the following options to the performance tool: --heartbeat, --connection-recovery-interval, --topology-recovery-interval. Fixes #426, #427
1 parent 39776f0 commit 0754483

File tree

11 files changed

+184
-36
lines changed

11 files changed

+184
-36
lines changed

src/docs/asciidoc/api.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,16 @@ a new connection is open. The value must be between 1 and 255.
204204
|To delay the connection opening until necessary.
205205
|false
206206

207+
|`requestedHeartbeat`
208+
|Heartbeat requested by the client.
209+
|60 seconds
210+
211+
|`forceReplicaForConsumers`
212+
|Retry connecting until a replica is available for consumers.
213+
The client retries 5 times before falling back to the stream leader node.
214+
Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available.
215+
|`false`
216+
207217
|`id`
208218
|Informational ID for the environment instance.
209219
Used as a prefix for connection names.

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public interface EnvironmentBuilder {
183183
EnvironmentBuilder virtualHost(String virtualHost);
184184

185185
/**
186-
* The hearbeat to request.
186+
* The heartbeat to request.
187187
*
188188
* <p>Default is 60 seconds.
189189
*
@@ -322,6 +322,34 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
322322
*/
323323
EnvironmentBuilder lazyInitialization(boolean lazy);
324324

325+
/**
326+
* Flag to force the connection to a stream replica for consumers.
327+
*
328+
* <p>The library will always prefer to connect to a stream replica to consume from, but it will
329+
* fall back to the stream leader if no replica is available. This is the default behavior. Set
330+
* this flag to <code>true</code> to make the library wait for a replica to become available if
331+
* only the stream leader is available. This can lead to longer recovery time but help to offload
332+
* a stream leader and let it deal only with write requests.
333+
*
334+
* <p>Note the library performs only 5 attempts to locate a replica before falling back to the
335+
* leader when the flag is set to <code>true</code>.
336+
*
337+
* <p>The {@link #recoveryBackOffDelayPolicy(BackOffDelayPolicy)} and {@link
338+
* #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)} policies control the time between
339+
* attempts.
340+
*
341+
* <p><b>Do not set this flag to <code>true</code> when streams have only 1 member (the leader),
342+
* e.g. for local development.</b>
343+
*
344+
* <p>Default is false.
345+
*
346+
* @param forceReplica whether to force the connection to a replica or not
347+
* @return this builder instance
348+
* @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy)
349+
* @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)
350+
*/
351+
EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica);
352+
325353
/**
326354
* Create the {@link Environment} instance.
327355
*

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public long applyAsLong(Object value) {
185185
FlushConsolidationHandler.class.getSimpleName();
186186
private final String NETTY_HANDLER_STREAM = StreamHandler.class.getSimpleName();
187187
private final String host;
188+
private final String clientConnectionName;
188189
private final int port;
189190
private final Map<String, String> serverProperties;
190191
private final Map<String, String> connectionProperties;
@@ -313,6 +314,7 @@ public void initChannel(SocketChannel ch) {
313314
f = b.connect(parameters.host, parameters.port).sync();
314315
this.host = parameters.host;
315316
this.port = parameters.port;
317+
this.clientConnectionName = clientConnectionName;
316318
} catch (Exception e) {
317319
String message =
318320
format(
@@ -2696,7 +2698,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
26962698
if (evt instanceof IdleStateEvent) {
26972699
IdleStateEvent e = (IdleStateEvent) evt;
26982700
if (e.state() == IdleState.READER_IDLE) {
2699-
LOGGER.info("Closing connection because it's been idle for too long");
2701+
LOGGER.info(
2702+
"Closing connection {} on {}:{} because it's been idle for too long",
2703+
clientConnectionName,
2704+
host,
2705+
port);
27002706
closing.set(true);
27012707
closingSequence(ShutdownContext.ShutdownReason.HEARTBEAT_FAILURE);
27022708
} else if (e.state() == IdleState.WRITER_IDLE) {

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ private Callable<List<Broker>> findBrokersForStream(String stream) {
290290
} else {
291291
mustUseReplica = false;
292292
}
293+
LOGGER.debug(
294+
"Looking for broker(s) for stream {}, forcing replica {}", stream, mustUseReplica);
293295
return findBrokersForStream(stream, mustUseReplica);
294296
};
295297
}

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private void addToManager(Broker node, AgentTracker tracker) {
162162
try {
163163
pickedManager.register(tracker);
164164
LOGGER.debug(
165-
"Assigned {} tracker {} (stream '{}') to manager {} (node {}), subscription ID {}",
165+
"Assigned {} tracker {} (stream '{}') to manager {} (node {}), publisher ID {}",
166166
tracker.type(),
167167
tracker.uniqueId(),
168168
tracker.stream(),
@@ -605,8 +605,9 @@ private ClientProducersManager(
605605
}
606606
if (shutdownContext.isShutdownUnexpected()) {
607607
LOGGER.debug(
608-
"Recovering {} producer(s) after unexpected connection termination",
609-
producers.size());
608+
"Recovering {} producer(s) and {} tracking consumer(s) after unexpected connection termination",
609+
producers.size(),
610+
trackingConsumerTrackers.size());
610611
producers.forEach((publishingId, tracker) -> tracker.unavailable());
611612
trackingConsumerTrackers.forEach(AgentTracker::unavailable);
612613
// execute in thread pool to free the IO thread
@@ -701,7 +702,8 @@ private void assignProducersToNewManagers(
701702
.thenAccept(
702703
broker -> {
703704
String key = keyForNode(broker);
704-
LOGGER.debug("Assigning {} producer(s) to {}", trackers.size(), key);
705+
LOGGER.debug(
706+
"Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
705707
trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
706708
})
707709
.exceptionally(
@@ -767,10 +769,10 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
767769
| ClientClosedException
768770
| StreamNotAvailableException e) {
769771
LOGGER.debug(
770-
"{} re-assignment on stream {} timed out or connection closed or stream not available, "
772+
"{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, "
771773
+ "refreshing candidate leader and retrying",
772774
tracker.type(),
773-
tracker.id(),
775+
tracker.identifiable() ? tracker.id() : "N/A",
774776
tracker.stream());
775777
// maybe not a good candidate, let's refresh and retry for this one
776778
node =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ class StreamEnvironment implements Environment {
101101
boolean lazyInit,
102102
Function<ClientConnectionType, String> connectionNamingStrategy,
103103
Function<Client.ClientParameters, Client> clientFactory,
104-
ObservationCollector<?> observationCollector) {
104+
ObservationCollector<?> observationCollector,
105+
boolean forceReplicaForConsumers) {
105106
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
106107
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
107108
this.byteBufAllocator = byteBufAllocator;
@@ -209,7 +210,7 @@ class StreamEnvironment implements Environment {
209210
maxConsumersByConnection,
210211
connectionNamingStrategy,
211212
Utils.coordinatorClientFactory(this),
212-
false);
213+
forceReplicaForConsumers);
213214
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
214215
ClientParameters clientParametersForInit = locatorParametersCopy();
215216
Runnable locatorInitSequence =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
6363
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
6464
private CompressionCodecFactory compressionCodecFactory;
6565
private boolean lazyInit = false;
66+
private boolean forceReplicaForConsumers = false;
6667
private Function<Client.ClientParameters, Client> clientFactory = Client::new;
6768
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
6869

@@ -266,6 +267,12 @@ public EnvironmentBuilder lazyInitialization(boolean lazy) {
266267
return this;
267268
}
268269

270+
@Override
271+
public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) {
272+
this.forceReplicaForConsumers = forceReplica;
273+
return this;
274+
}
275+
269276
@Override
270277
public TlsConfiguration tls() {
271278
this.tls.enable();
@@ -318,7 +325,8 @@ public Environment build() {
318325
lazyInit,
319326
connectionNamingStrategy,
320327
this.clientFactory,
321-
this.observationCollector);
328+
this.observationCollector,
329+
this.forceReplicaForConsumers);
322330
}
323331

324332
static final class DefaultTlsConfiguration implements TlsConfiguration {

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,9 @@
2222
import com.google.common.util.concurrent.RateLimiter;
2323
import com.rabbitmq.client.Channel;
2424
import com.rabbitmq.client.Connection;
25-
import com.rabbitmq.stream.Address;
26-
import com.rabbitmq.stream.AddressResolver;
27-
import com.rabbitmq.stream.ByteCapacity;
28-
import com.rabbitmq.stream.Codec;
29-
import com.rabbitmq.stream.ConfirmationHandler;
30-
import com.rabbitmq.stream.Constants;
31-
import com.rabbitmq.stream.Consumer;
32-
import com.rabbitmq.stream.ConsumerBuilder;
33-
import com.rabbitmq.stream.Environment;
34-
import com.rabbitmq.stream.EnvironmentBuilder;
25+
import com.rabbitmq.stream.*;
3526
import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration;
36-
import com.rabbitmq.stream.MessageBuilder;
37-
import com.rabbitmq.stream.OffsetSpecification;
38-
import com.rabbitmq.stream.Producer;
39-
import com.rabbitmq.stream.ProducerBuilder;
40-
import com.rabbitmq.stream.StreamCreator;
4127
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
42-
import com.rabbitmq.stream.StreamException;
4328
import com.rabbitmq.stream.codec.QpidProtonCodec;
4429
import com.rabbitmq.stream.codec.SimpleCodec;
4530
import com.rabbitmq.stream.compression.Compression;
@@ -443,6 +428,12 @@ public class StreamPerfTest implements Callable<Integer> {
443428
split = ",")
444429
private List<String> filterValues;
445430

431+
@CommandLine.Option(
432+
names = {"--force-replica-for-consumers", "-frfc"},
433+
description = "force the connection to a replica for consumers",
434+
defaultValue = "false")
435+
private boolean forceReplicaForConsumers;
436+
446437
static class InstanceSyncOptions {
447438

448439
@CommandLine.Option(
@@ -484,6 +475,33 @@ static class InstanceSyncOptions {
484475
converter = Utils.NotNegativeIntegerTypeConverter.class)
485476
private int initialCredits;
486477

478+
@CommandLine.Option(
479+
names = {"--heartbeat", "-b"},
480+
description = "requested heartbeat in seconds",
481+
defaultValue = "60",
482+
converter = Utils.GreaterThanOrEqualToZeroIntegerTypeConverter.class)
483+
private int heartbeat;
484+
485+
@CommandLine.Option(
486+
names = {"--connection-recovery-interval", "-cri"},
487+
description =
488+
"connection recovery interval in seconds. "
489+
+ "Examples: 5 for a fixed delay of 5 seconds, 5:10 for a first attempt after 5 seconds then "
490+
+ "10 seconds between attempts.",
491+
defaultValue = "5",
492+
converter = Utils.BackOffDelayPolicyTypeConverter.class)
493+
private BackOffDelayPolicy recoveryBackOffDelayPolicy;
494+
495+
@CommandLine.Option(
496+
names = {"--topology-recovery-interval", "-tri"},
497+
description =
498+
"topology recovery interval in seconds. "
499+
+ "Examples: 5 for a fixed delay of 5 seconds, 5:10 for a first attempt after 5 seconds then "
500+
+ "10 seconds between attempts.",
501+
defaultValue = "5:1",
502+
converter = Utils.BackOffDelayPolicyTypeConverter.class)
503+
private BackOffDelayPolicy topologyBackOffDelayPolicy;
504+
487505
private MetricsCollector metricsCollector;
488506
private PerformanceMetrics performanceMetrics;
489507
private List<Monitoring> monitorings;
@@ -747,7 +765,11 @@ public Integer call() throws Exception {
747765
.maxTrackingConsumersByConnection(this.trackingConsumersByConnection)
748766
.maxConsumersByConnection(this.consumersByConnection)
749767
.rpcTimeout(Duration.ofSeconds(this.rpcTimeout))
750-
.requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes());
768+
.requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes())
769+
.forceReplicaForConsumers(this.forceReplicaForConsumers)
770+
.requestedHeartbeat(Duration.ofSeconds(this.heartbeat))
771+
.recoveryBackOffDelayPolicy(this.recoveryBackOffDelayPolicy)
772+
.topologyUpdateBackOffDelayPolicy(this.topologyBackOffDelayPolicy);
751773

752774
if (addrResolver != null) {
753775
environmentBuilder = environmentBuilder.addressResolver(addrResolver);
@@ -978,7 +1000,7 @@ public Integer call() throws Exception {
9781000
final int msgSize = this.messageSize;
9791001

9801002
try {
981-
while (true && !Thread.currentThread().isInterrupted()) {
1003+
while (!Thread.currentThread().isInterrupted()) {
9821004
rateLimiterCallback.run();
9831005
// Using current time for interoperability with other tools
9841006
// and also across different processes.
@@ -993,9 +1015,8 @@ public Integer call() throws Exception {
9931015
messageBuilder.addData(payload).build(), confirmationHandler);
9941016
}
9951017
} catch (Exception e) {
996-
if (e instanceof InterruptedException
997-
|| (e.getCause() != null
998-
&& e.getCause() instanceof InterruptedException)) {
1018+
if (e.getCause() != null
1019+
&& e.getCause() instanceof InterruptedException) {
9991020
LOGGER.info("Publisher #{} thread interrupted", i, e);
10001021
} else {
10011022
LOGGER.warn("Publisher #{} crashed", i, e);

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@
1313
// info@rabbitmq.com.
1414
package com.rabbitmq.stream.perf;
1515

16+
import static java.time.Duration.ofSeconds;
17+
1618
import com.codahale.metrics.MetricRegistry;
1719
import com.rabbitmq.client.BuiltinExchangeType;
1820
import com.rabbitmq.client.Channel;
1921
import com.rabbitmq.client.Connection;
2022
import com.rabbitmq.client.ConnectionFactory;
2123
import com.rabbitmq.client.SocketConfigurator;
2224
import com.rabbitmq.client.SocketConfigurators;
25+
import com.rabbitmq.stream.BackOffDelayPolicy;
2326
import com.rabbitmq.stream.ByteCapacity;
2427
import com.rabbitmq.stream.OffsetSpecification;
2528
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
@@ -676,6 +679,44 @@ public Integer convert(String input) {
676679
}
677680
}
678681

682+
static class BackOffDelayPolicyTypeConverter
683+
implements CommandLine.ITypeConverter<BackOffDelayPolicy> {
684+
685+
@Override
686+
public BackOffDelayPolicy convert(String input) {
687+
if (input == null || input.trim().isEmpty()) {
688+
typeConversionException("Value for back-off delay policy cannot be empty");
689+
}
690+
String[] values = input.split(":");
691+
if (values.length != 1 && values.length != 2) {
692+
typeConversionException("Invalid value for back-off delay policy: " + input);
693+
}
694+
int firstAttempt = 0, nextAttempts = 0;
695+
try {
696+
firstAttempt = Integer.parseInt(values[0]);
697+
if (firstAttempt <= 0) {
698+
throw new IllegalArgumentException();
699+
}
700+
} catch (Exception e) {
701+
typeConversionException("Invalid value for back-off delay policy: " + input);
702+
}
703+
if (values.length == 2) {
704+
try {
705+
nextAttempts = Integer.parseInt(values[1]);
706+
if (nextAttempts <= 0) {
707+
throw new IllegalArgumentException();
708+
}
709+
} catch (Exception e) {
710+
typeConversionException("Invalid value for back-off delay policy: " + input);
711+
}
712+
} else {
713+
nextAttempts = firstAttempt;
714+
}
715+
return BackOffDelayPolicy.fixedWithInitialDelay(
716+
ofSeconds(firstAttempt), ofSeconds(nextAttempts));
717+
}
718+
}
719+
679720
private static void typeConversionException(String message) {
680721
throw new TypeConversionException(message);
681722
}
@@ -820,7 +861,7 @@ protected DistributionSummary createChunkSizeDistributionSummary(
820861
.tags(tags)
821862
.description("chunk size")
822863
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
823-
.distributionStatisticExpiry(Duration.ofSeconds(1))
864+
.distributionStatisticExpiry(ofSeconds(1))
824865
.serviceLevelObjectives()
825866
.register(registry);
826867
}

0 commit comments

Comments
 (0)