22
33import com .rabbitmq .client .AMQP ;
44import com .rabbitmq .client .Channel ;
5+ import com .rabbitmq .client .Connection ;
56import com .rabbitmq .client .GetResponse ;
67import com .rabbitmq .client .MessageProperties ;
78import com .rabbitmq .client .QueueingConsumer ;
@@ -15,6 +16,7 @@ public class DirectReplyTo extends BrokerTestCase {
1516 public void testRoundTrip () throws IOException , InterruptedException {
1617 QueueingConsumer c = new QueueingConsumer (channel );
1718 String replyTo = rpcFirstHalf (c );
19+ declare (connection , replyTo , true );
1820 channel .confirmSelect ();
1921 basicPublishVolatile ("response" .getBytes (), "" , replyTo , MessageProperties .BASIC );
2022 channel .waitForConfirms ();
@@ -29,12 +31,26 @@ public void testHack() throws IOException, InterruptedException {
2931 // 5 chars should overwrite part of the key but not the pid; aiming to prove
3032 // we can't publish using just the pid
3133 replyTo = replyTo .substring (0 , replyTo .length () - 5 ) + "xxxxx" ;
34+ declare (connection , replyTo , false );
3235 basicPublishVolatile ("response" .getBytes (), "" , replyTo , MessageProperties .BASIC );
3336
3437 QueueingConsumer .Delivery del = c .nextDelivery (500 );
3538 assertNull (del );
3639 }
3740
41+ private void declare (Connection connection , String q , boolean expectedExists ) throws IOException {
42+ Channel ch = connection .createChannel ();
43+ try {
44+ ch .queueDeclarePassive (q );
45+ assertTrue (expectedExists );
46+ } catch (IOException e ) {
47+ assertFalse (expectedExists );
48+ checkShutdownSignal (AMQP .NOT_FOUND , e );
49+ // Hmmm...
50+ channel = connection .createChannel ();
51+ }
52+ }
53+
3854 public void testConsumeFail () throws IOException , InterruptedException {
3955 QueueingConsumer c = new QueueingConsumer (channel );
4056 Channel ch = connection .createChannel ();
0 commit comments