Skip to content

Commit 38fd156

Browse files
committed
feat: check pending total bytes for push
Change-Id: I187cc689307e757fd8cde285f131c40a035d3d93
1 parent 2c2e73e commit 38fd156

File tree

1 file changed

+33
-1
lines changed

1 file changed

+33
-1
lines changed

dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package io.openmessaging.storage.dledger;
1818

1919
import com.alibaba.fastjson.JSON;
20+
21+
import io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcherState;
2022
import io.openmessaging.storage.dledger.common.Closure;
2123
import io.openmessaging.storage.dledger.common.ShutdownAbleThread;
2224
import io.openmessaging.storage.dledger.common.Status;
@@ -55,6 +57,7 @@
5557
import java.util.concurrent.ConcurrentHashMap;
5658
import java.util.concurrent.ConcurrentMap;
5759
import java.util.concurrent.TimeUnit;
60+
import java.util.concurrent.atomic.AtomicLong;
5861
import java.util.concurrent.atomic.AtomicReference;
5962
import java.util.concurrent.locks.ReentrantLock;
6063
import java.util.stream.Collectors;
@@ -180,12 +183,33 @@ public int getPendingCount(long currTerm) {
180183
return pendings.size();
181184
}
182185

186+
public long getPendingSize(long currTerm) {
187+
if (dispatcherMap == null) {
188+
return 0;
189+
}
190+
long total = 0;
191+
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
192+
total += dispatcher.pendingTotalSize.get();
193+
}
194+
return total;
195+
}
196+
183197
public boolean isPendingFull(long currTerm) {
184198
checkTermForPendingMap(currTerm, "isPendingFull");
185199
if (dLedgerStore.isLocalToomuchUncommitted()) {
186200
return true;
187201
}
188-
return pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
202+
if (pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum()) {
203+
return true;
204+
}
205+
// avoid too much memory in pending if more than half followers fall behind too much
206+
int fallBehindTooMuch = 0;
207+
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
208+
if (dispatcher.pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) {
209+
fallBehindTooMuch++;
210+
}
211+
}
212+
return fallBehindTooMuch > dispatcherMap.size() / 2;
189213
}
190214

191215
public void appendClosure(Closure closure, long term, long index) {
@@ -409,6 +433,8 @@ private class EntryDispatcher extends ShutdownAbleThread {
409433
private long matchIndex = -1;
410434

411435
private final int maxPendingSize = 1000;
436+
private AtomicLong pendingTotalSize = new AtomicLong(0);
437+
412438
private long term = -1;
413439
private String leaderId = null;
414440
private long lastCheckLeakTimeMs = System.currentTimeMillis();
@@ -715,6 +741,10 @@ private void doAppend() throws Exception {
715741
doCheckAppendResponse();
716742
break;
717743
}
744+
if (pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) {
745+
// to avoid oom or fullgc, we should wait for a while if too much pending big entry size
746+
break;
747+
}
718748
long lastIndexToBeSend = doAppendInner(writeIndex);
719749
if (lastIndexToBeSend == -1) {
720750
break;
@@ -759,8 +789,10 @@ private void sendBatchAppendEntryRequest() throws Exception {
759789
StopWatch watch = StopWatch.createStarted();
760790
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
761791
pendingMap.put(firstIndex, new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
792+
pendingTotalSize.addAndGet(entriesSize);
762793
responseFuture.whenComplete((x, ex) -> {
763794
try {
795+
pendingTotalSize.addAndGet(-1 * entriesSize);
764796
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
765797
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
766798
switch (responseCode) {

0 commit comments

Comments
 (0)