Skip to content

Commit f1373c7

Browse files
author
Jerry Kuch
committed
Merge from default.
2 parents db65727 + e98491c commit f1373c7

File tree

4 files changed

+173
-1
lines changed

4 files changed

+173
-1
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.rabbitmq.client.AMQP.Basic;
4141
import com.rabbitmq.client.AMQP.Confirm;
4242
import com.rabbitmq.client.AMQP.Channel.FlowOk;
43+
import com.rabbitmq.client.impl.AMQImpl;
4344

4445
/**
4546
* Public API: Interface to an AMQ channel. See the <a href="http://www.amqp.org/">spec</a> for details.
@@ -285,6 +286,29 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean
285286
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
286287
Map<String, Object> arguments) throws IOException;
287288

289+
/**
290+
* Declare an exchange, via an interface that allows the complete set of
291+
* arguments.
292+
* @see com.rabbitmq.client.AMQP.Exchange.Declare
293+
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
294+
* @param exchange the name of the exchange
295+
* @param type the exchange type
296+
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
297+
* @param autoDelete true if the server should delete the exchange when it is no longer in use
298+
* @param internal true if the exchange is internal, i.e. can't be directly
299+
* published to by a client.
300+
* @param arguments other properties (construction arguments) for the exchange
301+
* @return a declaration-confirm method to indicate the exchange was successfully declared
302+
* @throws java.io.IOException if an error is encountered
303+
*/
304+
public AMQImpl.Exchange.DeclareOk exchangeDeclare(String exchange,
305+
String type,
306+
boolean durable,
307+
boolean autoDelete,
308+
boolean internal,
309+
Map<String, Object> arguments)
310+
throws IOException;
311+
288312
/**
289313
* Declare an exchange passively; that is, check if the named exchange exists.
290314
* @param name check the existence of an exchange named this

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,22 @@ public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
521521
false, false, arguments)).getMethod();
522522
}
523523

524+
/** Public API - {@inheritDoc} */
525+
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
526+
boolean durable,
527+
boolean autoDelete,
528+
boolean internal,
529+
Map<String, Object> arguments)
530+
throws IOException
531+
{
532+
return (Exchange.DeclareOk)
533+
exnWrappingRpc(new Exchange.Declare(TICKET, exchange, type,
534+
false, durable, autoDelete,
535+
internal, false,
536+
arguments)).getMethod();
537+
}
538+
539+
524540
/** Public API - {@inheritDoc} */
525541
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
526542
boolean durable)

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public static TestSuite suite() {
7070
suite.addTestSuite(Confirm.class);
7171
suite.addTestSuite(UnexpectedFrames.class);
7272
suite.addTestSuite(PerQueueTTL.class);
73-
73+
suite.addTestSuite(InternalExchangeTest.class);
7474
return suite;
7575
}
7676
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.GetResponse;
35+
import com.rabbitmq.client.QueueingConsumer;
36+
import com.rabbitmq.client.ShutdownSignalException;
37+
import com.rabbitmq.client.test.BrokerTestCase;
38+
39+
import java.io.IOException;
40+
import java.util.Arrays;
41+
42+
43+
//
44+
// Functional test demonstrating use of an internal exchange in an exchange to
45+
// exchange routing scenario. The routing topology is:
46+
//
47+
// ------- -------
48+
// -/ \- -/ \-
49+
// / \ / \ +-------------+
50+
// | e0 +------| e1 +-----------+ q1 |
51+
// \ / \ / +-------------+
52+
// -\ /- -\ /-
53+
// ------- -------
54+
// (internal)
55+
//
56+
// Where a non-internal exchange is bound to an internal exchange, which in
57+
// turn is bound to a queue. A client should be able to publish to e0, but
58+
// not to e1, and publications to e0 should be delivered into q1.
59+
//
60+
public class InternalExchangeTest extends BrokerTestCase
61+
{
62+
private final String[] queues = new String[] { "q1" };
63+
private final String[] exchanges = new String[] { "e0", "e1" };
64+
65+
private QueueingConsumer[] consumers =
66+
new QueueingConsumer[] { null, null, null };
67+
68+
protected void createResources() throws IOException
69+
{
70+
for (String q : queues)
71+
{
72+
channel.queueDeclare(q, false, false, false, null);
73+
}
74+
75+
// First exchange will be an 'internal' one.
76+
for ( String e : exchanges )
77+
{
78+
channel.exchangeDeclare(e, "direct",
79+
false, false,
80+
e.equals("e0") ? false : true,
81+
null);
82+
}
83+
84+
channel.exchangeBind("e1", "e0", "");
85+
channel.queueBind("q1", "e1", "");
86+
87+
}
88+
89+
@Override
90+
protected void releaseResources() throws IOException
91+
{
92+
for (String q : queues)
93+
{
94+
channel.queueDelete(q);
95+
}
96+
for (String e : exchanges)
97+
{
98+
channel.exchangeDelete(e);
99+
}
100+
}
101+
102+
103+
public void testTryPublishingToInternalExchange()
104+
throws IOException,
105+
InterruptedException
106+
{
107+
byte[] testDataBody = "test-data".getBytes();
108+
109+
// We should be able to publish to the non-internal exchange as usual
110+
// and see our message land in the queue...
111+
channel.basicPublish("e0", "", null, testDataBody);
112+
assertTrue(channel.isOpen());
113+
GetResponse r = channel.basicGet("q1", true);
114+
assertTrue(Arrays.equals(r.getBody(), testDataBody));
115+
116+
117+
// Publishing to the internal exchange will not be allowed...
118+
channel.basicPublish("e1", "", null, testDataBody);
119+
try
120+
{
121+
// This blocking operation will fail because our bad attempt to
122+
// to the internal exchange...
123+
channel.basicGet("q1", true);
124+
fail("Channel should have shut down with 403 (access refused).");
125+
}
126+
catch (IOException e)
127+
{
128+
// We should get 403, access refused...
129+
checkShutdownSignal(403, e);
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)