Skip to content

Commit b807cf8

Browse files
committed
Add super stream test
To make sure messages with a given key (e.g. user name) end up in the same partition.
1 parent c5be26f commit b807cf8

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed

src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,18 @@
2626
import com.rabbitmq.stream.OffsetSpecification;
2727
import com.rabbitmq.stream.Producer;
2828
import io.netty.channel.EventLoopGroup;
29+
import java.nio.charset.StandardCharsets;
30+
import java.util.HashSet;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Random;
34+
import java.util.Set;
2935
import java.util.UUID;
36+
import java.util.concurrent.ConcurrentHashMap;
3037
import java.util.concurrent.CountDownLatch;
3138
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.concurrent.atomic.AtomicLong;
40+
import java.util.stream.Collectors;
3241
import java.util.stream.IntStream;
3342
import org.junit.jupiter.api.AfterEach;
3443
import org.junit.jupiter.api.BeforeEach;
@@ -153,4 +162,105 @@ void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception
153162
latchAssert(consumeLatch).completes();
154163
assertThat(totalCount.get()).isEqualTo(messageCount);
155164
}
165+
166+
@Test
167+
void allMessagesForSameUserShouldEndUpInSamePartition() throws Exception {
168+
int messageCount = 10_000 * partitions;
169+
int userCount = 10;
170+
declareSuperStreamTopology(connection, superStream, partitions);
171+
172+
AtomicInteger totalReceivedCount = new AtomicInteger(0);
173+
// <partition>.<user> => count
174+
Map<String, AtomicLong> receivedMessagesPerPartitionUserCount =
175+
new ConcurrentHashMap<>(userCount);
176+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
177+
CountDownLatch consumersReadyLatch = new CountDownLatch(0);
178+
IntStream.range(0, partitions)
179+
.forEach(
180+
i -> {
181+
environment
182+
.consumerBuilder()
183+
.superStream(superStream)
184+
.name("app-1")
185+
.singleActiveConsumer()
186+
.offset(OffsetSpecification.first())
187+
.consumerUpdateListener(
188+
context -> {
189+
if (context.isActive() && i == partitions - 1) {
190+
// a consumer in the last composite consumer gets activated
191+
consumersReadyLatch.countDown();
192+
}
193+
return OffsetSpecification.first();
194+
})
195+
.messageHandler(
196+
(context, message) -> {
197+
String user =
198+
new String(message.getProperties().getUserId(), StandardCharsets.UTF_8);
199+
receivedMessagesPerPartitionUserCount
200+
.computeIfAbsent(context.stream() + "." + user, s -> new AtomicLong(0))
201+
.incrementAndGet();
202+
totalReceivedCount.incrementAndGet();
203+
consumeLatch.countDown();
204+
})
205+
.build();
206+
});
207+
208+
assertThat(latchAssert(consumersReadyLatch)).completes();
209+
210+
Producer producer =
211+
environment.producerBuilder().stream(superStream)
212+
.routing(
213+
message -> new String(message.getProperties().getUserId(), StandardCharsets.UTF_8))
214+
.producerBuilder()
215+
.build();
216+
217+
List<String> users =
218+
IntStream.range(0, userCount).mapToObj(i -> "user" + i).collect(Collectors.toList());
219+
Map<String, AtomicLong> messagePerUserCount =
220+
users.stream().collect(Collectors.toMap(u -> u, u -> new AtomicLong(0)));
221+
Random random = new Random();
222+
223+
CountDownLatch publishLatch = new CountDownLatch(messageCount);
224+
IntStream.range(0, messageCount)
225+
.forEach(
226+
i -> {
227+
String user = users.get(random.nextInt(userCount));
228+
messagePerUserCount.get(user).incrementAndGet();
229+
producer.send(
230+
producer
231+
.messageBuilder()
232+
.properties()
233+
.userId(user.getBytes(StandardCharsets.UTF_8))
234+
.messageBuilder()
235+
.build(),
236+
confirmationStatus -> publishLatch.countDown());
237+
});
238+
239+
assertThat(latchAssert(publishLatch)).completes(5);
240+
241+
latchAssert(consumeLatch).completes();
242+
assertThat(totalReceivedCount.get()).isEqualTo(messageCount);
243+
assertThat(receivedMessagesPerPartitionUserCount).hasSize(userCount);
244+
245+
Set<String> partitionNames =
246+
IntStream.range(0, partitions)
247+
.mapToObj(i -> superStream + "-" + i)
248+
.collect(Collectors.toSet());
249+
AtomicLong total = new AtomicLong();
250+
Set<String> passedUsers = new HashSet<>();
251+
receivedMessagesPerPartitionUserCount.forEach(
252+
(key, count) -> {
253+
// <partition>.<user> => count
254+
String partition = key.split("\\.")[0];
255+
String user = key.split("\\.")[1];
256+
assertThat(partition).isIn(partitionNames);
257+
assertThat(user).isIn(users);
258+
259+
assertThat(passedUsers).doesNotContain(user);
260+
passedUsers.add(user);
261+
262+
total.addAndGet(count.get());
263+
});
264+
assertThat(total).hasValue(messageCount);
265+
}
156266
}

0 commit comments

Comments
 (0)