Skip to content

Commit 5b2e2ae

Browse files
committed
better naming for publish sequence number
also, there is on need to make the var atomic. It is only ever accessed by application threads and our general rules for Channel usage dictate that there must only ever be one of these per channel.
1 parent 65c0336 commit 5b2e2ae

File tree

3 files changed

+14
-15
lines changed

3 files changed

+14
-15
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -688,10 +688,11 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
688688
Confirm.SelectOk confirmSelect() throws IOException;
689689

690690
/**
691-
* Returns the number of messages published since the channel was
692-
* put in confirm mode.
693-
* @return the number of messages published since the first
694-
* confirm.select; if the channel is not in confirm mode, -1 is
695-
* returned */
696-
long getPublishedMessageCount();
691+
* Returns the sequence number of the next message to be published
692+
* that requires confirmation.
693+
* @return the sequence number of the next message to be published
694+
* that requires confirmation. 0 if the channel is not in confirm mode.
695+
* not in confirm mode
696+
*/
697+
long getNextPublishSeqNo();
697698
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
105105
*/
106106
public volatile AckListener ackListener = null;
107107

108-
/** Current published message count (used by publisher acknowledgements)
108+
/** Sequence number of next published message requiring confirmation.
109109
*/
110-
private final AtomicLong publishedMessageCount = new AtomicLong(-1);
110+
private long nextPublishSeqNo = 0L;
111111

112112
/** Reference to the currently-active default consumer, or null if there is
113113
* none.
@@ -498,8 +498,7 @@ public void basicPublish(String exchange, String routingKey,
498498
BasicProperties props, byte[] body)
499499
throws IOException
500500
{
501-
if (publishedMessageCount.get() >= 0)
502-
publishedMessageCount.incrementAndGet();
501+
if (nextPublishSeqNo > 0) nextPublishSeqNo++;
503502
BasicProperties useProps = props;
504503
if (props == null) {
505504
useProps = MessageProperties.MINIMAL_BASIC;
@@ -868,8 +867,7 @@ public Tx.RollbackOk txRollback()
868867
public Confirm.SelectOk confirmSelect()
869868
throws IOException
870869
{
871-
if (publishedMessageCount.get() == -1)
872-
publishedMessageCount.set(0);
870+
if (nextPublishSeqNo == 0) nextPublishSeqNo = 0;
873871
return (Confirm.SelectOk)
874872
exnWrappingRpc(new Confirm.Select(false)).getMethod();
875873

@@ -886,7 +884,7 @@ public Channel.FlowOk getFlow() {
886884
}
887885

888886
/** Public API - {@inheritDoc} */
889-
public long getPublishedMessageCount() {
890-
return publishedMessageCount.longValue();
887+
public long getNextPublishSeqNo() {
888+
return nextPublishSeqNo;
891889
}
892890
}

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ private void publish(byte[] msg)
344344
}
345345

346346
private boolean throttlePubAck() {
347-
return ((pubAckCount > 0) && (channel.getPublishedMessageCount() - mostRecentAcked > pubAckCount));
347+
return ((pubAckCount > 0) && (channel.getNextPublishSeqNo() - mostRecentAcked > pubAckCount));
348348
}
349349

350350
private void delay(long now)

0 commit comments

Comments
 (0)