2525
2626import org .eclipse .paho .client .mqttv3 .MqttConnectOptions ;
2727import org .eclipse .paho .client .mqttv3 .MqttException ;
28- import org .junit .jupiter .api . Test ;
28+ import org .junitpioneer .jupiter .RetryingTest ;
2929
3030import org .springframework .context .ApplicationContext ;
3131import org .springframework .context .annotation .AnnotationConfigApplicationContext ;
6363 */
6464class ClientManagerBackToBackTests implements MosquittoContainerTest {
6565
66- private static final long QUIESCENT_TIMEOUT = 1L ;
66+ static final long QUIESCENT_TIMEOUT = 1L ;
6767
68- private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L ;
68+ static final long DISCONNECT_COMPLETION_TIMEOUT = 1L ;
6969
70- @ Test
70+ @ RetryingTest ( 10 )
7171 void testSameV3ClientIdWorksForPubAndSub () throws Exception {
7272 testSubscribeAndPublish (Mqttv3Config .class , Mqttv3Config .TOPIC_NAME , Mqttv3Config .subscribedLatch );
7373 }
7474
75- @ Test
75+ @ RetryingTest ( 10 )
7676 void testSameV5ClientIdWorksForPubAndSub () throws Exception {
7777 testSubscribeAndPublish (Mqttv5Config .class , Mqttv5Config .TOPIC_NAME , Mqttv5Config .subscribedLatch );
7878 }
7979
80- @ Test
80+ @ RetryingTest ( 10 )
8181 void testV3ClientManagerReconnect () throws Exception {
8282 testSubscribeAndPublish (Mqttv3ConfigWithDisconnect .class , Mqttv3ConfigWithDisconnect .TOPIC_NAME ,
8383 Mqttv3ConfigWithDisconnect .subscribedLatch );
8484 }
8585
86- @ Test
86+ @ RetryingTest ( 10 )
8787 void testV3ClientManagerRuntime () throws Exception {
8888 testSubscribeAndPublishRuntime (Mqttv3ConfigRuntime .class , Mqttv3ConfigRuntime .TOPIC_NAME ,
8989 Mqttv3ConfigRuntime .subscribedLatch );
9090 }
9191
92- @ Test
92+ @ RetryingTest ( 10 )
9393 void testV5ClientManagerReconnect () throws Exception {
9494 testSubscribeAndPublish (Mqttv5ConfigWithDisconnect .class , Mqttv5ConfigWithDisconnect .TOPIC_NAME ,
9595 Mqttv5ConfigWithDisconnect .subscribedLatch );
9696 }
9797
98- @ Test
98+ @ RetryingTest ( 10 )
9999 void testV5ClientManagerRuntime () throws Exception {
100100 testSubscribeAndPublishRuntime (Mqttv5ConfigRuntime .class , Mqttv5ConfigRuntime .TOPIC_NAME ,
101101 Mqttv5ConfigRuntime .subscribedLatch );
102102 }
103103
104104 @ SuppressWarnings ("unchecked" )
105- private void testSubscribeAndPublish (Class <?> configClass , String topicName , CountDownLatch subscribedLatch )
105+ static void testSubscribeAndPublish (Class <?> configClass , String topicName , CountDownLatch subscribedLatch )
106106 throws Exception {
107107
108108 try (var ctx = new AnnotationConfigApplicationContext (configClass )) {
@@ -134,8 +134,8 @@ private void testSubscribeAndPublish(Class<?> configClass, String topicName, Cou
134134 }
135135 }
136136
137- private void testSubscribeAndPublishRuntime (Class <?> configClass , String topicName , CountDownLatch subscribedLatch )
138- throws Exception {
137+ static void testSubscribeAndPublishRuntime (Class <?> configClass , String topicName ,
138+ CountDownLatch subscribedLatch ) throws Exception {
139139
140140 try (var ctx = new AnnotationConfigApplicationContext (configClass )) {
141141 // given
@@ -169,14 +169,14 @@ private void testSubscribeAndPublishRuntime(Class<?> configClass, String topicNa
169169
170170 @ Configuration
171171 @ EnableIntegration
172- public static class Mqttv3Config {
172+ static class Mqttv3Config {
173173
174174 static final String TOPIC_NAME = "test-topic-v3" ;
175175
176176 static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
177177
178178 @ EventListener
179- public void onSubscribed (MqttSubscribedEvent e ) {
179+ void onSubscribed (MqttSubscribedEvent e ) {
180180 subscribedLatch .countDown ();
181181 }
182182
@@ -191,7 +191,7 @@ List<MqttMessageDeliveryEvent> deliveryEvents() {
191191 }
192192
193193 @ Bean
194- public Mqttv3ClientManager mqttv3ClientManager () {
194+ Mqttv3ClientManager mqttv3ClientManager () {
195195 MqttConnectOptions connectionOptions = new MqttConnectOptions ();
196196 connectionOptions .setServerURIs (new String [] {MosquittoContainerTest .mqttUrl ()});
197197 connectionOptions .setAutomaticReconnect (true );
@@ -202,15 +202,15 @@ public Mqttv3ClientManager mqttv3ClientManager() {
202202 }
203203
204204 @ Bean
205- public IntegrationFlow mqttOutFlow (Mqttv3ClientManager mqttv3ClientManager ) {
205+ IntegrationFlow mqttOutFlow (Mqttv3ClientManager mqttv3ClientManager ) {
206206 MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler (mqttv3ClientManager );
207207 mqttPahoMessageHandler .setAsync (true );
208208 mqttPahoMessageHandler .setAsyncEvents (true );
209209 return f -> f .handle (mqttPahoMessageHandler );
210210 }
211211
212212 @ Bean
213- public IntegrationFlow mqttInFlow (Mqttv3ClientManager mqttv3ClientManager ) {
213+ IntegrationFlow mqttInFlow (Mqttv3ClientManager mqttv3ClientManager ) {
214214 return IntegrationFlow .from (new MqttPahoMessageDrivenChannelAdapter (mqttv3ClientManager , TOPIC_NAME ))
215215 .channel (c -> c .queue ("fromMqttChannel" ))
216216 .get ();
@@ -220,24 +220,24 @@ public IntegrationFlow mqttInFlow(Mqttv3ClientManager mqttv3ClientManager) {
220220
221221 @ Configuration
222222 @ EnableIntegration
223- public static class Mqttv3ConfigWithDisconnect {
223+ static class Mqttv3ConfigWithDisconnect {
224224
225225 static final String TOPIC_NAME = "test-topic-v3-reconnect" ;
226226
227227 static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
228228
229229 @ EventListener
230- public void onSubscribed (MqttSubscribedEvent e ) {
230+ void onSubscribed (MqttSubscribedEvent e ) {
231231 subscribedLatch .countDown ();
232232 }
233233
234234 @ Bean
235- public ClientV3Disconnector disconnector (Mqttv3ClientManager clientManager ) {
235+ ClientV3Disconnector disconnector (Mqttv3ClientManager clientManager ) {
236236 return new ClientV3Disconnector (clientManager );
237237 }
238238
239239 @ Bean
240- public Mqttv3ClientManager mqttv3ClientManager () {
240+ Mqttv3ClientManager mqttv3ClientManager () {
241241 MqttConnectOptions connectionOptions = new MqttConnectOptions ();
242242 connectionOptions .setServerURIs (new String [] {MosquittoContainerTest .mqttUrl ()});
243243 connectionOptions .setAutomaticReconnect (true );
@@ -248,12 +248,12 @@ public Mqttv3ClientManager mqttv3ClientManager() {
248248 }
249249
250250 @ Bean
251- public IntegrationFlow mqttOutFlow () {
251+ IntegrationFlow mqttOutFlow () {
252252 return f -> f .handle (new MqttPahoMessageHandler (MosquittoContainerTest .mqttUrl (), "old-client-v3" ));
253253 }
254254
255255 @ Bean
256- public IntegrationFlow mqttInFlow (Mqttv3ClientManager mqttv3ClientManager ) {
256+ IntegrationFlow mqttInFlow (Mqttv3ClientManager mqttv3ClientManager ) {
257257 return IntegrationFlow .from (new MqttPahoMessageDrivenChannelAdapter (mqttv3ClientManager , TOPIC_NAME ))
258258 .channel (c -> c .queue ("fromMqttChannel" ))
259259 .get ();
@@ -263,19 +263,19 @@ public IntegrationFlow mqttInFlow(Mqttv3ClientManager mqttv3ClientManager) {
263263
264264 @ Configuration
265265 @ EnableIntegration
266- public static class Mqttv3ConfigRuntime implements MessageDrivenChannelAdapterFactory {
266+ static class Mqttv3ConfigRuntime implements MessageDrivenChannelAdapterFactory {
267267
268268 static final String TOPIC_NAME = "test-topic-v3" ;
269269
270270 static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
271271
272272 @ EventListener
273- public void onSubscribed (MqttSubscribedEvent e ) {
273+ void onSubscribed (MqttSubscribedEvent e ) {
274274 subscribedLatch .countDown ();
275275 }
276276
277277 @ Bean
278- public Mqttv3ClientManager mqttv3ClientManager () {
278+ Mqttv3ClientManager mqttv3ClientManager () {
279279 MqttConnectOptions connectionOptions = new MqttConnectOptions ();
280280 connectionOptions .setServerURIs (new String [] {MosquittoContainerTest .mqttUrl ()});
281281 connectionOptions .setAutomaticReconnect (true );
@@ -286,7 +286,7 @@ public Mqttv3ClientManager mqttv3ClientManager() {
286286 }
287287
288288 @ Bean
289- public IntegrationFlow mqttOutFlow (Mqttv3ClientManager mqttv3ClientManager ) {
289+ IntegrationFlow mqttOutFlow (Mqttv3ClientManager mqttv3ClientManager ) {
290290 return f -> f .handle (new MqttPahoMessageHandler (mqttv3ClientManager ));
291291 }
292292
@@ -300,14 +300,14 @@ public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx)
300300
301301 @ Configuration
302302 @ EnableIntegration
303- public static class Mqttv5Config {
303+ static class Mqttv5Config {
304304
305305 static final String TOPIC_NAME = "test-topic-v5" ;
306306
307307 static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
308308
309309 @ EventListener
310- public void onSubscribed (MqttSubscribedEvent e ) {
310+ void onSubscribed (MqttSubscribedEvent e ) {
311311 subscribedLatch .countDown ();
312312 }
313313
@@ -322,21 +322,21 @@ List<MqttMessageDeliveryEvent> deliveryEvents() {
322322 }
323323
324324 @ Bean
325- public Mqttv5ClientManager mqttv5ClientManager () {
325+ Mqttv5ClientManager mqttv5ClientManager () {
326326 return new Mqttv5ClientManager (MosquittoContainerTest .mqttUrl (), "client-manager-client-id-v5" );
327327 }
328328
329329 @ Bean
330330 @ ServiceActivator (inputChannel = "mqttOutFlow.input" )
331- public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler (Mqttv5ClientManager mqttv5ClientManager ) {
331+ Mqttv5PahoMessageHandler mqttv5PahoMessageHandler (Mqttv5ClientManager mqttv5ClientManager ) {
332332 Mqttv5PahoMessageHandler mqttPahoMessageHandler = new Mqttv5PahoMessageHandler (mqttv5ClientManager );
333333 mqttPahoMessageHandler .setAsync (true );
334334 mqttPahoMessageHandler .setAsyncEvents (true );
335335 return mqttPahoMessageHandler ;
336336 }
337337
338338 @ Bean
339- public IntegrationFlow mqttInFlow (Mqttv5ClientManager mqttv5ClientManager ) {
339+ IntegrationFlow mqttInFlow (Mqttv5ClientManager mqttv5ClientManager ) {
340340 return IntegrationFlow .from (new Mqttv5PahoMessageDrivenChannelAdapter (mqttv5ClientManager , TOPIC_NAME ))
341341 .channel (c -> c .queue ("fromMqttChannel" ))
342342 .get ();
@@ -346,34 +346,34 @@ public IntegrationFlow mqttInFlow(Mqttv5ClientManager mqttv5ClientManager) {
346346
347347 @ Configuration
348348 @ EnableIntegration
349- public static class Mqttv5ConfigWithDisconnect {
349+ static class Mqttv5ConfigWithDisconnect {
350350
351351 static final String TOPIC_NAME = "test-topic-v5-reconnect" ;
352352
353353 static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
354354
355355 @ EventListener
356- public void onSubscribed (MqttSubscribedEvent e ) {
356+ void onSubscribed (MqttSubscribedEvent e ) {
357357 subscribedLatch .countDown ();
358358 }
359359
360360 @ Bean
361- public ClientV5Disconnector clientV3Disconnector (Mqttv5ClientManager clientManager ) {
361+ ClientV5Disconnector clientV3Disconnector (Mqttv5ClientManager clientManager ) {
362362 return new ClientV5Disconnector (clientManager );
363363 }
364364
365365 @ Bean
366- public Mqttv5ClientManager mqttv5ClientManager () {
366+ Mqttv5ClientManager mqttv5ClientManager () {
367367 return new Mqttv5ClientManager (MosquittoContainerTest .mqttUrl (), "client-manager-client-id-v5-reconnect" );
368368 }
369369
370370 @ Bean
371- public IntegrationFlow mqttOutFlow (Mqttv5ClientManager mqttv5ClientManager ) {
372- return f -> f .handle (new Mqttv5PahoMessageHandler (MosquittoContainerTest . mqttUrl (), "old-client-v5" ));
371+ IntegrationFlow mqttOutFlow (Mqttv5ClientManager mqttv5ClientManager ) {
372+ return f -> f .handle (new Mqttv5PahoMessageHandler (mqttv5ClientManager ));
373373 }
374374
375375 @ Bean
376- public IntegrationFlow mqttInFlow (Mqttv5ClientManager mqttv5ClientManager ) {
376+ IntegrationFlow mqttInFlow (Mqttv5ClientManager mqttv5ClientManager ) {
377377 return IntegrationFlow .from (new Mqttv5PahoMessageDrivenChannelAdapter (mqttv5ClientManager , TOPIC_NAME ))
378378 .channel (c -> c .queue ("fromMqttChannel" ))
379379 .get ();
@@ -383,25 +383,25 @@ public IntegrationFlow mqttInFlow(Mqttv5ClientManager mqttv5ClientManager) {
383383
384384 @ Configuration
385385 @ EnableIntegration
386- public static class Mqttv5ConfigRuntime implements MessageDrivenChannelAdapterFactory {
386+ static class Mqttv5ConfigRuntime implements MessageDrivenChannelAdapterFactory {
387387
388388 static final String TOPIC_NAME = "test-topic-v5" ;
389389
390390 static final CountDownLatch subscribedLatch = new CountDownLatch (1 );
391391
392392 @ EventListener
393- public void onSubscribed (MqttSubscribedEvent e ) {
393+ void onSubscribed (MqttSubscribedEvent e ) {
394394 subscribedLatch .countDown ();
395395 }
396396
397397 @ Bean
398- public Mqttv5ClientManager mqttv5ClientManager () {
398+ Mqttv5ClientManager mqttv5ClientManager () {
399399 return new Mqttv5ClientManager (MosquittoContainerTest .mqttUrl (), "client-manager-client-id-v5" );
400400 }
401401
402402 @ Bean
403403 @ ServiceActivator (inputChannel = "mqttOutFlow.input" )
404- public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler (Mqttv5ClientManager mqttv5ClientManager ) {
404+ Mqttv5PahoMessageHandler mqttv5PahoMessageHandler (Mqttv5ClientManager mqttv5ClientManager ) {
405405 return new Mqttv5PahoMessageHandler (mqttv5ClientManager );
406406 }
407407
@@ -422,7 +422,7 @@ interface MessageDrivenChannelAdapterFactory {
422422 record ClientV3Disconnector (Mqttv3ClientManager clientManager ) {
423423
424424 @ EventListener (MqttSubscribedEvent .class )
425- public void handleSubscribedEvent () {
425+ void handleSubscribedEvent () {
426426 try {
427427 this .clientManager .getClient ().disconnectForcibly ();
428428 }
@@ -436,7 +436,7 @@ public void handleSubscribedEvent() {
436436 record ClientV5Disconnector (Mqttv5ClientManager clientManager ) {
437437
438438 @ EventListener (MqttSubscribedEvent .class )
439- public void handleSubscribedEvent () {
439+ void handleSubscribedEvent () {
440440 try {
441441 this .clientManager .getClient ().disconnectForcibly ();
442442 }
0 commit comments