Skip to content

Commit a9dbea3

Browse files
author
Emile Joubert
committed
Merged bug21836 into default
2 parents b28aae5 + bc990ab commit a9dbea3

File tree

2 files changed

+190
-0
lines changed

2 files changed

+190
-0
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.test.BrokerTestCase;
35+
36+
import java.io.IOException;
37+
import java.net.Socket;
38+
import java.net.InetSocketAddress;
39+
40+
import com.rabbitmq.client.Address;
41+
import com.rabbitmq.client.ConnectionFactory;
42+
import com.rabbitmq.client.MessageProperties;
43+
import com.rabbitmq.client.QueueingConsumer;
44+
import com.rabbitmq.client.QueueingConsumer.Delivery;
45+
import com.rabbitmq.client.impl.AMQConnection;
46+
import com.rabbitmq.client.impl.Frame;
47+
import com.rabbitmq.client.impl.FrameHandler;
48+
import com.rabbitmq.client.impl.SocketFrameHandler;
49+
50+
/* Publish a message of size FRAME_MAX. The broker should split this
51+
* into two frames before sending back. */
52+
public class FrameMax extends BrokerTestCase {
53+
/* This value for FrameMax is larger than the minimum and less
54+
* than what Rabbit suggests. */
55+
final static int FRAME_MAX = 131008;
56+
final static int REAL_FRAME_MAX = FRAME_MAX - 8;
57+
final static int TIMEOUT = 3000; /* Time to wait for messages. */
58+
final static String EXCHANGE_NAME = "xchg1";
59+
final static String ROUTING_KEY = "something";
60+
61+
QueueingConsumer consumer;
62+
63+
public FrameMax() {
64+
connectionFactory = new MyConnectionFactory();
65+
}
66+
67+
@Override
68+
protected void setUp()
69+
throws IOException
70+
{
71+
super.setUp();
72+
connectionFactory.setRequestedFrameMax(FRAME_MAX);
73+
}
74+
75+
@Override
76+
protected void createResources()
77+
throws IOException
78+
{
79+
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
80+
consumer = new QueueingConsumer(channel);
81+
String queueName = channel.queueDeclare().getQueue();
82+
channel.basicConsume(queueName, consumer);
83+
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
84+
}
85+
86+
@Override
87+
protected void releaseResources()
88+
throws IOException
89+
{
90+
consumer = null;
91+
channel.exchangeDelete(EXCHANGE_NAME);
92+
}
93+
94+
/* Frame content should be less or equal to frame-max - 8. */
95+
public void testFrameSizes()
96+
throws IOException, InterruptedException
97+
{
98+
/* This should result in at least 3 frames. */
99+
int howMuch = 2*FRAME_MAX;
100+
produce(howMuch);
101+
/* Receive everything that was sent out. */
102+
while (howMuch > 0) {
103+
try {
104+
Delivery delivery = consumer.nextDelivery(TIMEOUT);
105+
howMuch -= delivery.getBody().length;
106+
} catch (RuntimeException e) {
107+
fail(e.toString());
108+
}
109+
}
110+
}
111+
112+
/* Send out howMuch worth of gibberish */
113+
protected void produce(int howMuch)
114+
throws IOException
115+
{
116+
while (howMuch > 0) {
117+
int size = (howMuch <= (REAL_FRAME_MAX)) ? howMuch : (REAL_FRAME_MAX);
118+
publish(new byte[size]);
119+
howMuch -= (REAL_FRAME_MAX);
120+
}
121+
}
122+
123+
/* Publish a non-persistant, non-immediate message. */
124+
private void publish(byte[] msg)
125+
throws IOException
126+
{
127+
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
128+
false, false,
129+
MessageProperties.MINIMAL_BASIC,
130+
msg);
131+
}
132+
133+
/* ConnectionFactory that uses MyFrameHandler rather than
134+
* SocketFrameHandler. */
135+
private static class MyConnectionFactory extends ConnectionFactory {
136+
protected FrameHandler createFrameHandler(Address addr)
137+
throws IOException
138+
{
139+
String hostName = addr.getHost();
140+
int portNumber = portOrDefault(addr.getPort());
141+
Socket socket = getSocketFactory().createSocket();
142+
configureSocket(socket);
143+
socket.connect(new InetSocketAddress(hostName, portNumber));
144+
return new MyFrameHandler(socket);
145+
}
146+
147+
/* Copy-pasted from ConnectionFactory. Should be protected,
148+
* rather than private. */
149+
private int portOrDefault(int port){
150+
if (port != USE_DEFAULT_PORT) return port;
151+
else if (isSSL()) return DEFAULT_AMQP_OVER_SSL_PORT;
152+
else return DEFAULT_AMQP_PORT;
153+
}
154+
}
155+
156+
/* FrameHandler with added frame-max error checking. */
157+
private static class MyFrameHandler extends SocketFrameHandler {
158+
public MyFrameHandler(Socket socket)
159+
throws IOException
160+
{
161+
super(socket);
162+
}
163+
164+
public Frame readFrame() throws IOException {
165+
Frame f = super.readFrame();
166+
int size = f.getPayload().length;
167+
if (size > REAL_FRAME_MAX)
168+
throw new FrameTooLargeException(size, REAL_FRAME_MAX);
169+
//System.out.printf("Received a frame of size %d.\n", f.getPayload().length);
170+
return f;
171+
}
172+
}
173+
174+
private static class FrameTooLargeException extends RuntimeException {
175+
private int _frameSize;
176+
private int _maxSize;
177+
178+
public FrameTooLargeException(int frameSize, int maxSize) {
179+
_frameSize = frameSize;
180+
_maxSize = maxSize;
181+
}
182+
183+
@Override
184+
public String toString() {
185+
return "Received frame of size " + _frameSize
186+
+ ", which exceeds " + _maxSize + ".";
187+
}
188+
}
189+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static TestSuite suite() {
5454
suite.addTestSuite(QosTests.class);
5555
suite.addTestSuite(AlternateExchange.class);
5656
suite.addTestSuite(ExchangeDeclare.class);
57+
suite.addTestSuite(FrameMax.class);
5758
suite.addTestSuite(QueueLifecycle.class);
5859
suite.addTestSuite(QueueExclusivity.class);
5960
suite.addTestSuite(InvalidAcks.class);

0 commit comments

Comments
 (0)