Skip to content

Commit c9d1d84

Browse files
author
Simon MacMullen
committed
Merge bug25813
2 parents a8a10ad + 652eaa4 commit c9d1d84

File tree

7 files changed

+100
-74
lines changed

7 files changed

+100
-74
lines changed

build.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@
444444
</jar>
445445
</target>
446446

447-
<target name="dist" depends="jar, test-jar">
447+
<target name="dist" depends="jar, test-jar" description="Build all library JARs and documentation">
448448
<mkdir dir="${dist.out}"/>
449449
<copy todir="${dist.out}">
450450
<!-- ant doesn't seem to provide any form of usable abstraction over sets of file names -->
@@ -464,7 +464,7 @@
464464
</copy>
465465
</target>
466466

467-
<target name="clean">
467+
<target name="clean" description="Cleans build artifacts">
468468
<delete dir="build"/>
469469
</target>
470470

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public class ConnectionFactory implements Cloneable {
9999
private boolean automaticRecovery = false;
100100
private boolean topologyRecovery = true;
101101

102-
private int networkRecoveryInterval = 5000;
102+
// long is used to make sure the users can use both ints
103+
// and longs safely. It is unlikely that anybody'd need
104+
// to use recovery intervals > Integer.MAX_VALUE in practice.
105+
private long networkRecoveryInterval = 5000;
103106

104107
/** @return number of consumer threads in default {@link ExecutorService} */
105108
@Deprecated
@@ -638,7 +641,7 @@ public Connection newConnection(ExecutorService executor) throws IOException {
638641
* Returns automatic connection recovery interval in milliseconds.
639642
* @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
640643
*/
641-
public int getNetworkRecoveryInterval() {
644+
public long getNetworkRecoveryInterval() {
642645
return networkRecoveryInterval;
643646
}
644647

@@ -649,4 +652,12 @@ public int getNetworkRecoveryInterval() {
649652
public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
650653
this.networkRecoveryInterval = networkRecoveryInterval;
651654
}
655+
656+
/**
657+
* Sets connection recovery interval. Default is 5000.
658+
* @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
659+
*/
660+
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
661+
this.networkRecoveryInterval = networkRecoveryInterval;
662+
}
652663
}

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class ConnectionParams {
1717
private final int requestedChannelMax;
1818
private final int requestedHeartbeat;
1919
private final SaslConfig saslConfig;
20-
private final int networkRecoveryInterval;
20+
private final long networkRecoveryInterval;
2121
private final boolean topologyRecovery;
2222

2323
private ExceptionHandler exceptionHandler;
@@ -41,7 +41,7 @@ public class ConnectionParams {
4141
public ConnectionParams(String username, String password, ExecutorService executor,
4242
String virtualHost, Map<String, Object> clientProperties,
4343
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
44-
SaslConfig saslConfig, int networkRecoveryInterval,
44+
SaslConfig saslConfig, long networkRecoveryInterval,
4545
boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
4646
this.username = username;
4747
this.password = password;
@@ -98,7 +98,7 @@ public ExceptionHandler getExceptionHandler() {
9898
return exceptionHandler;
9999
}
100100

101-
public int getNetworkRecoveryInterval() {
101+
public long getNetworkRecoveryInterval() {
102102
return networkRecoveryInterval;
103103
}
104104

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.concurrent.atomic.AtomicInteger;
1919

2020
public class ConnectionRecovery extends BrokerTestCase {
21-
public static final int RECOVERY_INTERVAL = 2000;
21+
public static final long RECOVERY_INTERVAL = 2000;
2222

2323
public void testConnectionRecovery() throws IOException, InterruptedException {
2424
assertTrue(connection.isOpen());

test/src/com/rabbitmq/examples/PerfTest.java

Lines changed: 55 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -45,32 +45,33 @@ public static void main(String[] args) {
4545
System.exit(0);
4646
}
4747

48-
String exchangeType = strArg(cmd, 't', "direct");
49-
String exchangeName = strArg(cmd, 'e', exchangeType);
50-
String queueName = strArg(cmd, 'u', "");
51-
String routingKey = strArg(cmd, 'k', null);
52-
int samplingInterval = intArg(cmd, 'i', 1);
53-
float producerRateLimit = floatArg(cmd, 'r', 0.0f);
54-
float consumerRateLimit = floatArg(cmd, 'R', 0.0f);
55-
int producerCount = intArg(cmd, 'x', 1);
56-
int consumerCount = intArg(cmd, 'y', 1);
57-
int producerTxSize = intArg(cmd, 'm', 0);
58-
int consumerTxSize = intArg(cmd, 'n', 0);
59-
long confirm = intArg(cmd, 'c', -1);
60-
boolean autoAck = cmd.hasOption('a');
61-
int multiAckEvery = intArg(cmd, 'A', 0);
62-
int channelPrefetch = intArg(cmd, 'Q', 0);
63-
int consumerPrefetch = intArg(cmd, 'q', 0);
64-
int minMsgSize = intArg(cmd, 's', 0);
65-
int timeLimit = intArg(cmd, 'z', 0);
66-
int producerMsgCount = intArg(cmd, 'C', 0);
67-
int consumerMsgCount = intArg(cmd, 'D', 0);
68-
List<?> flags = lstArg(cmd, 'f');
69-
int frameMax = intArg(cmd, 'M', 0);
70-
int heartbeat = intArg(cmd, 'b', 0);
71-
boolean predeclared = cmd.hasOption('p');
72-
73-
String uri = strArg(cmd, 'h', "amqp://localhost");
48+
String exchangeType = strArg(cmd, 't', "direct");
49+
String exchangeName = strArg(cmd, 'e', exchangeType);
50+
String queueName = strArg(cmd, 'u', "");
51+
String routingKey = strArg(cmd, 'k', null);
52+
boolean randomRoutingKey = cmd.hasOption('K');
53+
int samplingInterval = intArg(cmd, 'i', 1);
54+
float producerRateLimit = floatArg(cmd, 'r', 0.0f);
55+
float consumerRateLimit = floatArg(cmd, 'R', 0.0f);
56+
int producerCount = intArg(cmd, 'x', 1);
57+
int consumerCount = intArg(cmd, 'y', 1);
58+
int producerTxSize = intArg(cmd, 'm', 0);
59+
int consumerTxSize = intArg(cmd, 'n', 0);
60+
long confirm = intArg(cmd, 'c', -1);
61+
boolean autoAck = cmd.hasOption('a');
62+
int multiAckEvery = intArg(cmd, 'A', 0);
63+
int channelPrefetch = intArg(cmd, 'Q', 0);
64+
int consumerPrefetch = intArg(cmd, 'q', 0);
65+
int minMsgSize = intArg(cmd, 's', 0);
66+
int timeLimit = intArg(cmd, 'z', 0);
67+
int producerMsgCount = intArg(cmd, 'C', 0);
68+
int consumerMsgCount = intArg(cmd, 'D', 0);
69+
List<?> flags = lstArg(cmd, 'f');
70+
int frameMax = intArg(cmd, 'M', 0);
71+
int heartbeat = intArg(cmd, 'b', 0);
72+
boolean predeclared = cmd.hasOption('p');
73+
74+
String uri = strArg(cmd, 'h', "amqp://localhost");
7475

7576
//setup
7677
PrintlnStats stats = new PrintlnStats(1000L * samplingInterval,
@@ -107,6 +108,7 @@ public static void main(String[] args) {
107108
p.setProducerTxSize( producerTxSize);
108109
p.setQueueName( queueName);
109110
p.setRoutingKey( routingKey);
111+
p.setRandomRoutingKey( randomRoutingKey);
110112
p.setProducerRateLimit(producerRateLimit);
111113
p.setTimeLimit( timeLimit);
112114

@@ -132,34 +134,35 @@ private static void usage(Options options) {
132134

133135
private static Options getOptions() {
134136
Options options = new Options();
135-
options.addOption(new Option("?", "help", false,"show usage"));
136-
options.addOption(new Option("h", "uri", true, "AMQP URI"));
137-
options.addOption(new Option("t", "type", true, "exchange type"));
138-
options.addOption(new Option("e", "exchange", true, "exchange name"));
139-
options.addOption(new Option("u", "queue", true, "queue name"));
140-
options.addOption(new Option("k", "routingKey", true, "routing key"));
141-
options.addOption(new Option("i", "interval", true, "sampling interval"));
142-
options.addOption(new Option("r", "rate", true, "producer rate limit"));
143-
options.addOption(new Option("R", "consumerRate", true, "consumer rate limit"));
144-
options.addOption(new Option("x", "producers", true, "producer count"));
145-
options.addOption(new Option("y", "consumers", true, "consumer count"));
146-
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
147-
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
148-
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
149-
options.addOption(new Option("a", "autoack", false,"auto ack"));
150-
options.addOption(new Option("A", "multiAckEvery", true, "multi ack every"));
151-
options.addOption(new Option("q", "qos", true, "consumer prefetch count"));
152-
options.addOption(new Option("Q", "globalQos", true, "channel prefetch count"));
153-
options.addOption(new Option("s", "size", true, "message size"));
154-
options.addOption(new Option("z", "time", true, "time limit"));
155-
options.addOption(new Option("C", "pmessages", true, "producer message count"));
156-
options.addOption(new Option("D", "cmessages", true, "consumer message count"));
157-
Option flag = new Option("f", "flag", true, "message flag");
137+
options.addOption(new Option("?", "help", false,"show usage"));
138+
options.addOption(new Option("h", "uri", true, "AMQP URI"));
139+
options.addOption(new Option("t", "type", true, "exchange type"));
140+
options.addOption(new Option("e", "exchange", true, "exchange name"));
141+
options.addOption(new Option("u", "queue", true, "queue name"));
142+
options.addOption(new Option("k", "routingKey", true, "routing key"));
143+
options.addOption(new Option("K", "randomRoutingKey", false,"use random routing key per message"));
144+
options.addOption(new Option("i", "interval", true, "sampling interval"));
145+
options.addOption(new Option("r", "rate", true, "producer rate limit"));
146+
options.addOption(new Option("R", "consumerRate", true, "consumer rate limit"));
147+
options.addOption(new Option("x", "producers", true, "producer count"));
148+
options.addOption(new Option("y", "consumers", true, "consumer count"));
149+
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
150+
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
151+
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
152+
options.addOption(new Option("a", "autoack", false,"auto ack"));
153+
options.addOption(new Option("A", "multiAckEvery", true, "multi ack every"));
154+
options.addOption(new Option("q", "qos", true, "consumer prefetch count"));
155+
options.addOption(new Option("Q", "globalQos", true, "channel prefetch count"));
156+
options.addOption(new Option("s", "size", true, "message size"));
157+
options.addOption(new Option("z", "time", true, "time limit"));
158+
options.addOption(new Option("C", "pmessages", true, "producer message count"));
159+
options.addOption(new Option("D", "cmessages", true, "consumer message count"));
160+
Option flag = new Option("f", "flag", true, "message flag");
158161
flag.setArgs(Option.UNLIMITED_VALUES);
159162
options.addOption(flag);
160-
options.addOption(new Option("M", "framemax", true, "frame max"));
161-
options.addOption(new Option("b", "heartbeat", true, "heartbeat interval"));
162-
options.addOption(new Option("p", "predeclared", false,"allow use of predeclared objects"));
163+
options.addOption(new Option("M", "framemax", true, "frame max"));
164+
options.addOption(new Option("b", "heartbeat", true, "heartbeat interval"));
165+
options.addOption(new Option("p", "predeclared", false,"allow use of predeclared objects"));
163166
return options;
164167
}
165168

test/src/com/rabbitmq/examples/perf/MulticastParams.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class MulticastParams {
4545
private String exchangeType = "direct";
4646
private String queueName = "";
4747
private String routingKey = null;
48+
private boolean randomRoutingKey = false;
4849

4950
private List<?> flags = new ArrayList<Object>();
5051

@@ -70,6 +71,10 @@ public void setRoutingKey(String routingKey) {
7071
this.routingKey = routingKey;
7172
}
7273

74+
public void setRandomRoutingKey(boolean randomRoutingKey) {
75+
this.randomRoutingKey = randomRoutingKey;
76+
}
77+
7378
public void setProducerRateLimit(float producerRateLimit) {
7479
this.producerRateLimit = producerRateLimit;
7580
}
@@ -163,6 +168,10 @@ public String getRoutingKey() {
163168
return routingKey;
164169
}
165170

171+
public boolean getRandomRoutingKey() {
172+
return randomRoutingKey;
173+
}
174+
166175
public Producer createProducer(Connection connection, Stats stats, String id) throws IOException {
167176
Channel channel = connection.createChannel();
168177
if (producerTxSize > 0) channel.txSelect();
@@ -171,7 +180,7 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
171180
channel.exchangeDeclare(exchangeName, exchangeType);
172181
}
173182
final Producer producer = new Producer(channel, exchangeName, id,
174-
flags, producerTxSize,
183+
randomRoutingKey, flags, producerTxSize,
175184
producerRateLimit, producerMsgCount,
176185
minMsgSize, timeLimit,
177186
confirm, stats);

test/src/com/rabbitmq/examples/perf/Producer.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.List;
3030
import java.util.SortedSet;
3131
import java.util.TreeSet;
32+
import java.util.UUID;
3233
import java.util.concurrent.Semaphore;
3334

3435
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
@@ -37,6 +38,7 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
3738
private Channel channel;
3839
private String exchangeName;
3940
private String id;
41+
private boolean randomRoutingKey;
4042
private boolean mandatory;
4143
private boolean immediate;
4244
private boolean persistent;
@@ -52,23 +54,24 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
5254
private volatile SortedSet<Long> unconfirmedSet =
5355
Collections.synchronizedSortedSet(new TreeSet<Long>());
5456

55-
public Producer(Channel channel, String exchangeName, String id,
57+
public Producer(Channel channel, String exchangeName, String id, boolean randomRoutingKey,
5658
List<?> flags, int txSize,
5759
float rateLimit, int msgLimit, int minMsgSize, int timeLimit,
5860
long confirm, Stats stats)
5961
throws IOException {
6062

61-
this.channel = channel;
62-
this.exchangeName = exchangeName;
63-
this.id = id;
64-
this.mandatory = flags.contains("mandatory");
65-
this.immediate = flags.contains("immediate");
66-
this.persistent = flags.contains("persistent");
67-
this.txSize = txSize;
68-
this.rateLimit = rateLimit;
69-
this.msgLimit = msgLimit;
70-
this.timeLimit = 1000L * timeLimit;
71-
this.message = new byte[minMsgSize];
63+
this.channel = channel;
64+
this.exchangeName = exchangeName;
65+
this.id = id;
66+
this.randomRoutingKey = randomRoutingKey;
67+
this.mandatory = flags.contains("mandatory");
68+
this.immediate = flags.contains("immediate");
69+
this.persistent = flags.contains("persistent");
70+
this.txSize = txSize;
71+
this.rateLimit = rateLimit;
72+
this.msgLimit = msgLimit;
73+
this.timeLimit = 1000L * timeLimit;
74+
this.message = new byte[minMsgSize];
7275
if (confirm > 0) {
7376
this.confirmPool = new Semaphore((int)confirm);
7477
}
@@ -156,7 +159,7 @@ private void publish(byte[] msg)
156159
throws IOException {
157160

158161
unconfirmedSet.add(channel.getNextPublishSeqNo());
159-
channel.basicPublish(exchangeName, id,
162+
channel.basicPublish(exchangeName, randomRoutingKey ? UUID.randomUUID().toString() : id,
160163
mandatory, immediate,
161164
persistent ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,
162165
msg);

0 commit comments

Comments
 (0)