|
38 | 38 | import java.util.List; |
39 | 39 | import java.util.Queue; |
40 | 40 |
|
| 41 | +import com.rabbitmq.client.Channel; |
41 | 42 | import com.rabbitmq.client.GetResponse; |
42 | 43 | import com.rabbitmq.client.QueueingConsumer; |
43 | 44 | import com.rabbitmq.client.QueueingConsumer.Delivery; |
@@ -180,7 +181,6 @@ public void testConsumerLifecycle() |
180 | 181 | channel.queueDelete(queue); |
181 | 182 | } |
182 | 183 |
|
183 | | - |
184 | 184 | public void testSetLimitAfterConsume() |
185 | 185 | throws IOException |
186 | 186 | { |
@@ -225,6 +225,30 @@ public void testLimitedToUnlimited() |
225 | 225 | drain(c, 2); |
226 | 226 | } |
227 | 227 |
|
| 228 | + public void testLimitingMultipleChannels() |
| 229 | + throws IOException |
| 230 | + { |
| 231 | + Channel ch1 = connection.createChannel(); |
| 232 | + Channel ch2 = connection.createChannel(); |
| 233 | + QueueingConsumer c1 = new QueueingConsumer(ch1); |
| 234 | + QueueingConsumer c2 = new QueueingConsumer(ch2); |
| 235 | + String q1 = declareBindConsume(ch1, c1); |
| 236 | + String q2 = declareBindConsume(ch2, c2); |
| 237 | + ch1.basicConsume(q2, false, c1); |
| 238 | + ch2.basicConsume(q1, false, c2); |
| 239 | + ch1.basicQos(1); |
| 240 | + ch2.basicQos(1); |
| 241 | + fill(5); |
| 242 | + Queue<Delivery> d1 = drain(c1, 1); |
| 243 | + Queue<Delivery> d2 = drain(c2, 1); |
| 244 | + ackDelivery(ch1, d1.remove(), true); |
| 245 | + ackDelivery(ch2, d2.remove(), true); |
| 246 | + drain(c1, 1); |
| 247 | + drain(c2, 1); |
| 248 | + ch1.close(); |
| 249 | + ch2.close(); |
| 250 | + } |
| 251 | + |
228 | 252 | protected void runLimitTests(int limit, |
229 | 253 | boolean multiAck, |
230 | 254 | boolean txMode, |
@@ -325,17 +349,29 @@ protected Queue<Delivery> configure(QueueingConsumer c, |
325 | 349 | protected String declareBindConsume(QueueingConsumer c) |
326 | 350 | throws IOException |
327 | 351 | { |
328 | | - AMQP.Queue.DeclareOk ok = channel.queueDeclare(); |
| 352 | + return declareBindConsume(channel, c); |
| 353 | + } |
| 354 | + |
| 355 | + protected String declareBindConsume(Channel ch, QueueingConsumer c) |
| 356 | + throws IOException |
| 357 | + { |
| 358 | + AMQP.Queue.DeclareOk ok = ch.queueDeclare(); |
329 | 359 | String queue = ok.getQueue(); |
330 | | - channel.queueBind(queue, "amq.fanout", ""); |
331 | | - channel.basicConsume(queue, false, c); |
| 360 | + ch.queueBind(queue, "amq.fanout", ""); |
| 361 | + ch.basicConsume(queue, false, c); |
332 | 362 | return queue; |
333 | 363 | } |
334 | 364 |
|
335 | 365 | protected void ackDelivery(Delivery d, boolean multiple) |
336 | 366 | throws IOException |
337 | 367 | { |
338 | | - channel.basicAck(d.getEnvelope().getDeliveryTag(), multiple); |
| 368 | + ackDelivery(channel, d, multiple); |
| 369 | + } |
| 370 | + |
| 371 | + protected void ackDelivery(Channel ch, Delivery d, boolean multiple) |
| 372 | + throws IOException |
| 373 | + { |
| 374 | + ch.basicAck(d.getEnvelope().getDeliveryTag(), multiple); |
339 | 375 | } |
340 | 376 |
|
341 | 377 | } |
0 commit comments