Skip to content

Commit 58e6c8c

Browse files
committed
wip
1 parent 61ad787 commit 58e6c8c

File tree

3 files changed

+197
-123
lines changed

3 files changed

+197
-123
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.sail.nativerdf;
12+
13+
import static java.nio.charset.StandardCharsets.US_ASCII;
14+
15+
import java.io.File;
16+
import java.io.IOException;
17+
import java.nio.ByteBuffer;
18+
import java.nio.MappedByteBuffer;
19+
import java.nio.channels.FileChannel;
20+
import java.nio.file.Files;
21+
import java.nio.file.StandardOpenOption;
22+
import java.util.EnumSet;
23+
24+
import org.eclipse.rdf4j.common.annotation.Experimental;
25+
26+
/**
27+
* Writes transaction statuses to a memory-mapped file. Since the OS is responsible for flushing changes to disk, this
28+
* is generally faster than using regular file I/O. If the JVM crashes, the last written status should still be intact,
29+
* but the change will not be visible until the OS has flushed the page to disk. If the OS or DISK crashes, data may be
30+
* lost or corrupted. Same for power loss. This can be mitigated by setting the {@link #ALWAYS_FORCE_SYNC_PROP} system
31+
* property to true, which forces a sync to disk on every status change.
32+
*/
33+
@Experimental
34+
class MemoryMappedTxnStatusFile extends TxnStatusFile {
35+
36+
/**
37+
* The name of the transaction status file.
38+
*/
39+
public static final String FILE_NAME = "txn-status";
40+
41+
/**
42+
* We currently store a single status byte, but this constant makes it trivial to extend the layout later if needed.
43+
*/
44+
private static final int MAPPED_SIZE = 1;
45+
46+
private static final String ALWAYS_FORCE_SYNC_PROP = "org.eclipse.rdf4j.sail.nativerdf.MemoryMappedTxnStatusFile.alwaysForceSync";
47+
48+
static boolean ALWAYS_FORCE_SYNC = Boolean.getBoolean(ALWAYS_FORCE_SYNC_PROP);
49+
50+
private final File statusFile;
51+
private final FileChannel channel;
52+
private final MappedByteBuffer mapped;
53+
54+
/**
55+
* Creates a new transaction status file. New files are initialized with {@link TxnStatus#NONE}.
56+
*
57+
* @param dataDir The directory for the transaction status file.
58+
* @throws IOException If the file could not be opened or created.
59+
*/
60+
public MemoryMappedTxnStatusFile(File dataDir) throws IOException {
61+
super();
62+
this.statusFile = new File(dataDir, FILE_NAME);
63+
64+
ALWAYS_FORCE_SYNC = !Boolean.getBoolean(ALWAYS_FORCE_SYNC_PROP);
65+
66+
EnumSet<StandardOpenOption> openOptions = EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
67+
StandardOpenOption.CREATE);
68+
69+
this.channel = FileChannel.open(statusFile.toPath(), openOptions.toArray(new StandardOpenOption[0]));
70+
71+
long size = channel.size();
72+
73+
// Ensure the file is at least MAPPED_SIZE bytes so we can map it safely.
74+
// If it was previously empty, we treat that as NONE (which is also byte 0).
75+
if (size < MAPPED_SIZE) {
76+
channel.position(MAPPED_SIZE - 1);
77+
int write = channel.write(ByteBuffer.wrap(TxnStatus.NONE.getOnDisk()));
78+
if (write != 1) {
79+
throw new IOException("Failed to initialize transaction status file");
80+
}
81+
channel.force(true);
82+
}
83+
84+
this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_SIZE);
85+
}
86+
87+
public void close() throws IOException {
88+
// We rely on the GC to eventually unmap the MappedByteBuffer; explicitly
89+
// closing the channel is enough for our purposes here.
90+
channel.close();
91+
}
92+
93+
/**
94+
* Writes the specified transaction status to file.
95+
*
96+
* @param txnStatus The transaction status to write.
97+
* @param forceSync If true, forces a sync to disk after writing the status.
98+
*/
99+
public void setTxnStatus(TxnStatus txnStatus, boolean forceSync) {
100+
if (disabled) {
101+
return;
102+
}
103+
104+
mapped.put(0, txnStatus.getOnDisk()[0]);
105+
if (ALWAYS_FORCE_SYNC || forceSync) {
106+
mapped.force();
107+
}
108+
}
109+
110+
/**
111+
* Reads the transaction status from file.
112+
*
113+
* @return The read transaction status, or {@link TxnStatus#UNKNOWN} when the file contains an unrecognized status
114+
* string.
115+
* @throws IOException If the transaction status file could not be read.
116+
*/
117+
public TxnStatus getTxnStatus() throws IOException {
118+
if (disabled) {
119+
return TxnStatus.NONE;
120+
}
121+
122+
try {
123+
return statusMapping[mapped.get(0)];
124+
} catch (IndexOutOfBoundsException e) {
125+
return getTxnStatusDeprecated();
126+
}
127+
}
128+
129+
private TxnStatus getTxnStatusDeprecated() throws IOException {
130+
if (disabled) {
131+
return TxnStatus.NONE;
132+
}
133+
134+
// Read the full file contents as a string, for compatibility with very old
135+
// versions that stored the enum name instead of a bitfield.
136+
byte[] bytes = Files.readAllBytes(statusFile.toPath());
137+
138+
if (bytes.length == 0) {
139+
return TxnStatus.NONE;
140+
}
141+
142+
String s = new String(bytes, US_ASCII);
143+
try {
144+
return TxnStatus.valueOf(s);
145+
} catch (IllegalArgumentException e) {
146+
// use platform encoding for backwards compatibility with versions
147+
// older than 2.6.6:
148+
s = new String(bytes);
149+
try {
150+
return TxnStatus.valueOf(s);
151+
} catch (IllegalArgumentException e2) {
152+
return TxnStatus.UNKNOWN;
153+
}
154+
}
155+
}
156+
}

core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java

Lines changed: 41 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,11 @@
1212

1313
import static java.nio.charset.StandardCharsets.US_ASCII;
1414

15+
import java.io.EOFException;
1516
import java.io.File;
1617
import java.io.IOException;
17-
import java.nio.ByteBuffer;
18-
import java.nio.MappedByteBuffer;
19-
import java.nio.channels.FileChannel;
20-
import java.nio.file.Files;
21-
import java.nio.file.StandardOpenOption;
22-
import java.util.EnumSet;
18+
19+
import org.eclipse.rdf4j.common.io.NioFile;
2320

2421
/**
2522
* Writes transaction statuses to a file.
@@ -71,13 +68,13 @@ byte[] getOnDisk() {
7168
return onDisk;
7269
}
7370

74-
private static final byte NONE_BYTE = (byte) 0b00000000;
75-
private static final byte OLD_NONE_BYTE = (byte) 0b00000001;
71+
static final byte NONE_BYTE = (byte) 0b00000000;
72+
static final byte OLD_NONE_BYTE = (byte) 0b00000001;
7673

77-
private static final byte ACTIVE_BYTE = (byte) 0b00000010;
78-
private static final byte COMMITTING_BYTE = (byte) 0b00000100;
79-
private static final byte ROLLING_BACK_BYTE = (byte) 0b00001000;
80-
private static final byte UNKNOWN_BYTE = (byte) 0b00010000;
74+
static final byte ACTIVE_BYTE = (byte) 0b00000010;
75+
static final byte COMMITTING_BYTE = (byte) 0b00000100;
76+
static final byte ROLLING_BACK_BYTE = (byte) 0b00001000;
77+
static final byte UNKNOWN_BYTE = (byte) 0b00010000;
8178

8279
}
8380

@@ -86,74 +83,49 @@ byte[] getOnDisk() {
8683
*/
8784
public static final String FILE_NAME = "txn-status";
8885

89-
/**
90-
* We currently store a single status byte, but this constant makes it trivial to extend the layout later if needed.
91-
*/
92-
private static final int MAPPED_SIZE = 1;
93-
94-
private static final String DISABLE_DSYNC_PROPERTY = "org.eclipse.rdf4j.sail.nativerdf.disableTxnStatusDsync";
95-
96-
static boolean DISABLE_DSYNC = Boolean.getBoolean(DISABLE_DSYNC_PROPERTY);
97-
98-
private final File statusFile;
99-
private final FileChannel channel;
100-
private final MappedByteBuffer mapped;
86+
private final NioFile nioFile;
10187

10288
/**
10389
* Creates a new transaction status file. New files are initialized with {@link TxnStatus#NONE}.
10490
*
10591
* @param dataDir The directory for the transaction status file.
106-
* @throws IOException If the file could not be opened or created.
92+
* @throws IOException If the file did not yet exist and could not be written to.
10793
*/
10894
public TxnStatusFile(File dataDir) throws IOException {
109-
this.statusFile = new File(dataDir, FILE_NAME);
110-
111-
DISABLE_DSYNC = !Boolean.getBoolean(DISABLE_DSYNC_PROPERTY);
112-
113-
EnumSet<StandardOpenOption> openOptions = EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
114-
StandardOpenOption.CREATE);
115-
if (!DISABLE_DSYNC) {
116-
openOptions.add(StandardOpenOption.DSYNC);
117-
}
118-
119-
this.channel = FileChannel.open(statusFile.toPath(), openOptions.toArray(new StandardOpenOption[0]));
120-
121-
long size = channel.size();
122-
123-
// Ensure the file is at least MAPPED_SIZE bytes so we can map it safely.
124-
// If it was previously empty, we treat that as NONE (which is also byte 0).
125-
if (size < MAPPED_SIZE) {
126-
channel.position(MAPPED_SIZE - 1);
127-
int write = channel.write(ByteBuffer.wrap(TxnStatus.NONE.getOnDisk()));
128-
if (write != 1) {
129-
throw new IOException("Failed to initialize transaction status file");
130-
}
131-
channel.force(true);
132-
}
95+
File statusFile = new File(dataDir, FILE_NAME);
96+
nioFile = new NioFile(statusFile, "rwd");
97+
}
13398

134-
this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_SIZE);
99+
public TxnStatusFile() {
100+
nioFile = null;
135101
}
136102

137103
public void close() throws IOException {
138-
// We rely on the GC to eventually unmap the MappedByteBuffer; explicitly
139-
// closing the channel is enough for our purposes here.
140-
channel.close();
104+
if (nioFile != null) {
105+
nioFile.close();
106+
}
141107
}
142108

143109
/**
144110
* Writes the specified transaction status to file.
145111
*
146112
* @param txnStatus The transaction status to write.
147-
* @param forceSync
113+
* @throws IOException If the transaction status could not be written to file.
148114
*/
149-
public void setTxnStatus(TxnStatus txnStatus, boolean forceSync) {
115+
public void setTxnStatus(TxnStatus txnStatus, boolean force) throws IOException {
150116
if (disabled) {
151117
return;
152118
}
119+
if (txnStatus == TxnStatus.NONE) {
120+
// noinspection DataFlowIssue
121+
nioFile.truncate(0);
122+
} else {
123+
// noinspection DataFlowIssue
124+
nioFile.writeBytes(txnStatus.onDisk, 0);
125+
}
153126

154-
mapped.put(0, txnStatus.getOnDisk()[0]);
155-
if (forceSync) {
156-
mapped.force();
127+
if (force) {
128+
nioFile.force(false);
157129
}
158130
}
159131

@@ -168,16 +140,21 @@ public TxnStatus getTxnStatus() throws IOException {
168140
if (disabled) {
169141
return TxnStatus.NONE;
170142
}
171-
172-
byte b = mapped.get(0);
173143
try {
174-
return statusMapping[b];
144+
// noinspection DataFlowIssue
145+
return statusMapping[nioFile.readBytes(0, 1)[0]];
146+
} catch (EOFException e) {
147+
// empty file = NONE status
148+
return TxnStatus.NONE;
175149
} catch (IndexOutOfBoundsException e) {
150+
// fall back to deprecated reading method
176151
return getTxnStatusDeprecated();
177152
}
153+
178154
}
179155

180-
private final static TxnStatus[] statusMapping = new TxnStatus[17];
156+
final static TxnStatus[] statusMapping = new TxnStatus[17];
157+
181158
static {
182159
statusMapping[TxnStatus.NONE_BYTE] = TxnStatus.NONE;
183160
statusMapping[TxnStatus.OLD_NONE_BYTE] = TxnStatus.NONE;
@@ -192,13 +169,8 @@ private TxnStatus getTxnStatusDeprecated() throws IOException {
192169
return TxnStatus.NONE;
193170
}
194171

195-
// Read the full file contents as a string, for compatibility with very old
196-
// versions that stored the enum name instead of a bitfield.
197-
byte[] bytes = Files.readAllBytes(statusFile.toPath());
198-
199-
if (bytes.length == 0) {
200-
return TxnStatus.NONE;
201-
}
172+
// noinspection DataFlowIssue
173+
byte[] bytes = nioFile.readBytes(0, (int) nioFile.size());
202174

203175
String s = new String(bytes, US_ASCII);
204176
try {

core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFileDsyncTest.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)