Skip to content

Commit e207d97

Browse files
Invalidate local exchange cache entries for auto-delete exchanges on queueUnbind
1 parent aac93d9 commit e207d97

File tree

4 files changed

+59
-5
lines changed

4 files changed

+59
-5
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String rou
321321

322322
public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
323323
deleteRecordedQueueBinding(queue, exchange, routingKey, arguments);
324+
this.maybeDeleteRecordedAutoDeleteExchange(exchange);
324325
return delegate.queueUnbind(queue, exchange, routingKey, arguments);
325326
}
326327

@@ -586,4 +587,8 @@ private RecordedConsumer deleteRecordedConsumer(String consumerTag) {
586587
private void maybeDeleteRecordedAutoDeleteQueue(String queue) {
587588
this.connection.maybeDeleteRecordedAutoDeleteQueue(queue);
588589
}
590+
591+
private void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
592+
this.connection.maybeDeleteRecordedAutoDeleteExchange(exchange);
593+
}
589594
}

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
5959
// Records topology changes
6060
private final Map<String, RecordedQueue> recordedQueues = new ConcurrentHashMap<String, RecordedQueue>();
6161
private final List<RecordedBinding> recordedBindings = new ArrayList<RecordedBinding>();
62-
private Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
62+
private final Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
6363
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
64-
private List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
65-
private List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
64+
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
65+
private final List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
6666

6767
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, Address[] addrs) {
6868
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
@@ -651,12 +651,36 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
651651
RecordedQueue q = this.recordedQueues.get(queue);
652652
// last consumer on this connection is gone, remove recorded queue
653653
// if it is auto-deleted. See bug 26364.
654-
if(q.isAutoDelete()) { this.recordedQueues.remove(queue); }
654+
if((q != null) && q.isAutoDelete()) { this.recordedQueues.remove(queue); }
655655
}
656656
}
657657
}
658658
}
659659

660+
void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
661+
synchronized (this.recordedExchanges) {
662+
synchronized (this.consumers) {
663+
if(!hasMoreDestinationsBoundToExchange(this.recordedBindings, exchange)) {
664+
RecordedExchange x = this.recordedExchanges.get(exchange);
665+
// last binding where this exchange is the source is gone, remove recorded exchange
666+
// if it is auto-deleted. See bug 26364.
667+
if((x != null) && x.isAutoDelete()) { this.recordedExchanges.remove(exchange); }
668+
}
669+
}
670+
}
671+
}
672+
673+
boolean hasMoreDestinationsBoundToExchange(List<RecordedBinding> bindings, String exchange) {
674+
boolean result = false;
675+
for (RecordedBinding b : bindings) {
676+
if(exchange.equals(b.getSource())) {
677+
result = true;
678+
break;
679+
}
680+
}
681+
return result;
682+
}
683+
660684
boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String queue) {
661685
boolean result = false;
662686
for (RecordedConsumer c : consumers) {
@@ -671,4 +695,8 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
671695
public Map<String, RecordedQueue> getRecordedQueues() {
672696
return recordedQueues;
673697
}
698+
699+
public Map<String, RecordedExchange> getRecordedExchanges() {
700+
return recordedExchanges;
701+
}
674702
}

src/com/rabbitmq/client/impl/recovery/RecordedExchange.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,8 @@ public RecordedExchange arguments(Map<String, Object> value) {
3939
this.arguments = value;
4040
return this;
4141
}
42+
43+
public boolean isAutoDelete() {
44+
return autoDelete;
45+
}
4246
}

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ protected void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws
210210
public void testDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() throws IOException {
211211
Channel ch = connection.createChannel();
212212
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedQueues().size());
213-
for(int i = 0; i < 20000; i++) {
213+
for(int i = 0; i < 5000; i++) {
214214
String q = UUID.randomUUID().toString();
215215
ch.queueDeclareNoWait(q, false, false, true, null);
216216
QueueingConsumer dummy = new QueueingConsumer(ch);
@@ -221,6 +221,23 @@ public void testDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() throws
221221
ch.close();
222222
}
223223

224+
// bug 26364
225+
public void testDeclarationofManyAutoDeleteExchangesWithTransientQueues() throws IOException {
226+
Channel ch = connection.createChannel();
227+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
228+
for(int i = 0; i < 5000; i++) {
229+
String x = UUID.randomUUID().toString();
230+
ch.exchangeDeclare(x, "fanout", false, true, null);
231+
String q = ch.queueDeclare().getQueue();
232+
final String rk = "doesn't matter";
233+
ch.queueBind(q, x, rk);
234+
ch.queueUnbind(q, x, rk);
235+
ch.queueDelete(q);
236+
}
237+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedExchanges().size());
238+
ch.close();
239+
}
240+
224241
public void testServerNamedQueueRecovery() throws IOException, InterruptedException {
225242
String q = channel.queueDeclare("", false, false, false, null).getQueue();
226243
String x = "amq.fanout";

0 commit comments

Comments
 (0)