Skip to content

Commit ef6459e

Browse files
author
Simon MacMullen
committed
Merge from default
2 parents 9a4936e + db65727 commit ef6459e

23 files changed

+703
-75
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client;
33+
34+
import java.io.IOException;
35+
36+
/**
37+
* Implement this interface in order to be notified of Basic.Ack
38+
* events.
39+
*/
40+
public interface AckListener {
41+
void handleAck(long deliveryTag, boolean multiple)
42+
throws IOException;
43+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.rabbitmq.client.AMQP.Queue;
3939
import com.rabbitmq.client.AMQP.Tx;
4040
import com.rabbitmq.client.AMQP.Basic;
41+
import com.rabbitmq.client.AMQP.Confirm;
4142
import com.rabbitmq.client.AMQP.Channel.FlowOk;
4243

4344
/**
@@ -147,6 +148,18 @@ public interface Channel extends ShutdownNotifier {
147148
*/
148149
void setFlowListener(FlowListener listener);
149150

151+
/**
152+
* Return the current {@link AckListener}.
153+
* @return an interface to the current ack listener.
154+
*/
155+
AckListener getAckListener();
156+
157+
/**
158+
* Set the current {@link AckListener}.
159+
* @param listener the listener to use, or null indicating "don't use one".
160+
*/
161+
void setAckListener(AckListener listener);
162+
150163
/**
151164
* Get the current default consumer. @see setDefaultConsumer for rationale.
152165
* @return an interface to the current default consumer.
@@ -585,6 +598,17 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
585598
*/
586599
void basicCancel(String consumerTag) throws IOException;
587600

601+
/**
602+
* Ask the broker to resend unacknowledged messages. In 0-8
603+
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
604+
* the new, deprecated method basic.recover_async is asynchronous.
605+
* <p/>
606+
* Equivalent to calling <code>basicRecover(true)</code>, messages
607+
* will be requeued and possibly delivered to a different consumer.
608+
* @see #basicRecover(boolean)
609+
*/
610+
Basic.RecoverOk basicRecover() throws IOException;
611+
588612
/**
589613
* Ask the broker to resend unacknowledged messages. In 0-8
590614
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
@@ -633,4 +657,21 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
633657
* @throws java.io.IOException if an error is encountered
634658
*/
635659
Tx.RollbackOk txRollback() throws IOException;
660+
661+
/**
662+
* Enables publisher acknowledgements on this channel.
663+
* @param multiple determines whether the broker can acknowledge
664+
* multiple messages at the same time
665+
* @see com.rabbitmq.client.AMQP.Confirm.Select
666+
* @throws java.io.IOException if an error is encountered
667+
*/
668+
Confirm.SelectOk confirmSelect(boolean multiple) throws IOException;
669+
670+
/**
671+
* Returns the number of messages published since the channel was
672+
* put in confirm mode.
673+
* @return the number of messages published since the first
674+
* confirm.select; if the channel is not in confirm mode, -1 is
675+
* returned */
676+
long getPublishedMessageCount();
636677
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,8 @@ public AMQConnection(ConnectionFactory factory,
227227
* calls Connection.Open and waits for the OpenOk. Sets heartbeat
228228
* and frame max values after tuning has taken place.
229229
* @throws java.io.IOException if an error is encountered; IOException
230-
* subtypes ProtocolVersionMismatchException and
231-
* PossibleAuthenticationFailureException will be thrown in the
230+
* subtypes {@link ProtocolVersionMismatchException} and
231+
* {@link PossibleAuthenticationFailureException} will be thrown in the
232232
* corresponding circumstances.
233233
*/
234234
public void start()

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package com.rabbitmq.client.impl;
3333

34+
import com.rabbitmq.client.AckListener;
3435
import com.rabbitmq.client.AMQP.BasicProperties;
3536
import com.rabbitmq.client.AMQP;
3637
import com.rabbitmq.client.Command;
@@ -45,6 +46,7 @@
4546
import com.rabbitmq.client.UnexpectedMethodError;
4647
import com.rabbitmq.client.impl.AMQImpl.Basic;
4748
import com.rabbitmq.client.impl.AMQImpl.Channel;
49+
import com.rabbitmq.client.impl.AMQImpl.Confirm;
4850
import com.rabbitmq.client.impl.AMQImpl.Exchange;
4951
import com.rabbitmq.client.impl.AMQImpl.Queue;
5052
import com.rabbitmq.client.impl.AMQImpl.Tx;
@@ -54,6 +56,7 @@
5456
import java.util.Collections;
5557
import java.util.HashMap;
5658
import java.util.Map;
59+
import java.util.concurrent.atomic.AtomicLong;
5760
import java.util.concurrent.TimeoutException;
5861

5962

@@ -98,6 +101,14 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
98101
*/
99102
public volatile FlowListener flowListener = null;
100103

104+
/** Reference to the currently-active AckListener, or null if there is none.
105+
*/
106+
public volatile AckListener ackListener = null;
107+
108+
/** Current published message count (used by publisher acknowledgements)
109+
*/
110+
private final AtomicLong publishedMessageCount = new AtomicLong(-1);
111+
101112
/** Reference to the currently-active default consumer, or null if there is
102113
* none.
103114
*/
@@ -153,6 +164,19 @@ public void setFlowListener(FlowListener listener) {
153164
flowListener = listener;
154165
}
155166

167+
/** Returns the current AckListener. */
168+
public AckListener getAckListener() {
169+
return ackListener;
170+
}
171+
172+
/**
173+
* Sets the current AckListener.
174+
* A null argument is interpreted to mean "do not use an ack listener".
175+
*/
176+
public void setAckListener(AckListener listener) {
177+
ackListener = listener;
178+
}
179+
156180
/** Returns the current default consumer. */
157181
public Consumer getDefaultConsumer() {
158182
return defaultConsumer;
@@ -310,6 +334,17 @@ public void releaseChannelNumber() {
310334
}
311335
}
312336
return true;
337+
} else if (method instanceof Basic.Ack) {
338+
Basic.Ack ack = (Basic.Ack) method;
339+
AckListener l = getAckListener();
340+
if (l != null) {
341+
try {
342+
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
343+
} catch (Throwable ex) {
344+
_connection.getExceptionHandler().handleAckListenerException(this, ex);
345+
}
346+
}
347+
return true;
313348
} else if (method instanceof Basic.RecoverOk) {
314349
for (Consumer callback: _consumers.values()) {
315350
callback.handleRecoverOk();
@@ -463,6 +498,8 @@ public void basicPublish(String exchange, String routingKey,
463498
BasicProperties props, byte[] body)
464499
throws IOException
465500
{
501+
if (publishedMessageCount.get() >= 0)
502+
publishedMessageCount.incrementAndGet();
466503
BasicProperties useProps = props;
467504
if (props == null) {
468505
useProps = MessageProperties.MINIMAL_BASIC;
@@ -553,7 +590,7 @@ public Exchange.UnbindOk exchangeUnbind(String destination, String source,
553590
String routingKey) throws IOException {
554591
return exchangeUnbind(destination, source, routingKey, null);
555592
}
556-
593+
557594
/** Public API - {@inheritDoc} */
558595
public Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
559596
boolean autoDelete, Map<String, Object> arguments)
@@ -769,6 +806,14 @@ public Consumer transformReply(AMQCommand replyCommand) {
769806
}
770807
}
771808

809+
810+
/** Public API - {@inheritDoc} */
811+
public Basic.RecoverOk basicRecover()
812+
throws IOException
813+
{
814+
return basicRecover(true);
815+
}
816+
772817
/** Public API - {@inheritDoc} */
773818
public Basic.RecoverOk basicRecover(boolean requeue)
774819
throws IOException
@@ -805,6 +850,17 @@ public Tx.RollbackOk txRollback()
805850
return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
806851
}
807852

853+
/** Public API - {@inheritDoc} */
854+
public Confirm.SelectOk confirmSelect(boolean multiple)
855+
throws IOException
856+
{
857+
if (publishedMessageCount.get() == -1)
858+
publishedMessageCount.set(0);
859+
return (Confirm.SelectOk)
860+
exnWrappingRpc(new Confirm.Select(multiple, false)).getMethod();
861+
862+
}
863+
808864
/** Public API - {@inheritDoc} */
809865
public Channel.FlowOk flow(final boolean a) throws IOException {
810866
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
@@ -815,4 +871,8 @@ public Channel.FlowOk getFlow() {
815871
return new Channel.FlowOk(!_blockContent);
816872
}
817873

874+
/** Public API - {@inheritDoc} */
875+
public long getPublishedMessageCount() {
876+
return publishedMessageCount.longValue();
877+
}
818878
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public void handleFlowListenerException(Channel channel, Throwable exception) {
5555
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
5656
}
5757

58+
public void handleAckListenerException(Channel channel, Throwable exception) {
59+
handleChannelKiller(channel, exception, "AckListener.handleAck");
60+
}
61+
5862
public void handleConsumerException(Channel channel, Throwable exception,
5963
Consumer consumer, String consumerTag,
6064
String methodName)

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,24 @@ public interface ExceptionHandler {
5959

6060
/**
6161
* Perform any required exception processing for the situation
62-
* when the driver thread for the connection has called a
62+
* when the driver thread for the connection has called a
6363
* FlowListener's handleFlow method, and that method has
6464
* thrown an exeption.
6565
* @param channel the ChannelN that held the FlowListener
6666
* @param exception the exception thrown by FlowListener.handleFlow
6767
*/
6868
void handleFlowListenerException(Channel channel, Throwable exception);
6969

70+
/**
71+
* Perform any required exception processing for the situation
72+
* when the driver thread for the connection has called an
73+
* AckListener's handleAck method, and that method has
74+
* thrown an exeption.
75+
* @param channel the ChannelN that held the AckListener
76+
* @param exception the exception thrown by AckListener.handleAck
77+
*/
78+
void handleAckListenerException(Channel channel, Throwable exception);
79+
7080
/**
7181
* Perform any required exception processing for the situation
7282
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ public void handleFlowListenerException(Channel ch, Throwable ex) {
201201
fail("handleFlowListenerException: " + ex);
202202
}
203203

204+
public void handleAckListenerException(Channel ch, Throwable ex) {
205+
fail("handleAckListenerException: " + ex);
206+
}
207+
204208
public void handleConsumerException(Channel ch,
205209
Throwable ex,
206210
Consumer c,

0 commit comments

Comments
 (0)