Skip to content

Commit 2fb86cc

Browse files
committed
Add filtering + super stream test
1 parent 20282d9 commit 2fb86cc

File tree

1 file changed

+98
-0
lines changed

1 file changed

+98
-0
lines changed

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static com.rabbitmq.stream.impl.TestUtils.*;
1717
import static java.util.Collections.singletonMap;
18+
import static java.util.stream.Collectors.toList;
1819
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2021

@@ -31,6 +32,7 @@
3132
import java.util.concurrent.CountDownLatch;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.concurrent.atomic.AtomicLong;
35+
import java.util.concurrent.atomic.AtomicReference;
3436
import java.util.function.Supplier;
3537
import java.util.stream.IntStream;
3638
import org.junit.jupiter.api.AfterEach;
@@ -270,6 +272,102 @@ public void handleDelivery(
270272
});
271273
}
272274

275+
@Test
276+
void superStream(TestInfo info) throws Exception {
277+
repeatIfFailure(
278+
() -> {
279+
String superStream = TestUtils.streamName(info);
280+
int partitionCount = 3;
281+
Connection connection = new ConnectionFactory().newConnection();
282+
try {
283+
TestUtils.declareSuperStreamTopology(connection, superStream, partitionCount);
284+
285+
Producer producer =
286+
environment
287+
.producerBuilder()
288+
.superStream(superStream)
289+
.routing(msg -> msg.getApplicationProperties().get("customerId").toString())
290+
.producerBuilder()
291+
.filterValue(msg -> msg.getApplicationProperties().get("type").toString())
292+
.build();
293+
294+
List<String> customers =
295+
IntStream.range(0, 10)
296+
.mapToObj(ignored -> UUID.randomUUID().toString())
297+
.collect(toList());
298+
Random random = new Random();
299+
300+
List<String> filterValues = new ArrayList<>(Arrays.asList("invoice", "order", "claim"));
301+
Map<String, AtomicInteger> filterValueCount = new HashMap<>();
302+
AtomicReference<CountDownLatch> latch =
303+
new AtomicReference<>(new CountDownLatch(messageCount));
304+
Runnable insert =
305+
() -> {
306+
ConfirmationHandler confirmationHandler = ctx -> latch.get().countDown();
307+
IntStream.range(0, messageCount)
308+
.forEach(
309+
ignored -> {
310+
String filterValue =
311+
filterValues.get(random.nextInt(filterValues.size()));
312+
filterValueCount
313+
.computeIfAbsent(filterValue, k -> new AtomicInteger())
314+
.incrementAndGet();
315+
producer.send(
316+
producer
317+
.messageBuilder()
318+
.applicationProperties()
319+
.entry(
320+
"customerId",
321+
customers.get(random.nextInt(customers.size())))
322+
.entry("type", filterValue)
323+
.messageBuilder()
324+
.build(),
325+
confirmationHandler);
326+
});
327+
latchAssert(latch).completes(CONDITION_TIMEOUT);
328+
};
329+
330+
insert.run();
331+
332+
// second wave of messages, with only one, new filter value
333+
filterValues.clear();
334+
String filterValue = "refund";
335+
filterValues.add(filterValue);
336+
latch.set(new CountDownLatch(messageCount));
337+
insert.run();
338+
339+
AtomicInteger receivedMessageCount = new AtomicInteger(0);
340+
AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0);
341+
Consumer consumer =
342+
environment
343+
.consumerBuilder()
344+
.superStream(superStream)
345+
.offset(OffsetSpecification.first())
346+
.filter()
347+
.values(filterValue)
348+
.postFilter(
349+
msg -> {
350+
receivedMessageCount.incrementAndGet();
351+
return filterValue.equals(msg.getApplicationProperties().get("type"));
352+
})
353+
.builder()
354+
.messageHandler(
355+
(context, message) -> {
356+
filteredConsumedMessageCount.incrementAndGet();
357+
})
358+
.build();
359+
360+
int expectedCount = filterValueCount.get(filterValue).get();
361+
waitAtMost(
362+
CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount);
363+
assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2);
364+
} finally {
365+
TestUtils.deleteSuperStreamTopology(connection, superStream, partitionCount);
366+
connection.close();
367+
}
368+
});
369+
}
370+
273371
private ProducerBuilder producerBuilder() {
274372
return this.environment.producerBuilder().stream(stream);
275373
}

0 commit comments

Comments
 (0)