Skip to content

Commit bdb8b31

Browse files
author
Simon MacMullen
committed
Update PerfTest for new qos regime, and add a convenience method for per-channel prefetch.
1 parent 891c7ce commit bdb8b31

File tree

5 files changed

+31
-22
lines changed

5 files changed

+31
-22
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,24 @@ public interface Channel extends ShutdownNotifier {
222222
* @param prefetchCount maximum number of messages that the server
223223
* will deliver, 0 if unlimited
224224
* @param global true if the settings should be applied to the
225-
* entire connection rather than just the current channel
225+
* entire channel rather than each consumer
226226
* @throws java.io.IOException if an error is encountered
227227
*/
228228
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
229229

230+
/**
231+
* Request a specific prefetchCount "quality of service" settings
232+
* for this channel.
233+
*
234+
* @see #basicQos(int, int, boolean)
235+
* @param prefetchCount maximum number of messages that the server
236+
* will deliver, 0 if unlimited
237+
* @param global true if the settings should be applied to the
238+
* entire channel rather than each consumer
239+
* @throws java.io.IOException if an error is encountered
240+
*/
241+
void basicQos(int prefetchCount, boolean global) throws IOException;
242+
230243
/**
231244
* Request a specific prefetchCount "quality of service" settings
232245
* for this channel.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,13 @@ public void basicQos(int prefetchSize, int prefetchCount, boolean global)
595595
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
596596
}
597597

598+
/** Public API - {@inheritDoc} */
599+
public void basicQos(int prefetchCount, boolean global)
600+
throws IOException
601+
{
602+
basicQos(0, prefetchCount, global);
603+
}
604+
598605
/** Public API - {@inheritDoc} */
599606
public void basicQos(int prefetchCount)
600607
throws IOException

test/src/com/rabbitmq/examples/PerfTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public static void main(String[] args) {
5858
long confirm = intArg(cmd, 'c', -1);
5959
boolean autoAck = cmd.hasOption('a');
6060
int multiAckEvery = intArg(cmd, 'A', 0);
61-
int channelPrefetch = intArg(cmd, 'q', 0);
62-
int consumerPrefetch = intArg(cmd, 'P', 0);
61+
int channelPrefetch = intArg(cmd, 'G', 0);
62+
int consumerPrefetch = intArg(cmd, 'q', 0);
6363
int minMsgSize = intArg(cmd, 's', 0);
6464
int timeLimit = intArg(cmd, 'z', 0);
6565
int producerMsgCount = intArg(cmd, 'C', 0);
@@ -148,8 +148,8 @@ private static Options getOptions() {
148148
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
149149
options.addOption(new Option("a", "autoack", false,"auto ack"));
150150
options.addOption(new Option("A", "multiAckEvery", true, "multi ack every"));
151-
options.addOption(new Option("q", "qos", true, "qos (channel) prefetch count"));
152-
options.addOption(new Option("P", "prefetch", true, "consumer prefetch count"));
151+
options.addOption(new Option("q", "qos", true, "consumer prefetch count"));
152+
options.addOption(new Option("G", "globalQos", true, "channel prefetch count"));
153153
options.addOption(new Option("s", "size", true, "message size"));
154154
options.addOption(new Option("z", "time", true, "time limit"));
155155
options.addOption(new Option("C", "pmessages", true, "producer message count"));

test/src/com/rabbitmq/examples/perf/Consumer.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,13 @@
2525
import java.io.ByteArrayInputStream;
2626
import java.io.DataInputStream;
2727
import java.io.IOException;
28-
import java.util.HashMap;
29-
import java.util.Map;
3028

3129
public class Consumer implements Runnable {
3230

3331
private QueueingConsumer q;
3432
private Channel channel;
3533
private String id;
3634
private String queueName;
37-
private int prefetch;
3835
private int txSize;
3936
private boolean autoAck;
4037
private int multiAckEvery;
@@ -43,12 +40,11 @@ public class Consumer implements Runnable {
4340
private long timeLimit;
4441

4542
public Consumer(Channel channel, String id,
46-
String queueName, int prefetch, int txSize, boolean autoAck,
43+
String queueName, int txSize, boolean autoAck,
4744
int multiAckEvery, Stats stats, int msgLimit, int timeLimit) {
4845

4946
this.channel = channel;
5047
this.id = id;
51-
this.prefetch = prefetch;
5248
this.queueName = queueName;
5349
this.txSize = txSize;
5450
this.autoAck = autoAck;
@@ -66,7 +62,7 @@ public void run() {
6662

6763
try {
6864
q = new QueueingConsumer(channel);
69-
channel.basicConsume(queueName, autoAck, args(prefetch), q);
65+
channel.basicConsume(queueName, autoAck, q);
7066

7167
while ((timeLimit == 0 || now < startTime + timeLimit) &&
7268
(msgLimit == 0 || totalMsgCount < msgLimit)) {
@@ -81,7 +77,7 @@ public void run() {
8177
} catch (ConsumerCancelledException e) {
8278
System.out.println("Consumer cancelled by broker. Re-consuming.");
8379
q = new QueueingConsumer(channel);
84-
channel.basicConsume(queueName, autoAck, args(prefetch), q);
80+
channel.basicConsume(queueName, autoAck, q);
8581
continue;
8682
}
8783
totalMsgCount++;
@@ -118,12 +114,4 @@ public void run() {
118114
throw new RuntimeException(e);
119115
}
120116
}
121-
122-
private Map<String, Object> args(int prefetch) {
123-
Map<String, Object> a = new HashMap<String, Object>();
124-
if (prefetch != 0) {
125-
a.put("x-prefetch", prefetch);
126-
}
127-
return a;
128-
}
129117
}

test/src/com/rabbitmq/examples/perf/MulticastParams.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,9 @@ public Consumer createConsumer(Connection connection, Stats stats, String id) th
185185
Channel channel = connection.createChannel();
186186
if (consumerTxSize > 0) channel.txSelect();
187187
String qName = configureQueue(connection, id);
188-
if (channelPrefetch > 0) channel.basicQos(channelPrefetch);
189-
return new Consumer(channel, id, qName, consumerPrefetch,
188+
if (consumerPrefetch > 0) channel.basicQos(consumerPrefetch);
189+
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
190+
return new Consumer(channel, id, qName,
190191
consumerTxSize, autoAck, multiAckEvery,
191192
stats, consumerMsgCount, timeLimit);
192193
}

0 commit comments

Comments
 (0)