Skip to content

Commit 25c7ca8

Browse files
committed
merge bug24180 into default
2 parents d74d719 + f8ffe90 commit 25c7ca8

15 files changed

+173
-542
lines changed

test/src/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.rabbitmq.client.AlreadyClosedException;
3232
import com.rabbitmq.client.impl.ShutdownNotifierComponent;
3333
import com.rabbitmq.client.AMQP;
34+
import com.rabbitmq.tools.Host;
3435

3536
public class BrokerTestCase extends TestCase
3637
{
@@ -81,6 +82,14 @@ protected void releaseResources()
8182
throws IOException
8283
{}
8384

85+
protected void restart()
86+
throws IOException
87+
{
88+
tearDown();
89+
Host.executeCommand("cd ../rabbitmq-test; make restart-app");
90+
setUp();
91+
}
92+
8493
public void openConnection()
8594
throws IOException
8695
{
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client.test;
18+
19+
import com.rabbitmq.client.ConfirmListener;
20+
import com.rabbitmq.client.MessageProperties;
21+
22+
import java.io.IOException;
23+
import java.util.Collections;
24+
import java.util.SortedSet;
25+
import java.util.TreeSet;
26+
27+
public class ConfirmBase extends BrokerTestCase {
28+
29+
protected SortedSet<Long> unconfirmedSet;
30+
31+
@Override
32+
protected void setUp() throws IOException {
33+
super.setUp();
34+
unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
35+
channel.setConfirmListener(new ConfirmListener() {
36+
public void handleAck(long seqNo, boolean multiple) {
37+
if (!unconfirmedSet.contains(seqNo)) {
38+
fail("got duplicate ack: " + seqNo);
39+
}
40+
if (multiple) {
41+
unconfirmedSet.headSet(seqNo + 1).clear();
42+
} else {
43+
unconfirmedSet.remove(seqNo);
44+
}
45+
}
46+
47+
public void handleNack(long seqNo, boolean multiple) {
48+
fail("got a nack");
49+
}
50+
});
51+
channel.confirmSelect();
52+
}
53+
54+
protected void waitAcks() throws InterruptedException {
55+
while (unconfirmedSet.size() > 0)
56+
Thread.sleep(10);
57+
}
58+
59+
protected void publish(String exchangeName, String queueName,
60+
boolean persistent, boolean mandatory,
61+
boolean immediate)
62+
throws IOException {
63+
unconfirmedSet.add(channel.getNextPublishSeqNo());
64+
channel.basicPublish(exchangeName, queueName, mandatory, immediate,
65+
persistent ? MessageProperties.PERSISTENT_BASIC
66+
: MessageProperties.BASIC,
67+
"nop".getBytes());
68+
}
69+
70+
}

test/src/com/rabbitmq/client/test/functional/BindingLifecycleBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ protected void doAutoDelete(boolean durable, int queues) throws IOException {
108108
}
109109
}
110110

111-
111+
@Override
112112
protected void restart() throws IOException {
113113
}
114114

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,24 @@
1919

2020
import com.rabbitmq.client.test.BrokerTestCase;
2121
import com.rabbitmq.client.AMQP;
22-
import com.rabbitmq.client.ConfirmListener;
2322
import com.rabbitmq.client.Channel;
2423
import com.rabbitmq.client.DefaultConsumer;
2524
import com.rabbitmq.client.GetResponse;
26-
import com.rabbitmq.client.MessageProperties;
25+
import com.rabbitmq.client.test.ConfirmBase;
2726

2827
import java.io.IOException;
2928
import java.util.Collections;
3029
import java.util.Map;
31-
import java.util.SortedSet;
32-
import java.util.TreeSet;
3330

34-
public class Confirm extends BrokerTestCase
31+
public class Confirm extends ConfirmBase
3532
{
36-
final static int NUM_MESSAGES = 1000;
33+
private final static int NUM_MESSAGES = 1000;
34+
3735
private static final String TTL_ARG = "x-message-ttl";
38-
private SortedSet<Long> unconfirmedSet;
3936

4037
@Override
4138
protected void setUp() throws IOException {
4239
super.setUp();
43-
unconfirmedSet =
44-
Collections.synchronizedSortedSet(new TreeSet<Long>());
45-
channel.setConfirmListener(new ConfirmListener() {
46-
public void handleAck(long seqNo, boolean multiple) {
47-
Confirm.this.handleAck(seqNo, multiple);
48-
}
49-
50-
public void handleNack(long seqNo, boolean multiple) {
51-
fail("got a nack");
52-
}
53-
});
5440
channel.confirmSelect();
5541
channel.queueDeclare("confirm-test", true, true, false, null);
5642
channel.basicConsume("confirm-test", true,
@@ -222,8 +208,7 @@ public void testConfirmSelect()
222208
ch.confirmSelect();
223209
}
224210

225-
/* Publish NUM_MESSAGES persistent messages and wait for
226-
* confirmations. */
211+
/* Publish NUM_MESSAGES messages and wait for confirmations. */
227212
public void confirmTest(String exchange, String queueName,
228213
boolean persistent, boolean mandatory,
229214
boolean immediate)
@@ -240,34 +225,10 @@ private void publishN(String exchangeName, String queueName,
240225
throws IOException
241226
{
242227
for (long i = 0; i < NUM_MESSAGES; i++) {
243-
unconfirmedSet.add(channel.getNextPublishSeqNo());
244228
publish(exchangeName, queueName, persistent, mandatory, immediate);
245229
}
246230
}
247231

248-
249-
private void publish(String exchangeName, String queueName,
250-
boolean persistent, boolean mandatory,
251-
boolean immediate)
252-
throws IOException
253-
{
254-
channel.basicPublish(exchangeName, queueName, mandatory, immediate,
255-
persistent ? MessageProperties.PERSISTENT_BASIC
256-
: MessageProperties.BASIC,
257-
"nop".getBytes());
258-
}
259-
260-
private void handleAck(long msgSeqNo, boolean multiple) {
261-
if (!unconfirmedSet.contains(msgSeqNo)) {
262-
fail("got duplicate ack: " + msgSeqNo);
263-
}
264-
if (multiple) {
265-
unconfirmedSet.headSet(msgSeqNo + 1).clear();
266-
} else {
267-
unconfirmedSet.remove(msgSeqNo);
268-
}
269-
}
270-
271232
private void basicRejectCommon(boolean requeue)
272233
throws IOException
273234
{
@@ -280,9 +241,4 @@ private void basicRejectCommon(boolean requeue)
280241
channel.basicReject(dtag, requeue);
281242
}
282243
}
283-
284-
private void waitAcks() throws InterruptedException {
285-
while (unconfirmedSet.size() > 0)
286-
Thread.sleep(10);
287-
}
288244
}

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,21 @@ public void testFlow() throws IOException
355355
drain(c, 1);
356356
}
357357

358+
public void testLimitAndFlow() throws IOException
359+
{
360+
channel.basicQos(1);
361+
QueueingConsumer c = new QueueingConsumer(channel);
362+
declareBindConsume(c);
363+
channel.flow(false);
364+
fill(3);
365+
drain(c, 0);
366+
channel.flow(true);
367+
ack(drain(c, 1), false);
368+
drain(c, 1);
369+
channel.basicQos(0);
370+
drain(c, 1);
371+
}
372+
358373
public void testNoConsumers() throws Exception {
359374
String q = declareBind(channel);
360375
fill(1);

test/src/com/rabbitmq/client/test/server/RestartBase.java renamed to test/src/com/rabbitmq/client/test/server/MessageRecovery.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,25 @@
1212
//
1313
// The Initial Developer of the Original Code is VMware, Inc.
1414
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
15-
//
16-
1715

1816
package com.rabbitmq.client.test.server;
1917

20-
import com.rabbitmq.client.test.BrokerTestCase;
18+
import com.rabbitmq.client.test.ConfirmBase;
2119

2220
import java.io.IOException;
2321

24-
import com.rabbitmq.tools.Host;
25-
26-
public class RestartBase extends BrokerTestCase
22+
public class MessageRecovery extends ConfirmBase
2723
{
28-
protected void restart()
29-
throws IOException
30-
{
31-
tearDown();
32-
Host.executeCommand("cd ../rabbitmq-test; make restart-app");
33-
setUp();
24+
25+
private final static String Q = "recovery-test";
26+
27+
public void test() throws IOException, InterruptedException {
28+
channel.queueDeclare(Q, true, false, false, null);
29+
publish("", Q, true, false, false);
30+
waitAcks();
31+
restart();
32+
assertDelivered(Q, 1);
33+
channel.queueDelete(Q);
3434
}
3535

3636
}

0 commit comments

Comments
 (0)