|
46 | 46 | import java.io.IOException; |
47 | 47 |
|
48 | 48 | public class ConfirmDontLoseMessages { |
49 | | - final static int MSG_COUNT = 10000; |
| 49 | + static int msgCount = 10000; |
50 | 50 | final static String QUEUE_NAME = "confirm-test"; |
51 | 51 | static ConnectionFactory connectionFactory; |
52 | 52 |
|
53 | 53 | public static void main(String[] args) |
54 | 54 | throws IOException, InterruptedException |
55 | 55 | { |
| 56 | + if (args.length > 0) { |
| 57 | + msgCount = Integer.parseInt(args[0]); |
| 58 | + } |
| 59 | + |
| 60 | + System.out.printf("msgCount = %d\n", msgCount); |
| 61 | + |
56 | 62 | connectionFactory = new ConnectionFactory(); |
57 | 63 |
|
58 | | - // Publish MSG_COUNT messages and wait for confirms. |
| 64 | + // Publish msgCount messages and wait for confirms. |
59 | 65 | (new Thread(new Consumer())).start(); |
60 | | - // Consume MSG_COUNT messages. |
| 66 | + // Consume msgCount messages. |
61 | 67 | (new Thread(new Publisher())).start(); |
62 | 68 | } |
63 | 69 |
|
@@ -87,7 +93,7 @@ public void handleAck(long seqNo, |
87 | 93 | ch.confirmSelect(); |
88 | 94 |
|
89 | 95 | // Publish |
90 | | - for (long i = 0; i < MSG_COUNT; ++i) { |
| 96 | + for (long i = 0; i < msgCount; ++i) { |
91 | 97 | ackSet.add(i); |
92 | 98 | ch.basicPublish("", QUEUE_NAME, |
93 | 99 | MessageProperties.PERSISTENT_BASIC, |
@@ -124,7 +130,7 @@ public void run() { |
124 | 130 | // Consume |
125 | 131 | QueueingConsumer qc = new QueueingConsumer(ch); |
126 | 132 | ch.basicConsume(QUEUE_NAME, true, qc); |
127 | | - for (int i = 0; i < MSG_COUNT; ++i) { |
| 133 | + for (int i = 0; i < msgCount; ++i) { |
128 | 134 | qc.nextDelivery(); |
129 | 135 | } |
130 | 136 |
|
|
0 commit comments