1818package com .rabbitmq .client .test .functional ;
1919
2020import com .rabbitmq .client .AMQP ;
21+ import com .rabbitmq .client .MessageProperties ;
2122import com .rabbitmq .client .test .BrokerTestCase ;
2223import java .io .IOException ;
2324import java .util .HashMap ;
@@ -37,55 +38,134 @@ protected void setUp() throws IOException {
3738 channel .confirmSelect ();
3839 }
3940
41+ private void setupDlx () throws IOException {
42+ channel .exchangeDeclare ("DLX" , "fanout" );
43+ channel .queueDeclare ("DLQ" , false , true , false , null );
44+ channel .queueBind ("DLQ" , "DLX" , "test" );
45+ }
46+
47+ private void tearDownDlx () throws IOException {
48+ channel .exchangeDelete ("DLX" );
49+ channel .queueDelete ("DLQ" );
50+ }
51+
4052 public void testQueueSize () throws IOException , InterruptedException {
4153 declareQueue ();
42- for (int i =1 ; i <= MAXDEPTH ; i ++){
43- channel .basicPublish ("" , q , null , ("msg" + i ).getBytes ());
44- channel .waitForConfirmsOrDie ();
45- assertEquals (i , declareQueue ());
46- }
47- for (int i =1 ; i <= MAXDEPTH ; i ++){
48- channel .basicPublish ("" , q , null , ("msg overflow" ).getBytes ());
49- channel .waitForConfirmsOrDie ();
50- assertEquals (MAXDEPTH , declareQueue ());
51- }
54+ fillUp ();
55+ syncPublish (null , "overflow" );
56+ assertEquals (MAXDEPTH , declareQueue ());
5257 }
5358
5459 public void testQueueAllUnacked () throws IOException , InterruptedException {
5560 declareQueue ();
56- for (int i =1 ; i <= MAXDEPTH ; i ++){
57- channel .basicPublish ("" , q , null , ("msg" + i ).getBytes ());
58- channel .waitForConfirmsOrDie ();
59- channel .basicGet (q , false );
60- assertEquals (0 , declareQueue ());
61- }
62- channel .basicPublish ("" , q , null , ("msg overflow" ).getBytes ());
63- channel .waitForConfirmsOrDie ();
64- assertEquals (null , channel .basicGet (q , false ));
61+ fillUnacked ();
62+ syncPublish (null , "overflow" );
63+ assertEquals (null , channel .basicGet (q , true ));
6564 }
6665
6766 public void testQueueSomeUnacked () throws IOException , InterruptedException {
6867 declareQueue ();
69- for (int i =1 ; i <= MAXDEPTH - 1 ; i ++){
70- channel .basicPublish ("" , q , null , ("msg" + i ).getBytes ());
71- channel .waitForConfirmsOrDie ();
72- channel .basicGet (q , false );
73- assertEquals (0 , declareQueue ());
74- }
75- channel .basicPublish ("" , q , null , ("msg" + MAXDEPTH ).getBytes ());
76- channel .waitForConfirmsOrDie ();
68+ fillUnacked (false , MAXDEPTH - 1 );
69+ syncPublish (null , "msg" + MAXDEPTH );
7770 assertEquals (1 , declareQueue ());
7871
79- channel .basicPublish ("" , q , null , ("msg overflow" ).getBytes ());
80- channel .waitForConfirmsOrDie ();
81- assertEquals ("msg overflow" , new String (channel .basicGet (q , false ).getBody ()));
72+ syncPublish (null , "overflow" );
73+ assertEquals ("overflow" , new String (channel .basicGet (q , true ).getBody ()));
74+ assertEquals (null , channel .basicGet (q , true ));
75+ }
76+
77+ public void testConfirmPersistent () throws IOException , InterruptedException {
78+ declareQueue (true );
79+ fillUnacked (true , MAXDEPTH );
80+ syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "overflow" );
8281 assertEquals (null , channel .basicGet (q , false ));
8382 }
8483
84+ public void testDlxHeadTransient () throws IOException , InterruptedException {
85+ dlxHead (false );
86+ }
87+
88+ public void testDlxTailTransient () throws IOException , InterruptedException {
89+ dlxTail (false );
90+ }
91+
92+ public void testDlxHeadDurable () throws IOException , InterruptedException {
93+ dlxHead (true );
94+ }
95+
96+ public void testDlxTailDurable () throws IOException , InterruptedException {
97+ dlxTail (true );
98+ }
99+
100+ public void dlxHead (boolean persistent ) throws IOException , InterruptedException {
101+ declareQueue (persistent , true );
102+ setupDlx ();
103+ AMQP .BasicProperties props = null ;
104+ if (persistent )
105+ props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
106+ fillUp (persistent );
107+ syncPublish (props , "overflow" );
108+ assertEquals (MAXDEPTH , declareQueue (persistent ));
109+ assertEquals ("msg1" , new String (channel .basicGet ("DLQ" , true ).getBody ()));
110+ assertNull (channel .basicGet ("DLQ" , true ));
111+ tearDownDlx ();
112+ }
113+
114+ public void dlxTail (boolean persistent ) throws IOException , InterruptedException {
115+ declareQueue (persistent , true );
116+ setupDlx ();
117+ AMQP .BasicProperties props = null ;
118+ if (persistent )
119+ props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
120+ fillUnacked (persistent , MAXDEPTH );
121+ syncPublish (props , "overflow" );
122+ assertEquals (null , channel .basicGet (q , false ));
123+ assertEquals ("overflow" , new String (channel .basicGet ("DLQ" , true ).getBody ()));
124+ assertNull (channel .basicGet ("DLQ" , true ));
125+ tearDownDlx ();
126+ }
127+
85128 private int declareQueue () throws IOException {
129+ return declareQueue (false , false );
130+ }
131+
132+ private int declareQueue (boolean durable ) throws IOException {
133+ return declareQueue (durable , false );
134+ }
135+
136+ private void fillUp () throws IOException , InterruptedException {
137+ fillUp (false );
138+ }
139+
140+ private void fillUp (boolean persistent ) throws IOException , InterruptedException {
141+ for (int i =1 ; i <= MAXDEPTH ; i ++){
142+ syncPublish (null , "msg" + i );
143+ assertEquals (i , declareQueue (persistent ));
144+ }
145+ }
146+
147+ private void fillUnacked () throws IOException , InterruptedException {
148+ fillUnacked (false , MAXDEPTH );
149+ }
150+
151+ private void fillUnacked (boolean persistent , int depth ) throws IOException , InterruptedException {
152+ for (int i =1 ; i <= depth ; i ++){
153+ syncPublish (null , "msg" + i );
154+ channel .basicGet (q , false );
155+ assertEquals (0 , declareQueue (persistent ));
156+ }
157+ }
158+
159+ private void syncPublish (AMQP .BasicProperties props , String payload ) throws IOException , InterruptedException {
160+ channel .basicPublish ("" , q , props , payload .getBytes ());
161+ channel .waitForConfirmsOrDie ();
162+ }
163+
164+ private int declareQueue (boolean durable , boolean dlx ) throws IOException {
86165 Map <String , Object > args = new HashMap <String , Object >();
87166 args .put ("x-maxdepth" , MAXDEPTH );
88- AMQP .Queue .DeclareOk ok = channel .queueDeclare (q , false , true , true , args );
167+ if (dlx ) args .put ("x-dead-letter-exchange" , "DLX" );
168+ AMQP .Queue .DeclareOk ok = channel .queueDeclare (q , durable , true , true , args );
89169 return ok .getMessageCount ();
90170 }
91171}
0 commit comments