Skip to content

Commit c122e4d

Browse files
author
Simon MacMullen
committed
Merging bug21848 into amqp_0_9_1
2 parents ecfe921 + caf322a commit c122e4d

File tree

1 file changed

+28
-23
lines changed

1 file changed

+28
-23
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ public void releaseChannelNumber() {
171171

172172
/**
173173
* Protected API - Filters the inbound command stream, processing
174-
* Basic.Deliver, Basic.Return and Channel.Close specially.
174+
* Basic.Deliver, Basic.Return and Channel.Close specially. If
175+
* we're in quiescing mode, all inbound commands are ignored,
176+
* except for Channel.Close and Channel.CloseOk.
175177
*/
176178
@Override public boolean processAsync(Command command) throws IOException
177179
{
@@ -180,33 +182,30 @@ public void releaseChannelNumber() {
180182
// If we are not, however, then we are in a quiescing, or
181183
// shutting-down state as the result of an application
182184
// decision to close this channel, and we are to discard all
183-
// incoming commands except for a close-ok.
185+
// incoming commands except for close and close-ok.
184186

185187
Method method = command.getMethod();
186188

187-
if (method instanceof Channel.Close) {
188-
// Channel should always respond to Channel.Close
189-
// from the server
190-
releaseChannelNumber();
191-
ShutdownSignalException signal = new ShutdownSignalException(false,
192-
false,
193-
command,
194-
this);
195-
synchronized (_channelMutex) {
196-
try {
197-
processShutdownSignal(signal, true, false);
198-
quiescingTransmit(new Channel.CloseOk());
199-
} finally {
200-
notifyOutstandingRpc(signal);
201-
}
202-
}
203-
notifyListeners();
204-
return true;
205-
}
206189
if (isOpen()) {
207190
// We're in normal running mode.
208191

209-
if (method instanceof Basic.Deliver) {
192+
if (method instanceof Channel.Close) {
193+
releaseChannelNumber();
194+
ShutdownSignalException signal = new ShutdownSignalException(false,
195+
false,
196+
command,
197+
this);
198+
synchronized (_channelMutex) {
199+
try {
200+
processShutdownSignal(signal, true, false);
201+
quiescingTransmit(new Channel.CloseOk());
202+
} finally {
203+
notifyOutstandingRpc(signal);
204+
}
205+
}
206+
notifyListeners();
207+
return true;
208+
} else if (method instanceof Basic.Deliver) {
210209
Basic.Deliver m = (Basic.Deliver) method;
211210

212211
Consumer callback = _consumers.get(m.consumerTag);
@@ -273,7 +272,13 @@ public void releaseChannelNumber() {
273272
} else {
274273
// We're in quiescing mode.
275274

276-
if (method instanceof Channel.CloseOk) {
275+
if (method instanceof Channel.Close) {
276+
// We're already shutting down, so just send back an ok.
277+
synchronized (_channelMutex) {
278+
quiescingTransmit(new Channel.CloseOk());
279+
}
280+
return true;
281+
} else if (method instanceof Channel.CloseOk) {
277282
// We're quiescing, and we see a channel.close-ok:
278283
// this is our signal to leave quiescing mode and
279284
// finally shut down for good. Let it be handled as an

0 commit comments

Comments
 (0)