2121import com .rabbitmq .client .Connection ;
2222import com .rabbitmq .client .QueueingConsumer ;
2323
24+ /**
25+ * Java Application to create multiple connections with multiple channels,
26+ * sending and receiving messages on each channel on distinct threads.
27+ * All messages are sent to and consumed from a common, non-durable, shared, auto-delete queue.
28+ * <p/>
29+ * The first thread (thread number 0) outputs statistics for each message it receives.
30+ */
2431public class ManyConnections {
25- public static double rate ;
26- public static int connectionCount ;
27- public static int channelPerConnectionCount ;
28- public static int heartbeatInterval ;
29-
30- public static int totalCount () {
31- return connectionCount * channelPerConnectionCount ;
32- }
32+ private static final String QUEUE_NAME = "ManyConnections" ;
33+ private static double rate ;
34+ private static int connectionCount ;
35+ private static int channelPerConnectionCount ;
36+ private static int heartbeatInterval ;
3337
38+ /**
39+ * @param args command-line parameters:
40+ * <p>
41+ * Four mandatory and one optional positional parameters:
42+ * </p>
43+ * <ul>
44+ * <li><i>AMQP-uri</i> - the AMQP uri to connect to the broker.
45+ * (See {@link ConnectionFactory#setUri(String) setUri()}.)
46+ * </li>
47+ * <li><i>connection-count</i> - the number of connections to create.
48+ * </li>
49+ * <li><i>channel-count</i> - the number of channels to create on <i>each</i> connection.
50+ * </li>
51+ * <li><i>heartbeat-interval</i> - the heartbeat interval, in seconds, for each channel.
52+ * Zero means no heartbeats.
53+ * </li>
54+ * <li><i>rate</i> - the message rate, in floating point messages per second
55+ * (<code>0.0 < rate < 50.0</code> is realistic). Default <code>1.0</code>.
56+ * </li>
57+ * </ul>
58+ * <p>
59+ * There are <i>connection-count</i> x <i>channel-count</i> threads created, each one sending messages at approximately the given rate.
60+ * </p>
61+ */
3462 public static void main (String [] args ) {
3563 try {
3664 if (args .length < 4 ) {
3765 System .err
38- .println ("Usage: ManyConnections uri connCount chanPerConnCount heartbeatInterval [rate]" );
66+ .println ("Usage: ManyConnections uri connCount chanPerConnCount heartbeatInterval [rate]" );
3967 System .exit (2 );
4068 }
4169
4270 String uri = args [0 ];
4371 connectionCount = Integer .parseInt (args [1 ]);
4472 channelPerConnectionCount = Integer .parseInt (args [2 ]);
73+ final int totalCount = connectionCount * channelPerConnectionCount ;
4574 heartbeatInterval = Integer .parseInt (args [3 ]);
4675 rate = (args .length > 4 ) ? Double .parseDouble (args [4 ]) : 1.0 ;
76+ final int delayBetweenMessages = (int ) (1000 / rate );
4777
4878 ConnectionFactory factory = new ConnectionFactory ();
4979 factory .setRequestedHeartbeat (heartbeatInterval );
80+ factory .setUri (uri );
81+
5082 for (int i = 0 ; i < connectionCount ; i ++) {
5183 System .out .println ("Starting connection " + i );
52- factory .setUri (uri );
5384 final Connection conn = factory .newConnection ();
5485
5586 for (int j = 0 ; j < channelPerConnectionCount ; j ++) {
@@ -58,14 +89,13 @@ public static void main(String[] args) {
5889 final int threadNumber = i * channelPerConnectionCount + j ;
5990 System .out .println ("Starting " + threadNumber + " " + ch
6091 + " thread..." );
61- new Thread (new Runnable () {
62- public void run () {
63- runChannel (threadNumber , conn , ch );
64- }
65- }).start ();
92+ new Thread
93+ ( new ChannelRunnable (threadNumber , ch , delayBetweenMessages , totalCount )
94+ , "ManyConnections thread " + threadNumber
95+ ).start ();
6696 }
6797 }
68- System .out .println ("Started " + totalCount ()
98+ System .out .println ("Started " + totalCount
6999 + " channels and threads." );
70100 } catch (Exception e ) {
71101 System .err .println ("Main thread caught exception: " + e );
@@ -74,33 +104,49 @@ public void run() {
74104 }
75105 }
76106
77- public static void runChannel (int threadNumber , Connection conn , Channel ch ) {
107+ private final static class ChannelRunnable implements Runnable {
108+
109+ private final int threadNumber ;
110+ private final Channel ch ;
111+ private final int delayBetweenMessages ;
112+ private final int totalCount ;
113+
114+ public ChannelRunnable (int threadNumber , Channel ch , int delayBetweenMessages , int totalCount ) {
115+ this .threadNumber = threadNumber ;
116+ this .ch = ch ;
117+ this .delayBetweenMessages = delayBetweenMessages ;
118+ this .totalCount = totalCount ;
119+ }
120+
121+ public void run () {
122+ runChannel (threadNumber , ch , delayBetweenMessages , totalCount );
123+ }
124+
125+ }
126+
127+ private final static void runChannel (int threadNumber , final Channel ch , int delayLen , int totalCount ) {
78128 try {
79- int delayLen = (int ) (1000 / rate );
80129 long startTime = System .currentTimeMillis ();
81130
82131 int msgCount = 0 ;
83- String queueName = "ManyConnections" ;
84- ch .queueDeclare (queueName , false , false , false , null );
132+ ch .queueDeclare (QUEUE_NAME , false , false , true , null );
85133
86134 QueueingConsumer consumer = new QueueingConsumer (ch );
87- ch .basicConsume (queueName , true , consumer );
135+ ch .basicConsume (QUEUE_NAME , true , consumer );
88136 while (true ) {
89137 String toSend = threadNumber + "/" + msgCount ++;
90- ch .basicPublish ("" , queueName , null , toSend .getBytes ());
138+ ch .basicPublish ("" , QUEUE_NAME , null , toSend .getBytes ());
91139 Thread .sleep (delayLen );
92140
93141 QueueingConsumer .Delivery delivery = consumer .nextDelivery ();
94142 if (threadNumber == 0 ) {
95143 long now = System .currentTimeMillis ();
96144 double delta = (now - startTime ) / 1000.0 ;
97145 double actualRate = msgCount / delta ;
98- double totalRate = totalCount () * actualRate ;
99- System .out .println (threadNumber + " got message: "
100- + new String (delivery .getBody ()) + "; " + msgCount
101- + " messages in " + delta + " seconds ("
102- + actualRate + " Hz * " + totalCount ()
103- + " channels -> " + totalRate + " Hz)" );
146+ double totalRate = totalCount * actualRate ;
147+ System .out .println (String .format ("thread %3d got message: %20s; "
148+ + "%4d messages in %10.3f seconds (%8.2f Hz x %4d channels -> %8.2f Hz)"
149+ , threadNumber , "\" " + new String (delivery .getBody ()) + "\" " , msgCount , delta , actualRate , totalCount , totalRate ));
104150 }
105151 }
106152 } catch (Exception e ) {
0 commit comments