Skip to content

Commit c454b63

Browse files
committed
parallel recovery
1 parent 0427cfd commit c454b63

File tree

8 files changed

+190
-96
lines changed

8 files changed

+190
-96
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public class ConnectionFactory implements Cloneable {
127127

128128
private boolean automaticRecovery = true;
129129
private boolean topologyRecovery = true;
130-
130+
private int topologyRecoveryThreads = 1;
131+
131132
// long is used to make sure the users can use both ints
132133
// and longs safely. It is unlikely that anybody'd need
133134
// to use recovery intervals > Integer.MAX_VALUE in practice.
@@ -339,7 +340,7 @@ public void setUri(String uriString)
339340
setUri(new URI(uriString));
340341
}
341342

342-
private String uriDecode(String s) {
343+
private static String uriDecode(String s) {
343344
try {
344345
// URLDecode decodes '+' to a space, as for
345346
// form encoding. So protect plus signs.
@@ -523,7 +524,6 @@ public void setSocketFactory(SocketFactory factory) {
523524
*
524525
* @see #setSocketConfigurator(SocketConfigurator)
525526
*/
526-
@SuppressWarnings("unused")
527527
public SocketConfigurator getSocketConfigurator() {
528528
return socketConf;
529529
}
@@ -701,7 +701,6 @@ public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
701701
* @return true if topology recovery is enabled, false otherwise
702702
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
703703
*/
704-
@SuppressWarnings("unused")
705704
public boolean isTopologyRecoveryEnabled() {
706705
return topologyRecovery;
707706
}
@@ -714,6 +713,15 @@ public boolean isTopologyRecoveryEnabled() {
714713
public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
715714
this.topologyRecovery = topologyRecovery;
716715
}
716+
717+
public int getTopologyRecoveryThreadCount() {
718+
return topologyRecoveryThreads;
719+
}
720+
721+
// TODO Document that your exception handler method should be thread safe
722+
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
723+
this.topologyRecoveryThreads = topologyRecoveryThreads;
724+
}
717725

718726
public void setMetricsCollector(MetricsCollector metricsCollector) {
719727
this.metricsCollector = metricsCollector;
@@ -1013,6 +1021,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10131021
result.setNetworkRecoveryInterval(networkRecoveryInterval);
10141022
result.setRecoveryDelayHandler(recoveryDelayHandler);
10151023
result.setTopologyRecovery(topologyRecovery);
1024+
result.setTopologyRecoveryThreadCount(topologyRecoveryThreads);
10161025
result.setExceptionHandler(exceptionHandler);
10171026
result.setThreadFactory(threadFactory);
10181027
result.setHandshakeTimeout(handshakeTimeout);

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ConnectionParams {
4141
private long networkRecoveryInterval;
4242
private RecoveryDelayHandler recoveryDelayHandler;
4343
private boolean topologyRecovery;
44+
private int topologyRecoveryThreads = 1;
4445
private int channelRpcTimeout;
4546
private boolean channelShouldCheckRpcResponseType;
4647
private ErrorOnWriteListener errorOnWriteListener;
@@ -114,10 +115,14 @@ public RecoveryDelayHandler getRecoveryDelayHandler() {
114115
public boolean isTopologyRecoveryEnabled() {
115116
return topologyRecovery;
116117
}
118+
119+
public int getTopologyRecoveryThreadCount() {
120+
return topologyRecoveryThreads;
121+
}
117122

118123
public ThreadFactory getThreadFactory() {
119-
return threadFactory;
120-
}
124+
return threadFactory;
125+
}
121126

122127
public int getChannelRpcTimeout() {
123128
return channelRpcTimeout;
@@ -174,6 +179,10 @@ public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHand
174179
public void setTopologyRecovery(boolean topologyRecovery) {
175180
this.topologyRecovery = topologyRecovery;
176181
}
182+
183+
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
184+
this.topologyRecoveryThreads = topologyRecoveryThreads;
185+
}
177186

178187
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
179188
this.exceptionHandler = exceptionHandler;

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 141 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import java.io.IOException;
2929
import java.net.InetAddress;
3030
import java.util.*;
31+
import java.util.concurrent.Callable;
3132
import java.util.concurrent.ConcurrentHashMap;
3233
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
3335
import java.util.concurrent.ThreadFactory;
3436
import java.util.concurrent.TimeoutException;
3537
import java.util.concurrent.locks.Lock;
@@ -525,7 +527,7 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
525527
this.consumerRecoveryListeners.remove(listener);
526528
}
527529

528-
synchronized private void beginAutomaticRecovery() throws InterruptedException {
530+
private synchronized void beginAutomaticRecovery() throws InterruptedException {
529531
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(0));
530532

531533
this.notifyRecoveryListenersStarted();
@@ -541,11 +543,9 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException {
541543
this.recoverChannels(newConn);
542544
// don't assign new delegate connection until channel recovery is complete
543545
this.delegate = newConn;
544-
if(this.params.isTopologyRecoveryEnabled()) {
545-
this.recoverEntities();
546-
this.recoverConsumers();
546+
if (this.params.isTopologyRecoveryEnabled()) {
547+
recoverTopology(params.getTopologyRecoveryThreadCount());
547548
}
548-
549549
this.notifyRecoveryListenersComplete();
550550
}
551551

@@ -591,8 +591,10 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
591591

592592
private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
593593
for (AutorecoveringChannel ch : this.channels.values()) {
594+
LOGGER.debug("Recovering channel {}", ch);
594595
try {
595596
ch.automaticallyRecover(this, newConn);
597+
LOGGER.debug("Channel {} has recovered", ch);
596598
} catch (Throwable t) {
597599
newConn.getExceptionHandler().handleChannelRecoveryException(ch, t);
598600
}
@@ -610,113 +612,126 @@ private void notifyRecoveryListenersStarted() {
610612
f.handleRecoveryStarted(this);
611613
}
612614
}
613-
614-
private void recoverEntities() {
615+
616+
private void recoverTopology(final int recoveryThreads) throws InterruptedException {
615617
// The recovery sequence is the following:
616-
//
617618
// 1. Recover exchanges
618619
// 2. Recover queues
619620
// 3. Recover bindings
620621
// 4. Recover consumers
621-
recoverExchanges();
622-
recoverQueues();
623-
recoverBindings();
624-
}
625-
626-
private void recoverExchanges() {
627-
// recorded exchanges are guaranteed to be
628-
// non-predefined (we filter out predefined ones
629-
// in exchangeDeclare). MK.
630-
for (RecordedExchange x : Utility.copy(this.recordedExchanges).values()) {
622+
if (recoveryThreads > 1) {
623+
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
624+
// A channel is single threaded, so group things by channel and recover 1 entity at a time per channel
625+
// We still need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
626+
final ExecutorService executor = Executors.newFixedThreadPool(recoveryThreads, delegate.getThreadFactory());
631627
try {
632-
x.recover();
633-
} catch (Exception cause) {
634-
final String message = "Caught an exception while recovering exchange " + x.getName() +
635-
": " + cause.getMessage();
636-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
637-
this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e);
628+
// invokeAll will block until all callables are completed
629+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
630+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
631+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
632+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
633+
} finally {
634+
executor.shutdownNow();
635+
}
636+
} else {
637+
// recover entities in serial on the main connection thread
638+
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
639+
recoverExchange(exchange);
640+
}
641+
for (final Map.Entry<String, RecordedQueue> entry : Utility.copy(recordedQueues).entrySet()) {
642+
recoverQueue(entry.getKey(), entry.getValue());
643+
}
644+
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
645+
recoverBinding(b);
646+
}
647+
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
648+
recoverConsumer(entry.getKey(), entry.getValue());
638649
}
639650
}
640651
}
641652

642-
private void recoverQueues() {
643-
for (Map.Entry<String, RecordedQueue> entry : Utility.copy(this.recordedQueues).entrySet()) {
644-
String oldName = entry.getKey();
645-
RecordedQueue q = entry.getValue();
646-
try {
647-
q.recover();
648-
String newName = q.getName();
649-
if (!oldName.equals(newName)) {
650-
// make sure server-named queues are re-added with
651-
// their new names. MK.
652-
synchronized (this.recordedQueues) {
653-
this.propagateQueueNameChangeToBindings(oldName, newName);
654-
this.propagateQueueNameChangeToConsumers(oldName, newName);
655-
// bug26552:
656-
// remove old name after we've updated the bindings and consumers,
657-
// plus only for server-named queues, both to make sure we don't lose
658-
// anything to recover. MK.
659-
if(q.isServerNamed()) {
660-
deleteRecordedQueue(oldName);
661-
}
662-
this.recordedQueues.put(newName, q);
653+
private void recoverExchange(final RecordedExchange x) {
654+
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
655+
LOGGER.debug("Recovering exchange {}", x);
656+
try {
657+
x.recover();
658+
LOGGER.debug("Exchange {} has recovered", x);
659+
} catch (Exception cause) {
660+
final String message = "Caught an exception while recovering exchange " + x.getName() +
661+
": " + cause.getMessage();
662+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
663+
this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e);
664+
}
665+
}
666+
667+
private void recoverQueue(final String oldName, final RecordedQueue q) {
668+
LOGGER.debug("Recovering queue {}", q);
669+
try {
670+
q.recover();
671+
String newName = q.getName();
672+
if (!oldName.equals(newName)) {
673+
// make sure server-named queues are re-added with
674+
// their new names. MK.
675+
synchronized (this.recordedQueues) {
676+
this.propagateQueueNameChangeToBindings(oldName, newName);
677+
this.propagateQueueNameChangeToConsumers(oldName, newName);
678+
// bug26552:
679+
// remove old name after we've updated the bindings and consumers,
680+
// plus only for server-named queues, both to make sure we don't lose
681+
// anything to recover. MK.
682+
if(q.isServerNamed()) {
683+
deleteRecordedQueue(oldName);
663684
}
685+
this.recordedQueues.put(newName, q);
664686
}
665-
for(QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
666-
qrl.queueRecovered(oldName, newName);
667-
}
668-
} catch (Exception cause) {
669-
final String message = "Caught an exception while recovering queue " + oldName +
670-
": " + cause.getMessage();
671-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
672-
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
673687
}
688+
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
689+
qrl.queueRecovered(oldName, newName);
690+
}
691+
LOGGER.debug("Queue {} has recovered", q);
692+
} catch (Exception cause) {
693+
final String message = "Caught an exception while recovering queue " + oldName +
694+
": " + cause.getMessage();
695+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
696+
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
674697
}
675698
}
676699

677-
private void recoverBindings() {
678-
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
679-
try {
680-
b.recover();
681-
} catch (Exception cause) {
682-
String message = "Caught an exception while recovering binding between " + b.getSource() +
683-
" and " + b.getDestination() + ": " + cause.getMessage();
684-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
685-
this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e);
686-
}
700+
private void recoverBinding(final RecordedBinding b) {
701+
LOGGER.debug("Recovering binding {}", b);
702+
try {
703+
b.recover();
704+
LOGGER.debug("Binding {} has recovered", b);
705+
} catch (Exception cause) {
706+
String message = "Caught an exception while recovering binding between " + b.getSource() +
707+
" and " + b.getDestination() + ": " + cause.getMessage();
708+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
709+
this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e);
687710
}
688711
}
689712

690-
private void recoverConsumers() {
691-
for (Map.Entry<String, RecordedConsumer> entry : Utility.copy(this.consumers).entrySet()) {
692-
String tag = entry.getKey();
693-
RecordedConsumer consumer = entry.getValue();
694-
if (LOGGER.isDebugEnabled()) {
695-
LOGGER.debug("Recovering consumer {}", consumer);
696-
}
697-
try {
698-
String newTag = consumer.recover();
699-
// make sure server-generated tags are re-added. MK.
700-
if(tag != null && !tag.equals(newTag)) {
701-
synchronized (this.consumers) {
702-
this.consumers.remove(tag);
703-
this.consumers.put(newTag, consumer);
704-
}
705-
consumer.getChannel().updateConsumerTag(tag, newTag);
713+
private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
714+
LOGGER.debug("Recovering consumer {}", consumer);
715+
try {
716+
String newTag = consumer.recover();
717+
// make sure server-generated tags are re-added. MK.
718+
if(tag != null && !tag.equals(newTag)) {
719+
synchronized (this.consumers) {
720+
this.consumers.remove(tag);
721+
this.consumers.put(newTag, consumer);
706722
}
723+
consumer.getChannel().updateConsumerTag(tag, newTag);
724+
}
707725

708-
for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
709-
crl.consumerRecovered(tag, newTag);
710-
}
711-
if (LOGGER.isDebugEnabled()) {
712-
LOGGER.debug("Consumer {} has recovered", consumer);
713-
}
714-
} catch (Exception cause) {
715-
final String message = "Caught an exception while recovering consumer " + tag +
716-
": " + cause.getMessage();
717-
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
718-
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
726+
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
727+
crl.consumerRecovered(tag, newTag);
719728
}
729+
LOGGER.debug("Consumer {} has recovered", consumer);
730+
} catch (Exception cause) {
731+
final String message = "Caught an exception while recovering consumer " + tag +
732+
": " + cause.getMessage();
733+
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
734+
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
720735
}
721736
}
722737

@@ -735,6 +750,42 @@ private void propagateQueueNameChangeToConsumers(String oldName, String newName)
735750
}
736751
}
737752
}
753+
754+
private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel(final Collection<E> entities) {
755+
// map entities by channel
756+
final Map<AutorecoveringChannel, List<E>> map = new LinkedHashMap<AutorecoveringChannel, List<E>>();
757+
for (final E entity : entities) {
758+
final AutorecoveringChannel channel = entity.getChannel();
759+
List<E> list = map.get(channel);
760+
if (list == null) {
761+
map.put(channel, list = new ArrayList<E>());
762+
}
763+
list.add(entity);
764+
}
765+
// now create a runnable per channel
766+
final List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
767+
for (final List<E> entityList : map.values()) {
768+
callables.add(Executors.callable(new Runnable() {
769+
@Override
770+
public void run() {
771+
for (final E entity : entityList) {
772+
if (entity instanceof RecordedExchange) {
773+
recoverExchange((RecordedExchange)entity);
774+
} else if (entity instanceof RecordedQueue) {
775+
final RecordedQueue q = (RecordedQueue) entity;
776+
recoverQueue(q.getName(), q);
777+
} else if (entity instanceof RecordedBinding) {
778+
recoverBinding((RecordedBinding) entity);
779+
} else if (entity instanceof RecordedConsumer) {
780+
final RecordedConsumer c = (RecordedConsumer) entity;
781+
recoverConsumer(c.getConsumerTag(), c);
782+
}
783+
}
784+
}
785+
}));
786+
}
787+
return callables;
788+
}
738789

739790
void recordQueueBinding(AutorecoveringChannel ch,
740791
String queue,

0 commit comments

Comments
 (0)