Skip to content

Commit 2bdaac8

Browse files
Don't instantiate threads directly in two more places
1 parent 129ac77 commit 2bdaac8

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ public void start()
389389
int channelMax =
390390
negotiateChannelMax(this.requestedChannelMax,
391391
connTune.getChannelMax());
392-
_channelManager = new ChannelManager(this._workService, channelMax);
392+
_channelManager = new ChannelManager(this._workService, channelMax, threadFactory);
393393

394394
int frameMax =
395395
negotiatedMaxValue(this.requestedFrameMax,
@@ -485,6 +485,13 @@ public void setThreadFactory(ThreadFactory threadFactory) {
485485
this.threadFactory = threadFactory;
486486
}
487487

488+
/**
489+
* @return Thread factory used by this connection.
490+
*/
491+
public ThreadFactory getThreadFactory() {
492+
return threadFactory;
493+
}
494+
488495
public Map<String, Object> getClientProperties() {
489496
return new HashMap<String, Object>(_clientProperties);
490497
}
@@ -687,13 +694,14 @@ public void handleConnectionClose(Command closeCommand) {
687694
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
688695
} catch (IOException _) { } // ignore
689696
_brokerInitiatedShutdown = true;
690-
Thread scw = new SocketCloseWait(sse);
691-
scw.setName("AMQP Connection Closing Monitor " +
697+
SocketCloseWait scw = new SocketCloseWait(sse);
698+
Thread waiter = threadFactory.newThread(scw);
699+
waiter.setName("AMQP Connection Closing Monitor " +
692700
getHostAddress() + ":" + getPort());
693-
scw.start();
701+
waiter.start();
694702
}
695703

696-
private class SocketCloseWait extends Thread {
704+
private class SocketCloseWait implements Runnable {
697705
private final ShutdownSignalException cause;
698706

699707
public SocketCloseWait(ShutdownSignalException sse) {

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import com.rabbitmq.client.ShutdownSignalException;
28+
import com.rabbitmq.client.ThreadFactory;
2829
import com.rabbitmq.utility.IntAllocator;
2930

3031
/**
@@ -45,12 +46,17 @@ public final class ChannelManager {
4546

4647
/** Maximum channel number available on this connection. */
4748
private final int _channelMax;
49+
private final ThreadFactory threadFactory;
4850

4951
public int getChannelMax(){
5052
return _channelMax;
5153
}
5254

5355
public ChannelManager(ConsumerWorkService workService, int channelMax) {
56+
this(workService, channelMax, new DefaultThreadFactory());
57+
}
58+
59+
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
5460
if (channelMax == 0) {
5561
// The framing encoding only allows for unsigned 16-bit integers
5662
// for the channel number
@@ -60,6 +66,7 @@ public ChannelManager(ConsumerWorkService workService, int channelMax) {
6066
channelNumberAllocator = new IntAllocator(1, channelMax);
6167

6268
this.workService = workService;
69+
this.threadFactory = threadFactory;
6370
}
6471

6572
/**
@@ -97,13 +104,18 @@ public void handleSignal(ShutdownSignalException signal) {
97104
private void scheduleShutdownProcessing() {
98105
final Set<CountDownLatch> sdSet = new HashSet<CountDownLatch>(shutdownSet);
99106
final ConsumerWorkService ssWorkService = workService;
100-
Thread shutdownThread = new Thread( new Runnable() {
107+
Runnable target = new Runnable() {
101108
public void run() {
102109
for (CountDownLatch latch : sdSet) {
103-
try { latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { }
110+
try {
111+
latch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
112+
} catch (Throwable e) {
113+
}
104114
}
105115
ssWorkService.shutdown();
106-
}}, "ConsumerWorkServiceShutdown");
116+
}
117+
};
118+
Thread shutdownThread = threadFactory.newThread(target, "ConsumerWorkService shutdown monitor");
107119
shutdownThread.setDaemon(true);
108120
shutdownThread.start();
109121
}

0 commit comments

Comments
 (0)