Skip to content

Commit 3d3a244

Browse files
author
Simon MacMullen
committed
Merge default into amqp_0_9_1
2 parents 718a9d3 + f18c6c5 commit 3d3a244

19 files changed

+473
-40
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ SRC_ARCHIVE=$(PACKAGE_NAME)-$(VERSION)
77
SIGNING_KEY=056E8E56
88
GNUPG_PATH=~
99

10-
WEB_URL=http://stage.rabbitmq.com/
10+
WEB_URL=http://www.rabbitmq.com/
1111
NEXUS_STAGE_URL=http://oss.sonatype.org/service/local/staging/deploy/maven2
1212

1313
AMQP_CODEGEN_DIR=$(shell fgrep sibling.codegen.dir build.properties | sed -e 's:sibling\.codegen\.dir=::')

src/com/rabbitmq/client/Channel.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public interface Channel extends ShutdownNotifier {
101101
*/
102102
FlowOk flow(boolean active) throws IOException;
103103

104+
/**
105+
* Return the current Channel.Flow settings.
106+
*/
107+
FlowOk getFlow();
108+
104109
/**
105110
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
106111
* and message 'OK'.
@@ -130,6 +135,52 @@ public interface Channel extends ShutdownNotifier {
130135
*/
131136
void setReturnListener(ReturnListener listener);
132137

138+
/**
139+
* Return the current {@link FlowListener}.
140+
* @return an interface to the current flow listener.
141+
*/
142+
FlowListener getFlowListener();
143+
144+
/**
145+
* Set the current {@link FlowListener}.
146+
* @param listener the listener to use, or null indicating "don't use one".
147+
*/
148+
void setFlowListener(FlowListener listener);
149+
150+
/**
151+
* Get the current default consumer. @see setDefaultConsumer for rationale.
152+
* @return an interface to the current default consumer.
153+
*/
154+
Consumer getDefaultConsumer();
155+
156+
/**
157+
* Set the current default consumer.
158+
*
159+
* Under certain circumstances it is possible for a channel to receive a
160+
* message delivery which does not match any consumer which is currently
161+
* set up via basicConsume(). This will occur after the following sequence
162+
* of events:
163+
*
164+
* ctag = basicConsume(queue, consumer); // i.e. with explicit acks
165+
* // some deliveries take place but are not acked
166+
* basicCancel(ctag);
167+
* basicRecover(false);
168+
*
169+
* Since requeue is specified to be false in the basicRecover, the spec
170+
* states that the message must be redelivered to "the original recipient"
171+
* - i.e. the same channel / consumer-tag. But the consumer is no longer
172+
* active.
173+
*
174+
* In these circumstances, you can register a default consumer to handle
175+
* such deliveries. If no default consumer is registered an
176+
* IllegalStateException will be thrown when such a delivery arrives.
177+
*
178+
* Most people will not need to use this.
179+
*
180+
* @param consumer the consumer to use, or null indicating "don't use one".
181+
*/
182+
void setDefaultConsumer(Consumer consumer);
183+
133184
/**
134185
* Request specific "quality of service" settings.
135186
*

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,13 @@ protected FrameHandler createFrameHandler(Address addr)
335335
Socket socket = factory.createSocket();
336336
configureSocket(socket);
337337
socket.connect(new InetSocketAddress(hostName, portNumber));
338-
return new SocketFrameHandler(socket);
338+
return createFrameHandler(socket);
339+
}
340+
341+
protected FrameHandler createFrameHandler(Socket sock)
342+
throws IOException
343+
{
344+
return new SocketFrameHandler(sock);
339345
}
340346

341347
/**

src/com/rabbitmq/client/Consumer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,15 @@ public interface Consumer {
7171
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
7272

7373
/**
74+
* Called to notify the consumer that we've received a basic.recover-ok
75+
* in reply to a basic.recover some other thread sent. All messages
76+
* received before this is invoked that haven't been ack'ed will be
77+
* redelivered. All messages received afterwards won't be.
7478
*
79+
* This method exists since all the Consumer callbacks are invoked by the
80+
* connection main loop thread - so it's sometimes useful to allow that
81+
* thread to know that the recover-ok has been received, rather than the
82+
* thread which invoked basicRecover().
7583
*/
7684
void handleRecoverOk();
7785

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client;
33+
34+
import java.io.IOException;
35+
36+
/**
37+
* Implement this interface in order to be notified of Channel.Flow
38+
* events.
39+
*/
40+
public interface FlowListener {
41+
void handleFlow(boolean active)
42+
throws IOException;
43+
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.client.Connection;
3838
import com.rabbitmq.client.Consumer;
3939
import com.rabbitmq.client.Envelope;
40+
import com.rabbitmq.client.FlowListener;
4041
import com.rabbitmq.client.GetResponse;
4142
import com.rabbitmq.client.MessageProperties;
4243
import com.rabbitmq.client.ReturnListener;
@@ -93,6 +94,15 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
9394
*/
9495
public volatile ReturnListener returnListener = null;
9596

97+
/** Reference to the currently-active FlowListener, or null if there is none.
98+
*/
99+
public volatile FlowListener flowListener = null;
100+
101+
/** Reference to the currently-active default consumer, or null if there is
102+
* none.
103+
*/
104+
public volatile Consumer defaultConsumer = null;
105+
96106
/**
97107
* Construct a new channel on the given connection with the given
98108
* channel number. Usually not called directly - call
@@ -130,6 +140,32 @@ public void setReturnListener(ReturnListener listener) {
130140
returnListener = listener;
131141
}
132142

143+
/** Returns the current FlowListener. */
144+
public FlowListener getFlowListener() {
145+
return flowListener;
146+
}
147+
148+
/**
149+
* Sets the current FlowListener.
150+
* A null argument is interpreted to mean "do not use a flow listener".
151+
*/
152+
public void setFlowListener(FlowListener listener) {
153+
flowListener = listener;
154+
}
155+
156+
/** Returns the current default consumer. */
157+
public Consumer getDefaultConsumer() {
158+
return defaultConsumer;
159+
}
160+
161+
/**
162+
* Sets the current default consumer.
163+
* A null argument is interpreted to mean "do not use a default consumer".
164+
*/
165+
public void setDefaultConsumer(Consumer consumer) {
166+
defaultConsumer = consumer;
167+
}
168+
133169
/**
134170
* Protected API - sends a ShutdownSignal to all active consumers.
135171
* @param signal an exception signalling channel shutdown
@@ -182,7 +218,7 @@ public void releaseChannelNumber() {
182218
// If we are not, however, then we are in a quiescing, or
183219
// shutting-down state as the result of an application
184220
// decision to close this channel, and we are to discard all
185-
// incoming commands except for close and close-ok.
221+
// incoming commands except for a close and close-ok.
186222

187223
Method method = command.getMethod();
188224

@@ -210,8 +246,17 @@ public void releaseChannelNumber() {
210246

211247
Consumer callback = _consumers.get(m.consumerTag);
212248
if (callback == null) {
213-
// FIXME: what to do when we get such an unsolicited delivery?
214-
throw new UnsupportedOperationException("FIXME unsolicited delivery");
249+
if (defaultConsumer == null) {
250+
// No handler set. We should blow up as this message
251+
// needs acking, just dropping it is not enough. See bug
252+
// 22587 for discussion.
253+
throw new IllegalStateException("Unsolicited delivery -" +
254+
" see Channel.setDefaultConsumer to handle this" +
255+
" case.");
256+
}
257+
else {
258+
callback = defaultConsumer;
259+
}
215260
}
216261

217262
Envelope envelope = new Envelope(m.deliveryTag,
@@ -256,6 +301,14 @@ public void releaseChannelNumber() {
256301
transmit(new Channel.FlowOk(channelFlow.active));
257302
_channelMutex.notifyAll();
258303
}
304+
FlowListener l = getFlowListener();
305+
if (l != null) {
306+
try {
307+
l.handleFlow(channelFlow.active);
308+
} catch (Throwable ex) {
309+
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
310+
}
311+
}
259312
return true;
260313
} else if (method instanceof Basic.RecoverOk) {
261314
for (Consumer callback: _consumers.values()) {
@@ -686,13 +739,14 @@ public Consumer transformReply(AMQCommand replyCommand) {
686739
}
687740
}
688741

689-
/** Public API - {@inheritDoc} */
742+
/** Public API - {@inheritDoc} */
690743
public Basic.RecoverOk basicRecover(boolean requeue)
691744
throws IOException
692745
{
693746
return (Basic.RecoverOk) exnWrappingRpc(new Basic.Recover(requeue)).getMethod();
694747
}
695748

749+
696750
/** Public API - {@inheritDoc} */
697751
public void basicRecoverAsync(boolean requeue)
698752
throws IOException
@@ -725,4 +779,10 @@ public Tx.RollbackOk txRollback()
725779
public Channel.FlowOk flow(final boolean a) throws IOException {
726780
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
727781
}
782+
783+
/** Public API - {@inheritDoc} */
784+
public Channel.FlowOk getFlow() {
785+
return new Channel.FlowOk(!_blockContent);
786+
}
787+
728788
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,40 +48,40 @@ public void handleUnexpectedConnectionDriverException(Connection conn, Throwable
4848
}
4949

5050
public void handleReturnListenerException(Channel channel, Throwable exception) {
51-
// TODO: Convert to logging framework
52-
System.err.println("ReturnListener.handleBasicReturn threw an exception for channel " +
53-
channel + ":");
54-
exception.printStackTrace();
55-
try {
56-
((AMQConnection) channel.getConnection()).close(AMQP.INTERNAL_ERROR,
57-
"Internal error in ReturnListener",
58-
false,
59-
exception);
60-
} catch (IOException ioe) {
61-
// Man, this clearly isn't our day.
62-
// Ignore the exception? TODO: Log the nested failure
63-
}
51+
handleChannelKiller(channel, exception, "ReturnListener.handleBasicReturn");
6452
}
6553

66-
public void handleConsumerException(Channel channel,
67-
Throwable exception,
68-
Consumer consumer,
69-
String consumerTag,
54+
public void handleFlowListenerException(Channel channel, Throwable exception) {
55+
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
56+
}
57+
58+
public void handleConsumerException(Channel channel, Throwable exception,
59+
Consumer consumer, String consumerTag,
7060
String methodName)
61+
{
62+
handleChannelKiller(channel, exception, "Consumer " + consumer
63+
+ " (" + consumerTag + ")"
64+
+ " method " + methodName
65+
+ " for channel " + channel);
66+
}
67+
68+
protected void handleChannelKiller(Channel channel,
69+
Throwable exception,
70+
String what)
7171
{
7272
// TODO: Convert to logging framework
73-
System.err.println("Consumer " + consumer + " method " + methodName + " for channel " +
74-
channel + " threw an exception:");
73+
System.err.println(what + " threw an exception for channel " +
74+
channel + ":");
7575
exception.printStackTrace();
7676
try {
7777
((AMQConnection) channel.getConnection()).close(AMQP.INTERNAL_ERROR,
78-
"Internal error in Consumer " +
79-
consumerTag,
78+
"Internal error in " + what,
8079
false,
8180
exception);
8281
} catch (IOException ioe) {
8382
// Man, this clearly isn't our day.
8483
// Ignore the exception? TODO: Log the nested failure
8584
}
85+
8686
}
8787
}

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ public interface ExceptionHandler {
5757
*/
5858
void handleReturnListenerException(Channel channel, Throwable exception);
5959

60+
/**
61+
* Perform any required exception processing for the situation
62+
* when the driver thread for the connection has called a
63+
* FlowListener's handleFlow method, and that method has
64+
* thrown an exeption.
65+
* @param channel the ChannelN that held the FlowListener
66+
* @param exception the exception thrown by FlowListener.handleFlow
67+
*/
68+
void handleFlowListenerException(Channel channel, Throwable exception);
69+
6070
/**
6171
* Perform any required exception processing for the situation
6272
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ public void handleReturnListenerException(Channel ch, Throwable ex) {
197197
fail("handleReturnListenerException: " + ex);
198198
}
199199

200+
public void handleFlowListenerException(Channel ch, Throwable ex) {
201+
fail("handleFlowListenerException: " + ex);
202+
}
203+
200204
public void handleConsumerException(Channel ch,
201205
Throwable ex,
202206
Consumer c,
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.test.BrokerTestCase;
4+
5+
import java.io.IOException;
6+
7+
/**
8+
* See bug 21843. It's not obvious this is the right thing to do, but it's in
9+
* the spec.
10+
*/
11+
public class BindToDefaultExchange extends BrokerTestCase {
12+
public void testBindToDefaultExchange() throws IOException {
13+
String queue = channel.queueDeclare().getQueue();
14+
channel.queueBind(queue, "", "foobar");
15+
basicPublishVolatile("", "foobar"); // Explicit binding
16+
basicPublishVolatile("", queue); // Implicit binding
17+
assertDelivered(queue, 2);
18+
}
19+
}

0 commit comments

Comments
 (0)