1919public class DirectReplyToPerformance {
2020 private static final String DIRECT_QUEUE = "amq.rabbitmq.reply-to" ;
2121 private static final String SERVER_QUEUE = "server-queue" ;
22- private static final int CLIENTS = 1 ;
23- private static final int RPC_COUNT_PER_CLIENT = 1000 ;
22+ private static final int CLIENTS = 5 ;
23+ private static final int RPC_COUNT_PER_CLIENT = 2000 ;
2424
2525 public static void main (String [] args ) throws Exception {
2626 String uri = args [0 ];
2727 start (new Server (uri ));
2828
29- doTest (uri , DirectReply .class );
30- doTest (uri , SharedReplyQueue .class );
31- doTest (uri , PerRPCReplyQueue .class );
29+ doTest (uri , DirectReply .class , true );
30+ doTest (uri , SharedReplyQueue .class , true );
31+ doTest (uri , PerRPCReplyQueue .class , true );
32+ doTest (uri , DirectReply .class , false );
33+ doTest (uri , SharedReplyQueue .class , false );
34+ doTest (uri , PerRPCReplyQueue .class , false );
3235 System .exit (0 );
3336 }
3437
35- private static void doTest (String uri , Class strategy ) throws Exception {
36- System .out .println ("*** " + strategy .getSimpleName ());
38+ private static void doTest (String uri , Class strategy , boolean reuseConnection ) throws Exception {
39+ System .out .println ("*** " + strategy .getSimpleName () + ( reuseConnection ? " (reusing connections)" : "" ) );
3740 CountDownLatch latch = new CountDownLatch (CLIENTS );
3841 for (int i = 0 ; i < CLIENTS ; i ++) {
39- start (new Client (uri , latch , (ReplyQueueStrategy ) strategy .newInstance ()));
42+ start (new Client (uri , latch , (ReplyQueueStrategy ) strategy .newInstance (), reuseConnection ));
4043 }
4144 latch .await ();
4245 }
@@ -138,11 +141,13 @@ private static class Client implements Task {
138141 private String uri ;
139142 private CountDownLatch globalLatch ;
140143 private ReplyQueueStrategy strategy ;
144+ private boolean reuseConnection ;
141145
142- public Client (String uri , CountDownLatch latch , ReplyQueueStrategy strategy ) {
146+ public Client (String uri , CountDownLatch latch , ReplyQueueStrategy strategy , boolean reuseConnection ) {
143147 this .uri = uri ;
144148 this .globalLatch = latch ;
145149 this .strategy = strategy ;
150+ this .reuseConnection = reuseConnection ;
146151 }
147152
148153 public void run () throws Exception {
@@ -151,18 +156,31 @@ public void run() throws Exception {
151156 final CountDownLatch [] latch = new CountDownLatch [1 ];
152157 long time = System .nanoTime ();
153158 Consumer cons = new ClientConsumer (latch );
154- Connection conn = factory .newConnection ();
155- Channel ch = conn .createChannel ();
159+ Connection conn = null ;
160+ Channel ch = null ;
161+ if (reuseConnection ) {
162+ conn = factory .newConnection ();
163+ ch = conn .createChannel ();
164+ }
156165 for (int i = 0 ; i < RPC_COUNT_PER_CLIENT ; i ++) {
157166 latch [0 ] = new CountDownLatch (1 );
167+ if (!reuseConnection ) {
168+ conn = factory .newConnection ();
169+ ch = conn .createChannel ();
170+ }
158171
159172 String replyTo = strategy .preMsg (ch , cons );
160173 AMQP .BasicProperties props = MessageProperties .MINIMAL_BASIC .builder ().replyTo (replyTo ).build ();
161174 ch .basicPublish ("" , SERVER_QUEUE , props , "Hello client!" .getBytes ());
162175 latch [0 ].await ();
163176 strategy .postMsg (ch );
177+ if (!reuseConnection ) {
178+ conn .close ();
179+ }
180+ }
181+ if (reuseConnection ) {
182+ conn .close ();
164183 }
165- conn .close ();
166184 System .out .println ((System .nanoTime () - time ) / (1000 * RPC_COUNT_PER_CLIENT ) + "us per RPC" );
167185 globalLatch .countDown ();
168186 }
0 commit comments