Skip to content

Commit 9e0a552

Browse files
author
Alexandru Scvortov
committed
keep track of published message count in java client
1 parent 39a19de commit 9e0a552

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,4 +617,11 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
617617
*/
618618
Confirm.SelectOk confirmSelect(boolean multiple, boolean nowait)
619619
throws IOException;
620+
621+
/**
622+
* Returns the number of messages published since the channel was
623+
* put in confirm mode.
624+
* @return the number of messages published since the first
625+
* confirm.select */
626+
long getPublishedMessageCount();
620627
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
104104
*/
105105
public volatile AckListener ackListener = null;
106106

107+
/** Current published message count (used by publisher acknowledgements)
108+
*/
109+
private long publishedMessageCount;
110+
107111
/** Reference to the currently-active default consumer, or null if there is
108112
* none.
109113
*/
@@ -493,6 +497,10 @@ public void basicPublish(String exchange, String routingKey,
493497
BasicProperties props, byte[] body)
494498
throws IOException
495499
{
500+
synchronized (_channelMutex) {
501+
if (publishedMessageCount >= 0)
502+
++publishedMessageCount;
503+
}
496504
BasicProperties useProps = props;
497505
if (props == null) {
498506
useProps = MessageProperties.MINIMAL_BASIC;
@@ -823,6 +831,10 @@ public Confirm.SelectOk confirmSelect(boolean multiple)
823831
public Confirm.SelectOk confirmSelect(boolean multiple, boolean nowait)
824832
throws IOException
825833
{
834+
synchronized (_channelMutex) {
835+
if (publishedMessageCount == -1)
836+
publishedMessageCount = 0;
837+
}
826838
return (Confirm.SelectOk)
827839
exnWrappingRpc(new Confirm.Select(multiple, nowait)).getMethod();
828840
}
@@ -837,4 +849,8 @@ public Channel.FlowOk getFlow() {
837849
return new Channel.FlowOk(!_blockContent);
838850
}
839851

852+
/** Public API - {@inheritDoc} */
853+
public long getPublishedMessageCount() {
854+
return publishedMessageCount;
855+
}
840856
}

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public static void main(String[] args) {
8282
int producerTxSize = intArg(cmd, 'm', 0);
8383
int consumerTxSize = intArg(cmd, 'n', 0);
8484
boolean pubAck = cmd.hasOption('c');
85-
int pubAckCount = intArg(cmd, 'k', 0);
85+
long pubAckCount = intArg(cmd, 'k', 0);
8686
boolean autoAck = cmd.hasOption('a');
8787
int prefetchCount = intArg(cmd, 'q', 0);
8888
int minMsgSize = intArg(cmd, 's', 0);
@@ -139,9 +139,10 @@ public static void main(String[] args) {
139139
if (pubAck) channel.confirmSelect(true);
140140
channel.exchangeDeclare(exchangeName, exchangeType);
141141
final Producer p = new Producer(channel, exchangeName, id,
142-
flags, producerTxSize,
143-
1000L * samplingInterval,
144-
rateLimit, minMsgSize, timeLimit);
142+
flags, producerTxSize,
143+
1000L * samplingInterval,
144+
rateLimit, minMsgSize, timeLimit,
145+
pubAckCount);
145146
channel.setReturnListener(p);
146147
channel.setAckListener(p);
147148
Thread t = new Thread(p);
@@ -233,9 +234,13 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
233234
private int msgCount;
234235
private int basicReturnCount;
235236

237+
private long pubAckCount;
238+
private long mostRecentAcked;
239+
236240
public Producer(Channel channel, String exchangeName, String id,
237241
List flags, int txSize,
238-
long interval, int rateLimit, int minMsgSize, int timeLimit)
242+
long interval, int rateLimit, int minMsgSize, int timeLimit,
243+
long pubAckCount)
239244
throws IOException {
240245

241246
this.channel = channel;
@@ -248,6 +253,7 @@ public Producer(Channel channel, String exchangeName, String id,
248253
this.interval = interval;
249254
this.rateLimit = rateLimit;
250255
this.timeLimit = 1000L * timeLimit;
256+
this.pubAckCount = pubAckCount;
251257
this.message = new byte[minMsgSize];
252258
}
253259

@@ -269,7 +275,16 @@ public synchronized void resetBasicReturns() {
269275
}
270276

271277
public void handleAck(long sequenceNumber, boolean multiple) {
272-
System.out.printf("got an ack for %d\n", sequenceNumber);
278+
if (multiple) {
279+
logAck(sequenceNumber);
280+
System.out.printf("got an ack all messages up to %d\n", sequenceNumber);
281+
} else {
282+
System.out.printf("got an ack for message %d\n", sequenceNumber);
283+
}
284+
}
285+
286+
private synchronized void logAck(long seqNum) {
287+
mostRecentAcked = seqNum;
273288
}
274289

275290
public void run() {
@@ -282,6 +297,7 @@ public void run() {
282297
try {
283298

284299
while (timeLimit == 0 || now < startTime + timeLimit) {
300+
delayPubAck();
285301
delay(now);
286302
publish(createMessage(totalMsgCount));
287303
totalMsgCount++;
@@ -314,6 +330,16 @@ private void publish(byte[] msg)
314330
msg);
315331
}
316332

333+
private void delayPubAck()
334+
throws InterruptedException {
335+
if (pubAckCount == 0)
336+
return;
337+
while (channel.getPublishedMessageCount() - mostRecentAcked
338+
> pubAckCount) {
339+
Thread.sleep(100);
340+
}
341+
}
342+
317343
private void delay(long now)
318344
throws InterruptedException {
319345

0 commit comments

Comments
 (0)