Skip to content

Commit 945c3c2

Browse files
author
Matthew Sackman
committed
Merging bug23616 to default
2 parents 65c0336 + 17f4462 commit 945c3c2

File tree

4 files changed

+13
-16
lines changed

4 files changed

+13
-16
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -688,10 +688,9 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
688688
Confirm.SelectOk confirmSelect() throws IOException;
689689

690690
/**
691-
* Returns the number of messages published since the channel was
692-
* put in confirm mode.
693-
* @return the number of messages published since the first
694-
* confirm.select; if the channel is not in confirm mode, -1 is
695-
* returned */
696-
long getPublishedMessageCount();
691+
* When in confirm mode, returns the sequence number of the next
692+
* message to be published.
693+
* @return the sequence number of the next message to be published
694+
*/
695+
long getNextPublishSeqNo();
697696
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
105105
*/
106106
public volatile AckListener ackListener = null;
107107

108-
/** Current published message count (used by publisher acknowledgements)
108+
/** Sequence number of next published message requiring confirmation.
109109
*/
110-
private final AtomicLong publishedMessageCount = new AtomicLong(-1);
110+
private long nextPublishSeqNo = 0L;
111111

112112
/** Reference to the currently-active default consumer, or null if there is
113113
* none.
@@ -498,8 +498,7 @@ public void basicPublish(String exchange, String routingKey,
498498
BasicProperties props, byte[] body)
499499
throws IOException
500500
{
501-
if (publishedMessageCount.get() >= 0)
502-
publishedMessageCount.incrementAndGet();
501+
if (nextPublishSeqNo > 0) nextPublishSeqNo++;
503502
BasicProperties useProps = props;
504503
if (props == null) {
505504
useProps = MessageProperties.MINIMAL_BASIC;
@@ -868,8 +867,7 @@ public Tx.RollbackOk txRollback()
868867
public Confirm.SelectOk confirmSelect()
869868
throws IOException
870869
{
871-
if (publishedMessageCount.get() == -1)
872-
publishedMessageCount.set(0);
870+
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
873871
return (Confirm.SelectOk)
874872
exnWrappingRpc(new Confirm.Select(false)).getMethod();
875873

@@ -886,7 +884,7 @@ public Channel.FlowOk getFlow() {
886884
}
887885

888886
/** Public API - {@inheritDoc} */
889-
public long getPublishedMessageCount() {
890-
return publishedMessageCount.longValue();
887+
public long getNextPublishSeqNo() {
888+
return nextPublishSeqNo;
891889
}
892890
}

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ private void publishN(String exchangeName, String queueName,
254254
throws IOException
255255
{
256256
for (long i = 0; i < NUM_MESSAGES; i++) {
257-
ackSet.add(i);
257+
ackSet.add(channel.getNextPublishSeqNo());
258258
publish(exchangeName, queueName, persistent, mandatory, immediate);
259259
}
260260
}

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ private void publish(byte[] msg)
344344
}
345345

346346
private boolean throttlePubAck() {
347-
return ((pubAckCount > 0) && (channel.getPublishedMessageCount() - mostRecentAcked > pubAckCount));
347+
return ((pubAckCount > 0) && (channel.getNextPublishSeqNo() - mostRecentAcked > pubAckCount));
348348
}
349349

350350
private void delay(long now)

0 commit comments

Comments
 (0)