Skip to content

Commit 80db3ba

Browse files
author
Steve Powell
committed
Move channel shutdown processing to after close-ok reply received.
Move WorkPoolTests to com.rabbitmq.client.impl package space.
1 parent fabe315 commit 80db3ba

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -241,12 +241,12 @@ public void setDefaultConsumer(Consumer consumer) {
241241
* Sends a ShutdownSignal to all active consumers.
242242
* @param signal an exception signalling channel shutdown
243243
*/
244-
private CountDownLatch broadcastShutdownSignal(ShutdownSignalException signal) {
244+
private void broadcastShutdownSignal(ShutdownSignalException signal) {
245245
Map<String, Consumer> snapshotConsumers;
246246
synchronized (_consumers) {
247247
snapshotConsumers = new HashMap<String, Consumer>(_consumers);
248248
}
249-
return this.dispatcher.handleShutdownSignal(snapshotConsumers, signal);
249+
this.finishedShutdownFlag = this.dispatcher.handleShutdownSignal(snapshotConsumers, signal);
250250
}
251251

252252
/**
@@ -257,9 +257,7 @@ private CountDownLatch broadcastShutdownSignal(ShutdownSignalException signal) {
257257
boolean ignoreClosed,
258258
boolean notifyRpc)
259259
{
260-
this.dispatcher.quiesce();
261260
super.processShutdownSignal(signal, ignoreClosed, notifyRpc);
262-
this.finishedShutdownFlag = broadcastShutdownSignal(signal);
263261
synchronized (unconfirmedSet) {
264262
unconfirmedSet.notifyAll();
265263
}
@@ -526,7 +524,14 @@ public void close(int closeCode,
526524
signal.initCause(cause);
527525
}
528526

529-
BlockingRpcContinuation<AMQCommand> k = new SimpleBlockingRpcContinuation();
527+
BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
528+
@Override
529+
public AMQCommand transformReply(AMQCommand command) {
530+
ChannelN.this.dispatcher.quiesce();
531+
broadcastShutdownSignal(getCloseReason());
532+
533+
return command;
534+
}};
530535
boolean notify = false;
531536
try {
532537
// Synchronize the block below to avoid race conditions in case
@@ -536,10 +541,10 @@ public void close(int closeCode,
536541
quiescingRpc(reason, k);
537542
}
538543

539-
// Now that we're in quiescing state, channel.close was sent and
540-
// we wait for the reply. We ignore the result.
541-
// (It's NOT always close-ok.)
542-
notify = true;
544+
// Now that we're in quiescing state, channel.close was sent and
545+
// we wait for the reply. We ignore the result.
546+
// (It's NOT always close-ok.)
547+
notify = true;
543548
k.getReply(-1);
544549
} catch (TimeoutException ise) {
545550
// Will never happen since we wait infinitely

test/src/com/rabbitmq/client/test/impl/WorkPoolTests.java renamed to test/src/com/rabbitmq/client/impl/WorkPoolTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package com.rabbitmq.client.test.impl;
1+
package com.rabbitmq.client.impl;
22

3-
import com.rabbitmq.client.impl.WorkPool;
43
import junit.framework.TestCase;
54

65
import java.util.ArrayList;

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package com.rabbitmq.client.test.functional;
1919

20+
import com.rabbitmq.client.impl.WorkPoolTests;
2021
import com.rabbitmq.client.test.Bug20004Test;
21-
import com.rabbitmq.client.test.impl.WorkPoolTests;
2222

2323
import junit.framework.TestCase;
2424
import junit.framework.TestSuite;

0 commit comments

Comments
 (0)