|
34 | 34 | import com.rabbitmq.client.test.BrokerTestCase; |
35 | 35 |
|
36 | 36 | import com.rabbitmq.client.AMQP; |
| 37 | +import com.rabbitmq.client.Channel; |
| 38 | +import com.rabbitmq.client.Envelope; |
| 39 | +import com.rabbitmq.client.GetResponse; |
37 | 40 | import com.rabbitmq.client.QueueingConsumer; |
38 | 41 | import com.rabbitmq.client.QueueingConsumer.Delivery; |
39 | 42 |
|
|
43 | 46 | public class Reject extends BrokerTestCase |
44 | 47 | { |
45 | 48 |
|
| 49 | + protected Channel secondaryChannel; |
| 50 | + |
| 51 | + protected void setUp() |
| 52 | + throws IOException |
| 53 | + { |
| 54 | + super.setUp(); |
| 55 | + secondaryChannel = connection.createChannel(); |
| 56 | + |
| 57 | + } |
| 58 | + |
| 59 | + protected void tearDown() |
| 60 | + throws IOException |
| 61 | + { |
| 62 | + if (secondaryChannel != null) { |
| 63 | + secondaryChannel.abort(); |
| 64 | + secondaryChannel = null; |
| 65 | + } |
| 66 | + super.tearDown(); |
| 67 | + } |
| 68 | + |
46 | 69 | protected long checkDelivery(Delivery d, byte[] msg, boolean redelivered) |
47 | 70 | { |
48 | 71 | assertNotNull(d); |
49 | | - assertTrue(Arrays.equals(msg, d.getBody())); |
50 | | - assertEquals(d.getEnvelope().isRedeliver(), redelivered); |
51 | | - return d.getEnvelope().getDeliveryTag(); |
| 72 | + return checkDelivery(d.getEnvelope(), d.getBody(), msg, redelivered); |
| 73 | + } |
| 74 | + |
| 75 | + protected long checkDelivery(GetResponse r, byte[] msg, boolean redelivered) |
| 76 | + { |
| 77 | + assertNotNull(r); |
| 78 | + return checkDelivery(r.getEnvelope(), r.getBody(), msg, redelivered); |
| 79 | + } |
| 80 | + |
| 81 | + protected long checkDelivery(Envelope e, byte[] m, |
| 82 | + byte[] msg, boolean redelivered) |
| 83 | + { |
| 84 | + assertNotNull(e); |
| 85 | + assertTrue(Arrays.equals(m, msg)); |
| 86 | + assertEquals(e.isRedeliver(), redelivered); |
| 87 | + return e.getDeliveryTag(); |
52 | 88 | } |
53 | 89 |
|
54 | 90 | public void testReject() |
55 | 91 | throws IOException, InterruptedException |
56 | 92 | { |
57 | 93 | String q = channel.queueDeclare("", false, true, false, null).getQueue(); |
58 | 94 |
|
59 | | - QueueingConsumer c = new QueueingConsumer(channel); |
60 | | - String consumerTag = channel.basicConsume(q, false, c); |
61 | | - |
62 | 95 | byte[] m1 = "1".getBytes(); |
63 | 96 | byte[] m2 = "2".getBytes(); |
64 | 97 |
|
65 | 98 | basicPublishVolatile(m1, q); |
66 | 99 | basicPublishVolatile(m2, q); |
67 | 100 |
|
68 | | - long tag1 = checkDelivery(c.nextDelivery(), m1, false); |
69 | | - long tag2 = checkDelivery(c.nextDelivery(), m2, false); |
| 101 | + long tag1 = checkDelivery(channel.basicGet(q, false), m1, false); |
| 102 | + long tag2 = checkDelivery(channel.basicGet(q, false), m2, false); |
| 103 | + QueueingConsumer c = new QueueingConsumer(secondaryChannel); |
| 104 | + String consumerTag = channel.basicConsume(q, false, c); |
70 | 105 | channel.basicReject(tag2, true); |
71 | 106 | long tag3 = checkDelivery(c.nextDelivery(), m2, true); |
72 | 107 | channel.basicCancel(consumerTag); |
|
0 commit comments