Skip to content

Commit 6481c0b

Browse files
committed
Merge branch '4.x.x-stable' into 5.1.x-stable
Conflicts: src/test/java/com/rabbitmq/client/test/ClientTests.java
2 parents e37f15f + c2551b0 commit 6481c0b

File tree

13 files changed

+545
-41
lines changed

13 files changed

+545
-41
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public class ConnectionFactory implements Cloneable {
8585
/** The default network recovery interval: 5000 millis */
8686
public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;
8787

88+
/** The default timeout for work pool enqueueing: no timeout */
89+
public static final int DEFAULT_WORK_POOL_TIMEOUT = -1;
90+
8891
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
8992

9093
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
@@ -142,6 +145,20 @@ public class ConnectionFactory implements Cloneable {
142145
*/
143146
private boolean channelShouldCheckRpcResponseType = false;
144147

148+
/**
149+
* Listener called when a connection gets an IO error trying to write on the socket.
150+
* Default listener triggers connection recovery asynchronously and propagates
151+
* the exception.
152+
* @since 4.5.0
153+
*/
154+
private ErrorOnWriteListener errorOnWriteListener;
155+
156+
/**
157+
* Timeout in ms for work pool enqueuing.
158+
* @since 4.5.0
159+
*/
160+
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;
161+
145162
/** @return the default host to use for connections */
146163
public String getHost() {
147164
return host;
@@ -997,6 +1014,8 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
9971014
result.setHeartbeatExecutor(heartbeatExecutor);
9981015
result.setChannelRpcTimeout(channelRpcTimeout);
9991016
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
1017+
result.setWorkPoolTimeout(workPoolTimeout);
1018+
result.setErrorOnWriteListener(errorOnWriteListener);
10001019
return result;
10011020
}
10021021

@@ -1309,4 +1328,40 @@ public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcRe
13091328
public boolean isChannelShouldCheckRpcResponseType() {
13101329
return channelShouldCheckRpcResponseType;
13111330
}
1331+
1332+
/**
1333+
* Timeout (in ms) for work pool enqueueing.
1334+
* The {@link WorkPool} dispatches several types of responses
1335+
* from the broker (e.g. deliveries). A high-traffic
1336+
* client with slow consumers can exhaust the work pool and
1337+
* compromise the whole connection (by e.g. letting the broker
1338+
* saturate the receive TCP buffers). Setting a timeout
1339+
* would make the connection fail early and avoid hard-to-diagnose
1340+
* TCP connection failure. Note this shouldn't happen
1341+
* with clients that set appropriate QoS values.
1342+
* Default is no timeout.
1343+
*
1344+
* @param workPoolTimeout timeout in ms
1345+
* @since 4.5.0
1346+
*/
1347+
public void setWorkPoolTimeout(int workPoolTimeout) {
1348+
this.workPoolTimeout = workPoolTimeout;
1349+
}
1350+
1351+
public int getWorkPoolTimeout() {
1352+
return workPoolTimeout;
1353+
}
1354+
1355+
/**
1356+
* Set a listener to be called when connection gets an IO error trying to write on the socket.
1357+
* Default listener triggers connection recovery asynchronously and propagates
1358+
* the exception. Override the default listener to disable or
1359+
* customise automatic connection triggering on write operations.
1360+
*
1361+
* @param errorOnWriteListener the listener
1362+
* @since 4.5.0
1363+
*/
1364+
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
1365+
this.errorOnWriteListener = errorOnWriteListener;
1366+
}
13121367
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.net.SocketTimeoutException;
3333
import java.util.*;
3434
import java.util.concurrent.*;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3536

3637
final class Copyright {
3738
final static String COPYRIGHT="Copyright (c) 2007-2017 Pivotal Software, Inc.";
@@ -61,6 +62,12 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6162
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
6263
Collections.synchronizedList(new ArrayList<RecoveryCanBeginListener>());
6364

65+
private final ErrorOnWriteListener errorOnWriteListener;
66+
67+
private final int workPoolTimeout;
68+
69+
private final AtomicBoolean finalShutdownStarted = new AtomicBoolean(false);
70+
6471
/**
6572
* Retrieve a copy of the default table of client properties that
6673
* will be sent to the server during connection startup. This
@@ -245,10 +252,17 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
245252
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
246253

247254
this.metricsCollector = metricsCollector;
255+
256+
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
257+
new ErrorOnWriteListener() {
258+
@Override
259+
public void handle(Connection connection, IOException exception) { }
260+
};
261+
this.workPoolTimeout = params.getWorkPoolTimeout();
248262
}
249263

250264
private void initializeConsumerWorkService() {
251-
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, shutdownTimeout);
265+
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, workPoolTimeout, shutdownTimeout);
252266
}
253267

254268
private void initializeHeartbeatSender() {
@@ -556,7 +570,11 @@ public void writeFrame(Frame f) throws IOException {
556570
* Public API - flush the output buffers
557571
*/
558572
public void flush() throws IOException {
559-
_frameHandler.flush();
573+
try {
574+
_frameHandler.flush();
575+
} catch (IOException ioe) {
576+
this.errorOnWriteListener.handle(this, ioe);
577+
}
560578
}
561579

562580
private static int negotiatedMaxValue(int clientValue, int serverValue) {
@@ -575,15 +593,24 @@ private class MainLoop implements Runnable {
575593
*/
576594
@Override
577595
public void run() {
596+
boolean shouldDoFinalShutdown = true;
578597
try {
579598
while (_running) {
580599
Frame frame = _frameHandler.readFrame();
581600
readFrame(frame);
582601
}
583602
} catch (Throwable ex) {
584-
handleFailure(ex);
603+
if (ex instanceof InterruptedException) {
604+
// loop has been interrupted during shutdown,
605+
// no need to do it again
606+
shouldDoFinalShutdown = false;
607+
} else {
608+
handleFailure(ex);
609+
}
585610
} finally {
586-
doFinalShutdown();
611+
if (shouldDoFinalShutdown) {
612+
doFinalShutdown();
613+
}
587614
}
588615
}
589616
}
@@ -594,6 +621,9 @@ public boolean handleReadFrame(Frame frame) {
594621
try {
595622
readFrame(frame);
596623
return true;
624+
} catch (WorkPoolFullException e) {
625+
// work pool is full, we propagate this one.
626+
throw e;
597627
} catch (Throwable ex) {
598628
try {
599629
handleFailure(ex);
@@ -686,14 +716,33 @@ private void handleFailure(Throwable ex) {
686716

687717
/** private API */
688718
public void doFinalShutdown() {
689-
_frameHandler.close();
690-
_appContinuation.set(null);
691-
notifyListeners();
692-
// assuming that shutdown listeners do not do anything
693-
// asynchronously, e.g. start new threads, this effectively
694-
// guarantees that we only begin recovery when all shutdown
695-
// listeners have executed
696-
notifyRecoveryCanBeginListeners();
719+
if (finalShutdownStarted.compareAndSet(false, true)) {
720+
_frameHandler.close();
721+
_appContinuation.set(null);
722+
closeMainLoopThreadIfNecessary();
723+
notifyListeners();
724+
// assuming that shutdown listeners do not do anything
725+
// asynchronously, e.g. start new threads, this effectively
726+
// guarantees that we only begin recovery when all shutdown
727+
// listeners have executed
728+
notifyRecoveryCanBeginListeners();
729+
}
730+
}
731+
732+
private void closeMainLoopThreadIfNecessary() {
733+
if (mainLoopReadThreadNotNull() && notInMainLoopThread()) {
734+
if (this.mainLoopThread.isAlive()) {
735+
this.mainLoopThread.interrupt();
736+
}
737+
}
738+
}
739+
740+
private boolean notInMainLoopThread() {
741+
return Thread.currentThread() != this.mainLoopThread;
742+
}
743+
744+
private boolean mainLoopReadThreadNotNull() {
745+
return this.mainLoopThread != null;
697746
}
698747

699748
private void notifyRecoveryCanBeginListeners() {
@@ -879,7 +928,6 @@ private ShutdownSignalException startShutdown(Method reason,
879928
_heartbeatSender.shutdown();
880929

881930
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
882-
883931
return sse;
884932
}
885933

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,9 @@ private void releaseChannel() {
392392
if (callback != null) {
393393
try {
394394
this.dispatcher.handleCancel(callback, consumerTag);
395+
} catch (WorkPoolFullException e) {
396+
// couldn't enqueue in work pool, propagating
397+
throw e;
395398
} catch (Throwable ex) {
396399
getConnection().getExceptionHandler().handleConsumerException(this,
397400
ex,
@@ -450,10 +453,13 @@ protected void processDelivery(Command command, Basic.Deliver method) {
450453
// in case a manual ack in the callback, the stats will be able to record the ack
451454
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
452455
this.dispatcher.handleDelivery(callback,
453-
m.getConsumerTag(),
454-
envelope,
455-
(BasicProperties) command.getContentHeader(),
456-
command.getContentBody());
456+
m.getConsumerTag(),
457+
envelope,
458+
(BasicProperties) command.getContentHeader(),
459+
command.getContentBody());
460+
} catch (WorkPoolFullException e) {
461+
// couldn't enqueue in work pool, propagating
462+
throw e;
457463
} catch (Throwable ex) {
458464
getConnection().getExceptionHandler().handleConsumerException(this,
459465
ex,

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class ConnectionParams {
4444
private boolean topologyRecovery;
4545
private int channelRpcTimeout;
4646
private boolean channelShouldCheckRpcResponseType;
47+
private ErrorOnWriteListener errorOnWriteListener;
48+
private int workPoolTimeout = -1;
4749

4850
private ExceptionHandler exceptionHandler;
4951
private ThreadFactory threadFactory;
@@ -213,4 +215,20 @@ public void setChannelRpcTimeout(int channelRpcTimeout) {
213215
public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType) {
214216
this.channelShouldCheckRpcResponseType = channelShouldCheckRpcResponseType;
215217
}
218+
219+
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
220+
this.errorOnWriteListener = errorOnWriteListener;
221+
}
222+
223+
public ErrorOnWriteListener getErrorOnWriteListener() {
224+
return errorOnWriteListener;
225+
}
226+
227+
public void setWorkPoolTimeout(int workPoolTimeout) {
228+
this.workPoolTimeout = workPoolTimeout;
229+
}
230+
231+
public int getWorkPoolTimeout() {
232+
return workPoolTimeout;
233+
}
216234
}

src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ final public class ConsumerWorkService {
3131
private final WorkPool<Channel, Runnable> workPool;
3232
private final int shutdownTimeout;
3333

34-
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
34+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
3535
this.privateExecutor = (executor == null);
3636
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
37-
: executor;
38-
this.workPool = new WorkPool<Channel, Runnable>();
37+
: executor;
38+
this.workPool = new WorkPool<Channel, Runnable>(queueingTimeout);
3939
this.shutdownTimeout = shutdownTimeout;
4040
}
4141

42+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
43+
this(executor, threadFactory, -1, shutdownTimeout);
44+
}
45+
4246
public int getShutdownTimeout() {
4347
return shutdownTimeout;
4448
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.impl;
17+
18+
import com.rabbitmq.client.Connection;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* Listener called when a connection gets an IO error trying to write on the socket.
24+
* This can be used to trigger connection recovery.
25+
*
26+
* @since 4.5.0
27+
*/
28+
public interface ErrorOnWriteListener {
29+
30+
/**
31+
* Called when writing to the socket failed
32+
* @param connection the owning connection instance
33+
* @param exception the thrown exception
34+
*/
35+
void handle(Connection connection, IOException exception) throws IOException;
36+
37+
}

0 commit comments

Comments
 (0)