Skip to content

Commit 950bce4

Browse files
Make ConsumerWorkService use connection's ThreadFactory
1 parent f3d3d01 commit 950bce4

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.concurrent.CopyOnWriteArrayList;
29+
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.TimeoutException;
3031

3132
import com.rabbitmq.client.AMQP;
@@ -62,6 +63,7 @@ final class Copyright {
6263
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
6364
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6465
public static final int HANDSHAKE_TIMEOUT = 10000;
66+
private final ExecutorService executor;
6567
private Thread mainLoopThread;
6668
private ThreadFactory threadFactory = new DefaultThreadFactory();
6769

@@ -104,7 +106,7 @@ public static final Map<String, Object> defaultClientProperties() {
104106
}
105107
};
106108

107-
protected final ConsumerWorkService _workService;
109+
protected ConsumerWorkService _workService = null;
108110

109111
/** Frame source/sink */
110112
private final FrameHandler _frameHandler;
@@ -215,17 +217,21 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
215217
this.requestedChannelMax = params.getRequestedChannelMax();
216218
this.requestedHeartbeat = params.getRequestedHeartbeat();
217219
this.saslConfig = params.getSaslConfig();
220+
this.executor = params.getExecutor();
218221

219-
this._workService = new ConsumerWorkService(params.getExecutor());
220222
this._channelManager = null;
221223

222224
this._brokerInitiatedShutdown = false;
223225

224226
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
225227
}
226228

227-
private void initializeHeartbeatSender(FrameHandler frameHandler) {
228-
this._heartbeatSender = new HeartbeatSender(frameHandler, threadFactory);
229+
private void initializeConsumerWorkService() {
230+
this._workService = new ConsumerWorkService(executor, threadFactory);
231+
}
232+
233+
private void initializeHeartbeatSender() {
234+
this._heartbeatSender = new HeartbeatSender(_frameHandler, threadFactory);
229235
}
230236

231237
/**
@@ -247,7 +253,8 @@ private void initializeHeartbeatSender(FrameHandler frameHandler) {
247253
public void start()
248254
throws IOException
249255
{
250-
initializeHeartbeatSender(_frameHandler);
256+
initializeConsumerWorkService();
257+
initializeHeartbeatSender();
251258
this._running = true;
252259
// Make sure that the first thing we do is to send the header,
253260
// which should cause any socket errors to show up for us, rather

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.concurrent.ExecutorService;
2121
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ThreadFactory;
2223

2324
import com.rabbitmq.client.Channel;
2425

@@ -29,9 +30,9 @@ final public class ConsumerWorkService {
2930
private final boolean privateExecutor;
3031
private final WorkPool<Channel, Runnable> workPool;
3132

32-
public ConsumerWorkService(ExecutorService executor) {
33+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory) {
3334
this.privateExecutor = (executor == null);
34-
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS)
35+
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
3536
: executor;
3637
this.workPool = new WorkPool<Channel, Runnable>();
3738
}

test/src/com/rabbitmq/client/test/server/ChannelLimitNegotiation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.rabbitmq.client.ShutdownSignalException;
88
import com.rabbitmq.client.impl.AMQConnection;
99
import com.rabbitmq.client.impl.ChannelN;
10+
import com.rabbitmq.client.impl.DefaultThreadFactory;
1011
import com.rabbitmq.client.impl.SocketFrameHandler;
1112
import com.rabbitmq.client.impl.ConsumerWorkService;
1213
import com.rabbitmq.client.test.BrokerTestCase;
@@ -80,7 +81,7 @@ public void testOpeningTooManyChannels() throws Exception {
8081

8182
// Construct a channel directly
8283
final ChannelN ch = new ChannelN((AMQConnection) conn, n + 1,
83-
new ConsumerWorkService(Executors.newSingleThreadExecutor()));
84+
new ConsumerWorkService(Executors.newSingleThreadExecutor(), new DefaultThreadFactory()));
8485
conn.addShutdownListener(new ShutdownListener() {
8586
public void shutdownCompleted(ShutdownSignalException cause) {
8687
// make sure channel.open continuation is released

0 commit comments

Comments
 (0)