Skip to content

Commit 35e8aed

Browse files
author
Matthias Radestock
committed
react to channel.flow
1 parent cbaef5f commit 35e8aed

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
5555
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5656
public RpcContinuation _activeRpc = null;
5757

58+
/** Whether transmission of content-bearing methods should be blocked
59+
*/
60+
public boolean _blockContent = false;
61+
5862
/**
5963
* Construct a channel on the given connection, with the given channel number.
6064
* @param connection the underlying connection for this channel
@@ -246,10 +250,17 @@ public synchronized void transmit(AMQCommand c) throws IOException {
246250
}
247251

248252
public synchronized void quiescingTransmit(Method m) throws IOException {
249-
new AMQCommand(m).transmit(this);
253+
quiescingTransmit(new AMQCommand(m));
250254
}
251255

252256
public synchronized void quiescingTransmit(AMQCommand c) throws IOException {
257+
if (c.getMethod().hasContent()) {
258+
while (_blockContent) {
259+
try {
260+
wait();
261+
} catch (InterruptedException e) {}
262+
}
263+
}
253264
c.transmit(this);
254265
}
255266

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,14 @@ public void releaseChannelNumber() {
215215
}
216216
}
217217
return true;
218+
} else if (method instanceof Channel.Flow) {
219+
Channel.Flow channelFlow = (Channel.Flow) method;
220+
synchronized(this) {
221+
_blockContent = !channelFlow.active;
222+
transmit(new Channel.FlowOk(channelFlow.active));
223+
notifyAll();
224+
}
225+
return true;
218226
} else if (method instanceof Channel.Close) {
219227
releaseChannelNumber();
220228
ShutdownSignalException signal = new ShutdownSignalException(false,

0 commit comments

Comments
 (0)