@@ -86,8 +86,8 @@ public static void main(String[] args) {
8686 int consumerCount = intArg (cmd , 'y' , 1 );
8787 int producerTxSize = intArg (cmd , 'm' , 0 );
8888 int consumerTxSize = intArg (cmd , 'n' , 0 );
89- boolean pubAck = cmd .hasOption ('c' );
90- long pubAckCount = intArg (cmd , 'k' , 0 );
89+ boolean confirm = cmd .hasOption ('c' );
90+ long confirmMax = intArg (cmd , 'k' , 0 );
9191 boolean autoAck = cmd .hasOption ('a' );
9292 int prefetchCount = intArg (cmd , 'q' , 0 );
9393 int minMsgSize = intArg (cmd , 's' , 0 );
@@ -96,9 +96,9 @@ public static void main(String[] args) {
9696 int frameMax = intArg (cmd , 'M' , 0 );
9797 int heartbeat = intArg (cmd , 'b' , 0 );
9898
99- if ((producerTxSize + consumerTxSize > 0 ) && pubAck ) {
99+ if ((producerTxSize + consumerTxSize > 0 ) && confirm ) {
100100 throw new ParseException ("Cannot select both producerTxSize" +
101- "/consumerTxSize and pubAck. " );
101+ "/consumerTxSize and confirm " );
102102 }
103103
104104 //setup
@@ -141,13 +141,13 @@ public static void main(String[] args) {
141141 producerConnections [i ] = conn ;
142142 Channel channel = conn .createChannel ();
143143 if (producerTxSize > 0 ) channel .txSelect ();
144- if (pubAck ) channel .confirmSelect ();
144+ if (confirm ) channel .confirmSelect ();
145145 channel .exchangeDeclare (exchangeName , exchangeType );
146146 final Producer p = new Producer (channel , exchangeName , id ,
147147 flags , producerTxSize ,
148148 1000L * samplingInterval ,
149149 rateLimit , minMsgSize , timeLimit ,
150- pubAckCount );
150+ confirm , confirmMax );
151151 channel .setReturnListener (p );
152152 channel .setAckListener (p );
153153 Thread t = new Thread (p );
@@ -193,9 +193,9 @@ private static Options getOptions() {
193193 options .addOption (new Option ("x" , "producers" , true , "producer count" ));
194194 options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
195195 options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
196- options .addOption (new Option ("k" , "pubackcnt " , true , "max unack'd publishes" ));
196+ options .addOption (new Option ("k" , "confirmMax " , true , "max unconfirmed publishes" ));
197197 options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
198- options .addOption (new Option ("c" , "puback " , false ,"publisher acks " ));
198+ options .addOption (new Option ("c" , "confirm " , false ,"confirm mode " ));
199199 options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
200200 options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
201201 options .addOption (new Option ("s" , "size" , true , "message size" ));
@@ -242,15 +242,17 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
242242 private long startTime ;
243243 private long lastStatsTime ;
244244 private int msgCount ;
245- private int basicReturnCount ;
245+ private int returnCount ;
246246
247- private long pubAckCount ;
248- private long mostRecentAcked ;
247+ private boolean confirm ;
248+ private long confirmMax ;
249+ private long mostRecentConfirmed ;
250+ private long confirmCount ;
249251
250252 public Producer (Channel channel , String exchangeName , String id ,
251253 List flags , int txSize ,
252254 long interval , int rateLimit , int minMsgSize , int timeLimit ,
253- long pubAckCount )
255+ boolean confirm , long confirmMax )
254256 throws IOException {
255257
256258 this .channel = channel ;
@@ -263,38 +265,31 @@ public Producer(Channel channel, String exchangeName, String id,
263265 this .interval = interval ;
264266 this .rateLimit = rateLimit ;
265267 this .timeLimit = 1000L * timeLimit ;
266- this .pubAckCount = pubAckCount ;
268+ this .confirmMax = confirmMax ;
267269 this .message = new byte [minMsgSize ];
270+ this .confirm = confirm ;
268271 }
269272
270- public void handleBasicReturn (int replyCode ,
271- String replyText ,
272- String exchange ,
273- String routingKey ,
274- AMQP .BasicProperties properties ,
275- byte [] body ) throws IOException {
276- logBasicReturn ();
277- }
278-
279- public synchronized void logBasicReturn () {
280- basicReturnCount ++;
281- }
282-
283- public synchronized void resetBasicReturns () {
284- basicReturnCount = 0 ;
273+ public synchronized void handleBasicReturn (int replyCode ,
274+ String replyText ,
275+ String exchange ,
276+ String routingKey ,
277+ AMQP .BasicProperties properties ,
278+ byte [] body )
279+ throws IOException {
280+ returnCount ++;
285281 }
286282
287- public void handleAck (long sequenceNumber , boolean multiple ) {
288- if (multiple ) {
289- logAck (sequenceNumber );
290- System .out .printf ("got an ack all messages up to %d\n " , sequenceNumber );
291- } else {
292- logAck (sequenceNumber );
293- }
283+ public synchronized void resetCounts () {
284+ msgCount = 0 ;
285+ returnCount = 0 ;
286+ confirmCount = 0 ;
294287 }
295288
296- private synchronized void logAck (long seqNum ) {
297- mostRecentAcked = seqNum ;
289+ public synchronized void handleAck (long sequenceNumber ,
290+ boolean multiple ) {
291+ mostRecentConfirmed = sequenceNumber ;
292+ confirmCount ++;
298293 }
299294
300295 public void run () {
@@ -307,7 +302,7 @@ public void run() {
307302 try {
308303
309304 while (timeLimit == 0 || now < startTime + timeLimit ) {
310- if (!throttlePubAck ()) {
305+ if (!throttleConfirms ()) {
311306 delay (now );
312307 publish (createMessage (totalMsgCount ));
313308 totalMsgCount ++;
@@ -343,8 +338,8 @@ private void publish(byte[] msg)
343338 msg );
344339 }
345340
346- private boolean throttlePubAck () {
347- return ((pubAckCount > 0 ) && (channel .getNextPublishSeqNo () - mostRecentAcked > pubAckCount ));
341+ private boolean throttleConfirms () {
342+ return ((confirmMax > 0 ) && (channel .getNextPublishSeqNo () - mostRecentConfirmed > confirmMax ));
348343 }
349344
350345 private void delay (long now )
@@ -361,14 +356,21 @@ private void delay(long now)
361356 Thread .sleep (pause );
362357 }
363358 if (elapsed > interval ) {
364- System .out .println ("sending rate: " +
365- (msgCount * 1000L / elapsed ) +
366- " msg/s" +
367- ", basic returns: " +
368- (basicReturnCount * 1000L / elapsed ) +
369- " ret/s" );
370- resetBasicReturns ();
371- msgCount = 0 ;
359+ System .out .print ("sending rate: " +
360+ (msgCount * 1000L / elapsed ) +
361+ " msg/s" );
362+ if (mandatory || immediate ) {
363+ System .out .print (", returns: " +
364+ (returnCount * 1000L / elapsed ) +
365+ " ret/s" );
366+ }
367+ if (confirm ) {
368+ System .out .print (", confirms: " +
369+ (confirmCount * 1000L / elapsed ) +
370+ " c/s" );
371+ }
372+ System .out .println ();
373+ resetCounts ();
372374 lastStatsTime = now ;
373375 }
374376 }
0 commit comments