@@ -99,7 +99,7 @@ public void testMessageLimitGlobalFails()
9999 public void testMessageLimitUnlimited ()
100100 throws IOException
101101 {
102- QueueingConsumer c = publishLimitAndConsume ( 2 , 0 , 1 );
102+ QueueingConsumer c = configure ( 0 , 1 , 2 );
103103 drain (c , 2 );
104104 }
105105
@@ -142,9 +142,7 @@ protected void runLimitTestsHelper(int limit,
142142
143143 // We attempt to drain 'limit' messages twice, do one
144144 // basic.get, and need one message to spare -> 2*limit + 1 + 1
145- QueueingConsumer c = publishLimitAndConsume (2 *limit + 1 + 1 ,
146- limit ,
147- queueCount );
145+ QueueingConsumer c = configure (limit , queueCount , 2 *limit + 1 + 1 );
148146
149147 if (txMode ) {
150148 channel .txSelect ();
@@ -195,37 +193,32 @@ protected Delivery ack(QueueingConsumer c, boolean multiAck)
195193 return last ;
196194 }
197195
198- protected QueueingConsumer publishLimitAndConsume (int messages ,
199- int limit ,
200- int queueCount )
196+ protected QueueingConsumer configure (int limit ,
197+ int queueCount ,
198+ int messages )
201199 throws IOException
202200 {
203- //we always declare a queue with name Q, so we can perform
204- //tests that operate on a specific queue
201+ channel .basicQos (limit );
202+
203+ QueueingConsumer c = new QueueingConsumer (channel );
204+
205+ //we always declare/bind/consume-from a queue with name Q, so
206+ //we can perform tests that operate on a specific queue
205207 channel .queueDeclare (Q , false , false , true , true , null );
206208 channel .queueBind (Q , "amq.fanout" , "" );
209+ channel .basicConsume (Q , false , c );
207210
208- //declare & bind remaining queues
209- List <String > queues = new ArrayList <String >();
211+ //declare/bind/consume-from remaining queues
210212 for (int i = 1 ; i < queueCount ; i ++) {
211213 AMQP .Queue .DeclareOk ok = channel .queueDeclare ();
212214 String queue = ok .getQueue ();
213215 channel .queueBind (queue , "amq.fanout" , "" );
214- queues . add (queue );
216+ channel . basicConsume (queue , false , c );
215217 }
216218
217219 //publish
218- fill (messages );
220+ fill (messages );
219221
220- //limit
221- channel .basicQos (limit );
222-
223- //consume
224- QueueingConsumer c = new QueueingConsumer (channel );
225- channel .basicConsume (Q , false , c );
226- for (String q : queues ) {
227- channel .basicConsume (q , false , c );
228- }
229222 return c ;
230223 }
231224
0 commit comments