Skip to content

Commit f00c488

Browse files
[ISSUE #131] Support Snapshot for Dledger
1 parent 5398f6a commit f00c488

31 files changed

+1511
-69
lines changed

dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ public class DLedgerConfig {
9191

9292
private long leadershipTransferWaitTimeout = 1000;
9393

94+
private int snapshotThreshold = 1000;
95+
private int maxSnapshotReservedNum = 3;
96+
9497
public String getDefaultPath() {
9598
return storeBaseDir + File.separator + "dledger-" + selfId;
9699
}
@@ -106,6 +109,10 @@ public void setDataStorePath(String dataStorePath) {
106109
this.dataStorePath = dataStorePath;
107110
}
108111

112+
public String getSnapshotStoreBaseDir() {
113+
return getDefaultPath() + File.separator + "snapshot";
114+
}
115+
109116
public String getIndexStorePath() {
110117
return getDefaultPath() + File.separator + "index";
111118
}
@@ -463,4 +470,19 @@ public Map<String, String> getPeerAddressMap() {
463470
return this.peerAddressMap;
464471
}
465472

473+
public int getSnapshotThreshold() {
474+
return snapshotThreshold;
475+
}
476+
477+
public void setSnapshotThreshold(int snapshotThreshold) {
478+
this.snapshotThreshold = snapshotThreshold;
479+
}
480+
481+
public int getMaxSnapshotReservedNum() {
482+
return maxSnapshotReservedNum;
483+
}
484+
485+
public void setMaxSnapshotReservedNum(int maxSnapshotReservedNum) {
486+
this.maxSnapshotReservedNum = maxSnapshotReservedNum;
487+
}
466488
}

dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
3737
import io.openmessaging.storage.dledger.protocol.VoteRequest;
3838
import io.openmessaging.storage.dledger.protocol.VoteResponse;
39+
import io.openmessaging.storage.dledger.snapshot.SnapshotManager;
3940
import io.openmessaging.storage.dledger.statemachine.StateMachine;
4041
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
4142
import io.openmessaging.storage.dledger.store.DLedgerMemoryStore;
@@ -139,6 +140,11 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcSe
139140
public synchronized void startup() {
140141
if (!isStarted) {
141142
this.dLedgerStore.startup();
143+
this.fsmCaller.ifPresent(x -> {
144+
// Start state machine caller and load existing snapshots for data recovery
145+
x.start();
146+
x.getSnapshotManager().loadSnapshot();
147+
});
142148
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
143149
this.dLedgerRpcService.startup();
144150
}
@@ -183,7 +189,7 @@ public synchronized void registerStateMachine(final StateMachine fsm) {
183189
throw new IllegalStateException("can not register statemachine after dledger server starts");
184190
}
185191
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
186-
fsmCaller.start();
192+
fsmCaller.registerSnapshotManager(new SnapshotManager(this));
187193
this.fsmCaller = Optional.of(fsmCaller);
188194
// Register state machine caller to entry pusher
189195
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
@@ -544,6 +550,10 @@ public NettyRemotingClient getRemotingClient() {
544550
return null;
545551
}
546552

553+
public StateMachineCaller getFsmCaller() {
554+
return fsmCaller.orElseThrow(NullPointerException::new);
555+
}
556+
547557
public boolean isLeader() {
548558
return this.memberState.isLeader();
549559
}

dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public enum DLedgerResponseCode {
5454
LEADER_PENDING_FULL(503, ""),
5555
ILLEGAL_MEMBER_STATE(504, ""),
5656
LEADER_NOT_READY(505, ""),
57-
LEADER_TRANSFERRING(506, "");
57+
LEADER_TRANSFERRING(506, ""),
58+
LOAD_SNAPSHOT_ERROR(507, "");
5859

5960
private static Map<Integer, DLedgerResponseCode> codeMap = new HashMap<>();
6061

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
2+
* Copyright 2017-2022 The DLedger Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.openmessaging.storage.dledger.snapshot;
18+
19+
import io.openmessaging.storage.dledger.DLedgerConfig;
20+
import io.openmessaging.storage.dledger.DLedgerServer;
21+
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
22+
import io.openmessaging.storage.dledger.exception.DLedgerException;
23+
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
24+
import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotStore;
25+
import io.openmessaging.storage.dledger.snapshot.hook.LoadSnapshotHook;
26+
import io.openmessaging.storage.dledger.snapshot.hook.SaveSnapshotHook;
27+
import io.openmessaging.storage.dledger.utils.IOUtils;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.io.File;
32+
import java.io.IOException;
33+
import java.util.Objects;
34+
import java.util.concurrent.CompletableFuture;
35+
36+
public class SnapshotManager {
37+
38+
private static Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
39+
40+
public static final String SNAPSHOT_META_FILE = "snapshot_meta";
41+
public static final String SNAPSHOT_DATA_FILE = "data";
42+
public static final String SNAPSHOT_DIR_PREFIX = "snapshot_";
43+
public static final String SNAPSHOT_TEMP_DIR = "tmp";
44+
45+
private DLedgerServer dLedgerServer;
46+
private long lastSnapshotIndex;
47+
private long lastSnapshotTerm;
48+
private final SnapshotStore snapshotStore;
49+
private volatile boolean savingSnapshot;
50+
private volatile boolean loadingSnapshot;
51+
52+
public SnapshotManager(DLedgerServer dLedgerServer) {
53+
this.dLedgerServer = dLedgerServer;
54+
this.snapshotStore = new FileSnapshotStore(this.dLedgerServer.getDLedgerConfig().getSnapshotStoreBaseDir());
55+
}
56+
57+
public boolean isSavingSnapshot() {
58+
return savingSnapshot;
59+
}
60+
61+
public boolean isLoadingSnapshot() {
62+
return loadingSnapshot;
63+
}
64+
65+
private class SaveSnapshotAfterHook implements SaveSnapshotHook {
66+
67+
SnapshotWriter writer;
68+
DLedgerEntry dLedgerEntry;
69+
SnapshotMeta snapshotMeta;
70+
71+
public SaveSnapshotAfterHook(SnapshotWriter writer, DLedgerEntry dLedgerEntry) {
72+
this.writer = writer;
73+
this.dLedgerEntry = dLedgerEntry;
74+
}
75+
76+
@Override
77+
public void doCallBack(SnapshotStatus status) {
78+
saveSnapshotAfter(writer, snapshotMeta, dLedgerEntry, status);
79+
}
80+
81+
@Override
82+
public void registerSnapshotMeta(SnapshotMeta snapshotMeta) {
83+
this.snapshotMeta = snapshotMeta;
84+
this.writer.setSnapshotMeta(snapshotMeta);
85+
}
86+
87+
@Override
88+
public SnapshotWriter getSnapshotWriter() {
89+
return this.writer;
90+
}
91+
92+
@Override
93+
public DLedgerEntry getSnapshotEntry() {
94+
return this.dLedgerEntry;
95+
}
96+
}
97+
98+
private class LoadSnapshotAfterHook implements LoadSnapshotHook {
99+
100+
SnapshotReader reader;
101+
SnapshotMeta snapshotMeta;
102+
103+
public LoadSnapshotAfterHook(SnapshotReader reader) {
104+
this.reader = reader;
105+
}
106+
107+
@Override
108+
public void doCallBack(SnapshotStatus status) {
109+
loadSnapshotAfter(reader, snapshotMeta, status);
110+
}
111+
112+
@Override
113+
public void registerSnapshotMeta(SnapshotMeta snapshotMeta) {
114+
this.snapshotMeta = snapshotMeta;
115+
}
116+
117+
@Override
118+
public SnapshotReader getSnapshotReader() {
119+
return this.reader;
120+
}
121+
}
122+
123+
public void saveSnapshot(DLedgerEntry dLedgerEntry) {
124+
// Check if still saving other snapshots
125+
if (this.savingSnapshot) {
126+
return;
127+
}
128+
// Check if applied index reaching the snapshot threshold
129+
if (dLedgerEntry.getIndex() - this.lastSnapshotIndex <= this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) {
130+
return;
131+
}
132+
// Create snapshot writer
133+
SnapshotWriter writer = this.snapshotStore.createSnapshotWriter();
134+
if (writer == null) {
135+
return;
136+
}
137+
// Start saving snapshot
138+
this.savingSnapshot = true;
139+
SaveSnapshotAfterHook saveSnapshotAfter = new SaveSnapshotAfterHook(writer, dLedgerEntry);
140+
if (!this.dLedgerServer.getFsmCaller().onSnapshotSave(saveSnapshotAfter)) {
141+
logger.error("Unable to call statemachine onSnapshotSave");
142+
saveSnapshotAfter.doCallBack(SnapshotStatus.FAIL);
143+
}
144+
}
145+
146+
private void saveSnapshotAfter(SnapshotWriter writer, SnapshotMeta snapshotMeta, DLedgerEntry dLedgerEntry, SnapshotStatus status) {
147+
int res = status.getCode();
148+
// Update snapshot meta
149+
if (res == SnapshotStatus.SUCCESS.getCode()) {
150+
writer.setSnapshotMeta(snapshotMeta);
151+
}
152+
// Write snapshot meta into files and close snapshot writer
153+
try {
154+
writer.save(status);
155+
} catch (IOException e) {
156+
logger.error("Unable to close snapshot writer", e);
157+
res = SnapshotStatus.FAIL.getCode();
158+
}
159+
if (res == SnapshotStatus.SUCCESS.getCode()) {
160+
this.lastSnapshotIndex = snapshotMeta.getLastIncludedIndex();
161+
this.lastSnapshotTerm = snapshotMeta.getLastIncludedTerm();
162+
logger.info("Snapshot {} saved successfully", snapshotMeta);
163+
// Remove previous logs
164+
CompletableFuture.runAsync(() -> {
165+
truncatePrefix(dLedgerEntry);
166+
});
167+
} else {
168+
logger.error("Unable to save snapshot");
169+
}
170+
this.savingSnapshot = false;
171+
}
172+
173+
private void truncatePrefix(DLedgerEntry entry) {
174+
deleteExpiredSnapshot();
175+
this.dLedgerServer.getFsmCaller().getdLedgerStore().resetOffsetAfterSnapshot(entry);
176+
}
177+
178+
private void deleteExpiredSnapshot() {
179+
// Remove the oldest snapshot
180+
DLedgerConfig config = dLedgerServer.getDLedgerConfig();
181+
File[] snapshotFiles = new File(config.getSnapshotStoreBaseDir()).listFiles();
182+
if (snapshotFiles != null && snapshotFiles.length > config.getMaxSnapshotReservedNum()) {
183+
long minSnapshotIdx = Long.MAX_VALUE;
184+
for (File file : snapshotFiles) {
185+
String fileName = file.getName();
186+
if (!fileName.startsWith(SnapshotManager.SNAPSHOT_DIR_PREFIX)) {
187+
continue;
188+
}
189+
minSnapshotIdx = Math.min(Long.parseLong(fileName.substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length())), minSnapshotIdx);
190+
}
191+
String deleteFilePath = config.getSnapshotStoreBaseDir() + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX + minSnapshotIdx;
192+
try {
193+
IOUtils.deleteFile(new File(deleteFilePath));
194+
} catch (IOException e) {
195+
logger.error("Unable to remove expired snapshot: {}", deleteFilePath, e);
196+
}
197+
}
198+
}
199+
200+
public void loadSnapshot() {
201+
// Check if still loading snapshot
202+
if (loadingSnapshot) {
203+
return;
204+
}
205+
// Create snapshot reader
206+
SnapshotReader reader = snapshotStore.createSnapshotReader();
207+
if (reader == null) {
208+
return;
209+
}
210+
// Start loading snapshot
211+
this.loadingSnapshot = true;
212+
LoadSnapshotAfterHook loadSnapshotAfter = new LoadSnapshotAfterHook(reader);
213+
if (!this.dLedgerServer.getFsmCaller().onSnapshotLoad(loadSnapshotAfter)) {
214+
this.dLedgerServer.getFsmCaller().setError(this.dLedgerServer,
215+
new DLedgerException(DLedgerResponseCode.LOAD_SNAPSHOT_ERROR, "Unable to call statemachine onSnapshotLoad"));
216+
}
217+
}
218+
219+
private void loadSnapshotAfter(SnapshotReader reader, SnapshotMeta snapshotMeta, SnapshotStatus status) {
220+
if (status.getCode() == SnapshotStatus.SUCCESS.getCode()) {
221+
this.lastSnapshotIndex = snapshotMeta.getLastIncludedIndex();
222+
this.lastSnapshotTerm = snapshotMeta.getLastIncludedTerm();
223+
this.loadingSnapshot = false;
224+
logger.info("Snapshot {} loaded successfully", snapshotMeta);
225+
} else {
226+
// Stop the loading process if the snapshot is expired
227+
if (status.getCode() == SnapshotStatus.EXPIRED.getCode()) {
228+
this.loadingSnapshot = false;
229+
return;
230+
}
231+
// Remove the error snapshot
232+
boolean failed = false;
233+
try {
234+
IOUtils.deleteFile(new File(reader.getSnapshotStorePath()));
235+
} catch (IOException e) {
236+
logger.error("Unable to remove error snapshot: {}", reader.getSnapshotStorePath(), e);
237+
failed = true;
238+
}
239+
// Check if there is snapshot exists
240+
DLedgerConfig config = this.dLedgerServer.getDLedgerConfig();
241+
if (Objects.requireNonNull(new File(config.getSnapshotStoreBaseDir()).listFiles()).length == 0) {
242+
logger.error("No snapshot for recovering state machine: {}", config.getSnapshotStoreBaseDir());
243+
failed = true;
244+
}
245+
if (failed) {
246+
// Still able to recover from files if the beginning index of file store is 0
247+
if (this.dLedgerServer.getFsmCaller().getdLedgerStore().getLedgerBeginIndex() == 0) {
248+
this.loadingSnapshot = false;
249+
return;
250+
}
251+
this.dLedgerServer.getFsmCaller().setError(this.dLedgerServer,
252+
new DLedgerException(DLedgerResponseCode.LOAD_SNAPSHOT_ERROR, "Fail to recover state machine"));
253+
return;
254+
}
255+
// Retry loading the previous snapshots
256+
logger.warn("Load snapshot from {} failed. Start recovering from the previous snapshot", reader.getSnapshotStorePath());
257+
this.loadingSnapshot = false;
258+
loadSnapshot();
259+
}
260+
}
261+
}

0 commit comments

Comments
 (0)