@@ -231,7 +231,7 @@ private CountDownLatch broadcastShutdownSignal(ShutdownSignalException signal) {
231231 }
232232 }
233233
234- public CountDownLatch getShutdownLatch () {
234+ CountDownLatch getShutdownLatch () {
235235 return this .finishedShutdownFlag ;
236236 }
237237
@@ -385,7 +385,7 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) {
385385 }
386386 }
387387
388- private void callFlowListeners (Command command , Channel .Flow channelFlow ) {
388+ private void callFlowListeners (@ SuppressWarnings ( "unused" ) Command command , Channel .Flow channelFlow ) {
389389 try {
390390 for (FlowListener l : this .flowListeners ) {
391391 l .handleFlow (channelFlow .getActive ());
@@ -395,7 +395,7 @@ private void callFlowListeners(Command command, Channel.Flow channelFlow) {
395395 }
396396 }
397397
398- private void callConfirmListeners (Command command , Basic .Ack ack ) {
398+ private void callConfirmListeners (@ SuppressWarnings ( "unused" ) Command command , Basic .Ack ack ) {
399399 try {
400400 for (ConfirmListener l : this .confirmListeners ) {
401401 l .handleAck (ack .getDeliveryTag (), ack .getMultiple ());
@@ -405,7 +405,7 @@ private void callConfirmListeners(Command command, Basic.Ack ack) {
405405 }
406406 }
407407
408- private void callConfirmListeners (Command command , Basic .Nack nack ) {
408+ private void callConfirmListeners (@ SuppressWarnings ( "unused" ) Command command , Basic .Nack nack ) {
409409 try {
410410 for (ConfirmListener l : this .confirmListeners ) {
411411 l .handleNack (nack .getDeliveryTag (), nack .getMultiple ());
@@ -460,10 +460,17 @@ public void abort(int closeCode, String closeMessage)
460460 close (closeCode , closeMessage , true , null , true );
461461 }
462462
463+ // TODO: method should be private
463464 /**
464465 * Protected API - Close channel with code and message, indicating
465466 * the source of the closure and a causing exception (null if
466467 * none).
468+ * @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
469+ * @param closeMessage a message indicating the reason for closing the connection
470+ * @param initiatedByApplication true if this comes from an API call, false otherwise
471+ * @param cause exception triggering close
472+ * @param abort true if we should close and ignore errors
473+ * @throws IOException if an error is encountered
467474 */
468475 public void close (int closeCode ,
469476 String closeMessage ,
@@ -905,14 +912,15 @@ public String transformReply(AMQCommand replyCommand) {
905912 public void basicCancel (final String consumerTag )
906913 throws IOException
907914 {
915+ final Consumer originalConsumer = _consumers .get (consumerTag );
916+ if (originalConsumer == null )
917+ throw new IOException ("Unknown consumerTag" );
908918 BlockingRpcContinuation <Consumer > k = new BlockingRpcContinuation <Consumer >() {
909919 public Consumer transformReply (AMQCommand replyCommand ) {
910- replyCommand .getMethod (); // discard result
911- Consumer callback = _consumers .remove (consumerTag );
912- // We need to call back inside the connection thread
913- // in order avoid races with 'deliver' commands
914- dispatcher .handleCancelOk (callback , consumerTag );
915- return callback ;
920+ replyCommand .getMethod ();
921+ _consumers .remove (consumerTag ); //may already have been removed
922+ dispatcher .handleCancelOk (originalConsumer , consumerTag );
923+ return originalConsumer ;
916924 }
917925 };
918926
0 commit comments