Skip to content

Commit ca320c1

Browse files
authored
GH-5512 Improve NativeStore corrupt data recovery (#5514)
1 parent 3467b16 commit ca320c1

File tree

17 files changed

+944
-54
lines changed

17 files changed

+944
-54
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,5 @@ e2e/playwright-report
5454
e2e/test-results
5555
.aider*
5656
/tools/server/.lwjgl/
57-
/tools/server/.lwjgl/
5857
.m2_repo/
58+
.serena/

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStore.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.eclipse.rdf4j.model.Value;
3838
import org.eclipse.rdf4j.model.ValueFactory;
3939
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
40-
import org.eclipse.rdf4j.sail.Sail;
4140
import org.eclipse.rdf4j.sail.SailException;
4241
import org.eclipse.rdf4j.sail.base.BackingSailSource;
4342
import org.eclipse.rdf4j.sail.base.Changeset;

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/ValueStore.java

Lines changed: 69 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.eclipse.rdf4j.model.vocabulary.XSD;
3636
import org.eclipse.rdf4j.sail.SailException;
3737
import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
38+
import org.eclipse.rdf4j.sail.nativerdf.datastore.RecoveredDataException;
3839
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRI;
3940
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRIOrBNode;
4041
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptLiteral;
@@ -145,7 +146,7 @@ public ValueStore(File dataDir, boolean forceSync) throws IOException {
145146
public ValueStore(File dataDir, boolean forceSync, int valueCacheSize, int valueIDCacheSize, int namespaceCacheSize,
146147
int namespaceIDCacheSize) throws IOException {
147148
super();
148-
dataStore = new DataStore(dataDir, FILENAME_PREFIX, forceSync);
149+
dataStore = new DataStore(dataDir, FILENAME_PREFIX, forceSync, this);
149150

150151
valueCache = new ConcurrentCache<>(valueCacheSize);
151152
valueIDCache = new ConcurrentCache<>(valueIDCacheSize);
@@ -194,15 +195,31 @@ public NativeValue getValue(int id) throws IOException {
194195
NativeValue resultValue = valueCache.get(cacheID);
195196

196197
if (resultValue == null) {
197-
// Value not in cache, fetch it from file
198-
byte[] data = dataStore.getData(id);
199-
200-
if (data != null) {
201-
resultValue = data2value(id, data);
202-
203-
if (!(resultValue instanceof CorruptValue)) {
204-
// Store value in cache
205-
valueCache.put(cacheID, resultValue);
198+
try {
199+
// Value not in cache, fetch it from file
200+
byte[] data = dataStore.getData(id);
201+
if (data != null) {
202+
resultValue = data2value(id, data);
203+
if (!(resultValue instanceof CorruptValue)) {
204+
// Store value in cache
205+
valueCache.put(cacheID, resultValue);
206+
}
207+
}
208+
} catch (RecoveredDataException rde) {
209+
byte[] recovered = rde.getData();
210+
if (recovered != null && recovered.length > 0) {
211+
byte t = recovered[0];
212+
if (t == URI_VALUE) {
213+
resultValue = new CorruptIRI(revision, id, null, recovered);
214+
} else if (t == BNODE_VALUE) {
215+
resultValue = new CorruptIRIOrBNode(revision, id, recovered);
216+
} else if (t == LITERAL_VALUE) {
217+
resultValue = new CorruptLiteral(revision, id, recovered);
218+
} else {
219+
resultValue = new CorruptUnknownValue(revision, id, recovered);
220+
}
221+
} else {
222+
resultValue = new CorruptUnknownValue(revision, id, recovered);
206223
}
207224
}
208225
}
@@ -434,21 +451,30 @@ public void close() throws IOException {
434451
public void checkConsistency() throws SailException, IOException {
435452
int maxID = dataStore.getMaxID();
436453
for (int id = 1; id <= maxID; id++) {
437-
byte[] data = dataStore.getData(id);
438-
if (isNamespaceData(data)) {
439-
String namespace = data2namespace(data);
440-
try {
441-
if (id == getNamespaceID(namespace, false)
442-
&& java.net.URI.create(namespace + "part").isAbsolute()) {
443-
continue;
454+
try {
455+
byte[] data = dataStore.getData(id);
456+
if (isNamespaceData(data)) {
457+
String namespace = data2namespace(data);
458+
try {
459+
if (id == getNamespaceID(namespace, false)
460+
&& java.net.URI.create(namespace + "part").isAbsolute()) {
461+
continue;
462+
}
463+
} catch (IllegalArgumentException e) {
464+
// throw SailException
465+
}
466+
throw new SailException(
467+
"Store must be manually exported and imported to fix namespaces like " + namespace);
468+
} else {
469+
Value value = this.data2value(id, data);
470+
if (id != this.getID(copy(value))) {
471+
throw new SailException(
472+
"Store must be manually exported and imported to merge values like " + value);
444473
}
445-
} catch (IllegalArgumentException e) {
446-
// throw SailException
447474
}
448-
throw new SailException(
449-
"Store must be manually exported and imported to fix namespaces like " + namespace);
450-
} else {
451-
Value value = this.data2value(id, data);
475+
} catch (RecoveredDataException rde) {
476+
// Treat as a corrupt unknown value during consistency check
477+
Value value = new CorruptUnknownValue(revision, id, rde.getData());
452478
if (id != this.getID(copy(value))) {
453479
throw new SailException(
454480
"Store must be manually exported and imported to merge values like " + value);
@@ -584,7 +610,8 @@ private boolean isNamespaceData(byte[] data) {
584610
return data[0] != URI_VALUE && data[0] != BNODE_VALUE && data[0] != LITERAL_VALUE;
585611
}
586612

587-
private NativeValue data2value(int id, byte[] data) throws IOException {
613+
@InternalUseOnly
614+
public NativeValue data2value(int id, byte[] data) throws IOException {
588615
if (data.length == 0) {
589616
if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
590617
logger.error("Soft fail on corrupt data: Empty data array for value with id {}", id);
@@ -704,8 +731,12 @@ private String getNamespace(int id) throws IOException {
704731
String namespace = namespaceCache.get(cacheID);
705732

706733
if (namespace == null) {
707-
byte[] namespaceData = dataStore.getData(id);
708-
namespace = data2namespace(namespaceData);
734+
try {
735+
byte[] namespaceData = dataStore.getData(id);
736+
namespace = data2namespace(namespaceData);
737+
} catch (RecoveredDataException rde) {
738+
namespace = data2namespace(rde.getData());
739+
}
709740

710741
namespaceCache.put(cacheID, namespace);
711742
}
@@ -829,13 +860,18 @@ public static void main(String[] args) throws Exception {
829860

830861
int maxID = valueStore.dataStore.getMaxID();
831862
for (int id = 1; id <= maxID; id++) {
832-
byte[] data = valueStore.dataStore.getData(id);
833-
if (valueStore.isNamespaceData(data)) {
834-
String ns = valueStore.data2namespace(data);
835-
System.out.println("[" + id + "] " + ns);
836-
} else {
837-
Value value = valueStore.data2value(id, data);
838-
System.out.println("[" + id + "] " + value.toString());
863+
try {
864+
byte[] data = valueStore.dataStore.getData(id);
865+
if (valueStore.isNamespaceData(data)) {
866+
String ns = valueStore.data2namespace(data);
867+
System.out.println("[" + id + "] " + ns);
868+
} else {
869+
Value value = valueStore.data2value(id, data);
870+
System.out.println("[" + id + "] " + value.toString());
871+
}
872+
} catch (RecoveredDataException rde) {
873+
System.out.println("[" + id + "] CorruptUnknownValue:"
874+
+ new CorruptUnknownValue(valueStore.revision, id, rde.getData()));
839875
}
840876
}
841877
}

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataFile.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,30 @@ public File getFile() {
117117
return nioFile.getFile();
118118
}
119119

120+
/**
121+
* Returns the current file size (after flushing any pending writes).
122+
*/
123+
public long getFileSize() throws IOException {
124+
flush();
125+
return nioFileSize;
126+
}
127+
128+
/**
129+
* Attempts to recover data bytes between two known entry offsets when the length field at {@code startOffset} is
130+
* corrupt (e.g., zero). This returns up to {@code endOffset - startOffset - 4} bytes starting after the length
131+
* field, capped to a reasonable maximum to avoid large allocations.
132+
*/
133+
public byte[] tryRecoverBetweenOffsets(long startOffset, long endOffset) throws IOException {
134+
flush();
135+
if (endOffset <= startOffset + 4) {
136+
return new byte[0];
137+
}
138+
long available = endOffset - (startOffset + 4);
139+
int cap = 32 * 1024 * 1024; // 32MB cap for recovery
140+
int toRead = (int) Math.min(Math.max(available, 0), cap);
141+
return nioFile.readBytes(startOffset + 4L, toRead);
142+
}
143+
120144
/**
121145
* Stores the specified data and returns the byte-offset at which it has been stored.
122146
*

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStore.java

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,16 @@
1313
import java.io.Closeable;
1414
import java.io.File;
1515
import java.io.IOException;
16+
import java.nio.charset.StandardCharsets;
1617
import java.util.Arrays;
1718
import java.util.zip.CRC32;
1819

1920
import org.eclipse.rdf4j.common.io.ByteArrayUtil;
21+
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
22+
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
23+
import org.eclipse.rdf4j.sail.nativerdf.model.NativeValue;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2026

2127
/**
2228
* Class that provides indexed storage and retrieval of arbitrary length data.
@@ -35,6 +41,9 @@ public class DataStore implements Closeable {
3541

3642
private final HashFile hashFile;
3743

44+
private static final Logger logger = LoggerFactory.getLogger(DataStore.class);
45+
private ValueStore valueStore;
46+
3847
/*--------------*
3948
* Constructors *
4049
*--------------*/
@@ -49,6 +58,11 @@ public DataStore(File dataDir, String filePrefix, boolean forceSync) throws IOEx
4958
hashFile = new HashFile(new File(dataDir, filePrefix + ".hash"), forceSync);
5059
}
5160

61+
/**
 * Creates a data store that additionally keeps a reference to the owning {@link ValueStore}. The reference is
 * consulted while recovering corrupt entries, to decode neighbouring entries for log messages; it may be
 * {@code null}, in which case the raw bytes are logged as UTF-8 text instead.
 *
 * @param dataDir    directory that holds the store's files
 * @param filePrefix prefix used for the names of the store's files
 * @param forceSync  passed through unchanged to the three-argument constructor
 * @param valueStore value store used to decode entries during recovery logging, or {@code null}
 * @throws IOException if the three-argument constructor fails to open the underlying files
 */
public DataStore(File dataDir, String filePrefix, boolean forceSync, ValueStore valueStore) throws IOException {
	// All file setup is delegated; this overload only records the value store.
	this(dataDir, filePrefix, forceSync);
	this.valueStore = valueStore;
}
65+
5266
/*---------*
5367
* Methods *
5468
*---------*/
@@ -67,7 +81,108 @@ public byte[] getData(int id) throws IOException {
6781
long offset = idFile.getOffset(id);
6882

6983
if (offset != 0L) {
70-
return dataFile.getData(offset);
84+
byte[] data = dataFile.getData(offset);
85+
if (data.length == 0 && NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
86+
try {
87+
long offsetNoCache = idFile.getOffsetNoCache(id);
88+
if (offset != offsetNoCache) {
89+
logger.error("IDFile cache mismatch for id {}: cached={}, raw={}. Using raw.", id, offset,
90+
offsetNoCache);
91+
offset = offsetNoCache;
92+
data = dataFile.getData(offset);
93+
}
94+
} catch (IOException e) {
95+
// If raw read fails, keep cached offset
96+
}
97+
98+
// Attempt recovery by using neighboring offsets to infer the bounds
99+
long startData = offset + 4; // default start if no previous valid entry
100+
// Find previous entry end: prevOffset + 4 + prevLength
101+
int prev = id - 1;
102+
for (; prev >= 1; prev--) {
103+
long po = idFile.getOffset(prev);
104+
try {
105+
long poRaw = idFile.getOffsetNoCache(prev);
106+
if (po != poRaw) {
107+
logger.error("IDFile cache mismatch for prev id {}: cached={}, raw={}. Using raw.", prev,
108+
po, poRaw);
109+
po = poRaw;
110+
}
111+
} catch (IOException e) {
112+
// use cached po if raw read fails
113+
}
114+
if (po > 0L) {
115+
try {
116+
byte[] prevData = dataFile.getData(po);
117+
if (prevData != null && prevData.length > 0) {
118+
try {
119+
if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) {
120+
NativeValue nativeValue = valueStore.data2value(prev, prevData);
121+
logger.warn("Data in previous ID ({}) is: {}", prev, nativeValue);
122+
} else {
123+
logger.warn("Data in previous ID ({}) is: {}", prev,
124+
new String(prevData, StandardCharsets.UTF_8));
125+
}
126+
} catch (Exception ignored) {
127+
}
128+
startData = po + 4L + prevData.length;
129+
break;
130+
}
131+
} catch (Exception ignored) {
132+
}
133+
}
134+
}
135+
136+
// Find next entry start as the end bound
137+
long endOffset = 0L;
138+
int maxId = idFile.getMaxID();
139+
int next = id + 1;
140+
for (; next <= maxId; next++) {
141+
long no = idFile.getOffset(next);
142+
try {
143+
long noRaw = idFile.getOffsetNoCache(next);
144+
if (no != noRaw) {
145+
logger.error("IDFile cache mismatch for next id {}: cached={}, raw={}. Using raw.", next,
146+
no, noRaw);
147+
no = noRaw;
148+
}
149+
} catch (IOException e) {
150+
// use cached value if raw read fails
151+
}
152+
if (no > 0L) {
153+
154+
try {
155+
byte[] nextData = dataFile.getData(no);
156+
if (nextData != null && nextData.length > 0) {
157+
try {
158+
if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) {
159+
NativeValue nativeValue = valueStore.data2value(next, nextData);
160+
logger.warn("Data in next ID ({}) is: {}", next, nativeValue);
161+
} else {
162+
logger.warn("Data in next ID ({}) is: {}", next,
163+
new String(nextData, StandardCharsets.UTF_8));
164+
}
165+
} catch (Exception ignored) {
166+
}
167+
endOffset = no;
168+
break;
169+
}
170+
} catch (Exception e) {
171+
}
172+
173+
}
174+
}
175+
if (endOffset == 0L) {
176+
// Fallback: use current file size as end bound
177+
endOffset = dataFile.getFileSize();
178+
}
179+
if (endOffset > startData) {
180+
// tryRecoverBetweenOffsets expects an offset to a 4-byte length, so pass (startData - 4)
181+
byte[] recovered = dataFile.tryRecoverBetweenOffsets(Math.max(0L, startData - 4L), endOffset);
182+
throw new RecoveredDataException(id, recovered);
183+
}
184+
}
185+
return data;
71186
}
72187

73188
return null;

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFile.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,19 @@ public long getOffset(int id) throws IOException {
230230
return nioFile.readLong(ITEM_SIZE * id);
231231
}
232232

233+
/**
234+
* Gets the offset directly from the underlying file, bypassing any caches. Useful for validating cached results
235+
* when diagnosing corruption.
236+
*
237+
* @param id The ID to get the offset for, must be larger than 0.
238+
* @return the raw offset stored for the id
239+
* @throws IOException if an I/O error occurs
240+
*/
241+
public long getOffsetNoCache(int id) throws IOException {
242+
assert id > 0 : "id must be larger than 0, is: " + id;
243+
return nioFile.readLong(ITEM_SIZE * id);
244+
}
245+
233246
/**
234247
* Discards all stored data.
235248
*

0 commit comments

Comments
 (0)