Skip to content

Commit c437617

Browse files
author
Simon MacMullen
committed
Add a test
1 parent fb042bc commit c437617

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,5 +78,6 @@ public static void add(TestSuite suite) {
7878
suite.addTestSuite(ConnectionRecovery.class);
7979
suite.addTestSuite(ExceptionHandling.class);
8080
suite.addTestSuite(PerConsumerPrefetch.class);
81+
suite.addTestSuite(ReplyTo.class);
8182
}
8283
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.GetResponse;
6+
import com.rabbitmq.client.MessageProperties;
7+
import com.rabbitmq.client.QueueingConsumer;
8+
import com.rabbitmq.client.test.BrokerTestCase;
9+
10+
import java.io.IOException;
11+
12+
public class ReplyTo extends BrokerTestCase {
13+
private static final String QUEUE = "amq.rabbitmq.reply-to";
14+
15+
public void testRoundTrip() throws IOException, InterruptedException {
16+
QueueingConsumer c = new QueueingConsumer(channel);
17+
String replyTo = rpcFirstHalf(c);
18+
channel.confirmSelect();
19+
basicPublishVolatile("response".getBytes(), "", replyTo, MessageProperties.BASIC);
20+
channel.waitForConfirms();
21+
22+
QueueingConsumer.Delivery del = c.nextDelivery();
23+
assertEquals("response", new String(del.getBody()));
24+
}
25+
26+
public void testHack() throws IOException, InterruptedException {
27+
QueueingConsumer c = new QueueingConsumer(channel);
28+
String replyTo = rpcFirstHalf(c);
29+
// 5 chars should overwrite part of the key but not the pid; aiming to prove
30+
// we can't publish using just the pid
31+
replyTo = replyTo.substring(0, replyTo.length() - 5) + "xxxxx";
32+
basicPublishVolatile("response".getBytes(), "", replyTo, MessageProperties.BASIC);
33+
34+
QueueingConsumer.Delivery del = c.nextDelivery(500);
35+
assertNull(del);
36+
}
37+
38+
public void testConsumeFail() throws IOException, InterruptedException {
39+
QueueingConsumer c = new QueueingConsumer(channel);
40+
Channel ch = connection.createChannel();
41+
try {
42+
ch.basicConsume(QUEUE, false, c);
43+
} catch (IOException e) {
44+
// Can't have ack mode
45+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
46+
}
47+
48+
ch = connection.createChannel();
49+
ch.basicConsume(QUEUE, true, c);
50+
try {
51+
ch.basicConsume(QUEUE, true, c);
52+
} catch (IOException e) {
53+
// Can't have multiple consumers
54+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
55+
}
56+
}
57+
58+
public void testConsumeSuccess() throws IOException, InterruptedException {
59+
QueueingConsumer c = new QueueingConsumer(channel);
60+
String ctag = channel.basicConsume(QUEUE, true, c);
61+
channel.basicCancel(ctag);
62+
63+
String ctag2 = channel.basicConsume(QUEUE, true, c);
64+
channel.basicCancel(ctag2);
65+
assertNotSame(ctag, ctag2);
66+
}
67+
68+
private String rpcFirstHalf(QueueingConsumer c) throws IOException {
69+
channel.basicConsume(QUEUE, true, c);
70+
String serverQueue = channel.queueDeclare().getQueue();
71+
basicPublishVolatile("request".getBytes(), "", serverQueue, props());
72+
73+
GetResponse req = channel.basicGet(serverQueue, true);
74+
return req.getProps().getReplyTo();
75+
}
76+
77+
private AMQP.BasicProperties props() {
78+
return MessageProperties.BASIC.builder().replyTo(QUEUE).build();
79+
}
80+
}

0 commit comments

Comments
 (0)