3232package com .rabbitmq .client .test .functional ;
3333
3434import com .rabbitmq .client .test .BrokerTestCase ;
35- import com .rabbitmq .client .AMQP ;
3635import com .rabbitmq .client .AckListener ;
3736import com .rabbitmq .client .DefaultConsumer ;
37+ import com .rabbitmq .client .GetResponse ;
3838import com .rabbitmq .client .MessageProperties ;
3939
4040import java .io .IOException ;
41+ import java .util .Collections ;
42+ import java .util .Map ;
4143import java .util .Set ;
4244import java .util .TreeSet ;
4345
4446public class Confirm extends BrokerTestCase
4547{
4648 final static int NUM_MESSAGES = 1000 ;
49+ private static final String TTL_ARG = "x-message-ttl" ;
4750 volatile Set <Long > ackSet ;
4851
4952 @ Override
@@ -71,6 +74,8 @@ public void handleAck(long seqNo,
7174 channel .queueDeclare ("confirm-test-noconsumer" , true , true , true , null );
7275 channel .queueDeclare ("confirm-test-2" , true , true , true , null );
7376 channel .basicConsume ("confirm-test-2" , true , new DefaultConsumer (channel ));
77+ Map <String , Object > argMap = Collections .singletonMap (TTL_ARG , (Object )1 );
78+ channel .queueDeclare ("confirm-ttl" , true , true , true , argMap );
7479 channel .queueBind ("confirm-test" , "amq.direct" , "confirm-multiple-queues" );
7580 channel .queueBind ("confirm-test-2" , "amq.direct" , "confirm-multiple-queues" );
7681 }
@@ -129,10 +134,7 @@ public void testConfirmMultipleQueues()
129134 public void testConfirmQueueDelete ()
130135 throws IOException , InterruptedException
131136 {
132- for (long i = 0 ; i < NUM_MESSAGES ; i ++) {
133- publish ("" , "confirm-test-noconsumer" , true , false , false );
134- ackSet .add (i );
135- }
137+ publishN ("" ,"confirm-test-noconsumer" , true , false , false );
136138
137139 channel .queueDelete ("confirm-test-noconsumer" );
138140
@@ -143,12 +145,33 @@ public void testConfirmQueueDelete()
143145 public void testConfirmQueuePurge ()
144146 throws IOException , InterruptedException
145147 {
148+ publishN ("" , "confirm-test-noconsumer" , true , false , false );
149+
150+ channel .queuePurge ("confirm-test-noconsumer" );
151+
152+ while (ackSet .size () > 0 )
153+ Thread .sleep (10 );
154+ }
155+
156+ public void testConfirmBasicReject ()
157+ throws IOException , InterruptedException
158+ {
159+ publishN ("" , "confirm-test-noconsumer" , true , false , false );
160+
146161 for (long i = 0 ; i < NUM_MESSAGES ; i ++) {
147- publish ("" , "confirm-test-noconsumer" , true , false , false );
148- ackSet .add (i );
162+ GetResponse resp = channel .basicGet ("confirm-test-noconsumer" , false );
163+ long dtag = resp .getEnvelope ().getDeliveryTag ();
164+ channel .basicReject (dtag , false );
149165 }
150166
151- channel .queuePurge ("confirm-test-noconsumer" );
167+ while (ackSet .size () > 0 )
168+ Thread .sleep (10 );
169+ }
170+
171+ public void testConfirmQueueTTL ()
172+ throws IOException , InterruptedException
173+ {
174+ publishN ("" , "confirm-ttl" , true , false , false );
152175
153176 while (ackSet .size () > 0 )
154177 Thread .sleep (10 );
@@ -161,16 +184,24 @@ public void confirmTest(String exchange, String queueName,
161184 boolean immediate )
162185 throws IOException , InterruptedException
163186 {
164- for (long i = 0 ; i < NUM_MESSAGES ; i ++) {
165- publish (exchange , queueName , persistent ,
166- mandatory , immediate );
167- ackSet .add (i );
168- }
187+ publishN (exchange , queueName , persistent , mandatory , immediate );
169188
170189 while (ackSet .size () > 0 )
171190 Thread .sleep (10 );
172191 }
173192
193+ private void publishN (String exchangeName , String queueName ,
194+ boolean persistent , boolean mandatory ,
195+ boolean immediate )
196+ throws IOException
197+ {
198+ for (long i = 0 ; i < NUM_MESSAGES ; i ++) {
199+ publish (exchangeName , queueName , persistent , mandatory , immediate );
200+ ackSet .add (i );
201+ }
202+ }
203+
204+
174205 private void publish (String exchangeName , String queueName ,
175206 boolean persistent , boolean mandatory ,
176207 boolean immediate )
0 commit comments