2020import static com .rabbitmq .stream .impl .Utils .namedFunction ;
2121import static com .rabbitmq .stream .impl .Utils .namedRunnable ;
2222import static com .rabbitmq .stream .impl .Utils .quote ;
23+ import static java .lang .String .format ;
2324
2425import com .rabbitmq .stream .*;
2526import com .rabbitmq .stream .Consumer ;
4142import java .util .Objects ;
4243import java .util .Random ;
4344import java .util .Set ;
45+ import java .util .concurrent .Callable ;
4446import java .util .concurrent .ConcurrentHashMap ;
4547import java .util .concurrent .ConcurrentSkipListSet ;
4648import java .util .concurrent .CopyOnWriteArrayList ;
4749import java .util .concurrent .atomic .AtomicBoolean ;
50+ import java .util .concurrent .atomic .AtomicInteger ;
4851import java .util .concurrent .atomic .AtomicLong ;
4952import java .util .concurrent .atomic .AtomicReference ;
5053import java .util .function .*;
5659class ConsumersCoordinator {
5760
5861 static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256 ;
62+ static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5 ;
5963
6064 static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification .next ();
6165
@@ -74,16 +78,19 @@ class ConsumersCoordinator {
7478 private final ExecutorServiceFactory executorServiceFactory =
7579 new DefaultExecutorServiceFactory (
7680 Runtime .getRuntime ().availableProcessors (), 10 , "rabbitmq-stream-consumer-connection-" );
81+ private final boolean forceReplica ;
7782
7883 ConsumersCoordinator (
7984 StreamEnvironment environment ,
8085 int maxConsumersByConnection ,
8186 Function <ClientConnectionType , String > connectionNamingStrategy ,
82- ClientFactory clientFactory ) {
87+ ClientFactory clientFactory ,
88+ boolean forceReplica ) {
8389 this .environment = environment ;
8490 this .clientFactory = clientFactory ;
8591 this .maxConsumersByConnection = maxConsumersByConnection ;
8692 this .connectionNamingStrategy = connectionNamingStrategy ;
93+ this .forceReplica = forceReplica ;
8794 }
8895
8996 private static String keyForClientSubscription (Client .Broker broker ) {
@@ -108,7 +115,7 @@ Runnable subscribe(
108115 MessageHandler messageHandler ,
109116 Map <String , String > subscriptionProperties ,
110117 ConsumerFlowStrategy flowStrategy ) {
111- List <Client .Broker > candidates = findBrokersForStream (stream );
118+ List <Client .Broker > candidates = findBrokersForStream (stream , forceReplica );
112119 Client .Broker newNode = pickBroker (candidates );
113120 if (newNode == null ) {
114121 throw new IllegalStateException ("No available node to subscribe to" );
@@ -201,11 +208,8 @@ private void addToManager(
201208 // manager connection is dead or stream not available
202209 // scheduling manager closing if necessary in another thread to avoid blocking this one
203210 if (pickedManager .isEmpty ()) {
204- ClientSubscriptionsManager manager = pickedManager ;
205211 ConsumersCoordinator .this .environment .execute (
206- () -> {
207- manager .closeIfEmpty ();
208- },
212+ pickedManager ::closeIfEmpty ,
209213 "Consumer manager closing after timeout, consumer %d on stream '%s'" ,
210214 tracker .consumer .id (),
211215 tracker .stream );
@@ -225,12 +229,14 @@ int managerCount() {
225229 }
226230
227231 // package protected for testing
228- List <Client .Broker > findBrokersForStream (String stream ) {
232+ List <Client .Broker > findBrokersForStream (String stream , boolean forceReplica ) {
233+ LOGGER .debug (
234+ "Candidate lookup to consumer from '{}', forcing replica? {}" , stream , forceReplica );
229235 Map <String , Client .StreamMetadata > metadata =
230236 this .environment .locatorOperation (
231237 namedFunction (
232238 c -> c .metadata (stream ), "Candidate lookup to consume from '%s'" , stream ));
233- if (metadata .size () == 0 || metadata .get (stream ) == null ) {
239+ if (metadata .isEmpty () || metadata .get (stream ) == null ) {
234240 // this is not supposed to happen
235241 throw new StreamDoesNotExistException (stream );
236242 }
@@ -253,8 +259,17 @@ List<Client.Broker> findBrokersForStream(String stream) {
253259
254260 List <Client .Broker > brokers ;
255261 if (replicas == null || replicas .isEmpty ()) {
256- brokers = Collections .singletonList (streamMetadata .getLeader ());
257- LOGGER .debug ("Only leader node {} for consuming from {}" , streamMetadata .getLeader (), stream );
262+ if (forceReplica ) {
263+ throw new IllegalStateException (
264+ format (
265+ "Only the leader node is available for consuming from %s and "
266+ + "consuming from leader has been deactivated for this consumer" ,
267+ stream ));
268+ } else {
269+ brokers = Collections .singletonList (streamMetadata .getLeader ());
270+ LOGGER .debug (
271+ "Only leader node {} for consuming from {}" , streamMetadata .getLeader (), stream );
272+ }
258273 } else {
259274 LOGGER .debug ("Replicas for consuming from {}: {}" , stream , replicas );
260275 brokers = new ArrayList <>(replicas );
@@ -265,6 +280,20 @@ List<Client.Broker> findBrokersForStream(String stream) {
265280 return brokers ;
266281 }
267282
283+ private Callable <List <Broker >> findBrokersForStream (String stream ) {
284+ AtomicInteger attemptNumber = new AtomicInteger ();
285+ return () -> {
286+ boolean mustUseReplica ;
287+ if (forceReplica ) {
288+ mustUseReplica =
289+ attemptNumber .incrementAndGet () <= MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER ;
290+ } else {
291+ mustUseReplica = false ;
292+ }
293+ return findBrokersForStream (stream , mustUseReplica );
294+ };
295+ }
296+
268297 private Client .Broker pickBroker (List <Client .Broker > brokers ) {
269298 if (brokers .isEmpty ()) {
270299 return null ;
@@ -792,7 +821,7 @@ private void assignConsumersToStream(
792821 }
793822 };
794823
795- AsyncRetry .asyncRetry (() -> findBrokersForStream (stream ))
824+ AsyncRetry .asyncRetry (findBrokersForStream (stream ))
796825 .description ("Candidate lookup to consume from '%s'" , stream )
797826 .scheduler (environment .scheduledExecutorService ())
798827 .retry (ex -> !(ex instanceof StreamDoesNotExistException ))
@@ -885,9 +914,9 @@ private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tr
885914 // maybe not a good candidate, let's refresh and retry for this one
886915 candidates =
887916 Utils .callAndMaybeRetry (
888- () -> findBrokersForStream (tracker .stream ),
917+ findBrokersForStream (tracker .stream ),
889918 ex -> !(ex instanceof StreamDoesNotExistException ),
890- environment . recoveryBackOffDelayPolicy (),
919+ recoveryBackOffDelayPolicy (),
891920 "Candidate lookup to consume from '%s' (subscription recovery)" ,
892921 tracker .stream );
893922 } catch (Exception e ) {
0 commit comments