@@ -66,40 +66,56 @@ public void handleAck(long seqNo,
6666 channel .queueDeclare ("confirm-test" , true , true , true , null );
6767 channel .basicConsume ("confirm-test" , true , new DefaultConsumer (channel ));
6868 channel .queueDeclare ("confirm-test-noconsumer" , true , true , true , null );
69+ channel .queueDeclare ("confirm-test-2" , true , true , true , null );
70+ channel .basicConsume ("confirm-test-2" , true , new DefaultConsumer (channel ));
71+ channel .queueBind ("confirm-test" , "amq.direct" , "confirm-multiple-queues" );
72+ channel .queueBind ("confirm-test-2" , "amq.direct" , "confirm-multiple-queues" );
6973 }
7074
7175 public void testConfirmTransient () throws IOException , InterruptedException {
72- confirmTest ("consumer -test" , false , false , false );
76+ confirmTest ("confirm -test" , false , false , false );
7377 }
7478
7579 public void testConfirmPersistentSimple ()
7680 throws IOException , InterruptedException
7781 {
78- confirmTest ("consumer -test" , true , false , false );
82+ confirmTest ("confirm -test" , true , false , false );
7983 }
8084
8185 public void testConfirmPersistentImmediate ()
8286 throws IOException , InterruptedException
8387 {
84- confirmTest ("consumer -test" , true , false , true );
88+ confirmTest ("confirm -test" , true , false , true );
8589 }
8690
8791 public void testConfirmPersistentImmediateNoConsumer ()
8892 throws IOException , InterruptedException
8993 {
90- confirmTest ("consumer -test-noconsumer" , true , false , true );
94+ confirmTest ("confirm -test-noconsumer" , true , false , true );
9195 }
9296
9397 public void testConfirmPersistentMandatory ()
9498 throws IOException , InterruptedException
9599 {
96- confirmTest ("consumer -test" , true , true , false );
100+ confirmTest ("confirm -test" , true , true , false );
97101 }
98102
99103 public void testConfirmPersistentMandatoryReturn ()
100104 throws IOException , InterruptedException
101105 {
102- confirmTest ("consumer-test-doesnotexist" , true , true , false );
106+ confirmTest ("confirm-test-doesnotexist" , true , true , false );
107+ }
108+
109+ public void testConfirmMultipleQueues ()
110+ throws IOException , InterruptedException
111+ {
112+ for (long i = 0 ; i < NUM_MESSAGES ; i ++) {
113+ publish ("amq.direct" , "confirm-multiple-queues" , true , false , false );
114+ ackSet .add (i );
115+ }
116+
117+ while (ackSet .size () > 0 )
118+ Thread .sleep (10 );
103119 }
104120
105121 /* Publish NUM_MESSAGES persistent messages and wait for
@@ -121,13 +137,23 @@ private void publish(String queueName, boolean persistent,
121137 boolean mandatory , boolean immediate )
122138 throws IOException
123139 {
124- channel .basicPublish ("" , queueName , mandatory , immediate ,
140+ publish ("" , queueName , persistent , mandatory , immediate );
141+ }
142+
143+ private void publish (String exchangeName , String queueName ,
144+ boolean persistent , boolean mandatory ,
145+ boolean immediate )
146+ throws IOException
147+ {
148+ channel .basicPublish (exchangeName , queueName , mandatory , immediate ,
125149 persistent ? MessageProperties .PERSISTENT_BASIC
126150 : MessageProperties .BASIC ,
127151 "nop" .getBytes ());
128152 }
129153
130154 private synchronized void gotAckFor (long msgSeqNo ) {
155+ if (!ackSet .contains (msgSeqNo ))
156+ fail ("got duplicate ack: " + msgSeqNo );
131157 ackSet .remove (msgSeqNo );
132158 }
133159}
0 commit comments