4646public class QosTests extends BrokerTestCase
4747{
4848
49- protected final String Q = "QosTests" ;
50-
5149 protected void setUp ()
5250 throws IOException
5351 {
@@ -99,7 +97,8 @@ public void testMessageLimitGlobalFails()
9997 public void testMessageLimitUnlimited ()
10098 throws IOException
10199 {
102- QueueingConsumer c = configure (0 , 1 , 2 );
100+ QueueingConsumer c = new QueueingConsumer (channel );
101+ configure (c , 0 , 1 , 2 );
103102 drain (c , 2 );
104103 }
105104
@@ -140,9 +139,13 @@ protected void runLimitTestsHelper(int limit,
140139 throws IOException , InterruptedException
141140 {
142141
142+ QueueingConsumer c = new QueueingConsumer (channel );
143+
143144 // We attempt to drain 'limit' messages twice, do one
144- // basic.get, and need one message to spare -> 2*limit + 1 + 1
145- QueueingConsumer c = configure (limit , queueCount , 2 *limit + 1 + 1 );
145+ // basic.get per queue, and need one message to spare
146+ //-> 2*limit + 1*queueCount + 1
147+ List <String > queues = configure (c , limit , queueCount ,
148+ 2 *limit + 1 *queueCount + 1 );
146149
147150 if (txMode ) {
148151 channel .txSelect ();
@@ -152,8 +155,12 @@ protected void runLimitTestsHelper(int limit,
152155 drain (c , limit );
153156
154157 //is basic.get not limited?
155- GetResponse r = channel .basicGet (Q , false );
156- assertNotNull (r );
158+ List <Long > tags = new ArrayList <Long >();
159+ for (String q : queues ) {
160+ GetResponse r = channel .basicGet (q , false );
161+ assertNotNull (r );
162+ tags .add (r .getEnvelope ().getDeliveryTag ());
163+ }
157164
158165 //are acks handled correctly?
159166 //and does the basic.get above have no effect on limiting?
@@ -171,7 +178,9 @@ protected void runLimitTestsHelper(int limit,
171178 for (int i = 0 ; i < limit ; i ++) {
172179 c .nextDelivery ();
173180 }
174- channel .basicAck (r .getEnvelope ().getDeliveryTag (), false );
181+ for (long t : tags ) {
182+ channel .basicAck (t , false );
183+ }
175184 if (txMode ) {
176185 channel .txCommit ();
177186 }
@@ -193,33 +202,28 @@ protected Delivery ack(QueueingConsumer c, boolean multiAck)
193202 return last ;
194203 }
195204
196- protected QueueingConsumer configure (int limit ,
197- int queueCount ,
198- int messages )
205+ protected List <String > configure (QueueingConsumer c ,
206+ int limit ,
207+ int queueCount ,
208+ int messages )
199209 throws IOException
200210 {
201211 channel .basicQos (limit );
202212
203- QueueingConsumer c = new QueueingConsumer (channel );
204-
205- //we always declare/bind/consume-from a queue with name Q, so
206- //we can perform tests that operate on a specific queue
207- channel .queueDeclare (Q , false , false , true , true , null );
208- channel .queueBind (Q , "amq.fanout" , "" );
209- channel .basicConsume (Q , false , c );
210-
211- //declare/bind/consume-from remaining queues
212- for (int i = 1 ; i < queueCount ; i ++) {
213+ //declare/bind/consume-from queues
214+ List <String > queues = new ArrayList <String >();
215+ for (int i = 0 ; i < queueCount ; i ++) {
213216 AMQP .Queue .DeclareOk ok = channel .queueDeclare ();
214217 String queue = ok .getQueue ();
218+ queues .add (queue );
215219 channel .queueBind (queue , "amq.fanout" , "" );
216220 channel .basicConsume (queue , false , c );
217221 }
218222
219223 //publish
220224 fill (messages );
221225
222- return c ;
226+ return queues ;
223227 }
224228
225229 protected void ackDelivery (Delivery d , boolean multiple )
0 commit comments