1818package com .rabbitmq .client .test .functional ;
1919
2020import com .rabbitmq .client .AMQP ;
21+ import com .rabbitmq .client .GetResponse ;
2122import com .rabbitmq .client .MessageProperties ;
2223import com .rabbitmq .client .test .BrokerTestCase ;
2324import java .io .IOException ;
3031public class QueueSizeLimit extends BrokerTestCase {
3132
3233 private final int MAXLENGTH = 5 ;
33- private final String q = "queue-maxdepth" ;
34+ private final int MAXLENGTH1 = MAXLENGTH + 1 ;
35+ private final String q = "queue-maxlength" ;
3436
3537 @ Override
3638 protected void setUp () throws IOException {
3739 super .setUp ();
3840 channel .confirmSelect ();
3941 }
4042
41- private void setupDlx () throws IOException {
42- channel .exchangeDeclare ("DLX" , "fanout" );
43+ AMQP .BasicProperties setupDlx (boolean persistent ) throws IOException {
4344 channel .queueDeclare ("DLQ" , false , true , false , null );
44- channel .queueBind ("DLQ" , "DLX" , "test" );
45- }
46-
47- private void tearDownDlx () throws IOException {
48- channel .exchangeDelete ("DLX" );
49- channel .queueDelete ("DLQ" );
45+ channel .queueBind ("DLQ" , "amq.fanout" , "" );
46+ declareQueue (persistent , true );
47+ AMQP .BasicProperties props = null ;
48+ if (persistent ) {
49+ props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
50+ }
51+ return props ;
5052 }
5153
5254 public void testQueueSize () throws IOException , InterruptedException {
53- declareQueue ();
54- fillUp ();
55- syncPublish (null , "msg" + MAXLENGTH + 1 );
56- assertEquals (MAXLENGTH , declareQueue ());
57- assertHead ("msg2" , q );
55+ declareQueue (false , false );
56+ fill (false , false );
57+ syncPublish (null , "msg" + MAXLENGTH1 );
58+ assertHead (MAXLENGTH , "msg2" , q );
5859 }
5960
6061 public void testQueueUnacked () throws IOException , InterruptedException {
61- declareQueue ();
62- fillUnacked ();
63- syncPublish (null , "msg" + MAXLENGTH + 1 );
64- assertEquals (1 , declareQueue ());
65- assertHead ("msg" + MAXLENGTH + 1 , q );
62+ declareQueue (false , false );
63+ fill (false , true );
64+ syncPublish (null , "msg" + MAXLENGTH1 );
65+ assertHead (1 , "msg" + MAXLENGTH1 , q );
6666 }
6767
6868 public void testPersistent () throws IOException , InterruptedException {
69- declareQueue (true );
70- fillUnacked (true );
71- syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "msg" + MAXLENGTH + 1 );
72- assertEquals (1 , declareQueue (true ));
73- assertHead ("msg" + MAXLENGTH + 1 , q );
69+ declareQueue (true , false );
70+ fill (true , true );
71+ syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "msg" + MAXLENGTH1 );
72+ assertHead (1 , "msg" + MAXLENGTH1 , q );
7473 }
7574
7675 public void testDlxHeadTransient () throws IOException , InterruptedException {
@@ -90,83 +89,54 @@ public void testDlxTailDurable() throws IOException, InterruptedException {
9089 }
9190
9291 public void dlxHead (boolean persistent ) throws IOException , InterruptedException {
93- declareQueue (persistent , true );
94- setupDlx ();
95- AMQP .BasicProperties props = null ;
96- if (persistent )
97- props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
98- fillUp (persistent );
99- syncPublish (props , "msg" + MAXLENGTH + 1 );
100- assertEquals (MAXLENGTH , declareQueue (persistent ));
101- assertHead ("msg1" , "DLQ" );
102- assertNull (channel .basicGet ("DLQ" , true ));
103- tearDownDlx ();
104- }
105-
106- public void dlxTail (boolean persistent ) throws IOException , InterruptedException {
107- declareQueue (persistent , true );
108- setupDlx ();
109- AMQP .BasicProperties props = null ;
110- if (persistent )
111- props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
112- fillUnacked (persistent );
113- syncPublish (props , "msg" + MAXLENGTH + 1 );
114- assertNull (null , channel .basicGet ("DLQ" , true ));
115- assertHead ("msg" + MAXLENGTH + 1 , q );
116- assertNull (channel .basicGet (q , true ));
117- tearDownDlx ();
118- }
119-
120- private int declareQueue () throws IOException {
121- return declareQueue (false , false );
122- }
123-
124- private int declareQueue (boolean durable ) throws IOException {
125- return declareQueue (durable , false );
126- }
127-
128- private void fillUp () throws IOException , InterruptedException {
129- fill (false , false );
130- }
131-
132- private void fillUp (boolean persistent ) throws IOException , InterruptedException {
92+ AMQP .BasicProperties props = setupDlx (persistent );
13393 fill (persistent , false );
94+ syncPublish (props , "msg" + MAXLENGTH1 );
95+ assertEquals (MAXLENGTH , declareQueue (persistent , true ));
96+ assertHead (1 , "msg1" , "DLQ" );
13497 }
13598
136- private void fillUnacked () throws IOException , InterruptedException {
137- fill (false , true );
138- }
139-
140- private void fillUnacked (boolean persistent ) throws IOException , InterruptedException {
99+ public void dlxTail (boolean persistent ) throws IOException , InterruptedException {
100+ AMQP .BasicProperties props = setupDlx (persistent );
141101 fill (persistent , true );
102+ syncPublish (props , "msg" + MAXLENGTH1 );
103+ assertNull (null , channel .basicGet ("DLQ" , true ));
104+ assertHead (1 , "msg" + MAXLENGTH1 , q );
142105 }
143106
144107 private void fill (boolean persistent , boolean unAcked ) throws IOException , InterruptedException {
145108 for (int i =1 ; i <= MAXLENGTH ; i ++){
146109 syncPublish (null , "msg" + i );
147110 if (unAcked ) {
148- channel .basicGet (q , false );
149- assertEquals (0 , declareQueue (persistent ));
150- } else {
151- assertEquals (i , declareQueue (persistent ));
111+ assertNotNull (channel .basicGet (q , false ));
152112 }
153113 }
114+ if (unAcked ) {
115+ assertEquals (0 , declareQueue (persistent , false ));
116+ } else {
117+ assertEquals (MAXLENGTH , declareQueue (persistent , false ));
118+ }
154119 }
155120
156121 private void syncPublish (AMQP .BasicProperties props , String payload ) throws IOException , InterruptedException {
157122 channel .basicPublish ("" , q , props , payload .getBytes ());
158123 channel .waitForConfirmsOrDie ();
159124 }
160125
161- private int declareQueue (boolean durable , boolean dlx ) throws IOException {
126+ private int declareQueue (boolean persistent , boolean dlx ) throws IOException {
162127 Map <String , Object > args = new HashMap <String , Object >();
163128 args .put ("x-max-length" , MAXLENGTH );
164- if (dlx ) args .put ("x-dead-letter-exchange" , "DLX" );
165- AMQP .Queue .DeclareOk ok = channel .queueDeclare (q , durable , true , true , args );
129+ if (dlx ) {
130+ args .put ("x-dead-letter-exchange" , "amq.fanout" );
131+ }
132+ AMQP .Queue .DeclareOk ok = channel .queueDeclare (q , persistent , true , true , args );
166133 return ok .getMessageCount ();
167134 }
168135
169- private void assertHead (String expected , String queueName ) throws IOException {
170- assertEquals (expected , new String (channel .basicGet (queueName , true ).getBody ()));
136+ private void assertHead (int expectedLength , String expectedPayload , String queueName ) throws IOException {
137+ GetResponse head = channel .basicGet (queueName , true );
138+ assertNotNull (head );
139+ assertEquals (expectedPayload , new String (head .getBody ()));
140+ assertEquals (expectedLength , head .getMessageCount () + 1 );
171141 }
172142}
0 commit comments