Skip to content

Commit 2b251b0

Browse files
committed
Add option for connection recovery triggering
Fixes #379
1 parent 3f675f4 commit 2b251b0

File tree

3 files changed

+48
-11
lines changed

3 files changed

+48
-11
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.ScheduledExecutorService;
4747
import java.util.concurrent.ThreadFactory;
4848
import java.util.concurrent.TimeoutException;
49+
import java.util.function.Predicate;
4950
import javax.net.SocketFactory;
5051
import javax.net.ssl.SSLContext;
5152
import javax.net.ssl.SSLSocketFactory;
@@ -177,6 +178,12 @@ public class ConnectionFactory implements Cloneable {
177178
*/
178179
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;
179180

181+
/**
182+
* Condition to trigger automatic connection recovery.
183+
* @since 5.4.0
184+
*/
185+
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
186+
180187
/** @return the default host to use for connections */
181188
public String getHost() {
182189
return host;
@@ -1070,6 +1077,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10701077
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
10711078
result.setWorkPoolTimeout(workPoolTimeout);
10721079
result.setErrorOnWriteListener(errorOnWriteListener);
1080+
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
10731081
return result;
10741082
}
10751083

@@ -1419,4 +1427,14 @@ public int getWorkPoolTimeout() {
14191427
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
14201428
this.errorOnWriteListener = errorOnWriteListener;
14211429
}
1430+
1431+
/**
1432+
* Allows to decide on automatic connection recovery is triggered.
1433+
* Default is for shutdown not initiated by application or missed heartbeat errors.
1434+
* @param connectionRecoveryTriggeringCondition
1435+
*/
1436+
public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition) {
1437+
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
1438+
}
1439+
14221440
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import com.rabbitmq.client.RecoveryDelayHandler;
2020
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
2121
import com.rabbitmq.client.SaslConfig;
22+
import com.rabbitmq.client.ShutdownSignalException;
2223

2324
import java.util.Map;
2425
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.ScheduledExecutorService;
2627
import java.util.concurrent.ThreadFactory;
28+
import java.util.function.Predicate;
2729

2830
public class ConnectionParams {
2931
private CredentialsProvider credentialsProvider;
@@ -46,6 +48,7 @@ public class ConnectionParams {
4648
private boolean channelShouldCheckRpcResponseType;
4749
private ErrorOnWriteListener errorOnWriteListener;
4850
private int workPoolTimeout = -1;
51+
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
4952

5053
private ExceptionHandler exceptionHandler;
5154
private ThreadFactory threadFactory;
@@ -235,4 +238,12 @@ public void setWorkPoolTimeout(int workPoolTimeout) {
235238
public int getWorkPoolTimeout() {
236239
return workPoolTimeout;
237240
}
241+
242+
public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition) {
243+
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
244+
}
245+
246+
public Predicate<ShutdownSignalException> getConnectionRecoveryTriggeringCondition() {
247+
return connectionRecoveryTriggeringCondition;
248+
}
238249
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.TimeoutException;
3838
import java.util.concurrent.locks.Lock;
3939
import java.util.concurrent.locks.ReentrantLock;
40+
import java.util.function.Predicate;
4041

4142
/**
4243
* Connection implementation that performs automatic recovery when
@@ -61,6 +62,9 @@
6162
*/
6263
public class AutorecoveringConnection implements RecoverableConnection, NetworkConnection {
6364

65+
public static final Predicate<ShutdownSignalException> DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION =
66+
cause -> !cause.isInitiatedByApplication() || (cause.getCause() instanceof MissedHeartbeatException);
67+
6468
private static final Logger LOGGER = LoggerFactory.getLogger(AutorecoveringConnection.class);
6569

6670
private final RecoveryAwareAMQConnectionFactory cf;
@@ -87,6 +91,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
8791
// be created after application code has initiated shutdown.
8892
private final Object recoveryLock = new Object();
8993

94+
private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
95+
9096
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
9197
this(params, f, new ListAddressResolver(addrs));
9298
}
@@ -99,9 +105,14 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
99105
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver, metricsCollector);
100106
this.params = params;
101107

108+
this.connectionRecoveryTriggeringCondition = params.getConnectionRecoveryTriggeringCondition() == null ?
109+
DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION : params.getConnectionRecoveryTriggeringCondition();
110+
111+
System.out.println(this.connectionRecoveryTriggeringCondition);
112+
102113
setupErrorOnWriteListenerForPotentialRecovery();
103114

104-
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
115+
this.channels = new ConcurrentHashMap<>();
105116
}
106117

107118
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -484,16 +495,13 @@ private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newCo
484495
final AutorecoveringConnection c = this;
485496
// this listener will run after shutdown listeners,
486497
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
487-
RecoveryCanBeginListener starter = new RecoveryCanBeginListener() {
488-
@Override
489-
public void recoveryCanBegin(ShutdownSignalException cause) {
490-
try {
491-
if (shouldTriggerConnectionRecovery(cause)) {
492-
c.beginAutomaticRecovery();
493-
}
494-
} catch (Exception e) {
495-
newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
498+
RecoveryCanBeginListener starter = cause -> {
499+
try {
500+
if (shouldTriggerConnectionRecovery(cause)) {
501+
c.beginAutomaticRecovery();
496502
}
503+
} catch (Exception e) {
504+
newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
497505
}
498506
};
499507
synchronized (this) {
@@ -502,7 +510,7 @@ public void recoveryCanBegin(ShutdownSignalException cause) {
502510
}
503511

504512
protected boolean shouldTriggerConnectionRecovery(ShutdownSignalException cause) {
505-
return !cause.isInitiatedByApplication() || (cause.getCause() instanceof MissedHeartbeatException);
513+
return connectionRecoveryTriggeringCondition.test(cause);
506514
}
507515

508516
/**

0 commit comments

Comments
 (0)