Skip to content

Commit 412dc81

Browse files
author
Simon MacMullen
committed
Make MulticastParams create producers and consumers, and thus prevent knowledge of its entire state leaking into MulticastSet.
1 parent c0f8106 commit 412dc81

File tree

5 files changed

+103
-74
lines changed

5 files changed

+103
-74
lines changed

test/src/com/rabbitmq/examples/perf/MulticastParams.java

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,36 @@
1616

1717
package com.rabbitmq.examples.perf;
1818

19+
import com.rabbitmq.client.Channel;
20+
21+
import java.io.IOException;
1922
import java.util.ArrayList;
2023
import java.util.List;
2124

2225
public class MulticastParams {
23-
protected long confirm = -1;
24-
protected int consumerCount = 1;
25-
protected int producerCount = 1;
26-
protected int consumerTxSize = 0;
27-
protected int producerTxSize = 0;
28-
protected int prefetchCount = 0;
29-
protected int minMsgSize = 0;
30-
31-
protected int timeLimit = 0;
32-
protected int rateLimit = 0;
33-
protected int producerMsgCount = 0;
34-
protected int consumerMsgCount = 0;
35-
36-
protected String exchangeName = "direct";
37-
protected String exchangeType = "direct";
38-
protected String queueName = "";
39-
40-
protected List<?> flags = new ArrayList<Object>();
41-
42-
protected int multiAckEvery = 0;
43-
protected boolean autoAck = true;
44-
protected boolean exclusive = true;
45-
protected boolean autoDelete = false;
26+
private long confirm = -1;
27+
private int consumerCount = 1;
28+
private int producerCount = 1;
29+
private int consumerTxSize = 0;
30+
private int producerTxSize = 0;
31+
private int prefetchCount = 0;
32+
private int minMsgSize = 0;
33+
34+
private int timeLimit = 0;
35+
private int rateLimit = 0;
36+
private int producerMsgCount = 0;
37+
private int consumerMsgCount = 0;
38+
39+
private String exchangeName = "direct";
40+
private String exchangeType = "direct";
41+
private String queueName = "";
42+
43+
private List<?> flags = new ArrayList<Object>();
44+
45+
private int multiAckEvery = 0;
46+
private boolean autoAck = true;
47+
private boolean exclusive = true;
48+
private boolean autoDelete = false;
4649

4750
public void setExchangeType(String exchangeType) {
4851
this.exchangeType = exchangeType;
@@ -124,4 +127,58 @@ public void setExclusive(boolean exclusive) {
124127
public void setAutoDelete(boolean autoDelete) {
125128
this.autoDelete = autoDelete;
126129
}
130+
131+
public int getConsumerCount() {
132+
return consumerCount;
133+
}
134+
135+
public int getProducerCount() {
136+
return producerCount;
137+
}
138+
139+
public int getMinMsgSize() {
140+
return minMsgSize;
141+
}
142+
143+
public Producer createProducer(Channel channel, Stats stats, String id) throws IOException {
144+
if (producerTxSize > 0) channel.txSelect();
145+
if (confirm >= 0) channel.confirmSelect();
146+
channel.exchangeDeclare(exchangeName, exchangeType);
147+
final Producer producer = new Producer(channel, exchangeName, id,
148+
flags, producerTxSize,
149+
rateLimit, producerMsgCount,
150+
minMsgSize, timeLimit,
151+
confirm, stats);
152+
channel.addReturnListener(producer);
153+
channel.addConfirmListener(producer);
154+
return producer;
155+
}
156+
157+
public Consumer createConsumer(Channel channel, Stats stats, String id) throws IOException {
158+
if (consumerTxSize > 0) channel.txSelect();
159+
channel.exchangeDeclare(exchangeName, exchangeType);
160+
String qName =
161+
channel.queueDeclare(queueName,
162+
flags.contains("persistent"),
163+
exclusive, autoDelete,
164+
null).getQueue();
165+
if (prefetchCount > 0) channel.basicQos(prefetchCount);
166+
channel.queueBind(qName, exchangeName, id);
167+
return new Consumer(channel, id, qName,
168+
consumerTxSize, autoAck, multiAckEvery,
169+
stats, consumerMsgCount, timeLimit);
170+
}
171+
172+
public boolean shouldConfigureQueue() {
173+
return consumerCount == 0 && !queueName.equals("");
174+
}
175+
176+
public void configureQueue(Channel channel, String id) throws IOException {
177+
channel.exchangeDeclare(exchangeName, exchangeType);
178+
channel.queueDeclare(queueName,
179+
flags.contains("persistent"),
180+
exclusive, autoDelete,
181+
null).getQueue();
182+
channel.queueBind(queueName, exchangeName, id);
183+
}
127184
}

test/src/com/rabbitmq/examples/perf/MulticastSet.java

Lines changed: 19 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -27,99 +27,72 @@ public class MulticastSet {
2727
private final String id;
2828
private final Stats stats;
2929
private final ConnectionFactory factory;
30-
private final MulticastParams p;
30+
private final MulticastParams params;
3131

3232
public MulticastSet(Stats stats, ConnectionFactory factory,
3333
MulticastParams params) {
3434
this.id = UUID.randomUUID().toString();
3535
this.stats = stats;
3636
this.factory = factory;
37-
this.p = params;
37+
this.params = params;
3838
}
3939

4040
public void run() throws IOException, InterruptedException {
4141
run(false);
4242
}
4343

4444
public void run(boolean announceStartup) throws IOException, InterruptedException {
45-
Thread[] consumerThreads = new Thread[p.consumerCount];
46-
Connection[] consumerConnections = new Connection[p.consumerCount];
47-
for (int i = 0; i < p.consumerCount; i++) {
45+
Thread[] consumerThreads = new Thread[params.getConsumerCount()];
46+
Connection[] consumerConnections = new Connection[consumerThreads.length];
47+
for (int i = 0; i < consumerConnections.length; i++) {
4848
if (announceStartup) {
4949
System.out.println("starting consumer #" + i);
5050
}
5151
Connection conn = factory.newConnection();
5252
consumerConnections[i] = conn;
5353
Channel channel = conn.createChannel();
54-
if (p.consumerTxSize > 0) channel.txSelect();
55-
channel.exchangeDeclare(p.exchangeName, p.exchangeType);
56-
String qName =
57-
channel.queueDeclare(p.queueName,
58-
p.flags.contains("persistent"),
59-
p.exclusive, p.autoDelete,
60-
null).getQueue();
61-
if (p.prefetchCount > 0) channel.basicQos(p.prefetchCount);
62-
channel.queueBind(qName, p.exchangeName, id);
63-
Thread t =
64-
new Thread(new Consumer(channel, id, qName,
65-
p.consumerTxSize, p.autoAck, p.multiAckEvery,
66-
stats, p.consumerMsgCount, p.timeLimit));
54+
Thread t = new Thread(params.createConsumer(channel, stats, id));
6755
consumerThreads[i] = t;
6856
}
6957

70-
if (p.consumerCount == 0 && !p.queueName.equals("")) {
58+
if (params.shouldConfigureQueue()) {
7159
Connection conn = factory.newConnection();
7260
Channel channel = conn.createChannel();
73-
channel.exchangeDeclare(p.exchangeName, p.exchangeType);
74-
channel.queueDeclare(p.queueName,
75-
p.flags.contains("persistent"),
76-
p.exclusive, p.autoDelete,
77-
null).getQueue();
78-
channel.queueBind(p.queueName, p.exchangeName, id);
61+
params.configureQueue(channel, id);
7962
conn.close();
8063
}
8164

82-
Thread[] producerThreads = new Thread[p.producerCount];
83-
Connection[] producerConnections = new Connection[p.producerCount];
84-
Channel[] producerChannels = new Channel[p.producerCount];
85-
for (int i = 0; i < p.producerCount; i++) {
65+
Thread[] producerThreads = new Thread[params.getProducerCount()];
66+
Connection[] producerConnections = new Connection[producerThreads.length];
67+
Channel[] producerChannels = new Channel[producerConnections.length];
68+
for (int i = 0; i < producerChannels.length; i++) {
8669
if (announceStartup) {
8770
System.out.println("starting producer #" + i);
8871
}
8972
Connection conn = factory.newConnection();
9073
producerConnections[i] = conn;
9174
Channel channel = conn.createChannel();
9275
producerChannels[i] = channel;
93-
if (p.producerTxSize > 0) channel.txSelect();
94-
if (p.confirm >= 0) channel.confirmSelect();
95-
channel.exchangeDeclare(p.exchangeName, p.exchangeType);
96-
final Producer producer = new Producer(channel, p.exchangeName, id,
97-
p.flags, p.producerTxSize,
98-
p.rateLimit, p.producerMsgCount,
99-
p.minMsgSize, p.timeLimit,
100-
p.confirm, stats);
101-
channel.addReturnListener(producer);
102-
channel.addConfirmListener(producer);
103-
Thread t = new Thread(producer);
76+
Thread t = new Thread(params.createProducer(channel, stats, id));
10477
producerThreads[i] = t;
10578
}
10679

107-
for (int i = 0; i < p.consumerCount; i++) {
108-
consumerThreads[i].start();
80+
for (Thread consumerThread : consumerThreads) {
81+
consumerThread.start();
10982
}
11083

111-
for (int i = 0; i < p.producerCount; i++) {
112-
producerThreads[i].start();
84+
for (Thread producerThread : producerThreads) {
85+
producerThread.start();
11386
}
11487

115-
for (int i = 0; i < p.producerCount; i++) {
88+
for (int i = 0; i < producerThreads.length; i++) {
11689
producerThreads[i].join();
11790
producerChannels[i].clearReturnListeners();
11891
producerChannels[i].clearConfirmListeners();
11992
producerConnections[i].close();
12093
}
12194

122-
for (int i = 0; i < p.consumerCount; i++) {
95+
for (int i = 0; i < consumerThreads.length; i++) {
12396
consumerThreads[i].join();
12497
consumerConnections[i].close();
12598
}

test/src/com/rabbitmq/examples/perf/SimpleScenario.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.rabbitmq.client.ConnectionFactory;
2020

2121
import java.io.IOException;
22-
import java.util.Map;
2322

2423
public class SimpleScenario implements Scenario {
2524
private String name;
@@ -44,7 +43,7 @@ public void run() throws IOException, InterruptedException {
4443
this.stats = new SimpleScenarioStats(interval);
4544
for (MulticastParams p : params) {
4645
MulticastSet set = new MulticastSet(stats, factory, p);
47-
stats.setMinMsgSize(p.minMsgSize);
46+
stats.setup(p);
4847
set.run();
4948
}
5049
}

test/src/com/rabbitmq/examples/perf/SimpleScenarioStats.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ public Map<String, Object> results() {
7070
return map;
7171
}
7272

73-
public void setMinMsgSize(long minMsgSize) {
74-
this.minMsgSize = minMsgSize;
73+
public void setup(MulticastParams params) {
74+
this.minMsgSize = params.getMinMsgSize();
7575
}
7676

7777
public double getSendRate() {

test/src/com/rabbitmq/examples/perf/VaryingScenario.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private void run(Variable[] variables, List<VariableValue> values) throws Except
6464
value.setup(p);
6565
}
6666
MulticastSet set = new MulticastSet(stats0, factory, p);
67-
stats0.setMinMsgSize(p.minMsgSize);
67+
stats0.setup(p);
6868
set.run();
6969
for (VariableValue value : values) {
7070
value.teardown(p);

0 commit comments

Comments
 (0)