Skip to content

Commit 95d584d

Browse files
author
Alexandru Scvortov
committed
refactor
1 parent e23cbee commit 95d584d

File tree

3 files changed

+12
-15
lines changed

3 files changed

+12
-15
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
649649

650650
/**
651651
* Enables publisher acknowledgements on this channel.
652-
* @param many determines whether the broker can acknowledge
652+
* @param multiple determines whether the broker can acknowledge
653653
* multiple messages at the same time
654654
* @see com.rabbitmq.client.AMQP.Confirm.Select
655655
* @throws java.io.IOException if an error is encountered
@@ -660,6 +660,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
660660
* Returns the number of messages published since the channel was
661661
* put in confirm mode.
662662
* @return the number of messages published since the first
663-
* confirm.select */
663+
* confirm.select; if the channel is not in confirm mode, -1 is
664+
* returned */
664665
long getPublishedMessageCount();
665666
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.Collections;
5757
import java.util.HashMap;
5858
import java.util.Map;
59+
import java.util.concurrent.atomic.AtomicLong;
5960
import java.util.concurrent.TimeoutException;
6061

6162

@@ -106,7 +107,7 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
106107

107108
/** Current published message count (used by publisher acknowledgements)
108109
*/
109-
private long publishedMessageCount;
110+
private AtomicLong publishedMessageCount = new AtomicLong(-1);
110111

111112
/** Reference to the currently-active default consumer, or null if there is
112113
* none.
@@ -497,10 +498,8 @@ public void basicPublish(String exchange, String routingKey,
497498
BasicProperties props, byte[] body)
498499
throws IOException
499500
{
500-
synchronized (_channelMutex) {
501-
if (publishedMessageCount >= 0)
502-
++publishedMessageCount;
503-
}
501+
if (publishedMessageCount.get() >= 0)
502+
publishedMessageCount.incrementAndGet();
504503
BasicProperties useProps = props;
505504
if (props == null) {
506505
useProps = MessageProperties.MINIMAL_BASIC;
@@ -847,10 +846,8 @@ public Tx.RollbackOk txRollback()
847846
public Confirm.SelectOk confirmSelect(boolean multiple)
848847
throws IOException
849848
{
850-
synchronized (_channelMutex) {
851-
if (publishedMessageCount == -1)
852-
publishedMessageCount = 0;
853-
}
849+
if (publishedMessageCount.get() == -1)
850+
publishedMessageCount.set(0);
854851
return (Confirm.SelectOk)
855852
exnWrappingRpc(new Confirm.Select(multiple, false)).getMethod();
856853

@@ -868,6 +865,6 @@ public Channel.FlowOk getFlow() {
868865

869866
/** Public API - {@inheritDoc} */
870867
public long getPublishedMessageCount() {
871-
return publishedMessageCount;
868+
return publishedMessageCount.longValue();
872869
}
873870
}

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,14 @@ public class Confirm extends BrokerTestCase
5353
protected void setUp() throws IOException {
5454
super.setUp();
5555
ackSet = new TreeSet<Long>();
56-
final Confirm This = this;
5756
channel.setAckListener(new AckListener() {
5857
public void handleAck(long seqNo,
5958
boolean multiple) {
6059
if (multiple) {
6160
for (int i = 0; i <= seqNo; ++i)
62-
This.gotAckFor(i);
61+
Confirm.this.gotAckFor(i);
6362
} else {
64-
This.gotAckFor(seqNo);
63+
Confirm.this.gotAckFor(seqNo);
6564
}
6665
}
6766
});

0 commit comments

Comments
 (0)