2121import static com .rabbitmq .client .amqp .ConnectionSettings .Affinity .Operation .PUBLISH ;
2222import static com .rabbitmq .client .amqp .Management .QueueType .QUORUM ;
2323import static com .rabbitmq .client .amqp .impl .Assertions .assertThat ;
24+ import static com .rabbitmq .client .amqp .impl .Cli .quorumStatus ;
25+ import static com .rabbitmq .client .amqp .impl .Cli .rabbitmqctl ;
2426import static com .rabbitmq .client .amqp .impl .ExceptionUtils .noRunningStreamMemberOnNode ;
2527import static com .rabbitmq .client .amqp .impl .TestUtils .name ;
2628import static com .rabbitmq .client .amqp .impl .TestUtils .sync ;
2729import static com .rabbitmq .client .amqp .impl .TestUtils .waitAtMost ;
2830import static java .time .Duration .ofMillis ;
31+ import static java .time .Duration .ofNanos ;
2932import static java .time .Duration .ofSeconds ;
3033import static java .util .Arrays .stream ;
3134import static java .util .stream .Collectors .toList ;
5962import org .junit .jupiter .api .TestInfo ;
6063import org .junit .jupiter .params .ParameterizedTest ;
6164import org .junit .jupiter .params .provider .EnumSource ;
65+ import org .slf4j .Logger ;
66+ import org .slf4j .LoggerFactory ;
6267
6368@ DisabledIfNotCluster
6469public class ClusterTest {
6570
71+ private static final Logger LOGGER = LoggerFactory .getLogger (ClusterTest .class );
72+
6673 static final String [] URIS =
6774 new String [] {"amqp://localhost:5672" , "amqp://localhost:5673" , "amqp://localhost:5674" };
6875 static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy .fixed (ofMillis (100 ));
@@ -339,9 +346,13 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException {
339346
340347 assertThat (initialFollowers ).isNotEmpty ();
341348
349+ LOGGER .info ("Pausing node {}" , initialLeader );
350+
342351 Cli .pauseNode (initialLeader );
343352 nodePaused = true ;
344353
354+ LOGGER .info ("Node {} paused" , initialLeader );
355+
345356 publisher .publish (publisher .message ().messageId (2L ), ctx -> publishSync .down ());
346357
347358 assertThat (publishSync ).completes (ofSeconds (20 ));
@@ -351,6 +362,9 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException {
351362 assertThat (messageIds ).containsExactlyInAnyOrder (1L , 2L );
352363 consumeSync .reset ();
353364
365+ LOGGER .info ("Waiting for topology update" );
366+ long start = System .nanoTime ();
367+
354368 waitAtMost (
355369 ofSeconds (20 ),
356370 () -> initialFollowers .contains (mgmt .queueInfo (q ).leader ()),
@@ -361,8 +375,14 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException {
361375 + "queue info "
362376 + mgmt .queueInfo (q ));
363377
378+ LOGGER .info (
379+ "Topology updated after {} second(s)" , ofNanos (System .nanoTime () - start ).toSeconds ());
380+
381+ LOGGER .info ("Unpausing node {}" , initialLeader );
382+
364383 Cli .unpauseNode (initialLeader );
365384 nodePaused = false ;
385+ LOGGER .info ("Node {} unpaused" , initialLeader );
366386
367387 publisher .publish (publisher .message ().messageId (3L ), ctx -> publishSync .down ());
368388 assertThat (publishSync ).completes ();
@@ -381,10 +401,14 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException {
381401 + "queue info "
382402 + mgmt .queueInfo (q ));
383403 } finally {
384- System .out .println (
385- Cli .rabbitmqctl ("eval 'khepri:info(rabbitmq_metadata).'" , initialFollowers .get (0 ))
386- .output ());
387- System .out .println (Cli .quorumStatus (q , initialFollowers .get (0 )));
404+ if (LOGGER .isInfoEnabled ()) {
405+ LOGGER .info ("Khepri status:" );
406+ System .out .println (
407+ rabbitmqctl ("eval 'khepri:info(rabbitmq_metadata).'" , initialFollowers .get (0 ))
408+ .output ());
409+ LOGGER .info ("Quorum queue status:" );
410+ System .out .println (quorumStatus (q , initialFollowers .get (0 )));
411+ }
388412 if (nodePaused ) {
389413 Cli .unpauseNode (initialLeader );
390414 }
0 commit comments