3333
3434import java .io .IOException ;
3535import java .util .Arrays ;
36- import java .util .List ;
3736import java .util .ArrayList ;
37+ import java .util .LinkedList ;
38+ import java .util .List ;
39+ import java .util .Queue ;
3840
3941import com .rabbitmq .client .GetResponse ;
4042import com .rabbitmq .client .QueueingConsumer ;
@@ -73,15 +75,24 @@ public void fill(int n)
7375 * receive n messages - check that we receive no fewer and cannot
7476 * receive more
7577 **/
76- public void drain (QueueingConsumer c , int n )
78+ public Queue < Delivery > drain (QueueingConsumer c , int n )
7779 throws IOException
7880 {
81+ Queue <Delivery > res = new LinkedList <Delivery >();
7982 try {
80- Thread .sleep (n * 10 + 50 );
81- assertEquals (n , c .getQueue ().size ());
83+ long start = System .currentTimeMillis ();
84+ for (int i = 0 ; i < n ; i ++) {
85+ Delivery d = c .nextDelivery (1000 );
86+ assertNotNull (d );
87+ res .offer (d );
88+ }
89+ long finish = System .currentTimeMillis ();
90+ Thread .sleep ( (n == 0 ? 0 : (finish - start ) / n ) + 10 );
91+ assertNull (c .nextDelivery (0 ));
8292 } catch (InterruptedException ie ) {
8393 fail ("interrupted" );
8494 }
95+ return res ;
8596 }
8697
8798 public void testMessageLimitGlobalFails ()
@@ -128,8 +139,8 @@ public void testFairness()
128139 List <String > queues = configure (c , 1 , queueCount , messageCount );
129140
130141 for (int i = 0 ; i < messageCount - 1 ; i ++) {
131- drain (c , 1 );
132- ack (c , false );
142+ Queue < Delivery > d = drain (c , 1 );
143+ ack (d , false );
133144 }
134145
135146 //Perfect fairness would result in every queue having
@@ -181,7 +192,7 @@ protected void runLimitTestsHelper(int limit,
181192 }
182193
183194 //is limit enforced?
184- drain (c , limit );
195+ Queue < Delivery > d = drain (c , limit );
185196
186197 //is basic.get not limited?
187198 List <Long > tags = new ArrayList <Long >();
@@ -193,7 +204,7 @@ protected void runLimitTestsHelper(int limit,
193204
194205 //are acks handled correctly?
195206 //and does the basic.get above have no effect on limiting?
196- Delivery last = ack (c , multiAck );
207+ Delivery last = ack (d , multiAck );
197208 if (txMode ) {
198209 drain (c , 0 );
199210 channel .txRollback ();
@@ -204,9 +215,6 @@ protected void runLimitTestsHelper(int limit,
204215 drain (c , limit );
205216
206217 //do acks for basic.gets have no effect on limiting?
207- for (int i = 0 ; i < limit ; i ++) {
208- c .nextDelivery ();
209- }
210218 for (long t : tags ) {
211219 channel .basicAck (t , false );
212220 }
@@ -216,18 +224,17 @@ protected void runLimitTestsHelper(int limit,
216224 drain (c , 0 );
217225 }
218226
219- protected Delivery ack (QueueingConsumer c , boolean multiAck )
227+ protected Delivery ack (Queue < Delivery > d , boolean multiAck )
220228 throws IOException , InterruptedException
221229 {
222230 Delivery last = null ;
223- if (multiAck ) {
224- for (Delivery tmp = null ; (tmp = c .nextDelivery (0 )) != null ; last = tmp );
225- ackDelivery (last , true );
226- } else {
227- for (Delivery tmp = null ; (tmp = c .nextDelivery (0 )) != null ; last = tmp ) {
228- ackDelivery (tmp , false );
229- }
231+
232+ for (Delivery tmp : d ) {
233+ if (!multiAck ) ackDelivery (tmp , false );
234+ last = tmp ;
230235 }
236+ if (multiAck ) ackDelivery (last , true );
237+
231238 return last ;
232239 }
233240
0 commit comments