Skip to content

Commit e62b82e

Browse files
committed
Base retry completion on backoff delay policy
1 parent 1b71be8 commit e62b82e

File tree

4 files changed

+87
-64
lines changed

4 files changed

+87
-64
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
1717
import static com.rabbitmq.stream.impl.Utils.formatConstant;
1818
import static com.rabbitmq.stream.impl.Utils.isSac;
19+
import static com.rabbitmq.stream.impl.Utils.jsonField;
1920
import static com.rabbitmq.stream.impl.Utils.namedFunction;
2021
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
22+
import static com.rabbitmq.stream.impl.Utils.quote;
2123

2224
import com.rabbitmq.stream.BackOffDelayPolicy;
2325
import com.rabbitmq.stream.Constants;
@@ -224,7 +226,6 @@ private void addToManager(
224226
}
225227
}
226228

227-
// for testing
228229
int managerCount() {
229230
return this.managers.size();
230231
}
@@ -297,22 +298,6 @@ public void close() {
297298
}
298299
}
299300

300-
private static String quote(String value) {
301-
if (value == null) {
302-
return "null";
303-
} else {
304-
return "\"" + value + "\"";
305-
}
306-
}
307-
308-
private static String jsonField(String name, Number value) {
309-
return quote(name) + " : " + value.longValue();
310-
}
311-
312-
private static String jsonField(String name, String value) {
313-
return quote(name) + " : " + (value == null ? "null" : quote(value));
314-
}
315-
316301
@Override
317302
public String toString() {
318303
StringBuilder builder = new StringBuilder("{");
@@ -768,17 +753,7 @@ private void assignConsumersToStream(
768753
consumersClosingCallback.run();
769754
} else {
770755
for (SubscriptionTracker affectedSubscription : subscriptions) {
771-
if (affectedSubscription.compareAndSet(
772-
SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) {
773-
recoverSubscription(candidates, affectedSubscription);
774-
} else {
775-
LOGGER.debug(
776-
"Not recovering consumer {} from stream {}, state is {}, expected is {}",
777-
affectedSubscription.consumer.id(),
778-
affectedSubscription.stream,
779-
affectedSubscription.state(),
780-
SubscriptionState.ACTIVE);
781-
}
756+
maybeRecoverSubscription(candidates, affectedSubscription);
782757
}
783758
if (maybeCloseClient) {
784759
this.closeIfEmpty();
@@ -800,6 +775,27 @@ private void assignConsumersToStream(
800775
});
801776
}
802777

778+
private void maybeRecoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
779+
if (tracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) {
780+
try {
781+
recoverSubscription(candidates, tracker);
782+
} catch (Exception e) {
783+
LOGGER.warn(
784+
"Error while recovering consumer {} from stream '{}'. Reason: {}",
785+
tracker.consumer.id(),
786+
tracker.stream,
787+
Utils.exceptionMessage(e));
788+
}
789+
} else {
790+
LOGGER.debug(
791+
"Not recovering consumer {} from stream {}, state is {}, expected is {}",
792+
tracker.consumer.id(),
793+
tracker.stream,
794+
tracker.state(),
795+
SubscriptionState.ACTIVE);
796+
}
797+
}
798+
803799
private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
804800
boolean reassignmentCompleted = false;
805801
while (!reassignmentCompleted) {

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515

1616
import static com.rabbitmq.stream.impl.Utils.callAndMaybeRetry;
1717
import static com.rabbitmq.stream.impl.Utils.formatConstant;
18+
import static com.rabbitmq.stream.impl.Utils.jsonField;
1819
import static com.rabbitmq.stream.impl.Utils.namedFunction;
1920
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
21+
import static com.rabbitmq.stream.impl.Utils.quote;
2022
import static java.util.stream.Collectors.toSet;
2123

2224
import com.rabbitmq.stream.BackOffDelayPolicy;
@@ -238,22 +240,6 @@ int nodesConnected() {
238240
return this.managers.stream().map(m -> m.name).collect(toSet()).size();
239241
}
240242

241-
private static String quote(String value) {
242-
if (value == null) {
243-
return "null";
244-
} else {
245-
return "\"" + value + "\"";
246-
}
247-
}
248-
249-
private static String jsonField(String name, Number value) {
250-
return quote(name) + " : " + value.longValue();
251-
}
252-
253-
private static String jsonField(String name, String value) {
254-
return quote(name) + " : " + quote(value);
255-
}
256-
257243
@Override
258244
public String toString() {
259245
StringBuilder builder = new StringBuilder("{");
@@ -700,17 +686,7 @@ private void assignProducersToNewManagers(
700686
broker -> {
701687
String key = keyForNode(broker);
702688
LOGGER.debug("Assigning {} producer(s) to {}", trackers.size(), key);
703-
trackers.forEach(
704-
tracker -> {
705-
if (tracker.markRecoveryInProgress()) {
706-
recoverAgent(broker, tracker);
707-
} else {
708-
LOGGER.debug(
709-
"Not recovering {} (stream '{}'), recovery is already is progress",
710-
tracker.type(),
711-
tracker.stream());
712-
}
713-
});
689+
trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
714690
})
715691
.exceptionally(
716692
ex -> {
@@ -735,6 +711,26 @@ private void assignProducersToNewManagers(
735711
});
736712
}
737713

714+
private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
715+
if (tracker.markRecoveryInProgress()) {
716+
try {
717+
recoverAgent(broker, tracker);
718+
} catch (Exception e) {
719+
LOGGER.warn(
720+
"Error while recovering {} tracker {} (stream '{}'). Reason: {}",
721+
tracker.type(),
722+
tracker.uniqueId(),
723+
tracker.stream(),
724+
Utils.exceptionMessage(e));
725+
}
726+
} else {
727+
LOGGER.debug(
728+
"Not recovering {} (stream '{}'), recovery is already is progress",
729+
tracker.type(),
730+
tracker.stream());
731+
}
732+
}
733+
738734
private void recoverAgent(Broker node, AgentTracker tracker) {
739735
boolean reassignmentCompleted = false;
740736
while (!reassignmentCompleted) {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,10 @@ public String toString() {
812812
})
813813
.collect(Collectors.joining(","))
814814
+ "], "
815+
+ Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount())
816+
+ ","
817+
+ Utils.jsonField("consumer_client_count", this.consumersCoordinator.managerCount())
818+
+ ","
815819
+ "\"producers\" : "
816820
+ this.producersCoordinator
817821
+ ", \"consumers\" : "

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,12 @@ static <T, R> Function<T, R> namedFunction(Function<T, R> task, String format, O
165165

166166
static <T> T callAndMaybeRetry(
167167
Supplier<T> operation, Predicate<Exception> retryCondition, String format, Object... args) {
168-
return callAndMaybeRetry(operation, retryCondition, i -> Duration.ZERO, format, args);
168+
return callAndMaybeRetry(
169+
operation,
170+
retryCondition,
171+
i -> i >= 3 ? BackOffDelayPolicy.TIMEOUT : Duration.ZERO,
172+
format,
173+
args);
169174
}
170175

171176
static <T> T callAndMaybeRetry(
@@ -177,32 +182,38 @@ static <T> T callAndMaybeRetry(
177182
String description = format(format, args);
178183
int attempt = 0;
179184
Exception lastException = null;
180-
while (attempt++ < 3) {
185+
boolean keepTrying = true;
186+
while (keepTrying) {
181187
try {
182-
return operation.get();
188+
attempt++;
189+
T result = operation.get();
190+
LOGGER.debug("Operation '{}' completed after {} attempt(s)", description, attempt);
191+
return result;
183192
} catch (Exception e) {
184193
lastException = e;
185194
if (retryCondition.test(e)) {
186195
LOGGER.debug("Operation '{}' failed, retrying...", description);
187-
Duration delay = delayPolicy.delay(attempt - 1);
188-
if (!delay.isZero()) {
196+
Duration delay = delayPolicy.delay(attempt);
197+
if (BackOffDelayPolicy.TIMEOUT.equals(delay)) {
198+
keepTrying = false;
199+
} else if (!delay.isZero()) {
189200
try {
190201
Thread.sleep(delay.toMillis());
191202
} catch (InterruptedException ex) {
192203
Thread.interrupted();
193204
lastException = ex;
194-
break;
205+
keepTrying = false;
195206
}
196207
}
197208
} else {
198-
break;
209+
keepTrying = false;
199210
}
200211
}
201212
}
202213
String message =
203214
format(
204-
"Could not complete task '%s' after %d attempt(s) (reason: {})",
205-
description, --attempt, exceptionMessage(lastException));
215+
"Could not complete task '%s' after %d attempt(s) (reason: %s)",
216+
description, attempt, exceptionMessage(lastException));
206217
LOGGER.debug(message);
207218
if (lastException == null) {
208219
throw new StreamException(message);
@@ -455,4 +466,20 @@ public String toString() {
455466
return this.name;
456467
}
457468
}
469+
470+
static String quote(String value) {
471+
if (value == null) {
472+
return "null";
473+
} else {
474+
return "\"" + value + "\"";
475+
}
476+
}
477+
478+
static String jsonField(String name, Number value) {
479+
return quote(name) + " : " + value.longValue();
480+
}
481+
482+
static String jsonField(String name, String value) {
483+
return quote(name) + " : " + quote(value);
484+
}
458485
}

0 commit comments

Comments
 (0)