Skip to content

Commit 04f1571

Browse files
committed
Merged 19559 into 19129
2 parents 0f15d9a + 1358a6f commit 04f1571

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
5555
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5656
public RpcContinuation _activeRpc = null;
5757

58+
/** Whether transmission of content-bearing methods should be blocked */
59+
public boolean _blockContent = false;
60+
5861
/**
5962
* Construct a channel on the given connection, with the given channel number.
6063
* @param connection the underlying connection for this channel
@@ -233,6 +236,7 @@ public void processShutdownSignal(ShutdownSignalException signal,
233236
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
234237
if (isOpen())
235238
_shutdownCause = signal;
239+
setBlockContent(true);
236240
}
237241
} finally {
238242
if (notifyRpc)
@@ -248,22 +252,34 @@ public void notifyOutstandingRpc(ShutdownSignalException signal) {
248252
}
249253

250254
public synchronized void transmit(Method m) throws IOException {
251-
ensureIsOpen();
252-
quiescingTransmit(m);
255+
transmit(new AMQCommand(m));
253256
}
254257

255258
public synchronized void transmit(AMQCommand c) throws IOException {
256259
ensureIsOpen();
260+
if (c.getMethod().hasContent()) {
261+
while (_blockContent) {
262+
try {
263+
wait();
264+
} catch (InterruptedException e) {}
265+
ensureIsOpen();
266+
}
267+
}
257268
quiescingTransmit(c);
258269
}
259270

260271
public synchronized void quiescingTransmit(Method m) throws IOException {
261-
new AMQCommand(m).transmit(this);
272+
quiescingTransmit(new AMQCommand(m));
262273
}
263274

264275
public synchronized void quiescingTransmit(AMQCommand c) throws IOException {
265276
c.transmit(this);
266277
}
278+
279+
public synchronized void setBlockContent(boolean active) {
280+
_blockContent = !active;
281+
notifyAll();
282+
}
267283

268284
public AMQConnection getAMQConnection() {
269285
return _connection;

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,13 @@ public void releaseChannelNumber() {
229229
}
230230
}
231231
return true;
232+
} else if (method instanceof Channel.Flow) {
233+
Channel.Flow channelFlow = (Channel.Flow) method;
234+
synchronized(this) {
235+
setBlockContent(channelFlow.active);
236+
transmit(new Channel.FlowOk(channelFlow.active));
237+
}
238+
return true;
232239
} else {
233240
return false;
234241
}

0 commit comments

Comments
 (0)