2222import static org .junit .Assert .assertTrue ;
2323
2424import java .io .IOException ;
25+ import java .util .concurrent .BlockingQueue ;
26+ import java .util .concurrent .LinkedBlockingQueue ;
27+ import java .util .concurrent .TimeUnit ;
2528
29+ import com .rabbitmq .client .*;
2630import org .junit .Test ;
2731
28- import com .rabbitmq .client .AMQP ;
29- import com .rabbitmq .client .Channel ;
30- import com .rabbitmq .client .Connection ;
31- import com .rabbitmq .client .GetResponse ;
32- import com .rabbitmq .client .MessageProperties ;
33- import com .rabbitmq .client .QueueingConsumer ;
3432import com .rabbitmq .client .test .BrokerTestCase ;
3533
3634public class DirectReplyTo extends BrokerTestCase {
3735 private static final String QUEUE = "amq.rabbitmq.reply-to" ;
3836
3937 @ Test public void roundTrip () throws IOException , InterruptedException {
40- QueueingConsumer c = new QueueingConsumer (channel );
38+ QueueMessageConsumer c = new QueueMessageConsumer (channel );
4139 String replyTo = rpcFirstHalf (c );
4240 declare (connection , replyTo , true );
4341 channel .confirmSelect ();
4442 basicPublishVolatile ("response" .getBytes (), "" , replyTo , MessageProperties .BASIC );
4543 channel .waitForConfirms ();
4644
47- QueueingConsumer . Delivery del = c .nextDelivery ();
48- assertEquals ("response" , new String (del . getBody () ));
45+ byte [] body = c .nextDelivery (10000 );
46+ assertEquals ("response" , new String (body ));
4947 }
5048
5149 @ Test public void hack () throws IOException , InterruptedException {
52- QueueingConsumer c = new QueueingConsumer (channel );
50+ QueueMessageConsumer c = new QueueMessageConsumer (channel );
5351 String replyTo = rpcFirstHalf (c );
5452 // 5 chars should overwrite part of the key but not the pid; aiming to prove
5553 // we can't publish using just the pid
5654 replyTo = replyTo .substring (0 , replyTo .length () - 5 ) + "xxxxx" ;
5755 declare (connection , replyTo , false );
5856 basicPublishVolatile ("response" .getBytes (), "" , replyTo , MessageProperties .BASIC );
5957
60- QueueingConsumer . Delivery del = c .nextDelivery (500 );
61- assertNull (del );
58+ byte [] body = c .nextDelivery (500 );
59+ assertNull (body );
6260 }
6361
6462 private void declare (Connection connection , String q , boolean expectedExists ) throws IOException {
@@ -75,7 +73,7 @@ private void declare(Connection connection, String q, boolean expectedExists) th
7573 }
7674
7775 @ Test public void consumeFail () throws IOException , InterruptedException {
78- QueueingConsumer c = new QueueingConsumer (channel );
76+ DefaultConsumer c = new DefaultConsumer (channel );
7977 Channel ch = connection .createChannel ();
8078 try {
8179 ch .basicConsume (QUEUE , false , c );
@@ -95,7 +93,7 @@ private void declare(Connection connection, String q, boolean expectedExists) th
9593 }
9694
9795 @ Test public void consumeSuccess () throws IOException , InterruptedException {
98- QueueingConsumer c = new QueueingConsumer (channel );
96+ DefaultConsumer c = new DefaultConsumer (channel );
9997 String ctag = channel .basicConsume (QUEUE , true , c );
10098 channel .basicCancel (ctag );
10199
@@ -104,7 +102,7 @@ private void declare(Connection connection, String q, boolean expectedExists) th
104102 assertNotSame (ctag , ctag2 );
105103 }
106104
107- private String rpcFirstHalf (QueueingConsumer c ) throws IOException {
105+ private String rpcFirstHalf (Consumer c ) throws IOException {
108106 channel .basicConsume (QUEUE , true , c );
109107 String serverQueue = channel .queueDeclare ().getQueue ();
110108 basicPublishVolatile ("request" .getBytes (), "" , serverQueue , props ());
@@ -116,4 +114,27 @@ private String rpcFirstHalf(QueueingConsumer c) throws IOException {
116114 private AMQP .BasicProperties props () {
117115 return MessageProperties .BASIC .builder ().replyTo (QUEUE ).build ();
118116 }
117+
118+ class QueueMessageConsumer extends DefaultConsumer {
119+
120+ BlockingQueue <byte []> messages = new LinkedBlockingQueue <byte []>();
121+
122+ public QueueMessageConsumer (Channel channel ) {
123+ super (channel );
124+ }
125+
126+ @ Override
127+ public void handleDelivery (String consumerTag , Envelope envelope , AMQP .BasicProperties properties , byte [] body ) throws IOException {
128+ messages .add (body );
129+ }
130+
131+ byte [] nextDelivery () {
132+ return messages .poll ();
133+ }
134+
135+ byte [] nextDelivery (long timeoutInMs ) throws InterruptedException {
136+ return messages .poll (timeoutInMs , TimeUnit .MILLISECONDS );
137+ }
138+
139+ }
119140}
0 commit comments