Skip to content

Commit 6c16884

Browse files
author
Simon MacMullen
committed
Merge bug22525
2 parents 8753d32 + fc2f2a7 commit 6c16884

File tree

4 files changed

+121
-3
lines changed

4 files changed

+121
-3
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,8 @@ public void start()
382382

383383
try {
384384
int channelMax =
385-
negotiatedMaxValue(this.requestedChannelMax,
386-
connTune.getChannelMax());
385+
negotiateChannelMax(this.requestedChannelMax,
386+
connTune.getChannelMax());
387387
_channelManager = new ChannelManager(this._workService, channelMax);
388388

389389
int frameMax =
@@ -421,6 +421,13 @@ public void start()
421421
return;
422422
}
423423

424+
/**
425+
* Private API, allows for easier simulation of bogus clients.
426+
*/
427+
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
428+
return negotiatedMaxValue(requestedChannelMax, serverMax);
429+
}
430+
424431
/**
425432
* Private API - check required preconditions and protocol invariants
426433
*/

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import com.rabbitmq.client.Channel;
2424

25-
final class ConsumerWorkService {
25+
final public class ConsumerWorkService {
2626
private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
2727
private static final int DEFAULT_NUM_THREADS = 5;
2828
private final ExecutorService executor;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import com.rabbitmq.client.ShutdownListener;
7+
import com.rabbitmq.client.ShutdownSignalException;
8+
import com.rabbitmq.client.impl.AMQConnection;
9+
import com.rabbitmq.client.impl.ChannelN;
10+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
11+
import com.rabbitmq.client.impl.SocketFrameHandler;
12+
import com.rabbitmq.client.impl.ConsumerWorkService;
13+
import com.rabbitmq.client.test.BrokerTestCase;
14+
import com.rabbitmq.tools.Host;
15+
16+
import javax.net.SocketFactory;
17+
import java.io.IOException;
18+
import java.util.concurrent.Executors;
19+
20+
public class ChannelLimitNegotiation extends BrokerTestCase {
21+
class SpecialConnection extends AMQConnection {
22+
private final int channelMax;
23+
24+
public SpecialConnection(int channelMax) throws Exception {
25+
this(new ConnectionFactory(), channelMax);
26+
}
27+
28+
private SpecialConnection(ConnectionFactory factory, int channelMax) throws Exception {
29+
super(factory.getUsername(),
30+
factory.getPassword(),
31+
new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT)),
32+
Executors.newFixedThreadPool(1),
33+
factory.getVirtualHost(),
34+
factory.getClientProperties(),
35+
factory.getRequestedFrameMax(),
36+
channelMax,
37+
factory.getRequestedHeartbeat(),
38+
factory.getSaslConfig(),
39+
new DefaultExceptionHandler());
40+
41+
this.channelMax = channelMax;
42+
}
43+
44+
/**
45+
* Private API, allows for easier simulation of bogus clients.
46+
*/
47+
@Override
48+
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
49+
return this.channelMax;
50+
}
51+
}
52+
53+
public void testChannelMaxLowerThanServerMinimum() throws Exception {
54+
int n = 64;
55+
ConnectionFactory cf = new ConnectionFactory();
56+
cf.setRequestedChannelMax(n);
57+
58+
Connection conn = cf.newConnection();
59+
assertEquals(n, conn.getChannelMax());
60+
}
61+
62+
public void testChannelMaxGreaterThanServerValue() throws Exception {
63+
try {
64+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 2048).'");
65+
66+
SpecialConnection connection = new SpecialConnection(4096);
67+
try {
68+
connection.start();
69+
fail("expected failure during connection negotiation");
70+
} catch (IOException e) {
71+
// expected
72+
}
73+
} finally {
74+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 0).'");
75+
}
76+
}
77+
78+
public void testOpeningTooManyChannels() throws Exception {
79+
int n = 48;
80+
81+
try {
82+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, " + n + ").'");
83+
ConnectionFactory cf = new ConnectionFactory();
84+
Connection conn = cf.newConnection();
85+
assertEquals(n, conn.getChannelMax());
86+
87+
for (int i = 1; i <= n; i++) {
88+
assertNotNull(conn.createChannel(i));
89+
}
90+
// ChannelManager guards against channel.open being sent
91+
assertNull(conn.createChannel(n + 1));
92+
93+
// Construct a channel directly
94+
final ChannelN ch = new ChannelN((AMQConnection) conn, n + 1,
95+
new ConsumerWorkService(Executors.newSingleThreadExecutor()));
96+
conn.addShutdownListener(new ShutdownListener() {
97+
public void shutdownCompleted(ShutdownSignalException cause) {
98+
// make sure channel.open continuation is released
99+
ch.processShutdownSignal(cause, true, true);
100+
}
101+
});
102+
ch.open();
103+
fail("expected channel.open to cause a connection exception");
104+
} catch (IOException e) {
105+
checkShutdownSignal(530, e);
106+
} finally {
107+
Host.rabbitmqctl("eval 'application:set_env(rabbit, channel_max, 0).'");
108+
}
109+
}
110+
}

test/src/com/rabbitmq/client/test/server/ServerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,6 @@ public static void add(TestSuite suite) {
4141
suite.addTestSuite(PersistenceGuarantees.class);
4242
suite.addTestSuite(Shutdown.class);
4343
suite.addTestSuite(BlockedConnection.class);
44+
suite.addTestSuite(ChannelLimitNegotiation.class);
4445
}
4546
}

0 commit comments

Comments
 (0)