Skip to content

Commit 718a9d3

Browse files
author
Matthew Sackman
committed
Merging bug 22867 onto amqp0_9_1
2 parents 5393e4f + 6936a7f commit 718a9d3

File tree

15 files changed

+150
-36
lines changed

15 files changed

+150
-36
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,12 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean
214214
* @param exchange the name of the exchange
215215
* @param type the exchange type
216216
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
217+
* @param autoDelete true if the server should delete the exchange when it is no longer in use
217218
* @param arguments other properties (construction arguments) for the exchange
218219
* @return a declaration-confirm method to indicate the exchange was successfully declared
219220
* @throws java.io.IOException if an error is encountered
220221
*/
221-
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable,
222+
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
222223
Map<String, Object> arguments) throws IOException;
223224

224225
/**
@@ -287,7 +288,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
287288
* exclusively owned by another connection.
288289
*/
289290
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
290-
291+
291292
/**
292293
* Delete a queue, without regard for whether it is in use or has messages on it
293294
* @see com.rabbitmq.client.AMQP.Queue.Delete

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,13 +421,13 @@ public void basicPublish(String exchange, String routingKey,
421421

422422
/** Public API - {@inheritDoc} */
423423
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
424-
boolean durable,
424+
boolean durable, boolean autoDelete,
425425
Map<String, Object> arguments)
426426
throws IOException
427427
{
428428
return (Exchange.DeclareOk)
429429
exnWrappingRpc(new Exchange.Declare(TICKET, exchange, type,
430-
false, durable, false,
430+
false, durable, autoDelete,
431431
false, false, arguments)).getMethod();
432432
}
433433

@@ -436,14 +436,14 @@ public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
436436
boolean durable)
437437
throws IOException
438438
{
439-
return exchangeDeclare(exchange, type, durable, null);
439+
return exchangeDeclare(exchange, type, durable, false, null);
440440
}
441441

442442
/** Public API - {@inheritDoc} */
443443
public Exchange.DeclareOk exchangeDeclare(String exchange, String type)
444444
throws IOException
445445
{
446-
return exchangeDeclare(exchange, type, false, null);
446+
return exchangeDeclare(exchange, type, false, false, null);
447447
}
448448

449449
/** Public API - {@inheritDoc} */

test/src/com/rabbitmq/client/test/CloseInMainLoop.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void testCloseOKNormallyReceived() throws Exception{
6363
public void testCloseWithFaultyConsumer() throws Exception{
6464
SpecialConnection connection = new SpecialConnection();
6565
Channel channel = connection.createChannel();
66-
channel.exchangeDeclare("x", "direct", false, null);
66+
channel.exchangeDeclare("x", "direct");
6767
channel.queueDeclare("q", false, false, false, null);
6868
channel.queueBind("q", "x", "k");
6969

test/src/com/rabbitmq/client/test/functional/AlternateExchange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void handleBasicReturn(int replyCode,
109109
protected void setupRouting(String name, String ae) throws IOException {
110110
Map<String, Object> args = new HashMap<String, Object>();
111111
if (ae != null) args.put("alternate-exchange", ae);
112-
channel.exchangeDeclare(name, "direct", false, args);
112+
channel.exchangeDeclare(name, "direct", false, false, args);
113113
channel.queueBind(name, name, name);
114114
}
115115

test/src/com/rabbitmq/client/test/functional/BindingLifecycle.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,40 @@ public void testExchangeIfUnused() throws IOException {
147147
fail("Exchange delete should have failed");
148148
}
149149

150+
/**
151+
* This tests whether the server checks that an auto_delete
152+
* exchange actually deletes the bindings attached to it when it
153+
* is deleted.
154+
*
155+
* To test this, you declare and auto_delete exchange and bind an
156+
* auto_delete queue to it.
157+
*
158+
* Start a consumer on this queue, send a message, let it get
159+
* consumed and then cancel the consumer
160+
*
161+
* The unsubscribe should cause the queue to auto_delete, which in
162+
* turn should cause the exchange to auto_delete.
163+
*
164+
* Then re-declare the queue again and try to rebind it to the same exhange.
165+
*
166+
* Because the exchange has been auto-deleted, the bind operation
167+
* should fail.
168+
*/
169+
public void testExchangeAutoDelete() throws IOException {
170+
doAutoDelete(false, 1);
171+
}
172+
173+
/**
174+
* Runs something similar to testExchangeAutoDelete, but adds
175+
* different queues with the same binding to the same exchange.
176+
*
177+
* The difference should be that the original exchange should not
178+
* get auto-deleted
179+
*/
180+
public void testExchangeAutoDeleteManyBindings() throws IOException {
181+
doAutoDelete(false, 10);
182+
}
183+
150184
/**
151185
*
152186
*/

test/src/com/rabbitmq/client/test/functional/BindingLifecycleBase.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,57 @@ protected void deleteExchangeAndQueue(Binding binding) throws IOException {
120120
channel.exchangeDelete(binding.x);
121121
}
122122

123+
protected void doAutoDelete(boolean durable, int queues) throws IOException {
124+
String[] queueNames = null;
125+
Binding binding = Binding.randomBinding();
126+
channel.exchangeDeclare(binding.x, "direct", durable, true, null);
127+
channel.queueDeclare(binding.q, durable, false, true, null);
128+
channel.queueBind(binding.q, binding.x, binding.k);
129+
if (queues > 1) {
130+
int j = queues - 1;
131+
queueNames = new String[j];
132+
for (int i = 0; i < j; i++) {
133+
queueNames[i] = randomString();
134+
channel.queueDeclare(queueNames[i], durable, false, false, null);
135+
channel.queueBind(queueNames[i], binding.x, binding.k);
136+
channel.basicConsume(queueNames[i], true, new QueueingConsumer(channel));
137+
}
138+
}
139+
subscribeSendUnsubscribe(binding);
140+
if (durable) {
141+
restart();
142+
}
143+
if (queues > 1) {
144+
for (String s : queueNames) {
145+
channel.basicConsume(s, true, new QueueingConsumer(channel));
146+
Binding tmp = new Binding(s, binding.x, binding.k);
147+
sendUnroutable(tmp);
148+
}
149+
}
150+
channel.queueDeclare(binding.q, durable, true, true, null);
151+
// if (queues == 1): Because the exchange does not exist, this
152+
// bind should fail
153+
try {
154+
channel.queueBind(binding.q, binding.x, binding.k);
155+
sendRoutable(binding);
156+
}
157+
catch (Exception e) {
158+
// do nothing, this is the correct behaviour
159+
channel = null;
160+
return;
161+
}
162+
if (queues == 1) {
163+
deleteExchangeAndQueue(binding);
164+
fail("Queue bind should have failed");
165+
}
166+
// Do some cleanup
167+
if (queues > 1) {
168+
for (String q : queueNames) {
169+
channel.queueDelete(q);
170+
}
171+
}
172+
}
173+
123174

124175
protected void restart() throws IOException {
125176
}
@@ -172,4 +223,26 @@ protected Binding(String q, String x, String k) {
172223
}
173224

174225

226+
// A couple of tests that are common to the subclasses (which differ on
227+
// whether the broker is restarted)
228+
229+
/**
230+
*
231+
* The same thing as testExchangeAutoDelete, but with durable
232+
* queues.
233+
*
234+
* Main difference is restarting the broker to make sure that the
235+
* durable queues are blasted away.
236+
*/
237+
public void testExchangeAutoDeleteDurable() throws IOException {
238+
doAutoDelete(true, 1);
239+
}
240+
241+
/**
242+
* The same thing as testExchangeAutoDeleteManyBindings, but with
243+
* durable queues.
244+
*/
245+
public void testExchangeAutoDeleteDurableManyBindings() throws IOException {
246+
doAutoDelete(true, 10);
247+
}
175248
}

test/src/com/rabbitmq/client/test/functional/ExchangeDeclare.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,44 +50,49 @@ public void releaseResources() throws IOException {
5050
}
5151

5252
public static void verifyEquivalent(Channel channel, String name,
53-
String type, boolean durable,
53+
String type, boolean durable, boolean autoDelete,
5454
Map<String, Object> args) throws IOException {
5555
channel.exchangeDeclarePassive(name);
56-
channel.exchangeDeclare(name, type, durable, args);
56+
channel.exchangeDeclare(name, type, durable, autoDelete, args);
5757
}
5858

5959
// Note: this will close the channel
6060
public static void verifyNotEquivalent(Channel channel, String name,
61-
String type, boolean durable,
61+
String type, boolean durable, boolean autoDelete,
6262
Map<String, Object> args) throws IOException {
6363
channel.exchangeDeclarePassive(name);
6464
try {
65-
channel.exchangeDeclare(name, type, durable, args);
65+
channel.exchangeDeclare(name, type, durable, autoDelete, args);
6666
fail("Exchange was supposed to be not equivalent");
6767
} catch (IOException ioe) {
6868
return;
6969
}
7070
}
7171

7272
public void testExchangeNoArgsEquivalence() throws IOException {
73-
channel.exchangeDeclare(NAME, TYPE, false, null);
74-
verifyEquivalent(channel, NAME, TYPE, false, null);
73+
channel.exchangeDeclare(NAME, TYPE, false, false, null);
74+
verifyEquivalent(channel, NAME, TYPE, false, false, null);
7575
}
7676

7777
public void testExchangeNonsenseArgsEquivalent() throws IOException {
78-
channel.exchangeDeclare(NAME, TYPE, false, null);
78+
channel.exchangeDeclare(NAME, TYPE, false, false, null);
7979
Map<String, Object> args = new HashMap<String, Object>();
8080
args.put("nonsensical-argument-surely-not-in-use", "foo");
81-
verifyEquivalent(channel, NAME, TYPE, false, args);
81+
verifyEquivalent(channel, NAME, TYPE, false, false, args);
8282
}
8383

8484
public void testExchangeDurableNotEquivalent() throws IOException {
85-
channel.exchangeDeclare(NAME, TYPE, false, null);
86-
verifyNotEquivalent(channel, NAME, TYPE, true, null);
85+
channel.exchangeDeclare(NAME, TYPE, false, false, null);
86+
verifyNotEquivalent(channel, NAME, TYPE, true, false, null);
8787
}
8888

8989
public void testExchangeTypeNotEquivalent() throws IOException {
90-
channel.exchangeDeclare(NAME, "direct", false, null);
91-
verifyNotEquivalent(channel, NAME, "fanout", false, null);
90+
channel.exchangeDeclare(NAME, "direct", false, false, null);
91+
verifyNotEquivalent(channel, NAME, "fanout", false, false, null);
92+
}
93+
94+
public void testExchangeAutoDeleteNotEquivalent() throws IOException {
95+
channel.exchangeDeclare(NAME, "direct", false, false, null);
96+
verifyNotEquivalent(channel, NAME, "direct", false, true, null);
9297
}
9398
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static TestSuite suite() {
4848
suite.addTestSuite(PersistentTransactions.class);
4949
suite.addTestSuite(RequeueOnConnectionClose.class);
5050
suite.addTestSuite(RequeueOnChannelClose.class);
51+
suite.addTestSuite(DurableOnTransient.class);
5152
suite.addTestSuite(NoRequeueOnCancel.class);
5253
suite.addTestSuite(Bug20004Test.class);
5354
suite.addTestSuite(ExchangeDeleteIfUnused.class);

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public void testFairness()
203203
//behind" - a notion of fairness somewhat short of perfect but
204204
//probably good enough.
205205
for (String q : queues) {
206-
AMQP.Queue.DeclareOk ok = channel.queueDeclare(q, false, false, false, null);
206+
AMQP.Queue.DeclareOk ok = channel.queueDeclarePassive(q);
207207
assertTrue(ok.getMessageCount() < messageCount);
208208
}
209209

@@ -480,7 +480,7 @@ protected String declareBindConsume(Channel ch,
480480
boolean noAck)
481481
throws IOException
482482
{
483-
AMQP.Queue.DeclareOk ok = ch.queueDeclare("", false, false, false, null);
483+
AMQP.Queue.DeclareOk ok = ch.queueDeclare();
484484
String queue = ok.getQueue();
485485
ch.queueBind(queue, "amq.fanout", "");
486486
ch.basicConsume(queue, noAck, c);

test/src/com/rabbitmq/client/test/functional/Tables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void testTypes() throws IOException {
8484

8585
//sending as part of method arguments - we are relying on
8686
//exchange.declare ignoring the arguments table.
87-
channel.exchangeDeclare("x", "direct", false, table);
87+
channel.exchangeDeclare("x", "direct", false, false, table);
8888
channel.exchangeDelete("x");
8989
}
9090

0 commit comments

Comments
 (0)