|
35 | 35 | import com.rabbitmq.client.GetResponse; |
36 | 36 | import com.rabbitmq.client.QueueingConsumer; |
37 | 37 |
|
| 38 | +import java.util.HashSet; |
| 39 | +import java.util.Set; |
| 40 | + |
38 | 41 | public class Nack extends AbstractRejectTest { |
39 | 42 |
|
40 | 43 | public void testSingleNack() throws Exception { |
@@ -97,9 +100,7 @@ public void testMultiNack() throws Exception { |
97 | 100 | // requeue multi |
98 | 101 | channel.basicNack(tag2, true, true); |
99 | 102 |
|
100 | | - checkDelivery(c.nextDelivery(), m4, true); |
101 | | - checkDelivery(c.nextDelivery(), m3, true); |
102 | | - long tag3 = checkDelivery(c.nextDelivery(), m1, true); |
| 103 | + long tag3 = checkDeliveries(c, m1, m3, m4); |
103 | 104 |
|
104 | 105 | secondaryChannel.basicCancel(consumerTag); |
105 | 106 |
|
@@ -132,9 +133,29 @@ public void testNackAll() throws Exception { |
132 | 133 | QueueingConsumer c = new QueueingConsumer(secondaryChannel); |
133 | 134 | String consumerTag = secondaryChannel.basicConsume(q, true, c); |
134 | 135 |
|
135 | | - checkDelivery(c.nextDelivery(), m2, true); |
136 | | - checkDelivery(c.nextDelivery(), m1, true); |
| 136 | + checkDeliveries(c, m1, m2); |
137 | 137 |
|
138 | 138 | secondaryChannel.basicCancel(consumerTag); |
139 | 139 | } |
| 140 | + |
| 141 | + private long checkDeliveries(QueueingConsumer c, byte[]... messages) |
| 142 | + throws InterruptedException { |
| 143 | + |
| 144 | + Set<String> msgSet = new HashSet<String>(); |
| 145 | + for (byte[] message : messages) { |
| 146 | + msgSet.add(new String(message)); |
| 147 | + } |
| 148 | + |
| 149 | + long lastTag = -1; |
| 150 | + for(int x = 0; x < messages.length; x++) { |
| 151 | + QueueingConsumer.Delivery delivery = c.nextDelivery(); |
| 152 | + String m = new String(delivery.getBody()); |
| 153 | + assertTrue("Unexpected message", msgSet.remove(m)); |
| 154 | + checkDelivery(delivery, m.getBytes(), true); |
| 155 | + lastTag = delivery.getEnvelope().getDeliveryTag(); |
| 156 | + } |
| 157 | + |
| 158 | + assertTrue(msgSet.isEmpty()); |
| 159 | + return lastTag; |
| 160 | + } |
140 | 161 | } |
0 commit comments