Skip to content

Commit b80053e

Browse files
author
Simon MacMullen
committed
Merge default to amqp_0_9_1
2 parents 5810495 + cefddb5 commit b80053e

File tree

6 files changed

+166
-12
lines changed

6 files changed

+166
-12
lines changed

test/src/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,19 +159,36 @@ protected GetResponse basicGet(String q) throws IOException {
159159
}
160160

161161
protected void basicPublishPersistent(String q) throws IOException {
162-
channel.basicPublish("", q, MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent message".getBytes());
162+
basicPublishPersistent("persistent message".getBytes(), q);
163163
}
164164

165-
protected void basicPublishPersistent(String x, String routingKey) throws IOException {
166-
channel.basicPublish(x, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent message".getBytes());
165+
protected void basicPublishPersistent(byte[] msg, String q) throws IOException {
166+
basicPublishPersistent(msg, "", q);
167167
}
168168

169+
protected void basicPublishPersistent(String x, String routingKey) throws IOException {
170+
basicPublishPersistent("persistent message".getBytes(), x, routingKey);
171+
}
172+
173+
174+
protected void basicPublishPersistent(byte[] msg, String x, String routingKey) throws IOException {
175+
channel.basicPublish(x, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
176+
}
177+
169178
protected void basicPublishVolatile(String q) throws IOException {
170-
channel.basicPublish("", q, MessageProperties.TEXT_PLAIN, "not persistent message".getBytes());
179+
basicPublishVolatile("not persistent message".getBytes(), q);
180+
}
181+
182+
protected void basicPublishVolatile(byte[] msg, String q) throws IOException {
183+
basicPublishVolatile(msg, "", q);
171184
}
172185

173186
protected void basicPublishVolatile(String x, String routingKey) throws IOException {
174-
channel.basicPublish(x, routingKey, MessageProperties.TEXT_PLAIN, "not persistent message".getBytes());
187+
basicPublishVolatile("not persistent message".getBytes(), x, routingKey);
188+
}
189+
190+
protected void basicPublishVolatile(byte[] msg, String x, String routingKey) throws IOException {
191+
channel.basicPublish(x, routingKey, MessageProperties.TEXT_PLAIN, msg);
175192
}
176193

177194
protected void declareAndBindDurableQueue(String q, String x, String r) throws IOException {
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import com.rabbitmq.client.MessageProperties;
7+
import com.rabbitmq.client.QueueingConsumer;
8+
import com.rabbitmq.client.test.BrokerTestCase;
9+
10+
import java.io.IOException;
11+
12+
/**
13+
* From bug 19844 - we want to be sure that publish vs everything else can't
14+
* happen out of order
15+
*/
16+
public class EffectVisibilityCrossNodeTest extends BrokerTestCase {
17+
private static final String exchange = "exchange";
18+
19+
private Channel secondaryChannel;
20+
private Connection secondaryConnection;
21+
22+
private String[] queues = new String[QUEUES];
23+
24+
@Override
25+
public void openChannel() throws IOException {
26+
super.openChannel();
27+
secondaryChannel = secondaryConnection.createChannel();
28+
}
29+
30+
@Override
31+
public void openConnection() throws IOException {
32+
super.openConnection();
33+
if (secondaryConnection == null) {
34+
ConnectionFactory cf2 = connectionFactory.clone();
35+
cf2.setHost("localhost");
36+
cf2.setPort(5673);
37+
secondaryConnection = cf2.newConnection();
38+
}
39+
}
40+
41+
@Override
42+
public void closeChannel() throws IOException {
43+
if (secondaryChannel != null) {
44+
secondaryChannel.abort();
45+
secondaryChannel = null;
46+
}
47+
super.closeChannel();
48+
}
49+
50+
@Override
51+
public void closeConnection() throws IOException {
52+
if (secondaryConnection != null) {
53+
secondaryConnection.abort();
54+
secondaryConnection = null;
55+
}
56+
super.closeConnection();
57+
}
58+
59+
@Override
60+
protected void createResources() throws IOException {
61+
channel.exchangeDeclare(exchange, "fanout");
62+
for (int i = 0; i < queues.length ; i++) {
63+
queues[i] = secondaryChannel.queueDeclare().getQueue();
64+
secondaryChannel.queueBind(queues[i], exchange, "");
65+
}
66+
}
67+
68+
@Override
69+
protected void releaseResources() throws IOException {
70+
channel.exchangeDelete(exchange);
71+
}
72+
73+
private static final int QUEUES = 5;
74+
private static final int COMMITS = 500;
75+
private static final int MESSAGES_PER_COMMIT = 10;
76+
77+
public void testEffectVisibility() throws Exception {
78+
channel.txSelect();
79+
80+
for (int i = 0; i < COMMITS; i++) {
81+
for (int j = 0; j < MESSAGES_PER_COMMIT; j++) {
82+
channel.basicPublish(exchange, "", MessageProperties.MINIMAL_BASIC, ("" + (i * MESSAGES_PER_COMMIT + j)).getBytes());
83+
}
84+
channel.txCommit();
85+
86+
for (int j = 0; j < MESSAGES_PER_COMMIT; j++) {
87+
channel.basicPublish(exchange, "", MessageProperties.MINIMAL_BASIC, "bad".getBytes());
88+
}
89+
channel.txRollback();
90+
}
91+
92+
for (int i = 0; i < queues.length ; i++) {
93+
QueueingConsumer consumer = new QueueingConsumer(secondaryChannel);
94+
secondaryChannel.basicConsume(queues[i], true, consumer);
95+
96+
for (int j = 0; j < MESSAGES_PER_COMMIT * COMMITS; j++) {
97+
QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000);
98+
assertNotNull(delivery);
99+
int sequence = Integer.parseInt(new String(delivery.getBody()));
100+
101+
assertEquals(j, sequence);
102+
}
103+
}
104+
}
105+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import java.io.IOException;
4+
5+
public class PersisterRestart6 extends RestartBase {
6+
7+
private static final String q = "Restart6";
8+
9+
public void testRestart() throws IOException, InterruptedException {
10+
declareDurableQueue(q);
11+
basicPublishPersistent("a".getBytes(), q);
12+
basicPublishPersistent("b".getBytes(), q);
13+
basicPublishPersistent("c".getBytes(), q);
14+
forceSnapshot();
15+
restart();
16+
assertTrue(new String(basicGet(q).getBody()).equals("a"));
17+
forceSnapshot();
18+
restart();
19+
restart();
20+
assertTrue(new String(basicGet(q).getBody()).equals("b"));
21+
assertTrue(new String(basicGet(q).getBody()).equals("c"));
22+
deleteQueue(q);
23+
}
24+
25+
}

test/src/com/rabbitmq/client/test/server/PersisterRestartTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public static TestSuite suite() {
4242
suite.addTestSuite(PersisterRestart3.class);
4343
suite.addTestSuite(PersisterRestart4.class);
4444
suite.addTestSuite(PersisterRestart5.class);
45+
suite.addTestSuite(PersisterRestart6.class);
4546
return suite;
4647
}
4748

test/src/com/rabbitmq/client/test/server/ServerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public static TestSuite suite() {
3939
TestSuite suite = new TestSuite("server-tests");
4040
suite.addTestSuite(Permissions.class);
4141
suite.addTestSuite(DurableBindingLifecycle.class);
42+
suite.addTestSuite(EffectVisibilityCrossNodeTest.class);
4243
suite.addTest(PersisterRestartTests.suite());
4344
suite.addTestSuite(ExclusiveQueueDurability.class);
4445
suite.addTestSuite(ExchangeEquivalence.class);

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,15 @@ public static void main(String[] args) {
8484
int minMsgSize = intArg(cmd, 's', 0);
8585
int timeLimit = intArg(cmd, 'z', 0);
8686
List flags = lstArg(cmd, 'f');
87+
int frameMax = intArg(cmd, 'M', 0);
8788

8889
//setup
8990
String id = UUID.randomUUID().toString();
9091
Stats stats = new Stats(1000L * samplingInterval);
9192
ConnectionFactory factory = new ConnectionFactory();
9293
factory.setHost(hostName);
9394
factory.setPort(portNumber);
95+
factory.setRequestedFrameMax(frameMax);
9496

9597
Thread[] consumerThreads = new Thread[consumerCount];
9698
Connection[] consumerConnections = new Connection[consumerCount];
@@ -174,6 +176,7 @@ private static Options getOptions() {
174176
Option flag = new Option("f", "flag", true, "message flag");
175177
flag.setArgs(Option.UNLIMITED_VALUES);
176178
options.addOption(flag);
179+
options.addOption(new Option("M", "framemax", true, "frame max"));
177180
return options;
178181
}
179182

@@ -239,10 +242,12 @@ public void run() {
239242

240243
try {
241244

242-
for (; timeLimit == 0 || now < startTime + timeLimit;
243-
totalMsgCount++, msgCount++) {
245+
while (timeLimit == 0 || now < startTime + timeLimit) {
244246
delay(now);
245247
publish(createMessage(totalMsgCount));
248+
totalMsgCount++;
249+
msgCount++;
250+
246251
if (txSize != 0 && totalMsgCount % txSize == 0) {
247252
channel.txCommit();
248253
}
@@ -256,7 +261,7 @@ public void run() {
256261
}
257262

258263
System.out.println("sending rate avg: " +
259-
(totalMsgCount * 1000 / (now - startTime)) +
264+
(totalMsgCount * 1000L / (now - startTime)) +
260265
" msg/s");
261266

262267
}
@@ -285,7 +290,7 @@ private void delay(long now)
285290
}
286291
if (elapsed > interval) {
287292
System.out.println("sending rate: " +
288-
(msgCount * 1000 / elapsed) +
293+
(msgCount * 1000L / elapsed) +
289294
" msg/s");
290295
msgCount = 0;
291296
lastStatsTime = now;
@@ -345,15 +350,15 @@ public void run() {
345350

346351
try {
347352

348-
for (; timeLimit == 0 || now < startTime + timeLimit;
349-
totalMsgCount++) {
353+
while (timeLimit == 0 || now < startTime + timeLimit) {
350354
Delivery delivery;
351355
if (timeLimit == 0) {
352356
delivery = q.nextDelivery();
353357
} else {
354358
delivery = q.nextDelivery(startTime + timeLimit - now);
355359
if (delivery == null) break;
356360
}
361+
totalMsgCount++;
357362

358363
DataInputStream d = new DataInputStream(new ByteArrayInputStream(delivery.getBody()));
359364
int msgSeq = d.readInt();
@@ -386,7 +391,7 @@ public void run() {
386391
long elapsed = now - startTime;
387392
if (elapsed > 0) {
388393
System.out.println("recving rate avg: " +
389-
(totalMsgCount * 1000 / elapsed) +
394+
(totalMsgCount * 1000L / elapsed) +
390395
" msg/s");
391396
}
392397
}

0 commit comments

Comments
 (0)