3232import java .util .Map ;
3333import java .util .concurrent .CountDownLatch ;
3434import java .util .concurrent .atomic .AtomicLong ;
35+ import java .util .function .LongFunction ;
3536import java .util .function .Predicate ;
3637import org .junit .jupiter .api .TestInfo ;
3738import org .junit .jupiter .api .extension .ExtendWith ;
@@ -60,7 +61,8 @@ static RetentionTestConfig[] retention() {
6061 .maxSegmentSizeBytes (messageCount * payloadSize / 20 )
6162 .build ());
6263 },
63- firstMessageId -> firstMessageId > 0 ),
64+ firstMessageId -> firstMessageId > 0 ,
65+ firstMessageId -> "First message ID should be positive but is " + firstMessageId ),
6466 new RetentionTestConfig (
6567 "retention defined in policy should take precedence" ,
6668 context -> {
@@ -85,6 +87,7 @@ static RetentionTestConfig[] retention() {
8587 Host .rabbitmqctl (policyCommand );
8688 },
8789 firstMessageId -> firstMessageId > 0 ,
90+ firstMessageId -> "First message ID should be positive but is " + firstMessageId ,
8891 () -> Host .rabbitmqctl ("clear_policy stream-retention-test" )),
8992 new RetentionTestConfig (
9093 "with size helper to specify bytes" ,
@@ -98,7 +101,8 @@ static RetentionTestConfig[] retention() {
98101 .maxSegmentSizeBytes (ByteCapacity .B (messageCount * payloadSize / 20 ))
99102 .build ());
100103 },
101- firstMessageId -> firstMessageId > 0 ),
104+ firstMessageId -> firstMessageId > 0 ,
105+ firstMessageId -> "First message ID should be positive but is " + firstMessageId ),
102106 new RetentionTestConfig (
103107 "with max age" ,
104108 context -> {
@@ -112,6 +116,7 @@ static RetentionTestConfig[] retention() {
112116 .build ());
113117 },
114118 firstMessageId -> firstMessageId > 0 ,
119+ firstMessageId -> "First message ID should be positive but is " + firstMessageId ,
115120 Duration .ofSeconds (3 )),
116121 new RetentionTestConfig (
117122 "no retention" ,
@@ -120,7 +125,8 @@ static RetentionTestConfig[] retention() {
120125 String stream = (String ) ((Object []) context )[1 ];
121126 client .create (stream , Collections .emptyMap ());
122127 },
123- firstMessageId -> firstMessageId == 0 ),
128+ firstMessageId -> firstMessageId == 0 ,
129+ firstMessageId -> "First message ID should be 0 but is " + firstMessageId ),
124130 new RetentionTestConfig (
125131 "with AMQP client, no retention" ,
126132 context -> {
@@ -132,7 +138,8 @@ static RetentionTestConfig[] retention() {
132138 stream , true , false , false , Collections .singletonMap ("x-queue-type" , "stream" ));
133139 }
134140 },
135- firstMessageId -> firstMessageId == 0 ),
141+ firstMessageId -> firstMessageId == 0 ,
142+ firstMessageId -> "First message ID should be 0 but is " + firstMessageId ),
136143 new RetentionTestConfig (
137144 "with AMQP client, with size-based retention" ,
138145 context -> {
@@ -147,7 +154,8 @@ static RetentionTestConfig[] retention() {
147154 ch .queueDeclare (stream , true , false , false , arguments );
148155 }
149156 },
150- firstMessageId -> firstMessageId > 0 ),
157+ firstMessageId -> firstMessageId > 0 ,
158+ firstMessageId -> "First message ID should be positive but is " + firstMessageId ),
151159 new RetentionTestConfig (
152160 "with AMQP client, with age-based retention" ,
153161 context -> {
@@ -163,6 +171,7 @@ static RetentionTestConfig[] retention() {
163171 }
164172 },
165173 firstMessageId -> firstMessageId > 0 ,
174+ firstMessageId -> "First message ID should be positive but is " + firstMessageId ,
166175 Duration .ofSeconds (3 )),
167176 };
168177 }
@@ -202,6 +211,7 @@ void retention(RetentionTestConfig configuration, TestInfo info) throws Exceptio
202211
203212 CountDownLatch consumingLatch = new CountDownLatch (1 );
204213 AtomicLong firstMessageId = new AtomicLong (-1 );
214+ AtomicLong lastMessageId = new AtomicLong (-1 );
205215 Client consumer =
206216 cf .get (
207217 new Client .ClientParameters ()
@@ -212,15 +222,27 @@ void retention(RetentionTestConfig configuration, TestInfo info) throws Exceptio
212222 (subscriptionId , offset , chunkTimestamp , committedOffset , message ) -> {
213223 long messageId = message .getProperties ().getMessageIdAsLong ();
214224 firstMessageId .compareAndSet (-1 , messageId );
225+ lastMessageId .set (messageId );
215226 if (messageId == publishSequence .get () - 1 ) {
216227 consumingLatch .countDown ();
217228 }
218229 }));
219230
220231 consumer .subscribe (b (1 ), testStream , OffsetSpecification .first (), 10 );
221- assertThat (consumingLatch .await (10 , SECONDS )).isTrue ();
232+ assertThat (consumingLatch .await (10 , SECONDS ))
233+ .as (
234+ () ->
235+ String .format (
236+ "Failure '%s', first message ID %d, last message ID %d, publish sequence %d" ,
237+ configuration .description ,
238+ firstMessageId .get (),
239+ lastMessageId .get (),
240+ publishSequence .get ()))
241+ .isTrue ();
222242 consumer .unsubscribe (b (1 ));
223- assertThat (configuration .firstMessageIdAssertion .test (firstMessageId .get ())).isTrue ();
243+ assertThat (configuration .firstMessageIdAssertion .test (firstMessageId .get ()))
244+ .as (configuration .assertionDescription .apply (firstMessageId .get ()))
245+ .isTrue ();
224246 } finally {
225247 client .delete (testStream );
226248 configuration .clean ();
@@ -231,41 +253,65 @@ private static class RetentionTestConfig {
231253 final String description ;
232254 final CallableConsumer <Object > streamCreator ;
233255 final Predicate <Long > firstMessageIdAssertion ;
256+ final LongFunction <String > assertionDescription ;
234257 final Duration waitTime ;
235258 final RunnableWithException cleaning ;
236259
237260 RetentionTestConfig (
238261 String description ,
239262 CallableConsumer <Object > streamCreator ,
240- Predicate <Long > firstMessageIdAssertion ) {
241- this (description , streamCreator , firstMessageIdAssertion , Duration .ZERO , null );
263+ Predicate <Long > firstMessageIdAssertion ,
264+ LongFunction <String > assertionDescription ) {
265+ this (
266+ description ,
267+ streamCreator ,
268+ firstMessageIdAssertion ,
269+ assertionDescription ,
270+ Duration .ZERO ,
271+ null );
242272 }
243273
244274 RetentionTestConfig (
245275 String description ,
246276 CallableConsumer <Object > streamCreator ,
247277 Predicate <Long > firstMessageIdAssertion ,
278+ LongFunction <String > assertionDescription ,
248279 Duration waitTime ) {
249- this (description , streamCreator , firstMessageIdAssertion , waitTime , null );
280+ this (
281+ description ,
282+ streamCreator ,
283+ firstMessageIdAssertion ,
284+ assertionDescription ,
285+ waitTime ,
286+ null );
250287 }
251288
252289 RetentionTestConfig (
253290 String description ,
254291 CallableConsumer <Object > streamCreator ,
255292 Predicate <Long > firstMessageIdAssertion ,
293+ LongFunction <String > assertionDescription ,
256294 RunnableWithException cleaning ) {
257- this (description , streamCreator , firstMessageIdAssertion , Duration .ZERO , cleaning );
295+ this (
296+ description ,
297+ streamCreator ,
298+ firstMessageIdAssertion ,
299+ assertionDescription ,
300+ Duration .ZERO ,
301+ cleaning );
258302 }
259303
260304 RetentionTestConfig (
261305 String description ,
262306 CallableConsumer <Object > streamCreator ,
263307 Predicate <Long > firstMessageIdAssertion ,
308+ LongFunction <String > assertionDescription ,
264309 Duration waitTime ,
265310 RunnableWithException cleaning ) {
266311 this .description = description ;
267312 this .streamCreator = streamCreator ;
268313 this .firstMessageIdAssertion = firstMessageIdAssertion ;
314+ this .assertionDescription = assertionDescription ;
269315 this .waitTime = waitTime ;
270316 this .cleaning = cleaning == null ? () -> {} : cleaning ;
271317 }
0 commit comments