3333import java .util .concurrent .CountDownLatch ;
3434import java .util .concurrent .atomic .AtomicLong ;
3535import java .util .function .Predicate ;
36- import java .util .stream .IntStream ;
3736import org .junit .jupiter .api .TestInfo ;
3837import org .junit .jupiter .api .extension .ExtendWith ;
3938import org .junit .jupiter .params .ParameterizedTest ;
@@ -173,42 +172,33 @@ static RetentionTestConfig[] retention() {
173172 @ MethodSource
174173 void retention (RetentionTestConfig configuration , TestInfo info ) throws Exception {
175174 String testStream = streamName (info );
176- CountDownLatch publishingLatch = new CountDownLatch (messageCount );
177- CountDownLatch publishingLatchSecondWave = new CountDownLatch (messageCount * 2 );
178- Client publisher =
179- cf .get (
180- new Client .ClientParameters ()
181- .publishConfirmListener (
182- (publisherId , publishingId ) -> {
183- publishingLatch .countDown ();
184- publishingLatchSecondWave .countDown ();
185- }));
186175
176+ Client client = cf .get ();
187177 try {
188- configuration .streamCreator .accept (new Object [] {publisher , testStream });
178+ configuration .streamCreator .accept (new Object [] {client , testStream });
179+
189180 AtomicLong publishSequence = new AtomicLong (0 );
190- byte [] payload = new byte [payloadSize ];
191- publisher .declarePublisher (b (1 ), null , testStream );
192181 Runnable publish =
193- () ->
194- publisher .publish (
195- b (1 ),
196- Collections .singletonList (
197- publisher
198- .messageBuilder ()
199- .properties ()
200- .messageId (publishSequence .getAndIncrement ())
201- .messageBuilder ()
202- .addData (payload )
203- .build ()));
204- IntStream .range (0 , messageCount ).forEach (i -> publish .run ());
205- assertThat (publishingLatch .await (10 , SECONDS )).isTrue ();
182+ () -> {
183+ byte [] payload = new byte [payloadSize ];
184+ TestUtils .publishAndWaitForConfirms (
185+ cf ,
186+ messageBuilder ->
187+ messageBuilder
188+ .properties ()
189+ .messageId (publishSequence .getAndIncrement ())
190+ .messageBuilder ()
191+ .addData (payload )
192+ .build (),
193+ messageCount ,
194+ testStream ,
195+ Duration .ofSeconds (20 ));
196+ };
206197
198+ publish .run ();
207199 configuration .waiting ();
208-
209200 // publishing again, to make sure new segments trigger retention strategy
210- IntStream .range (0 , messageCount ).forEach (i -> publish .run ());
211- assertThat (publishingLatchSecondWave .await (10 , SECONDS )).isTrue ();
201+ publish .run ();
212202
213203 CountDownLatch consumingLatch = new CountDownLatch (1 );
214204 AtomicLong firstMessageId = new AtomicLong (-1 );
@@ -232,7 +222,7 @@ void retention(RetentionTestConfig configuration, TestInfo info) throws Exceptio
232222 consumer .unsubscribe (b (1 ));
233223 assertThat (configuration .firstMessageIdAssertion .test (firstMessageId .get ())).isTrue ();
234224 } finally {
235- publisher .delete (testStream );
225+ client .delete (testStream );
236226 configuration .clean ();
237227 }
238228 }
0 commit comments