@@ -55,7 +55,9 @@ public static void main(String[] args)
5555 {
5656 connectionFactory = new ConnectionFactory ();
5757
58+ // Publish MSG_COUNT messages and wait for confirms.
5859 (new Thread (new Consumer ())).start ();
60+ // Consume MSG_COUNT messages.
5961 (new Thread (new Publisher ())).start ();
6062 }
6163
@@ -66,6 +68,7 @@ public void run() {
6668 try {
6769 long startTime = System .currentTimeMillis ();
6870
71+ // Setup
6972 Connection conn = connectionFactory .newConnection ();
7073 Channel ch = conn .createChannel ();
7174 ch .queueDeclare (QUEUE_NAME , true , false , true , null );
@@ -82,16 +85,19 @@ public void handleAck(long seqNo,
8285 }
8386 });
8487
88+ // Publish
8589 for (long i = 0 ; i < MSG_COUNT ; ++i ) {
8690 ackSet .add (i );
8791 ch .basicPublish ("" , QUEUE_NAME ,
8892 MessageProperties .PERSISTENT_BASIC ,
8993 "nop" .getBytes ());
9094 }
9195
96+ // Wait
9297 while (ackSet .size () > 0 )
9398 Thread .sleep (10 );
9499
100+ // Cleanup
95101 ch .close ();
96102 conn .close ();
97103
@@ -107,14 +113,19 @@ public void handleAck(long seqNo,
107113 static class Consumer implements Runnable {
108114 public void run () {
109115 try {
116+ // Setup
110117 Connection conn = connectionFactory .newConnection ();
111118 Channel ch = conn .createChannel ();
112119 ch .queueDeclare (QUEUE_NAME , true , false , true , null );
120+
121+ // Consume
113122 QueueingConsumer qc = new QueueingConsumer (ch );
114123 ch .basicConsume (QUEUE_NAME , true , qc );
115124 for (int i = 0 ; i < MSG_COUNT ; ++i ) {
116125 qc .nextDelivery ();
117126 }
127+
128+ // Consume
118129 ch .close ();
119130 conn .close ();
120131 } catch (Throwable e ) {
0 commit comments