Skip to content

Commit dd71c9c

Browse files
author
Simon MacMullen
committed
stable to default
2 parents 5e9fe95 + e96477b commit dd71c9c

File tree

7 files changed

+133
-106
lines changed

7 files changed

+133
-106
lines changed

test/src/com/rabbitmq/client/test/server/LoopbackUsers.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,19 @@ private ConnectionFactory getFactory(String name, String addr) {
7171

7272
// Find the first IP address of a network interface that is up, not loopback, not point to point (e.g. VPN thing)
7373
private static InetAddress findRealIPAddress() throws SocketException {
74-
throw new RuntimeException("this test will be enabled once we stop supporting Java 1.5");
75-
// Enumeration<NetworkInterface> ifs = NetworkInterface.getNetworkInterfaces();
76-
// while (ifs.hasMoreElements()) {
77-
// NetworkInterface nif = ifs.nextElement();
78-
// if (nif.isUp() && !nif.isPointToPoint() && !nif.isLoopback() && !nif.isVirtual()) {
79-
// Enumeration<InetAddress> addrs = nif.getInetAddresses();
80-
// while (addrs.hasMoreElements()) {
81-
// InetAddress addr = addrs.nextElement();
82-
// if (addr instanceof Inet4Address) {
83-
// return addr;
84-
// }
85-
// }
86-
// }
87-
// }
88-
// throw new RuntimeException("Could not determine real network address");
74+
Enumeration<NetworkInterface> ifs = NetworkInterface.getNetworkInterfaces();
75+
while (ifs.hasMoreElements()) {
76+
NetworkInterface nif = ifs.nextElement();
77+
if (nif.isUp() && !nif.isPointToPoint() && !nif.isLoopback() && !nif.isVirtual()) {
78+
Enumeration<InetAddress> addrs = nif.getInetAddresses();
79+
while (addrs.hasMoreElements()) {
80+
InetAddress addr = addrs.nextElement();
81+
if (addr instanceof Inet4Address) {
82+
return addr;
83+
}
84+
}
85+
}
86+
}
87+
throw new RuntimeException("Could not determine real network address");
8988
}
9089
}

test/src/com/rabbitmq/client/test/server/ServerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,6 @@ public static void add(TestSuite suite) {
4242
suite.addTestSuite(Shutdown.class);
4343
suite.addTestSuite(BlockedConnection.class);
4444
suite.addTestSuite(ChannelLimitNegotiation.class);
45-
//suite.addTestSuite(LoopbackUsers.class);
45+
suite.addTestSuite(LoopbackUsers.class);
4646
}
4747
}

test/src/com/rabbitmq/examples/PerfTest.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,31 +45,32 @@ public static void main(String[] args) {
4545
System.exit(0);
4646
}
4747

48-
String exchangeType = strArg(cmd, 't', "direct");
49-
String exchangeName = strArg(cmd, 'e', exchangeType);
50-
String queueName = strArg(cmd, 'u', "");
51-
String routingKey = strArg(cmd, 'k', null);
52-
int samplingInterval = intArg(cmd, 'i', 1);
53-
float rateLimit = floatArg(cmd, 'r', 0.0f);
54-
int producerCount = intArg(cmd, 'x', 1);
55-
int consumerCount = intArg(cmd, 'y', 1);
56-
int producerTxSize = intArg(cmd, 'm', 0);
57-
int consumerTxSize = intArg(cmd, 'n', 0);
58-
long confirm = intArg(cmd, 'c', -1);
59-
boolean autoAck = cmd.hasOption('a');
60-
int multiAckEvery = intArg(cmd, 'A', 0);
61-
int channelPrefetch = intArg(cmd, 'Q', 0);
62-
int consumerPrefetch = intArg(cmd, 'q', 0);
63-
int minMsgSize = intArg(cmd, 's', 0);
64-
int timeLimit = intArg(cmd, 'z', 0);
65-
int producerMsgCount = intArg(cmd, 'C', 0);
66-
int consumerMsgCount = intArg(cmd, 'D', 0);
67-
List<?> flags = lstArg(cmd, 'f');
68-
int frameMax = intArg(cmd, 'M', 0);
69-
int heartbeat = intArg(cmd, 'b', 0);
70-
boolean predeclared = cmd.hasOption('p');
71-
72-
String uri = strArg(cmd, 'h', "amqp://localhost");
48+
String exchangeType = strArg(cmd, 't', "direct");
49+
String exchangeName = strArg(cmd, 'e', exchangeType);
50+
String queueName = strArg(cmd, 'u', "");
51+
String routingKey = strArg(cmd, 'k', null);
52+
int samplingInterval = intArg(cmd, 'i', 1);
53+
float producerRateLimit = floatArg(cmd, 'r', 0.0f);
54+
float consumerRateLimit = floatArg(cmd, 'R', 0.0f);
55+
int producerCount = intArg(cmd, 'x', 1);
56+
int consumerCount = intArg(cmd, 'y', 1);
57+
int producerTxSize = intArg(cmd, 'm', 0);
58+
int consumerTxSize = intArg(cmd, 'n', 0);
59+
long confirm = intArg(cmd, 'c', -1);
60+
boolean autoAck = cmd.hasOption('a');
61+
int multiAckEvery = intArg(cmd, 'A', 0);
62+
int channelPrefetch = intArg(cmd, 'Q', 0);
63+
int consumerPrefetch = intArg(cmd, 'q', 0);
64+
int minMsgSize = intArg(cmd, 's', 0);
65+
int timeLimit = intArg(cmd, 'z', 0);
66+
int producerMsgCount = intArg(cmd, 'C', 0);
67+
int consumerMsgCount = intArg(cmd, 'D', 0);
68+
List<?> flags = lstArg(cmd, 'f');
69+
int frameMax = intArg(cmd, 'M', 0);
70+
int heartbeat = intArg(cmd, 'b', 0);
71+
boolean predeclared = cmd.hasOption('p');
72+
73+
String uri = strArg(cmd, 'h', "amqp://localhost");
7374

7475
//setup
7576
PrintlnStats stats = new PrintlnStats(1000L * samplingInterval,
@@ -91,6 +92,7 @@ public static void main(String[] args) {
9192
p.setConfirm( confirm);
9293
p.setConsumerCount( consumerCount);
9394
p.setConsumerMsgCount( consumerMsgCount);
95+
p.setConsumerRateLimit(consumerRateLimit);
9496
p.setConsumerTxSize( consumerTxSize);
9597
p.setExchangeName( exchangeName);
9698
p.setExchangeType( exchangeType);
@@ -105,7 +107,7 @@ public static void main(String[] args) {
105107
p.setProducerTxSize( producerTxSize);
106108
p.setQueueName( queueName);
107109
p.setRoutingKey( routingKey);
108-
p.setRateLimit( rateLimit);
110+
p.setProducerRateLimit(producerRateLimit);
109111
p.setTimeLimit( timeLimit);
110112

111113
MulticastSet set = new MulticastSet(stats, factory, p);
@@ -137,7 +139,8 @@ private static Options getOptions() {
137139
options.addOption(new Option("u", "queue", true, "queue name"));
138140
options.addOption(new Option("k", "routingKey", true, "routing key"));
139141
options.addOption(new Option("i", "interval", true, "sampling interval"));
140-
options.addOption(new Option("r", "rate", true, "rate limit"));
142+
options.addOption(new Option("r", "rate", true, "producer rate limit"));
143+
options.addOption(new Option("R", "consumerRate", true, "consumer rate limit"));
141144
options.addOption(new Option("x", "producers", true, "producer count"));
142145
options.addOption(new Option("y", "consumers", true, "consumer count"));
143146
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));

test/src/com/rabbitmq/examples/perf/Consumer.java

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,21 @@
1616

1717
package com.rabbitmq.examples.perf;
1818

19+
import com.rabbitmq.client.AMQP.BasicProperties;
1920
import com.rabbitmq.client.Channel;
20-
import com.rabbitmq.client.ConsumerCancelledException;
21+
import com.rabbitmq.client.DefaultConsumer;
2122
import com.rabbitmq.client.Envelope;
22-
import com.rabbitmq.client.QueueingConsumer;
2323
import com.rabbitmq.client.ShutdownSignalException;
2424

2525
import java.io.ByteArrayInputStream;
2626
import java.io.DataInputStream;
2727
import java.io.IOException;
28+
import java.util.concurrent.CountDownLatch;
2829

29-
public class Consumer implements Runnable {
30+
public class Consumer extends ProducerConsumerBase implements Runnable {
3031

31-
private QueueingConsumer q;
32-
private Channel channel;
32+
private ConsumerImpl q;
33+
private Channel channel;
3334
private String id;
3435
private String queueName;
3536
private int txSize;
@@ -38,14 +39,16 @@ public class Consumer implements Runnable {
3839
private Stats stats;
3940
private int msgLimit;
4041
private long timeLimit;
42+
private CountDownLatch latch = new CountDownLatch(1);
4143

4244
public Consumer(Channel channel, String id,
4345
String queueName, int txSize, boolean autoAck,
44-
int multiAckEvery, Stats stats, int msgLimit, int timeLimit) {
46+
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, int timeLimit) {
4547

4648
this.channel = channel;
4749
this.id = id;
4850
this.queueName = queueName;
51+
this.rateLimit = rateLimit;
4952
this.txSize = txSize;
5053
this.autoAck = autoAck;
5154
this.multiAckEvery = multiAckEvery;
@@ -55,40 +58,44 @@ public Consumer(Channel channel, String id,
5558
}
5659

5760
public void run() {
61+
try {
62+
q = new ConsumerImpl(channel);
63+
channel.basicConsume(queueName, autoAck, q);
64+
latch.await();
65+
66+
} catch (IOException e) {
67+
throw new RuntimeException(e);
68+
} catch (InterruptedException e) {
69+
throw new RuntimeException(e);
70+
} catch (ShutdownSignalException e) {
71+
throw new RuntimeException(e);
72+
}
73+
}
74+
75+
private class ConsumerImpl extends DefaultConsumer {
5876
long now;
5977
long startTime;
60-
startTime = now = System.currentTimeMillis();
6178
int totalMsgCount = 0;
6279

63-
try {
64-
q = new QueueingConsumer(channel);
65-
channel.basicConsume(queueName, autoAck, q);
80+
private ConsumerImpl(Channel channel) {
81+
super(channel);
82+
startTime = now = System.currentTimeMillis();
83+
lastStatsTime = startTime;
84+
msgCount = 0;
85+
}
6686

67-
while ((timeLimit == 0 || now < startTime + timeLimit) &&
68-
(msgLimit == 0 || totalMsgCount < msgLimit)) {
69-
QueueingConsumer.Delivery delivery;
70-
try {
71-
if (timeLimit == 0) {
72-
delivery = q.nextDelivery();
73-
} else {
74-
delivery = q.nextDelivery(startTime + timeLimit - now);
75-
if (delivery == null) break;
76-
}
77-
} catch (ConsumerCancelledException e) {
78-
System.out.println("Consumer cancelled by broker. Re-consuming.");
79-
q = new QueueingConsumer(channel);
80-
channel.basicConsume(queueName, autoAck, q);
81-
continue;
82-
}
87+
@Override
88+
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
89+
if ((timeLimit == 0 || now < startTime + timeLimit) &&
90+
(msgLimit == 0 || msgCount < msgLimit)) {
8391
totalMsgCount++;
92+
msgCount++;
8493

85-
DataInputStream d = new DataInputStream(new ByteArrayInputStream(delivery.getBody()));
94+
DataInputStream d = new DataInputStream(new ByteArrayInputStream(body));
8695
d.readInt();
8796
long msgNano = d.readLong();
8897
long nano = System.nanoTime();
8998

90-
Envelope envelope = delivery.getEnvelope();
91-
9299
if (!autoAck) {
93100
if (multiAckEvery == 0) {
94101
channel.basicAck(envelope.getDeliveryTag(), false);
@@ -104,14 +111,17 @@ public void run() {
104111
now = System.currentTimeMillis();
105112

106113
stats.handleRecv(id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
114+
delay(now);
115+
}
116+
else {
117+
latch.countDown();
107118
}
119+
}
108120

109-
} catch (IOException e) {
110-
throw new RuntimeException(e);
111-
} catch (InterruptedException e) {
112-
throw new RuntimeException (e);
113-
} catch (ShutdownSignalException e) {
114-
throw new RuntimeException(e);
121+
@Override
122+
public void handleCancel(String consumerTag) throws IOException {
123+
System.out.println("Consumer cancelled by broker. Re-consuming.");
124+
channel.basicConsume(queueName, autoAck, q);
115125
}
116126
}
117127
}

test/src/com/rabbitmq/examples/perf/MulticastParams.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.rabbitmq.client.AMQP;
2020
import com.rabbitmq.client.Channel;
21-
import com.rabbitmq.client.Command;
2221
import com.rabbitmq.client.Connection;
2322
import com.rabbitmq.client.ShutdownSignalException;
2423

@@ -37,7 +36,8 @@ public class MulticastParams {
3736
private int minMsgSize = 0;
3837

3938
private int timeLimit = 0;
40-
private float rateLimit = 0;
39+
private float producerRateLimit = 0;
40+
private float consumerRateLimit = 0;
4141
private int producerMsgCount = 0;
4242
private int consumerMsgCount = 0;
4343

@@ -70,14 +70,18 @@ public void setRoutingKey(String routingKey) {
7070
this.routingKey = routingKey;
7171
}
7272

73-
public void setRateLimit(float rateLimit) {
74-
this.rateLimit = rateLimit;
73+
public void setProducerRateLimit(float producerRateLimit) {
74+
this.producerRateLimit = producerRateLimit;
7575
}
7676

7777
public void setProducerCount(int producerCount) {
7878
this.producerCount = producerCount;
7979
}
8080

81+
public void setConsumerRateLimit(float consumerRateLimit) {
82+
this.consumerRateLimit = consumerRateLimit;
83+
}
84+
8185
public void setConsumerCount(int consumerCount) {
8286
this.consumerCount = consumerCount;
8387
}
@@ -168,7 +172,7 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
168172
}
169173
final Producer producer = new Producer(channel, exchangeName, id,
170174
flags, producerTxSize,
171-
rateLimit, producerMsgCount,
175+
producerRateLimit, producerMsgCount,
172176
minMsgSize, timeLimit,
173177
confirm, stats);
174178
channel.addReturnListener(producer);
@@ -184,7 +188,7 @@ public Consumer createConsumer(Connection connection, Stats stats, String id) th
184188
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
185189
return new Consumer(channel, id, qName,
186190
consumerTxSize, autoAck, multiAckEvery,
187-
stats, consumerMsgCount, timeLimit);
191+
stats, consumerRateLimit, consumerMsgCount, timeLimit);
188192
}
189193

190194
public boolean shouldConfigureQueue() {

test/src/com/rabbitmq/examples/perf/Producer.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.util.TreeSet;
3232
import java.util.concurrent.Semaphore;
3333

34-
public class Producer implements Runnable, ReturnListener,
34+
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
3535
ConfirmListener
3636
{
3737
private Channel channel;
@@ -41,18 +41,13 @@ public class Producer implements Runnable, ReturnListener,
4141
private boolean immediate;
4242
private boolean persistent;
4343
private int txSize;
44-
private float rateLimit;
4544
private int msgLimit;
4645
private long timeLimit;
4746

4847
private Stats stats;
4948

5049
private byte[] message;
5150

52-
private long startTime;
53-
private long lastStatsTime;
54-
private int msgCount;
55-
5651
private Semaphore confirmPool;
5752
private volatile SortedSet<Long> unconfirmedSet =
5853
Collections.synchronizedSortedSet(new TreeSet<Long>());
@@ -124,7 +119,10 @@ private void handleAckNack(long seqNo, boolean multiple,
124119
}
125120

126121
public void run() {
127-
long now = startTime = lastStatsTime = System.currentTimeMillis();
122+
long now;
123+
long startTime;
124+
startTime = now = System.currentTimeMillis();
125+
lastStatsTime = startTime;
128126
msgCount = 0;
129127
int totalMsgCount = 0;
130128

@@ -164,21 +162,6 @@ private void publish(byte[] msg)
164162
msg);
165163
}
166164

167-
private void delay(long now)
168-
throws InterruptedException {
169-
170-
long elapsed = now - lastStatsTime;
171-
//example: rateLimit is 5000 msg/s,
172-
//10 ms have elapsed, we have sent 200 messages
173-
//the 200 msgs we have actually sent should have taken us
174-
//200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms
175-
long pause = (long) (rateLimit == 0.0f ?
176-
0.0f : (msgCount * 1000.0 / rateLimit - elapsed));
177-
if (pause > 0) {
178-
Thread.sleep(pause);
179-
}
180-
}
181-
182165
private byte[] createMessage(int sequenceNumber)
183166
throws IOException {
184167

0 commit comments

Comments
 (0)