3737import java .io .DataOutputStream ;
3838import java .io .IOException ;
3939import java .util .Arrays ;
40+ import java .util .Collections ;
41+ import java .util .concurrent .Semaphore ;
4042import java .util .List ;
43+ import java .util .SortedSet ;
44+ import java .util .TreeSet ;
4145import java .util .UUID ;
4246
4347import org .apache .commons .cli .CommandLine ;
@@ -244,10 +248,11 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
244248 private int msgCount ;
245249 private int returnCount ;
246250
247- private boolean confirm ;
248- private long confirmMax ;
249- private long mostRecentConfirmed ;
250- private long confirmCount ;
251+ private boolean confirm ;
252+ private long confirmCount ;
253+ private Semaphore confirmPool ;
254+ private volatile SortedSet <Long > ackSet =
255+ Collections .synchronizedSortedSet (new TreeSet <Long >());
251256
252257 public Producer (Channel channel , String exchangeName , String id ,
253258 List flags , int txSize ,
@@ -265,7 +270,9 @@ public Producer(Channel channel, String exchangeName, String id,
265270 this .interval = interval ;
266271 this .rateLimit = rateLimit ;
267272 this .timeLimit = 1000L * timeLimit ;
268- this .confirmMax = confirmMax ;
273+ if (confirmMax > 0 ) {
274+ this .confirmPool = new Semaphore ((int )confirmMax );
275+ }
269276 this .message = new byte [minMsgSize ];
270277 this .confirm = confirm ;
271278 }
@@ -280,16 +287,28 @@ public synchronized void handleBasicReturn(int replyCode,
280287 returnCount ++;
281288 }
282289
283- public synchronized void resetCounts () {
284- msgCount = 0 ;
285- returnCount = 0 ;
286- confirmCount = 0 ;
287- }
290+ public void handleAck (long seqNo , boolean multiple ) {
291+ int numConfirms = 0 ;
292+ if (multiple ) {
293+ for (long i = ackSet .first (); i <= seqNo ; ++i ) {
294+ if (!ackSet .contains (i ))
295+ continue ;
296+ ackSet .remove (i );
297+ numConfirms ++;
298+ }
299+ } else {
300+ ackSet .remove (seqNo );
301+ numConfirms = 1 ;
302+ }
303+ synchronized (this ) {
304+ confirmCount += numConfirms ;
305+ }
288306
289- public synchronized void handleAck (long sequenceNumber ,
290- boolean multiple ) {
291- mostRecentConfirmed = sequenceNumber ;
292- confirmCount ++;
307+ if (confirmPool != null ) {
308+ for (int i = 0 ; i < numConfirms ; ++i ) {
309+ confirmPool .release ();
310+ }
311+ }
293312 }
294313
295314 public void run () {
@@ -302,17 +321,16 @@ public void run() {
302321 try {
303322
304323 while (timeLimit == 0 || now < startTime + timeLimit ) {
305- if (!throttleConfirms ()) {
306- delay (now );
307- publish (createMessage (totalMsgCount ));
308- totalMsgCount ++;
309- msgCount ++;
310-
311- if (txSize != 0 && totalMsgCount % txSize == 0 ) {
312- channel .txCommit ();
313- }
314- } else {
315- Thread .sleep (10 );
324+ if (confirmPool != null ) {
325+ confirmPool .acquire ();
326+ }
327+ delay (now );
328+ publish (createMessage (totalMsgCount ));
329+ totalMsgCount ++;
330+ msgCount ++;
331+
332+ if (txSize != 0 && totalMsgCount % txSize == 0 ) {
333+ channel .txCommit ();
316334 }
317335 now = System .currentTimeMillis ();
318336 }
@@ -332,16 +350,13 @@ public void run() {
332350 private void publish (byte [] msg )
333351 throws IOException {
334352
353+ ackSet .add (channel .getNextPublishSeqNo ());
335354 channel .basicPublish (exchangeName , id ,
336355 mandatory , immediate ,
337356 persistent ? MessageProperties .MINIMAL_PERSISTENT_BASIC : MessageProperties .MINIMAL_BASIC ,
338357 msg );
339358 }
340359
341- private boolean throttleConfirms () {
342- return ((confirmMax > 0 ) && (channel .getNextPublishSeqNo () - mostRecentConfirmed > confirmMax ));
343- }
344-
345360 private void delay (long now )
346361 throws InterruptedException {
347362
@@ -356,21 +371,23 @@ private void delay(long now)
356371 Thread .sleep (pause );
357372 }
358373 if (elapsed > interval ) {
359- System .out .print ("sending rate: " +
360- (msgCount * 1000L / elapsed ) +
361- " msg/s" );
374+ long sendRate , returnRate , confirmRate ;
375+ synchronized (this ) {
376+ sendRate = msgCount * 1000L / elapsed ;
377+ returnRate = returnCount * 1000L / elapsed ;
378+ confirmRate = confirmCount * 1000L / elapsed ;
379+ msgCount = 0 ;
380+ returnCount = 0 ;
381+ confirmCount = 0 ;
382+ }
383+ System .out .print ("sending rate: " + sendRate + " msg/s" );
362384 if (mandatory || immediate ) {
363- System .out .print (", returns: " +
364- (returnCount * 1000L / elapsed ) +
365- " ret/s" );
385+ System .out .print (", returns: " + returnRate + " ret/s" );
366386 }
367387 if (confirm ) {
368- System .out .print (", confirms: " +
369- (confirmCount * 1000L / elapsed ) +
370- " c/s" );
388+ System .out .print (", confirms: " + confirmRate + " c/s" );
371389 }
372390 System .out .println ();
373- resetCounts ();
374391 lastStatsTime = now ;
375392 }
376393 }
0 commit comments