Skip to content

Commit 33619b7

Browse files
author
Alexandru Scvortov
committed
MCM: use a semaphore to throttle confirms instead of a try-and-wait loop
1 parent 27bdb32 commit 33619b7

File tree

1 file changed

+30
-23
lines changed

1 file changed

+30
-23
lines changed

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.DataOutputStream;
3838
import java.io.IOException;
3939
import java.util.Arrays;
40+
import java.util.concurrent.Semaphore;
4041
import java.util.List;
4142
import java.util.UUID;
4243

@@ -244,10 +245,9 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
244245
private int msgCount;
245246
private int basicReturnCount;
246247

247-
private boolean confirm;
248-
private long confirmMax;
249-
private long mostRecentConfirmed;
250-
private long confirmCount;
248+
private boolean confirm;
249+
private long confirmCount;
250+
private Semaphore confirmPool;
251251

252252
public Producer(Channel channel, String exchangeName, String id,
253253
List flags, int txSize,
@@ -265,7 +265,9 @@ public Producer(Channel channel, String exchangeName, String id,
265265
this.interval = interval;
266266
this.rateLimit = rateLimit;
267267
this.timeLimit = 1000L * timeLimit;
268-
this.confirmMax = confirmMax;
268+
if (confirmMax > 0) {
269+
this.confirmPool = new Semaphore((int)confirmMax);
270+
}
269271
this.message = new byte[minMsgSize];
270272
this.confirm = confirm;
271273
}
@@ -295,9 +297,21 @@ private synchronized void resetConfirms() {
295297
confirmCount = 0;
296298
}
297299

298-
private synchronized void logAck(long seqNum) {
299-
mostRecentConfirmed = seqNum;
300-
confirmCount++;
300+
private void logAck(long seqNum) {
301+
if (confirmPool != null) {
302+
confirmPool.release();
303+
}
304+
synchronized (this) {
305+
confirmCount++;
306+
}
307+
}
308+
309+
private void canPublish()
310+
throws InterruptedException
311+
{
312+
if (confirmPool != null) {
313+
confirmPool.acquire();
314+
}
301315
}
302316

303317
public void run() {
@@ -310,17 +324,14 @@ public void run() {
310324
try {
311325

312326
while (timeLimit == 0 || now < startTime + timeLimit) {
313-
if (!throttleConfirms()) {
314-
delay(now);
315-
publish(createMessage(totalMsgCount));
316-
totalMsgCount++;
317-
msgCount++;
318-
319-
if (txSize != 0 && totalMsgCount % txSize == 0) {
320-
channel.txCommit();
321-
}
322-
} else {
323-
Thread.sleep(10);
327+
canPublish();
328+
delay(now);
329+
publish(createMessage(totalMsgCount));
330+
totalMsgCount++;
331+
msgCount++;
332+
333+
if (txSize != 0 && totalMsgCount % txSize == 0) {
334+
channel.txCommit();
324335
}
325336
now = System.currentTimeMillis();
326337
}
@@ -346,10 +357,6 @@ private void publish(byte[] msg)
346357
msg);
347358
}
348359

349-
private boolean throttleConfirms() {
350-
return ((confirmMax > 0) && (channel.getNextPublishSeqNo() - mostRecentConfirmed > confirmMax));
351-
}
352-
353360
private void delay(long now)
354361
throws InterruptedException {
355362

0 commit comments

Comments
 (0)