Skip to content

Commit 60ba8fd

Browse files
author
Emile Joubert
committed
Merged default into amqp_0_9_1
2 parents 179b327 + a849d7d commit 60ba8fd

File tree

11 files changed

+240
-53
lines changed

11 files changed

+240
-53
lines changed

test/src/com/rabbitmq/client/test/functional/BindingLifecycle.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,40 @@ public void testQueuePurge() throws IOException {
6767
deleteExchangeAndQueue(binding);
6868
}
6969

70+
/**
71+
* See bug 21854:
72+
* "When Queue.Purge is called, sent-but-unacknowledged messages are no
73+
* longer purged, even if the channel they were sent down is not
74+
* (Tx-)transacted."
75+
*/
76+
public void testUnackedPurge() throws IOException {
77+
Binding binding = setupExchangeBindings(false);
78+
channel.basicPublish(binding.x, binding.k, null, payload);
79+
80+
GetResponse response = channel.basicGet(binding.q, false);
81+
assertFalse(response.getEnvelope().isRedeliver());
82+
assertNotNull("The response SHOULD NOT BE null", response);
83+
84+
// If we purge the queue the unacked message should still be there on
85+
// recover.
86+
channel.queuePurge(binding.q);
87+
response = channel.basicGet(binding.q, true);
88+
assertNull("The response SHOULD BE null", response);
89+
90+
channel.basicRecoverAsync(true);
91+
response = channel.basicGet(binding.q, false);
92+
assertTrue(response.getEnvelope().isRedeliver());
93+
assertNotNull("The response SHOULD NOT BE null", response);
94+
95+
// If we recover then purge the message should go away
96+
channel.basicRecoverAsync(true);
97+
channel.queuePurge(binding.q);
98+
response = channel.basicGet(binding.q, true);
99+
assertNull("The response SHOULD BE null", response);
100+
101+
deleteExchangeAndQueue(binding);
102+
}
103+
70104
/**
71105
* This tests whether when you delete an exchange, that any
72106
* bindings attached to it are deleted as well.

test/src/com/rabbitmq/client/test/functional/DurableOnTransient.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,11 @@ protected void releaseResources() throws IOException {
6868
channel.exchangeDelete(X);
6969
}
7070

71-
public void testBind()
71+
public void testBindDurableToTransient()
7272
throws IOException
7373
{
74-
try {
75-
channel.queueBind(Q, X, "");
76-
fail("Expected exception from queueBind");
77-
} catch (IOException ee) {
78-
// Pass!
79-
}
74+
channel.queueBind(Q, X, "");
75+
basicPublish();
76+
assertNotNull(basicGet());
8077
}
8178
}

test/src/com/rabbitmq/client/test/functional/ExchangeDeclare.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@
3535
import java.util.HashMap;
3636
import java.io.IOException;
3737

38-
import com.rabbitmq.client.AMQP;
3938
import com.rabbitmq.client.Channel;
40-
import com.rabbitmq.client.QueueingConsumer;
4139

4240
import com.rabbitmq.client.test.BrokerTestCase;
4341

@@ -50,26 +48,27 @@ public class ExchangeDeclare extends BrokerTestCase {
5048
public void releaseResources() throws IOException {
5149
channel.exchangeDelete(NAME);
5250
}
53-
54-
public static void verifyEquivalent(Channel channel, String name, String type, boolean durable, Map<String, Object> args)
55-
throws IOException {
51+
52+
public static void verifyEquivalent(Channel channel, String name,
53+
String type, boolean durable,
54+
Map<String, Object> args) throws IOException {
5655
channel.exchangeDeclarePassive(name);
5756
channel.exchangeDeclare(name, type, durable, args);
5857
}
5958

6059
// Note: this will close the channel
61-
public static void verifyNotEquivalent(Channel channel, String name, String type, boolean durable, Map<String, Object> args)
62-
throws IOException {
60+
public static void verifyNotEquivalent(Channel channel, String name,
61+
String type, boolean durable,
62+
Map<String, Object> args) throws IOException {
6363
channel.exchangeDeclarePassive(name);
6464
try {
6565
channel.exchangeDeclare(name, type, durable, args);
6666
fail("Exchange was supposed to be not equivalent");
67-
}
68-
catch (IOException ioe) {
67+
} catch (IOException ioe) {
6968
return;
7069
}
7170
}
72-
71+
7372
public void testExchangeNoArgsEquivalence() throws IOException {
7473
channel.exchangeDeclare(NAME, TYPE, false, null);
7574
verifyEquivalent(channel, NAME, TYPE, false, null);
@@ -91,4 +90,4 @@ public void testExchangeTypeNotEquivalent() throws IOException {
9190
channel.exchangeDeclare(NAME, "direct", false, null);
9291
verifyNotEquivalent(channel, NAME, "fanout", false, null);
9392
}
94-
}
93+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import java.io.IOException;
35+
36+
import com.rabbitmq.client.AMQP;
37+
import com.rabbitmq.client.ShutdownSignalException;
38+
import com.rabbitmq.client.test.BrokerTestCase;
39+
40+
/* Declare an exchange, bind a queue to it, then try to delete it,
41+
* setting if-unused to true. This should throw an exception. */
42+
public class ExchangeDeleteIfUnused extends BrokerTestCase {
43+
private final static String EXCHANGE_NAME = "xchg1";
44+
private final static String ROUTING_KEY = "something";
45+
46+
protected void createResources()
47+
throws IOException
48+
{
49+
super.createResources();
50+
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
51+
String queueName = channel.queueDeclare().getQueue();
52+
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
53+
}
54+
55+
protected void releaseResources()
56+
throws IOException
57+
{
58+
channel.exchangeDelete(EXCHANGE_NAME);
59+
super.releaseResources();
60+
}
61+
62+
/* Attempt to Exchange.Delete(ifUnused = true) a used exchange.
63+
* Should throw an exception. */
64+
public void testExchangeDelete() {
65+
try {
66+
channel.exchangeDelete(EXCHANGE_NAME, true);
67+
fail("Exception expected if exchange in use");
68+
} catch (IOException e) {
69+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
70+
}
71+
}
72+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,14 @@ public static TestSuite suite() {
5050
suite.addTestSuite(RequeueOnChannelClose.class);
5151
suite.addTestSuite(NoRequeueOnCancel.class);
5252
suite.addTestSuite(Bug20004Test.class);
53+
suite.addTestSuite(ExchangeDeleteIfUnused.class);
5354
suite.addTestSuite(QosTests.class);
5455
suite.addTestSuite(AlternateExchange.class);
5556
suite.addTestSuite(ExchangeDeclare.class);
5657
suite.addTestSuite(QueueLifecycle.class);
5758
suite.addTestSuite(QueueExclusivity.class);
59+
suite.addTestSuite(InvalidAcks.class);
60+
suite.addTestSuite(InvalidAcksTx.class);
5861
return suite;
5962
}
6063
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import java.io.IOException;
4+
5+
public class InvalidAcks extends InvalidAcksBase {
6+
protected void select() throws IOException {}
7+
protected void commit() throws IOException {}
8+
}
9+
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.GetResponse;
5+
import com.rabbitmq.client.test.BrokerTestCase;
6+
7+
import java.io.IOException;
8+
9+
/**
10+
* See bug 21846:
11+
* Basic.Ack is now required to signal a channel error immediately upon
12+
* detecting an invalid deliveryTag, even if the channel is (Tx-)transacted.
13+
*
14+
* Specifically, a client MUST not acknowledge the same message more than once.
15+
*/
16+
public abstract class InvalidAcksBase extends BrokerTestCase {
17+
protected abstract void select() throws IOException;
18+
protected abstract void commit() throws IOException;
19+
20+
public void testDoubleAck()
21+
throws IOException
22+
{
23+
select();
24+
String q = channel.queueDeclare().getQueue();
25+
basicPublishVolatile(q);
26+
commit();
27+
28+
long tag = channel.basicGet(q, false).getEnvelope().getDeliveryTag();
29+
channel.basicAck(tag, false);
30+
channel.basicAck(tag, false);
31+
32+
expectChannelError(AMQP.NOT_FOUND);
33+
}
34+
35+
private void expectChannelError(int error) {
36+
try {
37+
channel.queueDeclare();
38+
fail("Expected channel error " + error);
39+
} catch (IOException e) {
40+
checkShutdownSignal(error, e);
41+
}
42+
}
43+
44+
public void testCrazyAck()
45+
throws IOException
46+
{
47+
select();
48+
channel.basicAck(123456, false);
49+
expectChannelError(AMQP.NOT_FOUND);
50+
}
51+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import java.io.IOException;
4+
5+
public class InvalidAcksTx extends InvalidAcksBase {
6+
protected void select() throws IOException {
7+
channel.txSelect();
8+
}
9+
10+
protected void commit() throws IOException {
11+
channel.txCommit();
12+
}
13+
}

test/src/com/rabbitmq/client/test/functional/TransactionsBase.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package com.rabbitmq.client.test.functional;
3333

34+
import com.rabbitmq.client.AMQP;
3435
import com.rabbitmq.client.test.BrokerTestCase;
3536
import java.io.IOException;
3637

@@ -227,22 +228,6 @@ public void testRollbackAcksAndReAck()
227228
closeChannel();
228229
}
229230

230-
/*
231-
it is legal to ack the same message twice
232-
*/
233-
public void testDuplicateAck()
234-
throws IOException
235-
{
236-
openChannel();
237-
basicPublish();
238-
txSelect();
239-
basicGet();
240-
basicAck();
241-
basicAck();
242-
txCommit();
243-
closeChannel();
244-
}
245-
246231
/*
247232
it is illegal to ack with an unknown delivery tag
248233
*/
@@ -319,4 +304,27 @@ public void testAckAll()
319304
closeChannel();
320305
}
321306

307+
public void testNonTransactedCommit()
308+
throws IOException
309+
{
310+
openChannel();
311+
try {
312+
txCommit();
313+
fail("Expected channel error");
314+
} catch (IOException e) {
315+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
316+
}
317+
}
318+
319+
public void testNonTransactedRollback()
320+
throws IOException
321+
{
322+
openChannel();
323+
try {
324+
txRollback();
325+
fail("Expected channel error");
326+
} catch (IOException e) {
327+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
328+
}
329+
}
322330
}

test/src/com/rabbitmq/client/test/server/ExchangeEquivalence.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ public class ExchangeEquivalence extends BrokerTestCase {
4646

4747
public void testAlternateExchangeEquivalence() throws IOException {
4848
channel.exchangeDeclare("alternate", "direct", false, args);
49-
ExchangeDeclare.verifyEquivalent(channel, "alternate", "direct", false, args);
49+
ExchangeDeclare.verifyEquivalent(channel, "alternate", "direct", false,
50+
args);
5051
}
5152

5253
public void testAlternateExchangeNonEquivalence() throws IOException {
5354
channel.exchangeDeclare("alternate", "direct", false, args);
5455
Map<String, Object> altargs = new HashMap<String, Object>();
5556
altargs.put("alternate-exchange", "somewhere");
56-
ExchangeDeclare.verifyNotEquivalent(channel, "alternate", "direct", false, altargs);
57+
ExchangeDeclare.verifyNotEquivalent(channel, "alternate", "direct",
58+
false, altargs);
5759
}
58-
}
60+
}

0 commit comments

Comments
 (0)