Skip to content

Commit 1f83d67

Browse files
author
Emile Joubert
committed
Merged bug22412 into default
2 parents 61d6c27 + 9a21229 commit 1f83d67

File tree

6 files changed

+129
-22
lines changed

6 files changed

+129
-22
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public interface Channel extends ShutdownNotifier {
101101
*/
102102
FlowOk flow(boolean active) throws IOException;
103103

104+
/**
105+
* Return the current Channel.Flow settings.
106+
*/
107+
FlowOk getFlow();
108+
104109
/**
105110
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
106111
* and message 'OK'.
@@ -130,6 +135,18 @@ public interface Channel extends ShutdownNotifier {
130135
*/
131136
void setReturnListener(ReturnListener listener);
132137

138+
/**
139+
* Return the current {@link FlowListener}.
140+
* @return an interface to the current flow listener.
141+
*/
142+
FlowListener getFlowListener();
143+
144+
/**
145+
* Set the current {@link FlowListener}.
146+
* @param listener the listener to use, or null indicating "don't use one".
147+
*/
148+
void setFlowListener(FlowListener listener);
149+
133150
/**
134151
* Request specific "quality of service" settings.
135152
*
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client;
33+
34+
import java.io.IOException;
35+
36+
/**
37+
* Implement this interface in order to be notified of Channel.Flow
38+
* events.
39+
*/
40+
public interface FlowListener {
41+
void handleFlow(boolean active)
42+
throws IOException;
43+
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.client.Connection;
3838
import com.rabbitmq.client.Consumer;
3939
import com.rabbitmq.client.Envelope;
40+
import com.rabbitmq.client.FlowListener;
4041
import com.rabbitmq.client.GetResponse;
4142
import com.rabbitmq.client.MessageProperties;
4243
import com.rabbitmq.client.ReturnListener;
@@ -93,6 +94,10 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
9394
*/
9495
public volatile ReturnListener returnListener = null;
9596

97+
/** Reference to the currently-active FlowListener, or null if there is none.
98+
*/
99+
public volatile FlowListener flowListener = null;
100+
96101
/**
97102
* Construct a new channel on the given connection with the given
98103
* channel number. Usually not called directly - call
@@ -130,6 +135,19 @@ public void setReturnListener(ReturnListener listener) {
130135
returnListener = listener;
131136
}
132137

138+
/** Returns the current FlowListener. */
139+
public FlowListener getFlowListener() {
140+
return flowListener;
141+
}
142+
143+
/**
144+
* Sets the current FlowListener.
145+
* A null argument is interpreted to mean "do not use a flow listener".
146+
*/
147+
public void setFlowListener(FlowListener listener) {
148+
flowListener = listener;
149+
}
150+
133151
/**
134152
* Protected API - sends a ShutdownSignal to all active consumers.
135153
* @param signal an exception signalling channel shutdown
@@ -171,7 +189,8 @@ public void releaseChannelNumber() {
171189

172190
/**
173191
* Protected API - Filters the inbound command stream, processing
174-
* Basic.Deliver, Basic.Return and Channel.Close specially.
192+
* Basic.Deliver, Basic.Return, Channel.Flow and Channel.Close
193+
* specially.
175194
*/
176195
@Override public boolean processAsync(Command command) throws IOException
177196
{
@@ -257,6 +276,14 @@ public void releaseChannelNumber() {
257276
transmit(new Channel.FlowOk(channelFlow.active));
258277
_channelMutex.notifyAll();
259278
}
279+
FlowListener l = getFlowListener();
280+
if (l != null) {
281+
try {
282+
l.handleFlow(channelFlow.active);
283+
} catch (Throwable ex) {
284+
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
285+
}
286+
}
260287
return true;
261288
} else {
262289
return false;
@@ -704,4 +731,10 @@ public Tx.RollbackOk txRollback()
704731
public Channel.FlowOk flow(final boolean a) throws IOException {
705732
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
706733
}
734+
735+
/** Public API - {@inheritDoc} */
736+
public Channel.FlowOk getFlow() {
737+
return new Channel.FlowOk(!_blockContent);
738+
}
739+
707740
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,40 +48,40 @@ public void handleUnexpectedConnectionDriverException(Connection conn, Throwable
4848
}
4949

5050
public void handleReturnListenerException(Channel channel, Throwable exception) {
51-
// TODO: Convert to logging framework
52-
System.err.println("ReturnListener.handleBasicReturn threw an exception for channel " +
53-
channel + ":");
54-
exception.printStackTrace();
55-
try {
56-
((AMQConnection) channel.getConnection()).close(AMQP.INTERNAL_ERROR,
57-
"Internal error in ReturnListener",
58-
false,
59-
exception);
60-
} catch (IOException ioe) {
61-
// Man, this clearly isn't our day.
62-
// Ignore the exception? TODO: Log the nested failure
63-
}
51+
handleChannelKiller(channel, exception, "ReturnListener.handleBasicReturn");
6452
}
6553

66-
public void handleConsumerException(Channel channel,
67-
Throwable exception,
68-
Consumer consumer,
69-
String consumerTag,
54+
public void handleFlowListenerException(Channel channel, Throwable exception) {
55+
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
56+
}
57+
58+
public void handleConsumerException(Channel channel, Throwable exception,
59+
Consumer consumer, String consumerTag,
7060
String methodName)
61+
{
62+
handleChannelKiller(channel, exception, "Consumer " + consumer
63+
+ " (" + consumerTag + ")"
64+
+ " method " + methodName
65+
+ " for channel " + channel);
66+
}
67+
68+
protected void handleChannelKiller(Channel channel,
69+
Throwable exception,
70+
String what)
7171
{
7272
// TODO: Convert to logging framework
73-
System.err.println("Consumer " + consumer + " method " + methodName + " for channel " +
74-
channel + " threw an exception:");
73+
System.err.println(what + " threw an exception for channel " +
74+
channel + ":");
7575
exception.printStackTrace();
7676
try {
7777
((AMQConnection) channel.getConnection()).close(AMQP.INTERNAL_ERROR,
78-
"Internal error in Consumer " +
79-
consumerTag,
78+
"Internal error in " + what,
8079
false,
8180
exception);
8281
} catch (IOException ioe) {
8382
// Man, this clearly isn't our day.
8483
// Ignore the exception? TODO: Log the nested failure
8584
}
85+
8686
}
8787
}

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ public interface ExceptionHandler {
5757
*/
5858
void handleReturnListenerException(Channel channel, Throwable exception);
5959

60+
/**
61+
* Perform any required exception processing for the situation
62+
* when the driver thread for the connection has called a
63+
* FlowListener's handleFlow method, and that method has
64+
* thrown an exeption.
65+
* @param channel the ChannelN that held the FlowListener
66+
* @param exception the exception thrown by FlowListener.handleFlow
67+
*/
68+
void handleFlowListenerException(Channel channel, Throwable exception);
69+
6070
/**
6171
* Perform any required exception processing for the situation
6272
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ public void handleReturnListenerException(Channel ch, Throwable ex) {
200200
fail("handleReturnListenerException: " + ex);
201201
}
202202

203+
public void handleFlowListenerException(Channel ch, Throwable ex) {
204+
fail("handleFlowListenerException: " + ex);
205+
}
206+
203207
public void handleConsumerException(Channel ch,
204208
Throwable ex,
205209
Consumer c,

0 commit comments

Comments
 (0)