Skip to content

Commit 0a52b07

Browse files
author
Matthew Sackman
committed
Use a marker message to ensure the absense of duplicate messages
1 parent 310801f commit 0a52b07

File tree

1 file changed

+15
-7
lines changed

1 file changed

+15
-7
lines changed

test/src/com/rabbitmq/client/test/functional/ExchangeExchangeBindings.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@
3636
import com.rabbitmq.client.AMQP;
3737
import com.rabbitmq.client.QueueingConsumer;
3838
import com.rabbitmq.client.ShutdownSignalException;
39+
import com.rabbitmq.client.QueueingConsumer.Delivery;
3940
import com.rabbitmq.client.test.BrokerTestCase;
4041

4142
public class ExchangeExchangeBindings extends BrokerTestCase {
4243

4344
private static final int TIMEOUT = 1000;
45+
private static final byte[] MARKER = "MARK".getBytes();
4446

4547
private final String[] queues = new String[] { "q0", "q1", "q2" };
4648
private final String[] exchanges = new String[] { "e0", "e1", "e2" };
@@ -51,6 +53,11 @@ public class ExchangeExchangeBindings extends BrokerTestCase {
5153
private QueueingConsumer[] consumers = new QueueingConsumer[] { null, null,
5254
null };
5355

56+
private void publishWithMarker(String x, String rk) throws IOException {
57+
basicPublishVolatile(x, rk);
58+
basicPublishVolatile(MARKER, x, rk);
59+
}
60+
5461
@Override
5562
protected void createResources() throws IOException {
5663
for (String q : queues) {
@@ -84,7 +91,8 @@ private void consumeExactly(QueueingConsumer consumer, int n)
8491
for (; n > 0; --n) {
8592
assertNotNull(consumer.nextDelivery(TIMEOUT));
8693
}
87-
assertNull(consumer.nextDelivery(0));
94+
Delivery markerDelivery = consumer.nextDelivery(TIMEOUT);
95+
assertEquals(new String(MARKER), new String(markerDelivery.getBody()));
8896
}
8997

9098
public void testBindingCreationDeletion() throws IOException {
@@ -108,16 +116,16 @@ public void testBindingCreationDeletion() throws IOException {
108116
*/
109117
public void testSimpleChains() throws IOException, ShutdownSignalException,
110118
InterruptedException {
111-
basicPublishVolatile("e0", "");
119+
publishWithMarker("e0", "");
112120
consumeExactly(consumers[0], 1);
113121

114122
channel.exchangeBind("e0", "e1", "");
115-
basicPublishVolatile("e1", "");
123+
publishWithMarker("e1", "");
116124
consumeExactly(consumers[0], 1);
117125
consumeExactly(consumers[1], 1);
118126

119127
channel.exchangeBind("e1", "e2", "");
120-
basicPublishVolatile("e2", "");
128+
publishWithMarker("e2", "");
121129
consumeExactly(consumers[0], 1);
122130
consumeExactly(consumers[1], 1);
123131
consumeExactly(consumers[2], 1);
@@ -136,13 +144,13 @@ public void testSimpleChains() throws IOException, ShutdownSignalException,
136144
public void testDuplicateQueueDestinations() throws IOException,
137145
ShutdownSignalException, InterruptedException {
138146
channel.queueBind("q1", "e0", "");
139-
basicPublishVolatile("e0", "");
147+
publishWithMarker("e0", "");
140148
consumeExactly(consumers[0], 1);
141149
consumeExactly(consumers[1], 1);
142150

143151
channel.exchangeBind("e0", "e1", "");
144152

145-
basicPublishVolatile("e1", "");
153+
publishWithMarker("e1", "");
146154
consumeExactly(consumers[0], 1);
147155
consumeExactly(consumers[1], 1);
148156

@@ -162,7 +170,7 @@ public void testExchangeRoutingLoop() throws IOException,
162170
channel.exchangeBind("e2", "e0", "");
163171

164172
for (String e : exchanges) {
165-
basicPublishVolatile(e, "");
173+
publishWithMarker(e, "");
166174
for (QueueingConsumer c : consumers) {
167175
consumeExactly(c, 1);
168176
}

0 commit comments

Comments
 (0)