Skip to content

Commit ff9312f

Browse files
authored
Add option to start a db write transaction without fsyncing (ThinkParQ/beegfs-rs#184)
2 parents 06f7809 + ed6108f commit ff9312f

File tree

4 files changed

+67
-16
lines changed

4 files changed

+67
-16
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mgmtd/src/bee_msg/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ async fn update_node(msg: RegisterNode, ctx: &Context) -> Result<NodeId> {
170170

171171
let (node, meta_root, is_new) = ctx
172172
.db
173-
.write_tx(move |tx| {
173+
.write_tx_no_sync(move |tx| {
174174
let node = if msg.node_id == 0 {
175175
// No node ID given => new node
176176
None

sqlite/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ publish.workspace = true
99

1010
[dependencies]
1111
anyhow.workspace = true
12+
log.workspace = true
1213
rusqlite = { workspace = true, features = ["backup"] }
13-
tokio = { workspace = true }
14+
tokio.workspace = true
1415

sqlite/src/connection.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ pub fn setup_connection(conn: &rusqlite::Connection) -> rusqlite::Result<()> {
2626
// Note that the WAL is merged into the main db file automatically by SQLite after it has
2727
// reached a certain size and on the last connection being closed. This could be configured or
2828
// even disabled so we can run it manually.
29-
conn.pragma_update(None, "journal_mode", "WAL")?;
29+
conn.pragma_update(None, "journal_mode", "wal")?;
30+
// Default to fsync after each transaction. If this is changed, make sure to update the
31+
// transaction methods below to match the change, especially in run_op() where a temporary
32+
// change happens.
33+
conn.pragma_update(None, "synchronous", "full")?;
3034

3135
Ok(())
3236
}
@@ -63,6 +67,12 @@ impl Deref for Connections {
6367
}
6468
}
6569

70+
#[derive(Debug, Clone, Copy)]
71+
pub enum SyncMode {
72+
Full,
73+
Normal,
74+
}
75+
6676
#[derive(Debug)]
6777
pub struct InnerConnections {
6878
conns: Mutex<Vec<Connection>>,
@@ -89,7 +99,27 @@ impl Connections {
8999
&self,
90100
op: T,
91101
) -> Result<R> {
92-
self.run_op(|conn| {
102+
self.run_op(SyncMode::Full, move |conn| {
103+
let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
104+
let res = op(&tx)?;
105+
tx.commit()?;
106+
107+
Ok(res)
108+
})
109+
.await
110+
}
111+
112+
/// Same as `write_tx()`, but changes sqlite sync mode temporarily from `full` to `normal` to
113+
/// avoid syncing the transaction to disk immediately. Meant for transactions that can cause
114+
/// heavy load on bigger systems and are not that critical if they get lost.
115+
pub async fn write_tx_no_sync<
116+
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
117+
R: Send + 'static,
118+
>(
119+
&self,
120+
op: T,
121+
) -> Result<R> {
122+
self.run_op(SyncMode::Normal, move |conn| {
93123
let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
94124
let res = op(&tx)?;
95125
tx.commit()?;
@@ -111,7 +141,7 @@ impl Connections {
111141
&self,
112142
op: T,
113143
) -> Result<R> {
114-
self.run_op(|conn| {
144+
self.run_op(SyncMode::Full, move |conn| {
115145
let tx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
116146
let res = op(&tx)?;
117147
tx.commit()?;
@@ -132,11 +162,12 @@ impl Connections {
132162
&self,
133163
op: T,
134164
) -> Result<R> {
135-
self.run_op(op).await
165+
self.run_op(SyncMode::Full, op).await
136166
}
137167

138168
async fn run_op<T: Send + 'static + FnOnce(&mut Connection) -> Result<R>, R: Send + 'static>(
139169
&self,
170+
sync_mode: SyncMode,
140171
op: T,
141172
) -> Result<R> {
142173
let this = self.clone();
@@ -155,16 +186,34 @@ impl Connections {
155186
open(this.db_file.as_path())?
156187
};
157188

158-
let res = op(&mut conn);
159-
160-
// Push the connection to the stack
161-
// We assume that sqlite connections never invalidate on errors, so there is no need to
162-
// drop them. There might be severe cases where connections don't work anymore (e.g.
163-
// one removing or corrupting the database file, the file system breaks, ...), but these
164-
// are unrecoverable anyway and new connections won't fix anything there.
165-
this.conns.lock().unwrap().push(conn);
166-
167-
res
189+
match sync_mode {
190+
SyncMode::Full => {
191+
let res = op(&mut conn);
192+
// Push the connection to the stack
193+
// We assume that sqlite connections never invalidate on errors, so there is no
194+
// need to drop them. There might be severe cases where
195+
// connections don't work anymore (e.g. one removing or
196+
// corrupting the database file, the file system breaks, ...), but these
197+
// are unrecoverable anyway and new connections won't fix anything there.
198+
this.conns.lock().unwrap().push(conn);
199+
200+
res
201+
}
202+
SyncMode::Normal => {
203+
conn.pragma_update(None, "synchronous", "normal")?;
204+
let res = op(&mut conn);
205+
// If the sync mode could not be reset (should most likely never happen), we
206+
// don't error out as the transaction already completed.
207+
// Instead we just dorop it to prevent future usage with FULL mode.
208+
if conn.pragma_update(None, "synchronous", "full").is_ok() {
209+
this.conns.lock().unwrap().push(conn);
210+
} else {
211+
log::error!("Failed to change db connection sync mode back to full");
212+
}
213+
214+
res
215+
}
216+
}
168217
})
169218
.await?
170219
}

0 commit comments

Comments
 (0)