Skip to content

Commit 286c30a

Browse files
author
Simon MacMullen
committed
Time-limit the consumer using the latch's await method; don't attempt to figure it out based on messages arriving.
1 parent a2dddb0 commit 286c30a

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

test/src/com/rabbitmq/examples/perf/Consumer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.io.DataInputStream;
2727
import java.io.IOException;
2828
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.TimeUnit;
2930

3031
public class Consumer extends ProducerConsumerBase implements Runnable {
3132

@@ -61,7 +62,12 @@ public void run() {
6162
try {
6263
q = new ConsumerImpl(channel);
6364
channel.basicConsume(queueName, autoAck, q);
64-
latch.await();
65+
if (timeLimit == 0) {
66+
latch.await();
67+
}
68+
else {
69+
latch.await(timeLimit, TimeUnit.MILLISECONDS);
70+
}
6571

6672
} catch (IOException e) {
6773
throw new RuntimeException(e);
@@ -74,20 +80,17 @@ public void run() {
7480

7581
private class ConsumerImpl extends DefaultConsumer {
7682
long now;
77-
long startTime;
7883
int totalMsgCount = 0;
7984

8085
private ConsumerImpl(Channel channel) {
8186
super(channel);
82-
startTime = now = System.currentTimeMillis();
83-
lastStatsTime = startTime;
87+
lastStatsTime = now = System.currentTimeMillis();
8488
msgCount = 0;
8589
}
8690

8791
@Override
8892
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
89-
if ((timeLimit == 0 || now < startTime + timeLimit) &&
90-
(msgLimit == 0 || msgCount < msgLimit)) {
93+
if (msgLimit == 0 || msgCount < msgLimit) {
9194
totalMsgCount++;
9295
msgCount++;
9396

0 commit comments

Comments
 (0)