2626import java .util .HashMap ;
2727import java .util .Map ;
2828import java .util .concurrent .CopyOnWriteArrayList ;
29+ import java .util .concurrent .ExecutorService ;
2930import java .util .concurrent .TimeoutException ;
3031
3132import com .rabbitmq .client .AMQP ;
4445import com .rabbitmq .client .SaslConfig ;
4546import com .rabbitmq .client .SaslMechanism ;
4647import com .rabbitmq .client .ShutdownSignalException ;
48+ import com .rabbitmq .client .ThreadFactory ;
4749import com .rabbitmq .client .impl .AMQChannel .BlockingRpcContinuation ;
4850import com .rabbitmq .utility .BlockingCell ;
4951
@@ -61,6 +63,9 @@ final class Copyright {
6163public class AMQConnection extends ShutdownNotifierComponent implements Connection , NetworkConnection {
6264 /** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6365 public static final int HANDSHAKE_TIMEOUT = 10000 ;
66+ private final ExecutorService executor ;
67+ private Thread mainLoopThread ;
68+ private ThreadFactory threadFactory = new DefaultThreadFactory ();
6469
6570 /**
6671 * Retrieve a copy of the default table of client properties that
@@ -101,7 +106,7 @@ public static final Map<String, Object> defaultClientProperties() {
101106 }
102107 };
103108
104- protected final ConsumerWorkService _workService ;
109+ protected ConsumerWorkService _workService = null ;
105110
106111 /** Frame source/sink */
107112 private final FrameHandler _frameHandler ;
@@ -124,7 +129,7 @@ public static final Map<String, Object> defaultClientProperties() {
124129 private volatile boolean _inConnectionNegotiation ;
125130
126131 /** Manages heart-beat sending for this connection */
127- private final HeartbeatSender _heartbeatSender ;
132+ private HeartbeatSender _heartbeatSender ;
128133
129134 private final String _virtualHost ;
130135 private final Map <String , Object > _clientProperties ;
@@ -212,16 +217,23 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
212217 this .requestedChannelMax = params .getRequestedChannelMax ();
213218 this .requestedHeartbeat = params .getRequestedHeartbeat ();
214219 this .saslConfig = params .getSaslConfig ();
220+ this .executor = params .getExecutor ();
215221
216- this ._workService = new ConsumerWorkService (params .getExecutor ());
217222 this ._channelManager = null ;
218223
219- this ._heartbeatSender = new HeartbeatSender (frameHandler );
220224 this ._brokerInitiatedShutdown = false ;
221225
222226 this ._inConnectionNegotiation = true ; // we start out waiting for the first protocol response
223227 }
224228
229+ private void initializeConsumerWorkService () {
230+ this ._workService = new ConsumerWorkService (executor , threadFactory );
231+ }
232+
233+ private void initializeHeartbeatSender () {
234+ this ._heartbeatSender = new HeartbeatSender (_frameHandler , threadFactory );
235+ }
236+
225237 /**
226238 * Start up the connection, including the MainLoop thread.
227239 * Sends the protocol
@@ -241,6 +253,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
241253 public void start ()
242254 throws IOException
243255 {
256+ initializeConsumerWorkService ();
257+ initializeHeartbeatSender ();
244258 this ._running = true ;
245259 // Make sure that the first thing we do is to send the header,
246260 // which should cause any socket errors to show up for us, rather
@@ -264,7 +278,9 @@ public void start()
264278 }
265279
266280 // start the main loop going
267- new MainLoop ("AMQP Connection " + getHostAddress () + ":" + getPort ()).start ();
281+ MainLoop loop = new MainLoop ();
282+ mainLoopThread = threadFactory .newThread (loop , "AMQP Connection " + getHostAddress () + ":" + getPort ());
283+ mainLoopThread .start ();
268284 // after this point clear-up of MainLoop is triggered by closing the frameHandler.
269285
270286 AMQP .Connection .Start connStart = null ;
@@ -337,7 +353,7 @@ public void start()
337353 int channelMax =
338354 negotiateChannelMax (this .requestedChannelMax ,
339355 connTune .getChannelMax ());
340- _channelManager = instantiateChannelManager (channelMax );
356+ _channelManager = instantiateChannelManager (channelMax , threadFactory );
341357
342358 int frameMax =
343359 negotiatedMaxValue (this .requestedFrameMax ,
@@ -374,8 +390,8 @@ public void start()
374390 return ;
375391 }
376392
377- protected ChannelManager instantiateChannelManager (int channelMax ) {
378- return new ChannelManager (this ._workService , channelMax );
393+ protected ChannelManager instantiateChannelManager (int channelMax , ThreadFactory threadFactory ) {
394+ return new ChannelManager (this ._workService , channelMax , threadFactory );
379395 }
380396
381397 /**
@@ -427,6 +443,23 @@ public void setHeartbeat(int heartbeat) {
427443 }
428444 }
429445
446+ /**
447+ * Makes it possible to override thread factory that is used
448+ * to instantiate connection network I/O loop. Only necessary
449+ * in the environments with restricted
450+ * @param threadFactory
451+ */
452+ public void setThreadFactory (ThreadFactory threadFactory ) {
453+ this .threadFactory = threadFactory ;
454+ }
455+
456+ /**
457+ * @return Thread factory used by this connection.
458+ */
459+ public ThreadFactory getThreadFactory () {
460+ return threadFactory ;
461+ }
462+
430463 public Map <String , Object > getClientProperties () {
431464 return new HashMap <String , Object >(_clientProperties );
432465 }
@@ -485,14 +518,7 @@ private static final int negotiatedMaxValue(int clientValue, int serverValue) {
485518 Math .min (clientValue , serverValue );
486519 }
487520
488- private class MainLoop extends Thread {
489-
490- /**
491- * @param name of thread
492- */
493- MainLoop (String name ) {
494- super (name );
495- }
521+ private class MainLoop implements Runnable {
496522
497523 /**
498524 * Channel reader thread main loop. Reads a frame, and if it is
@@ -636,13 +662,14 @@ public void handleConnectionClose(Command closeCommand) {
636662 _channel0 .quiescingTransmit (new AMQP .Connection .CloseOk .Builder ().build ());
637663 } catch (IOException _) { } // ignore
638664 _brokerInitiatedShutdown = true ;
639- Thread scw = new SocketCloseWait (sse );
640- scw .setName ("AMQP Connection Closing Monitor " +
665+ SocketCloseWait scw = new SocketCloseWait (sse );
666+ Thread waiter = threadFactory .newThread (scw );
667+ waiter .setName ("AMQP Connection Closing Monitor " +
641668 getHostAddress () + ":" + getPort ());
642- scw .start ();
669+ waiter .start ();
643670 }
644671
645- private class SocketCloseWait extends Thread {
672+ private class SocketCloseWait implements Runnable {
646673 private final ShutdownSignalException cause ;
647674
648675 public SocketCloseWait (ShutdownSignalException sse ) {
@@ -789,7 +816,7 @@ public void close(int closeCode,
789816 boolean abort )
790817 throws IOException
791818 {
792- boolean sync = !(Thread .currentThread () instanceof MainLoop );
819+ boolean sync = !(Thread .currentThread () == mainLoopThread );
793820
794821 try {
795822 AMQP .Connection .Close reason =
0 commit comments