Skip to content

Commit e23cbee

Browse files
author
Alexandru Scvortov
committed
merge default into bug20284
2 parents 83656b3 + f762b38 commit e23cbee

File tree

9 files changed

+464
-13
lines changed

9 files changed

+464
-13
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client;
33+
34+
import java.io.IOException;
35+
36+
/**
37+
* Implement this interface in order to be notified of Basic.Ack
38+
* events.
39+
*/
40+
public interface AckListener {
41+
void handleAck(long deliveryTag, boolean multiple)
42+
throws IOException;
43+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.rabbitmq.client.AMQP.Queue;
3939
import com.rabbitmq.client.AMQP.Tx;
4040
import com.rabbitmq.client.AMQP.Basic;
41+
import com.rabbitmq.client.AMQP.Confirm;
4142
import com.rabbitmq.client.AMQP.Channel.FlowOk;
4243

4344
/**
@@ -147,6 +148,18 @@ public interface Channel extends ShutdownNotifier {
147148
*/
148149
void setFlowListener(FlowListener listener);
149150

151+
/**
152+
* Return the current {@link AckListener}.
153+
* @return an interface to the current ack listener.
154+
*/
155+
AckListener getAckListener();
156+
157+
/**
158+
* Set the current {@link AckListener}.
159+
* @param listener the listener to use, or null indicating "don't use one".
160+
*/
161+
void setAckListener(AckListener listener);
162+
150163
/**
151164
* Get the current default consumer. @see setDefaultConsumer for rationale.
152165
* @return an interface to the current default consumer.
@@ -633,4 +646,20 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
633646
* @throws java.io.IOException if an error is encountered
634647
*/
635648
Tx.RollbackOk txRollback() throws IOException;
649+
650+
/**
651+
* Enables publisher acknowledgements on this channel.
652+
* @param many determines whether the broker can acknowledge
653+
* multiple messages at the same time
654+
* @see com.rabbitmq.client.AMQP.Confirm.Select
655+
* @throws java.io.IOException if an error is encountered
656+
*/
657+
Confirm.SelectOk confirmSelect(boolean multiple) throws IOException;
658+
659+
/**
660+
* Returns the number of messages published since the channel was
661+
* put in confirm mode.
662+
* @return the number of messages published since the first
663+
* confirm.select */
664+
long getPublishedMessageCount();
636665
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package com.rabbitmq.client.impl;
3333

34+
import com.rabbitmq.client.AckListener;
3435
import com.rabbitmq.client.AMQP.BasicProperties;
3536
import com.rabbitmq.client.AMQP;
3637
import com.rabbitmq.client.Command;
@@ -45,6 +46,7 @@
4546
import com.rabbitmq.client.UnexpectedMethodError;
4647
import com.rabbitmq.client.impl.AMQImpl.Basic;
4748
import com.rabbitmq.client.impl.AMQImpl.Channel;
49+
import com.rabbitmq.client.impl.AMQImpl.Confirm;
4850
import com.rabbitmq.client.impl.AMQImpl.Exchange;
4951
import com.rabbitmq.client.impl.AMQImpl.Queue;
5052
import com.rabbitmq.client.impl.AMQImpl.Tx;
@@ -98,6 +100,14 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
98100
*/
99101
public volatile FlowListener flowListener = null;
100102

103+
/** Reference to the currently-active AckListener, or null if there is none.
104+
*/
105+
public volatile AckListener ackListener = null;
106+
107+
/** Current published message count (used by publisher acknowledgements)
108+
*/
109+
private long publishedMessageCount;
110+
101111
/** Reference to the currently-active default consumer, or null if there is
102112
* none.
103113
*/
@@ -153,6 +163,19 @@ public void setFlowListener(FlowListener listener) {
153163
flowListener = listener;
154164
}
155165

166+
/** Returns the current AckListener. */
167+
public AckListener getAckListener() {
168+
return ackListener;
169+
}
170+
171+
/**
172+
* Sets the current AckListener.
173+
* A null argument is interpreted to mean "do not use an ack listener".
174+
*/
175+
public void setAckListener(AckListener listener) {
176+
ackListener = listener;
177+
}
178+
156179
/** Returns the current default consumer. */
157180
public Consumer getDefaultConsumer() {
158181
return defaultConsumer;
@@ -310,6 +333,17 @@ public void releaseChannelNumber() {
310333
}
311334
}
312335
return true;
336+
} else if (method instanceof Basic.Ack) {
337+
Basic.Ack ack = (Basic.Ack) method;
338+
AckListener l = getAckListener();
339+
if (l != null) {
340+
try {
341+
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
342+
} catch (Throwable ex) {
343+
_connection.getExceptionHandler().handleAckListenerException(this, ex);
344+
}
345+
}
346+
return true;
313347
} else if (method instanceof Basic.RecoverOk) {
314348
for (Consumer callback: _consumers.values()) {
315349
callback.handleRecoverOk();
@@ -463,6 +497,10 @@ public void basicPublish(String exchange, String routingKey,
463497
BasicProperties props, byte[] body)
464498
throws IOException
465499
{
500+
synchronized (_channelMutex) {
501+
if (publishedMessageCount >= 0)
502+
++publishedMessageCount;
503+
}
466504
BasicProperties useProps = props;
467505
if (props == null) {
468506
useProps = MessageProperties.MINIMAL_BASIC;
@@ -805,6 +843,19 @@ public Tx.RollbackOk txRollback()
805843
return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
806844
}
807845

846+
/** Public API - {@inheritDoc} */
847+
public Confirm.SelectOk confirmSelect(boolean multiple)
848+
throws IOException
849+
{
850+
synchronized (_channelMutex) {
851+
if (publishedMessageCount == -1)
852+
publishedMessageCount = 0;
853+
}
854+
return (Confirm.SelectOk)
855+
exnWrappingRpc(new Confirm.Select(multiple, false)).getMethod();
856+
857+
}
858+
808859
/** Public API - {@inheritDoc} */
809860
public Channel.FlowOk flow(final boolean a) throws IOException {
810861
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
@@ -815,4 +866,8 @@ public Channel.FlowOk getFlow() {
815866
return new Channel.FlowOk(!_blockContent);
816867
}
817868

869+
/** Public API - {@inheritDoc} */
870+
public long getPublishedMessageCount() {
871+
return publishedMessageCount;
872+
}
818873
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public void handleFlowListenerException(Channel channel, Throwable exception) {
5555
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
5656
}
5757

58+
public void handleAckListenerException(Channel channel, Throwable exception) {
59+
handleChannelKiller(channel, exception, "AckListener.handleAck");
60+
}
61+
5862
public void handleConsumerException(Channel channel, Throwable exception,
5963
Consumer consumer, String consumerTag,
6064
String methodName)

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,24 @@ public interface ExceptionHandler {
5959

6060
/**
6161
* Perform any required exception processing for the situation
62-
* when the driver thread for the connection has called a
62+
* when the driver thread for the connection has called a
6363
* FlowListener's handleFlow method, and that method has
6464
* thrown an exeption.
6565
* @param channel the ChannelN that held the FlowListener
6666
* @param exception the exception thrown by FlowListener.handleFlow
6767
*/
6868
void handleFlowListenerException(Channel channel, Throwable exception);
6969

70+
/**
71+
* Perform any required exception processing for the situation
72+
* when the driver thread for the connection has called an
73+
* AckListener's handleAck method, and that method has
74+
* thrown an exeption.
75+
* @param channel the ChannelN that held the AckListener
76+
* @param exception the exception thrown by AckListener.handleAck
77+
*/
78+
void handleAckListenerException(Channel channel, Throwable exception);
79+
7080
/**
7181
* Perform any required exception processing for the situation
7282
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ public void handleFlowListenerException(Channel ch, Throwable ex) {
201201
fail("handleFlowListenerException: " + ex);
202202
}
203203

204+
public void handleAckListenerException(Channel ch, Throwable ex) {
205+
fail("handleAckListenerException: " + ex);
206+
}
207+
204208
public void handleConsumerException(Channel ch,
205209
Throwable ex,
206210
Consumer c,

0 commit comments

Comments
 (0)