Skip to content

Commit e6d8f36

Browse files
author
Alexandru Scvortov
committed
added publisher ack test
1 parent cef94b4 commit e6d8f36

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.test.BrokerTestCase;
35+
import com.rabbitmq.client.AMQP;
36+
import com.rabbitmq.client.AckListener;
37+
import com.rabbitmq.client.DefaultConsumer;
38+
import com.rabbitmq.client.MessageProperties;
39+
40+
import java.io.IOException;
41+
import java.util.Set;
42+
import java.util.TreeSet;
43+
44+
public class Confirm extends BrokerTestCase
45+
{
46+
final static int NUM_MESSAGES = 1000;
47+
volatile Set<Long> ackSet;
48+
49+
@Override
50+
protected void setUp() throws IOException {
51+
super.setUp();
52+
ackSet = new TreeSet<Long>();
53+
final Confirm This = this;
54+
channel.setAckListener(new AckListener() {
55+
public void handleAck(long seqNo,
56+
boolean multiple) {
57+
if (multiple) {
58+
for (int i = 0; i <= seqNo; ++i)
59+
This.gotAckFor(i);
60+
} else {
61+
This.gotAckFor(seqNo);
62+
}
63+
}
64+
});
65+
channel.confirmSelect(true);
66+
channel.queueDeclare("confirm-test", true, true, true, null);
67+
channel.basicConsume("confirm-test", true, new DefaultConsumer(channel));
68+
channel.queueDeclare("confirm-test-noconsumer", true, true, true, null);
69+
}
70+
71+
public void testConfirmTransient() throws IOException, InterruptedException {
72+
confirmTest("consumer-test", false, false, false);
73+
}
74+
75+
public void testConfirmPersistentSimple()
76+
throws IOException, InterruptedException
77+
{
78+
confirmTest("consumer-test", true, false, false);
79+
}
80+
81+
public void testConfirmPersistentImmediate()
82+
throws IOException, InterruptedException
83+
{
84+
confirmTest("consumer-test", true, false, true);
85+
}
86+
87+
public void testConfirmPersistentImmediateNoConsumer()
88+
throws IOException, InterruptedException
89+
{
90+
confirmTest("consumer-test-noconsumer", true, false, true);
91+
}
92+
93+
public void testConfirmPersistentMandatory()
94+
throws IOException, InterruptedException
95+
{
96+
confirmTest("consumer-test", true, true, false);
97+
}
98+
99+
public void testConfirmPersistentMandatoryReturn()
100+
throws IOException, InterruptedException
101+
{
102+
confirmTest("consumer-test-doesnotexist", true, true, false);
103+
}
104+
105+
/* Publish NUM_MESSAGES persistent messages and wait for
106+
* confirmations. */
107+
public void confirmTest(String queueName, boolean persistent,
108+
boolean mandatory, boolean immediate)
109+
throws IOException, InterruptedException
110+
{
111+
for (long i = 0; i < NUM_MESSAGES; i++) {
112+
publish(queueName, persistent, mandatory, immediate);
113+
ackSet.add(i);
114+
}
115+
116+
while (ackSet.size() > 0)
117+
Thread.sleep(10);
118+
}
119+
120+
private void publish(String queueName, boolean persistent,
121+
boolean mandatory, boolean immediate)
122+
throws IOException
123+
{
124+
channel.basicPublish("", queueName, mandatory, immediate,
125+
persistent ? MessageProperties.PERSISTENT_BASIC
126+
: MessageProperties.BASIC,
127+
"nop".getBytes());
128+
}
129+
130+
private synchronized void gotAckFor(long msgSeqNo) {
131+
ackSet.remove(msgSeqNo);
132+
}
133+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public static TestSuite suite() {
6666
suite.addTestSuite(BindToDefaultExchange.class);
6767
suite.addTestSuite(UnbindAutoDeleteExchange.class);
6868
suite.addTestSuite(RecoverAfterCancel.class);
69+
suite.addTestSuite(Confirm.class);
6970
suite.addTestSuite(UnexpectedFrames.class);
7071
return suite;
7172
}

0 commit comments

Comments
 (0)