1212
1313import java .io .File ;
1414import java .io .IOException ;
15+ import java .nio .charset .StandardCharsets ;
16+ import java .nio .file .Files ;
17+ import java .nio .file .Path ;
18+ import java .nio .file .StandardOpenOption ;
1519import java .util .ArrayList ;
1620import java .util .Collections ;
1721import java .util .LinkedHashSet ;
1822import java .util .List ;
1923import java .util .Map ;
2024import java .util .Map .Entry ;
2125import java .util .Objects ;
26+ import java .util .OptionalLong ;
2227import java .util .Set ;
28+ import java .util .UUID ;
2329import java .util .concurrent .atomic .AtomicBoolean ;
2430import java .util .concurrent .locks .ReentrantLock ;
2531
4652import org .eclipse .rdf4j .sail .base .SailStore ;
4753import org .eclipse .rdf4j .sail .nativerdf .btree .RecordIterator ;
4854import org .eclipse .rdf4j .sail .nativerdf .model .NativeValue ;
55+ import org .eclipse .rdf4j .sail .nativerdf .wal .ValueStoreWAL ;
56+ import org .eclipse .rdf4j .sail .nativerdf .wal .WalConfig ;
4957import org .slf4j .Logger ;
5058import org .slf4j .LoggerFactory ;
5159
@@ -60,6 +68,8 @@ class NativeSailStore implements SailStore {
6068
6169 private final TripleStore tripleStore ;
6270
71+ private final ValueStoreWAL valueStoreWal ;
72+
6373 private final ValueStore valueStore ;
6474
6575 private final NamespaceStore namespaceStore ;
@@ -91,21 +101,96 @@ public NativeSailStore(File dataDir, String tripleIndexes) throws IOException, S
91101 */
92102 public NativeSailStore (File dataDir , String tripleIndexes , boolean forceSync , int valueCacheSize ,
93103 int valueIDCacheSize , int namespaceCacheSize , int namespaceIDCacheSize ) throws IOException , SailException {
104+ NamespaceStore createdNamespaceStore = null ;
105+ ValueStoreWAL createdWal = null ;
106+ ValueStore createdValueStore = null ;
107+ TripleStore createdTripleStore = null ;
108+ ContextStore createdContextStore = null ;
94109 boolean initialized = false ;
95110 try {
96- namespaceStore = new NamespaceStore (dataDir );
97- valueStore = new ValueStore (dataDir , forceSync , valueCacheSize , valueIDCacheSize , namespaceCacheSize ,
98- namespaceIDCacheSize );
99- tripleStore = new TripleStore (dataDir , tripleIndexes , forceSync );
100- contextStore = new ContextStore (this , dataDir );
111+ createdNamespaceStore = new NamespaceStore (dataDir );
112+ Path walDir = dataDir .toPath ().resolve ("wal" );
113+ String storeUuid = loadOrCreateWalUuid (walDir );
114+ WalConfig walConfig = WalConfig .builder ()
115+ .walDirectory (walDir )
116+ .storeUuid (storeUuid )
117+ .build ();
118+ createdWal = ValueStoreWAL .open (walConfig );
119+ createdValueStore = new ValueStore (dataDir , forceSync , valueCacheSize , valueIDCacheSize ,
120+ namespaceCacheSize , namespaceIDCacheSize , createdWal );
121+ createdTripleStore = new TripleStore (dataDir , tripleIndexes , forceSync );
122+ createdContextStore = new ContextStore (this , dataDir );
101123 initialized = true ;
102124 } finally {
103125 if (!initialized ) {
104- close ();
126+ closeQuietly (createdContextStore );
127+ closeQuietly (createdTripleStore );
128+ closeQuietly (createdValueStore );
129+ closeQuietly (createdWal );
130+ closeQuietly (createdNamespaceStore );
131+ }
132+ }
133+ namespaceStore = createdNamespaceStore ;
134+ valueStoreWal = createdWal ;
135+ valueStore = createdValueStore ;
136+ tripleStore = createdTripleStore ;
137+ contextStore = createdContextStore ;
138+ }
139+
140+ private String loadOrCreateWalUuid (Path walDir ) throws IOException {
141+ Files .createDirectories (walDir );
142+ Path file = walDir .resolve ("store.uuid" );
143+ if (Files .exists (file )) {
144+ return Files .readString (file , StandardCharsets .UTF_8 ).trim ();
145+ }
146+ String uuid = UUID .randomUUID ().toString ();
147+ Files .writeString (file , uuid , StandardCharsets .UTF_8 , StandardOpenOption .CREATE ,
148+ StandardOpenOption .TRUNCATE_EXISTING );
149+ return uuid ;
150+ }
151+
152+ private void closeQuietly (ContextStore store ) {
153+ if (store != null ) {
154+ store .close ();
155+ }
156+ }
157+
158+ private void closeQuietly (TripleStore store ) {
159+ if (store != null ) {
160+ try {
161+ store .close ();
162+ } catch (IOException e ) {
163+ logger .warn ("Failed to close triple store" , e );
164+ }
165+ }
166+ }
167+
168+ private void closeQuietly (ValueStore store ) {
169+ if (store != null ) {
170+ try {
171+ store .close ();
172+ } catch (IOException e ) {
173+ logger .warn ("Failed to close value store" , e );
105174 }
106175 }
107176 }
108177
178+ private void closeQuietly (ValueStoreWAL wal ) {
179+ if (wal != null ) {
180+ try {
181+ wal .close ();
182+ } catch (IOException e ) {
183+ logger .warn ("Failed to close value store WAL" , e );
184+ }
185+ }
186+ }
187+
188+ private void closeQuietly (NamespaceStore store ) {
189+ if (store != null ) {
190+ store .close ();
191+ }
192+ }
193+
109194 @ Override
110195 public ValueFactory getValueFactory () {
111196 return valueStore ;
@@ -129,8 +214,14 @@ public void close() throws SailException {
129214 valueStore .close ();
130215 }
131216 } finally {
132- if (tripleStore != null ) {
133- tripleStore .close ();
217+ try {
218+ if (valueStoreWal != null ) {
219+ valueStoreWal .close ();
220+ }
221+ } finally {
222+ if (tripleStore != null ) {
223+ tripleStore .close ();
224+ }
134225 }
135226 }
136227 }
@@ -353,11 +444,22 @@ public NativeSailSink(boolean explicit) throws SailException {
353444 this .explicit = explicit ;
354445 }
355446
447+ private long walHighWaterMark = ValueStoreWAL .NO_LSN ;
448+
356449 @ Override
357450 public void close () {
358451 // no-op
359452 }
360453
454+ private int storeValueId (Value value ) throws IOException {
455+ int id = valueStore .storeValue (value );
456+ OptionalLong walLsn = valueStore .drainPendingWalHighWaterMark ();
457+ if (walLsn .isPresent ()) {
458+ walHighWaterMark = Math .max (walHighWaterMark , walLsn .getAsLong ());
459+ }
460+ return id ;
461+ }
462+
361463 @ Override
362464 public void prepare () throws SailException {
363465 // serializable is not supported at this level
@@ -368,6 +470,10 @@ public synchronized void flush() throws SailException {
368470 sinkStoreAccessLock .lock ();
369471 try {
370472 try {
473+ if (walHighWaterMark > ValueStoreWAL .NO_LSN ) {
474+ valueStore .awaitWalDurable (walHighWaterMark );
475+ walHighWaterMark = ValueStoreWAL .NO_LSN ;
476+ }
371477 valueStore .sync ();
372478 } finally {
373479 try {
@@ -472,13 +578,13 @@ public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts)
472578 Value obj = statement .getObject ();
473579 Resource context = statement .getContext ();
474580
475- int subjID = valueStore . storeValue (subj );
476- int predID = valueStore . storeValue (pred );
477- int objID = valueStore . storeValue (obj );
581+ int subjID = storeValueId (subj );
582+ int predID = storeValueId (pred );
583+ int objID = storeValueId (obj );
478584
479585 int contextID = 0 ;
480586 if (context != null ) {
481- contextID = valueStore . storeValue (context );
587+ contextID = storeValueId (context );
482588 }
483589
484590 boolean wasNew = tripleStore .storeTriple (subjID , predID , objID , contextID , explicit );
@@ -532,9 +638,9 @@ private boolean addStatement(Resource subj, IRI pred, Value obj, boolean explici
532638 sinkStoreAccessLock .lock ();
533639 try {
534640 startTriplestoreTransaction ();
535- int subjID = valueStore . storeValue (subj );
536- int predID = valueStore . storeValue (pred );
537- int objID = valueStore . storeValue (obj );
641+ int subjID = storeValueId (subj );
642+ int predID = storeValueId (pred );
643+ int objID = storeValueId (obj );
538644
539645 if (contexts .length == 0 ) {
540646 contexts = new Resource [] { null };
@@ -543,7 +649,7 @@ private boolean addStatement(Resource subj, IRI pred, Value obj, boolean explici
543649 for (Resource context : contexts ) {
544650 int contextID = 0 ;
545651 if (context != null ) {
546- contextID = valueStore . storeValue (context );
652+ contextID = storeValueId (context );
547653 }
548654
549655 boolean wasNew = tripleStore .storeTriple (subjID , predID , objID , contextID , explicit );
0 commit comments