Skip to content

Commit 3136f72

Browse files
author
Matthew Sackman
committed
Support consumer cancellation. Sans documentation atm. And some of the tests will likely fail now and need adjustment
1 parent a72b0f3 commit 3136f72

File tree

6 files changed

+38
-0
lines changed

6 files changed

+38
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ public interface Channel extends ShutdownNotifier {
180180
*/
181181
void setDefaultConsumer(Consumer consumer);
182182

183+
ConsumerCancellationListener getConsumerCancellationListener();
184+
void setConsumerCancellationListener(ConsumerCancellationListener notifier);
185+
183186
/**
184187
* Request specific "quality of service" settings.
185188
*

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public static Map<String, Object> defaultClientProperties() {
6565
capabilities.put("publisher_confirms", true);
6666
capabilities.put("exchange_exchange_bindings", true);
6767
capabilities.put("basic.nack", true);
68+
capabilities.put("consumer_cancel_notify", true);
6869
return Frame.buildTable(new Object[] {
6970
"product", LongStringHelper.asLongString("RabbitMQ"),
7071
"version", LongStringHelper.asLongString(ClientVersion.VERSION),

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.rabbitmq.client.Command;
2424
import com.rabbitmq.client.Connection;
2525
import com.rabbitmq.client.Consumer;
26+
import com.rabbitmq.client.ConsumerCancellationListener;
2627
import com.rabbitmq.client.Envelope;
2728
import com.rabbitmq.client.FlowListener;
2829
import com.rabbitmq.client.GetResponse;
@@ -90,6 +91,8 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
9091
*/
9192
public volatile ConfirmListener confirmListener = null;
9293

94+
public volatile ConsumerCancellationListener consumerCancellationListener = null;
95+
9396
/** Sequence number of next published message requiring confirmation.
9497
*/
9598
private long nextPublishSeqNo = 0L;
@@ -175,6 +178,15 @@ public void setDefaultConsumer(Consumer consumer) {
175178
defaultConsumer = consumer;
176179
}
177180

181+
public ConsumerCancellationListener getConsumerCancellationListener() {
182+
return consumerCancellationListener;
183+
}
184+
185+
public void setConsumerCancellationListener(
186+
ConsumerCancellationListener listener) {
187+
consumerCancellationListener = listener;
188+
}
189+
178190
/**
179191
* Protected API - sends a ShutdownSignal to all active consumers.
180192
* @param signal an exception signalling channel shutdown
@@ -339,6 +351,16 @@ public void releaseChannelNumber() {
339351
// be handled by whichever RPC continuation invoked Recover,
340352
// so return false
341353
return false;
354+
} else if (method instanceof Basic.Cancel) {
355+
ConsumerCancellationListener l = getConsumerCancellationListener();
356+
if (l != null) {
357+
try {
358+
l.handleConsumerCancellation(((Basic.Cancel)method).getConsumerTag());
359+
} catch (Throwable ex) {
360+
_connection.getExceptionHandler().handleConsumerCancellationException(this, ex);
361+
}
362+
}
363+
return true;
342364
} else {
343365
return false;
344366
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ public void handleConsumerException(Channel channel, Throwable exception,
5555
+ " for channel " + channel);
5656
}
5757

58+
public void handleConsumerCancellationException(Channel channel,
59+
Throwable exception) {
60+
handleChannelKiller(channel, exception, "ConsumerCancellationListener.handleConsumerCancellation");
61+
62+
}
63+
5864
protected void handleChannelKiller(Channel channel,
5965
Throwable exception,
6066
String what)

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,6 @@ void handleConsumerException(Channel channel,
7878
Consumer consumer,
7979
String consumerTag,
8080
String methodName);
81+
82+
void handleConsumerCancellationException(Channel channel, Throwable exception);
8183
}

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ public void handleConsumerException(Channel ch,
197197
fail("handleConsumerException " + consumerTag + " " + methodName + ": " + ex);
198198
}
199199

200+
public void handleConsumerCancellationException(Channel channel, Throwable ex) {
201+
fail("handleConsmuerCancellationException " + ex);
202+
}
203+
200204
public List<Throwable> getHandledExceptions() {
201205
return _handledExceptions;
202206
}

0 commit comments

Comments
 (0)