4848import org .apache .commons .cli .Options ;
4949import org .apache .commons .cli .ParseException ;
5050
51+ import com .rabbitmq .client .AckListener ;
5152import com .rabbitmq .client .AMQP ;
5253import com .rabbitmq .client .Address ;
5354import com .rabbitmq .client .Channel ;
@@ -80,6 +81,8 @@ public static void main(String[] args) {
8081 int consumerCount = intArg (cmd , 'y' , 1 );
8182 int producerTxSize = intArg (cmd , 'm' , 0 );
8283 int consumerTxSize = intArg (cmd , 'n' , 0 );
84+ boolean pubAck = cmd .hasOption ('c' );
85+ int pubAckCount = intArg (cmd , 'k' , 0 );
8386 boolean autoAck = cmd .hasOption ('a' );
8487 int prefetchCount = intArg (cmd , 'q' , 0 );
8588 int minMsgSize = intArg (cmd , 's' , 0 );
@@ -88,6 +91,11 @@ public static void main(String[] args) {
8891 int frameMax = intArg (cmd , 'M' , 0 );
8992 int heartbeat = intArg (cmd , 'b' , 0 );
9093
94+ if ((producerTxSize + consumerTxSize > 0 ) && pubAck ) {
95+ throw new ParseException ("Cannot select both producerTxSize" +
96+ "/consumerTxSize and pubAck." );
97+ }
98+
9199 //setup
92100 String id = UUID .randomUUID ().toString ();
93101 Stats stats = new Stats (1000L * samplingInterval );
@@ -128,12 +136,14 @@ public static void main(String[] args) {
128136 producerConnections [i ] = conn ;
129137 Channel channel = conn .createChannel ();
130138 if (producerTxSize > 0 ) channel .txSelect ();
139+ if (pubAck ) channel .confirmSelect (true );
131140 channel .exchangeDeclare (exchangeName , exchangeType );
132141 final Producer p = new Producer (channel , exchangeName , id ,
133142 flags , producerTxSize ,
134143 1000L * samplingInterval ,
135144 rateLimit , minMsgSize , timeLimit );
136145 channel .setReturnListener (p );
146+ channel .setAckListener (p );
137147 Thread t = new Thread (p );
138148 producerThreads [i ] = t ;
139149 t .start ();
@@ -172,7 +182,9 @@ private static Options getOptions() {
172182 options .addOption (new Option ("x" , "producers" , true , "producer count" ));
173183 options .addOption (new Option ("y" , "consumers" , true , "consumer count" ));
174184 options .addOption (new Option ("m" , "ptxsize" , true , "producer tx size" ));
185+ options .addOption (new Option ("k" , "pubackcnt" , true , "max unack'd publishes" ));
175186 options .addOption (new Option ("n" , "ctxsize" , true , "consumer tx size" ));
187+ options .addOption (new Option ("c" , "puback" , false ,"publisher acks" ));
176188 options .addOption (new Option ("a" , "autoack" , false ,"auto ack" ));
177189 options .addOption (new Option ("q" , "qos" , true , "qos prefetch count" ));
178190 options .addOption (new Option ("s" , "size" , true , "message size" ));
@@ -201,7 +213,7 @@ private static List lstArg(CommandLine cmd, char opt) {
201213 return Arrays .asList (vals );
202214 }
203215
204- public static class Producer implements Runnable , ReturnListener {
216+ public static class Producer implements Runnable , ReturnListener , AckListener {
205217
206218 private Channel channel ;
207219 private String exchangeName ;
@@ -256,6 +268,10 @@ public synchronized void resetBasicReturns() {
256268 basicReturnCount = 0 ;
257269 }
258270
271+ public void handleAck (long sequenceNumber , boolean multiple ) {
272+ System .out .printf ("got an ack for %d\n " , sequenceNumber );
273+ }
274+
259275 public void run () {
260276
261277 long now ;
0 commit comments