Skip to content

Commit 3846de0

Browse files
author
Matthew Sackman
committed
Merging default into bug 21922
2 parents 6e4e338 + 3a7d68b commit 3846de0

File tree

7 files changed

+136
-113
lines changed

7 files changed

+136
-113
lines changed

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -346,13 +346,13 @@ protected FrameHandler createFrameHandler(Socket sock)
346346
}
347347

348348
/**
349-
* Provides a hook to insert custom configuration of the sockets used
350-
* to connect to an AMQP server before they connect.
349+
* Provides a hook to insert custom configuration of the sockets
350+
* used to connect to an AMQP server before they connect.
351351
*
352-
* The default behaviour of this method is to disable Nagle's algorithm to get
353-
* more consistently low latency.
354-
* However it may be overridden freely and there is no requirement to retain
355-
* this behaviour.
352+
* The default behaviour of this method is to disable Nagle's
353+
* algorithm to get more consistently low latency. However it
354+
* may be overridden freely and there is no requirement to retain
355+
* this behaviour.
356356
*
357357
* @param socket The socket that is to be used for the Connection
358358
*/

test/src/com/rabbitmq/client/test/functional/BindingLifecycleBase.java

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,8 @@
3131

3232
package com.rabbitmq.client.test.functional;
3333

34-
import com.rabbitmq.client.test.BrokerTestCase;
35-
36-
import com.rabbitmq.client.AMQP;
37-
import com.rabbitmq.client.Channel;
38-
import com.rabbitmq.client.Connection;
3934
import com.rabbitmq.client.GetResponse;
4035
import com.rabbitmq.client.QueueingConsumer;
41-
import com.rabbitmq.client.ConnectionFactory;
4236
import java.io.IOException;
4337

4438
/**
@@ -50,7 +44,7 @@
5044
* handler code in the server.
5145
*
5246
*/
53-
public class BindingLifecycleBase extends BrokerTestCase {
47+
public class BindingLifecycleBase extends ClusteredTestBase {
5448
protected static final String K = "K-" + System.currentTimeMillis();
5549
protected static final int N = 1;
5650
protected static final String Q = "Q-" + System.currentTimeMillis();
@@ -60,49 +54,6 @@ public class BindingLifecycleBase extends BrokerTestCase {
6054
protected static String randomString() {
6155
return "-" + System.nanoTime();
6256
}
63-
public Channel secondaryChannel;
64-
public Connection secondaryConnection;
65-
66-
@Override
67-
public void openChannel() throws IOException {
68-
if (secondaryConnection != null) {
69-
secondaryChannel = secondaryConnection.createChannel();
70-
}
71-
super.openChannel();
72-
}
73-
74-
@Override
75-
public void openConnection() throws IOException {
76-
super.openConnection();
77-
if (secondaryConnection == null) {
78-
try {
79-
ConnectionFactory cf2 = connectionFactory.clone();
80-
cf2.setHost("localhost");
81-
cf2.setPort(5673);
82-
secondaryConnection = cf2.newConnection();
83-
}
84-
catch (IOException e) {
85-
}
86-
}
87-
}
88-
89-
@Override
90-
public void closeChannel() throws IOException {
91-
if (secondaryChannel != null) {
92-
secondaryChannel.abort();
93-
secondaryChannel = null;
94-
}
95-
super.closeChannel();
96-
}
97-
98-
@Override
99-
public void closeConnection() throws IOException {
100-
if (secondaryConnection != null) {
101-
secondaryConnection.abort();
102-
secondaryConnection = null;
103-
}
104-
super.closeConnection();
105-
}
10657

10758
protected void createQueueAndBindToExchange(Binding binding, boolean durable) throws IOException {
10859
channel.exchangeDeclare(binding.x, "direct", durable);
@@ -111,8 +62,7 @@ protected void createQueueAndBindToExchange(Binding binding, boolean durable) th
11162
}
11263

11364
protected void declareDurableQueue(String q) throws IOException {
114-
(secondaryChannel == null ? channel : secondaryChannel)
115-
.queueDeclare(q, true, false, false, null);
65+
alternateChannel.queueDeclare(q, true, false, false, null);
11666
}
11767

11868
protected void deleteExchangeAndQueue(Binding binding) throws IOException {
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Connection;
6+
import com.rabbitmq.client.ConnectionFactory;
7+
import com.rabbitmq.client.test.BrokerTestCase;
8+
9+
import java.io.IOException;
10+
11+
/**
12+
* Base class for tests which would like a second, clustered node.
13+
*/
14+
public class ClusteredTestBase extends BrokerTestCase {
15+
// If these are non-null then the secondary node is up and clustered
16+
public Channel clusteredChannel;
17+
public Connection clusteredConnection;
18+
19+
// These will always be non-null - if there is clustering they will point
20+
// to the secondary node, otherwise the primary
21+
public Channel alternateChannel;
22+
public Connection alternateConnection;
23+
24+
@Override
25+
public void openChannel() throws IOException {
26+
super.openChannel();
27+
28+
if (clusteredConnection != null) {
29+
clusteredChannel = clusteredConnection.createChannel();
30+
}
31+
32+
alternateChannel = clusteredChannel == null ? channel : clusteredChannel;
33+
}
34+
35+
private static boolean nonClusteredWarningPrinted;
36+
37+
@Override
38+
public void openConnection() throws IOException {
39+
super.openConnection();
40+
if (clusteredConnection == null) {
41+
try {
42+
ConnectionFactory cf2 = connectionFactory.clone();
43+
cf2.setHost("localhost");
44+
cf2.setPort(5673);
45+
clusteredConnection = cf2.newConnection();
46+
}
47+
catch (IOException e) {
48+
// Must be no secondary node
49+
}
50+
}
51+
52+
if (clusteredConnection != null &&
53+
!clustered(connection, clusteredConnection)) {
54+
clusteredConnection.close();
55+
clusteredConnection = null;
56+
57+
if (!nonClusteredWarningPrinted) {
58+
System.out.println("NOTE: Only one clustered node was detected - certain tests that");
59+
System.out.println("could test clustering will not do so.");
60+
nonClusteredWarningPrinted = true;
61+
}
62+
}
63+
64+
alternateConnection = clusteredConnection == null ? connection : clusteredConnection;
65+
}
66+
67+
private boolean clustered(Connection c1, Connection c2) throws IOException {
68+
Channel ch1 = c1.createChannel();
69+
Channel ch2 = c2.createChannel();
70+
// autodelete but not exclusive
71+
String q = ch1.queueDeclare("", false, false, true, null).getQueue();
72+
73+
try {
74+
ch2.queueDeclarePassive(q);
75+
} catch (IOException e) {
76+
checkShutdownSignal(AMQP.NOT_FOUND, e);
77+
// If we can't see the queue, secondary node must be up but not
78+
// clustered, hence not interesting to us
79+
return false;
80+
}
81+
82+
ch1.queueDelete(q);
83+
ch1.close();
84+
ch2.close();
85+
86+
return true;
87+
}
88+
89+
@Override
90+
public void closeChannel() throws IOException {
91+
if (clusteredChannel != null) {
92+
clusteredChannel.abort();
93+
clusteredChannel = null;
94+
alternateChannel = null;
95+
}
96+
super.closeChannel();
97+
}
98+
99+
@Override
100+
public void closeConnection() throws IOException {
101+
if (clusteredConnection != null) {
102+
clusteredConnection.abort();
103+
clusteredConnection = null;
104+
alternateConnection = null;
105+
}
106+
super.closeConnection();
107+
}
108+
}

test/src/com/rabbitmq/client/test/server/DurableBindingLifecycle.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,15 @@
4949
*
5050
*/
5151
public class DurableBindingLifecycle extends BindingLifecycleBase {
52-
5352
@Override
5453
protected void restart() throws IOException {
55-
if (secondaryConnection != null) {
56-
secondaryConnection.abort();
57-
secondaryConnection = null;
58-
secondaryChannel = null;
54+
if (clusteredConnection != null) {
55+
clusteredConnection.abort();
56+
clusteredConnection = null;
57+
clusteredChannel = null;
58+
alternateConnection = null;
59+
alternateChannel = null;
60+
5961
Host.executeCommand("cd ../rabbitmq-test; make restart-secondary-node");
6062
}
6163
tearDown();

test/src/com/rabbitmq/client/test/server/EffectVisibilityCrossNodeTest.java

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,27 @@
11
package com.rabbitmq.client.test.server;
22

3-
import com.rabbitmq.client.Channel;
4-
import com.rabbitmq.client.Connection;
5-
import com.rabbitmq.client.ConnectionFactory;
63
import com.rabbitmq.client.MessageProperties;
74
import com.rabbitmq.client.QueueingConsumer;
8-
import com.rabbitmq.client.test.BrokerTestCase;
5+
import com.rabbitmq.client.test.functional.ClusteredTestBase;
96

107
import java.io.IOException;
118

129
/**
1310
* From bug 19844 - we want to be sure that publish vs everything else can't
1411
* happen out of order
1512
*/
16-
public class EffectVisibilityCrossNodeTest extends BrokerTestCase {
13+
public class EffectVisibilityCrossNodeTest extends ClusteredTestBase {
1714
private static final String exchange = "exchange";
1815

19-
private Channel secondaryChannel;
20-
private Connection secondaryConnection;
21-
2216
private String[] queues = new String[QUEUES];
2317

24-
@Override
25-
public void openChannel() throws IOException {
26-
super.openChannel();
27-
secondaryChannel = secondaryConnection.createChannel();
28-
}
29-
30-
@Override
31-
public void openConnection() throws IOException {
32-
super.openConnection();
33-
if (secondaryConnection == null) {
34-
ConnectionFactory cf2 = connectionFactory.clone();
35-
cf2.setHost("localhost");
36-
cf2.setPort(5673);
37-
secondaryConnection = cf2.newConnection();
38-
}
39-
}
40-
41-
@Override
42-
public void closeChannel() throws IOException {
43-
if (secondaryChannel != null) {
44-
secondaryChannel.abort();
45-
secondaryChannel = null;
46-
}
47-
super.closeChannel();
48-
}
49-
50-
@Override
51-
public void closeConnection() throws IOException {
52-
if (secondaryConnection != null) {
53-
secondaryConnection.abort();
54-
secondaryConnection = null;
55-
}
56-
super.closeConnection();
57-
}
58-
5918
@Override
6019
protected void createResources() throws IOException {
6120
channel.exchangeDeclare(exchange, "fanout");
21+
6222
for (int i = 0; i < queues.length ; i++) {
63-
queues[i] = secondaryChannel.queueDeclare().getQueue();
64-
secondaryChannel.queueBind(queues[i], exchange, "");
23+
queues[i] = alternateChannel.queueDeclare().getQueue();
24+
alternateChannel.queueBind(queues[i], exchange, "");
6525
}
6626
}
6727

@@ -90,8 +50,8 @@ public void testEffectVisibility() throws Exception {
9050
}
9151

9252
for (int i = 0; i < queues.length ; i++) {
93-
QueueingConsumer consumer = new QueueingConsumer(secondaryChannel);
94-
secondaryChannel.basicConsume(queues[i], true, consumer);
53+
QueueingConsumer consumer = new QueueingConsumer(alternateChannel);
54+
alternateChannel.basicConsume(queues[i], true, consumer);
9555

9656
for (int j = 0; j < MESSAGES_PER_COMMIT * COMMITS; j++) {
9757
QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public static void main(String[] args) {
132132
flags, producerTxSize,
133133
1000L * samplingInterval,
134134
rateLimit, minMsgSize, timeLimit);
135-
channel.setReturnListener(p);
135+
channel.setReturnListener(p);
136136
Thread t = new Thread(p);
137137
producerThreads[i] = t;
138138
t.start();

test/src/com/rabbitmq/examples/ProducerMain.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.rabbitmq.client.Connection;
4141
import com.rabbitmq.client.ConnectionFactory;
4242
import com.rabbitmq.client.MessageProperties;
43+
import com.rabbitmq.client.AMQP.BasicProperties;
4344

4445
public class ProducerMain implements Runnable {
4546
public static final int SUMMARY_EVERY_MS = 1000;
@@ -191,6 +192,9 @@ public void sendBatch(String queueName) throws IOException {
191192

192193
long nextSummaryTime = startTime + SUMMARY_EVERY_MS;
193194
byte[] message = new byte[256];
195+
BasicProperties props = shouldPersist() ?
196+
MessageProperties.MINIMAL_PERSISTENT_BASIC :
197+
MessageProperties.MINIMAL_BASIC;
194198
for (int i = 0; i < _messageCount; i++) {
195199
ByteArrayOutputStream acc = new ByteArrayOutputStream();
196200
DataOutputStream d = new DataOutputStream(acc);
@@ -205,8 +209,7 @@ public void sendBatch(String queueName) throws IOException {
205209
acc.flush();
206210
byte[] message0 = acc.toByteArray();
207211
System.arraycopy(message0, 0, message, 0, message0.length);
208-
_channel.basicPublish("", queueName, shouldPersist() ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,
209-
message);
212+
_channel.basicPublish("", queueName, props, message);
210213
sent++;
211214
if (shouldCommit()) {
212215
if ((sent % _commitEvery) == 0) {

0 commit comments

Comments
 (0)