Skip to content

Commit e899710

Browse files
merge bug26364 into default
2 parents 199d3bf + 0e71084 commit e899710

File tree

5 files changed

+192
-10
lines changed

5 files changed

+192
-10
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.rabbitmq.client.ReturnListener;
1515
import com.rabbitmq.client.ShutdownListener;
1616
import com.rabbitmq.client.ShutdownSignalException;
17+
import com.rabbitmq.client.impl.AMQImpl;
1718

1819
import java.io.IOException;
1920
import java.util.ArrayList;
@@ -221,11 +222,11 @@ public AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException
221222
}
222223

223224
public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException {
224-
return delegate.exchangeBind(destination, source, routingKey);
225+
return exchangeBind(destination, source, routingKey, null);
225226
}
226227

227228
public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException {
228-
AMQP.Exchange.BindOk ok = delegate.exchangeBind(destination, source, routingKey, arguments);
229+
final AMQP.Exchange.BindOk ok = delegate.exchangeBind(destination, source, routingKey, arguments);
229230
recordExchangeBinding(destination, source, routingKey, arguments);
230231
return ok;
231232
}
@@ -241,6 +242,7 @@ public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source,
241242

242243
public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException {
243244
deleteRecordedExchangeBinding(destination, source, routingKey, arguments);
245+
this.maybeDeleteRecordedAutoDeleteExchange(source);
244246
return delegate.exchangeUnbind(destination, source, routingKey, arguments);
245247
}
246248

@@ -321,6 +323,7 @@ public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String rou
321323

322324
public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
323325
deleteRecordedQueueBinding(queue, exchange, routingKey, arguments);
326+
this.maybeDeleteRecordedAutoDeleteExchange(exchange);
324327
return delegate.queueUnbind(queue, exchange, routingKey, arguments);
325328
}
326329

@@ -367,7 +370,8 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo
367370
}
368371

369372
public void basicCancel(String consumerTag) throws IOException {
370-
this.deleteRecordedConsumer(consumerTag);
373+
RecordedConsumer c = this.deleteRecordedConsumer(consumerTag);
374+
this.maybeDeleteRecordedAutoDeleteQueue(c.getQueue());
371375
delegate.basicCancel(consumerTag);
372376
}
373377

@@ -578,7 +582,15 @@ private void recordConsumer(String result,
578582
this.connection.recordConsumer(result, consumer);
579583
}
580584

581-
private void deleteRecordedConsumer(String consumerTag) {
582-
this.connection.deleteRecordedConsumer(consumerTag);
585+
private RecordedConsumer deleteRecordedConsumer(String consumerTag) {
586+
return this.connection.deleteRecordedConsumer(consumerTag);
587+
}
588+
589+
private void maybeDeleteRecordedAutoDeleteQueue(String queue) {
590+
this.connection.maybeDeleteRecordedAutoDeleteQueue(queue);
591+
}
592+
593+
private void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
594+
this.connection.maybeDeleteRecordedAutoDeleteExchange(exchange);
583595
}
584596
}

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919
import java.net.ConnectException;
2020
import java.net.InetAddress;
2121
import java.util.ArrayList;
22+
import java.util.Collection;
2223
import java.util.HashMap;
24+
import java.util.HashSet;
25+
import java.util.Iterator;
2326
import java.util.List;
2427
import java.util.Map;
28+
import java.util.Set;
2529
import java.util.concurrent.ConcurrentHashMap;
2630

2731
/**
@@ -58,10 +62,10 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
5862
// Records topology changes
5963
private final Map<String, RecordedQueue> recordedQueues = new ConcurrentHashMap<String, RecordedQueue>();
6064
private final List<RecordedBinding> recordedBindings = new ArrayList<RecordedBinding>();
61-
private Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
65+
private final Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
6266
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
63-
private List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
64-
private List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
67+
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
68+
private final List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
6569

6670
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, Address[] addrs) {
6771
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
@@ -625,6 +629,10 @@ void recordQueue(String queue, RecordedQueue meta) {
625629

626630
void deleteRecordedQueue(String queue) {
627631
this.recordedQueues.remove(queue);
632+
Set<RecordedBinding> xs = this.removeBindingsWithDestination(queue);
633+
for (RecordedBinding b : xs) {
634+
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
635+
}
628636
}
629637

630638
void recordExchange(String exchange, RecordedExchange x) {
@@ -633,13 +641,85 @@ void recordExchange(String exchange, RecordedExchange x) {
633641

634642
void deleteRecordedExchange(String exchange) {
635643
this.recordedExchanges.remove(exchange);
644+
Set<RecordedBinding> xs = this.removeBindingsWithDestination(exchange);
645+
for (RecordedBinding b : xs) {
646+
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
647+
}
636648
}
637649

638650
void recordConsumer(String result, RecordedConsumer consumer) {
639651
this.consumers.put(result, consumer);
640652
}
641653

642-
void deleteRecordedConsumer(String consumerTag) {
643-
this.consumers.remove(consumerTag);
654+
RecordedConsumer deleteRecordedConsumer(String consumerTag) {
655+
return this.consumers.remove(consumerTag);
656+
}
657+
658+
void maybeDeleteRecordedAutoDeleteQueue(String queue) {
659+
synchronized (this.recordedQueues) {
660+
synchronized (this.consumers) {
661+
if(!hasMoreConsumersOnQueue(this.consumers.values(), queue)) {
662+
RecordedQueue q = this.recordedQueues.get(queue);
663+
// last consumer on this connection is gone, remove recorded queue
664+
// if it is auto-deleted. See bug 26364.
665+
if((q != null) && q.isAutoDelete()) { this.recordedQueues.remove(queue); }
666+
}
667+
}
668+
}
669+
}
670+
671+
void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
672+
synchronized (this.recordedExchanges) {
673+
synchronized (this.consumers) {
674+
if(!hasMoreDestinationsBoundToExchange(this.recordedBindings, exchange)) {
675+
RecordedExchange x = this.recordedExchanges.get(exchange);
676+
// last binding where this exchange is the source is gone, remove recorded exchange
677+
// if it is auto-deleted. See bug 26364.
678+
if((x != null) && x.isAutoDelete()) { this.recordedExchanges.remove(exchange); }
679+
}
680+
}
681+
}
682+
}
683+
684+
boolean hasMoreDestinationsBoundToExchange(List<RecordedBinding> bindings, String exchange) {
685+
boolean result = false;
686+
for (RecordedBinding b : bindings) {
687+
if(exchange.equals(b.getSource())) {
688+
result = true;
689+
break;
690+
}
691+
}
692+
return result;
693+
}
694+
695+
boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String queue) {
696+
boolean result = false;
697+
for (RecordedConsumer c : consumers) {
698+
if(queue.equals(c.getQueue())) {
699+
result = true;
700+
break;
701+
}
702+
}
703+
return result;
704+
}
705+
706+
Set<RecordedBinding> removeBindingsWithDestination(String s) {
707+
Set<RecordedBinding> result = new HashSet<RecordedBinding>();
708+
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
709+
RecordedBinding b = it.next();
710+
if(b.getDestination().equals(s)) {
711+
it.remove();
712+
result.add(b);
713+
}
714+
}
715+
return result;
716+
}
717+
718+
public Map<String, RecordedQueue> getRecordedQueues() {
719+
return recordedQueues;
720+
}
721+
722+
public Map<String, RecordedExchange> getRecordedExchanges() {
723+
return recordedExchanges;
644724
}
645725
}

src/com/rabbitmq/client/impl/recovery/RecordedExchange.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,8 @@ public RecordedExchange arguments(Map<String, Object> value) {
3939
this.arguments = value;
4040
return this;
4141
}
42+
43+
public boolean isAutoDelete() {
44+
return autoDelete;
45+
}
4246
}

src/com/rabbitmq/client/impl/recovery/RecordedQueue.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public boolean isServerNamed() {
3232
return this.serverNamed;
3333
}
3434

35+
public boolean isAutoDelete() { return this.autoDelete; }
36+
3537
public void recover() throws IOException {
3638
this.name = this.channel.queueDeclare(this.getNameToUseForRecovery(),
3739
this.durable,

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,82 @@ protected void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws
206206
ch.queueDelete(q);
207207
}
208208

209+
public void testDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() throws IOException {
210+
Channel ch = connection.createChannel();
211+
assertRecordedQueues(connection, 0);
212+
for(int i = 0; i < 5000; i++) {
213+
String q = UUID.randomUUID().toString();
214+
ch.queueDeclare(q, false, false, true, null);
215+
QueueingConsumer dummy = new QueueingConsumer(ch);
216+
String tag = ch.basicConsume(q, true, dummy);
217+
ch.basicCancel(tag);
218+
}
219+
assertRecordedQueues(connection, 0);
220+
ch.close();
221+
}
222+
223+
public void testDeclarationofManyAutoDeleteExchangesWithTransientQueuesThatAreUnbound() throws IOException {
224+
Channel ch = connection.createChannel();
225+
assertRecordedExchanges(connection, 0);
226+
for(int i = 0; i < 5000; i++) {
227+
String x = UUID.randomUUID().toString();
228+
ch.exchangeDeclare(x, "fanout", false, true, null);
229+
String q = ch.queueDeclare().getQueue();
230+
final String rk = "doesn't matter";
231+
ch.queueBind(q, x, rk);
232+
ch.queueUnbind(q, x, rk);
233+
ch.queueDelete(q);
234+
}
235+
assertRecordedExchanges(connection, 0);
236+
ch.close();
237+
}
238+
239+
public void testDeclarationofManyAutoDeleteExchangesWithTransientQueuesThatAreDeleted() throws IOException {
240+
Channel ch = connection.createChannel();
241+
assertRecordedExchanges(connection, 0);
242+
for(int i = 0; i < 5000; i++) {
243+
String x = UUID.randomUUID().toString();
244+
ch.exchangeDeclare(x, "fanout", false, true, null);
245+
String q = ch.queueDeclare().getQueue();
246+
ch.queueBind(q, x, "doesn't matter");
247+
ch.queueDelete(q);
248+
}
249+
assertRecordedExchanges(connection, 0);
250+
ch.close();
251+
}
252+
253+
public void testDeclarationofManyAutoDeleteExchangesWithTransientExchangesThatAreUnbound() throws IOException {
254+
Channel ch = connection.createChannel();
255+
assertRecordedExchanges(connection, 0);
256+
for(int i = 0; i < 5000; i++) {
257+
String src = "src-" + UUID.randomUUID().toString();
258+
String dest = "dest-" + UUID.randomUUID().toString();
259+
ch.exchangeDeclare(src, "fanout", false, true, null);
260+
ch.exchangeDeclare(dest, "fanout", false, true, null);
261+
final String rk = "doesn't matter";
262+
ch.exchangeBind(dest, src, rk);
263+
ch.exchangeUnbind(dest, src, rk);
264+
ch.exchangeDelete(dest);
265+
}
266+
assertRecordedExchanges(connection, 0);
267+
ch.close();
268+
}
269+
270+
public void testDeclarationofManyAutoDeleteExchangesWithTransientExchangesThatAreDeleted() throws IOException {
271+
Channel ch = connection.createChannel();
272+
assertRecordedExchanges(connection, 0);
273+
for(int i = 0; i < 5000; i++) {
274+
String src = "src-" + UUID.randomUUID().toString();
275+
String dest = "dest-" + UUID.randomUUID().toString();
276+
ch.exchangeDeclare(src, "fanout", false, true, null);
277+
ch.exchangeDeclare(dest, "fanout", false, true, null);
278+
ch.exchangeBind(dest, src, "doesn't matter");
279+
ch.exchangeDelete(dest);
280+
}
281+
assertRecordedExchanges(connection, 0);
282+
ch.close();
283+
}
284+
209285
public void testServerNamedQueueRecovery() throws IOException, InterruptedException {
210286
String q = channel.queueDeclare("", false, false, false, null).getQueue();
211287
String x = "amq.fanout";
@@ -552,4 +628,12 @@ private static void wait(CountDownLatch latch) throws InterruptedException {
552628
private void waitForConfirms(Channel ch) throws InterruptedException, TimeoutException {
553629
ch.waitForConfirms(30 * 60 * 1000);
554630
}
631+
632+
protected void assertRecordedQueues(Connection conn, int size) {
633+
assertEquals(size, ((AutorecoveringConnection)conn).getRecordedQueues().size());
634+
}
635+
636+
protected void assertRecordedExchanges(Connection conn, int size) {
637+
assertEquals(size, ((AutorecoveringConnection)conn).getRecordedExchanges().size());
638+
}
555639
}

0 commit comments

Comments
 (0)