Skip to content

Commit 3dc7d79

Browse files
committed
merge stable into default
2 parents 1fe4074 + 25e43c8 commit 3dc7d79

File tree

7 files changed

+186
-6
lines changed

7 files changed

+186
-6
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class ConnectionFactory implements Cloneable {
8888
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
8989
private SocketFactory factory = SocketFactory.getDefault();
9090
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
91+
private ExecutorService sharedExecutor;
9192

9293
/** @return number of consumer threads in default {@link ExecutorService} */
9394
@Deprecated
@@ -384,6 +385,19 @@ public void setSocketFactory(SocketFactory factory) {
384385
this.factory = factory;
385386
}
386387

388+
/**
389+
* Set the executor to use by default for newly created connections.
390+
* All connections that use this executor share it.
391+
*
392+
* It's developer's responsibility to shut down the executor
393+
* when it is no longer needed.
394+
*
395+
* @param executor
396+
*/
397+
public void setSharedExecutor(ExecutorService executor) {
398+
this.sharedExecutor = executor;
399+
}
400+
387401
public boolean isSSL(){
388402
return getSocketFactory() instanceof SSLSocketFactory;
389403
}
@@ -485,7 +499,7 @@ protected void configureSocket(Socket socket) throws IOException{
485499
* @throws IOException if it encounters a problem
486500
*/
487501
public Connection newConnection(Address[] addrs) throws IOException {
488-
return newConnection(null, addrs);
502+
return newConnection(this.sharedExecutor, addrs);
489503
}
490504

491505
/**
@@ -530,7 +544,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
530544
* @throws IOException if it encounters a problem
531545
*/
532546
public Connection newConnection() throws IOException {
533-
return newConnection(null,
547+
return newConnection(this.sharedExecutor,
534548
new Address[] {new Address(getHost(), getPort())}
535549
);
536550
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import com.rabbitmq.client.ShutdownSignalException;
4848
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4949
import com.rabbitmq.utility.BlockingCell;
50-
import com.rabbitmq.utility.Utility;
5150

5251
final class Copyright {
5352
final static String COPYRIGHT="Copyright (C) 2007-2013 GoPivotal, Inc.";
@@ -383,8 +382,8 @@ public void start()
383382

384383
try {
385384
int channelMax =
386-
negotiatedMaxValue(this.requestedChannelMax,
387-
connTune.getChannelMax());
385+
negotiateChannelMax(this.requestedChannelMax,
386+
connTune.getChannelMax());
388387
_channelManager = new ChannelManager(this._workService, channelMax);
389388

390389
int frameMax =
@@ -422,6 +421,13 @@ public void start()
422421
return;
423422
}
424423

424+
/**
425+
* Private API, allows for easier simulation of bogus clients.
426+
*/
427+
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
428+
return negotiatedMaxValue(requestedChannelMax, serverMax);
429+
}
430+
425431
/**
426432
* Private API - check required preconditions and protocol invariants
427433
*/
@@ -475,6 +481,16 @@ public ExceptionHandler getExceptionHandler() {
475481
return _exceptionHandler;
476482
}
477483

484+
485+
/** Public API
486+
*
487+
* @return true if this work service instance uses its own executor (as opposed to a shared one)
488+
*/
489+
public boolean willShutDownConsumerExecutor() {
490+
return this._workService.usesPrivateExecutor();
491+
}
492+
493+
478494
/** Public API - {@inheritDoc} */
479495
public Channel createChannel(int channelNumber) throws IOException {
480496
ensureIsOpen();

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import com.rabbitmq.client.Channel;
2424

25-
final class ConsumerWorkService {
25+
final public class ConsumerWorkService {
2626
private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
2727
private static final int DEFAULT_NUM_THREADS = 5;
2828
private final ExecutorService executor;
@@ -63,6 +63,14 @@ public void addWork(Channel channel, Runnable runnable) {
6363
}
6464
}
6565

66+
/**
67+
* @return true if executor used by this work service is managed
68+
* by it and wasn't provided by the user
69+
*/
70+
public boolean usesPrivateExecutor() {
71+
return privateExecutor;
72+
}
73+
6674
private final class WorkPoolRunnable implements Runnable {
6775

6876
public void run() {

test/src/com/rabbitmq/client/test/ClientTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public static TestSuite suite() {
3939
suite.addTestSuite(AMQBuilderApiTest.class);
4040
suite.addTestSuite(AmqpUriTest.class);
4141
suite.addTestSuite(JSONReadWriteTest.class);
42+
suite.addTestSuite(SharedThreadPoolTest.class);
4243
return suite;
4344
}
4445
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.rabbitmq.client.test;
2+
3+
import com.rabbitmq.client.ConnectionFactory;
4+
import com.rabbitmq.client.impl.AMQConnection;
5+
6+
import java.io.IOException;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
9+
10+
public class SharedThreadPoolTest extends BrokerTestCase {
11+
public void testWillShutDownExecutor() throws IOException {
12+
ConnectionFactory cf = new ConnectionFactory();
13+
ExecutorService executor = Executors.newFixedThreadPool(8);
14+
cf.setSharedExecutor(executor);
15+
16+
AMQConnection conn1 = (AMQConnection)cf.newConnection();
17+
assertFalse(conn1.willShutDownConsumerExecutor());
18+
19+
AMQConnection conn2 = (AMQConnection)cf.newConnection(Executors.newSingleThreadExecutor());
20+
assertFalse(conn2.willShutDownConsumerExecutor());
21+
22+
AMQConnection conn3 = (AMQConnection)cf.newConnection((ExecutorService)null);
23+
assertTrue(conn3.willShutDownConsumerExecutor());
24+
25+
cf.setSharedExecutor(null);
26+
27+
AMQConnection conn4 = (AMQConnection)cf.newConnection();
28+
assertTrue(conn4.willShutDownConsumerExecutor());
29+
}
30+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import com.rabbitmq.client.ShutdownListener;
7+
import com.rabbitmq.client.ShutdownSignalException;
8+
import com.rabbitmq.client.impl.AMQConnection;
9+
import com.rabbitmq.client.impl.ChannelN;
10+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
11+
import com.rabbitmq.client.impl.SocketFrameHandler;
12+
import com.rabbitmq.client.impl.ConsumerWorkService;
13+
import com.rabbitmq.client.test.BrokerTestCase;
14+
import com.rabbitmq.tools.Host;
15+
16+
import javax.net.SocketFactory;
17+
import java.io.IOException;
18+
import java.util.concurrent.Executors;
19+
20+
public class ChannelLimitNegotiation extends BrokerTestCase {
21+
class SpecialConnection extends AMQConnection {
22+
private final int channelMax;
23+
24+
public SpecialConnection(int channelMax) throws Exception {
25+
this(new ConnectionFactory(), channelMax);
26+
}
27+
28+
private SpecialConnection(ConnectionFactory factory, int channelMax) throws Exception {
29+
super(factory.getUsername(),
30+
factory.getPassword(),
31+
new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT)),
32+
Executors.newFixedThreadPool(1),
33+
factory.getVirtualHost(),
34+
factory.getClientProperties(),
35+
factory.getRequestedFrameMax(),
36+
channelMax,
37+
factory.getRequestedHeartbeat(),
38+
factory.getSaslConfig(),
39+
new DefaultExceptionHandler());
40+
41+
this.channelMax = channelMax;
42+
}
43+
44+
/**
45+
* Private API, allows for easier simulation of bogus clients.
46+
*/
47+
@Override
48+
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
49+
return this.channelMax;
50+
}
51+
}
52+
53+
public void testChannelMaxLowerThanServerMinimum() throws Exception {
54+
int n = 64;
55+
ConnectionFactory cf = new ConnectionFactory();
56+
cf.setRequestedChannelMax(n);
57+
58+
Connection conn = cf.newConnection();
59+
assertEquals(n, conn.getChannelMax());
60+
}
61+
62+
public void testChannelMaxGreaterThanServerValue() throws Exception {
63+
try {
64+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 2048).'");
65+
66+
SpecialConnection connection = new SpecialConnection(4096);
67+
try {
68+
connection.start();
69+
fail("expected failure during connection negotiation");
70+
} catch (IOException e) {
71+
// expected
72+
}
73+
} finally {
74+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 0).'");
75+
}
76+
}
77+
78+
public void testOpeningTooManyChannels() throws Exception {
79+
int n = 48;
80+
81+
try {
82+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, " + n + ").'");
83+
ConnectionFactory cf = new ConnectionFactory();
84+
Connection conn = cf.newConnection();
85+
assertEquals(n, conn.getChannelMax());
86+
87+
for (int i = 1; i <= n; i++) {
88+
assertNotNull(conn.createChannel(i));
89+
}
90+
// ChannelManager guards against channel.open being sent
91+
assertNull(conn.createChannel(n + 1));
92+
93+
// Construct a channel directly
94+
final ChannelN ch = new ChannelN((AMQConnection) conn, n + 1,
95+
new ConsumerWorkService(Executors.newSingleThreadExecutor()));
96+
conn.addShutdownListener(new ShutdownListener() {
97+
public void shutdownCompleted(ShutdownSignalException cause) {
98+
// make sure channel.open continuation is released
99+
ch.processShutdownSignal(cause, true, true);
100+
}
101+
});
102+
ch.open();
103+
fail("expected channel.open to cause a connection exception");
104+
} catch (IOException e) {
105+
checkShutdownSignal(530, e);
106+
} finally {
107+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 0).'");
108+
}
109+
}
110+
}

test/src/com/rabbitmq/client/test/server/ServerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,6 @@ public static void add(TestSuite suite) {
4141
suite.addTestSuite(PersistenceGuarantees.class);
4242
suite.addTestSuite(Shutdown.class);
4343
suite.addTestSuite(BlockedConnection.class);
44+
suite.addTestSuite(ChannelLimitNegotiation.class);
4445
}
4546
}

0 commit comments

Comments
 (0)