Skip to content

Commit f0a1fc8

Browse files
Invalidate recorded auto-delete exchanges after queue.delete and exchange.delete
When the destination end of a binding is deleted, so is the binding.
1 parent e207d97 commit f0a1fc8

File tree

3 files changed

+74
-5
lines changed

3 files changed

+74
-5
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.rabbitmq.client.ReturnListener;
1515
import com.rabbitmq.client.ShutdownListener;
1616
import com.rabbitmq.client.ShutdownSignalException;
17+
import com.rabbitmq.client.impl.AMQImpl;
1718

1819
import java.io.IOException;
1920
import java.util.ArrayList;
@@ -221,11 +222,11 @@ public AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException
221222
}
222223

223224
public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException {
224-
return delegate.exchangeBind(destination, source, routingKey);
225+
return exchangeBind(destination, source, routingKey, null);
225226
}
226227

227228
public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException {
228-
AMQP.Exchange.BindOk ok = delegate.exchangeBind(destination, source, routingKey, arguments);
229+
final AMQP.Exchange.BindOk ok = delegate.exchangeBind(destination, source, routingKey, arguments);
229230
recordExchangeBinding(destination, source, routingKey, arguments);
230231
return ok;
231232
}
@@ -241,6 +242,7 @@ public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source,
241242

242243
public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException {
243244
deleteRecordedExchangeBinding(destination, source, routingKey, arguments);
245+
this.maybeDeleteRecordedAutoDeleteExchange(source);
244246
return delegate.exchangeUnbind(destination, source, routingKey, arguments);
245247
}
246248

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
import java.util.ArrayList;
2222
import java.util.Collection;
2323
import java.util.HashMap;
24+
import java.util.HashSet;
25+
import java.util.Iterator;
2426
import java.util.List;
2527
import java.util.Map;
28+
import java.util.Set;
2629
import java.util.concurrent.ConcurrentHashMap;
2730

2831
/**
@@ -626,6 +629,10 @@ void recordQueue(String queue, RecordedQueue meta) {
626629

627630
void deleteRecordedQueue(String queue) {
628631
this.recordedQueues.remove(queue);
632+
Set<RecordedBinding> xs = this.removeBindingsWithDestination(queue);
633+
for (RecordedBinding b : xs) {
634+
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
635+
}
629636
}
630637

631638
void recordExchange(String exchange, RecordedExchange x) {
@@ -634,6 +641,10 @@ void recordExchange(String exchange, RecordedExchange x) {
634641

635642
void deleteRecordedExchange(String exchange) {
636643
this.recordedExchanges.remove(exchange);
644+
Set<RecordedBinding> xs = this.removeBindingsWithDestination(exchange);
645+
for (RecordedBinding b : xs) {
646+
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
647+
}
637648
}
638649

639650
void recordConsumer(String result, RecordedConsumer consumer) {
@@ -692,6 +703,18 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
692703
return result;
693704
}
694705

706+
Set<RecordedBinding> removeBindingsWithDestination(String s) {
707+
Set<RecordedBinding> result = new HashSet<RecordedBinding>();
708+
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
709+
RecordedBinding b = it.next();
710+
if(b.getDestination().equals(s)) {
711+
it.remove();
712+
result.add(b);
713+
}
714+
}
715+
return result;
716+
}
717+
695718
public Map<String, RecordedQueue> getRecordedQueues() {
696719
return recordedQueues;
697720
}

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ protected void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws
206206
ch.queueDelete(q);
207207
}
208208

209-
// bug 26364
210209
public void testDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() throws IOException {
211210
Channel ch = connection.createChannel();
212211
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedQueues().size());
@@ -221,8 +220,7 @@ public void testDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() throws
221220
ch.close();
222221
}
223222

224-
// bug 26364
225-
public void testDeclarationofManyAutoDeleteExchangesWithTransientQueues() throws IOException {
223+
public void testDeclarationofManyAutoDeleteExchangesWithTransientQueuesThatAreUnbound() throws IOException {
226224
Channel ch = connection.createChannel();
227225
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
228226
for(int i = 0; i < 5000; i++) {
@@ -238,6 +236,52 @@ public void testDeclarationofManyAutoDeleteExchangesWithTransientQueues() throws
238236
ch.close();
239237
}
240238

239+
public void testDeclarationofManyAutoDeleteExchangesWithTransientQueuesThatAreDeleted() throws IOException {
240+
Channel ch = connection.createChannel();
241+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
242+
for(int i = 0; i < 5000; i++) {
243+
String x = UUID.randomUUID().toString();
244+
ch.exchangeDeclare(x, "fanout", false, true, null);
245+
String q = ch.queueDeclare().getQueue();
246+
ch.queueBind(q, x, "doesn't matter");
247+
ch.queueDelete(q);
248+
}
249+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
250+
ch.close();
251+
}
252+
253+
public void testDeclarationofManyAutoDeleteExchangesWithTransientExchangesThatAreUnbound() throws IOException {
254+
Channel ch = connection.createChannel();
255+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
256+
for(int i = 0; i < 5000; i++) {
257+
String src = UUID.randomUUID().toString();
258+
String dest = UUID.randomUUID().toString();
259+
ch.exchangeDeclare(src, "fanout", false, true, null);
260+
ch.exchangeDeclare(dest, "fanout", false, true, null);
261+
final String rk = "doesn't matter";
262+
ch.exchangeBind(dest, src, rk);
263+
ch.exchangeUnbind(dest, src, rk);
264+
ch.exchangeDelete(dest);
265+
}
266+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
267+
ch.close();
268+
}
269+
270+
public void testDeclarationofManyAutoDeleteExchangesWithTransientExchangesThatAreDeleted() throws IOException {
271+
Channel ch = connection.createChannel();
272+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
273+
for(int i = 0; i < 100; i++) {
274+
String src = "src-" + UUID.randomUUID().toString();
275+
String dest = "dest-" + UUID.randomUUID().toString();
276+
ch.exchangeDeclare(src, "fanout", false, true, null);
277+
ch.exchangeDeclare(dest, "fanout", false, true, null);
278+
ch.exchangeBind(dest, src, "doesn't matter");
279+
ch.exchangeDelete(dest);
280+
}
281+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
282+
ch.close();
283+
}
284+
241285
public void testServerNamedQueueRecovery() throws IOException, InterruptedException {
242286
String q = channel.queueDeclare("", false, false, false, null).getQueue();
243287
String x = "amq.fanout";

0 commit comments

Comments
 (0)