Skip to content

Commit 93c2294

Browse files
author
Alexandru Scvortov
committed
added confirm example
1 parent f4aa3aa commit 93c2294

File tree

1 file changed

+126
-0
lines changed

1 file changed

+126
-0
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.examples;
33+
34+
import com.rabbitmq.client.AMQP;
35+
import com.rabbitmq.client.AckListener;
36+
import com.rabbitmq.client.Channel;
37+
import com.rabbitmq.client.Connection;
38+
import com.rabbitmq.client.ConnectionFactory;
39+
import com.rabbitmq.client.MessageProperties;
40+
import com.rabbitmq.client.QueueingConsumer;
41+
42+
import java.util.Collections;
43+
import java.util.SortedSet;
44+
import java.util.TreeSet;
45+
46+
import java.io.IOException;
47+
48+
public class ConfirmDontLoseMessages {
49+
final static int MSG_COUNT = 10000;
50+
final static String QUEUE_NAME = "confirm-test";
51+
static ConnectionFactory connectionFactory;
52+
53+
public static void main(String[] args)
54+
throws IOException, InterruptedException
55+
{
56+
connectionFactory = new ConnectionFactory();
57+
58+
(new Thread(new Consumer())).start();
59+
(new Thread(new Publisher())).start();
60+
}
61+
62+
static class Publisher implements Runnable {
63+
volatile SortedSet<Long> ackSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
64+
65+
public void run() {
66+
try {
67+
long startTime = System.currentTimeMillis();
68+
69+
Connection conn = connectionFactory.newConnection();
70+
Channel ch = conn.createChannel();
71+
ch.queueDeclare(QUEUE_NAME, true, false, true, null);
72+
ch.confirmSelect(true);
73+
ch.setAckListener(new AckListener() {
74+
public void handleAck(long seqNo,
75+
boolean multiple) {
76+
if (multiple) {
77+
for (long i = ackSet.first(); i <= seqNo; ++i)
78+
ackSet.remove(i);
79+
} else {
80+
ackSet.remove(seqNo);
81+
}
82+
}
83+
});
84+
85+
for (long i = 0; i < MSG_COUNT; ++i) {
86+
ackSet.add(i);
87+
ch.basicPublish("", QUEUE_NAME,
88+
MessageProperties.PERSISTENT_BASIC,
89+
"nop".getBytes());
90+
}
91+
92+
while (ackSet.size() > 0)
93+
Thread.sleep(10);
94+
95+
ch.close();
96+
conn.close();
97+
98+
long endTime = System.currentTimeMillis();
99+
System.out.printf("Test took %.3fs\n", (float)(endTime - startTime)/1000);
100+
} catch (Throwable e) {
101+
System.out.println("foobar :(");
102+
System.out.print(e);
103+
}
104+
}
105+
}
106+
107+
static class Consumer implements Runnable {
108+
public void run() {
109+
try {
110+
Connection conn = connectionFactory.newConnection();
111+
Channel ch = conn.createChannel();
112+
ch.queueDeclare(QUEUE_NAME, true, false, true, null);
113+
QueueingConsumer qc = new QueueingConsumer(ch);
114+
ch.basicConsume(QUEUE_NAME, true, qc);
115+
for (int i = 0; i < MSG_COUNT; ++i) {
116+
qc.nextDelivery();
117+
}
118+
ch.close();
119+
conn.close();
120+
} catch (Throwable e) {
121+
System.out.println("Whoosh!");
122+
System.out.print(e);
123+
}
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)