From aaff1cc322b5e26ced53dfe40090964e39eb97e8 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 20 Apr 2025 02:07:17 -0400 Subject: [PATCH 1/3] AutorecoveringConnection: clean up bindings of deleted exchanges so that they (the bindings) do not reappear after connection recovery. Noticed while working on ruby-amqp/bunny#704. --- .../recovery/AutorecoveringConnection.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 48bb7b130..87cea2a2c 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -1095,8 +1095,12 @@ void recordExchange(String exchange, RecordedExchange x) { void deleteRecordedExchange(String exchange) { this.recordedExchanges.remove(exchange); - Set xs = this.removeBindingsWithDestination(exchange); - for (RecordedBinding b : xs) { + Set xs1 = this.removeBindingsWithDestination(exchange); + for (RecordedBinding b : xs1) { + this.maybeDeleteRecordedAutoDeleteExchange(b.getSource()); + } + Set xs2 = this.removeBindingsWithSource(exchange); + for (RecordedBinding b : xs2) { this.maybeDeleteRecordedAutoDeleteExchange(b.getSource()); } } @@ -1173,6 +1177,20 @@ Set removeBindingsWithDestination(String s) { return result; } + Set removeBindingsWithSource(String s) { + final Set result = new LinkedHashSet<>(); + synchronized (this.recordedBindings) { + for (Iterator it = this.recordedBindings.iterator(); it.hasNext(); ) { + RecordedBinding b = it.next(); + if (b.getSource().equals(s)) { + it.remove(); + result.add(b); + } + } + } + return result; + } + public Map getRecordedQueues() { return recordedQueues; } From b451798ea64b7d5606cca5f74f7a63689c3afbef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 5 May 2025 17:56:40 +0200 Subject: [PATCH 2/3] Add test for recorded binding clean-up after exchange deletion --- .../recovery/AutorecoveringConnection.java | 18 ++++++------------ .../test/functional/ConnectionRecovery.java | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 87cea2a2c..f9e910515 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -1164,25 +1164,19 @@ boolean hasMoreConsumersOnQueue(Collection consumers, String q } Set removeBindingsWithDestination(String s) { - final Set result = new LinkedHashSet<>(); - synchronized (this.recordedBindings) { - for (Iterator it = this.recordedBindings.iterator(); it.hasNext(); ) { - RecordedBinding b = it.next(); - if(b.getDestination().equals(s)) { - it.remove(); - result.add(b); - } - } - } - return result; + return this.removeBindingsWithCondition(b -> b.getSource().equals(s)); } Set removeBindingsWithSource(String s) { + return this.removeBindingsWithCondition(b -> b.getSource().equals(s)); + } + + private Set removeBindingsWithCondition(Predicate condition) { final Set result = new LinkedHashSet<>(); synchronized (this.recordedBindings) { for (Iterator it = this.recordedBindings.iterator(); it.hasNext(); ) { RecordedBinding b = it.next(); - if (b.getSource().equals(s)) { + if (condition.test(b)) { it.remove(); result.add(b); } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 4257c5a7b..5145929eb 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -865,6 +865,21 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie } } + @Test public void thatBindingFromDeletedExchangeIsDeleted() throws IOException, InterruptedException { + String q = generateQueueName(); + channel.queueDeclare(q, false, false, false, null); + try { + String x = generateExchangeName(); + channel.exchangeDeclare(x, "fanout"); + channel.queueBind(q, x, ""); + assertRecordedBinding(connection, 1); + channel.exchangeDelete(x); + assertRecordedBinding(connection, 0); + } finally { + channel.queueDelete(q); + } + } + private void assertConsumerCount(int exp, String q) throws IOException { assertThat(channel.queueDeclarePassive(q).getConsumerCount()).isEqualTo(exp); } @@ -1017,4 +1032,8 @@ private static void assertRecordedQueues(Connection conn, int size) { private static void assertRecordedExchanges(Connection conn, int size) { assertThat(((AutorecoveringConnection)conn).getRecordedExchanges()).hasSize(size); } + + private static void assertRecordedBinding(Connection conn, int size) { + assertThat(((AutorecoveringConnection)conn).getRecordedBindings()).hasSize(size); + } } From 8dff80c4f26138263cbb570a92eb1fac0a5f47b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 5 May 2025 19:51:45 +0200 Subject: [PATCH 3/3] Fix condition to delete exchange binding --- .../rabbitmq/client/impl/recovery/AutorecoveringConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index f9e910515..0e3e82d95 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -1164,7 +1164,7 @@ boolean hasMoreConsumersOnQueue(Collection consumers, String q } Set removeBindingsWithDestination(String s) { - return this.removeBindingsWithCondition(b -> b.getSource().equals(s)); + return this.removeBindingsWithCondition(b -> b.getDestination().equals(s)); } Set removeBindingsWithSource(String s) {