2121
2222import java .io .IOException ;
2323import java .net .Socket ;
24+ import java .util .concurrent .ExecutorService ;
2425
26+ import com .rabbitmq .client .Address ;
27+ import com .rabbitmq .client .AMQP ;
28+ import com .rabbitmq .client .Connection ;
2529import com .rabbitmq .client .ConnectionFactory ;
2630import com .rabbitmq .client .GetResponse ;
31+ import com .rabbitmq .client .impl .AMQConnection ;
2732import com .rabbitmq .client .impl .Frame ;
2833import com .rabbitmq .client .impl .FrameHandler ;
34+ import com .rabbitmq .client .impl .LongStringHelper ;
2935import com .rabbitmq .client .impl .SocketFrameHandler ;
3036
31- /* Publish a message of size FRAME_MAX. The broker should split this
32- * into two frames before sending back. */
3337public class FrameMax extends BrokerTestCase {
3438 /* This value for FrameMax is larger than the minimum and less
3539 * than what Rabbit suggests. */
3640 final static int FRAME_MAX = 70000 ;
3741 final static int REAL_FRAME_MAX = FRAME_MAX - 8 ;
3842
39- private String queueName ;
40-
4143 public FrameMax () {
4244 connectionFactory = new MyConnectionFactory ();
4345 connectionFactory .setRequestedFrameMax (FRAME_MAX );
4446 }
4547
46- @ Override
47- protected void createResources ()
48- throws IOException
49- {
50- queueName = channel .queueDeclare ().getQueue ();
51- }
52-
53- /* Frame content should be less or equal to frame-max - 8. */
48+ /* Publish a message of size FRAME_MAX. The broker should split
49+ * this into two frames before sending back. Frame content should
50+ * be less or equal to frame-max - 8. */
5451 public void testFrameSizes ()
5552 throws IOException , InterruptedException
5653 {
54+ String queueName = channel .queueDeclare ().getQueue ();
5755 /* This should result in at least 3 frames. */
5856 int howMuch = 2 *FRAME_MAX ;
5957 basicPublishVolatile (new byte [howMuch ], queueName );
@@ -69,6 +67,38 @@ public void testFrameSizes()
6967 }
7068 }
7169
70+ /* server should reject frames larger than AMQP.FRAME_MIN_SIZE
71+ * during connection negotiation */
72+ public void testRejectLargeFramesDuringConnectionNegotiation ()
73+ throws IOException
74+ {
75+ ConnectionFactory cf = new ConnectionFactory ();
76+ cf .getClientProperties ().put ("too_long" , LongStringHelper .asLongString (new byte [AMQP .FRAME_MIN_SIZE ]));
77+ try {
78+ cf .newConnection ();
79+ fail ("Expected exception during connection negotiation" );
80+ } catch (IOException e ) {
81+ }
82+ }
83+
84+ /* server should reject frames larger than the negotiated frame
85+ * size */
86+ public void testRejectExceedingFrameMax ()
87+ throws IOException
88+ {
89+ closeChannel ();
90+ closeConnection ();
91+ ConnectionFactory cf = new GenerousConnectionFactory ();
92+ connection = cf .newConnection ();
93+ openChannel ();
94+ try {
95+ basicPublishVolatile (new byte [connection .getFrameMax ()], "void" );
96+ channel .basicQos (0 );
97+ fail ("Expected exception when publishing" );
98+ } catch (IOException e ) {
99+ }
100+ }
101+
72102 /* ConnectionFactory that uses MyFrameHandler rather than
73103 * SocketFrameHandler. */
74104 private static class MyConnectionFactory extends ConnectionFactory {
@@ -97,4 +127,53 @@ public Frame readFrame() throws IOException {
97127 return f ;
98128 }
99129 }
130+
131+ /*
132+ AMQConnection with a frame_max that is one higher than what it
133+ tells the server.
134+ */
135+ private static class GenerousAMQConnection extends AMQConnection {
136+
137+ public GenerousAMQConnection (ConnectionFactory factory ,
138+ FrameHandler handler ,
139+ ExecutorService executor ) {
140+ super (factory .getUsername (),
141+ factory .getPassword (),
142+ handler ,
143+ executor ,
144+ factory .getVirtualHost (),
145+ factory .getClientProperties (),
146+ factory .getRequestedFrameMax (),
147+ factory .getRequestedChannelMax (),
148+ factory .getRequestedHeartbeat (),
149+ factory .getSaslConfig ());
150+ }
151+
152+ @ Override public int getFrameMax () {
153+ return super .getFrameMax () + 1 ;
154+ }
155+
156+ }
157+
158+ private static class GenerousConnectionFactory extends ConnectionFactory {
159+
160+ @ Override public Connection newConnection (ExecutorService executor , Address [] addrs )
161+ throws IOException
162+ {
163+ IOException lastException = null ;
164+ for (Address addr : addrs ) {
165+ try {
166+ FrameHandler frameHandler = createFrameHandler (addr );
167+ AMQConnection conn = new GenerousAMQConnection (this , frameHandler , executor );
168+ conn .start ();
169+ return conn ;
170+ } catch (IOException e ) {
171+ lastException = e ;
172+ }
173+ }
174+ throw (lastException != null ) ? lastException
175+ : new IOException ("failed to connect" );
176+ }
177+ }
178+
100179}
0 commit comments