Skip to content

Commit e68e1f8

Browse files
merge default into bug25999
2 parents 02b532b + 6536825 commit e68e1f8

File tree

10 files changed

+151
-30
lines changed

10 files changed

+151
-30
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import javax.net.ssl.TrustManager;
3333

3434
import com.rabbitmq.client.impl.AMQConnection;
35+
import com.rabbitmq.client.impl.DefaultThreadFactory;
3536
import com.rabbitmq.client.impl.ConnectionParams;
3637
import com.rabbitmq.client.impl.FrameHandler;
3738
import com.rabbitmq.client.impl.FrameHandlerFactory;
@@ -89,6 +90,7 @@ public class ConnectionFactory implements Cloneable {
8990
private SocketFactory factory = SocketFactory.getDefault();
9091
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9192
private ExecutorService sharedExecutor;
93+
private ThreadFactory threadFactory = new DefaultThreadFactory();
9294
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
9395

9496
private boolean automaticRecovery = false;
@@ -426,6 +428,24 @@ public void setSharedExecutor(ExecutorService executor) {
426428
this.sharedExecutor = executor;
427429
}
428430

431+
/**
432+
* Retrieve the thread factory used to instantiate new threads.
433+
* @see com.rabbitmq.client.ThreadFactory
434+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
435+
*/
436+
public ThreadFactory getThreadFactory() {
437+
return threadFactory;
438+
}
439+
440+
/**
441+
* Set the thread factory to use for newly created connections.
442+
* @see com.rabbitmq.client.ThreadFactory
443+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
444+
*/
445+
public void setThreadFactory(ThreadFactory threadFactory) {
446+
this.threadFactory = threadFactory;
447+
}
448+
429449
public boolean isSSL(){
430450
return getSocketFactory() instanceof SSLSocketFactory;
431451
}
@@ -546,6 +566,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
546566
try {
547567
FrameHandler handler = fhFactory.create(addr);
548568
AMQConnection conn = new AMQConnection(params, handler);
569+
conn.setThreadFactory(this.threadFactory);
549570
conn.start();
550571
return conn;
551572
} catch (IOException e) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Extends {@link java.util.concurrent.ThreadFactory} to make it possible to specify
5+
* thread name.
6+
*
7+
* In environments with restricted thread management (e.g. Google App Engine), developers
8+
* can provide a custom factory to control how network I/O thread is created.
9+
*/
10+
public interface ThreadFactory extends java.util.concurrent.ThreadFactory {
11+
/**
12+
* Like {@link java.util.concurrent.ThreadFactory#newThread(Runnable)} but also takes
13+
* a thread name.
14+
*
15+
* @param r runnable to execute
16+
* @param threadName thread name
17+
* @return a new thread
18+
*/
19+
Thread newThread(Runnable r, String threadName);
20+
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.concurrent.CopyOnWriteArrayList;
29+
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.TimeoutException;
3031

3132
import com.rabbitmq.client.AMQP;
@@ -44,6 +45,7 @@
4445
import com.rabbitmq.client.SaslConfig;
4546
import com.rabbitmq.client.SaslMechanism;
4647
import com.rabbitmq.client.ShutdownSignalException;
48+
import com.rabbitmq.client.ThreadFactory;
4749
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4850
import com.rabbitmq.utility.BlockingCell;
4951

@@ -61,6 +63,9 @@ final class Copyright {
6163
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
6264
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6365
public static final int HANDSHAKE_TIMEOUT = 10000;
66+
private final ExecutorService executor;
67+
private Thread mainLoopThread;
68+
private ThreadFactory threadFactory = new DefaultThreadFactory();
6469

6570
/**
6671
* Retrieve a copy of the default table of client properties that
@@ -101,7 +106,7 @@ public static final Map<String, Object> defaultClientProperties() {
101106
}
102107
};
103108

104-
protected final ConsumerWorkService _workService;
109+
protected ConsumerWorkService _workService = null;
105110

106111
/** Frame source/sink */
107112
private final FrameHandler _frameHandler;
@@ -124,7 +129,7 @@ public static final Map<String, Object> defaultClientProperties() {
124129
private volatile boolean _inConnectionNegotiation;
125130

126131
/** Manages heart-beat sending for this connection */
127-
private final HeartbeatSender _heartbeatSender;
132+
private HeartbeatSender _heartbeatSender;
128133

129134
private final String _virtualHost;
130135
private final Map<String, Object> _clientProperties;
@@ -210,16 +215,23 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
210215
this.requestedChannelMax = params.getRequestedChannelMax();
211216
this.requestedHeartbeat = params.getRequestedHeartbeat();
212217
this.saslConfig = params.getSaslConfig();
218+
this.executor = params.getExecutor();
213219

214-
this._workService = new ConsumerWorkService(params.getExecutor());
215220
this._channelManager = null;
216221

217-
this._heartbeatSender = new HeartbeatSender(frameHandler);
218222
this._brokerInitiatedShutdown = false;
219223

220224
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
221225
}
222226

227+
private void initializeConsumerWorkService() {
228+
this._workService = new ConsumerWorkService(executor, threadFactory);
229+
}
230+
231+
private void initializeHeartbeatSender() {
232+
this._heartbeatSender = new HeartbeatSender(_frameHandler, threadFactory);
233+
}
234+
223235
/**
224236
* Start up the connection, including the MainLoop thread.
225237
* Sends the protocol
@@ -239,6 +251,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
239251
public void start()
240252
throws IOException
241253
{
254+
initializeConsumerWorkService();
255+
initializeHeartbeatSender();
242256
this._running = true;
243257
// Make sure that the first thing we do is to send the header,
244258
// which should cause any socket errors to show up for us, rather
@@ -262,7 +276,9 @@ public void start()
262276
}
263277

264278
// start the main loop going
265-
new MainLoop("AMQP Connection " + getHostAddress() + ":" + getPort()).start();
279+
MainLoop loop = new MainLoop();
280+
mainLoopThread = threadFactory.newThread(loop, "AMQP Connection " + getHostAddress() + ":" + getPort());
281+
mainLoopThread.start();
266282
// after this point clear-up of MainLoop is triggered by closing the frameHandler.
267283

268284
AMQP.Connection.Start connStart = null;
@@ -335,7 +351,7 @@ public void start()
335351
int channelMax =
336352
negotiateChannelMax(this.requestedChannelMax,
337353
connTune.getChannelMax());
338-
_channelManager = instantiateChannelManager(channelMax);
354+
_channelManager = instantiateChannelManager(channelMax, threadFactory);
339355

340356
int frameMax =
341357
negotiatedMaxValue(this.requestedFrameMax,
@@ -372,8 +388,8 @@ public void start()
372388
return;
373389
}
374390

375-
protected ChannelManager instantiateChannelManager(int channelMax) {
376-
return new ChannelManager(this._workService, channelMax);
391+
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
392+
return new ChannelManager(this._workService, channelMax, threadFactory);
377393
}
378394

379395
/**
@@ -425,6 +441,23 @@ public void setHeartbeat(int heartbeat) {
425441
}
426442
}
427443

444+
/**
445+
* Makes it possible to override thread factory that is used
446+
* to instantiate connection network I/O loop. Only necessary
447+
* in the environments with restricted
448+
* @param threadFactory
449+
*/
450+
public void setThreadFactory(ThreadFactory threadFactory) {
451+
this.threadFactory = threadFactory;
452+
}
453+
454+
/**
455+
* @return Thread factory used by this connection.
456+
*/
457+
public ThreadFactory getThreadFactory() {
458+
return threadFactory;
459+
}
460+
428461
public Map<String, Object> getClientProperties() {
429462
return new HashMap<String, Object>(_clientProperties);
430463
}
@@ -483,14 +516,7 @@ private static final int negotiatedMaxValue(int clientValue, int serverValue) {
483516
Math.min(clientValue, serverValue);
484517
}
485518

486-
private class MainLoop extends Thread {
487-
488-
/**
489-
* @param name of thread
490-
*/
491-
MainLoop(String name) {
492-
super(name);
493-
}
519+
private class MainLoop implements Runnable {
494520

495521
/**
496522
* Channel reader thread main loop. Reads a frame, and if it is
@@ -634,13 +660,14 @@ public void handleConnectionClose(Command closeCommand) {
634660
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
635661
} catch (IOException _) { } // ignore
636662
_brokerInitiatedShutdown = true;
637-
Thread scw = new SocketCloseWait(sse);
638-
scw.setName("AMQP Connection Closing Monitor " +
663+
SocketCloseWait scw = new SocketCloseWait(sse);
664+
Thread waiter = threadFactory.newThread(scw);
665+
waiter.setName("AMQP Connection Closing Monitor " +
639666
getHostAddress() + ":" + getPort());
640-
scw.start();
667+
waiter.start();
641668
}
642669

643-
private class SocketCloseWait extends Thread {
670+
private class SocketCloseWait implements Runnable {
644671
private final ShutdownSignalException cause;
645672

646673
public SocketCloseWait(ShutdownSignalException sse) {
@@ -787,7 +814,7 @@ public void close(int closeCode,
787814
boolean abort)
788815
throws IOException
789816
{
790-
boolean sync = !(Thread.currentThread() instanceof MainLoop);
817+
boolean sync = !(Thread.currentThread() == mainLoopThread);
791818

792819
try {
793820
AMQP.Connection.Close reason =

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import com.rabbitmq.client.ShutdownSignalException;
28+
import com.rabbitmq.client.ThreadFactory;
2829
import com.rabbitmq.utility.IntAllocator;
2930

3031
/**
@@ -45,12 +46,17 @@ public class ChannelManager {
4546

4647
/** Maximum channel number available on this connection. */
4748
private final int _channelMax;
49+
private final ThreadFactory threadFactory;
4850

4951
public int getChannelMax(){
5052
return _channelMax;
5153
}
5254

5355
public ChannelManager(ConsumerWorkService workService, int channelMax) {
56+
this(workService, channelMax, new DefaultThreadFactory());
57+
}
58+
59+
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
5460
if (channelMax == 0) {
5561
// The framing encoding only allows for unsigned 16-bit integers
5662
// for the channel number
@@ -60,6 +66,7 @@ public ChannelManager(ConsumerWorkService workService, int channelMax) {
6066
channelNumberAllocator = new IntAllocator(1, channelMax);
6167

6268
this.workService = workService;
69+
this.threadFactory = threadFactory;
6370
}
6471

6572
/**
@@ -97,14 +104,18 @@ public void handleSignal(ShutdownSignalException signal) {
97104
private void scheduleShutdownProcessing() {
98105
final Set<CountDownLatch> sdSet = new HashSet<CountDownLatch>(shutdownSet);
99106
final ConsumerWorkService ssWorkService = workService;
100-
Thread shutdownThread = new Thread( new Runnable() {
107+
Runnable target = new Runnable() {
101108
public void run() {
102109
for (CountDownLatch latch : sdSet) {
103110
try { latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { /*ignored*/ }
104111
}
105112
ssWorkService.shutdown();
106-
}}, "ConsumerWorkServiceShutdown");
107-
shutdownThread.setDaemon(true);
113+
}
114+
};
115+
Thread shutdownThread = threadFactory.newThread(target, "ConsumerWorkService shutdown monitor");
116+
if(Environment.isAllowedToModifyThreads()) {
117+
shutdownThread.setDaemon(true);
118+
}
108119
shutdownThread.start();
109120
}
110121

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.concurrent.ExecutorService;
2121
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ThreadFactory;
2223

2324
import com.rabbitmq.client.Channel;
2425

@@ -29,9 +30,9 @@ final public class ConsumerWorkService {
2930
private final boolean privateExecutor;
3031
private final WorkPool<Channel, Runnable> workPool;
3132

32-
public ConsumerWorkService(ExecutorService executor) {
33+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory) {
3334
this.privateExecutor = (executor == null);
34-
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS)
35+
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
3536
: executor;
3637
this.workPool = new WorkPool<Channel, Runnable>();
3738
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl;
2+
3+
import com.rabbitmq.client.ThreadFactory;
4+
5+
/**
6+
* Default thread factory that instantiates {@link java.lang.Thread}s directly.
7+
*/
8+
public class DefaultThreadFactory implements ThreadFactory {
9+
@Override
10+
public Thread newThread(Runnable r, String threadName) {
11+
return new Thread(r, threadName);
12+
}
13+
14+
@Override
15+
public Thread newThread(Runnable r) {
16+
return new Thread(r);
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Infers information about the execution environment, e.g.
5+
* security permissions.
6+
*/
7+
class Environment {
8+
public static boolean isAllowedToModifyThreads() {
9+
try {
10+
SecurityManager sm = new SecurityManager();
11+
sm.checkPermission(new RuntimePermission("modifyThread"));
12+
sm.checkPermission(new RuntimePermission("modifyThreadGroup"));
13+
return true;
14+
} catch (SecurityException se) {
15+
return false;
16+
}
17+
}
18+
}

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ThreadFactory;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.ScheduledFuture;
2728
import java.io.IOException;
@@ -39,6 +40,7 @@ final class HeartbeatSender {
3940
private final Object monitor = new Object();
4041

4142
private final FrameHandler frameHandler;
43+
private final ThreadFactory threadFactory;
4244

4345
private ScheduledExecutorService executor;
4446

@@ -48,8 +50,9 @@ final class HeartbeatSender {
4850

4951
private volatile long lastActivityTime;
5052

51-
HeartbeatSender(FrameHandler frameHandler) {
53+
HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) {
5254
this.frameHandler = frameHandler;
55+
this.threadFactory = threadFactory;
5356
}
5457

5558
public void signalActivity() {
@@ -86,7 +89,7 @@ public void setHeartbeat(int heartbeatSeconds) {
8689
private ScheduledExecutorService createExecutorIfNecessary() {
8790
synchronized (this.monitor) {
8891
if (this.executor == null) {
89-
this.executor = Executors.newSingleThreadScheduledExecutor();
92+
this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
9093
}
9194
return this.executor;
9295
}

0 commit comments

Comments
 (0)