Skip to content

Commit 774590a

Browse files
committed
recover channels in parallel too
1 parent e238df3 commit 774590a

File tree

4 files changed

+167
-64
lines changed

4 files changed

+167
-64
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public class ConnectionFactory implements Cloneable {
127127

128128
private boolean automaticRecovery = true;
129129
private boolean topologyRecovery = true;
130-
private int topologyRecoveryThreads = 1;
130+
private int recoveryThreads = 1;
131131

132132
// long is used to make sure the users can use both ints
133133
// and longs safely. It is unlikely that anybody'd need
@@ -714,13 +714,13 @@ public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
714714
this.topologyRecovery = topologyRecovery;
715715
}
716716

717-
public int getTopologyRecoveryThreadCount() {
718-
return topologyRecoveryThreads;
717+
public int getRecoveryThreadCount() {
718+
return recoveryThreads;
719719
}
720720

721721
// TODO Document that your exception handler method should be thread safe
722-
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
723-
this.topologyRecoveryThreads = topologyRecoveryThreads;
722+
public void setRecoveryThreadCount(final int recoveryThreads) {
723+
this.recoveryThreads = recoveryThreads;
724724
}
725725

726726
public void setMetricsCollector(MetricsCollector metricsCollector) {
@@ -1021,7 +1021,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10211021
result.setNetworkRecoveryInterval(networkRecoveryInterval);
10221022
result.setRecoveryDelayHandler(recoveryDelayHandler);
10231023
result.setTopologyRecovery(topologyRecovery);
1024-
result.setTopologyRecoveryThreadCount(topologyRecoveryThreads);
1024+
result.setRecoveryThreadCount(recoveryThreads);
10251025
result.setExceptionHandler(exceptionHandler);
10261026
result.setThreadFactory(threadFactory);
10271027
result.setHandshakeTimeout(handshakeTimeout);

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class ConnectionParams {
4141
private long networkRecoveryInterval;
4242
private RecoveryDelayHandler recoveryDelayHandler;
4343
private boolean topologyRecovery;
44-
private int topologyRecoveryThreads = 1;
44+
private int recoveryThreads = 1;
4545
private int channelRpcTimeout;
4646
private boolean channelShouldCheckRpcResponseType;
4747
private ErrorOnWriteListener errorOnWriteListener;
@@ -116,8 +116,8 @@ public boolean isTopologyRecoveryEnabled() {
116116
return topologyRecovery;
117117
}
118118

119-
public int getTopologyRecoveryThreadCount() {
120-
return topologyRecoveryThreads;
119+
public int getRecoveryThreadCount() {
120+
return recoveryThreads;
121121
}
122122

123123
public ThreadFactory getThreadFactory() {
@@ -180,8 +180,8 @@ public void setTopologyRecovery(boolean topologyRecovery) {
180180
this.topologyRecovery = topologyRecovery;
181181
}
182182

183-
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
184-
this.topologyRecoveryThreads = topologyRecoveryThreads;
183+
public void setRecoveryThreadCount(final int recoveryThreads) {
184+
this.recoveryThreads = recoveryThreads;
185185
}
186186

187187
public void setExceptionHandler(ExceptionHandler exceptionHandler) {

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -540,12 +540,25 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
540540
this.addAutomaticRecoveryListener(newConn);
541541
this.recoverShutdownListeners(newConn);
542542
this.recoverBlockedListeners(newConn);
543-
this.recoverChannels(newConn);
544-
// don't assign new delegate connection until channel recovery is complete
545-
this.delegate = newConn;
546-
if (this.params.isTopologyRecoveryEnabled()) {
547-
recoverTopology(params.getTopologyRecoveryThreadCount());
548-
}
543+
544+
// Optionally support recovering channels & entities in parallel for connections that have a lot of channels, queues, bindings, etc.
545+
ExecutorService executor = null;
546+
if (params.getRecoveryThreadCount() > 1) {
547+
executor = Executors.newFixedThreadPool(params.getRecoveryThreadCount(), delegate.getThreadFactory());
548+
}
549+
try {
550+
this.recoverChannels(newConn, executor);
551+
// don't assign new delegate connection until channel recovery is complete
552+
this.delegate = newConn;
553+
// recover topology
554+
if (this.params.isTopologyRecoveryEnabled()) {
555+
recoverTopology(executor);
556+
}
557+
} finally {
558+
if (executor != null) {
559+
executor.shutdownNow();
560+
}
561+
}
549562
this.notifyRecoveryListenersComplete();
550563
}
551564

@@ -589,16 +602,34 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
589602
return null;
590603
}
591604

592-
private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
593-
for (AutorecoveringChannel ch : this.channels.values()) {
594-
try {
595-
ch.automaticallyRecover(this, newConn);
596-
LOGGER.debug("Channel {} has recovered", ch);
597-
} catch (Throwable t) {
598-
newConn.getExceptionHandler().handleChannelRecoveryException(ch, t);
605+
private void recoverChannels(final RecoveryAwareAMQConnection newConn, final ExecutorService executor) throws InterruptedException {
606+
if (executor != null) {
607+
final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
608+
for (final AutorecoveringChannel ch : this.channels.values()) {
609+
tasks.add(Executors.callable(new Runnable() {
610+
@Override
611+
public void run() {
612+
recoverChannel(newConn, ch);
613+
}
614+
}));
615+
}
616+
// invokeAll will block until all callables are completed
617+
executor.invokeAll(tasks);
618+
} else {
619+
for (final AutorecoveringChannel ch : this.channels.values()) {
620+
recoverChannel(newConn, ch);
599621
}
600622
}
601623
}
624+
625+
private void recoverChannel(final RecoveryAwareAMQConnection newConn, final AutorecoveringChannel ch) {
626+
try {
627+
ch.automaticallyRecover(this, newConn);
628+
LOGGER.debug("Channel {} has recovered", ch);
629+
} catch (Throwable t) {
630+
newConn.getExceptionHandler().handleChannelRecoveryException(ch, t);
631+
}
632+
}
602633

603634
private void notifyRecoveryListenersComplete() {
604635
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
@@ -612,26 +643,20 @@ private void notifyRecoveryListenersStarted() {
612643
}
613644
}
614645

615-
private void recoverTopology(final int recoveryThreads) throws InterruptedException {
646+
private void recoverTopology(final ExecutorService executor) throws InterruptedException {
616647
// The recovery sequence is the following:
617648
// 1. Recover exchanges
618649
// 2. Recover queues
619650
// 3. Recover bindings
620651
// 4. Recover consumers
621-
if (recoveryThreads > 1) {
622-
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
652+
if (executor != null) {
623653
// A channel is single threaded, so group things by channel and recover 1 entity at a time per channel
624654
// We still need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
625-
final ExecutorService executor = Executors.newFixedThreadPool(recoveryThreads, delegate.getThreadFactory());
626-
try {
627-
// invokeAll will block until all callables are completed
628-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
629-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
630-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
631-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
632-
} finally {
633-
executor.shutdownNow();
634-
}
655+
// invokeAll will block until all callables are completed
656+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
657+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
658+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
659+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
635660
} else {
636661
// recover entities in serial on the main connection thread
637662
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
@@ -760,9 +785,9 @@ private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel
760785
list.add(entity);
761786
}
762787
// now create a runnable per channel
763-
final List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
788+
final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
764789
for (final List<E> entityList : map.values()) {
765-
callables.add(Executors.callable(new Runnable() {
790+
tasks.add(Executors.callable(new Runnable() {
766791
@Override
767792
public void run() {
768793
for (final E entity : entityList) {
@@ -781,7 +806,7 @@ public void run() {
781806
}
782807
}));
783808
}
784-
return callables;
809+
return tasks;
785810
}
786811

787812
void recordQueueBinding(AutorecoveringChannel ch,

0 commit comments

Comments
 (0)