Skip to content

Commit e93bc99

Browse files
committed
Do not provide credits on subscription closing
This can trigger unnecessary SUBSCRIPTION_ID_DOES_NOT_EXIST error messages.
1 parent 60a56c5 commit e93bc99

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ private static class SubscriptionTracker {
217217
private volatile long offset;
218218
private volatile byte subscriptionIdInClient;
219219
private volatile ClientSubscriptionsManager manager;
220+
private volatile boolean closing = false;
220221

221222
private SubscriptionTracker(
222223
StreamConsumer consumer,
@@ -230,6 +231,7 @@ private SubscriptionTracker(
230231
}
231232

232233
synchronized void cancel() {
234+
this.closing = true;
233235
if (this.manager != null) {
234236
LOGGER.debug("Removing consumer from manager");
235237
this.manager.remove(this);
@@ -238,6 +240,10 @@ synchronized void cancel() {
238240
}
239241
}
240242

243+
boolean isClosing() {
244+
return this.closing;
245+
}
246+
241247
synchronized void assign(byte subscriptionIdInClient, ClientSubscriptionsManager manager) {
242248
this.subscriptionIdInClient = subscriptionIdInClient;
243249
this.manager = manager;
@@ -367,14 +373,24 @@ private ClientSubscriptionsManager(
367373
IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null));
368374
AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);
369375
ChunkListener chunkListener =
370-
(client, subscriptionId, offset, messageCount, dataSize) ->
376+
(client, subscriptionId, offset, messageCount, dataSize) -> {
377+
SubscriptionTracker subscriptionTracker =
378+
subscriptionTrackers.get(subscriptionId & 0xFF);
379+
if (subscriptionTracker != null && !subscriptionTracker.isClosing()) {
371380
client.credit(subscriptionId, 1);
381+
} else {
382+
LOGGER.debug(
383+
"Could not find stream subscription {} or subscription closing, not providing credits",
384+
subscriptionId & 0xFF);
385+
}
386+
};
387+
372388
CreditNotification creditNotification =
373389
(subscriptionId, responseCode) ->
374390
LOGGER.debug(
375391
"Received credit notification for subscription {}: {}",
376-
subscriptionId,
377-
responseCode);
392+
subscriptionId & 0xFF,
393+
Utils.formatConstant(responseCode));
378394
MessageListener messageListener =
379395
(subscriptionId, offset, message) -> {
380396
SubscriptionTracker subscriptionTracker =

0 commit comments

Comments
 (0)