4242import java .io .IOException ;
4343import java .util .Collections ;
4444import java .util .Map ;
45- import java .util .Set ;
45+ import java .util .SortedSet ;
4646import java .util .TreeSet ;
4747
4848public class Confirm extends BrokerTestCase
4949{
5050 final static int NUM_MESSAGES = 1000 ;
5151 private static final String TTL_ARG = "x-message-ttl" ;
52- volatile Set <Long > ackSet ;
52+ private SortedSet <Long > ackSet ;
5353
5454 @ Override
5555 protected void setUp () throws IOException {
5656 super .setUp ();
57- ackSet = new TreeSet <Long >();
57+ ackSet = Collections . synchronizedSortedSet ( new TreeSet <Long >() );
5858 channel .setAckListener (new AckListener () {
5959 public void handleAck (long seqNo ,
6060 boolean multiple ) {
6161 if (multiple ) {
62- for (int i = 0 ; i <= seqNo ; ++i )
63- Confirm .this .gotAckFor (i );
62+ Confirm .this .gotAckForMultiple (seqNo );
6463 } else {
6564 Confirm .this .gotAckFor (seqNo );
6665 }
@@ -253,8 +252,8 @@ private void publishN(String exchangeName, String queueName,
253252 throws IOException
254253 {
255254 for (long i = 0 ; i < NUM_MESSAGES ; i ++) {
256- publish (exchangeName , queueName , persistent , mandatory , immediate );
257255 ackSet .add (i );
256+ publish (exchangeName , queueName , persistent , mandatory , immediate );
258257 }
259258 }
260259
@@ -270,10 +269,14 @@ private void publish(String exchangeName, String queueName,
270269 "nop" .getBytes ());
271270 }
272271
272+ private void gotAckForMultiple (long msgSeqNo ) {
273+ for (long i = ackSet .first (); i <= msgSeqNo ; ++i )
274+ gotAckFor (i );
275+ }
276+
273277 private synchronized void gotAckFor (long msgSeqNo ) {
274278 if (!ackSet .contains (msgSeqNo )) {
275279 fail ("got duplicate ack: " + msgSeqNo );
276- //System.out.println("got duplicate ack: " + msgSeqNo);
277280 }
278281 ackSet .remove (msgSeqNo );
279282 }
0 commit comments