5656import com .rabbitmq .client .Envelope ;
5757import com .rabbitmq .client .MessageProperties ;
5858import com .rabbitmq .client .QueueingConsumer ;
59+ import com .rabbitmq .client .ReturnListener ;
5960import com .rabbitmq .client .ShutdownSignalException ;
6061import com .rabbitmq .client .AMQP .Queue ;
6162import com .rabbitmq .client .QueueingConsumer .Delivery ;
@@ -89,7 +90,7 @@ public static void main(String[] args) {
8990
9091 //setup
9192 String id = UUID .randomUUID ().toString ();
92- Stats stats = new Stats (1000L * samplingInterval );
93+ final Stats stats = new Stats (1000L * samplingInterval );
9394 ConnectionFactory factory = new ConnectionFactory ();
9495 factory .setHost (hostName );
9596 factory .setPort (portNumber );
@@ -126,6 +127,17 @@ public static void main(String[] args) {
126127 Channel channel = conn .createChannel ();
127128 if (producerTxSize > 0 ) channel .txSelect ();
128129 channel .exchangeDeclare (exchangeName , exchangeType );
130+ channel .setReturnListener (new ReturnListener () {
131+ public void handleBasicReturn (int replyCode ,
132+ String replyText ,
133+ String exchange ,
134+ String routingKey ,
135+ AMQP .BasicProperties properties ,
136+ byte [] body )
137+ throws IOException {
138+ stats .logBasicReturn ();
139+ }
140+ });
129141 Thread t =
130142 new Thread (new Producer (channel , exchangeName , id ,
131143 flags , producerTxSize ,
@@ -409,6 +421,7 @@ public static class Stats {
409421 private long minLatency ;
410422 private long maxLatency ;
411423 private long cumulativeLatency ;
424+ private long numBasicReturns ;
412425
413426 public Stats (long interval ) {
414427 this .interval = interval ;
@@ -422,6 +435,11 @@ private void reset(long t) {
422435 minLatency = Long .MAX_VALUE ;
423436 maxLatency = Long .MIN_VALUE ;
424437 cumulativeLatency = 0L ;
438+ numBasicReturns = 0L ;
439+ }
440+
441+ public synchronized void logBasicReturn () {
442+ ++numBasicReturns ;
425443 }
426444
427445 public synchronized void collectStats (long now , long latency ) {
@@ -444,7 +462,8 @@ public synchronized void collectStats(long now, long latency) {
444462 minLatency /1000L + "/" +
445463 cumulativeLatency / (1000L * latencyCount ) + "/" +
446464 maxLatency /1000L + " microseconds" :
447- "" ));
465+ "" ) +
466+ ", basic returns: " + numBasicReturns );
448467 reset (now );
449468 }
450469
0 commit comments