Skip to content

Commit 00f765c

Browse files
merge bug26359 into default
2 parents c9d1d84 + 079422c commit 00f765c

File tree

4 files changed

+97
-0
lines changed

4 files changed

+97
-0
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
6060
private final List<RecordedBinding> recordedBindings = new ArrayList<RecordedBinding>();
6161
private Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
6262
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
63+
private List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
64+
private List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
6365

6466
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, Address[] addrs) {
6567
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
@@ -355,6 +357,44 @@ public void shutdownCompleted(ShutdownSignalException cause) {
355357
}
356358
}
357359

360+
/**
361+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
362+
* top of the Java client and need to be notified when server-named queue name changes
363+
* after recovery.
364+
*
365+
* @param listener listener that observes queue name changes after recovery
366+
*/
367+
public void addQueueRecoveryListener(QueueRecoveryListener listener) {
368+
this.queueRecoveryListeners.add(listener);
369+
}
370+
371+
/**
372+
* @see com.rabbitmq.client.impl.recovery.AutorecoveringConnection#addQueueRecoveryListener
373+
* @param listener listener to be removed
374+
*/
375+
public void removeQueueRecoveryListener(QueueRecoveryListener listener) {
376+
this.queueRecoveryListeners.remove(listener);
377+
}
378+
379+
/**
380+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
381+
* top of the Java client and need to be notified when consumer tag changes
382+
* after recovery.
383+
*
384+
* @param listener listener that observes consumer tag changes after recovery
385+
*/
386+
public void addConsumerRecoveryListener(ConsumerRecoveryListener listener) {
387+
this.consumerRecoveryListeners.add(listener);
388+
}
389+
390+
/**
391+
* @see com.rabbitmq.client.impl.recovery.AutorecoveringConnection#addConsumerRecoveryListener(ConsumerRecoveryListener)
392+
* @param listener listener to be removed
393+
*/
394+
public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
395+
this.consumerRecoveryListeners.remove(listener);
396+
}
397+
358398
synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException {
359399
Thread.sleep(this.params.getNetworkRecoveryInterval());
360400
this.recoverConnection();
@@ -455,6 +495,9 @@ private void recoverQueues() {
455495
this.propagateQueueNameChangeToBindings(oldName, newName);
456496
this.propagateQueueNameChangeToConsumers(oldName, newName);
457497
}
498+
for(QueueRecoveryListener qrl : this.queueRecoveryListeners) {
499+
qrl.queueRecovered(oldName, newName);
500+
}
458501
} catch (Exception cause) {
459502
final String message = "Caught an exception while recovering queue " + oldName +
460503
": " + cause.getMessage();
@@ -490,6 +533,9 @@ private void recoverConsumers() {
490533
this.consumers.remove(tag);
491534
this.consumers.put(newTag, consumer);
492535
}
536+
for(ConsumerRecoveryListener crl : this.consumerRecoveryListeners) {
537+
crl.consumerRecovered(tag, newTag);
538+
}
493539
} catch (Exception cause) {
494540
final String message = "Caught an exception while recovering consumer " + tag +
495541
": " + cause.getMessage();
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.rabbitmq.client.impl.recovery;
2+
3+
/**
4+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
5+
* top of the Java client and need to be notified when consumer tag changes
6+
* after recovery.
7+
*/
8+
public interface ConsumerRecoveryListener {
9+
void consumerRecovered(String oldConsumerTag, String newConsumerTag);
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.rabbitmq.client.impl.recovery;
2+
3+
/**
4+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
5+
* top of the Java client and need to be notified when server-name queue name changes
6+
* after recovery.
7+
*/
8+
public interface QueueRecoveryListener {
9+
void queueRecovered(String oldName, String newName);
10+
}

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
66
import com.rabbitmq.client.Recoverable;
77
import com.rabbitmq.client.RecoveryListener;
8+
import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener;
9+
import com.rabbitmq.client.impl.recovery.QueueRecoveryListener;
810
import com.rabbitmq.client.test.BrokerTestCase;
911
import com.rabbitmq.tools.Host;
1012

@@ -16,6 +18,7 @@
1618
import java.util.concurrent.TimeUnit;
1719
import java.util.concurrent.TimeoutException;
1820
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.atomic.AtomicReference;
1922

2023
public class ConnectionRecovery extends BrokerTestCase {
2124
public static final long RECOVERY_INTERVAL = 2000;
@@ -208,10 +211,24 @@ public void testServerNamedQueueRecovery() throws IOException, InterruptedExcept
208211
String x = "amq.fanout";
209212
channel.queueBind(q, x, "");
210213

214+
final AtomicReference<String> nameBefore = new AtomicReference<String>();
215+
final AtomicReference<String> nameAfter = new AtomicReference<String>();
216+
final CountDownLatch listenerLatch = new CountDownLatch(1);
217+
((AutorecoveringConnection)connection).addQueueRecoveryListener(new QueueRecoveryListener() {
218+
@Override
219+
public void queueRecovered(String oldName, String newName) {
220+
nameBefore.set(oldName);
221+
nameAfter.set(newName);
222+
listenerLatch.countDown();
223+
}
224+
});
225+
211226
closeAndWaitForRecovery();
227+
wait(listenerLatch);
212228
expectChannelRecovery(channel);
213229
channel.basicPublish(x, "", null, "msg".getBytes());
214230
assertDelivered(q, 1);
231+
assertFalse(nameBefore.get().equals(nameAfter.get()));
215232
channel.queueDelete(q);
216233
}
217234

@@ -318,8 +335,22 @@ public void testConsumerRecoveryWithManyConsumers() throws IOException, Interrup
318335
for (int i = 0; i < n; i++) {
319336
channel.basicConsume(q, new DefaultConsumer(channel));
320337
}
338+
final AtomicReference<String> tagA = new AtomicReference<String>();
339+
final AtomicReference<String> tagB = new AtomicReference<String>();
340+
final CountDownLatch listenerLatch = new CountDownLatch(n);
341+
((AutorecoveringConnection)connection).addConsumerRecoveryListener(new ConsumerRecoveryListener() {
342+
@Override
343+
public void consumerRecovered(String oldConsumerTag, String newConsumerTag) {
344+
tagA.set(oldConsumerTag);
345+
tagB.set(newConsumerTag);
346+
listenerLatch.countDown();
347+
}
348+
});
349+
321350
assertConsumerCount(n, q);
322351
closeAndWaitForRecovery();
352+
wait(listenerLatch);
353+
assertTrue(tagA.get().equals(tagB.get()));
323354
expectChannelRecovery(channel);
324355
assertConsumerCount(n, q);
325356

0 commit comments

Comments
 (0)