@@ -159,6 +159,19 @@ public void testFairness()
159159
160160 }
161161
162+ public void testSetLimitAfterConsume ()
163+ throws IOException
164+ {
165+ QueueingConsumer c = new QueueingConsumer (channel );
166+ String queue = declareBindConsume (c );
167+ channel .basicQos (1 );
168+ fill (2 );
169+ //We actually only guarantee that the limit takes effect
170+ //*eventually*, so this can in fact fail. It's pretty unlikely
171+ //though.
172+ drain (c , 1 );
173+ }
174+
162175 protected void runLimitTests (int limit ,
163176 boolean multiAck ,
164177 boolean txMode ,
@@ -249,11 +262,7 @@ protected List<String> configure(QueueingConsumer c,
249262 //declare/bind/consume-from queues
250263 List <String > queues = new ArrayList <String >();
251264 for (int i = 0 ; i < queueCount ; i ++) {
252- AMQP .Queue .DeclareOk ok = channel .queueDeclare ();
253- String queue = ok .getQueue ();
254- queues .add (queue );
255- channel .queueBind (queue , "amq.fanout" , "" );
256- channel .basicConsume (queue , false , c );
265+ queues .add (declareBindConsume (c ));
257266 }
258267
259268 //publish
@@ -262,6 +271,16 @@ protected List<String> configure(QueueingConsumer c,
262271 return queues ;
263272 }
264273
274+ protected String declareBindConsume (QueueingConsumer c )
275+ throws IOException
276+ {
277+ AMQP .Queue .DeclareOk ok = channel .queueDeclare ();
278+ String queue = ok .getQueue ();
279+ channel .queueBind (queue , "amq.fanout" , "" );
280+ channel .basicConsume (queue , false , c );
281+ return queue ;
282+ }
283+
265284 protected void ackDelivery (Delivery d , boolean multiple )
266285 throws IOException
267286 {
0 commit comments