Skip to content

Commit 052f9ba

Browse files
author
Alexandru Scvortov
committed
restart bug22412 off default
1 parent 4e2621d commit 052f9ba

File tree

5 files changed

+79
-0
lines changed

5 files changed

+79
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public interface Channel extends ShutdownNotifier {
101101
*/
102102
FlowOk flow(boolean active) throws IOException;
103103

104+
/**
105+
* Return the current Channel.Flow settings.
106+
*/
107+
FlowOk getFlow();
108+
104109
/**
105110
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
106111
* and message 'OK'.
@@ -130,6 +135,18 @@ public interface Channel extends ShutdownNotifier {
130135
*/
131136
void setReturnListener(ReturnListener listener);
132137

138+
/**
139+
* Return the current {@link FlowListener}.
140+
* @return an interface to the current flow listener.
141+
*/
142+
FlowListener getFlowListener();
143+
144+
/**
145+
* Set the current {@link FlowListener}.
146+
* @param listener the listener to use, or null indicating "don't use one".
147+
*/
148+
void setFlowListener(FlowListener listener);
149+
133150
/**
134151
* Request specific "quality of service" settings.
135152
*

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.client.Connection;
3838
import com.rabbitmq.client.Consumer;
3939
import com.rabbitmq.client.Envelope;
40+
import com.rabbitmq.client.FlowListener;
4041
import com.rabbitmq.client.GetResponse;
4142
import com.rabbitmq.client.MessageProperties;
4243
import com.rabbitmq.client.ReturnListener;
@@ -93,6 +94,10 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
9394
*/
9495
public volatile ReturnListener returnListener = null;
9596

97+
/** Reference to the currently-active FlowListener, or null if there is none.
98+
*/
99+
public volatile FlowListener flowListener = null;
100+
96101
/**
97102
* Construct a new channel on the given connection with the given
98103
* channel number. Usually not called directly - call
@@ -130,6 +135,19 @@ public void setReturnListener(ReturnListener listener) {
130135
returnListener = listener;
131136
}
132137

138+
/** Returns the current FlowListener. */
139+
public FlowListener getFlowListener() {
140+
return flowListener;
141+
}
142+
143+
/**
144+
* Sets the current FlowListener.
145+
* A null argument is interpreted to mean "do not use a flow listener".
146+
*/
147+
public void setFlowListener(FlowListener listener) {
148+
flowListener = listener;
149+
}
150+
133151
/**
134152
* Protected API - sends a ShutdownSignal to all active consumers.
135153
* @param signal an exception signalling channel shutdown
@@ -257,6 +275,14 @@ public void releaseChannelNumber() {
257275
transmit(new Channel.FlowOk(channelFlow.active));
258276
_channelMutex.notifyAll();
259277
}
278+
FlowListener l = getFlowListener();
279+
if (l != null) {
280+
try {
281+
l.handleFlow(channelFlow.active);
282+
} catch (Throwable ex) {
283+
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
284+
}
285+
}
260286
return true;
261287
} else {
262288
return false;
@@ -704,4 +730,10 @@ public Tx.RollbackOk txRollback()
704730
public Channel.FlowOk flow(final boolean a) throws IOException {
705731
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
706732
}
733+
734+
/** Public API - {@inheritDoc} */
735+
public Channel.FlowOk getFlow() {
736+
return new Channel.FlowOk(!_blockContent);
737+
}
738+
707739
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,22 @@ public void handleReturnListenerException(Channel channel, Throwable exception)
6363
}
6464
}
6565

66+
public void handleFlowListenerException(Channel channel, Throwable exception) {
67+
// TODO: Convert to logging framework
68+
System.err.println("FlowListener.handleFlow threw an exception for channel " +
69+
channel + ":");
70+
exception.printStackTrace();
71+
try {
72+
((AMQConnection) channel.getConnection()).close(AMQP.INTERNAL_ERROR,
73+
"Internal error in FlowListener",
74+
false,
75+
exception);
76+
} catch (IOException ioe) {
77+
// Man, this clearly isn't our day.
78+
// Ignore the exception? TODO: Log the nested failure
79+
}
80+
}
81+
6682
public void handleConsumerException(Channel channel,
6783
Throwable exception,
6884
Consumer consumer,

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ public interface ExceptionHandler {
5757
*/
5858
void handleReturnListenerException(Channel channel, Throwable exception);
5959

60+
/**
61+
* Perform any required exception processing for the situation
62+
* when the driver thread for the connection has called a
63+
* FlowListener's handleFlow method, and that method has
64+
* thrown an exeption.
65+
* @param channel the ChannelN that held the FlowListener
66+
* @param exception the exception thrown by FlowListener.handleFlow
67+
*/
68+
void handleFlowListenerException(Channel channel, Throwable exception);
69+
6070
/**
6171
* Perform any required exception processing for the situation
6272
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ public void handleReturnListenerException(Channel ch, Throwable ex) {
200200
fail("handleReturnListenerException: " + ex);
201201
}
202202

203+
public void handleFlowListenerException(Channel ch, Throwable ex) {
204+
fail("handleFlowListenerException: " + ex);
205+
}
206+
203207
public void handleConsumerException(Channel ch,
204208
Throwable ex,
205209
Consumer c,

0 commit comments

Comments
 (0)