Skip to content

Commit e3dcfb0

Browse files
author
Matthias Radestock
committed
merge bug19559 into default
2 parents 9578a70 + d44348d commit e3dcfb0

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
5555
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5656
public RpcContinuation _activeRpc = null;
5757

58+
/** Whether transmission of content-bearing methods should be blocked */
59+
public boolean _blockContent = false;
60+
5861
/**
5962
* Construct a channel on the given connection, with the given channel number.
6063
* @param connection the underlying connection for this channel
@@ -233,6 +236,8 @@ public void processShutdownSignal(ShutdownSignalException signal,
233236
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
234237
if (isOpen())
235238
_shutdownCause = signal;
239+
240+
notifyAll();
236241
}
237242
} finally {
238243
if (notifyRpc)
@@ -248,8 +253,7 @@ public void notifyOutstandingRpc(ShutdownSignalException signal) {
248253
}
249254

250255
public synchronized void transmit(Method m) throws IOException {
251-
ensureIsOpen();
252-
quiescingTransmit(m);
256+
transmit(new AMQCommand(m));
253257
}
254258

255259
public synchronized void transmit(AMQCommand c) throws IOException {
@@ -258,10 +262,22 @@ public synchronized void transmit(AMQCommand c) throws IOException {
258262
}
259263

260264
public synchronized void quiescingTransmit(Method m) throws IOException {
261-
new AMQCommand(m).transmit(this);
265+
quiescingTransmit(new AMQCommand(m));
262266
}
263267

264268
public synchronized void quiescingTransmit(AMQCommand c) throws IOException {
269+
if (c.getMethod().hasContent()) {
270+
while (_blockContent) {
271+
try {
272+
wait();
273+
} catch (InterruptedException e) {}
274+
275+
// This is to catch a situation when the thread wakes up during
276+
// shutdown. Currently, no command that has content is allowed
277+
// to send anything in a closing state.
278+
ensureIsOpen();
279+
}
280+
}
265281
c.transmit(this);
266282
}
267283

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ public void releaseChannelNumber() {
235235
}
236236
}
237237
return true;
238+
} else if (method instanceof Channel.Flow) {
239+
Channel.Flow channelFlow = (Channel.Flow) method;
240+
synchronized(this) {
241+
_blockContent = !channelFlow.active;
242+
transmit(new Channel.FlowOk(channelFlow.active));
243+
notifyAll();
244+
}
245+
return true;
238246
} else {
239247
return false;
240248
}

0 commit comments

Comments
 (0)