From de6b708e1941e0960f4e5c715fcbf3174936599f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 6 May 2025 16:55:11 +0200 Subject: [PATCH] Null-proof channel state retrieval in AbstractMetricsCollector A channel message may have been closed by the time a metrics method is called (e.g. when cancelling a consumer the metrics collector is called after the RPC is completed), so the channel state may no longer be there. This commit null-proofs methods rely on the channel state retrieval. Fixes #1592 --- .../client/impl/AbstractMetricsCollector.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 7ea19ae93..8f7c9b332 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -115,9 +115,12 @@ public void basicPublish(Channel channel, long deliveryTag) { try { if (deliveryTag != 0) { ChannelState channelState = channelState(channel); + if (channelState == null) { + return; + } channelState.lock.lock(); try { - channelState(channel).unconfirmedMessageDeliveryTags.add(deliveryTag); + channelState.unconfirmedMessageDeliveryTags.add(deliveryTag); } finally { channelState.lock.unlock(); } @@ -169,9 +172,12 @@ public void basicConsume(Channel channel, String consumerTag, boolean autoAck) { try { if(!autoAck) { ChannelState channelState = channelState(channel); + if (channelState == null) { + return; + } channelState.lock.lock(); try { - channelState(channel).consumersWithManualAck.add(consumerTag); + channelState.consumersWithManualAck.add(consumerTag); } finally { channelState.lock.unlock(); } @@ -185,9 +191,12 @@ public void basicConsume(Channel channel, String consumerTag, boolean autoAck) { public void basicCancel(Channel channel, String consumerTag) { try { ChannelState channelState = channelState(channel); + if (channelState == null) { + return; + } channelState.lock.lock(); try { - channelState(channel).consumersWithManualAck.remove(consumerTag); + channelState.consumersWithManualAck.remove(consumerTag); } finally { channelState.lock.unlock(); } @@ -202,9 +211,12 @@ public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) markConsumedMessage(); if(!autoAck) { ChannelState channelState = channelState(channel); + if (channelState == null) { + return; + } channelState.lock.lock(); try { - channelState(channel).unackedMessageDeliveryTags.add(deliveryTag); + channelState.unackedMessageDeliveryTags.add(deliveryTag); } finally { channelState.lock.unlock(); } @@ -219,6 +231,9 @@ public void consumedMessage(Channel channel, long deliveryTag, String consumerTa try { markConsumedMessage(); ChannelState channelState = channelState(channel); + if (channelState == null) { + return; + } channelState.lock.lock(); try { if(channelState.consumersWithManualAck.contains(consumerTag)) {