Skip to content

Commit 21eccc3

Browse files
Add a test for confirm mode and listener recovery
1 parent aa67826 commit 21eccc3

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.UUID;
1313
import java.util.concurrent.CountDownLatch;
1414
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.TimeoutException;
1516
import java.util.concurrent.atomic.AtomicInteger;
1617

1718
public class ConnectionRecovery extends BrokerTestCase {
@@ -110,6 +111,31 @@ public void handleReturn(int replyCode, String replyText, String exchange,
110111
assertTrue(latch.await(150, TimeUnit.MILLISECONDS));
111112
}
112113

114+
public void testConfirmListenerRecovery() throws IOException, InterruptedException, TimeoutException {
115+
int n = 10;
116+
String q = channel.queueDeclare(UUID.randomUUID().toString(), false, false, false, null).getQueue();
117+
final CountDownLatch latch = new CountDownLatch(n);
118+
channel.addConfirmListener(new ConfirmListener() {
119+
@Override
120+
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
121+
latch.countDown();
122+
}
123+
124+
@Override
125+
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
126+
}
127+
});
128+
channel.confirmSelect();
129+
closeAndWaitForShutdown(connection);
130+
waitForRecovery();
131+
expectChannelRecovery(channel);
132+
for (int i = 0; i < n; i++) {
133+
channel.basicPublish("", q, true, false, null, "mandatory1".getBytes());
134+
}
135+
channel.waitForConfirms(200);
136+
assertTrue(latch.await(50, TimeUnit.MILLISECONDS));
137+
}
138+
113139
public void testClientNamedQueueRecovery() throws IOException, InterruptedException {
114140
Channel ch = connection.createChannel();
115141
String q = "java-client.test.recovery.q1";

0 commit comments

Comments
 (0)