Skip to content

Commit aac93d9

Browse files
Clean up auto-deleted queues from recovery cache when the last (known) consumer is cancelled
This addresses a memory leak in the most common case: declare an auto-deleted queue, add a consumer, do some work, cancel. Of course, our cache is oblivious to other consumers that may be used, including those on other connections, but we only invalidate our own connection-scoped cache, not delete the queue.
1 parent 199d3bf commit aac93d9

File tree

4 files changed

+56
-5
lines changed

4 files changed

+56
-5
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,8 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo
367367
}
368368

369369
public void basicCancel(String consumerTag) throws IOException {
370-
this.deleteRecordedConsumer(consumerTag);
370+
RecordedConsumer c = this.deleteRecordedConsumer(consumerTag);
371+
this.maybeDeleteRecordedAutoDeleteQueue(c.getQueue());
371372
delegate.basicCancel(consumerTag);
372373
}
373374

@@ -578,7 +579,11 @@ private void recordConsumer(String result,
578579
this.connection.recordConsumer(result, consumer);
579580
}
580581

581-
private void deleteRecordedConsumer(String consumerTag) {
582-
this.connection.deleteRecordedConsumer(consumerTag);
582+
private RecordedConsumer deleteRecordedConsumer(String consumerTag) {
583+
return this.connection.deleteRecordedConsumer(consumerTag);
584+
}
585+
586+
private void maybeDeleteRecordedAutoDeleteQueue(String queue) {
587+
this.connection.maybeDeleteRecordedAutoDeleteQueue(queue);
583588
}
584589
}

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.net.ConnectException;
2020
import java.net.InetAddress;
2121
import java.util.ArrayList;
22+
import java.util.Collection;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
@@ -639,7 +640,35 @@ void recordConsumer(String result, RecordedConsumer consumer) {
639640
this.consumers.put(result, consumer);
640641
}
641642

642-
void deleteRecordedConsumer(String consumerTag) {
643-
this.consumers.remove(consumerTag);
643+
RecordedConsumer deleteRecordedConsumer(String consumerTag) {
644+
return this.consumers.remove(consumerTag);
645+
}
646+
647+
void maybeDeleteRecordedAutoDeleteQueue(String queue) {
648+
synchronized (this.recordedQueues) {
649+
synchronized (this.consumers) {
650+
if(!hasMoreConsumersOnQueue(this.consumers.values(), queue)) {
651+
RecordedQueue q = this.recordedQueues.get(queue);
652+
// last consumer on this connection is gone, remove recorded queue
653+
// if it is auto-deleted. See bug 26364.
654+
if(q.isAutoDelete()) { this.recordedQueues.remove(queue); }
655+
}
656+
}
657+
}
658+
}
659+
660+
boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String queue) {
661+
boolean result = false;
662+
for (RecordedConsumer c : consumers) {
663+
if(queue.equals(c.getQueue())) {
664+
result = true;
665+
break;
666+
}
667+
}
668+
return result;
669+
}
670+
671+
public Map<String, RecordedQueue> getRecordedQueues() {
672+
return recordedQueues;
644673
}
645674
}

src/com/rabbitmq/client/impl/recovery/RecordedQueue.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public boolean isServerNamed() {
3232
return this.serverNamed;
3333
}
3434

35+
public boolean isAutoDelete() { return this.autoDelete; }
36+
3537
public void recover() throws IOException {
3638
this.name = this.channel.queueDeclare(this.getNameToUseForRecovery(),
3739
this.durable,

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,21 @@ protected void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws
206206
ch.queueDelete(q);
207207
}
208208

209+
// bug 26364
210+
public void testDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() throws IOException {
211+
Channel ch = connection.createChannel();
212+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedQueues().size());
213+
for(int i = 0; i < 20000; i++) {
214+
String q = UUID.randomUUID().toString();
215+
ch.queueDeclareNoWait(q, false, false, true, null);
216+
QueueingConsumer dummy = new QueueingConsumer(ch);
217+
String tag = ch.basicConsume(q, true, dummy);
218+
ch.basicCancel(tag);
219+
}
220+
assertEquals(0, ((AutorecoveringConnection)connection).getRecordedQueues().size());
221+
ch.close();
222+
}
223+
209224
public void testServerNamedQueueRecovery() throws IOException, InterruptedException {
210225
String q = channel.queueDeclare("", false, false, false, null).getQueue();
211226
String x = "amq.fanout";

0 commit comments

Comments
 (0)