|
20 | 20 | import static com.rabbitmq.stream.impl.TestUtils.latchAssert; |
21 | 21 | import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; |
22 | 22 | import static com.rabbitmq.stream.impl.TestUtils.wrap; |
| 23 | +import static java.lang.String.format; |
23 | 24 | import static org.assertj.core.api.Assertions.assertThat; |
24 | 25 |
|
25 | 26 | import com.rabbitmq.client.Connection; |
@@ -254,10 +255,7 @@ void autoOffsetTrackingShouldStoreOffsetZero() throws Exception { |
254 | 255 | publishToPartitions(cf, partitions, messageCount); |
255 | 256 | ConcurrentMap<String, AtomicInteger> messagesReceived = new ConcurrentHashMap<>(partitionCount); |
256 | 257 | ConcurrentMap<String, Long> lastOffsets = new ConcurrentHashMap<>(partitionCount); |
257 | | - partitions.forEach( |
258 | | - p -> { |
259 | | - messagesReceived.put(p, new AtomicInteger(0)); |
260 | | - }); |
| 258 | + partitions.forEach(p -> messagesReceived.put(p, new AtomicInteger(0))); |
261 | 259 | CountDownLatch consumeLatch = new CountDownLatch(messageCount); |
262 | 260 | String consumerName = "my-app"; |
263 | 261 | AtomicInteger totalCount = new AtomicInteger(); |
@@ -294,9 +292,12 @@ void autoOffsetTrackingShouldStoreOffsetZero() throws Exception { |
294 | 292 | waitAtMost( |
295 | 293 | () -> { |
296 | 294 | QueryOffsetResponse response = client.queryOffset(consumerName, p); |
297 | | - return response.isOk() |
298 | | - && response.getOffset() == lastOffsets.get(p).longValue(); |
299 | | - }); |
| 295 | + return response.isOk() && response.getOffset() == lastOffsets.get(p); |
| 296 | + }, |
| 297 | + () -> |
| 298 | + format( |
| 299 | + "Expecting stored offset %d on stream '%s', but got %d", |
| 300 | + lastOffsets.get(p), p, client.queryOffset(consumerName, p).getOffset())); |
300 | 301 | })); |
301 | 302 | } |
302 | 303 |
|
|
0 commit comments