Skip to content

Commit f77561a

Browse files
merge default into bug25999
2 parents 3ec56f2 + 63baa43 commit f77561a

File tree

11 files changed

+172
-38
lines changed

11 files changed

+172
-38
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import javax.net.ssl.TrustManager;
3333

3434
import com.rabbitmq.client.impl.AMQConnection;
35+
import com.rabbitmq.client.impl.DefaultThreadFactory;
3536
import com.rabbitmq.client.impl.ConnectionParams;
3637
import com.rabbitmq.client.impl.DefaultExceptionHandler;
3738
import com.rabbitmq.client.impl.FrameHandler;
@@ -90,6 +91,7 @@ public class ConnectionFactory implements Cloneable {
9091
private SocketFactory factory = SocketFactory.getDefault();
9192
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9293
private ExecutorService sharedExecutor;
94+
private ThreadFactory threadFactory = new DefaultThreadFactory();
9395
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
9496
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
9597

@@ -429,20 +431,40 @@ public void setSharedExecutor(ExecutorService executor) {
429431
}
430432

431433
/**
432-
* Get the exception handler.
433-
*
434-
* @see com.rabbitmq.client.ExceptionHandler
434+
* Retrieve the thread factory used to instantiate new threads.
435+
* @see com.rabbitmq.client.ThreadFactory
436+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
437+
*/
438+
public ThreadFactory getThreadFactory() {
439+
return threadFactory;
440+
}
441+
442+
/**
443+
* Set the thread factory used to instantiate new threads.
444+
* @see com.rabbitmq.client.ThreadFactory
445+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
435446
*/
447+
public void setThreadFactory(ThreadFactory threadFactory) {
448+
this.threadFactory = threadFactory;
449+
}
450+
451+
/**
452+
* Get the exception handler.
453+
*
454+
* @see com.rabbitmq.client.ExceptionHandler
455+
*/
436456
public ExceptionHandler getExceptionHandler() {
437457
return exceptionHandler;
438458
}
439459

440460
/**
461+
* Set the thread factory to use for newly created connections.
462+
* @see com.rabbitmq.client.ThreadFactory
463+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
441464
* Set the exception handler to use for newly created connections.
442465
*
443466
* @see com.rabbitmq.client.ExceptionHandler
444467
*/
445-
446468
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
447469
if (exceptionHandler == null) {
448470
throw new IllegalArgumentException("exception handler cannot be null!");
@@ -583,7 +605,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
583605
public ConnectionParams params(ExecutorService executor) {
584606
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
585607
requestedFrameMax, requestedChannelMax, requestedHeartbeat, saslConfig,
586-
networkRecoveryInterval, topologyRecovery, exceptionHandler);
608+
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
587609
}
588610

589611
/**
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Extends {@link java.util.concurrent.ThreadFactory} to make it possible to specify
5+
* thread name.
6+
*
7+
* In environments with restricted thread management (e.g. Google App Engine), developers
8+
* can provide a custom factory to control how network I/O thread is created.
9+
*/
10+
public interface ThreadFactory extends java.util.concurrent.ThreadFactory {
11+
/**
12+
* Like {@link java.util.concurrent.ThreadFactory#newThread(Runnable)} but also takes
13+
* a thread name.
14+
*
15+
* @param r runnable to execute
16+
* @param threadName thread name
17+
* @return a new thread
18+
*/
19+
Thread newThread(Runnable r, String threadName);
20+
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.concurrent.CopyOnWriteArrayList;
29+
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.TimeoutException;
3031

3132
import com.rabbitmq.client.AMQP;
@@ -45,6 +46,7 @@
4546
import com.rabbitmq.client.SaslConfig;
4647
import com.rabbitmq.client.SaslMechanism;
4748
import com.rabbitmq.client.ShutdownSignalException;
49+
import com.rabbitmq.client.ThreadFactory;
4850
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4951
import com.rabbitmq.utility.BlockingCell;
5052

@@ -62,6 +64,9 @@ final class Copyright {
6264
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
6365
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6466
public static final int HANDSHAKE_TIMEOUT = 10000;
67+
private final ExecutorService executor;
68+
private Thread mainLoopThread;
69+
private ThreadFactory threadFactory = new DefaultThreadFactory();
6570

6671
/**
6772
* Retrieve a copy of the default table of client properties that
@@ -102,7 +107,7 @@ public static final Map<String, Object> defaultClientProperties() {
102107
}
103108
};
104109

105-
protected final ConsumerWorkService _workService;
110+
protected ConsumerWorkService _workService = null;
106111

107112
/** Frame source/sink */
108113
private final FrameHandler _frameHandler;
@@ -125,7 +130,7 @@ public static final Map<String, Object> defaultClientProperties() {
125130
private volatile boolean _inConnectionNegotiation;
126131

127132
/** Manages heart-beat sending for this connection */
128-
private final HeartbeatSender _heartbeatSender;
133+
private HeartbeatSender _heartbeatSender;
129134

130135
private final String _virtualHost;
131136
private final Map<String, Object> _clientProperties;
@@ -212,17 +217,25 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
212217
this.requestedChannelMax = params.getRequestedChannelMax();
213218
this.requestedHeartbeat = params.getRequestedHeartbeat();
214219
this.saslConfig = params.getSaslConfig();
220+
this.executor = params.getExecutor();
221+
this.threadFactory = params.getThreadFactory();
215222

216-
this._workService = new ConsumerWorkService(params.getExecutor());
217223
this._channelManager = null;
218224

219-
this._heartbeatSender = new HeartbeatSender(frameHandler);
220225
this._brokerInitiatedShutdown = false;
221226

222227
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
223228
}
224229

225-
/**
230+
private void initializeConsumerWorkService() {
231+
this._workService = new ConsumerWorkService(executor, threadFactory);
232+
}
233+
234+
private void initializeHeartbeatSender() {
235+
this._heartbeatSender = new HeartbeatSender(_frameHandler, threadFactory);
236+
}
237+
238+
/**
226239
* Start up the connection, including the MainLoop thread.
227240
* Sends the protocol
228241
* version negotiation header, and runs through
@@ -241,6 +254,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
241254
public void start()
242255
throws IOException
243256
{
257+
initializeConsumerWorkService();
258+
initializeHeartbeatSender();
244259
this._running = true;
245260
// Make sure that the first thing we do is to send the header,
246261
// which should cause any socket errors to show up for us, rather
@@ -264,7 +279,9 @@ public void start()
264279
}
265280

266281
// start the main loop going
267-
new MainLoop("AMQP Connection " + getHostAddress() + ":" + getPort()).start();
282+
MainLoop loop = new MainLoop();
283+
mainLoopThread = threadFactory.newThread(loop, "AMQP Connection " + getHostAddress() + ":" + getPort());
284+
mainLoopThread.start();
268285
// after this point clear-up of MainLoop is triggered by closing the frameHandler.
269286

270287
AMQP.Connection.Start connStart = null;
@@ -337,7 +354,7 @@ public void start()
337354
int channelMax =
338355
negotiateChannelMax(this.requestedChannelMax,
339356
connTune.getChannelMax());
340-
_channelManager = instantiateChannelManager(channelMax);
357+
_channelManager = instantiateChannelManager(channelMax, threadFactory);
341358

342359
int frameMax =
343360
negotiatedMaxValue(this.requestedFrameMax,
@@ -374,8 +391,8 @@ public void start()
374391
return;
375392
}
376393

377-
protected ChannelManager instantiateChannelManager(int channelMax) {
378-
return new ChannelManager(this._workService, channelMax);
394+
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
395+
return new ChannelManager(this._workService, channelMax, threadFactory);
379396
}
380397

381398
/**
@@ -427,6 +444,23 @@ public void setHeartbeat(int heartbeat) {
427444
}
428445
}
429446

447+
/**
448+
* Makes it possible to override thread factory that is used
449+
* to instantiate connection network I/O loop. Only necessary
450+
* in the environments with restricted
451+
* @param threadFactory
452+
*/
453+
public void setThreadFactory(ThreadFactory threadFactory) {
454+
this.threadFactory = threadFactory;
455+
}
456+
457+
/**
458+
* @return Thread factory used by this connection.
459+
*/
460+
public ThreadFactory getThreadFactory() {
461+
return threadFactory;
462+
}
463+
430464
public Map<String, Object> getClientProperties() {
431465
return new HashMap<String, Object>(_clientProperties);
432466
}
@@ -485,14 +519,7 @@ private static final int negotiatedMaxValue(int clientValue, int serverValue) {
485519
Math.min(clientValue, serverValue);
486520
}
487521

488-
private class MainLoop extends Thread {
489-
490-
/**
491-
* @param name of thread
492-
*/
493-
MainLoop(String name) {
494-
super(name);
495-
}
522+
private class MainLoop implements Runnable {
496523

497524
/**
498525
* Channel reader thread main loop. Reads a frame, and if it is
@@ -636,13 +663,14 @@ public void handleConnectionClose(Command closeCommand) {
636663
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
637664
} catch (IOException _) { } // ignore
638665
_brokerInitiatedShutdown = true;
639-
Thread scw = new SocketCloseWait(sse);
640-
scw.setName("AMQP Connection Closing Monitor " +
666+
SocketCloseWait scw = new SocketCloseWait(sse);
667+
Thread waiter = threadFactory.newThread(scw);
668+
waiter.setName("AMQP Connection Closing Monitor " +
641669
getHostAddress() + ":" + getPort());
642-
scw.start();
670+
waiter.start();
643671
}
644672

645-
private class SocketCloseWait extends Thread {
673+
private class SocketCloseWait implements Runnable {
646674
private final ShutdownSignalException cause;
647675

648676
public SocketCloseWait(ShutdownSignalException sse) {
@@ -789,7 +817,7 @@ public void close(int closeCode,
789817
boolean abort)
790818
throws IOException
791819
{
792-
boolean sync = !(Thread.currentThread() instanceof MainLoop);
820+
boolean sync = !(Thread.currentThread() == mainLoopThread);
793821

794822
try {
795823
AMQP.Connection.Close reason =

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import com.rabbitmq.client.ShutdownSignalException;
28+
import com.rabbitmq.client.ThreadFactory;
2829
import com.rabbitmq.utility.IntAllocator;
2930

3031
/**
@@ -45,12 +46,17 @@ public class ChannelManager {
4546

4647
/** Maximum channel number available on this connection. */
4748
private final int _channelMax;
49+
private final ThreadFactory threadFactory;
4850

4951
public int getChannelMax(){
5052
return _channelMax;
5153
}
5254

5355
public ChannelManager(ConsumerWorkService workService, int channelMax) {
56+
this(workService, channelMax, new DefaultThreadFactory());
57+
}
58+
59+
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
5460
if (channelMax == 0) {
5561
// The framing encoding only allows for unsigned 16-bit integers
5662
// for the channel number
@@ -60,6 +66,7 @@ public ChannelManager(ConsumerWorkService workService, int channelMax) {
6066
channelNumberAllocator = new IntAllocator(1, channelMax);
6167

6268
this.workService = workService;
69+
this.threadFactory = threadFactory;
6370
}
6471

6572
/**
@@ -97,14 +104,18 @@ public void handleSignal(ShutdownSignalException signal) {
97104
private void scheduleShutdownProcessing() {
98105
final Set<CountDownLatch> sdSet = new HashSet<CountDownLatch>(shutdownSet);
99106
final ConsumerWorkService ssWorkService = workService;
100-
Thread shutdownThread = new Thread( new Runnable() {
107+
Runnable target = new Runnable() {
101108
public void run() {
102109
for (CountDownLatch latch : sdSet) {
103110
try { latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { /*ignored*/ }
104111
}
105112
ssWorkService.shutdown();
106-
}}, "ConsumerWorkServiceShutdown");
107-
shutdownThread.setDaemon(true);
113+
}
114+
};
115+
Thread shutdownThread = threadFactory.newThread(target, "ConsumerWorkService shutdown monitor");
116+
if(Environment.isAllowedToModifyThreads()) {
117+
shutdownThread.setDaemon(true);
118+
}
108119
shutdownThread.start();
109120
}
110121

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.rabbitmq.client.ExceptionHandler;
44
import com.rabbitmq.client.SaslConfig;
5+
import com.rabbitmq.client.ThreadFactory;
56

67
import java.util.Map;
78
import java.util.concurrent.ExecutorService;
@@ -20,8 +21,9 @@ public class ConnectionParams {
2021
private final boolean topologyRecovery;
2122

2223
private ExceptionHandler exceptionHandler;
24+
private ThreadFactory threadFactory;
2325

24-
/**
26+
/**
2527
* @param username name used to establish connection
2628
* @param password for <code><b>username</b></code>
2729
* @param executor thread pool service for consumer threads for channels on this connection
@@ -33,13 +35,17 @@ public class ConnectionParams {
3335
* @param saslConfig sasl configuration hook
3436
* @param networkRecoveryInterval interval used when recovering from network failure
3537
* @param topologyRecovery should topology (queues, exchanges, bindings, consumers) recovery be performed?
38+
<<<<<<< local
39+
* @param threadFactory
40+
=======
3641
* @param exceptionHandler
42+
>>>>>>> other
3743
*/
3844
public ConnectionParams(String username, String password, ExecutorService executor,
3945
String virtualHost, Map<String, Object> clientProperties,
4046
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
4147
SaslConfig saslConfig, int networkRecoveryInterval,
42-
boolean topologyRecovery, ExceptionHandler exceptionHandler) {
48+
boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
4349
this.username = username;
4450
this.password = password;
4551
this.executor = executor;
@@ -52,6 +58,7 @@ public ConnectionParams(String username, String password, ExecutorService execut
5258
this.networkRecoveryInterval = networkRecoveryInterval;
5359
this.topologyRecovery = topologyRecovery;
5460
this.exceptionHandler = exceptionHandler;
61+
this.threadFactory = threadFactory;
5562
}
5663

5764
public String getUsername() {
@@ -101,4 +108,8 @@ public int getNetworkRecoveryInterval() {
101108
public boolean isTopologyRecoveryEnabled() {
102109
return topologyRecovery;
103110
}
111+
112+
public ThreadFactory getThreadFactory() {
113+
return threadFactory;
114+
}
104115
}

0 commit comments

Comments
 (0)