11package com .rabbitmq .client .test .functional ;
22
3+ import com .rabbitmq .client .GetResponse ;
34import com .rabbitmq .client .QueueingConsumer ;
45import com .rabbitmq .client .QueueingConsumer .Delivery ;
56import com .rabbitmq .client .test .BrokerTestCase ;
67
78import java .io .IOException ;
9+ import java .util .Arrays ;
10+ import java .util .Deque ;
811import java .util .HashMap ;
912import java .util .Map ;
10- import java .util .Queue ;
1113
1214import static com .rabbitmq .client .test .functional .QosTests .drain ;
1315
@@ -19,25 +21,84 @@ protected void createResources() throws IOException {
1921 q = channel .queueDeclare ().getQueue ();
2022 }
2123
22- public void testPrefetch () throws IOException {
23- QueueingConsumer c = new QueueingConsumer (channel );
24- publish (q , 5 );
25- consume (c , false );
26- Queue <Delivery > dels = drain (c , 2 );
27- for (Delivery del : dels ) {
28- ack (del , false );
24+ private interface Closure {
25+ public void makeMore (Deque <Delivery > deliveries ) throws IOException ;
26+ }
27+
28+ public void testSingleAck () throws IOException {
29+ testPrefetch (new Closure () {
30+ public void makeMore (Deque <Delivery > deliveries ) throws IOException {
31+ for (Delivery del : deliveries ) {
32+ ack (del , false );
33+ }
34+ }
35+ });
36+ }
37+
38+ public void testMultiAck () throws IOException {
39+ testPrefetch (new Closure () {
40+ public void makeMore (Deque <Delivery > deliveries ) throws IOException {
41+ ack (deliveries .getLast (), true );
42+ }
43+ });
44+ }
45+
46+ public void testSingleNack () throws IOException {
47+ for (final boolean requeue : Arrays .asList (false , true )) {
48+ testPrefetch (new Closure () {
49+ public void makeMore (Deque <Delivery > deliveries ) throws IOException {
50+ for (Delivery del : deliveries ) {
51+ nack (del , false , requeue );
52+ }
53+ }
54+ });
55+ }
56+ }
57+
58+ public void testMultiNack () throws IOException {
59+ for (final boolean requeue : Arrays .asList (false , true )) {
60+ testPrefetch (new Closure () {
61+ public void makeMore (Deque <Delivery > deliveries ) throws IOException {
62+ nack (deliveries .getLast (), true , requeue );
63+ }
64+ });
2965 }
30- Delivery last = drain (c , 2 ).getLast ();
31- ack (last , true );
32- drain (c , 1 );
66+ }
67+
68+ public void testRecover () throws IOException {
69+ testPrefetch (new Closure () {
70+ public void makeMore (Deque <Delivery > deliveries ) throws IOException {
71+ channel .basicRecover ();
72+ }
73+ });
74+ }
75+
76+ private void testPrefetch (Closure closure ) throws IOException {
77+ QueueingConsumer c = new QueueingConsumer (channel );
78+ publish (q , 15 );
79+ consume (c , 5 , false );
80+ Deque <Delivery > deliveries = drain (c , 5 );
81+
82+ ack (channel .basicGet (q , false ), false );
83+ drain (c , 0 );
84+
85+ closure .makeMore (deliveries );
86+ drain (c , 5 );
87+ }
88+
89+ public void testPrefetchOnEmpty () throws IOException {
90+ QueueingConsumer c = new QueueingConsumer (channel );
3391 publish (q , 5 );
34- drain (c , 1 );
92+ consume (c , 10 , false );
93+ drain (c , 5 );
94+ publish (q , 10 );
95+ drain (c , 5 );
3596 }
3697
3798 public void testAutoAckIgnoresPrefetch () throws IOException {
3899 QueueingConsumer c = new QueueingConsumer (channel );
39100 publish (q , 10 );
40- consume (c , true );
101+ consume (c , 1 , true );
41102 drain (c , 10 );
42103 }
43104
@@ -47,14 +108,22 @@ private void publish(String q, int n) throws IOException {
47108 }
48109 }
49110
50- private void consume (QueueingConsumer c , boolean autoAck ) throws IOException {
51- channel .basicConsume (q , autoAck , "" , false , false , args (2 ), c );
111+ private void consume (QueueingConsumer c , int prefetch , boolean autoAck ) throws IOException {
112+ channel .basicConsume (q , autoAck , "" , false , false , args (prefetch ), c );
52113 }
53114
54115 private void ack (Delivery del , boolean multi ) throws IOException {
55116 channel .basicAck (del .getEnvelope ().getDeliveryTag (), multi );
56117 }
57118
119+ private void ack (GetResponse get , boolean multi ) throws IOException {
120+ channel .basicAck (get .getEnvelope ().getDeliveryTag (), multi );
121+ }
122+
123+ private void nack (Delivery del , boolean multi , boolean requeue ) throws IOException {
124+ channel .basicNack (del .getEnvelope ().getDeliveryTag (), multi , requeue );
125+ }
126+
58127 private Map <String , Object > args (int prefetch ) {
59128 Map <String , Object > a = new HashMap <String , Object >();
60129 if (prefetch != 0 ) {
0 commit comments