Skip to content

Commit a8b01e0

Browse files
committed
Remove per-node data structure in producer coordinator
It would contain the client connections for a given node. It was difficult to maintain the consistency between those structures, so we're better off not using this layer. Now client connections are looked up with a linear scan, which is good enough, as the number of connections should remain under the thousand.
1 parent 8841de1 commit a8b01e0

File tree

9 files changed

+507
-325
lines changed

9 files changed

+507
-325
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,9 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
723723
channel.writeAndFlush(bb);
724724
request.block();
725725
return request.response.get();
726+
} catch (StreamException e) {
727+
outstandingRequests.remove(correlationId);
728+
throw e;
726729
} catch (RuntimeException e) {
727730
outstandingRequests.remove(correlationId);
728731
throw new StreamException(e);
@@ -744,6 +747,9 @@ public Response deletePublisher(byte publisherId) {
744747
channel.writeAndFlush(bb);
745748
request.block();
746749
return request.response.get();
750+
} catch (StreamException e) {
751+
outstandingRequests.remove(correlationId);
752+
throw e;
747753
} catch (RuntimeException e) {
748754
outstandingRequests.remove(correlationId);
749755
throw new StreamException(e);
@@ -2048,6 +2054,10 @@ public boolean equals(Object o) {
20482054
public int hashCode() {
20492055
return Objects.hash(host, port);
20502056
}
2057+
2058+
String label() {
2059+
return this.host + ":" + this.port;
2060+
}
20512061
}
20522062

20532063
public static class ClientParameters {

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 27 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static com.rabbitmq.stream.impl.Utils.isSac;
1919
import static com.rabbitmq.stream.impl.Utils.namedFunction;
2020
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
21-
import static java.lang.String.format;
2221

2322
import com.rabbitmq.stream.BackOffDelayPolicy;
2423
import com.rabbitmq.stream.Constants;
@@ -43,7 +42,6 @@
4342
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
4443
import com.rabbitmq.stream.impl.Utils.ClientFactory;
4544
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
46-
import java.time.Duration;
4745
import java.util.ArrayList;
4846
import java.util.Collection;
4947
import java.util.Collections;
@@ -63,7 +61,6 @@
6361
import java.util.concurrent.atomic.AtomicReference;
6462
import java.util.function.Function;
6563
import java.util.function.Predicate;
66-
import java.util.function.Supplier;
6764
import java.util.stream.Collectors;
6865
import java.util.stream.IntStream;
6966
import org.slf4j.Logger;
@@ -171,11 +168,14 @@ private void addToManager(
171168
pickedManager = iterator.next();
172169
if (pickedManager.isClosed()) {
173170
iterator.remove();
174-
}
175-
if (node.equals(pickedManager.node) && !pickedManager.isFull()) {
176-
break;
177-
} else {
178171
pickedManager = null;
172+
} else {
173+
if (node.equals(pickedManager.node) && !pickedManager.isFull()) {
174+
// let's try this one
175+
break;
176+
} else {
177+
pickedManager = null;
178+
}
179179
}
180180
}
181181
if (pickedManager == null) {
@@ -744,7 +744,7 @@ private void assignConsumersToStream(
744744
for (SubscriptionTracker affectedSubscription : subscriptions) {
745745
if (affectedSubscription.compareAndSet(
746746
SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) {
747-
recoverSubscription(stream, candidates, affectedSubscription);
747+
recoverSubscription(candidates, affectedSubscription);
748748
} else {
749749
LOGGER.debug(
750750
"Not recovering consumer {} from stream {}, state is {}, expected is {}",
@@ -774,51 +774,46 @@ private void assignConsumersToStream(
774774
});
775775
}
776776

777-
private void recoverSubscription(
778-
String stream, List<Broker> candidates, SubscriptionTracker affectedSubscription) {
777+
private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
779778
boolean reassignmentCompleted = false;
780779
while (!reassignmentCompleted) {
781780
try {
782-
if (affectedSubscription.consumer.isOpen()) {
781+
if (tracker.consumer.isOpen()) {
783782
Broker broker = pickBroker(candidates);
784-
LOGGER.debug("Using {} to resume consuming from {}", broker, stream);
785-
synchronized (affectedSubscription.consumer) {
786-
if (affectedSubscription.consumer.isOpen()) {
783+
LOGGER.debug("Using {} to resume consuming from {}", broker, tracker.stream);
784+
synchronized (tracker.consumer) {
785+
if (tracker.consumer.isOpen()) {
787786
OffsetSpecification offsetSpecification;
788-
if (affectedSubscription.hasReceivedSomething) {
789-
offsetSpecification = OffsetSpecification.offset(affectedSubscription.offset);
787+
if (tracker.hasReceivedSomething) {
788+
offsetSpecification = OffsetSpecification.offset(tracker.offset);
790789
} else {
791-
offsetSpecification = affectedSubscription.initialOffsetSpecification;
790+
offsetSpecification = tracker.initialOffsetSpecification;
792791
}
793-
addToManager(broker, affectedSubscription, offsetSpecification, false);
794-
reassignmentCompleted = true;
795-
} else {
796-
reassignmentCompleted = true;
792+
addToManager(broker, tracker, offsetSpecification, false);
797793
}
798794
}
799795
} else {
800796
LOGGER.debug("Not re-assigning consumer because it has been closed");
801-
reassignmentCompleted = true;
802797
}
798+
reassignmentCompleted = true;
803799
} catch (ConnectionStreamException
804800
| ClientClosedException
805801
| StreamNotAvailableException e) {
806802
LOGGER.debug(
807803
"Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, "
808804
+ "refreshing candidates and retrying",
809-
affectedSubscription.consumer.id(),
810-
affectedSubscription.stream);
805+
tracker.consumer.id(),
806+
tracker.stream);
811807
// maybe not a good candidate, let's refresh and retry for this one
812808
candidates =
813-
callAndMaybeRetry(
814-
() -> findBrokersForStream(stream),
809+
Utils.callAndMaybeRetry(
810+
() -> findBrokersForStream(tracker.stream),
815811
ex -> !(ex instanceof StreamDoesNotExistException),
816812
environment.recoveryBackOffDelayPolicy(),
817813
"Candidate lookup to consume from '%s'",
818-
stream);
819-
814+
tracker.stream);
820815
} catch (Exception e) {
821-
LOGGER.warn("Error while re-assigning subscription from stream {}", stream, e);
816+
LOGGER.warn("Error while re-assigning subscription from stream {}", tracker.stream, e);
822817
reassignmentCompleted = true;
823818
}
824819
}
@@ -873,7 +868,7 @@ synchronized void add(
873868
if (offsetTrackingReference != null) {
874869
checkNotClosed();
875870
QueryOffsetResponse queryOffsetResponse =
876-
callAndMaybeRetry(
871+
Utils.callAndMaybeRetry(
877872
() -> client.queryOffset(offsetTrackingReference, subscriptionTracker.stream),
878873
RETRY_ON_TIMEOUT,
879874
"Offset query for consumer %s on stream '%s' (reference %s)",
@@ -917,7 +912,7 @@ synchronized void add(
917912
// FIXME consider using fewer initial credits
918913
byte subId = subscriptionId;
919914
Client.Response subscribeResponse =
920-
callAndMaybeRetry(
915+
Utils.callAndMaybeRetry(
921916
() ->
922917
client.subscribe(
923918
subId,
@@ -968,7 +963,7 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) {
968963
byte subscriptionIdInClient = subscriptionTracker.subscriptionIdInClient;
969964
try {
970965
Client.Response unsubscribeResponse =
971-
callAndMaybeRetry(
966+
Utils.callAndMaybeRetry(
972967
() -> client.unsubscribe(subscriptionIdInClient),
973968
RETRY_ON_TIMEOUT,
974969
"Unsubscribe request for consumer %d on stream '%s'",
@@ -1129,54 +1124,6 @@ private static void maybeExchangeCommandVersions(Client client) {
11291124
}
11301125
}
11311126

1132-
static <T> T callAndMaybeRetry(
1133-
Supplier<T> operation, Predicate<Exception> retryCondition, String format, Object... args) {
1134-
return callAndMaybeRetry(operation, retryCondition, i -> Duration.ZERO, format, args);
1135-
}
1136-
1137-
static <T> T callAndMaybeRetry(
1138-
Supplier<T> operation,
1139-
Predicate<Exception> retryCondition,
1140-
BackOffDelayPolicy delayPolicy,
1141-
String format,
1142-
Object... args) {
1143-
String description = format(format, args);
1144-
int attempt = 0;
1145-
Exception lastException = null;
1146-
while (attempt++ < 3) {
1147-
try {
1148-
return operation.get();
1149-
} catch (Exception e) {
1150-
lastException = e;
1151-
if (retryCondition.test(e)) {
1152-
LOGGER.debug("Operation '{}' failed, retrying...", description);
1153-
Duration delay = delayPolicy.delay(attempt - 1);
1154-
if (!delay.isZero()) {
1155-
try {
1156-
Thread.sleep(delay.toMillis());
1157-
} catch (InterruptedException ex) {
1158-
Thread.interrupted();
1159-
lastException = ex;
1160-
break;
1161-
}
1162-
}
1163-
} else {
1164-
break;
1165-
}
1166-
}
1167-
}
1168-
String message =
1169-
format("Could not complete task '%s' after %d attempt(s)", description, --attempt);
1170-
LOGGER.debug(message);
1171-
if (lastException == null) {
1172-
throw new StreamException(message);
1173-
} else if (lastException instanceof RuntimeException) {
1174-
throw (RuntimeException) lastException;
1175-
} else {
1176-
throw new StreamException(message, lastException);
1177-
}
1178-
}
1179-
11801127
private static final Predicate<Exception> RETRY_ON_TIMEOUT =
11811128
e -> e instanceof TimeoutStreamException;
11821129

0 commit comments

Comments
 (0)