Skip to content

Commit 1358a6f

Browse files
author
Hubert Plociniczak
committed
Notify the channels that await Channel.Flow
about connection/channel shutdown.
1 parent a85d78c commit 1358a6f

File tree

2 files changed

+17
-13
lines changed

2 files changed

+17
-13
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
5555
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5656
public RpcContinuation _activeRpc = null;
5757

58-
/** Whether transmission of content-bearing methods should be blocked
59-
*/
58+
/** Whether transmission of content-bearing methods should be blocked */
6059
public boolean _blockContent = false;
6160

6261
/**
@@ -237,6 +236,7 @@ public void processShutdownSignal(ShutdownSignalException signal,
237236
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
238237
if (isOpen())
239238
_shutdownCause = signal;
239+
setBlockContent(true);
240240
}
241241
} finally {
242242
if (notifyRpc)
@@ -252,12 +252,19 @@ public void notifyOutstandingRpc(ShutdownSignalException signal) {
252252
}
253253

254254
public synchronized void transmit(Method m) throws IOException {
255-
ensureIsOpen();
256-
quiescingTransmit(m);
255+
transmit(new AMQCommand(m));
257256
}
258257

259258
public synchronized void transmit(AMQCommand c) throws IOException {
260259
ensureIsOpen();
260+
if (c.getMethod().hasContent()) {
261+
while (_blockContent) {
262+
try {
263+
wait();
264+
} catch (InterruptedException e) {}
265+
ensureIsOpen();
266+
}
267+
}
261268
quiescingTransmit(c);
262269
}
263270

@@ -266,15 +273,13 @@ public synchronized void quiescingTransmit(Method m) throws IOException {
266273
}
267274

268275
public synchronized void quiescingTransmit(AMQCommand c) throws IOException {
269-
if (c.getMethod().hasContent()) {
270-
while (_blockContent) {
271-
try {
272-
wait();
273-
} catch (InterruptedException e) {}
274-
}
275-
}
276276
c.transmit(this);
277277
}
278+
279+
public synchronized void setBlockContent(boolean active) {
280+
_blockContent = !active;
281+
notifyAll();
282+
}
278283

279284
public AMQConnection getAMQConnection() {
280285
return _connection;

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,8 @@ public void releaseChannelNumber() {
222222
} else if (method instanceof Channel.Flow) {
223223
Channel.Flow channelFlow = (Channel.Flow) method;
224224
synchronized(this) {
225-
_blockContent = !channelFlow.active;
225+
setBlockContent(channelFlow.active);
226226
transmit(new Channel.FlowOk(channelFlow.active));
227-
notifyAll();
228227
}
229228
return true;
230229
} else if (method instanceof Channel.Close) {

0 commit comments

Comments
 (0)