Skip to content

Commit e3f1be2

Browse files
author
Matthias Radestock
committed
cosmetic
1 parent 06ddbd3 commit e3f1be2

File tree

3 files changed

+90
-75
lines changed

3 files changed

+90
-75
lines changed

src/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,21 @@ public class SocketFrameHandler implements FrameHandler {
5656
/** Socket's outputstream - data to the broker */
5757
public final DataOutputStream _outputStream;
5858

59-
// Note, we use each of these to synchronize on to make sure we don't try to use them
60-
// twice simultaneously.
59+
// Note, we use each of these to synchronize on to make sure we
60+
// don't try to use them twice simultaneously.
6161

6262
/**
6363
* @param socket the socket to use
6464
*/
65-
public SocketFrameHandler(Socket socket) throws IOException{
65+
public SocketFrameHandler(Socket socket) throws IOException {
6666
_socket = socket;
6767

6868
_inputStream = new DataInputStream(new BufferedInputStream(_socket.getInputStream()));
6969
_outputStream = new DataOutputStream(new BufferedOutputStream(_socket.getOutputStream()));
7070
}
7171

7272
public String getHost() {
73-
return _socket.getInetAddress().getHostName();
73+
return _socket.getInetAddress().getHostName();
7474
}
7575

7676
public int getPort() {

test/src/com/rabbitmq/examples/BufferPerformanceMetrics.java

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,70 +14,82 @@
1414

1515

1616
/**
17-
* Class to explore how performance of sending and receiving messages varies with the buffer size and
18-
* enabling/disabling Nagle's algorithm.
17+
* Class to explore how performance of sending and receiving messages
18+
* varies with the buffer size and enabling/disabling Nagle's
19+
* algorithm.
1920
*/
20-
public class BufferPerformanceMetrics{
21+
public class BufferPerformanceMetrics {
2122

22-
public static final String QUEUE = "performance-test-queue";
23-
public static final String EXCHANGE = "performance-test-exchange";
23+
public static final String QUEUE = "performance-test-queue";
24+
public static final String EXCHANGE = "performance-test-exchange";
2425
public static final String ROUTING_KEY = "performance-test-rk";
25-
public static final int MESSAGE_COUNT = 100000;
26-
public static final byte[] MESSAGE = "Hello world".getBytes();
26+
public static final int MESSAGE_COUNT = 100000;
27+
public static final byte[] MESSAGE = "Hello world".getBytes();
28+
public static final int REPEATS = 1000000;
29+
public static final int PEAK_SIZE = 20 * 1024;
30+
2731
public static double NANOSECONDS_PER_SECOND = 1000 * 1000 * 1000;
28-
public static final int REPEATS = 1000000;
29-
public static final int PEAK_SIZE = 20 * 1024;
3032

31-
public static void main(String[] args) throws Exception{
32-
String hostName = args.length > 0 ? args[0] : "localhost";
33+
public static void main(String[] args) throws Exception {
34+
String hostName = args.length > 0 ? args[0] : "localhost";
3335

3436
Random rnd = new Random();
3537

36-
System.out.println("buffer size, publish rate with nagle, get rate with nagle," +
37-
" publish rate without nagle, get rate without nagle");
38-
39-
38+
System.out.println("buffer size, " +
39+
"publish rate with nagle, " +
40+
"get rate with nagle, " +
41+
"publish rate without nagle, " +
42+
"get rate without nagle");
4043

41-
for(int repeat = 0; repeat < REPEATS; repeat++){
44+
for(int repeat = 0; repeat < REPEATS; repeat++) {
4245
final int bufferSize = 1 + rnd.nextInt(PEAK_SIZE);
43-
44-
double publishRateNagle = 0, publishRateNoNagle = 0, getRateNagle = 0, getRateNoNagle = 0;
45-
46-
for(final boolean useNagle : new boolean[]{ false, true }){
47-
ConnectionFactory factory = new ConnectionFactory(){
48-
@Override public void configureSocket(Socket socket) throws IOException{
49-
socket.setTcpNoDelay(!useNagle);
50-
socket.setReceiveBufferSize(bufferSize);
51-
socket.setSendBufferSize(bufferSize);
52-
}
53-
};
46+
47+
double
48+
publishRateNagle = 0,
49+
publishRateNoNagle = 0,
50+
getRateNagle = 0,
51+
getRateNoNagle = 0;
52+
53+
for(final boolean useNagle : new boolean[] { false, true }) {
54+
ConnectionFactory factory = new ConnectionFactory() {
55+
public void configureSocket(Socket socket)
56+
throws IOException {
57+
socket.setTcpNoDelay(!useNagle);
58+
socket.setReceiveBufferSize(bufferSize);
59+
socket.setSendBufferSize(bufferSize);
60+
}
61+
};
5462

5563
Connection connection = factory.newConnection(hostName);
5664
Channel channel = connection.createChannel();
5765
channel.exchangeDeclare(EXCHANGE, "direct");
5866
channel.queueDeclare(QUEUE);
5967
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
60-
68+
6169
long start;
62-
63-
start = System.nanoTime();
64-
for(int i = 0; i < MESSAGE_COUNT; i++){
65-
channel.basicPublish(EXCHANGE, ROUTING_KEY, MessageProperties.BASIC, MESSAGE);
66-
}
67-
long publishTime = System.nanoTime() - start;
6870

69-
start = System.nanoTime();
71+
start = System.nanoTime();
72+
for(int i = 0; i < MESSAGE_COUNT; i++) {
73+
channel.basicPublish(EXCHANGE, ROUTING_KEY,
74+
MessageProperties.BASIC, MESSAGE);
75+
}
76+
long publishTime = System.nanoTime() - start;
77+
78+
start = System.nanoTime();
7079

7180
QueueingConsumer consumer = new QueueingConsumer(channel);
7281
channel.basicConsume(QUEUE, true, consumer);
7382

7483
for(int i = 0; i < MESSAGE_COUNT; i++){
7584
consumer.nextDelivery();
76-
}
77-
long getTime = System.nanoTime() - start;
85+
}
86+
long getTime = System.nanoTime() - start;
87+
88+
double publishRate =
89+
MESSAGE_COUNT / (publishTime / NANOSECONDS_PER_SECOND);
90+
double getRate =
91+
MESSAGE_COUNT / (getTime / NANOSECONDS_PER_SECOND);
7892

79-
double publishRate = MESSAGE_COUNT / (publishTime / NANOSECONDS_PER_SECOND);
80-
double getRate = MESSAGE_COUNT / (getTime / NANOSECONDS_PER_SECOND);
8193
if(useNagle){
8294
publishRateNagle = publishRate;
8395
getRateNagle = getRate;
@@ -87,11 +99,15 @@ public static void main(String[] args) throws Exception{
8799
}
88100

89101
connection.close();
90-
// Small sleep to remove noise from hammering the server.
102+
// Small sleep to remove noise from hammering the server.
91103
Thread.sleep(100);
92104
}
93105

94-
System.out.println(bufferSize + ", " + publishRateNagle + ", " + getRateNagle + ", " + publishRateNoNagle + ", " + getRateNoNagle);
106+
System.out.println(bufferSize + ", " +
107+
publishRateNagle + ", " +
108+
getRateNagle + ", " +
109+
publishRateNoNagle + ", " +
110+
getRateNoNagle);
95111
}
96112
}
97113
}

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -100,16 +100,15 @@ public static void main(String[] args) {
100100
Connection[] consumerConnections = new Connection[consumerCount];
101101
for (int i = 0; i < consumerCount; i++) {
102102
System.out.println("starting consumer #" + i);
103-
Connection conn = new ConnectionFactory(params){
104-
@Override public void configureSocket(Socket socket) throws IOException{
105-
super.configureSocket(socket);
106-
if(recvBufferSize > 0)
107-
socket.setReceiveBufferSize(recvBufferSize);
108-
if(sendBufferSize > 0 )
109-
socket.setSendBufferSize(sendBufferSize);
110-
111-
}
112-
}.newConnection(addresses, maxRedirects);
103+
Connection conn = new ConnectionFactory(params) {
104+
public void configureSocket(Socket socket) throws IOException {
105+
super.configureSocket(socket);
106+
if(recvBufferSize > 0)
107+
socket.setReceiveBufferSize(recvBufferSize);
108+
if(sendBufferSize > 0 )
109+
socket.setSendBufferSize(sendBufferSize);
110+
}
111+
}.newConnection(addresses, maxRedirects);
113112
consumerConnections[i] = conn;
114113
Channel channel = conn.createChannel();
115114
if (consumerTxSize > 0) channel.txSelect();
@@ -120,7 +119,7 @@ public static void main(String[] args) {
120119
if (prefetchCount > 0) channel.basicQos(prefetchCount);
121120
channel.basicConsume(queueName, autoAck, consumer);
122121
channel.queueBind(queueName, exchangeName, id);
123-
Thread t =
122+
Thread t =
124123
new Thread(new Consumer(consumer, id,
125124
consumerTxSize, autoAck,
126125
stats, timeLimit));
@@ -136,7 +135,7 @@ public static void main(String[] args) {
136135
Channel channel = conn.createChannel();
137136
if (producerTxSize > 0) channel.txSelect();
138137
channel.exchangeDeclare(exchangeName, exchangeType);
139-
Thread t =
138+
Thread t =
140139
new Thread(new Producer(channel, exchangeName, id,
141140
flags, producerTxSize,
142141
1000L * samplingInterval,
@@ -169,24 +168,24 @@ public static void main(String[] args) {
169168

170169
private static Options getOptions() {
171170
Options options = new Options();
172-
options.addOption(new Option("h", "host", true, "broker host"));
173-
options.addOption(new Option("p", "port", true, "broker port"));
174-
options.addOption(new Option("t", "type", true, "exchange type"));
175-
options.addOption(new Option("e", "exchange", true, "exchange name"));
176-
options.addOption(new Option("i", "interval", true, "sampling interval"));
177-
options.addOption(new Option("r", "rate", true, "rate limit"));
178-
options.addOption(new Option("x", "producers", true, "producer count"));
179-
options.addOption(new Option("y", "consumers", true, "consumer count"));
180-
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
181-
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
182-
options.addOption(new Option("a", "autoack", false,"auto ack"));
183-
options.addOption(new Option("q", "qos", true, "qos prefetch count"));
184-
options.addOption(new Option("s", "size", true, "message size"));
185-
options.addOption(new Option("d", "redirects", true, "max redirects"));
186-
options.addOption(new Option("z", "time", true, "time limit"));
187-
options.addOption(new Option("b", "sendbuffer",true, "send buffer size"));
188-
options.addOption(new Option("c", "recvbuffer",true, "receive buffer size"));
189-
Option flag = new Option("f", "flag", true, "message flag");
171+
options.addOption(new Option("h", "host", true, "broker host"));
172+
options.addOption(new Option("p", "port", true, "broker port"));
173+
options.addOption(new Option("t", "type", true, "exchange type"));
174+
options.addOption(new Option("e", "exchange", true, "exchange name"));
175+
options.addOption(new Option("i", "interval", true, "sampling interval"));
176+
options.addOption(new Option("r", "rate", true, "rate limit"));
177+
options.addOption(new Option("x", "producers", true, "producer count"));
178+
options.addOption(new Option("y", "consumers", true, "consumer count"));
179+
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
180+
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
181+
options.addOption(new Option("a", "autoack", false,"auto ack"));
182+
options.addOption(new Option("q", "qos", true, "qos prefetch count"));
183+
options.addOption(new Option("s", "size", true, "message size"));
184+
options.addOption(new Option("d", "redirects", true, "max redirects"));
185+
options.addOption(new Option("z", "time", true, "time limit"));
186+
options.addOption(new Option("b", "sendbuffer", true, "send buffer size"));
187+
options.addOption(new Option("c", "recvbuffer", true, "receive buffer size"));
188+
Option flag = new Option("f", "flag", true, "message flag");
190189
flag.setArgs(Option.UNLIMITED_VALUES);
191190
options.addOption(flag);
192191
return options;

0 commit comments

Comments
 (0)