Skip to content

Commit 079422c

Browse files
Introduce listeners for queue name and consumer tag changes (during recovery)
Meant to be used by clients like March Hare that represent queues and consumers as object and may need to update them, while relying on the Java client to perform recovery.
1 parent c9d1d84 commit 079422c

File tree

4 files changed

+97
-0
lines changed

4 files changed

+97
-0
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
6060
private final List<RecordedBinding> recordedBindings = new ArrayList<RecordedBinding>();
6161
private Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
6262
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
63+
private List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
64+
private List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
6365

6466
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, Address[] addrs) {
6567
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
@@ -355,6 +357,44 @@ public void shutdownCompleted(ShutdownSignalException cause) {
355357
}
356358
}
357359

360+
/**
361+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
362+
* top of the Java client and need to be notified when server-named queue name changes
363+
* after recovery.
364+
*
365+
* @param listener listener that observes queue name changes after recovery
366+
*/
367+
public void addQueueRecoveryListener(QueueRecoveryListener listener) {
368+
this.queueRecoveryListeners.add(listener);
369+
}
370+
371+
/**
372+
* @see com.rabbitmq.client.impl.recovery.AutorecoveringConnection#addQueueRecoveryListener
373+
* @param listener listener to be removed
374+
*/
375+
public void removeQueueRecoveryListener(QueueRecoveryListener listener) {
376+
this.queueRecoveryListeners.remove(listener);
377+
}
378+
379+
/**
380+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
381+
* top of the Java client and need to be notified when consumer tag changes
382+
* after recovery.
383+
*
384+
* @param listener listener that observes consumer tag changes after recovery
385+
*/
386+
public void addConsumerRecoveryListener(ConsumerRecoveryListener listener) {
387+
this.consumerRecoveryListeners.add(listener);
388+
}
389+
390+
/**
391+
* @see com.rabbitmq.client.impl.recovery.AutorecoveringConnection#addConsumerRecoveryListener(ConsumerRecoveryListener)
392+
* @param listener listener to be removed
393+
*/
394+
public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
395+
this.consumerRecoveryListeners.remove(listener);
396+
}
397+
358398
synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException {
359399
Thread.sleep(this.params.getNetworkRecoveryInterval());
360400
this.recoverConnection();
@@ -455,6 +495,9 @@ private void recoverQueues() {
455495
this.propagateQueueNameChangeToBindings(oldName, newName);
456496
this.propagateQueueNameChangeToConsumers(oldName, newName);
457497
}
498+
for(QueueRecoveryListener qrl : this.queueRecoveryListeners) {
499+
qrl.queueRecovered(oldName, newName);
500+
}
458501
} catch (Exception cause) {
459502
final String message = "Caught an exception while recovering queue " + oldName +
460503
": " + cause.getMessage();
@@ -490,6 +533,9 @@ private void recoverConsumers() {
490533
this.consumers.remove(tag);
491534
this.consumers.put(newTag, consumer);
492535
}
536+
for(ConsumerRecoveryListener crl : this.consumerRecoveryListeners) {
537+
crl.consumerRecovered(tag, newTag);
538+
}
493539
} catch (Exception cause) {
494540
final String message = "Caught an exception while recovering consumer " + tag +
495541
": " + cause.getMessage();
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.rabbitmq.client.impl.recovery;
2+
3+
/**
4+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
5+
* top of the Java client and need to be notified when consumer tag changes
6+
* after recovery.
7+
*/
8+
public interface ConsumerRecoveryListener {
9+
void consumerRecovered(String oldConsumerTag, String newConsumerTag);
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.rabbitmq.client.impl.recovery;
2+
3+
/**
4+
* Not part of the public API. Mean to be used by JVM RabbitMQ clients that build on
5+
* top of the Java client and need to be notified when server-name queue name changes
6+
* after recovery.
7+
*/
8+
public interface QueueRecoveryListener {
9+
void queueRecovered(String oldName, String newName);
10+
}

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
66
import com.rabbitmq.client.Recoverable;
77
import com.rabbitmq.client.RecoveryListener;
8+
import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener;
9+
import com.rabbitmq.client.impl.recovery.QueueRecoveryListener;
810
import com.rabbitmq.client.test.BrokerTestCase;
911
import com.rabbitmq.tools.Host;
1012

@@ -16,6 +18,7 @@
1618
import java.util.concurrent.TimeUnit;
1719
import java.util.concurrent.TimeoutException;
1820
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.atomic.AtomicReference;
1922

2023
public class ConnectionRecovery extends BrokerTestCase {
2124
public static final long RECOVERY_INTERVAL = 2000;
@@ -208,10 +211,24 @@ public void testServerNamedQueueRecovery() throws IOException, InterruptedExcept
208211
String x = "amq.fanout";
209212
channel.queueBind(q, x, "");
210213

214+
final AtomicReference<String> nameBefore = new AtomicReference<String>();
215+
final AtomicReference<String> nameAfter = new AtomicReference<String>();
216+
final CountDownLatch listenerLatch = new CountDownLatch(1);
217+
((AutorecoveringConnection)connection).addQueueRecoveryListener(new QueueRecoveryListener() {
218+
@Override
219+
public void queueRecovered(String oldName, String newName) {
220+
nameBefore.set(oldName);
221+
nameAfter.set(newName);
222+
listenerLatch.countDown();
223+
}
224+
});
225+
211226
closeAndWaitForRecovery();
227+
wait(listenerLatch);
212228
expectChannelRecovery(channel);
213229
channel.basicPublish(x, "", null, "msg".getBytes());
214230
assertDelivered(q, 1);
231+
assertFalse(nameBefore.get().equals(nameAfter.get()));
215232
channel.queueDelete(q);
216233
}
217234

@@ -318,8 +335,22 @@ public void testConsumerRecoveryWithManyConsumers() throws IOException, Interrup
318335
for (int i = 0; i < n; i++) {
319336
channel.basicConsume(q, new DefaultConsumer(channel));
320337
}
338+
final AtomicReference<String> tagA = new AtomicReference<String>();
339+
final AtomicReference<String> tagB = new AtomicReference<String>();
340+
final CountDownLatch listenerLatch = new CountDownLatch(n);
341+
((AutorecoveringConnection)connection).addConsumerRecoveryListener(new ConsumerRecoveryListener() {
342+
@Override
343+
public void consumerRecovered(String oldConsumerTag, String newConsumerTag) {
344+
tagA.set(oldConsumerTag);
345+
tagB.set(newConsumerTag);
346+
listenerLatch.countDown();
347+
}
348+
});
349+
321350
assertConsumerCount(n, q);
322351
closeAndWaitForRecovery();
352+
wait(listenerLatch);
353+
assertTrue(tagA.get().equals(tagB.get()));
323354
expectChannelRecovery(channel);
324355
assertConsumerCount(n, q);
325356

0 commit comments

Comments
 (0)