Skip to content

Commit f9551d3

Browse files
author
Hubert Plociniczak
committed
All content-bearing methods should be blocked
the client got Channel.Flow{active=false} message, even in closing state.
1 parent 1358a6f commit f9551d3

File tree

2 files changed

+19
-15
lines changed

2 files changed

+19
-15
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,11 @@ public void processShutdownSignal(ShutdownSignalException signal,
236236
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
237237
if (isOpen())
238238
_shutdownCause = signal;
239-
setBlockContent(true);
239+
240+
synchronized(this) {
241+
_blockContent = false;
242+
notifyAll();
243+
}
240244
}
241245
} finally {
242246
if (notifyRpc)
@@ -257,14 +261,6 @@ public synchronized void transmit(Method m) throws IOException {
257261

258262
public synchronized void transmit(AMQCommand c) throws IOException {
259263
ensureIsOpen();
260-
if (c.getMethod().hasContent()) {
261-
while (_blockContent) {
262-
try {
263-
wait();
264-
} catch (InterruptedException e) {}
265-
ensureIsOpen();
266-
}
267-
}
268264
quiescingTransmit(c);
269265
}
270266

@@ -273,13 +269,20 @@ public synchronized void quiescingTransmit(Method m) throws IOException {
273269
}
274270

275271
public synchronized void quiescingTransmit(AMQCommand c) throws IOException {
272+
if (c.getMethod().hasContent()) {
273+
while (_blockContent) {
274+
try {
275+
wait();
276+
} catch (InterruptedException e) {}
277+
278+
// This is to catch a situation when the thread wakes up during
279+
// shutdown. Currently, no command that has content is allowed
280+
// to send anything in a closing state.
281+
ensureIsOpen();
282+
}
283+
}
276284
c.transmit(this);
277285
}
278-
279-
public synchronized void setBlockContent(boolean active) {
280-
_blockContent = !active;
281-
notifyAll();
282-
}
283286

284287
public AMQConnection getAMQConnection() {
285288
return _connection;

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,9 @@ public void releaseChannelNumber() {
222222
} else if (method instanceof Channel.Flow) {
223223
Channel.Flow channelFlow = (Channel.Flow) method;
224224
synchronized(this) {
225-
setBlockContent(channelFlow.active);
225+
_blockContent = !channelFlow.active;
226226
transmit(new Channel.FlowOk(channelFlow.active));
227+
notifyAll();
227228
}
228229
return true;
229230
} else if (method instanceof Channel.Close) {

0 commit comments

Comments
 (0)