Skip to content

Commit 0982bf4

Browse files
merge bug14587 into bug25999
2 parents 0374452 + 2ec1738 commit 0982bf4

File tree

8 files changed

+132
-24
lines changed

8 files changed

+132
-24
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import javax.net.ssl.TrustManager;
3333

3434
import com.rabbitmq.client.impl.AMQConnection;
35+
import com.rabbitmq.client.impl.DefaultThreadFactory;
3536
import com.rabbitmq.client.impl.ConnectionParams;
3637
import com.rabbitmq.client.impl.FrameHandler;
3738
import com.rabbitmq.client.impl.FrameHandlerFactory;
@@ -89,6 +90,7 @@ public class ConnectionFactory implements Cloneable {
8990
private SocketFactory factory = SocketFactory.getDefault();
9091
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9192
private ExecutorService sharedExecutor;
93+
private ThreadFactory threadFactory = new DefaultThreadFactory();
9294
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
9395

9496
private boolean automaticRecovery = false;
@@ -426,6 +428,24 @@ public void setSharedExecutor(ExecutorService executor) {
426428
this.sharedExecutor = executor;
427429
}
428430

431+
/**
432+
* Retrieve the thread factory used to instantiate new threads.
433+
* @see com.rabbitmq.client.ThreadFactory
434+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
435+
*/
436+
public ThreadFactory getThreadFactory() {
437+
return threadFactory;
438+
}
439+
440+
/**
441+
* Set the thread factory to use for newly created connections.
442+
* @see com.rabbitmq.client.ThreadFactory
443+
* @see com.rabbitmq.client.impl.DefaultThreadFactory
444+
*/
445+
public void setThreadFactory(ThreadFactory threadFactory) {
446+
this.threadFactory = threadFactory;
447+
}
448+
429449
public boolean isSSL(){
430450
return getSocketFactory() instanceof SSLSocketFactory;
431451
}
@@ -544,6 +564,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
544564
try {
545565
FrameHandler handler = fhFactory.create(addr);
546566
AMQConnection conn = new AMQConnection(params, handler);
567+
conn.setThreadFactory(this.threadFactory);
547568
conn.start();
548569
return conn;
549570
} catch (IOException e) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Extends {@link java.util.concurrent.ThreadFactory} to make it possible to specify
5+
* thread name.
6+
*
7+
* In environments with restricted thread management (e.g. Google App Engine), developers
8+
* can provide a custom factory to control how network I/O thread is created.
9+
*/
10+
public interface ThreadFactory extends java.util.concurrent.ThreadFactory {
11+
/**
12+
* Like {@link java.util.concurrent.ThreadFactory#newThread(Runnable)} but also takes
13+
* a thread name.
14+
*
15+
* @param r runnable to execute
16+
* @param threadName thread name
17+
* @return a new thread
18+
*/
19+
Thread newThread(Runnable r, String threadName);
20+
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.rabbitmq.client.SaslConfig;
4545
import com.rabbitmq.client.SaslMechanism;
4646
import com.rabbitmq.client.ShutdownSignalException;
47+
import com.rabbitmq.client.ThreadFactory;
4748
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4849
import com.rabbitmq.utility.BlockingCell;
4950

@@ -61,6 +62,8 @@ final class Copyright {
6162
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
6263
/** Timeout used while waiting for AMQP handshaking to complete (milliseconds) */
6364
public static final int HANDSHAKE_TIMEOUT = 10000;
65+
private Thread mainLoopThread;
66+
private ThreadFactory threadFactory = new DefaultThreadFactory();
6467

6568
/**
6669
* Retrieve a copy of the default table of client properties that
@@ -216,7 +219,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
216219
this._workService = new ConsumerWorkService(params.getExecutor());
217220
this._channelManager = null;
218221

219-
this._heartbeatSender = new HeartbeatSender(frameHandler);
222+
this._heartbeatSender = new HeartbeatSender(frameHandler, threadFactory);
220223
this._brokerInitiatedShutdown = false;
221224

222225
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
@@ -264,7 +267,9 @@ public void start()
264267
}
265268

266269
// start the main loop going
267-
new MainLoop("AMQP Connection " + getHostAddress() + ":" + getPort()).start();
270+
MainLoop loop = new MainLoop();
271+
mainLoopThread = threadFactory.newThread(loop, "AMQP Connection " + getHostAddress() + ":" + getPort());
272+
mainLoopThread.start();
268273
// after this point clear-up of MainLoop is triggered by closing the frameHandler.
269274

270275
AMQP.Connection.Start connStart = null;
@@ -337,7 +342,7 @@ public void start()
337342
int channelMax =
338343
negotiateChannelMax(this.requestedChannelMax,
339344
connTune.getChannelMax());
340-
_channelManager = instantiateChannelManager(channelMax);
345+
_channelManager = instantiateChannelManager(channelMax, threadFactory);
341346

342347
int frameMax =
343348
negotiatedMaxValue(this.requestedFrameMax,
@@ -374,8 +379,8 @@ public void start()
374379
return;
375380
}
376381

377-
protected ChannelManager instantiateChannelManager(int channelMax) {
378-
return new ChannelManager(this._workService, channelMax);
382+
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
383+
return new ChannelManager(this._workService, channelMax, threadFactory);
379384
}
380385

381386
/**
@@ -427,6 +432,23 @@ public void setHeartbeat(int heartbeat) {
427432
}
428433
}
429434

435+
/**
436+
* Makes it possible to override thread factory that is used
437+
* to instantiate connection network I/O loop. Only necessary
438+
* in the environments with restricted
439+
* @param threadFactory
440+
*/
441+
public void setThreadFactory(ThreadFactory threadFactory) {
442+
this.threadFactory = threadFactory;
443+
}
444+
445+
/**
446+
* @return Thread factory used by this connection.
447+
*/
448+
public ThreadFactory getThreadFactory() {
449+
return threadFactory;
450+
}
451+
430452
public Map<String, Object> getClientProperties() {
431453
return new HashMap<String, Object>(_clientProperties);
432454
}
@@ -485,14 +507,7 @@ private static final int negotiatedMaxValue(int clientValue, int serverValue) {
485507
Math.min(clientValue, serverValue);
486508
}
487509

488-
private class MainLoop extends Thread {
489-
490-
/**
491-
* @param name of thread
492-
*/
493-
MainLoop(String name) {
494-
super(name);
495-
}
510+
private class MainLoop implements Runnable {
496511

497512
/**
498513
* Channel reader thread main loop. Reads a frame, and if it is
@@ -636,13 +651,14 @@ public void handleConnectionClose(Command closeCommand) {
636651
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
637652
} catch (IOException _) { } // ignore
638653
_brokerInitiatedShutdown = true;
639-
Thread scw = new SocketCloseWait(sse);
640-
scw.setName("AMQP Connection Closing Monitor " +
654+
SocketCloseWait scw = new SocketCloseWait(sse);
655+
Thread waiter = threadFactory.newThread(scw);
656+
waiter.setName("AMQP Connection Closing Monitor " +
641657
getHostAddress() + ":" + getPort());
642-
scw.start();
658+
waiter.start();
643659
}
644660

645-
private class SocketCloseWait extends Thread {
661+
private class SocketCloseWait implements Runnable {
646662
private final ShutdownSignalException cause;
647663

648664
public SocketCloseWait(ShutdownSignalException sse) {
@@ -789,7 +805,7 @@ public void close(int closeCode,
789805
boolean abort)
790806
throws IOException
791807
{
792-
boolean sync = !(Thread.currentThread() instanceof MainLoop);
808+
boolean sync = !(Thread.currentThread() == mainLoopThread);
793809

794810
try {
795811
AMQP.Connection.Close reason =

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import com.rabbitmq.client.ShutdownSignalException;
28+
import com.rabbitmq.client.ThreadFactory;
2829
import com.rabbitmq.utility.IntAllocator;
2930

3031
/**
@@ -45,12 +46,17 @@ public class ChannelManager {
4546

4647
/** Maximum channel number available on this connection. */
4748
private final int _channelMax;
49+
private final ThreadFactory threadFactory;
4850

4951
public int getChannelMax(){
5052
return _channelMax;
5153
}
5254

5355
public ChannelManager(ConsumerWorkService workService, int channelMax) {
56+
this(workService, channelMax, new DefaultThreadFactory());
57+
}
58+
59+
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
5460
if (channelMax == 0) {
5561
// The framing encoding only allows for unsigned 16-bit integers
5662
// for the channel number
@@ -60,6 +66,7 @@ public ChannelManager(ConsumerWorkService workService, int channelMax) {
6066
channelNumberAllocator = new IntAllocator(1, channelMax);
6167

6268
this.workService = workService;
69+
this.threadFactory = threadFactory;
6370
}
6471

6572
/**
@@ -97,14 +104,18 @@ public void handleSignal(ShutdownSignalException signal) {
97104
private void scheduleShutdownProcessing() {
98105
final Set<CountDownLatch> sdSet = new HashSet<CountDownLatch>(shutdownSet);
99106
final ConsumerWorkService ssWorkService = workService;
100-
Thread shutdownThread = new Thread( new Runnable() {
107+
Runnable target = new Runnable() {
101108
public void run() {
102109
for (CountDownLatch latch : sdSet) {
103110
try { latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { /*ignored*/ }
104111
}
105112
ssWorkService.shutdown();
106-
}}, "ConsumerWorkServiceShutdown");
107-
shutdownThread.setDaemon(true);
113+
}
114+
};
115+
Thread shutdownThread = threadFactory.newThread(target, "ConsumerWorkService shutdown monitor");
116+
if(Environment.isAllowedToModifyThreads()) {
117+
shutdownThread.setDaemon(true);
118+
}
108119
shutdownThread.start();
109120
}
110121

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl;
2+
3+
import com.rabbitmq.client.ThreadFactory;
4+
5+
/**
6+
* Default thread factory that instantiates {@link java.lang.Thread}s directly.
7+
*/
8+
public class DefaultThreadFactory implements ThreadFactory {
9+
@Override
10+
public Thread newThread(Runnable r, String threadName) {
11+
return new Thread(r, threadName);
12+
}
13+
14+
@Override
15+
public Thread newThread(Runnable r) {
16+
return new Thread(r);
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Infers information about the execution environment, e.g.
5+
* security permissions.
6+
*/
7+
class Environment {
8+
public static boolean isAllowedToModifyThreads() {
9+
SecurityManager sm = new SecurityManager();
10+
try {
11+
sm.checkPermission(new RuntimePermission("modifyThread"));
12+
sm.checkPermission(new RuntimePermission("modifyThreadGroup"));
13+
return true;
14+
} catch (SecurityException se) {
15+
return false;
16+
}
17+
}
18+
}

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ExecutorService;
2323
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ThreadFactory;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.ScheduledFuture;
2728
import java.io.IOException;
@@ -39,6 +40,7 @@ final class HeartbeatSender {
3940
private final Object monitor = new Object();
4041

4142
private final FrameHandler frameHandler;
43+
private final ThreadFactory threadFactory;
4244

4345
private ScheduledExecutorService executor;
4446

@@ -48,8 +50,9 @@ final class HeartbeatSender {
4850

4951
private volatile long lastActivityTime;
5052

51-
HeartbeatSender(FrameHandler frameHandler) {
53+
HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) {
5254
this.frameHandler = frameHandler;
55+
this.threadFactory = threadFactory;
5356
}
5457

5558
public void signalActivity() {
@@ -86,7 +89,7 @@ public void setHeartbeat(int heartbeatSeconds) {
8689
private ScheduledExecutorService createExecutorIfNecessary() {
8790
synchronized (this.monitor) {
8891
if (this.executor == null) {
89-
this.executor = Executors.newSingleThreadScheduledExecutor();
92+
this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
9093
}
9194
return this.executor;
9295
}

src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.rabbitmq.client.impl.recovery;
22

3+
import com.rabbitmq.client.ThreadFactory;
34
import com.rabbitmq.client.impl.AMQConnection;
45
import com.rabbitmq.client.impl.ConnectionParams;
56
import com.rabbitmq.client.impl.FrameHandler;
@@ -14,7 +15,7 @@ public RecoveryAwareAMQConnection(ConnectionParams params, FrameHandler handler)
1415
}
1516

1617
@Override
17-
protected RecoveryAwareChannelManager instantiateChannelManager(int channelMax) {
18+
protected RecoveryAwareChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
1819
return new RecoveryAwareChannelManager(super._workService, channelMax);
1920
}
2021
}

0 commit comments

Comments
 (0)