Skip to content

Commit 4e223b2

Browse files
committed
Add timeout to work pool enqueuing
Making the work pool fail after it didn't manage to enqueue work for a given time makes the client more reactive to broker overload. Note this usually happens to clients that do not set QoS properly. Neverlethess, making the client as early as possible can avoid hard-to-debug connection failure. This complements the triggering of connection recovery on failed write operations. Work pool enqueueing timeout is usefull for NIO, where the same thread is used for both reading and writing (if the thread is stuck waiting on work pool enqueueing, no write operation can occur, and the TCP connection failure is never detected). [#154263515] Fixes #341
1 parent c8926f8 commit 4e223b2

File tree

9 files changed

+135
-22
lines changed

9 files changed

+135
-22
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ public class ConnectionFactory implements Cloneable {
8181
/** The default network recovery interval: 5000 millis */
8282
public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;
8383

84+
/** The default timeout for work pool enqueueing: no timeout */
85+
public static final int DEFAULT_WORK_POOL_TIMEOUT = -1;
86+
8487
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
8588

8689
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
@@ -138,6 +141,12 @@ public class ConnectionFactory implements Cloneable {
138141
*/
139142
private boolean channelShouldCheckRpcResponseType = false;
140143

144+
/**
145+
* Timeout in ms for work pool enqueuing.
146+
* @since 4.5.0
147+
*/
148+
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;
149+
141150
/** @return the default host to use for connections */
142151
public String getHost() {
143152
return host;
@@ -974,6 +983,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
974983
result.setHeartbeatExecutor(heartbeatExecutor);
975984
result.setChannelRpcTimeout(channelRpcTimeout);
976985
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
986+
result.setWorkPoolTimeout(workPoolTimeout);
977987
return result;
978988
}
979989

@@ -1270,4 +1280,25 @@ public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcRe
12701280
public boolean isChannelShouldCheckRpcResponseType() {
12711281
return channelShouldCheckRpcResponseType;
12721282
}
1283+
1284+
/**
1285+
* Timeout (in ms) for work pool enqueueing.
1286+
* The {@link WorkPool} dispatches several types of responses
1287+
* from the broker (e.g. deliveries). A high-traffic
1288+
* client with slow consumers can exhaust the work pool and
1289+
* compromise the whole connection (by e.g. letting the broker
1290+
* saturate the receive TCP buffers). Setting a timeout
1291+
* would make the connection fail early and avoid hard-to-diagnose
1292+
* TCP connection failure. Note this shouldn't happen
1293+
* with clients that set appropriate QoS values.
1294+
* Default is no timeout.
1295+
* @param workPoolTimeout timeout in ms
1296+
*/
1297+
public void setWorkPoolTimeout(int workPoolTimeout) {
1298+
this.workPoolTimeout = workPoolTimeout;
1299+
}
1300+
1301+
public int getWorkPoolTimeout() {
1302+
return workPoolTimeout;
1303+
}
12731304
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6464

6565
private final ErrorOnWriteListener errorOnWriteListener;
6666

67+
private final int workPoolTimeout;
68+
6769
private final AtomicBoolean finalShutdownStarted = new AtomicBoolean(false);
6870

6971
/**
@@ -255,12 +257,12 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
255257
new ErrorOnWriteListener() {
256258
@Override
257259
public void handle(Connection connection, IOException exception) { }
258-
};
259-
260+
};
261+
this.workPoolTimeout = params.getWorkPoolTimeout();
260262
}
261263

262264
private void initializeConsumerWorkService() {
263-
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, shutdownTimeout);
265+
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, workPoolTimeout, shutdownTimeout);
264266
}
265267

266268
private void initializeHeartbeatSender() {
@@ -619,6 +621,9 @@ public boolean handleReadFrame(Frame frame) {
619621
try {
620622
readFrame(frame);
621623
return true;
624+
} catch (WorkPoolFullException e) {
625+
// work pool is full, we propagate this one.
626+
throw e;
622627
} catch (Throwable ex) {
623628
try {
624629
handleFailure(ex);

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,9 @@ private void releaseChannel() {
387387
if (callback != null) {
388388
try {
389389
this.dispatcher.handleCancel(callback, consumerTag);
390+
} catch (WorkPoolFullException e) {
391+
// couldn't enqueue in work pool, propagating
392+
throw e;
390393
} catch (Throwable ex) {
391394
getConnection().getExceptionHandler().handleConsumerException(this,
392395
ex,
@@ -445,10 +448,13 @@ protected void processDelivery(Command command, Basic.Deliver method) {
445448
// in case a manual ack in the callback, the stats will be able to record the ack
446449
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
447450
this.dispatcher.handleDelivery(callback,
448-
m.getConsumerTag(),
449-
envelope,
450-
(BasicProperties) command.getContentHeader(),
451-
command.getContentBody());
451+
m.getConsumerTag(),
452+
envelope,
453+
(BasicProperties) command.getContentHeader(),
454+
command.getContentBody());
455+
} catch (WorkPoolFullException e) {
456+
// couldn't enqueue in work pool, propagating
457+
throw e;
452458
} catch (Throwable ex) {
453459
getConnection().getExceptionHandler().handleConsumerException(this,
454460
ex,

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ConnectionParams {
4545
private int channelRpcTimeout;
4646
private boolean channelShouldCheckRpcResponseType;
4747
private ErrorOnWriteListener errorOnWriteListener;
48+
private int workPoolTimeout = -1;
4849

4950
private ExceptionHandler exceptionHandler;
5051
private ThreadFactory threadFactory;
@@ -222,4 +223,12 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
222223
public ErrorOnWriteListener getErrorOnWriteListener() {
223224
return errorOnWriteListener;
224225
}
226+
227+
public void setWorkPoolTimeout(int workPoolTimeout) {
228+
this.workPoolTimeout = workPoolTimeout;
229+
}
230+
231+
public int getWorkPoolTimeout() {
232+
return workPoolTimeout;
233+
}
225234
}

src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ final public class ConsumerWorkService {
3131
private final WorkPool<Channel, Runnable> workPool;
3232
private final int shutdownTimeout;
3333

34-
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
34+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
3535
this.privateExecutor = (executor == null);
3636
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
37-
: executor;
38-
this.workPool = new WorkPool<Channel, Runnable>();
37+
: executor;
38+
this.workPool = new WorkPool<Channel, Runnable>(queueingTimeout);
3939
this.shutdownTimeout = shutdownTimeout;
4040
}
4141

42+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
43+
this(executor, threadFactory, -1, shutdownTimeout);
44+
}
45+
4246
public int getShutdownTimeout() {
4347
return shutdownTimeout;
4448
}

src/main/java/com/rabbitmq/client/impl/WorkPool.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import java.util.Iterator;
2222
import java.util.Map;
2323
import java.util.Set;
24+
import java.util.concurrent.BlockingQueue;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.TimeUnit;
2427

2528
/**
2629
* <p>This is a generic implementation of the channels specification
@@ -61,6 +64,37 @@ public class WorkPool<K, W> {
6164
private final Map<K, VariableLinkedBlockingQueue<W>> pool = new HashMap<K, VariableLinkedBlockingQueue<W>>();
6265
/** Those keys which want limits to be removed. We do not limit queue size if this is non-empty. */
6366
private final Set<K> unlimited = new HashSet<K>();
67+
private final EnqueueingCallback<W> enqueueingCallback;
68+
69+
public WorkPool(final int queueingTimeout) {
70+
if (queueingTimeout > 0) {
71+
this.enqueueingCallback = new EnqueueingCallback<W>() {
72+
@Override
73+
public void enqueue(BlockingQueue<W> queue, W item) {
74+
try {
75+
boolean offered = queue.offer(item, queueingTimeout, TimeUnit.MILLISECONDS);
76+
if (!offered) {
77+
throw new WorkPoolFullException("Could not enqueue in work pool after " + queueingTimeout + " ms.");
78+
}
79+
} catch (InterruptedException e) {
80+
Thread.currentThread();
81+
}
82+
}
83+
};
84+
} else {
85+
this.enqueueingCallback = new EnqueueingCallback<W>() {
86+
87+
@Override
88+
public void enqueue(BlockingQueue<W> queue, W item) {
89+
try {
90+
queue.put(item);
91+
} catch (InterruptedException e) {
92+
Thread.currentThread().interrupt();
93+
}
94+
}
95+
};
96+
}
97+
}
6498

6599
/**
66100
* Add client <code><b>key</b></code> to pool of item queues, with an empty queue.
@@ -178,11 +212,7 @@ public boolean addWorkItem(K key, W item) {
178212
}
179213
// The put operation may block. We need to make sure we are not holding the lock while that happens.
180214
if (queue != null) {
181-
try {
182-
queue.put(item);
183-
} catch (InterruptedException e) {
184-
Thread.currentThread().interrupt();
185-
}
215+
enqueueingCallback.enqueue(queue, item);
186216

187217
synchronized (this) {
188218
if (isDormant(key)) {
@@ -243,4 +273,10 @@ private K readyToInProgress() {
243273
}
244274
return key;
245275
}
276+
277+
private interface EnqueueingCallback<W> {
278+
279+
void enqueue(BlockingQueue<W> queue, W item);
280+
281+
}
246282
}
Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,26 @@
1+
// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
116
package com.rabbitmq.client.impl;
217

318
/**
4-
*
19+
* Exception thrown when {@link WorkPool} enqueueing times out.
520
*/
6-
public class WorkPoolFullException {
21+
public class WorkPoolFullException extends RuntimeException {
722

23+
public WorkPoolFullException(String msg) {
24+
super(msg);
25+
}
826
}

src/test/java/com/rabbitmq/client/impl/WorkPoolTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*/
3131
public class WorkPoolTests {
3232

33-
private final WorkPool<String, Object> pool = new WorkPool<String, Object>();
33+
private final WorkPool<String, Object> pool = new WorkPool<String, Object>(-1);
3434

3535
/**
3636
* Test unknown key tolerated silently

src/test/java/com/rabbitmq/client/test/NoAutoRecoveryWhenTcpWindowIsFullTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.client.Envelope;
2525
import com.rabbitmq.client.Recoverable;
2626
import com.rabbitmq.client.RecoveryListener;
27+
import com.rabbitmq.client.impl.nio.NioParams;
2728
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
2829
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2930
import org.junit.After;
@@ -79,6 +80,7 @@ public void setUp() throws Exception {
7980
final ConnectionFactory factory = TestUtils.connectionFactory();
8081
factory.setSocketConfigurator(new DefaultSocketConfigurator() {
8182

83+
/* default value on a Linux platform */
8284
int DEFAULT_RECEIVE_BUFFER_SIZE = 43690;
8385

8486
@Override
@@ -94,10 +96,15 @@ public void configure(Socket socket) throws IOException {
9496
factory.setRequestedHeartbeat(5);
9597
factory.setSharedExecutor(executorService);
9698
// we need the shutdown executor: channel shutting down depends on the work pool,
97-
// which is full. Channel shutting down will time out with the shutdown executor
99+
// which is full. Channel shutting down will time out with the shutdown executor.
98100
factory.setShutdownExecutor(executorService);
99101
factory.setNetworkRecoveryInterval(2000);
100102

103+
if (TestUtils.USE_NIO) {
104+
factory.setWorkPoolTimeout(10 * 1000);
105+
factory.setNioParams(new NioParams().setWriteQueueCapacity(10 * 1000 * 1000).setNbIoThreads(4));
106+
}
107+
101108
producingConnection = (AutorecoveringConnection) factory.newConnection("Producer Connection");
102109
producingChannel = (AutorecoveringChannel) producingConnection.createChannel();
103110
consumingConnection = (AutorecoveringConnection) factory.newConnection("Consuming Connection");
@@ -116,9 +123,6 @@ public void tearDown() throws IOException {
116123

117124
@Test
118125
public void failureAndRecovery() throws IOException, InterruptedException {
119-
if (TestUtils.USE_NIO) {
120-
return;
121-
}
122126
final String queue = UUID.randomUUID().toString();
123127

124128
final CountDownLatch recoveryLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)