Skip to content

Commit af85acf

Browse files
author
Alvaro Videla
committed
adds random routing key parameter to PerfTest
1 parent dd71c9c commit af85acf

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

test/src/com/rabbitmq/examples/PerfTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public static void main(String[] args) {
4949
String exchangeName = strArg(cmd, 'e', exchangeType);
5050
String queueName = strArg(cmd, 'u', "");
5151
String routingKey = strArg(cmd, 'k', null);
52+
boolean randomRKey = cmd.hasOption('K');
5253
int samplingInterval = intArg(cmd, 'i', 1);
5354
float producerRateLimit = floatArg(cmd, 'r', 0.0f);
5455
float consumerRateLimit = floatArg(cmd, 'R', 0.0f);
@@ -107,6 +108,7 @@ public static void main(String[] args) {
107108
p.setProducerTxSize( producerTxSize);
108109
p.setQueueName( queueName);
109110
p.setRoutingKey( routingKey);
111+
p.setRandomRKey( randomRKey);
110112
p.setProducerRateLimit(producerRateLimit);
111113
p.setTimeLimit( timeLimit);
112114

@@ -138,6 +140,7 @@ private static Options getOptions() {
138140
options.addOption(new Option("e", "exchange", true, "exchange name"));
139141
options.addOption(new Option("u", "queue", true, "queue name"));
140142
options.addOption(new Option("k", "routingKey", true, "routing key"));
143+
options.addOption(new Option("K", "randomRKey", false,"use random routing key per message"));
141144
options.addOption(new Option("i", "interval", true, "sampling interval"));
142145
options.addOption(new Option("r", "rate", true, "producer rate limit"));
143146
options.addOption(new Option("R", "consumerRate", true, "consumer rate limit"));

test/src/com/rabbitmq/examples/perf/MulticastParams.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class MulticastParams {
4545
private String exchangeType = "direct";
4646
private String queueName = "";
4747
private String routingKey = null;
48+
private boolean randomRKey = false;
4849

4950
private List<?> flags = new ArrayList<Object>();
5051

@@ -70,6 +71,10 @@ public void setRoutingKey(String routingKey) {
7071
this.routingKey = routingKey;
7172
}
7273

74+
public void setRandomRKey(boolean randomRKey) {
75+
this.randomRKey = randomRKey;
76+
}
77+
7378
public void setProducerRateLimit(float producerRateLimit) {
7479
this.producerRateLimit = producerRateLimit;
7580
}
@@ -163,6 +168,10 @@ public String getRoutingKey() {
163168
return routingKey;
164169
}
165170

171+
public boolean getRandomRKey() {
172+
return randomRKey;
173+
}
174+
166175
public Producer createProducer(Connection connection, Stats stats, String id) throws IOException {
167176
Channel channel = connection.createChannel();
168177
if (producerTxSize > 0) channel.txSelect();
@@ -171,7 +180,7 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
171180
channel.exchangeDeclare(exchangeName, exchangeType);
172181
}
173182
final Producer producer = new Producer(channel, exchangeName, id,
174-
flags, producerTxSize,
183+
randomRKey, flags, producerTxSize,
175184
producerRateLimit, producerMsgCount,
176185
minMsgSize, timeLimit,
177186
confirm, stats);

test/src/com/rabbitmq/examples/perf/Producer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.List;
3030
import java.util.SortedSet;
3131
import java.util.TreeSet;
32+
import java.util.UUID;
3233
import java.util.concurrent.Semaphore;
3334

3435
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
@@ -37,6 +38,7 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
3738
private Channel channel;
3839
private String exchangeName;
3940
private String id;
41+
private boolean randomRKey;
4042
private boolean mandatory;
4143
private boolean immediate;
4244
private boolean persistent;
@@ -52,7 +54,7 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
5254
private volatile SortedSet<Long> unconfirmedSet =
5355
Collections.synchronizedSortedSet(new TreeSet<Long>());
5456

55-
public Producer(Channel channel, String exchangeName, String id,
57+
public Producer(Channel channel, String exchangeName, String id, boolean randomRKey,
5658
List<?> flags, int txSize,
5759
float rateLimit, int msgLimit, int minMsgSize, int timeLimit,
5860
long confirm, Stats stats)
@@ -61,6 +63,7 @@ public Producer(Channel channel, String exchangeName, String id,
6163
this.channel = channel;
6264
this.exchangeName = exchangeName;
6365
this.id = id;
66+
this.randomRKey = randomRKey;
6467
this.mandatory = flags.contains("mandatory");
6568
this.immediate = flags.contains("immediate");
6669
this.persistent = flags.contains("persistent");
@@ -156,7 +159,7 @@ private void publish(byte[] msg)
156159
throws IOException {
157160

158161
unconfirmedSet.add(channel.getNextPublishSeqNo());
159-
channel.basicPublish(exchangeName, id,
162+
channel.basicPublish(exchangeName, randomRKey ? UUID.randomUUID().toString() : id,
160163
mandatory, immediate,
161164
persistent ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,
162165
msg);

0 commit comments

Comments
 (0)