Skip to content

Commit ce5b015

Browse files
author
Alexandru Scvortov
committed
abstract timeout waitForConfirms into a separate file
Oh dear, we forgot to remove ConfirmBase before. We're using it for something now, at least.
1 parent 28efe08 commit ce5b015

File tree

3 files changed

+35
-78
lines changed

3 files changed

+35
-78
lines changed

test/src/com/rabbitmq/client/test/ConfirmBase.java

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,60 +11,40 @@
1111
// The Original Code is RabbitMQ.
1212
//
1313
// The Initial Developer of the Original Code is VMware, Inc.
14-
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
14+
// Copyright (c) 2011 VMware, Inc. All rights reserved.
1515
//
1616

1717
package com.rabbitmq.client.test;
1818

19-
import com.rabbitmq.client.ConfirmListener;
20-
import com.rabbitmq.client.MessageProperties;
19+
import com.rabbitmq.client.ShutdownSignalException;
2120

2221
import java.io.IOException;
23-
import java.util.Collections;
24-
import java.util.SortedSet;
25-
import java.util.TreeSet;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.FutureTask;
25+
import java.util.concurrent.TimeoutException;
26+
import java.util.concurrent.TimeUnit;
2627

2728
public class ConfirmBase extends BrokerTestCase {
28-
29-
protected SortedSet<Long> unconfirmedSet;
30-
31-
@Override
32-
protected void setUp() throws IOException {
33-
super.setUp();
34-
unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
35-
channel.addConfirmListener(new ConfirmListener() {
36-
public void handleAck(long seqNo, boolean multiple) {
37-
if (!unconfirmedSet.contains(seqNo)) {
38-
fail("got duplicate ack: " + seqNo);
39-
}
40-
if (multiple) {
41-
unconfirmedSet.headSet(seqNo + 1).clear();
42-
} else {
43-
unconfirmedSet.remove(seqNo);
29+
protected void waitForConfirms()
30+
throws InterruptedException, TimeoutException
31+
{
32+
try {
33+
FutureTask<?> waiter = new FutureTask<Object>(new Runnable() {
34+
public void run() {
35+
try {
36+
channel.waitForConfirmsOrDie();
37+
} catch (IOException e) {
38+
throw (ShutdownSignalException)e.getCause();
39+
} catch (InterruptedException e) {
40+
fail("test interrupted");
41+
}
4442
}
45-
}
46-
47-
public void handleNack(long seqNo, boolean multiple) {
48-
fail("got a nack");
49-
}
50-
});
51-
channel.confirmSelect();
43+
}, null);
44+
(Executors.newSingleThreadExecutor()).execute(waiter);
45+
waiter.get(10, TimeUnit.SECONDS);
46+
} catch (ExecutionException e) {
47+
throw (ShutdownSignalException)e.getCause();
48+
}
5249
}
53-
54-
protected void waitAcks() throws InterruptedException {
55-
while (unconfirmedSet.size() > 0)
56-
Thread.sleep(10);
57-
}
58-
59-
protected void publish(String exchangeName, String queueName,
60-
boolean persistent, boolean mandatory,
61-
boolean immediate)
62-
throws IOException {
63-
unconfirmedSet.add(channel.getNextPublishSeqNo());
64-
channel.basicPublish(exchangeName, queueName, mandatory, immediate,
65-
persistent ? MessageProperties.PERSISTENT_BASIC
66-
: MessageProperties.BASIC,
67-
"nop".getBytes());
68-
}
69-
7050
}

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,16 @@
2424
import com.rabbitmq.client.GetResponse;
2525
import com.rabbitmq.client.MessageProperties;
2626
import com.rabbitmq.client.ShutdownSignalException;
27-
import com.rabbitmq.client.test.BrokerTestCase;
27+
import com.rabbitmq.client.test.ConfirmBase;
2828

2929
import java.io.IOException;
3030
import java.util.Collections;
3131
import java.util.Map;
3232
import java.util.SortedSet;
3333
import java.util.TreeSet;
34-
import java.util.concurrent.ExecutionException;
35-
import java.util.concurrent.Executors;
36-
import java.util.concurrent.FutureTask;
3734
import java.util.concurrent.TimeoutException;
38-
import java.util.concurrent.TimeUnit;
3935

40-
public class Confirm extends BrokerTestCase
36+
public class Confirm extends ConfirmBase
4137
{
4238
private final static int NUM_MESSAGES = 1000;
4339

@@ -318,26 +314,4 @@ protected void publish(String exchangeName, String queueName,
318314
: MessageProperties.BASIC,
319315
"nop".getBytes());
320316
}
321-
322-
protected void waitForConfirms()
323-
throws InterruptedException, TimeoutException
324-
{
325-
try {
326-
FutureTask<?> waiter = new FutureTask<Object>(new Runnable() {
327-
public void run() {
328-
try {
329-
channel.waitForConfirmsOrDie();
330-
} catch (IOException e) {
331-
throw (ShutdownSignalException)e.getCause();
332-
} catch (InterruptedException e) {
333-
fail("test interrupted");
334-
}
335-
}
336-
}, null);
337-
(Executors.newSingleThreadExecutor()).execute(waiter);
338-
waiter.get(10, TimeUnit.SECONDS);
339-
} catch (ExecutionException e) {
340-
throw (ShutdownSignalException)e.getCause();
341-
}
342-
}
343317
}

test/src/com/rabbitmq/client/test/server/MessageRecovery.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,25 @@
1616
package com.rabbitmq.client.test.server;
1717

1818
import com.rabbitmq.client.MessageProperties;
19-
import com.rabbitmq.client.test.BrokerTestCase;
19+
import com.rabbitmq.client.test.ConfirmBase;
2020

2121
import java.io.IOException;
22+
import java.util.concurrent.TimeoutException;
2223

23-
public class MessageRecovery extends BrokerTestCase
24+
public class MessageRecovery extends ConfirmBase
2425
{
2526

2627
private final static String Q = "recovery-test";
2728

28-
public void testMessageRecovery() throws IOException, InterruptedException {
29+
public void testMessageRecovery()
30+
throws IOException, InterruptedException, TimeoutException
31+
{
2932
channel.confirmSelect();
3033
channel.queueDeclare(Q, true, false, false, null);
3134
channel.basicPublish("", Q, false, false,
3235
MessageProperties.PERSISTENT_BASIC,
3336
"nop".getBytes());
34-
channel.waitForConfirmsOrDie();
37+
waitForConfirms();
3538

3639
restart();
3740
assertDelivered(Q, 1);

0 commit comments

Comments
 (0)