Skip to content

Commit cbeb7ac

Browse files
author
Matthias Radestock
committed
add scalability tests for qos
1 parent bbd2107 commit cbeb7ac

File tree

1 file changed

+161
-0
lines changed

1 file changed

+161
-0
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2009 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.performance;
33+
34+
import com.rabbitmq.client.Channel;
35+
import com.rabbitmq.client.Connection;
36+
import com.rabbitmq.client.ConnectionFactory;
37+
import com.rabbitmq.client.QueueingConsumer;
38+
39+
import org.apache.commons.cli.CommandLine;
40+
import org.apache.commons.cli.Option;
41+
42+
import java.io.IOException;
43+
import java.util.List;
44+
import java.util.ArrayList;
45+
46+
public class QosScaling {
47+
48+
protected static class Parameters {
49+
String host;
50+
int port;
51+
int messageCount;
52+
int queueCount;
53+
int emptyCount;
54+
55+
public static CommandLine parseCommandLine(String[] args) {
56+
CLIHelper helper = CLIHelper.defaultHelper();
57+
helper.addOption(new Option("n", "messages", true, "number of messages to send"));
58+
helper.addOption(new Option("q", "queues", true, "number of queues to route messages to"));
59+
helper.addOption(new Option("e", "empty", true, "number of queues to leave empty"));
60+
61+
return helper.parseCommandLine(args);
62+
}
63+
64+
public Parameters(CommandLine cmd) {
65+
host = cmd.getOptionValue("h", "localhost");
66+
port = CLIHelper.getOptionValue(cmd, "p", 5672);
67+
messageCount = CLIHelper.getOptionValue(cmd, "n", 2000);
68+
queueCount = CLIHelper.getOptionValue(cmd, "q", 100);
69+
emptyCount = CLIHelper.getOptionValue(cmd, "e", 0);
70+
}
71+
72+
public String toString() {
73+
StringBuilder b = new StringBuilder();
74+
b.append("host=" + host);
75+
b.append(",port=" + port);
76+
b.append(",messages=" + messageCount);
77+
b.append(",queues=" + queueCount);
78+
b.append(",empty=" + emptyCount);
79+
return b.toString();
80+
}
81+
82+
}
83+
84+
protected final Parameters params;
85+
protected final ConnectionFactory connectionFactory =
86+
new ConnectionFactory();
87+
protected Connection connection;
88+
protected Channel channel;
89+
90+
public QosScaling(Parameters p) {
91+
params = p;
92+
}
93+
94+
protected List<String> consume(QueueingConsumer c) throws IOException {
95+
for (int i = 0; i < params.emptyCount; i++) {
96+
String queue = channel.queueDeclare().getQueue();
97+
channel.basicConsume(queue, false, c);
98+
}
99+
List<String> queues = new ArrayList<String>();
100+
for (int i = 0; i < params.queueCount; i++) {
101+
String queue = channel.queueDeclare().getQueue();
102+
channel.basicConsume(queue, false, c);
103+
queues.add(queue);
104+
}
105+
return queues;
106+
}
107+
108+
protected void publish(List<String> queues) throws IOException {
109+
Channel pubCh = connection.createChannel();
110+
pubCh.txSelect();
111+
byte[] body = "".getBytes();
112+
int messagesPerQueue = params.messageCount / queues.size();
113+
for (String queue : queues) {
114+
for (int i = 0; i < messagesPerQueue; i++) {
115+
pubCh.basicPublish("", queue, null, body);
116+
}
117+
}
118+
pubCh.txCommit();
119+
pubCh.close();
120+
}
121+
122+
protected long drain(QueueingConsumer c) throws IOException {
123+
long start = System.nanoTime();
124+
try {
125+
for (int i = 0; i < params.messageCount; i++) {
126+
long tag = c.nextDelivery().getEnvelope().getDeliveryTag();
127+
channel.basicAck(tag, false);
128+
}
129+
} catch (InterruptedException e) {
130+
IOException ioe = new IOException();
131+
ioe.initCause(e);
132+
throw ioe;
133+
}
134+
long finish = System.nanoTime();
135+
return finish - start;
136+
}
137+
138+
public long run() throws IOException {
139+
connection = connectionFactory.newConnection(params.host, params.port);
140+
channel = connection.createChannel();
141+
channel.basicQos(1);
142+
QueueingConsumer consumer = new QueueingConsumer(channel);
143+
try {
144+
publish(consume(consumer));
145+
return drain(consumer);
146+
} finally {
147+
connection.abort();
148+
}
149+
}
150+
151+
public static void main(String[] args) throws Exception {
152+
CommandLine cmd = Parameters.parseCommandLine(args);
153+
if (cmd == null) return;
154+
Parameters params = new Parameters(cmd);
155+
System.out.print(params.toString());
156+
QosScaling test = new QosScaling(params);
157+
long result = test.run();
158+
System.out.println(" -> " + result / 1000000 + "ms");
159+
}
160+
161+
}

0 commit comments

Comments
 (0)