Skip to content

Commit 2a8d122

Browse files
author
Alexandru Scvortov
committed
added ack listener to Java API
1 parent 2ba6121 commit 2a8d122

File tree

5 files changed

+99
-1
lines changed

5 files changed

+99
-1
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client;
33+
34+
import java.io.IOException;
35+
36+
/**
37+
* Implement this interface in order to be notified of Basic.Ack
38+
* events.
39+
*/
40+
public interface AckListener {
41+
void handleAck(long sequenceNumber)
42+
throws IOException;
43+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,18 @@ public interface Channel extends ShutdownNotifier {
148148
*/
149149
void setFlowListener(FlowListener listener);
150150

151+
/**
152+
* Return the current {@link AckListener}.
153+
* @return an interface to the current ack listener.
154+
*/
155+
AckListener getAckListener();
156+
157+
/**
158+
* Set the current {@link AckListener}.
159+
* @param listener the listener to use, or null indicating "don't use one".
160+
*/
161+
void setAckListener(AckListener listener);
162+
151163
/**
152164
* Get the current default consumer. @see setDefaultConsumer for rationale.
153165
* @return an interface to the current default consumer.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package com.rabbitmq.client.impl;
3333

34+
import com.rabbitmq.client.AckListener;
3435
import com.rabbitmq.client.AMQP.BasicProperties;
3536
import com.rabbitmq.client.AMQP;
3637
import com.rabbitmq.client.Command;
@@ -99,6 +100,10 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
99100
*/
100101
public volatile FlowListener flowListener = null;
101102

103+
/** Reference to the currently-active AckListener, or null if there is none.
104+
*/
105+
public volatile AckListener ackListener = null;
106+
102107
/** Reference to the currently-active default consumer, or null if there is
103108
* none.
104109
*/
@@ -154,6 +159,19 @@ public void setFlowListener(FlowListener listener) {
154159
flowListener = listener;
155160
}
156161

162+
/** Returns the current AckListener. */
163+
public AckListener getAckListener() {
164+
return ackListener;
165+
}
166+
167+
/**
168+
* Sets the current AckListener.
169+
* A null argument is interpreted to mean "do not use an ack listener".
170+
*/
171+
public void setAckListener(AckListener listener) {
172+
ackListener = listener;
173+
}
174+
157175
/** Returns the current default consumer. */
158176
public Consumer getDefaultConsumer() {
159177
return defaultConsumer;
@@ -311,6 +329,17 @@ public void releaseChannelNumber() {
311329
}
312330
}
313331
return true;
332+
} else if (method instanceof Basic.Ack) {
333+
Basic.Ack ack = (Basic.Ack) method;
334+
AckListener l = getAckListener();
335+
if (l != null) {
336+
try {
337+
l.handleAck(ack.getDeliveryTag());
338+
} catch (Throwable ex) {
339+
_connection.getExceptionHandler().handleAckListenerException(this, ex);
340+
}
341+
}
342+
return true;
314343
} else if (method instanceof Basic.RecoverOk) {
315344
for (Consumer callback: _consumers.values()) {
316345
callback.handleRecoverOk();

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public void handleFlowListenerException(Channel channel, Throwable exception) {
5555
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
5656
}
5757

58+
public void handleAckListenerException(Channel channel, Throwable exception) {
59+
handleChannelKiller(channel, exception, "AckListener.handleAck");
60+
}
61+
5862
public void handleConsumerException(Channel channel, Throwable exception,
5963
Consumer consumer, String consumerTag,
6064
String methodName)

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,24 @@ public interface ExceptionHandler {
5959

6060
/**
6161
* Perform any required exception processing for the situation
62-
* when the driver thread for the connection has called a
62+
* when the driver thread for the connection has called a
6363
* FlowListener's handleFlow method, and that method has
6464
* thrown an exeption.
6565
* @param channel the ChannelN that held the FlowListener
6666
* @param exception the exception thrown by FlowListener.handleFlow
6767
*/
6868
void handleFlowListenerException(Channel channel, Throwable exception);
6969

70+
/**
71+
* Perform any required exception processing for the situation
72+
* when the driver thread for the connection has called an
73+
* AckListener's handleAck method, and that method has
74+
* thrown an exeption.
75+
* @param channel the ChannelN that held the AckListener
76+
* @param exception the exception thrown by AckListener.handleAck
77+
*/
78+
void handleAckListenerException(Channel channel, Throwable exception);
79+
7080
/**
7181
* Perform any required exception processing for the situation
7282
* when the driver thread for the connection has called a method

0 commit comments

Comments
 (0)