Skip to content

Commit 564b2b5

Browse files
author
Rob Harrop
committed
Merged bug23564 into default
2 parents c7342e8 + 1bcaf89 commit 564b2b5

File tree

1 file changed

+139
-0
lines changed

1 file changed

+139
-0
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.examples;
33+
34+
import com.rabbitmq.client.AMQP;
35+
import com.rabbitmq.client.AckListener;
36+
import com.rabbitmq.client.Channel;
37+
import com.rabbitmq.client.Connection;
38+
import com.rabbitmq.client.ConnectionFactory;
39+
import com.rabbitmq.client.MessageProperties;
40+
import com.rabbitmq.client.QueueingConsumer;
41+
42+
import java.util.Collections;
43+
import java.util.SortedSet;
44+
import java.util.TreeSet;
45+
46+
import java.io.IOException;
47+
48+
public class ConfirmDontLoseMessages {
49+
final static int MSG_COUNT = 10000;
50+
final static String QUEUE_NAME = "confirm-test";
51+
static ConnectionFactory connectionFactory;
52+
53+
public static void main(String[] args)
54+
throws IOException, InterruptedException
55+
{
56+
connectionFactory = new ConnectionFactory();
57+
58+
// Publish MSG_COUNT messages and wait for confirms.
59+
(new Thread(new Consumer())).start();
60+
// Consume MSG_COUNT messages.
61+
(new Thread(new Publisher())).start();
62+
}
63+
64+
static class Publisher implements Runnable {
65+
private volatile SortedSet<Long> ackSet =
66+
Collections.synchronizedSortedSet(new TreeSet<Long>());
67+
68+
public void run() {
69+
try {
70+
long startTime = System.currentTimeMillis();
71+
72+
// Setup
73+
Connection conn = connectionFactory.newConnection();
74+
Channel ch = conn.createChannel();
75+
ch.queueDeclare(QUEUE_NAME, true, false, true, null);
76+
ch.confirmSelect(true);
77+
ch.setAckListener(new AckListener() {
78+
public void handleAck(long seqNo,
79+
boolean multiple) {
80+
if (multiple) {
81+
for (long i = ackSet.first(); i <= seqNo; ++i)
82+
ackSet.remove(i);
83+
} else {
84+
ackSet.remove(seqNo);
85+
}
86+
}
87+
});
88+
89+
// Publish
90+
for (long i = 0; i < MSG_COUNT; ++i) {
91+
ackSet.add(i);
92+
ch.basicPublish("", QUEUE_NAME,
93+
MessageProperties.PERSISTENT_BASIC,
94+
"nop".getBytes());
95+
}
96+
97+
// Wait
98+
while (ackSet.size() > 0)
99+
Thread.sleep(10);
100+
101+
// Cleanup
102+
ch.close();
103+
conn.close();
104+
105+
long endTime = System.currentTimeMillis();
106+
System.out.printf("Test took %.3fs\n",
107+
(float)(endTime - startTime)/1000);
108+
} catch (Throwable e) {
109+
System.out.println("foobar :(");
110+
System.out.print(e);
111+
}
112+
}
113+
}
114+
115+
static class Consumer implements Runnable {
116+
public void run() {
117+
try {
118+
// Setup
119+
Connection conn = connectionFactory.newConnection();
120+
Channel ch = conn.createChannel();
121+
ch.queueDeclare(QUEUE_NAME, true, false, true, null);
122+
123+
// Consume
124+
QueueingConsumer qc = new QueueingConsumer(ch);
125+
ch.basicConsume(QUEUE_NAME, true, qc);
126+
for (int i = 0; i < MSG_COUNT; ++i) {
127+
qc.nextDelivery();
128+
}
129+
130+
// Consume
131+
ch.close();
132+
conn.close();
133+
} catch (Throwable e) {
134+
System.out.println("Whoosh!");
135+
System.out.print(e);
136+
}
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)