|
22 | 22 | import com.rabbitmq.client.MessageProperties; |
23 | 23 | import com.rabbitmq.client.test.BrokerTestCase; |
24 | 24 | import java.io.IOException; |
| 25 | +import java.util.ArrayList; |
25 | 26 | import java.util.HashMap; |
26 | 27 | import java.util.Map; |
27 | 28 |
|
@@ -88,6 +89,37 @@ public void testDlxTailDurable() throws IOException, InterruptedException { |
88 | 89 | dlxTail(true); |
89 | 90 | } |
90 | 91 |
|
| 92 | + public void testMaxlenZero() throws IOException, InterruptedException { |
| 93 | + Map<String, Object> args = new HashMap<String, Object>(); |
| 94 | + args.put("x-max-length", 0); |
| 95 | + channel.queueDeclare(q, false, true, true, args); |
| 96 | + syncPublish(null, "msg"); |
| 97 | + assertNull(channel.basicGet(q, true)); |
| 98 | + } |
| 99 | + |
| 100 | + public void testRequeue() throws IOException, InterruptedException { |
| 101 | + declareQueue(false, false); |
| 102 | + ArrayList<Long> tags = new ArrayList<Long>(MAXLENGTH);; |
| 103 | + fill(false, false, false); |
| 104 | + getUnacked(MAXLENGTH, tags); |
| 105 | + fill(false, false, false); |
| 106 | + channel.basicNack(tags.get(0), false, true); |
| 107 | + channel.basicNack(tags.get(MAXLENGTH - 1), true, true); |
| 108 | + assertHead(MAXLENGTH, "msg1", q); |
| 109 | + } |
| 110 | + |
| 111 | + public void testRequeueWithDlx() throws IOException, InterruptedException { |
| 112 | + setupDlx(false); |
| 113 | + ArrayList<Long> tags = new ArrayList<Long>(MAXLENGTH);; |
| 114 | + fill(false, false, true); |
| 115 | + getUnacked(MAXLENGTH, tags); |
| 116 | + fill(false, false, true); |
| 117 | + channel.basicNack(tags.get(0), false, true); |
| 118 | + channel.basicNack(tags.get(MAXLENGTH - 1), true, true); |
| 119 | + assertHead(MAXLENGTH, "msg1", q); |
| 120 | + assertHead(MAXLENGTH, "msg1", "DLQ"); |
| 121 | + } |
| 122 | + |
91 | 123 | public void dlxHead(boolean persistent) throws IOException, InterruptedException { |
92 | 124 | AMQP.BasicProperties props = setupDlx(persistent); |
93 | 125 | fill(persistent, false, true); |
@@ -139,4 +171,10 @@ private void assertHead(int expectedLength, String expectedPayload, String queue |
139 | 171 | assertEquals(expectedPayload, new String(head.getBody())); |
140 | 172 | assertEquals(expectedLength, head.getMessageCount() + 1); |
141 | 173 | } |
| 174 | + |
| 175 | + private void getUnacked(int howMany, ArrayList<Long> acks) throws IOException { |
| 176 | + for (;howMany > 0; howMany --){ |
| 177 | + acks.add(channel.basicGet(q, false).getEnvelope().getDeliveryTag()); |
| 178 | + } |
| 179 | + } |
142 | 180 | } |
0 commit comments