Skip to content

Commit 20cdf42

Browse files
committed
merge stable into default
2 parents 1820704 + 035792e commit 20cdf42

File tree

11 files changed

+192
-63
lines changed

11 files changed

+192
-63
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.rabbitmq.client.AMQP.Tx;
2727
import com.rabbitmq.client.AMQP.Basic;
2828
import com.rabbitmq.client.AMQP.Confirm;
29-
import com.rabbitmq.client.AMQP.Channel.FlowOk;
3029

3130
/**
3231
* Public API: Interface to an AMQ channel. See the <a href="http://www.amqp.org/">spec</a> for details.
@@ -90,17 +89,11 @@ public interface Channel extends ShutdownNotifier {
9089
void close(int closeCode, String closeMessage) throws IOException;
9190

9291
/**
93-
* Set flow on the channel
94-
*
95-
* @param active if true, the server is asked to start sending. If false, the server is asked to stop sending.
96-
* @throws IOException
97-
*/
98-
FlowOk flow(boolean active) throws IOException;
99-
100-
/**
101-
* Return the current Channel.Flow settings.
92+
* Indicates whether the server has asked this client to stop
93+
* sending content-bearing commands (such as basic.publish) by
94+
* issueing a channel.flow{active=false}.
10295
*/
103-
FlowOk getFlow();
96+
boolean flowBlocked();
10497

10598
/**
10699
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class ConnectionFactory implements Cloneable {
8888
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
8989
private SocketFactory factory = SocketFactory.getDefault();
9090
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
91+
private ExecutorService sharedExecutor;
9192

9293
/** @return number of consumer threads in default {@link ExecutorService} */
9394
@Deprecated
@@ -384,6 +385,19 @@ public void setSocketFactory(SocketFactory factory) {
384385
this.factory = factory;
385386
}
386387

388+
/**
389+
* Set the executor to use by default for newly created connections.
390+
* All connections that use this executor share it.
391+
*
392+
* It's developer's responsibility to shut down the executor
393+
* when it is no longer needed.
394+
*
395+
* @param executor
396+
*/
397+
public void setSharedExecutor(ExecutorService executor) {
398+
this.sharedExecutor = executor;
399+
}
400+
387401
public boolean isSSL(){
388402
return getSocketFactory() instanceof SSLSocketFactory;
389403
}
@@ -485,7 +499,7 @@ protected void configureSocket(Socket socket) throws IOException{
485499
* @throws IOException if it encounters a problem
486500
*/
487501
public Connection newConnection(Address[] addrs) throws IOException {
488-
return newConnection(null, addrs);
502+
return newConnection(this.sharedExecutor, addrs);
489503
}
490504

491505
/**
@@ -530,7 +544,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
530544
* @throws IOException if it encounters a problem
531545
*/
532546
public Connection newConnection() throws IOException {
533-
return newConnection(null,
547+
return newConnection(this.sharedExecutor,
534548
new Address[] {new Address(getHost(), getPort())}
535549
);
536550
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import com.rabbitmq.client.ShutdownSignalException;
4848
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4949
import com.rabbitmq.utility.BlockingCell;
50-
import com.rabbitmq.utility.Utility;
5150

5251
final class Copyright {
5352
final static String COPYRIGHT="Copyright (C) 2007-2013 GoPivotal, Inc.";
@@ -383,8 +382,8 @@ public void start()
383382

384383
try {
385384
int channelMax =
386-
negotiatedMaxValue(this.requestedChannelMax,
387-
connTune.getChannelMax());
385+
negotiateChannelMax(this.requestedChannelMax,
386+
connTune.getChannelMax());
388387
_channelManager = new ChannelManager(this._workService, channelMax);
389388

390389
int frameMax =
@@ -422,6 +421,13 @@ public void start()
422421
return;
423422
}
424423

424+
/**
425+
* Private API, allows for easier simulation of bogus clients.
426+
*/
427+
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
428+
return negotiatedMaxValue(requestedChannelMax, serverMax);
429+
}
430+
425431
/**
426432
* Private API - check required preconditions and protocol invariants
427433
*/
@@ -475,6 +481,16 @@ public ExceptionHandler getExceptionHandler() {
475481
return _exceptionHandler;
476482
}
477483

484+
485+
/** Public API
486+
*
487+
* @return true if this work service instance uses its own executor (as opposed to a shared one)
488+
*/
489+
public boolean willShutDownConsumerExecutor() {
490+
return this._workService.usesPrivateExecutor();
491+
}
492+
493+
478494
/** Public API - {@inheritDoc} */
479495
public Channel createChannel(int channelNumber) throws IOException {
480496
ensureIsOpen();

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,13 +1057,8 @@ public Confirm.SelectOk confirmSelect()
10571057
}
10581058

10591059
/** Public API - {@inheritDoc} */
1060-
public Channel.FlowOk flow(final boolean a) throws IOException {
1061-
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow(a)).getMethod();
1062-
}
1063-
1064-
/** Public API - {@inheritDoc} */
1065-
public Channel.FlowOk getFlow() {
1066-
return new Channel.FlowOk(!_blockContent);
1060+
public boolean flowBlocked() {
1061+
return _blockContent;
10671062
}
10681063

10691064
/** Public API - {@inheritDoc} */

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import com.rabbitmq.client.Channel;
2424

25-
final class ConsumerWorkService {
25+
final public class ConsumerWorkService {
2626
private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
2727
private static final int DEFAULT_NUM_THREADS = 5;
2828
private final ExecutorService executor;
@@ -63,6 +63,14 @@ public void addWork(Channel channel, Runnable runnable) {
6363
}
6464
}
6565

66+
/**
67+
* @return true if executor used by this work service is managed
68+
* by it and wasn't provided by the user
69+
*/
70+
public boolean usesPrivateExecutor() {
71+
return privateExecutor;
72+
}
73+
6674
private final class WorkPoolRunnable implements Runnable {
6775

6876
public void run() {

test/src/com/rabbitmq/client/test/ClientTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public static TestSuite suite() {
3939
suite.addTestSuite(AMQBuilderApiTest.class);
4040
suite.addTestSuite(AmqpUriTest.class);
4141
suite.addTestSuite(JSONReadWriteTest.class);
42+
suite.addTestSuite(SharedThreadPoolTest.class);
4243
return suite;
4344
}
4445
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.rabbitmq.client.test;
2+
3+
import com.rabbitmq.client.ConnectionFactory;
4+
import com.rabbitmq.client.impl.AMQConnection;
5+
6+
import java.io.IOException;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
9+
10+
public class SharedThreadPoolTest extends BrokerTestCase {
11+
public void testWillShutDownExecutor() throws IOException {
12+
ConnectionFactory cf = new ConnectionFactory();
13+
ExecutorService executor = Executors.newFixedThreadPool(8);
14+
cf.setSharedExecutor(executor);
15+
16+
AMQConnection conn1 = (AMQConnection)cf.newConnection();
17+
assertFalse(conn1.willShutDownConsumerExecutor());
18+
19+
AMQConnection conn2 = (AMQConnection)cf.newConnection(Executors.newSingleThreadExecutor());
20+
assertFalse(conn2.willShutDownConsumerExecutor());
21+
22+
AMQConnection conn3 = (AMQConnection)cf.newConnection((ExecutorService)null);
23+
assertTrue(conn3.willShutDownConsumerExecutor());
24+
25+
cf.setSharedExecutor(null);
26+
27+
AMQConnection conn4 = (AMQConnection)cf.newConnection();
28+
assertTrue(conn4.willShutDownConsumerExecutor());
29+
}
30+
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -344,43 +344,6 @@ public void testLimitInheritsUnackedCount()
344344
drain(c, 1);
345345
}
346346

347-
public void testFlow() throws IOException
348-
{
349-
QueueingConsumer c = new QueueingConsumer(channel);
350-
declareBindConsume(c);
351-
fill(1);
352-
drain(c, 1);
353-
channel.flow(false);
354-
fill(1);
355-
drain(c, 0);
356-
channel.flow(true);
357-
drain(c, 1);
358-
}
359-
360-
public void testLimitAndFlow() throws IOException
361-
{
362-
channel.basicQos(1);
363-
QueueingConsumer c = new QueueingConsumer(channel);
364-
declareBindConsume(c);
365-
channel.flow(false);
366-
fill(3);
367-
drain(c, 0);
368-
channel.flow(true);
369-
ack(drain(c, 1), false);
370-
drain(c, 1);
371-
channel.basicQos(0);
372-
drain(c, 1);
373-
}
374-
375-
public void testNoConsumers() throws Exception {
376-
String q = declareBind(channel);
377-
fill(1);
378-
channel.flow(false);
379-
QueueingConsumer c = new QueueingConsumer(channel);
380-
channel.basicConsume(q, c);
381-
drain(c, 0);
382-
}
383-
384347
public void testRecoverReducesLimit() throws Exception {
385348
channel.basicQos(2);
386349
QueueingConsumer c = new QueueingConsumer(channel);

test/src/com/rabbitmq/client/test/performance/QosScaling.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,7 @@ public long run() throws IOException {
129129
channel.basicQos(1);
130130
QueueingConsumer consumer = new QueueingConsumer(channel);
131131
try {
132-
channel.flow(false);
133132
publish(consume(consumer));
134-
channel.flow(true);
135133
return drain(consumer);
136134
} finally {
137135
connection.abort();
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import com.rabbitmq.client.ShutdownListener;
7+
import com.rabbitmq.client.ShutdownSignalException;
8+
import com.rabbitmq.client.impl.AMQConnection;
9+
import com.rabbitmq.client.impl.ChannelN;
10+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
11+
import com.rabbitmq.client.impl.SocketFrameHandler;
12+
import com.rabbitmq.client.impl.ConsumerWorkService;
13+
import com.rabbitmq.client.test.BrokerTestCase;
14+
import com.rabbitmq.tools.Host;
15+
16+
import javax.net.SocketFactory;
17+
import java.io.IOException;
18+
import java.util.concurrent.Executors;
19+
20+
public class ChannelLimitNegotiation extends BrokerTestCase {
21+
class SpecialConnection extends AMQConnection {
22+
private final int channelMax;
23+
24+
public SpecialConnection(int channelMax) throws Exception {
25+
this(new ConnectionFactory(), channelMax);
26+
}
27+
28+
private SpecialConnection(ConnectionFactory factory, int channelMax) throws Exception {
29+
super(factory.getUsername(),
30+
factory.getPassword(),
31+
new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT)),
32+
Executors.newFixedThreadPool(1),
33+
factory.getVirtualHost(),
34+
factory.getClientProperties(),
35+
factory.getRequestedFrameMax(),
36+
channelMax,
37+
factory.getRequestedHeartbeat(),
38+
factory.getSaslConfig(),
39+
new DefaultExceptionHandler());
40+
41+
this.channelMax = channelMax;
42+
}
43+
44+
/**
45+
* Private API, allows for easier simulation of bogus clients.
46+
*/
47+
@Override
48+
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
49+
return this.channelMax;
50+
}
51+
}
52+
53+
public void testChannelMaxLowerThanServerMinimum() throws Exception {
54+
int n = 64;
55+
ConnectionFactory cf = new ConnectionFactory();
56+
cf.setRequestedChannelMax(n);
57+
58+
Connection conn = cf.newConnection();
59+
assertEquals(n, conn.getChannelMax());
60+
}
61+
62+
public void testChannelMaxGreaterThanServerValue() throws Exception {
63+
try {
64+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 2048).'");
65+
66+
SpecialConnection connection = new SpecialConnection(4096);
67+
try {
68+
connection.start();
69+
fail("expected failure during connection negotiation");
70+
} catch (IOException e) {
71+
// expected
72+
}
73+
} finally {
74+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 0).'");
75+
}
76+
}
77+
78+
public void testOpeningTooManyChannels() throws Exception {
79+
int n = 48;
80+
81+
try {
82+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, " + n + ").'");
83+
ConnectionFactory cf = new ConnectionFactory();
84+
Connection conn = cf.newConnection();
85+
assertEquals(n, conn.getChannelMax());
86+
87+
for (int i = 1; i <= n; i++) {
88+
assertNotNull(conn.createChannel(i));
89+
}
90+
// ChannelManager guards against channel.open being sent
91+
assertNull(conn.createChannel(n + 1));
92+
93+
// Construct a channel directly
94+
final ChannelN ch = new ChannelN((AMQConnection) conn, n + 1,
95+
new ConsumerWorkService(Executors.newSingleThreadExecutor()));
96+
conn.addShutdownListener(new ShutdownListener() {
97+
public void shutdownCompleted(ShutdownSignalException cause) {
98+
// make sure channel.open continuation is released
99+
ch.processShutdownSignal(cause, true, true);
100+
}
101+
});
102+
ch.open();
103+
fail("expected channel.open to cause a connection exception");
104+
} catch (IOException e) {
105+
checkShutdownSignal(530, e);
106+
} finally {
107+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 0).'");
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)