@@ -246,7 +246,7 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
246246 private long startTime ;
247247 private long lastStatsTime ;
248248 private int msgCount ;
249- private int basicReturnCount ;
249+ private int returnCount ;
250250
251251 private boolean confirm ;
252252 private long confirmCount ;
@@ -277,24 +277,17 @@ public Producer(Channel channel, String exchangeName, String id,
277277 this .confirm = confirm ;
278278 }
279279
280- public void handleBasicReturn (int replyCode ,
281- String replyText ,
282- String exchange ,
283- String routingKey ,
284- AMQP .BasicProperties properties ,
285- byte [] body ) throws IOException {
286- logBasicReturn ();
287- }
288-
289- public synchronized void logBasicReturn () {
290- basicReturnCount ++;
291- }
292-
293- public synchronized void resetBasicReturns () {
294- basicReturnCount = 0 ;
280+ public synchronized void handleBasicReturn (int replyCode ,
281+ String replyText ,
282+ String exchange ,
283+ String routingKey ,
284+ AMQP .BasicProperties properties ,
285+ byte [] body )
286+ throws IOException {
287+ returnCount ++;
295288 }
296289
297- public void handleAck (long seqNo , boolean multiple ) {
290+ public synchronized void handleAck (long seqNo , boolean multiple ) {
298291 int numConfirms = 0 ;
299292 if (multiple ) {
300293 for (long i = ackSet .first (); i <= seqNo ; ++i ) {
@@ -307,7 +300,7 @@ public void handleAck(long seqNo, boolean multiple) {
307300 ackSet .remove (seqNo );
308301 numConfirms = 1 ;
309302 }
310- addConfirms ( numConfirms ) ;
303+ confirmCount += numConfirms ;
311304
312305 if (confirmPool != null ) {
313306 for (int i = 0 ; i < numConfirms ; ++i ) {
@@ -316,14 +309,12 @@ public void handleAck(long seqNo, boolean multiple) {
316309 }
317310 }
318311
319- private synchronized void resetConfirms () {
312+ public synchronized void resetCounts () {
313+ msgCount = 0 ;
314+ returnCount = 0 ;
320315 confirmCount = 0 ;
321316 }
322317
323- private synchronized void addConfirms (int numConfirms ) {
324- confirmCount += numConfirms ;
325- }
326-
327318 public void run () {
328319
329320 long now ;
@@ -388,8 +379,8 @@ private void delay(long now)
388379 (msgCount * 1000L / elapsed ) +
389380 " msg/s" );
390381 if (mandatory || immediate ) {
391- System .out .print (", basic returns: " +
392- (basicReturnCount * 1000L / elapsed ) +
382+ System .out .print (", returns: " +
383+ (returnCount * 1000L / elapsed ) +
393384 " ret/s" );
394385 }
395386 if (confirm ) {
@@ -398,9 +389,7 @@ private void delay(long now)
398389 " c/s" );
399390 }
400391 System .out .println ();
401- resetBasicReturns ();
402- resetConfirms ();
403- msgCount = 0 ;
392+ resetCounts ();
404393 lastStatsTime = now ;
405394 }
406395 }
0 commit comments