1616package com .rabbitmq .client .test .functional ;
1717
1818import static org .junit .Assert .assertEquals ;
19+ import static org .junit .Assert .assertTrue ;
1920import static org .junit .Assert .fail ;
2021
2122import java .io .IOException ;
2223import java .util .Arrays ;
2324import java .util .HashMap ;
2425import java .util .Map ;
26+ import java .util .concurrent .BlockingQueue ;
27+ import java .util .concurrent .CountDownLatch ;
28+ import java .util .concurrent .LinkedBlockingQueue ;
29+ import java .util .concurrent .TimeUnit ;
2530
31+ import com .rabbitmq .client .DefaultConsumer ;
32+ import com .rabbitmq .client .Envelope ;
2633import org .junit .Test ;
2734
2835import com .rabbitmq .client .AMQP ;
@@ -51,24 +58,33 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
5158 }
5259
5360 private static final int COUNT = 10 ;
61+ private static final long DELIVERY_TIMEOUT_MS = 100 ;
62+ private static final long CANCEL_OK_TIMEOUT_MS = 10 * 1000 ;
5463
5564 @ Test public void consumerPriorities () throws Exception {
5665 String queue = channel .queueDeclare ().getQueue ();
57- QueueingConsumer highConsumer = new QueueingConsumer (channel );
58- QueueingConsumer medConsumer = new QueueingConsumer (channel );
59- QueueingConsumer lowConsumer = new QueueingConsumer (channel );
66+ QueueMessageConsumer highConsumer = new QueueMessageConsumer (channel );
67+ QueueMessageConsumer medConsumer = new QueueMessageConsumer (channel );
68+ QueueMessageConsumer lowConsumer = new QueueMessageConsumer (channel );
6069 String high = channel .basicConsume (queue , true , args (1 ), highConsumer );
6170 String med = channel .basicConsume (queue , true , medConsumer );
6271 channel .basicConsume (queue , true , args (-1 ), lowConsumer );
6372
6473 publish (queue , COUNT , "high" );
74+ assertContents (highConsumer , COUNT , "high" );
6575 channel .basicCancel (high );
76+ assertTrue (
77+ "High priority consumer should have been cancelled" ,
78+ highConsumer .cancelLatch .await (CANCEL_OK_TIMEOUT_MS , TimeUnit .MILLISECONDS )
79+ );
6680 publish (queue , COUNT , "med" );
81+ assertContents (medConsumer , COUNT , "med" );
6782 channel .basicCancel (med );
83+ assertTrue (
84+ "Medium priority consumer should have been cancelled" ,
85+ medConsumer .cancelLatch .await (CANCEL_OK_TIMEOUT_MS , TimeUnit .MILLISECONDS )
86+ );
6887 publish (queue , COUNT , "low" );
69-
70- assertContents (highConsumer , COUNT , "high" );
71- assertContents (medConsumer , COUNT , "med" );
7288 assertContents (lowConsumer , COUNT , "low" );
7389 }
7490
@@ -78,17 +94,43 @@ private Map<String, Object> args(Object o) {
7894 return map ;
7995 }
8096
81- private void assertContents (QueueingConsumer qc , int count , String msg ) throws InterruptedException {
97+ private void assertContents (QueueMessageConsumer qc , int count , String msg ) throws InterruptedException {
8298 for (int i = 0 ; i < count ; i ++) {
83- QueueingConsumer . Delivery d = qc .nextDelivery ();
84- assertEquals (msg , new String (d . getBody () ));
99+ byte [] body = qc .nextDelivery (DELIVERY_TIMEOUT_MS );
100+ assertEquals (msg , new String (body ));
85101 }
86- assertEquals (null , qc .nextDelivery (0 ));
102+ assertEquals (null , qc .nextDelivery (DELIVERY_TIMEOUT_MS ));
87103 }
88104
89105 private void publish (String queue , int count , String msg ) throws IOException {
90106 for (int i = 0 ; i < count ; i ++) {
91107 channel .basicPublish ("" , queue , MessageProperties .MINIMAL_BASIC , msg .getBytes ());
92108 }
93109 }
110+
111+ private static class QueueMessageConsumer extends DefaultConsumer {
112+
113+ BlockingQueue <byte []> messages = new LinkedBlockingQueue <byte []>();
114+
115+ CountDownLatch cancelLatch = new CountDownLatch (1 );
116+
117+ public QueueMessageConsumer (Channel channel ) {
118+ super (channel );
119+ }
120+
121+ @ Override
122+ public void handleDelivery (String consumerTag , Envelope envelope , AMQP .BasicProperties properties , byte [] body ) throws IOException {
123+ messages .add (body );
124+ }
125+
126+ @ Override
127+ public void handleCancelOk (String consumerTag ) {
128+ cancelLatch .countDown ();
129+ }
130+
131+ byte [] nextDelivery (long timeoutInMs ) throws InterruptedException {
132+ return messages .poll (timeoutInMs , TimeUnit .MILLISECONDS );
133+ }
134+
135+ }
94136}
0 commit comments