Skip to content

Commit 16147e6

Browse files
author
Alexandru Scvortov
committed
MCM: take into account the multiple flags and count logical confirms rather than methods
1 parent f3199f3 commit 16147e6

File tree

1 file changed

+28
-9
lines changed

1 file changed

+28
-9
lines changed

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@
3737
import java.io.DataOutputStream;
3838
import java.io.IOException;
3939
import java.util.Arrays;
40+
import java.util.Collections;
4041
import java.util.concurrent.Semaphore;
4142
import java.util.List;
43+
import java.util.SortedSet;
44+
import java.util.TreeSet;
4245
import java.util.UUID;
4346

4447
import org.apache.commons.cli.CommandLine;
@@ -248,6 +251,8 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
248251
private boolean confirm;
249252
private long confirmCount;
250253
private Semaphore confirmPool;
254+
private volatile SortedSet<Long> ackSet =
255+
Collections.synchronizedSortedSet(new TreeSet<Long>());
251256

252257
public Producer(Channel channel, String exchangeName, String id,
253258
List flags, int txSize,
@@ -289,21 +294,34 @@ public synchronized void resetBasicReturns() {
289294
basicReturnCount = 0;
290295
}
291296

292-
public void handleAck(long sequenceNumber, boolean multiple) {
293-
logAck(sequenceNumber);
297+
public void handleAck(long seqNo, boolean multiple) {
298+
int numConfirms = 0;
299+
if (multiple) {
300+
for (long i = ackSet.first(); i <= seqNo; ++i) {
301+
if (!ackSet.contains(i))
302+
continue;
303+
ackSet.remove(i);
304+
numConfirms++;
305+
}
306+
} else {
307+
ackSet.remove(seqNo);
308+
numConfirms = 1;
309+
}
310+
addConfirms(numConfirms);
311+
312+
if (confirmPool != null) {
313+
for (int i = 0; i < numConfirms; ++i) {
314+
confirmPool.release();
315+
}
316+
}
294317
}
295318

296319
private synchronized void resetConfirms() {
297320
confirmCount = 0;
298321
}
299322

300-
private void logAck(long seqNum) {
301-
if (confirmPool != null) {
302-
confirmPool.release();
303-
}
304-
synchronized (this) {
305-
confirmCount++;
306-
}
323+
private synchronized void addConfirms(int numConfirms) {
324+
confirmCount += numConfirms;
307325
}
308326

309327
public void run() {
@@ -345,6 +363,7 @@ public void run() {
345363
private void publish(byte[] msg)
346364
throws IOException {
347365

366+
ackSet.add(channel.getNextPublishSeqNo());
348367
channel.basicPublish(exchangeName, id,
349368
mandatory, immediate,
350369
persistent ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,

0 commit comments

Comments
 (0)