Skip to content

Commit 5ec8353

Browse files
author
Steve Powell
committed
Split close processing (channel and connection);
- start and finish parts - defer finish until processing response from the broker for application initiated close.
1 parent ad5125f commit 5ec8353

File tree

3 files changed

+51
-14
lines changed

3 files changed

+51
-14
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.rabbitmq.client.SaslConfig;
4242
import com.rabbitmq.client.SaslMechanism;
4343
import com.rabbitmq.client.ShutdownSignalException;
44+
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4445
import com.rabbitmq.utility.BlockingCell;
4546
import com.rabbitmq.utility.Utility;
4647

@@ -650,6 +651,16 @@ public ShutdownSignalException shutdown(Object reason,
650651
boolean initiatedByApplication,
651652
Throwable cause,
652653
boolean notifyRpc)
654+
{
655+
ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, notifyRpc);
656+
finishShutdown(sse);
657+
return sse;
658+
}
659+
660+
private ShutdownSignalException startShutdown(Object reason,
661+
boolean initiatedByApplication,
662+
Throwable cause,
663+
boolean notifyRpc)
653664
{
654665
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
655666
reason, this);
@@ -664,10 +675,12 @@ public ShutdownSignalException shutdown(Object reason,
664675

665676
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
666677

678+
return sse;
679+
}
680+
681+
private void finishShutdown(ShutdownSignalException sse) {
667682
ChannelManager cm = _channelManager;
668683
if (cm != null) cm.handleSignal(sse);
669-
670-
return sse;
671684
}
672685

673686
/** Public API - {@inheritDoc} */
@@ -762,10 +775,15 @@ public void close(int closeCode,
762775
.replyText(closeMessage)
763776
.build();
764777

765-
shutdown(reason, initiatedByApplication, cause, true);
778+
final ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, true);
766779
if(sync){
767-
AMQChannel.SimpleBlockingRpcContinuation k =
768-
new AMQChannel.SimpleBlockingRpcContinuation();
780+
BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
781+
@Override
782+
public AMQCommand transformReply(AMQCommand command) {
783+
AMQConnection.this.finishShutdown(sse);
784+
return command;
785+
}};
786+
769787
_channel0.quiescingRpc(reason, k);
770788
k.getReply(timeout);
771789
} else {

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ public void setDefaultConsumer(Consumer consumer) {
239239

240240
/**
241241
* Sends a ShutdownSignal to all active consumers.
242+
* Idempotent.
242243
* @param signal an exception signalling channel shutdown
243244
*/
244245
private void broadcastShutdownSignal(ShutdownSignalException signal) {
@@ -250,19 +251,39 @@ private void broadcastShutdownSignal(ShutdownSignalException signal) {
250251
}
251252

252253
/**
253-
* Protected API - overridden to quiesce consumer work and broadcast the signal
254-
* to all consumers after calling the superclass's method.
254+
* Start to shutdown -- defer rest of processing until ready
255255
*/
256-
@Override public void processShutdownSignal(ShutdownSignalException signal,
256+
private void startProcessShutdownSignal(ShutdownSignalException signal,
257257
boolean ignoreClosed,
258258
boolean notifyRpc)
259+
{ super.processShutdownSignal(signal, ignoreClosed, notifyRpc);
260+
}
261+
262+
/**
263+
* Finish shutdown processing -- idempotent
264+
*/
265+
private void finishProcessShutdownSignal()
259266
{
260-
super.processShutdownSignal(signal, ignoreClosed, notifyRpc);
267+
this.dispatcher.quiesce();
268+
broadcastShutdownSignal(getCloseReason());
269+
261270
synchronized (unconfirmedSet) {
262271
unconfirmedSet.notifyAll();
263272
}
264273
}
265274

275+
/**
276+
* Protected API - overridden to quiesce consumer work and broadcast the signal
277+
* to all consumers after calling the superclass's method.
278+
*/
279+
@Override public void processShutdownSignal(ShutdownSignalException signal,
280+
boolean ignoreClosed,
281+
boolean notifyRpc)
282+
{
283+
startProcessShutdownSignal(signal, ignoreClosed, notifyRpc);
284+
finishProcessShutdownSignal();
285+
}
286+
266287
CountDownLatch getShutdownLatch() {
267288
return this.finishedShutdownFlag;
268289
}
@@ -527,17 +548,15 @@ public void close(int closeCode,
527548
BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
528549
@Override
529550
public AMQCommand transformReply(AMQCommand command) {
530-
ChannelN.this.dispatcher.quiesce();
531-
broadcastShutdownSignal(getCloseReason());
532-
551+
ChannelN.this.finishProcessShutdownSignal();
533552
return command;
534553
}};
535554
boolean notify = false;
536555
try {
537556
// Synchronize the block below to avoid race conditions in case
538557
// connnection wants to send Connection-CloseOK
539558
synchronized (_channelMutex) {
540-
processShutdownSignal(signal, !initiatedByApplication, true);
559+
startProcessShutdownSignal(signal, !initiatedByApplication, true);
541560
quiescingRpc(reason, k);
542561
}
543562

src/com/rabbitmq/client/impl/ConsumerDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public CountDownLatch handleShutdownSignal(final Map<String, Consumer> consumers
144144
// Execute shutdown processing even if there are no consumers.
145145
execute(new Runnable() {
146146
public void run() {
147-
notifyConsumersOfShutdown(consumers, signal);
147+
ConsumerDispatcher.this.notifyConsumersOfShutdown(consumers, signal);
148148
ConsumerDispatcher.this.shutdown(signal);
149149
ConsumerDispatcher.this.workService.stopWork(ConsumerDispatcher.this.channel);
150150
latch.countDown();

0 commit comments

Comments
 (0)