Skip to content

Commit 759186b

Browse files
author
Simon MacMullen
committed
Simple performance test.
1 parent c437617 commit 759186b

File tree

1 file changed

+190
-0
lines changed

1 file changed

+190
-0
lines changed
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package com.rabbitmq.examples;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Connection;
6+
import com.rabbitmq.client.ConnectionFactory;
7+
import com.rabbitmq.client.Consumer;
8+
import com.rabbitmq.client.DefaultConsumer;
9+
import com.rabbitmq.client.Envelope;
10+
import com.rabbitmq.client.MessageProperties;
11+
import com.rabbitmq.client.ShutdownSignalException;
12+
13+
import java.io.IOException;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
import java.util.UUID;
17+
import java.util.concurrent.CountDownLatch;
18+
19+
public class DirectReplyToPerformance {
20+
private static final String DIRECT_QUEUE = "amq.rabbitmq.reply-to";
21+
private static final String SERVER_QUEUE = "server-queue";
22+
private static final int CLIENTS = 1;
23+
private static final int RPC_COUNT_PER_CLIENT = 1000;
24+
25+
public static void main(String[] args) throws Exception {
26+
String uri = args[0];
27+
start(new Server(uri));
28+
29+
doTest(uri, DirectReply.class);
30+
doTest(uri, SharedReplyQueue.class);
31+
doTest(uri, PerRPCReplyQueue.class);
32+
System.exit(0);
33+
}
34+
35+
private static void doTest(String uri, Class strategy) throws Exception {
36+
System.out.println("*** " + strategy.getSimpleName());
37+
CountDownLatch latch = new CountDownLatch(CLIENTS);
38+
for (int i = 0; i < CLIENTS; i++) {
39+
start(new Client(uri, latch, (ReplyQueueStrategy) strategy.newInstance()));
40+
}
41+
latch.await();
42+
}
43+
44+
private static void start(final Task task) {
45+
new Thread(new Runnable() {
46+
public void run() {
47+
try {
48+
task.run();
49+
} catch (Exception e) {
50+
System.out.println(e.getMessage());
51+
e.printStackTrace();
52+
System.exit(1);
53+
}
54+
}
55+
}).start();
56+
}
57+
58+
private interface Task {
59+
public void run() throws Exception;
60+
}
61+
62+
private interface ReplyQueueStrategy {
63+
public String preMsg(Channel ch, Consumer consumer) throws IOException;
64+
public void postMsg(Channel ch) throws IOException;
65+
}
66+
67+
public static class DirectReply implements ReplyQueueStrategy {
68+
private String ctag;
69+
70+
public String preMsg(Channel ch, Consumer consumer) throws IOException {
71+
ctag = ch.basicConsume(DIRECT_QUEUE, true, consumer);
72+
return DIRECT_QUEUE;
73+
}
74+
75+
public void postMsg(Channel ch) throws IOException {
76+
ch.basicCancel(ctag);
77+
}
78+
}
79+
80+
public static class SharedReplyQueue implements ReplyQueueStrategy {
81+
private String queue;
82+
private String ctag;
83+
84+
public SharedReplyQueue() {
85+
queue = "reply-queue-" + UUID.randomUUID();
86+
}
87+
88+
public String preMsg(Channel ch, Consumer consumer) throws IOException {
89+
Map<String, Object> args = new HashMap<String, Object>();
90+
args.put("x-expires", 10000);
91+
ch.queueDeclare(queue, false, false, false, args);
92+
ctag = ch.basicConsume(queue, true, consumer);
93+
return queue;
94+
}
95+
96+
public void postMsg(Channel ch) throws IOException {
97+
ch.basicCancel(ctag);
98+
}
99+
}
100+
101+
public static class PerRPCReplyQueue implements ReplyQueueStrategy {
102+
private String queue;
103+
104+
public String preMsg(Channel ch, Consumer consumer) throws IOException {
105+
queue = ch.queueDeclare().getQueue();
106+
ch.basicConsume(queue, true, consumer);
107+
return queue;
108+
}
109+
110+
public void postMsg(Channel ch) throws IOException {
111+
ch.queueDelete(queue);
112+
}
113+
}
114+
private static class Server implements Task {
115+
private String uri;
116+
117+
public Server(String uri) {
118+
this.uri = uri;
119+
}
120+
121+
public void run() throws Exception {
122+
ConnectionFactory factory = new ConnectionFactory();
123+
factory.setUri(uri);
124+
Connection connection = factory.newConnection();
125+
final Channel ch = connection.createChannel();
126+
ch.queueDeclare(SERVER_QUEUE, false, true, false, null);
127+
ch.basicConsume(SERVER_QUEUE, true, new DefaultConsumer(ch) {
128+
@Override
129+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
130+
String replyTo = properties.getReplyTo();
131+
ch.basicPublish("", replyTo, MessageProperties.MINIMAL_BASIC, "Hello client!".getBytes());
132+
}
133+
});
134+
}
135+
}
136+
137+
private static class Client implements Task {
138+
private String uri;
139+
private CountDownLatch globalLatch;
140+
private ReplyQueueStrategy strategy;
141+
142+
public Client(String uri, CountDownLatch latch, ReplyQueueStrategy strategy) {
143+
this.uri = uri;
144+
this.globalLatch = latch;
145+
this.strategy = strategy;
146+
}
147+
148+
public void run() throws Exception {
149+
ConnectionFactory factory = new ConnectionFactory();
150+
factory.setUri(uri);
151+
final CountDownLatch[] latch = new CountDownLatch[1];
152+
long time = System.nanoTime();
153+
Consumer cons = new ClientConsumer(latch);
154+
Connection conn = factory.newConnection();
155+
Channel ch = conn.createChannel();
156+
for (int i = 0; i < RPC_COUNT_PER_CLIENT; i++) {
157+
latch[0] = new CountDownLatch(1);
158+
159+
String replyTo = strategy.preMsg(ch, cons);
160+
AMQP.BasicProperties props = MessageProperties.MINIMAL_BASIC.builder().replyTo(replyTo).build();
161+
ch.basicPublish("", SERVER_QUEUE, props, "Hello client!".getBytes());
162+
latch[0].await();
163+
strategy.postMsg(ch);
164+
}
165+
conn.close();
166+
System.out.println((System.nanoTime() - time) / (1000 * RPC_COUNT_PER_CLIENT) + "us per RPC");
167+
globalLatch.countDown();
168+
}
169+
}
170+
171+
private static class ClientConsumer implements Consumer {
172+
private CountDownLatch[] latch;
173+
174+
public ClientConsumer(CountDownLatch[] latch) {
175+
this.latch = latch;
176+
}
177+
178+
@Override public void handleConsumeOk(String consumerTag) {}
179+
@Override public void handleCancelOk(String consumerTag) {}
180+
@Override public void handleCancel(String consumerTag) throws IOException {}
181+
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {}
182+
@Override public void handleRecoverOk(String consumerTag) {}
183+
184+
@Override
185+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
186+
latch[0].countDown();
187+
}
188+
};
189+
190+
}

0 commit comments

Comments
 (0)