2525import java .util .Map ;
2626
2727/**
28- * Test queue maxdepth limit.
28+ * Test queue max length limit.
2929 */
3030public class QueueSizeLimit extends BrokerTestCase {
3131
32- private final int MAXDEPTH = 5 ;
32+ private final int MAXLENGTH = 5 ;
3333 private final String q = "queue-maxdepth" ;
3434
3535 @ Override
@@ -52,33 +52,25 @@ private void tearDownDlx() throws IOException {
5252 public void testQueueSize () throws IOException , InterruptedException {
5353 declareQueue ();
5454 fillUp ();
55- syncPublish (null , "overflow" );
56- assertEquals (MAXDEPTH , declareQueue ());
55+ syncPublish (null , "msg" + MAXLENGTH + 1 );
56+ assertEquals (MAXLENGTH , declareQueue ());
57+ assertHead ("msg2" , q );
5758 }
5859
59- public void testQueueAllUnacked () throws IOException , InterruptedException {
60+ public void testQueueUnacked () throws IOException , InterruptedException {
6061 declareQueue ();
6162 fillUnacked ();
62- syncPublish (null , "overflow" );
63- assertEquals (null , channel .basicGet (q , true ));
64- }
65-
66- public void testQueueSomeUnacked () throws IOException , InterruptedException {
67- declareQueue ();
68- fillUnacked (false , MAXDEPTH - 1 );
69- syncPublish (null , "msg" + MAXDEPTH );
63+ syncPublish (null , "msg" + MAXLENGTH + 1 );
7064 assertEquals (1 , declareQueue ());
71-
72- syncPublish (null , "overflow" );
73- assertEquals ("overflow" , new String (channel .basicGet (q , true ).getBody ()));
74- assertEquals (null , channel .basicGet (q , true ));
65+ assertHead ("msg" + MAXLENGTH + 1 , q );
7566 }
7667
77- public void testConfirmPersistent () throws IOException , InterruptedException {
68+ public void testPersistent () throws IOException , InterruptedException {
7869 declareQueue (true );
79- fillUnacked (true , MAXDEPTH );
80- syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "overflow" );
81- assertEquals (null , channel .basicGet (q , false ));
70+ fillUnacked (true );
71+ syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "msg" + MAXLENGTH + 1 );
72+ assertEquals (1 , declareQueue (true ));
73+ assertHead ("msg" + MAXLENGTH + 1 , q );
8274 }
8375
8476 public void testDlxHeadTransient () throws IOException , InterruptedException {
@@ -104,9 +96,9 @@ public void dlxHead(boolean persistent) throws IOException, InterruptedException
10496 if (persistent )
10597 props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
10698 fillUp (persistent );
107- syncPublish (props , "overflow" );
108- assertEquals (MAXDEPTH , declareQueue (persistent ));
109- assertEquals ("msg1" , new String ( channel . basicGet ( "DLQ" , true ). getBody ()) );
99+ syncPublish (props , "msg" + MAXLENGTH + 1 );
100+ assertEquals (MAXLENGTH , declareQueue (persistent ));
101+ assertHead ("msg1" , "DLQ" );
110102 assertNull (channel .basicGet ("DLQ" , true ));
111103 tearDownDlx ();
112104 }
@@ -117,11 +109,11 @@ public void dlxTail(boolean persistent) throws IOException, InterruptedException
117109 AMQP .BasicProperties props = null ;
118110 if (persistent )
119111 props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
120- fillUnacked (persistent , MAXDEPTH );
121- syncPublish (props , "overflow" );
122- assertEquals (null , channel .basicGet (q , false ));
123- assertEquals ( "overflow" , new String ( channel . basicGet ( "DLQ" , true ). getBody ()) );
124- assertNull (channel .basicGet ("DLQ" , true ));
112+ fillUnacked (persistent );
113+ syncPublish (props , "msg" + MAXLENGTH + 1 );
114+ assertNull (null , channel .basicGet ("DLQ" , true ));
115+ assertHead ( "msg" + MAXLENGTH + 1 , q );
116+ assertNull (channel .basicGet (q , true ));
125117 tearDownDlx ();
126118 }
127119
@@ -134,25 +126,30 @@ private int declareQueue(boolean durable) throws IOException {
134126 }
135127
136128 private void fillUp () throws IOException , InterruptedException {
137- fillUp ( false );
129+ fill ( false , false );
138130 }
139131
140132 private void fillUp (boolean persistent ) throws IOException , InterruptedException {
141- for (int i =1 ; i <= MAXDEPTH ; i ++){
142- syncPublish (null , "msg" + i );
143- assertEquals (i , declareQueue (persistent ));
144- }
133+ fill (persistent , false );
145134 }
146135
147136 private void fillUnacked () throws IOException , InterruptedException {
148- fillUnacked (false , MAXDEPTH );
137+ fill (false , true );
138+ }
139+
140+ private void fillUnacked (boolean persistent ) throws IOException , InterruptedException {
141+ fill (persistent , true );
149142 }
150143
151- private void fillUnacked (boolean persistent , int depth ) throws IOException , InterruptedException {
152- for (int i =1 ; i <= depth ; i ++){
144+ private void fill (boolean persistent , boolean unAcked ) throws IOException , InterruptedException {
145+ for (int i =1 ; i <= MAXLENGTH ; i ++){
153146 syncPublish (null , "msg" + i );
154- channel .basicGet (q , false );
155- assertEquals (0 , declareQueue (persistent ));
147+ if (unAcked ) {
148+ channel .basicGet (q , false );
149+ assertEquals (0 , declareQueue (persistent ));
150+ } else {
151+ assertEquals (i , declareQueue (persistent ));
152+ }
156153 }
157154 }
158155
@@ -163,9 +160,13 @@ private void syncPublish(AMQP.BasicProperties props, String payload) throws IOEx
163160
164161 private int declareQueue (boolean durable , boolean dlx ) throws IOException {
165162 Map <String , Object > args = new HashMap <String , Object >();
166- args .put ("x-maxdepth " , MAXDEPTH );
163+ args .put ("x-max-length " , MAXLENGTH );
167164 if (dlx ) args .put ("x-dead-letter-exchange" , "DLX" );
168165 AMQP .Queue .DeclareOk ok = channel .queueDeclare (q , durable , true , true , args );
169166 return ok .getMessageCount ();
170167 }
168+
169+ private void assertHead (String expected , String queueName ) throws IOException {
170+ assertEquals (expected , new String (channel .basicGet (queueName , true ).getBody ()));
171+ }
171172}
0 commit comments