Skip to content

Commit 39fe702

Browse files
committed
Release permits only if possible
1 parent a8b01e0 commit 39fe702

File tree

3 files changed

+101
-16
lines changed

3 files changed

+101
-16
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ private static String jsonField(String name, Number value) {
305305
}
306306

307307
private static String jsonField(String name, String value) {
308-
return quote(name) + " : " + quote(value);
308+
return quote(name) + " : " + (value == null ? "null" : quote(value));
309309
}
310310

311311
@Override
@@ -317,13 +317,30 @@ public String toString() {
317317
this.managers.stream()
318318
.map(
319319
m -> {
320-
StringBuilder b = new StringBuilder("{");
321-
b.append(jsonField("id", m.id))
320+
StringBuilder managerBuilder = new StringBuilder("{");
321+
managerBuilder
322+
.append(jsonField("id", m.id))
322323
.append(",")
323324
.append(jsonField("node", m.name))
324325
.append(",")
325-
.append(jsonField("consumer_count", m.trackerCount));
326-
return b.append("}").toString();
326+
.append(jsonField("consumer_count", m.trackerCount))
327+
.append(",");
328+
managerBuilder.append("\"subscriptions\" : [");
329+
List<SubscriptionTracker> trackers = m.subscriptionTrackers;
330+
managerBuilder.append(
331+
trackers.stream()
332+
.filter(Objects::nonNull)
333+
.map(
334+
t -> {
335+
StringBuilder trackerBuilder = new StringBuilder("{");
336+
trackerBuilder.append(jsonField("stream", t.stream)).append(",");
337+
trackerBuilder.append(
338+
jsonField("subscription_id", t.subscriptionIdInClient));
339+
return trackerBuilder.append("}").toString();
340+
})
341+
.collect(Collectors.joining(",")));
342+
managerBuilder.append("]");
343+
return managerBuilder.append("}").toString();
327344
})
328345
.collect(Collectors.joining(",")));
329346
builder.append("],");

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 75 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@
3636
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
3737
import java.util.Collection;
3838
import java.util.Iterator;
39+
import java.util.List;
3940
import java.util.Map;
4041
import java.util.NavigableSet;
4142
import java.util.Objects;
4243
import java.util.Set;
4344
import java.util.concurrent.ConcurrentHashMap;
4445
import java.util.concurrent.ConcurrentMap;
4546
import java.util.concurrent.ConcurrentSkipListSet;
47+
import java.util.concurrent.CopyOnWriteArrayList;
4648
import java.util.concurrent.atomic.AtomicBoolean;
4749
import java.util.concurrent.atomic.AtomicLong;
4850
import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +66,8 @@ class ProducersCoordinator {
6466
private final AtomicLong managerIdSequence = new AtomicLong(0);
6567
private final NavigableSet<ClientProducersManager> managers = new ConcurrentSkipListSet<>();
6668
private final AtomicLong trackerIdSequence = new AtomicLong(0);
69+
// TODO remove the list of trackers (it's here just for debugging)
70+
private final List<ProducerTracker> producerTrackers = new CopyOnWriteArrayList<>();
6771

6872
ProducersCoordinator(
6973
StreamEnvironment environment,
@@ -83,9 +87,10 @@ private static String keyForNode(Client.Broker broker) {
8387
}
8488

8589
Runnable registerProducer(StreamProducer producer, String reference, String stream) {
86-
return registerAgentTracker(
87-
new ProducerTracker(trackerIdSequence.getAndIncrement(), reference, stream, producer),
88-
stream);
90+
ProducerTracker tracker =
91+
new ProducerTracker(trackerIdSequence.getAndIncrement(), reference, stream, producer);
92+
this.producerTrackers.add(tracker);
93+
return registerAgentTracker(tracker, stream);
8994
}
9095

9196
Runnable registerTrackingConsumer(StreamConsumer consumer) {
@@ -100,7 +105,16 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
100105

101106
addToManager(broker, tracker);
102107

103-
return tracker::cancel;
108+
return () -> {
109+
if (tracker instanceof ProducerTracker) {
110+
try {
111+
this.producerTrackers.remove(tracker);
112+
} catch (Exception e) {
113+
LOGGER.debug("Error while removing producer tracker from list");
114+
}
115+
}
116+
tracker.cancel();
117+
};
104118
}
105119

106120
private void addToManager(Broker node, AgentTracker tracker) {
@@ -248,20 +262,68 @@ public String toString() {
248262
"tracking_consumer_count",
249263
this.managers.stream().mapToInt(m -> m.trackingConsumerTrackers.size()).sum()))
250264
.append(",");
265+
builder.append(jsonField("producer_tracker_count", this.producerTrackers.size())).append(",");
251266
builder.append(quote("clients")).append(" : [");
252267
builder.append(
253268
this.managers.stream()
254269
.map(
255270
m -> {
256-
StringBuilder b = new StringBuilder("{");
257-
b.append(jsonField("id", m.id))
271+
StringBuilder managerBuilder = new StringBuilder("{");
272+
managerBuilder
273+
.append(jsonField("id", m.id))
258274
.append(",")
259275
.append(jsonField("node", m.name))
260276
.append(",")
261277
.append(jsonField("producer_count", m.producers.size()))
262278
.append(",")
263279
.append(
264-
jsonField("tracking_consumer_count", m.trackingConsumerTrackers.size()));
280+
jsonField("tracking_consumer_count", m.trackingConsumerTrackers.size()))
281+
.append(",");
282+
managerBuilder.append("\"producers\" : [");
283+
managerBuilder.append(
284+
m.producers.values().stream()
285+
.map(
286+
p -> {
287+
StringBuilder producerBuilder = new StringBuilder("{");
288+
producerBuilder.append(jsonField("stream", p.stream())).append(",");
289+
producerBuilder.append(jsonField("producer_id", p.publisherId));
290+
return producerBuilder.append("}").toString();
291+
})
292+
.collect(Collectors.joining(",")));
293+
managerBuilder.append("],");
294+
managerBuilder.append("\"tracking_consumers\" : [");
295+
managerBuilder.append(
296+
m.trackingConsumerTrackers.stream()
297+
.map(
298+
t -> {
299+
StringBuilder trackerBuilder = new StringBuilder("{");
300+
trackerBuilder.append(jsonField("stream", t.stream()));
301+
return trackerBuilder.append("}").toString();
302+
})
303+
.collect(Collectors.joining(",")));
304+
managerBuilder.append("]");
305+
return managerBuilder.append("}").toString();
306+
})
307+
.collect(Collectors.joining(",")));
308+
builder.append("],");
309+
builder.append("\"producer_trackers\" : [");
310+
builder.append(
311+
this.producerTrackers.stream()
312+
.map(
313+
t -> {
314+
StringBuilder b = new StringBuilder("{");
315+
b.append(quote("stream")).append(":").append(quote(t.stream)).append(",");
316+
b.append(quote("node")).append(":");
317+
Client client = null;
318+
ClientProducersManager manager = t.clientProducersManager;
319+
if (manager != null) {
320+
client = manager.client;
321+
}
322+
if (client == null) {
323+
b.append("null");
324+
} else {
325+
b.append(quote(client.getHost() + ":" + client.getPort()));
326+
}
265327
return b.append("}").toString();
266328
})
267329
.collect(Collectors.joining(",")));
@@ -630,6 +692,11 @@ private void assignProducersToNewManagers(
630692
tracker -> {
631693
if (tracker.markRecoveryInProgress()) {
632694
recoverAgent(broker, tracker);
695+
} else {
696+
LOGGER.debug(
697+
"Not recovering {} (stream {}), recovery is already is progress",
698+
tracker.type(),
699+
tracker.stream());
633700
}
634701
});
635702
})
@@ -689,7 +756,7 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
689756
tracker.stream());
690757
} catch (Exception e) {
691758
LOGGER.warn(
692-
"Error while re-assigning %s (stream '{}')", tracker.type(), tracker.stream(), e);
759+
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
693760
reassignmentCompleted = true;
694761
}
695762
}

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -484,9 +484,10 @@ void running() {
484484
}
485485
}
486486
publishBatch(false);
487-
if (unconfirmedMessagesSemaphore.availablePermits() != maxUnconfirmedMessages) {
488-
unconfirmedMessagesSemaphore.release(
489-
maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits());
487+
488+
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
489+
if (toRelease > 0) {
490+
unconfirmedMessagesSemaphore.release(toRelease);
490491
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
491492
LOGGER.debug(
492493
"Could not acquire {} permit(s) for message republishing",

0 commit comments

Comments
 (0)