Skip to content

Commit bd09086

Browse files
Merge pull request #384 from rabbitmq/rabbitmq-java-client-383-topology-recovery-hooks
Add filter to skip entities on topology recovery
2 parents 1abb5c2 + 47dc236 commit bd09086

File tree

11 files changed

+596
-50
lines changed

11 files changed

+596
-50
lines changed

RUNNING_TESTS.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ For details on running specific tests, see below.
4141

4242
## Running a Specific Test Suite
4343

44-
To run a specific test suite you should execute one of the following in the
44+
To run a specific test suite, execute one of the following in the
4545
top-level directory of the source tree:
4646

4747
* To run the client unit tests:
@@ -59,7 +59,14 @@ top-level directory of the source tree:
5959
* To run a single test:
6060

6161
```
62-
./mvnw -Ddeps.dir=$(pwd)/deps/deps verify -Dit.test=DeadLetterExchange
62+
./mvnw -Ddeps.dir=$(pwd)/deps verify -Dit.test=DeadLetterExchange
63+
```
64+
65+
When running from the repository cloned as part of the [RabbitMQ public umbrella](https://github.com/rabbitmq/rabbitmq-public-umbrella),
66+
the `deps.dir` property path may have to change, e.g.
67+
68+
```
69+
./mvnw -Ddeps.dir=$(pwd)/.. verify -Dit.test=ConnectionRecovery
6370
```
6471

6572
For example, to run the client tests:
@@ -175,4 +182,4 @@ mvn verify -P '!setup-test-cluster'
175182
```
176183

177184
Note that by doing so some tests will fail as they require `rabbitmqctl` to
178-
control the running nodes.
185+
control the running nodes.

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import com.rabbitmq.client.impl.nio.NioParams;
3030
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
3131
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
32+
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
33+
3234
import java.io.IOException;
3335
import java.net.URI;
3436
import java.net.URISyntaxException;
@@ -172,6 +174,12 @@ public class ConnectionFactory implements Cloneable {
172174
*/
173175
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;
174176

177+
/**
178+
* Filter to include/exclude entities from topology recovery.
179+
* @since 4.8.0
180+
*/
181+
private TopologyRecoveryFilter topologyRecoveryFilter;
182+
175183
/** @return the default host to use for connections */
176184
public String getHost() {
177185
return host;
@@ -1046,6 +1054,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10461054
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
10471055
result.setWorkPoolTimeout(workPoolTimeout);
10481056
result.setErrorOnWriteListener(errorOnWriteListener);
1057+
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
10491058
return result;
10501059
}
10511060

@@ -1379,4 +1388,12 @@ public int getWorkPoolTimeout() {
13791388
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
13801389
this.errorOnWriteListener = errorOnWriteListener;
13811390
}
1391+
1392+
/**
1393+
* Set filter to include/exclude entities from topology recovery.
1394+
* @since 4.8.0
1395+
*/
1396+
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
1397+
this.topologyRecoveryFilter = topologyRecoveryFilter;
1398+
}
13821399
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.RecoveryDelayHandler;
2020
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
2121
import com.rabbitmq.client.SaslConfig;
22+
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2223

2324
import java.util.Map;
2425
import java.util.concurrent.ExecutorService;
@@ -46,6 +47,7 @@ public class ConnectionParams {
4647
private boolean channelShouldCheckRpcResponseType;
4748
private ErrorOnWriteListener errorOnWriteListener;
4849
private int workPoolTimeout = -1;
50+
private TopologyRecoveryFilter topologyRecoveryFilter;
4951

5052
private ExceptionHandler exceptionHandler;
5153
private ThreadFactory threadFactory;
@@ -235,4 +237,12 @@ public void setWorkPoolTimeout(int workPoolTimeout) {
235237
public int getWorkPoolTimeout() {
236238
return workPoolTimeout;
237239
}
240+
241+
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
242+
this.topologyRecoveryFilter = topologyRecoveryFilter;
243+
}
244+
245+
public TopologyRecoveryFilter getTopologyRecoveryFilter() {
246+
return topologyRecoveryFilter;
247+
}
238248
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
8080
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
8181
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
8282
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());
83+
84+
private final TopologyRecoveryFilter topologyRecoveryFilter;
8385

8486
// Used to block connection recovery attempts after close() is invoked.
8587
private volatile boolean manuallyClosed = false;
@@ -103,6 +105,10 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
103105
setupErrorOnWriteListenerForPotentialRecovery();
104106

105107
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
108+
109+
110+
this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
111+
letAllPassFilter() : params.getTopologyRecoveryFilter();
106112
}
107113

108114
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -133,6 +139,31 @@ public void run() {
133139
});
134140
}
135141

142+
private TopologyRecoveryFilter letAllPassFilter() {
143+
return new TopologyRecoveryFilter() {
144+
145+
@Override
146+
public boolean filterExchange(RecordedExchange recordedExchange) {
147+
return true;
148+
}
149+
150+
@Override
151+
public boolean filterQueue(RecordedQueue recordedQueue) {
152+
return true;
153+
}
154+
155+
@Override
156+
public boolean filterBinding(RecordedBinding recordedBinding) {
157+
return true;
158+
}
159+
160+
@Override
161+
public boolean filterConsumer(RecordedConsumer recordedConsumer) {
162+
return true;
163+
}
164+
};
165+
}
166+
136167
/**
137168
* Private API.
138169
* @throws IOException
@@ -655,8 +686,10 @@ private void recoverTopology(final ExecutorService executor) {
655686
private void recoverExchange(final RecordedExchange x) {
656687
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
657688
try {
658-
x.recover();
659-
LOGGER.debug("{} has recovered", x);
689+
if (topologyRecoveryFilter.filterExchange(x)) {
690+
x.recover();
691+
LOGGER.debug("{} has recovered", x);
692+
}
660693
} catch (Exception cause) {
661694
final String message = "Caught an exception while recovering exchange " + x.getName() +
662695
": " + cause.getMessage();
@@ -666,30 +699,33 @@ private void recoverExchange(final RecordedExchange x) {
666699
}
667700

668701
private void recoverQueue(final String oldName, final RecordedQueue q) {
669-
LOGGER.debug("Recovering {}", q);
702+
670703
try {
671-
q.recover();
672-
String newName = q.getName();
673-
if (!oldName.equals(newName)) {
674-
// make sure server-named queues are re-added with
675-
// their new names. MK.
676-
synchronized (this.recordedQueues) {
677-
this.propagateQueueNameChangeToBindings(oldName, newName);
678-
this.propagateQueueNameChangeToConsumers(oldName, newName);
679-
// bug26552:
680-
// remove old name after we've updated the bindings and consumers,
681-
// plus only for server-named queues, both to make sure we don't lose
682-
// anything to recover. MK.
683-
if(q.isServerNamed()) {
684-
deleteRecordedQueue(oldName);
704+
if (topologyRecoveryFilter.filterQueue(q)) {
705+
LOGGER.debug("Recovering {}", q);
706+
q.recover();
707+
String newName = q.getName();
708+
if (!oldName.equals(newName)) {
709+
// make sure server-named queues are re-added with
710+
// their new names. MK.
711+
synchronized (this.recordedQueues) {
712+
this.propagateQueueNameChangeToBindings(oldName, newName);
713+
this.propagateQueueNameChangeToConsumers(oldName, newName);
714+
// bug26552:
715+
// remove old name after we've updated the bindings and consumers,
716+
// plus only for server-named queues, both to make sure we don't lose
717+
// anything to recover. MK.
718+
if(q.isServerNamed()) {
719+
deleteRecordedQueue(oldName);
720+
}
721+
this.recordedQueues.put(newName, q);
685722
}
686-
this.recordedQueues.put(newName, q);
687723
}
724+
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
725+
qrl.queueRecovered(oldName, newName);
726+
}
727+
LOGGER.debug("{} has recovered", q);
688728
}
689-
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
690-
qrl.queueRecovered(oldName, newName);
691-
}
692-
LOGGER.debug("{} has recovered", q);
693729
} catch (Exception cause) {
694730
final String message = "Caught an exception while recovering queue " + oldName +
695731
": " + cause.getMessage();
@@ -700,8 +736,10 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
700736

701737
private void recoverBinding(final RecordedBinding b) {
702738
try {
703-
b.recover();
704-
LOGGER.debug("{} has recovered", b);
739+
if (this.topologyRecoveryFilter.filterBinding(b)) {
740+
b.recover();
741+
LOGGER.debug("{} has recovered", b);
742+
}
705743
} catch (Exception cause) {
706744
String message = "Caught an exception while recovering binding between " + b.getSource() +
707745
" and " + b.getDestination() + ": " + cause.getMessage();
@@ -711,22 +749,24 @@ private void recoverBinding(final RecordedBinding b) {
711749
}
712750

713751
private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
714-
LOGGER.debug("Recovering {}", consumer);
715752
try {
716-
String newTag = consumer.recover();
717-
// make sure server-generated tags are re-added. MK.
718-
if(tag != null && !tag.equals(newTag)) {
719-
synchronized (this.consumers) {
720-
this.consumers.remove(tag);
721-
this.consumers.put(newTag, consumer);
753+
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
754+
LOGGER.debug("Recovering {}", consumer);
755+
String newTag = consumer.recover();
756+
// make sure server-generated tags are re-added. MK.
757+
if(tag != null && !tag.equals(newTag)) {
758+
synchronized (this.consumers) {
759+
this.consumers.remove(tag);
760+
this.consumers.put(newTag, consumer);
761+
}
762+
consumer.getChannel().updateConsumerTag(tag, newTag);
722763
}
723-
consumer.getChannel().updateConsumerTag(tag, newTag);
724-
}
725764

726-
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
727-
crl.consumerRecovered(tag, newTag);
765+
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
766+
crl.consumerRecovered(tag, newTag);
767+
}
768+
LOGGER.debug("{} has recovered", consumer);
728769
}
729-
LOGGER.debug("{} has recovered", consumer);
730770
} catch (Exception cause) {
731771
final String message = "Caught an exception while recovering consumer " + tag +
732772
": " + cause.getMessage();

src/main/java/com/rabbitmq/client/impl/recovery/RecordedBinding.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public String getDestination() {
5959
return destination;
6060
}
6161

62+
public String getRoutingKey() {
63+
return routingKey;
64+
}
65+
6266
public Map<String, Object> getArguments() {
6367
return arguments;
6468
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.impl.recovery;
17+
18+
/**
19+
* Filter to know whether entities should be recovered or not.
20+
* @since 4.8.0
21+
*/
22+
public interface TopologyRecoveryFilter {
23+
24+
/**
25+
* Decides whether an exchange is recovered or not.
26+
* @param recordedExchange
27+
* @return true to recover the exchange, false otherwise
28+
*/
29+
boolean filterExchange(RecordedExchange recordedExchange);
30+
31+
/**
32+
* Decides whether a queue is recovered or not.
33+
* @param recordedQueue
34+
* @return true to recover the queue, false otherwise
35+
*/
36+
boolean filterQueue(RecordedQueue recordedQueue);
37+
38+
/**
39+
* Decides whether a binding is recovered or not.
40+
* @param recordedBinding
41+
* @return true to recover the binding, false otherwise
42+
*/
43+
boolean filterBinding(RecordedBinding recordedBinding);
44+
45+
/**
46+
* Decides whether a consumer is recovered or not.
47+
* @param recordedConsumer
48+
* @return true to recover the consumer, false otherwise
49+
*/
50+
boolean filterConsumer(RecordedConsumer recordedConsumer);
51+
52+
}

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,26 @@ protected void deleteExchange(String x) throws IOException {
300300
channel.exchangeDelete(x);
301301
}
302302

303+
protected void deleteExchanges(String [] exchanges) throws IOException {
304+
if (exchanges != null) {
305+
for (String exchange : exchanges) {
306+
deleteExchange(exchange);
307+
}
308+
}
309+
}
310+
303311
protected void deleteQueue(String q) throws IOException {
304312
channel.queueDelete(q);
305313
}
306314

315+
protected void deleteQueues(String [] queues) throws IOException {
316+
if (queues != null) {
317+
for (String queue : queues) {
318+
deleteQueue(queue);
319+
}
320+
}
321+
}
322+
307323
protected void clearAllResourceAlarms() throws IOException, InterruptedException {
308324
clearResourceAlarm("memory");
309325
clearResourceAlarm("disk");

0 commit comments

Comments
 (0)