Skip to content

Commit 235b399

Browse files
merge bug14587 into bug25999
2 parents 52d7063 + d503784 commit 235b399

File tree

10 files changed

+151
-30
lines changed

10 files changed

+151
-30
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import javax.net.ssl.TrustManager;
3333

3434
import com.rabbitmq.client.impl.AMQConnection;
35+
import com.rabbitmq.client.impl.DefaultThreadFactory;
3536
import com.rabbitmq.client.impl.ConnectionParams;
3637
import com.rabbitmq.client.impl.FrameHandler;
3738
import com.rabbitmq.client.impl.FrameHandlerFactory;
@@ -89,6 +90,7 @@ public class ConnectionFactory implements Cloneable {
8990
private SocketFactory factory = SocketFactory.getDefault();
9091
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9192
private ExecutorService sharedExecutor;
93+
private ThreadFactory threadFactory = new DefaultThreadFactory();
9294
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
9395

9496
private boolean automaticRecovery = false;
@@ -426,6 +428,24 @@ public void setSharedExecutor(ExecutorService executor) {
426428
this.sharedExecutor = executor;
427429
}
428430

431+
/**
432+
* Retrieve the thread factory used to instantiate new threads.
433+
* @see com.rabbitmq.client.ThreadFactory
434+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
435+
*/
436+
public ThreadFactory getThreadFactory() {
437+
return threadFactory;
438+
}
439+
440+
/**
441+
* Set the thread factory to use for newly created connections.
442+
* @see com.rabbitmq.client.ThreadFactory
443+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
444+
*/
445+
public void setThreadFactory(ThreadFactory threadFactory) {
446+
this.threadFactory = threadFactory;
447+
}
448+
429449
public boolean isSSL(){
430450
return getSocketFactory() instanceof SSLSocketFactory;
431451
}
@@ -546,6 +566,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
546566
try {
547567
FrameHandler handler = fhFactory.create(addr);
548568
AMQConnection conn = new AMQConnection(params, handler);
569+
conn.setThreadFactory(this.threadFactory);
549570
conn.start();
550571
return conn;
551572
} catch (IOException e) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Extends {@link java.util.concurrent.ThreadFactory} to make it possible to specify
5+
* thread name.
6+
*
7+
* In environments with restricted thread management (e.g. Google App Engine), developers
8+
* can provide a custom factory to control how network I/O thread is created.
9+
*/
10+
public interface ThreadFactory extends java.util.concurrent.ThreadFactory {
11+
/**
12+
* Like {@link java.util.concurrent.ThreadFactory#newThread(Runnable)} but also takes
13+
* a thread name.
14+
*
15+
* @param r runnable to execute
16+
* @param threadName thread name
17+
* @return a new thread
18+
*/
19+
Thread newThread(Runnable r, String threadName);
20+
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.concurrent.CopyOnWriteArrayList;
29+
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.TimeoutException;
3031

3132
import com.rabbitmq.client.AMQP;
@@ -45,6 +46,7 @@
4546
import com.rabbitmq.client.SaslConfig;
4647
import com.rabbitmq.client.SaslMechanism;
4748
import com.rabbitmq.client.ShutdownSignalException;
49+
import com.rabbitmq.client.ThreadFactory;
4850
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4951
import com.rabbitmq.utility.BlockingCell;
5052

@@ -62,6 +64,9 @@ final class Copyright {
6264
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
6365
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6466
public static final int HANDSHAKE_TIMEOUT = 10000;
67+
private final ExecutorService executor;
68+
private Thread mainLoopThread;
69+
private ThreadFactory threadFactory = new DefaultThreadFactory();
6570

6671
/**
6772
* Retrieve a copy of the default table of client properties that
@@ -102,7 +107,7 @@ public static final Map<String, Object> defaultClientProperties() {
102107
}
103108
};
104109

105-
protected final ConsumerWorkService _workService;
110+
protected ConsumerWorkService _workService = null;
106111

107112
/** Frame source/sink */
108113
private final FrameHandler _frameHandler;
@@ -125,7 +130,7 @@ public static final Map<String, Object> defaultClientProperties() {
125130
private volatile boolean _inConnectionNegotiation;
126131

127132
/** Manages heart-beat sending for this connection */
128-
private final HeartbeatSender _heartbeatSender;
133+
private HeartbeatSender _heartbeatSender;
129134

130135
private final String _virtualHost;
131136
private final Map<String, Object> _clientProperties;
@@ -213,16 +218,23 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
213218
this.requestedChannelMax = params.getRequestedChannelMax();
214219
this.requestedHeartbeat = params.getRequestedHeartbeat();
215220
this.saslConfig = params.getSaslConfig();
221+
this.executor = params.getExecutor();
216222

217-
this._workService = new ConsumerWorkService(params.getExecutor());
218223
this._channelManager = null;
219224

220-
this._heartbeatSender = new HeartbeatSender(frameHandler);
221225
this._brokerInitiatedShutdown = false;
222226

223227
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
224228
}
225229

230+
private void initializeConsumerWorkService() {
231+
this._workService = new ConsumerWorkService(executor, threadFactory);
232+
}
233+
234+
private void initializeHeartbeatSender() {
235+
this._heartbeatSender = new HeartbeatSender(_frameHandler, threadFactory);
236+
}
237+
226238
/**
227239
* Start up the connection, including the MainLoop thread.
228240
* Sends the protocol
@@ -242,6 +254,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
242254
public void start()
243255
throws IOException
244256
{
257+
initializeConsumerWorkService();
258+
initializeHeartbeatSender();
245259
this._running = true;
246260
// Make sure that the first thing we do is to send the header,
247261
// which should cause any socket errors to show up for us, rather
@@ -265,7 +279,9 @@ public void start()
265279
}
266280

267281
// start the main loop going
268-
new MainLoop("AMQP Connection " + getHostAddress() + ":" + getPort()).start();
282+
MainLoop loop = new MainLoop();
283+
mainLoopThread = threadFactory.newThread(loop, "AMQP Connection " + getHostAddress() + ":" + getPort());
284+
mainLoopThread.start();
269285
// after this point clear-up of MainLoop is triggered by closing the frameHandler.
270286

271287
AMQP.Connection.Start connStart = null;
@@ -338,7 +354,7 @@ public void start()
338354
int channelMax =
339355
negotiateChannelMax(this.requestedChannelMax,
340356
connTune.getChannelMax());
341-
_channelManager = instantiateChannelManager(channelMax);
357+
_channelManager = instantiateChannelManager(channelMax, threadFactory);
342358

343359
int frameMax =
344360
negotiatedMaxValue(this.requestedFrameMax,
@@ -375,8 +391,8 @@ public void start()
375391
return;
376392
}
377393

378-
protected ChannelManager instantiateChannelManager(int channelMax) {
379-
return new ChannelManager(this._workService, channelMax);
394+
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
395+
return new ChannelManager(this._workService, channelMax, threadFactory);
380396
}
381397

382398
/**
@@ -428,6 +444,23 @@ public void setHeartbeat(int heartbeat) {
428444
}
429445
}
430446

447+
/**
448+
* Makes it possible to override thread factory that is used
449+
* to instantiate connection network I/O loop. Only necessary
450+
* in the environments with restricted
451+
* @param threadFactory
452+
*/
453+
public void setThreadFactory(ThreadFactory threadFactory) {
454+
this.threadFactory = threadFactory;
455+
}
456+
457+
/**
458+
* @return Thread factory used by this connection.
459+
*/
460+
public ThreadFactory getThreadFactory() {
461+
return threadFactory;
462+
}
463+
431464
public Map<String, Object> getClientProperties() {
432465
return new HashMap<String, Object>(_clientProperties);
433466
}
@@ -486,14 +519,7 @@ private static final int negotiatedMaxValue(int clientValue, int serverValue) {
486519
Math.min(clientValue, serverValue);
487520
}
488521

489-
private class MainLoop extends Thread {
490-
491-
/**
492-
* @param name of thread
493-
*/
494-
MainLoop(String name) {
495-
super(name);
496-
}
522+
private class MainLoop implements Runnable {
497523

498524
/**
499525
* Channel reader thread main loop. Reads a frame, and if it is
@@ -637,13 +663,14 @@ public void handleConnectionClose(Command closeCommand) {
637663
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
638664
} catch (IOException _) { } // ignore
639665
_brokerInitiatedShutdown = true;
640-
Thread scw = new SocketCloseWait(sse);
641-
scw.setName("AMQP Connection Closing Monitor " +
666+
SocketCloseWait scw = new SocketCloseWait(sse);
667+
Thread waiter = threadFactory.newThread(scw);
668+
waiter.setName("AMQP Connection Closing Monitor " +
642669
getHostAddress() + ":" + getPort());
643-
scw.start();
670+
waiter.start();
644671
}
645672

646-
private class SocketCloseWait extends Thread {
673+
private class SocketCloseWait implements Runnable {
647674
private final ShutdownSignalException cause;
648675

649676
public SocketCloseWait(ShutdownSignalException sse) {
@@ -790,7 +817,7 @@ public void close(int closeCode,
790817
boolean abort)
791818
throws IOException
792819
{
793-
boolean sync = !(Thread.currentThread() instanceof MainLoop);
820+
boolean sync = !(Thread.currentThread() == mainLoopThread);
794821

795822
try {
796823
AMQP.Connection.Close reason =

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import com.rabbitmq.client.ShutdownSignalException;
28+
import com.rabbitmq.client.ThreadFactory;
2829
import com.rabbitmq.utility.IntAllocator;
2930

3031
/**
@@ -45,12 +46,17 @@ public class ChannelManager {
4546

4647
/** Maximum channel number available on this connection. */
4748
private final int _channelMax;
49+
private final ThreadFactory threadFactory;
4850

4951
public int getChannelMax(){
5052
return _channelMax;
5153
}
5254

5355
public ChannelManager(ConsumerWorkService workService, int channelMax) {
56+
this(workService, channelMax, new DefaultThreadFactory());
57+
}
58+
59+
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
5460
if (channelMax == 0) {
5561
// The framing encoding only allows for unsigned 16-bit integers
5662
// for the channel number
@@ -60,6 +66,7 @@ public ChannelManager(ConsumerWorkService workService, int channelMax) {
6066
channelNumberAllocator = new IntAllocator(1, channelMax);
6167

6268
this.workService = workService;
69+
this.threadFactory = threadFactory;
6370
}
6471

6572
/**
@@ -97,14 +104,18 @@ public void handleSignal(ShutdownSignalException signal) {
97104
private void scheduleShutdownProcessing() {
98105
final Set<CountDownLatch> sdSet = new HashSet<CountDownLatch>(shutdownSet);
99106
final ConsumerWorkService ssWorkService = workService;
100-
Thread shutdownThread = new Thread( new Runnable() {
107+
Runnable target = new Runnable() {
101108
public void run() {
102109
for (CountDownLatch latch : sdSet) {
103110
try { latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { /*ignored*/ }
104111
}
105112
ssWorkService.shutdown();
106-
}}, "ConsumerWorkServiceShutdown");
107-
shutdownThread.setDaemon(true);
113+
}
114+
};
115+
Thread shutdownThread = threadFactory.newThread(target, "ConsumerWorkService shutdown monitor");
116+
if(Environment.isAllowedToModifyThreads()) {
117+
shutdownThread.setDaemon(true);
118+
}
108119
shutdownThread.start();
109120
}
110121

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.concurrent.ExecutorService;
2121
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ThreadFactory;
2223

2324
import com.rabbitmq.client.Channel;
2425

@@ -29,9 +30,9 @@ final public class ConsumerWorkService {
2930
private final boolean privateExecutor;
3031
private final WorkPool<Channel, Runnable> workPool;
3132

32-
public ConsumerWorkService(ExecutorService executor) {
33+
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory) {
3334
this.privateExecutor = (executor == null);
34-
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS)
35+
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
3536
: executor;
3637
this.workPool = new WorkPool<Channel, Runnable>();
3738
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl;
2+
3+
import com.rabbitmq.client.ThreadFactory;
4+
5+
/**
6+
* Default thread factory that instantiates {@link java.lang.Thread}s directly.
7+
*/
8+
public class DefaultThreadFactory implements ThreadFactory {
9+
@Override
10+
public Thread newThread(Runnable r, String threadName) {
11+
return new Thread(r, threadName);
12+
}
13+
14+
@Override
15+
public Thread newThread(Runnable r) {
16+
return new Thread(r);
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Infers information about the execution environment, e.g.
5+
* security permissions.
6+
*/
7+
class Environment {
8+
public static boolean isAllowedToModifyThreads() {
9+
try {
10+
SecurityManager sm = new SecurityManager();
11+
sm.checkPermission(new RuntimePermission("modifyThread"));
12+
sm.checkPermission(new RuntimePermission("modifyThreadGroup"));
13+
return true;
14+
} catch (SecurityException se) {
15+
return false;
16+
}
17+
}
18+
}

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ThreadFactory;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.ScheduledFuture;
2728
import java.io.IOException;
@@ -39,6 +40,7 @@ final class HeartbeatSender {
3940
private final Object monitor = new Object();
4041

4142
private final FrameHandler frameHandler;
43+
private final ThreadFactory threadFactory;
4244

4345
private ScheduledExecutorService executor;
4446

@@ -48,8 +50,9 @@ final class HeartbeatSender {
4850

4951
private volatile long lastActivityTime;
5052

51-
HeartbeatSender(FrameHandler frameHandler) {
53+
HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) {
5254
this.frameHandler = frameHandler;
55+
this.threadFactory = threadFactory;
5356
}
5457

5558
public void signalActivity() {
@@ -86,7 +89,7 @@ public void setHeartbeat(int heartbeatSeconds) {
8689
private ScheduledExecutorService createExecutorIfNecessary() {
8790
synchronized (this.monitor) {
8891
if (this.executor == null) {
89-
this.executor = Executors.newSingleThreadScheduledExecutor();
92+
this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
9093
}
9194
return this.executor;
9295
}

0 commit comments

Comments
 (0)