Skip to content

Commit 5c493e2

Browse files
author
Simon MacMullen
committed
Merge default into bug22892
2 parents 18b7308 + 1ab4163 commit 5c493e2

File tree

12 files changed

+314
-40
lines changed

12 files changed

+314
-40
lines changed

build.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<pathelement path="${test.javac.out}"/>
2727
</path>
2828

29-
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json"/>
29+
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json ${codegen.dir}/rabbitmq-0.8-extensions.json"/>
3030

3131
<target name="amqp-generate-check" description="check if codegen needs to be run">
3232
<uptodate property="amqp.generate.notRequired">

src/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,12 +495,22 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
495495
* Ask the broker to resend unacknowledged messages. In 0-8
496496
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
497497
* the new, deprecated method basic.recover_async is asynchronous.
498-
* To avoid this API changing, this is named for the latter, and
499-
* will be deprecated.
500498
* @param requeue If true, messages will be requeued and possibly
501499
* delivered to a different consumer. If false, messages will be
502500
* redelivered to the same consumer.
503501
*/
502+
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
503+
504+
/**
505+
* Ask the broker to resend unacknowledged messages. In 0-8
506+
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
507+
* the new, deprecated method basic.recover_async is asynchronous
508+
* and deprecated.
509+
* @param requeue If true, messages will be requeued and possibly
510+
* delivered to a different consumer. If false, messages will be
511+
* redelivered to the same consumer.
512+
*/
513+
@Deprecated
504514
void basicRecoverAsync(boolean requeue) throws IOException;
505515

506516
/**

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,13 @@ protected FrameHandler createFrameHandler(Address addr)
335335
Socket socket = factory.createSocket();
336336
configureSocket(socket);
337337
socket.connect(new InetSocketAddress(hostName, portNumber));
338-
return new SocketFrameHandler(socket);
338+
return createFrameHandler(socket);
339+
}
340+
341+
protected FrameHandler createFrameHandler(Socket sock)
342+
throws IOException
343+
{
344+
return new SocketFrameHandler(sock);
339345
}
340346

341347
/**

src/com/rabbitmq/client/Consumer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,19 @@ public interface Consumer {
7070
*/
7171
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
7272

73+
/**
74+
* Called to notify the consumer that we've received a basic.recover-ok
75+
* in reply to a basic.recover some other thread sent. All messages
76+
* received before this is invoked that haven't been ack'ed will be
77+
* redelivered. All messages received afterwards won't be.
78+
*
79+
* This method exists since all the Consumer callbacks are invoked by the
80+
* connection main loop thread - so it's sometimes useful to allow that
81+
* thread to know that the recover-ok has been received, rather than the
82+
* thread which invoked basicRecover().
83+
*/
84+
void handleRecoverOk();
85+
7386
/**
7487
* Called when a delivery appears for this consumer.
7588
* @param consumerTag the defined consumerTag (either client- or server-generated)

src/com/rabbitmq/client/DefaultConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
7474
// no work to do
7575
}
7676

77+
/**
78+
* No-op implementation of {@link Consumer#handleRecoverOk}.
79+
*/
80+
public void handleRecoverOk() {
81+
// no work to do
82+
}
83+
7784
/**
7885
* No-op implementation of {@link Consumer#handleDelivery}.
7986
*/

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,18 @@ public boolean processControlCommand(Command c)
544544
// See the detailed comments in ChannelN.processAsync.
545545

546546
Method method = c.getMethod();
547-
547+
548548
if (method instanceof AMQP.Connection.Close) {
549-
handleConnectionClose(c);
549+
if (isOpen()) {
550+
handleConnectionClose(c);
551+
} else {
552+
// Already shutting down, so just send back a CloseOk.
553+
try {
554+
_channel0.quiescingTransmit(new AMQImpl.Connection.CloseOk());
555+
} catch (IOException ioe) {
556+
Utility.emptyStatement();
557+
}
558+
}
550559
return true;
551560
} else {
552561
if (isOpen()) {

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,9 @@ public void releaseChannelNumber() {
189189

190190
/**
191191
* Protected API - Filters the inbound command stream, processing
192-
* Basic.Deliver, Basic.Return, Channel.Flow and Channel.Close
193-
* specially.
192+
* Basic.Deliver, Basic.Return and Channel.Close specially. If
193+
* we're in quiescing mode, all inbound commands are ignored,
194+
* except for Channel.Close and Channel.CloseOk.
194195
*/
195196
@Override public boolean processAsync(Command command) throws IOException
196197
{
@@ -199,33 +200,30 @@ public void releaseChannelNumber() {
199200
// If we are not, however, then we are in a quiescing, or
200201
// shutting-down state as the result of an application
201202
// decision to close this channel, and we are to discard all
202-
// incoming commands except for a close-ok.
203+
// incoming commands except for a close and close-ok.
203204

204205
Method method = command.getMethod();
205206

206-
if (method instanceof Channel.Close) {
207-
// Channel should always respond to Channel.Close
208-
// from the server
209-
releaseChannelNumber();
210-
ShutdownSignalException signal = new ShutdownSignalException(false,
211-
false,
212-
command,
213-
this);
214-
synchronized (_channelMutex) {
215-
try {
216-
processShutdownSignal(signal, true, false);
217-
quiescingTransmit(new Channel.CloseOk());
218-
} finally {
219-
notifyOutstandingRpc(signal);
220-
}
221-
}
222-
notifyListeners();
223-
return true;
224-
}
225207
if (isOpen()) {
226208
// We're in normal running mode.
227209

228-
if (method instanceof Basic.Deliver) {
210+
if (method instanceof Channel.Close) {
211+
releaseChannelNumber();
212+
ShutdownSignalException signal = new ShutdownSignalException(false,
213+
false,
214+
command,
215+
this);
216+
synchronized (_channelMutex) {
217+
try {
218+
processShutdownSignal(signal, true, false);
219+
quiescingTransmit(new Channel.CloseOk());
220+
} finally {
221+
notifyOutstandingRpc(signal);
222+
}
223+
}
224+
notifyListeners();
225+
return true;
226+
} else if (method instanceof Basic.Deliver) {
229227
Basic.Deliver m = (Basic.Deliver) method;
230228

231229
Consumer callback = _consumers.get(m.consumerTag);
@@ -285,13 +283,28 @@ public void releaseChannelNumber() {
285283
}
286284
}
287285
return true;
286+
} else if (method instanceof Basic.RecoverOk) {
287+
for (Consumer callback: _consumers.values()) {
288+
callback.handleRecoverOk();
289+
}
290+
291+
// Unlike all the other cases we still want this RecoverOk to
292+
// be handled by whichever RPC continuation invoked Recover,
293+
// so return false
294+
return false;
288295
} else {
289296
return false;
290297
}
291298
} else {
292299
// We're in quiescing mode.
293300

294-
if (method instanceof Channel.CloseOk) {
301+
if (method instanceof Channel.Close) {
302+
// We're already shutting down, so just send back an ok.
303+
synchronized (_channelMutex) {
304+
quiescingTransmit(new Channel.CloseOk());
305+
}
306+
return true;
307+
} else if (method instanceof Channel.CloseOk) {
295308
// We're quiescing, and we see a channel.close-ok:
296309
// this is our signal to leave quiescing mode and
297310
// finally shut down for good. Let it be handled as an
@@ -699,11 +712,19 @@ public Consumer transformReply(AMQCommand replyCommand) {
699712
}
700713
}
701714

715+
/** Public API - {@inheritDoc} */
716+
public Basic.RecoverOk basicRecover(boolean requeue)
717+
throws IOException
718+
{
719+
return (Basic.RecoverOk) exnWrappingRpc(new Basic.Recover(requeue)).getMethod();
720+
}
721+
722+
702723
/** Public API - {@inheritDoc} */
703724
public void basicRecoverAsync(boolean requeue)
704725
throws IOException
705726
{
706-
transmit(new Basic.Recover(requeue));
727+
transmit(new Basic.RecoverAsync(requeue));
707728
}
708729

709730
/** Public API - {@inheritDoc} */
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.test.BrokerTestCase;
4+
5+
import java.io.IOException;
6+
7+
/**
8+
* See bug 21843. It's not obvious this is the right thing to do, but it's in
9+
* the spec.
10+
*/
11+
public class BindToDefaultExchange extends BrokerTestCase {
12+
public void testBindToDefaultExchange() throws IOException {
13+
String queue = channel.queueDeclare().getQueue();
14+
channel.queueBind(queue, "", "foobar");
15+
basicPublishVolatile("", "foobar"); // Explicit binding
16+
basicPublishVolatile("", queue); // Implicit binding
17+
assertDelivered(queue, 2);
18+
}
19+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.test.BrokerTestCase;
35+
36+
import java.io.IOException;
37+
import java.net.Socket;
38+
39+
import com.rabbitmq.client.ConnectionFactory;
40+
import com.rabbitmq.client.GetResponse;
41+
import com.rabbitmq.client.impl.Frame;
42+
import com.rabbitmq.client.impl.FrameHandler;
43+
import com.rabbitmq.client.impl.SocketFrameHandler;
44+
45+
/* Publish a message of size FRAME_MAX. The broker should split this
46+
* into two frames before sending back. */
47+
public class FrameMax extends BrokerTestCase {
48+
/* This value for FrameMax is larger than the minimum and less
49+
* than what Rabbit suggests. */
50+
final static int FRAME_MAX = 70000;
51+
final static int REAL_FRAME_MAX = FRAME_MAX - 8;
52+
final static String ROUTING_KEY = "something";
53+
54+
private String queueName;
55+
56+
public FrameMax() {
57+
connectionFactory = new MyConnectionFactory();
58+
connectionFactory.setRequestedFrameMax(FRAME_MAX);
59+
}
60+
61+
@Override
62+
protected void createResources()
63+
throws IOException
64+
{
65+
queueName = channel.queueDeclare().getQueue();
66+
channel.queueBind(queueName, "", ROUTING_KEY);
67+
}
68+
69+
/* Frame content should be less or equal to frame-max - 8. */
70+
public void testFrameSizes()
71+
throws IOException, InterruptedException
72+
{
73+
/* This should result in at least 3 frames. */
74+
int howMuch = 2*FRAME_MAX;
75+
basicPublishVolatile(new byte[howMuch], ROUTING_KEY);
76+
/* Receive everything that was sent out. */
77+
while (howMuch > 0) {
78+
try {
79+
GetResponse response = channel.basicGet(queueName, false);
80+
howMuch -= response.getBody().length;
81+
} catch (Exception e) {
82+
fail(e.getCause().toString());
83+
}
84+
}
85+
}
86+
87+
/* ConnectionFactory that uses MyFrameHandler rather than
88+
* SocketFrameHandler. */
89+
private static class MyConnectionFactory extends ConnectionFactory {
90+
protected FrameHandler createFrameHandler(Socket sock)
91+
throws IOException
92+
{
93+
return new MyFrameHandler(sock);
94+
}
95+
}
96+
97+
/* FrameHandler with added frame-max error checking. */
98+
private static class MyFrameHandler extends SocketFrameHandler {
99+
public MyFrameHandler(Socket socket)
100+
throws IOException
101+
{
102+
super(socket);
103+
}
104+
105+
public Frame readFrame() throws IOException {
106+
Frame f = super.readFrame();
107+
int size = f.getPayload().length;
108+
if (size > REAL_FRAME_MAX)
109+
fail("Received frame of size " + size
110+
+ ", which exceeds " + REAL_FRAME_MAX + ".");
111+
//System.out.printf("Received a frame of size %d.\n", f.getPayload().length);
112+
return f;
113+
}
114+
}
115+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public static TestSuite suite() {
4343
suite.addTestSuite(Routing.class);
4444
suite.addTestSuite(BindingLifecycle.class);
4545
suite.addTestSuite(Recover.class);
46+
suite.addTestSuite(TransactionalRecover.class);
4647
suite.addTestSuite(Transactions.class);
4748
suite.addTestSuite(PersistentTransactions.class);
4849
suite.addTestSuite(RequeueOnConnectionClose.class);
@@ -54,10 +55,12 @@ public static TestSuite suite() {
5455
suite.addTestSuite(QosTests.class);
5556
suite.addTestSuite(AlternateExchange.class);
5657
suite.addTestSuite(ExchangeDeclare.class);
58+
suite.addTestSuite(FrameMax.class);
5759
suite.addTestSuite(QueueLifecycle.class);
5860
suite.addTestSuite(QueueExclusivity.class);
5961
suite.addTestSuite(InvalidAcks.class);
6062
suite.addTestSuite(InvalidAcksTx.class);
63+
suite.addTestSuite(BindToDefaultExchange.class);
6164
return suite;
6265
}
6366
}

0 commit comments

Comments
 (0)