1515
1616package com .rabbitmq .client .test .functional ;
1717
18- import com .rabbitmq .client .*;
18+ import com .rabbitmq .client .AMQP ;
19+ import com .rabbitmq .client .Channel ;
20+ import com .rabbitmq .client .Connection ;
21+ import com .rabbitmq .client .ConnectionFactory ;
22+ import com .rabbitmq .client .DefaultConsumer ;
23+ import com .rabbitmq .client .Envelope ;
24+ import com .rabbitmq .client .GetResponse ;
25+ import com .rabbitmq .client .Recoverable ;
26+ import com .rabbitmq .client .RecoveryListener ;
1927import com .rabbitmq .client .impl .StandardMetricsCollector ;
2028import com .rabbitmq .client .impl .recovery .AutorecoveringConnection ;
2129import com .rabbitmq .client .test .BrokerTestCase ;
2230import com .rabbitmq .client .test .TestUtils ;
2331import com .rabbitmq .tools .Host ;
2432import org .awaitility .Duration ;
2533import org .junit .Test ;
34+ import org .junit .runner .RunWith ;
35+ import org .junit .runners .Parameterized ;
2636
2737import java .io .IOException ;
2838import java .lang .reflect .Field ;
2939import java .util .ArrayList ;
3040import java .util .Collection ;
3141import java .util .List ;
3242import java .util .Random ;
33- import java .util .concurrent .*;
34-
35- import static org .awaitility .Awaitility .*;
36- import static org .hamcrest .Matchers .*;
37- import static org .junit .Assert .*;
43+ import java .util .concurrent .Callable ;
44+ import java .util .concurrent .CountDownLatch ;
45+ import java .util .concurrent .ExecutorService ;
46+ import java .util .concurrent .Executors ;
47+ import java .util .concurrent .TimeUnit ;
48+ import java .util .concurrent .TimeoutException ;
49+
50+ import static org .awaitility .Awaitility .waitAtMost ;
51+ import static org .hamcrest .Matchers .equalTo ;
52+ import static org .hamcrest .Matchers .is ;
53+ import static org .junit .Assert .assertThat ;
54+ import static org .junit .Assert .assertTrue ;
3855
3956/**
4057 *
4158 */
59+ @ RunWith (Parameterized .class )
4260public class Metrics extends BrokerTestCase {
4361
62+ @ Parameterized .Parameters
63+ public static Object [] data () {
64+ return new Object [] { createConnectionFactory (), createAutoRecoveryConnectionFactory () };
65+ }
66+
67+ @ Parameterized .Parameter
68+ public ConnectionFactory connectionFactory ;
69+
4470 static final String QUEUE = "metrics.queue" ;
4571
4672 @ Override
@@ -53,17 +79,7 @@ protected void releaseResources() throws IOException {
5379 channel .queueDelete (QUEUE );
5480 }
5581
56- @ Test public void metricsStandardConnection () throws IOException , TimeoutException {
57- doMetrics (createConnectionFactory ());
58- }
59-
60- @ Test public void metricsAutoRecoveryConnection () throws IOException , TimeoutException {
61- ConnectionFactory connectionFactory = createConnectionFactory ();
62- connectionFactory .setAutomaticRecoveryEnabled (true );
63- doMetrics (connectionFactory );
64- }
65-
66- private void doMetrics (ConnectionFactory connectionFactory ) throws IOException , TimeoutException {
82+ @ Test public void metrics () throws IOException , TimeoutException {
6783 StandardMetricsCollector metrics = new StandardMetricsCollector ();
6884 connectionFactory .setMetricsCollector (metrics );
6985 Connection connection1 = null ;
@@ -103,15 +119,15 @@ private void doMetrics(ConnectionFactory connectionFactory) throws IOException,
103119 assertThat (metrics .getConsumedMessages ().getCount (), is (2L +1L ));
104120
105121 channel .basicConsume (QUEUE , true , new DefaultConsumer (channel ));
106- waitAtMost (timeout ()).until (new ConsumedMessagesMetricsCallable ( metrics ), equalTo (2L +1L +1L ));
122+ waitAtMost (timeout ()).until (() -> metrics . getConsumedMessages (). getCount ( ), equalTo (2L +1L +1L ));
107123
108124 safeClose (connection1 );
109- waitAtMost (timeout ()).until (new ConnectionsMetricsCallable ( metrics ), equalTo (1L ));
110- waitAtMost (timeout ()).until (new ChannelsMetricsCallable ( metrics ), equalTo (2L ));
125+ waitAtMost (timeout ()).until (() -> metrics . getConnections (). getCount ( ), equalTo (1L ));
126+ waitAtMost (timeout ()).until (() -> metrics . getChannels (). getCount ( ), equalTo (2L ));
111127
112128 safeClose (connection2 );
113- waitAtMost (timeout ()).until (new ConnectionsMetricsCallable ( metrics ), equalTo (0L ));
114- waitAtMost (timeout ()).until (new ChannelsMetricsCallable ( metrics ), equalTo (0L ));
129+ waitAtMost (timeout ()).until (() -> metrics . getConnections (). getCount ( ), equalTo (0L ));
130+ waitAtMost (timeout ()).until (() -> metrics . getChannels (). getCount ( ), equalTo (0L ));
115131
116132 assertThat (metrics .getAcknowledgedMessages ().getCount (), is (0L ));
117133 assertThat (metrics .getRejectedMessages ().getCount (), is (0L ));
@@ -122,17 +138,8 @@ private void doMetrics(ConnectionFactory connectionFactory) throws IOException,
122138 }
123139 }
124140
125- @ Test public void metricsAckStandardConnection () throws IOException , TimeoutException {
126- doMetricsAck (createConnectionFactory ());
127- }
128141
129- @ Test public void metricsAckAutoRecoveryConnection () throws IOException , TimeoutException {
130- ConnectionFactory connectionFactory = createConnectionFactory ();
131- connectionFactory .setAutomaticRecoveryEnabled (true );
132- doMetricsAck (connectionFactory );
133- }
134-
135- private void doMetricsAck (ConnectionFactory connectionFactory ) throws IOException , TimeoutException {
142+ @ Test public void metricsAck () throws IOException , TimeoutException {
136143 StandardMetricsCollector metrics = new StandardMetricsCollector ();
137144 connectionFactory .setMetricsCollector (metrics );
138145
@@ -190,12 +197,12 @@ private void doMetricsAck(ConnectionFactory connectionFactory) throws IOExceptio
190197 }
191198
192199 waitAtMost (timeout ()).until (
193- new ConsumedMessagesMetricsCallable ( metrics ),
200+ () -> metrics . getConsumedMessages (). getCount ( ),
194201 equalTo (alreadySentMessages +nbMessages )
195202 );
196203
197204 waitAtMost (timeout ()).until (
198- new AcknowledgedMessagesMetricsCallable ( metrics ),
205+ () -> metrics . getAcknowledgedMessages (). getCount ( ),
199206 equalTo (alreadySentMessages +nbMessages )
200207 );
201208
@@ -204,17 +211,7 @@ private void doMetricsAck(ConnectionFactory connectionFactory) throws IOExceptio
204211 }
205212 }
206213
207- @ Test public void metricsRejectStandardConnection () throws IOException , TimeoutException {
208- doMetricsReject (createConnectionFactory ());
209- }
210-
211- @ Test public void metricsRejectAutoRecoveryConnection () throws IOException , TimeoutException {
212- ConnectionFactory connectionFactory = createConnectionFactory ();
213- connectionFactory .setAutomaticRecoveryEnabled (true );
214- doMetricsReject (connectionFactory );
215- }
216-
217- private void doMetricsReject (ConnectionFactory connectionFactory ) throws IOException , TimeoutException {
214+ @ Test public void metricsReject () throws IOException , TimeoutException {
218215 StandardMetricsCollector metrics = new StandardMetricsCollector ();
219216 connectionFactory .setMetricsCollector (metrics );
220217
@@ -239,20 +236,9 @@ private void doMetricsReject(ConnectionFactory connectionFactory) throws IOExcep
239236 } finally {
240237 safeClose (connection );
241238 }
242-
243239 }
244240
245241 @ Test public void multiThreadedMetricsStandardConnection () throws InterruptedException , TimeoutException , IOException {
246- doMultiThreadedMetrics (createConnectionFactory ());
247- }
248-
249- @ Test public void multiThreadedMetricsAutoRecoveryConnection () throws InterruptedException , TimeoutException , IOException {
250- ConnectionFactory connectionFactory = createConnectionFactory ();
251- connectionFactory .setAutomaticRecoveryEnabled (true );
252- doMultiThreadedMetrics (connectionFactory );
253- }
254-
255- private void doMultiThreadedMetrics (ConnectionFactory connectionFactory ) throws IOException , TimeoutException , InterruptedException {
256242 StandardMetricsCollector metrics = new StandardMetricsCollector ();
257243 connectionFactory .setMetricsCollector (metrics );
258244 int nbConnections = 3 ;
@@ -293,7 +279,7 @@ private void doMultiThreadedMetrics(ConnectionFactory connectionFactory) throws
293279 executorService .invokeAll (tasks );
294280
295281 assertThat (metrics .getPublishedMessages ().getCount (), is (nbOfMessages ));
296- waitAtMost (timeout ()).until (new ConsumedMessagesMetricsCallable ( metrics ), equalTo (nbOfMessages ));
282+ waitAtMost (timeout ()).until (() -> metrics . getConsumedMessages (). getCount ( ), equalTo (nbOfMessages ));
297283 assertThat (metrics .getAcknowledgedMessages ().getCount (), is (0L ));
298284
299285 // to remove the listeners
@@ -322,8 +308,8 @@ private void doMultiThreadedMetrics(ConnectionFactory connectionFactory) throws
322308 executorService .invokeAll (tasks );
323309
324310 assertThat (metrics .getPublishedMessages ().getCount (), is (2 *nbOfMessages ));
325- waitAtMost (timeout ()).until (new ConsumedMessagesMetricsCallable ( metrics ), equalTo (2 *nbOfMessages ));
326- waitAtMost (timeout ()).until (new AcknowledgedMessagesMetricsCallable ( metrics ), equalTo (nbOfMessages ));
311+ waitAtMost (timeout ()).until (() -> metrics . getConsumedMessages (). getCount ( ), equalTo (2 *nbOfMessages ));
312+ waitAtMost (timeout ()).until (() -> metrics . getAcknowledgedMessages (). getCount ( ), equalTo (nbOfMessages ));
327313
328314 // to remove the listeners
329315 for (int i = 0 ; i < nbChannels ; i ++) {
@@ -351,29 +337,18 @@ private void doMultiThreadedMetrics(ConnectionFactory connectionFactory) throws
351337 executorService .invokeAll (tasks );
352338
353339 assertThat (metrics .getPublishedMessages ().getCount (), is (3 *nbOfMessages ));
354- waitAtMost (timeout ()).until (new ConsumedMessagesMetricsCallable ( metrics ), equalTo (3 *nbOfMessages ));
355- waitAtMost (timeout ()).until (new AcknowledgedMessagesMetricsCallable ( metrics ), equalTo (nbOfMessages ));
356- waitAtMost (timeout ()).until (new RejectedMessagesMetricsCallable ( metrics ), equalTo (nbOfMessages ));
340+ waitAtMost (timeout ()).until (() -> metrics . getConsumedMessages (). getCount ( ), equalTo (3 *nbOfMessages ));
341+ waitAtMost (timeout ()).until (() -> metrics . getAcknowledgedMessages (). getCount ( ), equalTo (nbOfMessages ));
342+ waitAtMost (timeout ()).until (() -> metrics . getRejectedMessages (). getCount ( ), equalTo (nbOfMessages ));
357343 } finally {
358344 for (Connection connection : connections ) {
359345 safeClose (connection );
360346 }
361347 executorService .shutdownNow ();
362348 }
363-
364- }
365-
366- @ Test public void errorInChannelStandardConnection () throws IOException , TimeoutException {
367- errorInChannel (createConnectionFactory ());
368- }
369-
370- @ Test public void errorInChananelAutoRecoveryConnection () throws IOException , TimeoutException {
371- ConnectionFactory connectionFactory = createConnectionFactory ();
372- connectionFactory .setAutomaticRecoveryEnabled (true );
373- errorInChannel (connectionFactory );
374349 }
375350
376- private void errorInChannel (ConnectionFactory connectionFactory ) throws IOException , TimeoutException {
351+ @ Test public void errorInChannel () throws IOException , TimeoutException {
377352 StandardMetricsCollector metrics = new StandardMetricsCollector ();
378353 connectionFactory .setMetricsCollector (metrics );
379354
@@ -387,12 +362,11 @@ private void errorInChannel(ConnectionFactory connectionFactory) throws IOExcept
387362
388363 channel .basicPublish ("unlikelynameforanexchange" , "" , null , "msg" .getBytes ("UTF-8" ));
389364
390- waitAtMost (timeout ()).until (new ChannelsMetricsCallable ( metrics ), is (0L ));
365+ waitAtMost (timeout ()).until (() -> metrics . getChannels (). getCount ( ), is (0L ));
391366 assertThat (metrics .getConnections ().getCount (), is (1L ));
392367 } finally {
393368 safeClose (connection );
394369 }
395-
396370 }
397371
398372 @ Test public void checkListenersWithAutoRecoveryConnection () throws Exception {
@@ -426,8 +400,15 @@ private void errorInChannel(ConnectionFactory connectionFactory) throws IOExcept
426400
427401 }
428402
429- private ConnectionFactory createConnectionFactory () {
403+ private static ConnectionFactory createConnectionFactory () {
430404 ConnectionFactory connectionFactory = TestUtils .connectionFactory ();
405+ connectionFactory .setAutomaticRecoveryEnabled (false );
406+ return connectionFactory ;
407+ }
408+
409+ private static ConnectionFactory createAutoRecoveryConnectionFactory () {
410+ ConnectionFactory connectionFactory = TestUtils .connectionFactory ();
411+ connectionFactory .setAutomaticRecoveryEnabled (true );
431412 return connectionFactory ;
432413 }
433414
@@ -587,87 +568,4 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
587568 }
588569 }
589570
590- static abstract class MetricsCallable implements Callable <Long > {
591-
592- final StandardMetricsCollector metrics ;
593-
594- protected MetricsCallable (StandardMetricsCollector metrics ) {
595- this .metrics = metrics ;
596- }
597-
598-
599- }
600-
601- static class ConnectionsMetricsCallable extends MetricsCallable {
602-
603- ConnectionsMetricsCallable (StandardMetricsCollector metrics ) {
604- super (metrics );
605- }
606-
607- @ Override
608- public Long call () throws Exception {
609- return metrics .getConnections ().getCount ();
610- }
611- }
612-
613- static class ChannelsMetricsCallable extends MetricsCallable {
614-
615- ChannelsMetricsCallable (StandardMetricsCollector metrics ) {
616- super (metrics );
617- }
618-
619- @ Override
620- public Long call () throws Exception {
621- return metrics .getChannels ().getCount ();
622- }
623- }
624-
625- static class PublishedMessagesMetricsCallable extends MetricsCallable {
626-
627- PublishedMessagesMetricsCallable (StandardMetricsCollector metrics ) {
628- super (metrics );
629- }
630-
631- @ Override
632- public Long call () throws Exception {
633- return metrics .getPublishedMessages ().getCount ();
634- }
635- }
636-
637- static class ConsumedMessagesMetricsCallable extends MetricsCallable {
638-
639- ConsumedMessagesMetricsCallable (StandardMetricsCollector metrics ) {
640- super (metrics );
641- }
642-
643- @ Override
644- public Long call () throws Exception {
645- return metrics .getConsumedMessages ().getCount ();
646- }
647- }
648-
649- static class AcknowledgedMessagesMetricsCallable extends MetricsCallable {
650-
651- AcknowledgedMessagesMetricsCallable (StandardMetricsCollector metrics ) {
652- super (metrics );
653- }
654-
655- @ Override
656- public Long call () throws Exception {
657- return metrics .getAcknowledgedMessages ().getCount ();
658- }
659- }
660-
661- static class RejectedMessagesMetricsCallable extends MetricsCallable {
662-
663- RejectedMessagesMetricsCallable (StandardMetricsCollector metrics ) {
664- super (metrics );
665- }
666-
667- @ Override
668- public Long call () throws Exception {
669- return metrics .getRejectedMessages ().getCount ();
670- }
671- }
672-
673571}
0 commit comments