@@ -141,7 +141,7 @@ public static void main(String[] args) {
141141 producerConnections [i ] = conn ;
142142 Channel channel = conn .createChannel ();
143143 if (producerTxSize > 0 ) channel .txSelect ();
144- if (pubAck ) channel .confirmSelect (true );
144+ if (pubAck ) channel .confirmSelect (false );
145145 channel .exchangeDeclare (exchangeName , exchangeType );
146146 final Producer p = new Producer (channel , exchangeName , id ,
147147 flags , producerTxSize ,
@@ -289,7 +289,7 @@ public void handleAck(long sequenceNumber, boolean multiple) {
289289 logAck (sequenceNumber );
290290 System .out .printf ("got an ack all messages up to %d\n " , sequenceNumber );
291291 } else {
292- System . out . printf ( "got an ack for message %d \n " , sequenceNumber );
292+ logAck ( sequenceNumber );
293293 }
294294 }
295295
@@ -307,14 +307,17 @@ public void run() {
307307 try {
308308
309309 while (timeLimit == 0 || now < startTime + timeLimit ) {
310- delayPubAck ();
311- delay (now );
312- publish (createMessage (totalMsgCount ));
313- totalMsgCount ++;
314- msgCount ++;
315-
316- if (txSize != 0 && totalMsgCount % txSize == 0 ) {
317- channel .txCommit ();
310+ if (!throttlePubAck ()) {
311+ delay (now );
312+ publish (createMessage (totalMsgCount ));
313+ totalMsgCount ++;
314+ msgCount ++;
315+
316+ if (txSize != 0 && totalMsgCount % txSize == 0 ) {
317+ channel .txCommit ();
318+ }
319+ } else {
320+ Thread .sleep (10 );
318321 }
319322 now = System .currentTimeMillis ();
320323 }
@@ -340,14 +343,8 @@ private void publish(byte[] msg)
340343 msg );
341344 }
342345
343- private void delayPubAck ()
344- throws InterruptedException {
345- if (pubAckCount == 0 )
346- return ;
347- while (channel .getPublishedMessageCount () - mostRecentAcked
348- > pubAckCount ) {
349- Thread .sleep (100 );
350- }
346+ private boolean throttlePubAck () {
347+ return ((pubAckCount > 0 ) && (channel .getPublishedMessageCount () - mostRecentAcked > pubAckCount ));
351348 }
352349
353350 private void delay (long now )
0 commit comments