Skip to content

Commit da8ce03

Browse files
committed
Use publish confirms in visibility effect test
The test started to fail on a 2-node 3.10 cluster while it was OK before. The test expected messages to be almost immediately available in the queues after publishing, which is not realistic, as publishing is asynchronous. This commit enables publish confirms and waits for the confirms to arrive before purging and checking the content of the queues, which is more realistic in terms of expectations. The bulk of the test remains asynchronous in case it would hang like it used to do sometimes. (cherry picked from commit 24fd595)
1 parent 54b7e8f commit da8ce03

File tree

1 file changed

+37
-6
lines changed

1 file changed

+37
-6
lines changed

src/test/java/com/rabbitmq/client/test/server/EffectVisibilityCrossNodeTest.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515

1616
package com.rabbitmq.client.test.server;
1717

18+
import static org.assertj.core.api.Assertions.assertThat;
1819
import static org.junit.Assert.assertEquals;
1920

2021
import java.io.IOException;
22+
import java.util.Iterator;
23+
import java.util.Set;
2124
import java.util.concurrent.*;
2225

26+
import java.util.concurrent.atomic.AtomicReference;
2327
import org.junit.Test;
2428

2529
import com.rabbitmq.client.test.functional.ClusteredTestBase;
@@ -31,16 +35,20 @@
3135
public class EffectVisibilityCrossNodeTest extends ClusteredTestBase {
3236
private final String[] queues = new String[QUEUES];
3337

38+
ExecutorService executorService;
39+
3440
@Override
3541
protected void createResources() throws IOException {
3642
for (int i = 0; i < queues.length ; i++) {
3743
queues[i] = alternateChannel.queueDeclare("", false, false, true, null).getQueue();
3844
alternateChannel.queueBind(queues[i], "amq.fanout", "");
3945
}
46+
executorService = Executors.newSingleThreadExecutor();
4047
}
4148

4249
@Override
4350
protected void releaseResources() throws IOException {
51+
executorService.shutdownNow();
4452
for (int i = 0; i < queues.length ; i++) {
4553
alternateChannel.queueDelete(queues[i]);
4654
}
@@ -53,24 +61,47 @@ protected void releaseResources() throws IOException {
5361
private static final byte[] msg = "".getBytes();
5462

5563
@Test public void effectVisibility() throws Exception {
56-
ExecutorService executorService = Executors.newSingleThreadExecutor();
57-
try {
64+
AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>();
65+
Set<Long> publishIds = ConcurrentHashMap.newKeySet();
66+
channel.addConfirmListener(
67+
(deliveryTag, multiple) -> {
68+
if (multiple) {
69+
Iterator<Long> iterator = publishIds.iterator();
70+
while (iterator.hasNext()) {
71+
long publishId = iterator.next();
72+
if (publishId <= deliveryTag) {
73+
iterator.remove();
74+
}
75+
}
76+
} else {
77+
publishIds.remove(deliveryTag);
78+
}
79+
if (publishIds.isEmpty()) {
80+
confirmLatch.get().countDown();
81+
}
82+
},
83+
(deliveryTag, multiple) -> {});
84+
// the test bulk is asynchronous because this test has a history of hanging
5885
Future<Void> task = executorService.submit(() -> {
86+
// we use publish confirm to make sure messages made it to the queues
87+
// before checking their content
88+
channel.confirmSelect();
5989
for (int i = 0; i < BATCHES; i++) {
6090
Thread.sleep(10); // to avoid flow control for the connection
91+
confirmLatch.set(new CountDownLatch(1));
6192
for (int j = 0; j < MESSAGES_PER_BATCH; j++) {
93+
long publishId = channel.getNextPublishSeqNo();
6294
channel.basicPublish("amq.fanout", "", null, msg);
95+
publishIds.add(publishId);
6396
}
97+
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
98+
publishIds.clear();
6499
for (int j = 0; j < queues.length; j++) {
65100
assertEquals(MESSAGES_PER_BATCH, channel.queuePurge(queues[j]).getMessageCount());
66101
}
67102
}
68103
return null;
69104
});
70105
task.get(1, TimeUnit.MINUTES);
71-
} finally {
72-
executorService.shutdownNow();
73-
executorService.awaitTermination(1, TimeUnit.SECONDS);
74-
}
75106
}
76107
}

0 commit comments

Comments
 (0)