Skip to content

Commit f8beb69

Browse files
author
Simon MacMullen
committed
Support x-prefetch
1 parent 3dc7d79 commit f8beb69

File tree

3 files changed

+30
-10
lines changed

3 files changed

+30
-10
lines changed

test/src/com/rabbitmq/examples/PerfTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public static void main(String[] args) {
5858
long confirm = intArg(cmd, 'c', -1);
5959
boolean autoAck = cmd.hasOption('a');
6060
int multiAckEvery = intArg(cmd, 'A', 0);
61-
int prefetchCount = intArg(cmd, 'q', 0);
61+
int channelPrefetch = intArg(cmd, 'q', 0);
62+
int consumerPrefetch = intArg(cmd, 'P', 0);
6263
int minMsgSize = intArg(cmd, 's', 0);
6364
int timeLimit = intArg(cmd, 'z', 0);
6465
int producerMsgCount = intArg(cmd, 'C', 0);
@@ -100,7 +101,8 @@ public static void main(String[] args) {
100101
p.setMultiAckEvery( multiAckEvery);
101102
p.setMinMsgSize( minMsgSize);
102103
p.setPredeclared( predeclared);
103-
p.setPrefetchCount( prefetchCount);
104+
p.setConsumerPrefetch( consumerPrefetch);
105+
p.setChannelPrefetch( channelPrefetch);
104106
p.setProducerCount( producerCount);
105107
p.setProducerMsgCount( producerMsgCount);
106108
p.setProducerTxSize( producerTxSize);
@@ -146,7 +148,8 @@ private static Options getOptions() {
146148
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
147149
options.addOption(new Option("a", "autoack", false,"auto ack"));
148150
options.addOption(new Option("A", "multiAckEvery", true, "multi ack every"));
149-
options.addOption(new Option("q", "qos", true, "qos prefetch count"));
151+
options.addOption(new Option("q", "qos", true, "qos (channel) prefetch count"));
152+
options.addOption(new Option("P", "prefetch", true, "consumer prefetch count"));
150153
options.addOption(new Option("s", "size", true, "message size"));
151154
options.addOption(new Option("z", "time", true, "time limit"));
152155
options.addOption(new Option("C", "pmessages", true, "producer message count"));

test/src/com/rabbitmq/examples/perf/Consumer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@
2525
import java.io.ByteArrayInputStream;
2626
import java.io.DataInputStream;
2727
import java.io.IOException;
28+
import java.util.HashMap;
29+
import java.util.Map;
2830

2931
public class Consumer implements Runnable {
3032

3133
private QueueingConsumer q;
3234
private Channel channel;
3335
private String id;
3436
private String queueName;
37+
private int prefetch;
3538
private int txSize;
3639
private boolean autoAck;
3740
private int multiAckEvery;
@@ -40,11 +43,12 @@ public class Consumer implements Runnable {
4043
private long timeLimit;
4144

4245
public Consumer(Channel channel, String id,
43-
String queueName, int txSize, boolean autoAck,
46+
String queueName, int prefetch, int txSize, boolean autoAck,
4447
int multiAckEvery, Stats stats, int msgLimit, int timeLimit) {
4548

4649
this.channel = channel;
4750
this.id = id;
51+
this.prefetch = prefetch;
4852
this.queueName = queueName;
4953
this.txSize = txSize;
5054
this.autoAck = autoAck;
@@ -77,7 +81,7 @@ public void run() {
7781
} catch (ConsumerCancelledException e) {
7882
System.out.println("Consumer cancelled by broker. Re-consuming.");
7983
q = new QueueingConsumer(channel);
80-
channel.basicConsume(queueName, autoAck, q);
84+
channel.basicConsume(queueName, autoAck, "consumer-tag", false, false, args(prefetch), q);
8185
continue;
8286
}
8387
totalMsgCount++;
@@ -114,4 +118,12 @@ public void run() {
114118
throw new RuntimeException(e);
115119
}
116120
}
121+
122+
private Map args(int prefetch) {
123+
Map a = new HashMap();
124+
if (prefetch != 0) {
125+
a.put("x-prefetch", prefetch);
126+
}
127+
return a;
128+
}
117129
}

test/src/com/rabbitmq/examples/perf/MulticastParams.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public class MulticastParams {
3232
private int producerCount = 1;
3333
private int consumerTxSize = 0;
3434
private int producerTxSize = 0;
35-
private int prefetchCount = 0;
35+
private int channelPrefetch = 0;
36+
private int consumerPrefetch = 0;
3637
private int minMsgSize = 0;
3738

3839
private int timeLimit = 0;
@@ -102,8 +103,12 @@ public void setMultiAckEvery(int multiAckEvery) {
102103
this.multiAckEvery = multiAckEvery;
103104
}
104105

105-
public void setPrefetchCount(int prefetchCount) {
106-
this.prefetchCount = prefetchCount;
106+
public void setChannelPrefetch(int channelPrefetch) {
107+
this.channelPrefetch = channelPrefetch;
108+
}
109+
110+
public void setConsumerPrefetch(int consumerPrefetch) {
111+
this.consumerPrefetch = consumerPrefetch;
107112
}
108113

109114
public void setMinMsgSize(int minMsgSize) {
@@ -180,8 +185,8 @@ public Consumer createConsumer(Connection connection, Stats stats, String id) th
180185
Channel channel = connection.createChannel();
181186
if (consumerTxSize > 0) channel.txSelect();
182187
String qName = configureQueue(connection, id);
183-
if (prefetchCount > 0) channel.basicQos(prefetchCount);
184-
return new Consumer(channel, id, qName,
188+
if (channelPrefetch > 0) channel.basicQos(channelPrefetch);
189+
return new Consumer(channel, id, qName, consumerPrefetch,
185190
consumerTxSize, autoAck, multiAckEvery,
186191
stats, consumerMsgCount, timeLimit);
187192
}

0 commit comments

Comments
 (0)