Skip to content

Commit 8f38b89

Browse files
author
Simon MacMullen
committed
Merge bug 24991
2 parents c16bdd5 + e887652 commit 8f38b89

File tree

2 files changed

+108
-26
lines changed

2 files changed

+108
-26
lines changed

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@
3232
*/
3333
public class AMQCommand implements Command {
3434

35-
/** EMPTY_CONTENT_BODY_FRAME_SIZE = 8 = 1 + 2 + 4 + 1
35+
/** EMPTY_FRAME_SIZE = 8 = 1 + 2 + 4 + 1
3636
* <ul><li>1 byte of frame type</li>
3737
* <li>2 bytes of channel number</li>
3838
* <li>4 bytes of frame payload length</li>
3939
* <li>1 byte of payload trailer FRAME_END byte</li></ul>
40-
* See {@link #checkEmptyContentBodyFrameSize}, an assertion
41-
* checked at startup.
40+
* See {@link #checkEmptyFrameSize}, an assertion checked at
41+
* startup.
4242
*/
43-
private static final int EMPTY_CONTENT_BODY_FRAME_SIZE = 8;
43+
public static final int EMPTY_FRAME_SIZE = 8;
4444

4545
/** The assembler for this command - synchronised on - contains all the state */
4646
private final CommandAssembler assembler;
@@ -108,7 +108,7 @@ public void transmit(AMQChannel channel) throws IOException {
108108

109109
int frameMax = connection.getFrameMax();
110110
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
111-
- EMPTY_CONTENT_BODY_FRAME_SIZE;
111+
- EMPTY_FRAME_SIZE;
112112

113113
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
114114
int remaining = body.length - offset;
@@ -157,26 +157,26 @@ private static StringBuilder contentBodyStringBuilder(byte[] body, boolean suppr
157157

158158
/** Called to check internal code assumptions. */
159159
public static void checkPreconditions() {
160-
checkEmptyContentBodyFrameSize();
160+
checkEmptyFrameSize();
161161
}
162162

163163
/**
164-
* Since we're using a pre-computed value for
165-
* EMPTY_CONTENT_BODY_FRAME_SIZE we check this is
166-
* actually correct when run against the framing code in Frame.
164+
* Since we're using a pre-computed value for EMPTY_FRAME_SIZE we
165+
* check this is actually correct when run against the framing
166+
* code in Frame.
167167
*/
168-
private static void checkEmptyContentBodyFrameSize() {
168+
private static void checkEmptyFrameSize() {
169169
Frame f = new Frame(AMQP.FRAME_BODY, 0, new byte[0]);
170170
ByteArrayOutputStream s = new ByteArrayOutputStream();
171171
try {
172172
f.writeTo(new DataOutputStream(s));
173173
} catch (IOException ioe) {
174-
throw new AssertionError("IOException while checking EMPTY_CONTENT_BODY_FRAME_SIZE");
174+
throw new AssertionError("IOException while checking EMPTY_FRAME_SIZE");
175175
}
176176
int actualLength = s.toByteArray().length;
177-
if (EMPTY_CONTENT_BODY_FRAME_SIZE != actualLength) {
178-
throw new AssertionError("Internal error: expected EMPTY_CONTENT_BODY_FRAME_SIZE("
179-
+ EMPTY_CONTENT_BODY_FRAME_SIZE
177+
if (EMPTY_FRAME_SIZE != actualLength) {
178+
throw new AssertionError("Internal error: expected EMPTY_FRAME_SIZE("
179+
+ EMPTY_FRAME_SIZE
180180
+ ") is not equal to computed value: " + actualLength);
181181
}
182182
}

test/src/com/rabbitmq/client/test/functional/FrameMax.java

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,39 +21,38 @@
2121

2222
import java.io.IOException;
2323
import java.net.Socket;
24+
import java.util.concurrent.ExecutorService;
2425

26+
import com.rabbitmq.client.Address;
27+
import com.rabbitmq.client.AMQP;
28+
import com.rabbitmq.client.Connection;
2529
import com.rabbitmq.client.ConnectionFactory;
2630
import com.rabbitmq.client.GetResponse;
31+
import com.rabbitmq.client.impl.AMQConnection;
32+
import com.rabbitmq.client.impl.AMQCommand;
2733
import com.rabbitmq.client.impl.Frame;
2834
import com.rabbitmq.client.impl.FrameHandler;
35+
import com.rabbitmq.client.impl.LongStringHelper;
2936
import com.rabbitmq.client.impl.SocketFrameHandler;
3037

31-
/* Publish a message of size FRAME_MAX. The broker should split this
32-
* into two frames before sending back. */
3338
public class FrameMax extends BrokerTestCase {
3439
/* This value for FrameMax is larger than the minimum and less
3540
* than what Rabbit suggests. */
3641
final static int FRAME_MAX = 70000;
3742
final static int REAL_FRAME_MAX = FRAME_MAX - 8;
3843

39-
private String queueName;
40-
4144
public FrameMax() {
4245
connectionFactory = new MyConnectionFactory();
4346
connectionFactory.setRequestedFrameMax(FRAME_MAX);
4447
}
4548

46-
@Override
47-
protected void createResources()
48-
throws IOException
49-
{
50-
queueName = channel.queueDeclare().getQueue();
51-
}
52-
53-
/* Frame content should be less or equal to frame-max - 8. */
49+
/* Publish a message of size FRAME_MAX. The broker should split
50+
* this into two frames before sending back. Frame content should
51+
* be less or equal to frame-max - 8. */
5452
public void testFrameSizes()
5553
throws IOException, InterruptedException
5654
{
55+
String queueName = channel.queueDeclare().getQueue();
5756
/* This should result in at least 3 frames. */
5857
int howMuch = 2*FRAME_MAX;
5958
basicPublishVolatile(new byte[howMuch], queueName);
@@ -69,6 +68,38 @@ public void testFrameSizes()
6968
}
7069
}
7170

71+
/* server should reject frames larger than AMQP.FRAME_MIN_SIZE
72+
* during connection negotiation */
73+
public void testRejectLargeFramesDuringConnectionNegotiation()
74+
throws IOException
75+
{
76+
ConnectionFactory cf = new ConnectionFactory();
77+
cf.getClientProperties().put("too_long", LongStringHelper.asLongString(new byte[AMQP.FRAME_MIN_SIZE]));
78+
try {
79+
cf.newConnection();
80+
fail("Expected exception during connection negotiation");
81+
} catch (IOException e) {
82+
}
83+
}
84+
85+
/* server should reject frames larger than the negotiated frame
86+
* size */
87+
public void testRejectExceedingFrameMax()
88+
throws IOException
89+
{
90+
closeChannel();
91+
closeConnection();
92+
ConnectionFactory cf = new GenerousConnectionFactory();
93+
connection = cf.newConnection();
94+
openChannel();
95+
try {
96+
basicPublishVolatile(new byte[connection.getFrameMax()], "void");
97+
channel.basicQos(0);
98+
fail("Expected exception when publishing");
99+
} catch (IOException e) {
100+
}
101+
}
102+
72103
/* ConnectionFactory that uses MyFrameHandler rather than
73104
* SocketFrameHandler. */
74105
private static class MyConnectionFactory extends ConnectionFactory {
@@ -97,4 +128,55 @@ public Frame readFrame() throws IOException {
97128
return f;
98129
}
99130
}
131+
132+
/*
133+
AMQConnection with a frame_max that is one higher than what it
134+
tells the server.
135+
*/
136+
private static class GenerousAMQConnection extends AMQConnection {
137+
138+
public GenerousAMQConnection(ConnectionFactory factory,
139+
FrameHandler handler,
140+
ExecutorService executor) {
141+
super(factory.getUsername(),
142+
factory.getPassword(),
143+
handler,
144+
executor,
145+
factory.getVirtualHost(),
146+
factory.getClientProperties(),
147+
factory.getRequestedFrameMax(),
148+
factory.getRequestedChannelMax(),
149+
factory.getRequestedHeartbeat(),
150+
factory.getSaslConfig());
151+
}
152+
153+
@Override public int getFrameMax() {
154+
// the RabbitMQ broker permits frames that are oversize by
155+
// up to EMPTY_FRAME_SIZE octets
156+
return super.getFrameMax() + AMQCommand.EMPTY_FRAME_SIZE + 1;
157+
}
158+
159+
}
160+
161+
private static class GenerousConnectionFactory extends ConnectionFactory {
162+
163+
@Override public Connection newConnection(ExecutorService executor, Address[] addrs)
164+
throws IOException
165+
{
166+
IOException lastException = null;
167+
for (Address addr : addrs) {
168+
try {
169+
FrameHandler frameHandler = createFrameHandler(addr);
170+
AMQConnection conn = new GenerousAMQConnection(this, frameHandler, executor);
171+
conn.start();
172+
return conn;
173+
} catch (IOException e) {
174+
lastException = e;
175+
}
176+
}
177+
throw (lastException != null) ? lastException
178+
: new IOException("failed to connect");
179+
}
180+
}
181+
100182
}

0 commit comments

Comments
 (0)