Skip to content

Commit 5068e0f

Browse files
author
Steve Powell
committed
Check that consumerTag is valid on basicCancel; also freeze consumer it references at call time.
1 parent 56d92ba commit 5068e0f

File tree

2 files changed

+21
-13
lines changed

2 files changed

+21
-13
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
638638
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
639639
* method.
640640
* @param consumerTag a client- or server-generated consumer tag to establish context
641-
* @throws java.io.IOException if an error is encountered
641+
* @throws IOException if an error is encountered, or if the consumerTag is unknown
642642
* @see com.rabbitmq.client.AMQP.Basic.Cancel
643643
* @see com.rabbitmq.client.AMQP.Basic.CancelOk
644644
*/
@@ -649,12 +649,12 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
649649
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
650650
* the new, deprecated method basic.recover_async is asynchronous.
651651
* <p/>
652-
* Equivalent to calling <code>basicRecover(true)</code>, messages
653-
* will be requeued and possibly delivered to a different consumer.
652+
* Equivalent to calling <code>basicRecover(true)</code>, messages
653+
* will be requeued and possibly delivered to a different consumer.
654654
* @see #basicRecover(boolean)
655655
*/
656656
Basic.RecoverOk basicRecover() throws IOException;
657-
657+
658658
/**
659659
* Ask the broker to resend unacknowledged messages. In 0-8
660660
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ private CountDownLatch broadcastShutdownSignal(ShutdownSignalException signal) {
233233
}
234234
}
235235

236-
public CountDownLatch getShutdownLatch() {
236+
CountDownLatch getShutdownLatch() {
237237
return this.finishedShutdownFlag;
238238
}
239239

@@ -387,7 +387,7 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) {
387387
}
388388
}
389389

390-
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
390+
private void callFlowListeners(@SuppressWarnings("unused") Command command, Channel.Flow channelFlow) {
391391
try {
392392
for (FlowListener l : this.flowListeners) {
393393
l.handleFlow(channelFlow.getActive());
@@ -397,7 +397,7 @@ private void callFlowListeners(Command command, Channel.Flow channelFlow) {
397397
}
398398
}
399399

400-
private void callConfirmListeners(Command command, Basic.Ack ack) {
400+
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Ack ack) {
401401
try {
402402
for (ConfirmListener l : this.confirmListeners) {
403403
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
@@ -407,7 +407,7 @@ private void callConfirmListeners(Command command, Basic.Ack ack) {
407407
}
408408
}
409409

410-
private void callConfirmListeners(Command command, Basic.Nack nack) {
410+
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Nack nack) {
411411
try {
412412
for (ConfirmListener l : this.confirmListeners) {
413413
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
@@ -463,9 +463,16 @@ public void abort(int closeCode, String closeMessage)
463463
}
464464

465465
/**
466+
* SHOULD BE PRIVATE<br/>
466467
* Protected API - Close channel with code and message, indicating
467468
* the source of the closure and a causing exception (null if
468469
* none).
470+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
471+
* @param closeMessage a message indicating the reason for closing the connection
472+
* @param initiatedByApplication true if this comes from an API call, false otherwise
473+
* @param cause exception triggering close
474+
* @param abort true if we should close and ignore errors
475+
* @throws IOException if an error is encountered
469476
*/
470477
public void close(int closeCode,
471478
String closeMessage,
@@ -907,15 +914,16 @@ public String transformReply(AMQCommand replyCommand) {
907914
public void basicCancel(final String consumerTag)
908915
throws IOException
909916
{
917+
final Consumer originalConsumer = _consumers.get(consumerTag);
918+
if (originalConsumer == null)
919+
throw new IOException("Unknown consumerTag");
910920
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
911921
public Consumer transformReply(AMQCommand replyCommand) {
912922
Basic.CancelOk dummy = (Basic.CancelOk) replyCommand.getMethod();
913923
Utility.use(dummy);
914-
Consumer callback = _consumers.remove(consumerTag);
915-
// We need to call back inside the connection thread
916-
// in order avoid races with 'deliver' commands
917-
dispatcher.handleCancelOk(callback, consumerTag);
918-
return callback;
924+
_consumers.remove(consumerTag); //may already have been removed
925+
dispatcher.handleCancelOk(originalConsumer, consumerTag);
926+
return originalConsumer;
919927
}
920928
};
921929

0 commit comments

Comments
 (0)