Skip to content

Commit 6448483

Browse files
author
Alexandru Scvortov
committed
merge default into bug20284
2 parents 4a66bc5 + e6d8f36 commit 6448483

File tree

9 files changed

+331
-13
lines changed

9 files changed

+331
-13
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client;
33+
34+
import java.io.IOException;
35+
36+
/**
37+
* Implement this interface in order to be notified of Basic.Ack
38+
* events.
39+
*/
40+
public interface AckListener {
41+
void handleAck(long deliveryTag, boolean multiple)
42+
throws IOException;
43+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.rabbitmq.client.AMQP.Queue;
3939
import com.rabbitmq.client.AMQP.Tx;
4040
import com.rabbitmq.client.AMQP.Basic;
41+
import com.rabbitmq.client.AMQP.Confirm;
4142
import com.rabbitmq.client.AMQP.Channel.FlowOk;
4243

4344
/**
@@ -147,6 +148,18 @@ public interface Channel extends ShutdownNotifier {
147148
*/
148149
void setFlowListener(FlowListener listener);
149150

151+
/**
152+
* Return the current {@link AckListener}.
153+
* @return an interface to the current ack listener.
154+
*/
155+
AckListener getAckListener();
156+
157+
/**
158+
* Set the current {@link AckListener}.
159+
* @param listener the listener to use, or null indicating "don't use one".
160+
*/
161+
void setAckListener(AckListener listener);
162+
150163
/**
151164
* Get the current default consumer. @see setDefaultConsumer for rationale.
152165
* @return an interface to the current default consumer.
@@ -624,4 +637,20 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
624637
* @throws java.io.IOException if an error is encountered
625638
*/
626639
Tx.RollbackOk txRollback() throws IOException;
640+
641+
/**
642+
* Enables publisher acknowledgements on this channel.
643+
* @param many determines whether the broker can acknowledge
644+
* multiple messages at the same time
645+
* @see com.rabbitmq.client.AMQP.Confirm.Select
646+
* @throws java.io.IOException if an error is encountered
647+
*/
648+
Confirm.SelectOk confirmSelect(boolean multiple) throws IOException;
649+
650+
/**
651+
* Returns the number of messages published since the channel was
652+
* put in confirm mode.
653+
* @return the number of messages published since the first
654+
* confirm.select */
655+
long getPublishedMessageCount();
627656
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package com.rabbitmq.client.impl;
3333

34+
import com.rabbitmq.client.AckListener;
3435
import com.rabbitmq.client.AMQP.BasicProperties;
3536
import com.rabbitmq.client.AMQP;
3637
import com.rabbitmq.client.Command;
@@ -45,6 +46,7 @@
4546
import com.rabbitmq.client.UnexpectedMethodError;
4647
import com.rabbitmq.client.impl.AMQImpl.Basic;
4748
import com.rabbitmq.client.impl.AMQImpl.Channel;
49+
import com.rabbitmq.client.impl.AMQImpl.Confirm;
4850
import com.rabbitmq.client.impl.AMQImpl.Exchange;
4951
import com.rabbitmq.client.impl.AMQImpl.Queue;
5052
import com.rabbitmq.client.impl.AMQImpl.Tx;
@@ -98,6 +100,14 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
98100
*/
99101
public volatile FlowListener flowListener = null;
100102

103+
/** Reference to the currently-active AckListener, or null if there is none.
104+
*/
105+
public volatile AckListener ackListener = null;
106+
107+
/** Current published message count (used by publisher acknowledgements)
108+
*/
109+
private long publishedMessageCount;
110+
101111
/** Reference to the currently-active default consumer, or null if there is
102112
* none.
103113
*/
@@ -153,6 +163,19 @@ public void setFlowListener(FlowListener listener) {
153163
flowListener = listener;
154164
}
155165

166+
/** Returns the current AckListener. */
167+
public AckListener getAckListener() {
168+
return ackListener;
169+
}
170+
171+
/**
172+
* Sets the current AckListener.
173+
* A null argument is interpreted to mean "do not use an ack listener".
174+
*/
175+
public void setAckListener(AckListener listener) {
176+
ackListener = listener;
177+
}
178+
156179
/** Returns the current default consumer. */
157180
public Consumer getDefaultConsumer() {
158181
return defaultConsumer;
@@ -310,6 +333,17 @@ public void releaseChannelNumber() {
310333
}
311334
}
312335
return true;
336+
} else if (method instanceof Basic.Ack) {
337+
Basic.Ack ack = (Basic.Ack) method;
338+
AckListener l = getAckListener();
339+
if (l != null) {
340+
try {
341+
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
342+
} catch (Throwable ex) {
343+
_connection.getExceptionHandler().handleAckListenerException(this, ex);
344+
}
345+
}
346+
return true;
313347
} else if (method instanceof Basic.RecoverOk) {
314348
for (Consumer callback: _consumers.values()) {
315349
callback.handleRecoverOk();
@@ -463,6 +497,10 @@ public void basicPublish(String exchange, String routingKey,
463497
BasicProperties props, byte[] body)
464498
throws IOException
465499
{
500+
synchronized (_channelMutex) {
501+
if (publishedMessageCount >= 0)
502+
++publishedMessageCount;
503+
}
466504
BasicProperties useProps = props;
467505
if (props == null) {
468506
useProps = MessageProperties.MINIMAL_BASIC;
@@ -805,6 +843,19 @@ public Tx.RollbackOk txRollback()
805843
return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
806844
}
807845

846+
/** Public API - {@inheritDoc} */
847+
public Confirm.SelectOk confirmSelect(boolean multiple)
848+
throws IOException
849+
{
850+
synchronized (_channelMutex) {
851+
if (publishedMessageCount == -1)
852+
publishedMessageCount = 0;
853+
}
854+
return (Confirm.SelectOk)
855+
exnWrappingRpc(new Confirm.Select(multiple, false)).getMethod();
856+
857+
}
858+
808859
/** Public API - {@inheritDoc} */
809860
public Channel.FlowOk flow(final boolean a) throws IOException {
810861
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
@@ -815,4 +866,8 @@ public Channel.FlowOk getFlow() {
815866
return new Channel.FlowOk(!_blockContent);
816867
}
817868

869+
/** Public API - {@inheritDoc} */
870+
public long getPublishedMessageCount() {
871+
return publishedMessageCount;
872+
}
818873
}

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public void handleFlowListenerException(Channel channel, Throwable exception) {
5555
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
5656
}
5757

58+
public void handleAckListenerException(Channel channel, Throwable exception) {
59+
handleChannelKiller(channel, exception, "AckListener.handleAck");
60+
}
61+
5862
public void handleConsumerException(Channel channel, Throwable exception,
5963
Consumer consumer, String consumerTag,
6064
String methodName)

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,24 @@ public interface ExceptionHandler {
5959

6060
/**
6161
* Perform any required exception processing for the situation
62-
* when the driver thread for the connection has called a
62+
* when the driver thread for the connection has called a
6363
* FlowListener's handleFlow method, and that method has
6464
* thrown an exeption.
6565
* @param channel the ChannelN that held the FlowListener
6666
* @param exception the exception thrown by FlowListener.handleFlow
6767
*/
6868
void handleFlowListenerException(Channel channel, Throwable exception);
6969

70+
/**
71+
* Perform any required exception processing for the situation
72+
* when the driver thread for the connection has called an
73+
* AckListener's handleAck method, and that method has
74+
* thrown an exeption.
75+
* @param channel the ChannelN that held the AckListener
76+
* @param exception the exception thrown by AckListener.handleAck
77+
*/
78+
void handleAckListenerException(Channel channel, Throwable exception);
79+
7080
/**
7181
* Perform any required exception processing for the situation
7282
* when the driver thread for the connection has called a method

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ public void handleFlowListenerException(Channel ch, Throwable ex) {
201201
fail("handleFlowListenerException: " + ex);
202202
}
203203

204+
public void handleAckListenerException(Channel ch, Throwable ex) {
205+
fail("handleAckListenerException: " + ex);
206+
}
207+
204208
public void handleConsumerException(Channel ch,
205209
Throwable ex,
206210
Consumer c,
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.test.BrokerTestCase;
35+
import com.rabbitmq.client.AMQP;
36+
import com.rabbitmq.client.AckListener;
37+
import com.rabbitmq.client.DefaultConsumer;
38+
import com.rabbitmq.client.MessageProperties;
39+
40+
import java.io.IOException;
41+
import java.util.Set;
42+
import java.util.TreeSet;
43+
44+
public class Confirm extends BrokerTestCase
45+
{
46+
final static int NUM_MESSAGES = 1000;
47+
volatile Set<Long> ackSet;
48+
49+
@Override
50+
protected void setUp() throws IOException {
51+
super.setUp();
52+
ackSet = new TreeSet<Long>();
53+
final Confirm This = this;
54+
channel.setAckListener(new AckListener() {
55+
public void handleAck(long seqNo,
56+
boolean multiple) {
57+
if (multiple) {
58+
for (int i = 0; i <= seqNo; ++i)
59+
This.gotAckFor(i);
60+
} else {
61+
This.gotAckFor(seqNo);
62+
}
63+
}
64+
});
65+
channel.confirmSelect(true);
66+
channel.queueDeclare("confirm-test", true, true, true, null);
67+
channel.basicConsume("confirm-test", true, new DefaultConsumer(channel));
68+
channel.queueDeclare("confirm-test-noconsumer", true, true, true, null);
69+
}
70+
71+
public void testConfirmTransient() throws IOException, InterruptedException {
72+
confirmTest("consumer-test", false, false, false);
73+
}
74+
75+
public void testConfirmPersistentSimple()
76+
throws IOException, InterruptedException
77+
{
78+
confirmTest("consumer-test", true, false, false);
79+
}
80+
81+
public void testConfirmPersistentImmediate()
82+
throws IOException, InterruptedException
83+
{
84+
confirmTest("consumer-test", true, false, true);
85+
}
86+
87+
public void testConfirmPersistentImmediateNoConsumer()
88+
throws IOException, InterruptedException
89+
{
90+
confirmTest("consumer-test-noconsumer", true, false, true);
91+
}
92+
93+
public void testConfirmPersistentMandatory()
94+
throws IOException, InterruptedException
95+
{
96+
confirmTest("consumer-test", true, true, false);
97+
}
98+
99+
public void testConfirmPersistentMandatoryReturn()
100+
throws IOException, InterruptedException
101+
{
102+
confirmTest("consumer-test-doesnotexist", true, true, false);
103+
}
104+
105+
/* Publish NUM_MESSAGES persistent messages and wait for
106+
* confirmations. */
107+
public void confirmTest(String queueName, boolean persistent,
108+
boolean mandatory, boolean immediate)
109+
throws IOException, InterruptedException
110+
{
111+
for (long i = 0; i < NUM_MESSAGES; i++) {
112+
publish(queueName, persistent, mandatory, immediate);
113+
ackSet.add(i);
114+
}
115+
116+
while (ackSet.size() > 0)
117+
Thread.sleep(10);
118+
}
119+
120+
private void publish(String queueName, boolean persistent,
121+
boolean mandatory, boolean immediate)
122+
throws IOException
123+
{
124+
channel.basicPublish("", queueName, mandatory, immediate,
125+
persistent ? MessageProperties.PERSISTENT_BASIC
126+
: MessageProperties.BASIC,
127+
"nop".getBytes());
128+
}
129+
130+
private synchronized void gotAckFor(long msgSeqNo) {
131+
ackSet.remove(msgSeqNo);
132+
}
133+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public static TestSuite suite() {
6868
suite.addTestSuite(DefaultExchange.class);
6969
suite.addTestSuite(UnbindAutoDeleteExchange.class);
7070
suite.addTestSuite(RecoverAfterCancel.class);
71+
suite.addTestSuite(Confirm.class);
7172
suite.addTestSuite(UnexpectedFrames.class);
7273
return suite;
7374
}

0 commit comments

Comments
 (0)