Skip to content

Commit 392ffe1

Browse files
committed
aggregrate stats across producers
1 parent 0f33c72 commit 392ffe1

File tree

1 file changed

+99
-71
lines changed

1 file changed

+99
-71
lines changed

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 99 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public static void main(String[] args) {
134134
flags, producerTxSize,
135135
1000L * samplingInterval,
136136
rateLimit, minMsgSize, timeLimit,
137-
confirm);
137+
confirm, stats);
138138
channel.addReturnListener(p);
139139
channel.addConfirmListener(p);
140140
Thread t = new Thread(p);
@@ -213,9 +213,10 @@ private static List<?> lstArg(CommandLine cmd, char opt) {
213213
}
214214

215215
private static String formatRate(double rate) {
216-
if (rate < 1) return String.format("%1.2f", rate);
216+
if (rate == 0.0) return String.format("%d", (long)rate);
217+
else if (rate < 1) return String.format("%1.2f", rate);
217218
else if (rate < 10) return String.format("%1.1f", rate);
218-
else return String.format("%d", (long) rate);
219+
else return String.format("%d", (long)rate);
219220
}
220221

221222
public static class Producer implements Runnable, ReturnListener,
@@ -232,24 +233,23 @@ public static class Producer implements Runnable, ReturnListener,
232233
private int rateLimit;
233234
private long timeLimit;
234235

236+
private Stats stats;
237+
235238
private byte[] message;
236239

237240
private long startTime;
238241
private long lastStatsTime;
239242
private int msgCount;
240-
private int returnCount;
241243

242244
private long confirm;
243245
private Semaphore confirmPool;
244-
private long confirmCount;
245-
private long nackCount;
246246
private volatile SortedSet<Long> unconfirmedSet =
247247
Collections.synchronizedSortedSet(new TreeSet<Long>());
248248

249249
public Producer(Channel channel, String exchangeName, String id,
250250
List<?> flags, int txSize,
251251
long interval, int rateLimit, int minMsgSize, int timeLimit,
252-
long confirm)
252+
long confirm, Stats stats)
253253
throws IOException {
254254

255255
this.channel = channel;
@@ -267,16 +267,17 @@ public Producer(Channel channel, String exchangeName, String id,
267267
if (confirm > 0) {
268268
this.confirmPool = new Semaphore((int)confirm);
269269
}
270+
this.stats = stats;
270271
}
271272

272-
public synchronized void handleReturn(int replyCode,
273-
String replyText,
274-
String exchange,
275-
String routingKey,
276-
AMQP.BasicProperties properties,
277-
byte[] body)
273+
public void handleReturn(int replyCode,
274+
String replyText,
275+
String exchange,
276+
String routingKey,
277+
AMQP.BasicProperties properties,
278+
byte[] body)
278279
throws IOException {
279-
returnCount++;
280+
stats.handleReturn();
280281
}
281282

282283
public void handleAck(long seqNo, boolean multiple) {
@@ -298,12 +299,10 @@ private void handleAckNack(long seqNo, boolean multiple,
298299
unconfirmedSet.remove(seqNo);
299300
numConfirms = 1;
300301
}
301-
synchronized (this) {
302-
if (nack) {
303-
nackCount += numConfirms;
304-
} else {
305-
confirmCount += numConfirms;
306-
}
302+
if (nack) {
303+
stats.handleNack(numConfirms);
304+
} else {
305+
stats.handleConfirm(numConfirms);
307306
}
308307

309308
if (confirmPool != null) {
@@ -336,6 +335,7 @@ public void run() {
336335
channel.txCommit();
337336
}
338337
now = System.currentTimeMillis();
338+
stats.handleSend();
339339
}
340340

341341
} catch (IOException e) {
@@ -373,31 +373,6 @@ private void delay(long now)
373373
if (pause > 0) {
374374
Thread.sleep(pause);
375375
}
376-
if (elapsed > interval) {
377-
double sendRate, returnRate, confirmRate, nackRate;
378-
synchronized(this) {
379-
sendRate = msgCount * 1000.0 / elapsed;
380-
returnRate = returnCount * 1000.0 / elapsed;
381-
confirmRate = confirmCount * 1000.0 / elapsed;
382-
nackRate = nackCount * 1000.0 / elapsed;
383-
msgCount = 0;
384-
returnCount = 0;
385-
confirmCount = 0;
386-
nackCount = 0;
387-
}
388-
System.out.print("sending rate: " + formatRate(sendRate) + " msg/s");
389-
if (mandatory || immediate) {
390-
System.out.print(", returns: " + formatRate(returnRate) + " ret/s");
391-
}
392-
if (confirm >= 0) {
393-
System.out.print(", confirms: " + formatRate(confirmRate) + " c/s");
394-
if (nackRate > 0) {
395-
System.out.print(", nacks: " + formatRate(nackRate) + " n/s");
396-
}
397-
}
398-
System.out.println();
399-
lastStatsTime = now;
400-
}
401376
}
402377

403378
private byte[] createMessage(int sequenceNumber)
@@ -490,7 +465,7 @@ public void run() {
490465

491466
now = System.currentTimeMillis();
492467

493-
stats.collectStats(now, id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
468+
stats.handleRecv(id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
494469
}
495470

496471
} catch (IOException e) {
@@ -513,14 +488,20 @@ public void run() {
513488

514489
public static class Stats {
515490

516-
private long interval;
491+
private long interval;
517492

518-
private long lastStatsTime;
519-
private int msgCount;
520-
private int latencyCount;
521-
private long minLatency;
522-
private long maxLatency;
523-
private long cumulativeLatency;
493+
private long lastStatsTime;
494+
495+
private int sendCount;
496+
private int returnCount;
497+
private int confirmCount;
498+
private int nackCount;
499+
private int recvCount;
500+
501+
private int latencyCount;
502+
private long minLatency;
503+
private long maxLatency;
504+
private long cumulativeLatency;
524505

525506
public Stats(long interval) {
526507
this.interval = interval;
@@ -529,37 +510,84 @@ public Stats(long interval) {
529510

530511
private void reset(long t) {
531512
lastStatsTime = t;
532-
msgCount = 0;
513+
514+
sendCount = 0;
515+
returnCount = 0;
516+
confirmCount = 0;
517+
nackCount = 0;
518+
recvCount = 0;
519+
533520
latencyCount = 0;
534521
minLatency = Long.MAX_VALUE;
535522
maxLatency = Long.MIN_VALUE;
536523
cumulativeLatency = 0L;
537524
}
538525

539-
public synchronized void collectStats(long now, long latency) {
540-
msgCount++;
526+
private void report() {
527+
long now = System.currentTimeMillis();
528+
long elapsed = now - lastStatsTime;
529+
if (elapsed > interval) {
530+
double sendRate, returnRate, confirmRate, nackRate;
531+
sendRate = sendCount * 1000.0 / elapsed;
532+
returnRate = returnCount * 1000.0 / elapsed;
533+
confirmRate = confirmCount * 1000.0 / elapsed;
534+
nackRate = nackCount * 1000.0 / elapsed;
541535

536+
System.out.print("sending rate: " + formatRate(sendRate) + " msg/s");
537+
if (returnRate > 0) {
538+
System.out.print(", returns: " + formatRate(returnRate) + " ret/s");
539+
}
540+
if (confirmRate > 0) {
541+
System.out.print(", confirms: " + formatRate(confirmRate) + " c/s");
542+
}
543+
if (nackRate > 0) {
544+
System.out.print(", nacks: " + formatRate(nackRate) + " n/s");
545+
}
546+
547+
System.out.print(", recving rate: " +
548+
formatRate(1000.0 * recvCount / elapsed) +
549+
" msg/s" +
550+
(latencyCount > 0 ?
551+
", min/avg/max latency: " +
552+
minLatency/1000L + "/" +
553+
cumulativeLatency / (1000L * latencyCount) + "/" +
554+
maxLatency/1000L + " microseconds" :
555+
""));
556+
System.out.println();
557+
reset(now);
558+
}
559+
}
560+
561+
562+
public synchronized void handleSend() {
563+
sendCount++;
564+
report();
565+
}
566+
567+
public synchronized void handleReturn() {
568+
returnCount++;
569+
report();
570+
}
571+
572+
public synchronized void handleConfirm(int numConfirms) {
573+
confirmCount+=numConfirms;
574+
report();
575+
}
576+
577+
public synchronized void handleNack(int numAcks) {
578+
nackCount+=numAcks;
579+
report();
580+
}
581+
582+
public synchronized void handleRecv(long latency) {
583+
recvCount++;
542584
if (latency > 0) {
543585
minLatency = Math.min(minLatency, latency);
544586
maxLatency = Math.max(maxLatency, latency);
545587
cumulativeLatency += latency;
546588
latencyCount++;
547589
}
548-
549-
long elapsed = now - lastStatsTime;
550-
if (elapsed > interval) {
551-
System.out.println("recving rate: " +
552-
formatRate(1000.0 * msgCount / elapsed) +
553-
" msg/s" +
554-
(latencyCount > 0 ?
555-
", min/avg/max latency: " +
556-
minLatency/1000L + "/" +
557-
cumulativeLatency / (1000L * latencyCount) + "/" +
558-
maxLatency/1000L + " microseconds" :
559-
""));
560-
reset(now);
561-
}
562-
590+
report();
563591
}
564592

565593
}

0 commit comments

Comments
 (0)