Skip to content

Commit 14ef584

Browse files
committed
enforce strict recency invariants during ingestion
1 parent 0abd2a4 commit 14ef584

File tree

5 files changed

+254
-34
lines changed

5 files changed

+254
-34
lines changed

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ pub enum Error {
2020
/// Some required files could not be recovered from disk
2121
Unrecoverable,
2222

23+
/// Ingestion could not start because required preconditions were not met
24+
IngestionPreconditionFailed,
25+
2326
/// Checksum mismatch
2427
ChecksumMismatch {
2528
/// Checksum of loaded block

src/tree/ingest.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
SeqNo, UserKey, UserValue,
99
};
1010
use std::path::PathBuf;
11+
use std::sync::RwLockWriteGuard;
1112

1213
pub const INITIAL_CANONICAL_LEVEL: usize = 1;
1314

@@ -20,6 +21,7 @@ pub struct Ingestion<'a> {
2021
pub(crate) writer: MultiWriter,
2122
seqno: SeqNo,
2223
last_key: Option<UserKey>,
24+
_ingestion_guard: RwLockWriteGuard<'a, ()>,
2325
}
2426

2527
impl<'a> Ingestion<'a> {
@@ -29,6 +31,45 @@ impl<'a> Ingestion<'a> {
2931
///
3032
/// Will return `Err` if an IO error occurs.
3133
pub fn new(tree: &'a Tree) -> crate::Result<Self> {
34+
// Block concurrent writes and concurrent ingestions for the duration of this ingestion.
35+
let ingestion_guard = tree.0.ingestion_lock.write().expect("lock is poisoned");
36+
37+
// If the active memtable has data, flush it first to preserve strict recency invariants
38+
// for the read path (active > sealed > tables). This avoids overlapping in-memory state
39+
// with newly ingested on-disk tables.
40+
tree.flush_active_memtable(SeqNo::MAX)?;
41+
42+
// Flush any remaining sealed memtables synchronously into tables, then register them.
43+
if tree.sealed_memtable_count() > 0 {
44+
let sealed: Vec<(u64, std::sync::Arc<crate::Memtable>)> = {
45+
let sv = tree
46+
.version_history
47+
.read()
48+
.expect("lock is poisoned")
49+
.latest_version();
50+
sv.sealed_memtables
51+
.iter()
52+
.map(|(id, mt)| (*id, std::sync::Arc::clone(mt)))
53+
.collect()
54+
};
55+
56+
let mut tables = Vec::with_capacity(sealed.len());
57+
for (id, mt) in sealed {
58+
if let Some((table, _blob)) = tree.flush_memtable(id, &mt, SeqNo::MAX)? {
59+
tables.push(table);
60+
}
61+
}
62+
if !tables.is_empty() {
63+
tree.register_tables(&tables, None, None)?;
64+
}
65+
}
66+
67+
// Ensure invariants are satisfied before proceeding with ingestion.
68+
if tree.sealed_memtable_count() > 0 {
69+
drop(ingestion_guard);
70+
return Err(crate::Error::IngestionPreconditionFailed);
71+
}
72+
3273
let folder = tree.config.path.join(crate::file::TABLES_FOLDER);
3374
log::debug!("Ingesting into tables in {}", folder.display());
3475

@@ -102,6 +143,7 @@ impl<'a> Ingestion<'a> {
102143
writer,
103144
seqno: 0,
104145
last_key: None,
146+
_ingestion_guard: ingestion_guard,
105147
})
106148
}
107149

@@ -250,3 +292,28 @@ impl<'a> Ingestion<'a> {
250292
Ok(())
251293
}
252294
}
295+
296+
#[cfg(test)]
297+
mod tests {
298+
use super::*;
299+
use crate::{memtable::Memtable, SequenceNumberCounter};
300+
301+
#[test]
302+
fn ingestion_fails_when_sealed_memtable_cannot_be_flushed() -> crate::Result<()> {
303+
let dir = tempfile::tempdir()?;
304+
let config = crate::config::Config::new(dir.path(), SequenceNumberCounter::default());
305+
let tree = Tree::open(config)?;
306+
307+
// Inject an empty sealed memtable. Flushing this produces no table, so it remains sealed.
308+
let id = tree.get_next_table_id();
309+
tree.add_sealed_memtable(id, std::sync::Arc::new(Memtable::default()));
310+
311+
let err = match Ingestion::new(&tree) {
312+
Ok(_) => panic!("expected IngestionPreconditionFailed"),
313+
Err(e) => e,
314+
};
315+
assert!(matches!(err, crate::Error::IngestionPreconditionFailed));
316+
317+
Ok(())
318+
}
319+
}

src/tree/inner.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ pub struct TreeInner {
5959
/// can be concurrent next to each other.
6060
pub(crate) major_compaction_lock: RwLock<()>,
6161

62+
/// Prevents concurrent memtable writes during ingestion.
63+
pub(crate) ingestion_lock: RwLock<()>,
64+
6265
#[doc(hidden)]
6366
#[cfg(feature = "metrics")]
6467
pub metrics: Arc<Metrics>,
@@ -77,6 +80,7 @@ impl TreeInner {
7780
version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
7881
stop_signal: StopSignal::default(),
7982
major_compaction_lock: RwLock::default(),
83+
ingestion_lock: RwLock::default(),
8084
compaction_state: Arc::new(Mutex::new(CompactionState::default())),
8185

8286
#[cfg(feature = "metrics")]

src/tree/mod.rs

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -81,55 +81,34 @@ impl AbstractTree for Tree {
8181

8282
#[expect(clippy::significant_drop_tightening)]
8383
fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
84-
// Returns the newest visible version (`entry.seqno < seqno`) across all sources
85-
// for the given snapshot watermark `seqno`:
86-
// - Active memtable
87-
// - Sealed memtables (newest-first)
88-
// - Tables (newest-first)
89-
// Ingestion can write newer versions directly to tables with an explicit `seqno`,
90-
// so we compare seqnos across sources and keep the maximal visible version, then
91-
// apply tombstone semantics once to that winner.
84+
// Returns the newest visible version (`entry.seqno < seqno`) by consulting sources
85+
// in recency order and returning on first hit: active memtable, sealed memtables,
86+
// then tables (each scanned newest-first).
9287

9388
let version_history_lock = self.version_history.read().expect("lock is poisoned");
9489
let super_version = version_history_lock.get_version_for_snapshot(seqno);
9590
// Avoid holding the read lock across potential I/O in table lookups
9691
drop(version_history_lock);
9792

98-
// 1) Active memtable (in-memory)
99-
// If it yields the maximal visible seqno (seqno-1), no source can be newer.
100-
let mut best: Option<InternalValue> = super_version.active_memtable.get(key, seqno);
101-
if let Some(ref b) = best {
102-
if seqno > 0 && b.key.seqno + 1 == seqno {
103-
return Ok(ignore_tombstone_value(b.clone()));
104-
}
93+
if let Some(entry) = super_version.active_memtable.get(key, seqno) {
94+
return Ok(ignore_tombstone_value(entry));
10595
}
10696

107-
// 2) Sealed memtables (in-memory, newest-first)
108-
// If active memtable already has a hit, sealed memtables cannot be newer.
109-
if best.is_none() {
110-
if let Some(entry) =
111-
Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
112-
{
113-
// Early-exit if sealed provided the maximal visible seqno.
114-
if seqno > 0 && entry.key.seqno + 1 == seqno {
115-
return Ok(ignore_tombstone_value(entry));
116-
}
117-
best = Some(entry);
118-
}
97+
// Now look in sealed memtables
98+
if let Some(entry) =
99+
Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
100+
{
101+
return Ok(ignore_tombstone_value(entry));
119102
}
120103

121-
// 3) Tables (on-disk), scanned newest-first within each level/run
104+
// Now look in tables... this may involve disk I/O
122105
if let Some(entry) =
123106
self.get_internal_entry_from_tables(&super_version.version, key, seqno)?
124107
{
125-
match &best {
126-
Some(b) if b.key.seqno >= entry.key.seqno => {}
127-
_ => best = Some(entry),
128-
}
108+
return Ok(ignore_tombstone_value(entry));
129109
}
130110

131-
// Apply tombstone semantics after selecting the newest visible version
132-
Ok(best.and_then(ignore_tombstone_value))
111+
Ok(None)
133112
}
134113

135114
fn current_version(&self) -> Version {
@@ -571,16 +550,19 @@ impl AbstractTree for Tree {
571550
value: V,
572551
seqno: SeqNo,
573552
) -> (u64, u64) {
553+
let _ingestion_guard = self.0.ingestion_lock.read().expect("lock is poisoned");
574554
let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
575555
self.append_entry(value)
576556
}
577557

578558
fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
559+
let _ingestion_guard = self.0.ingestion_lock.read().expect("lock is poisoned");
579560
let value = InternalValue::new_tombstone(key, seqno);
580561
self.append_entry(value)
581562
}
582563

583564
fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
565+
let _ingestion_guard = self.0.ingestion_lock.read().expect("lock is poisoned");
584566
let value = InternalValue::new_weak_tombstone(key, seqno);
585567
self.append_entry(value)
586568
}
@@ -915,6 +897,7 @@ impl Tree {
915897
stop_signal: StopSignal::default(),
916898
config,
917899
major_compaction_lock: RwLock::default(),
900+
ingestion_lock: RwLock::default(),
918901
compaction_state: Arc::new(Mutex::new(CompactionState::default())),
919902

920903
#[cfg(feature = "metrics")]

tests/ingestion_invariants.rs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
use std::sync::mpsc;
2+
use std::thread;
3+
use std::time::Duration;
4+
5+
use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo};
6+
7+
#[test]
8+
fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> {
9+
let folder = tempfile::tempdir()?;
10+
let tree = Config::new(&folder, Default::default()).open()?;
11+
12+
// Write to active memtable
13+
for i in 0..10u32 {
14+
let k = format!("a{:03}", i);
15+
tree.insert(k.as_bytes(), b"v", 1);
16+
}
17+
18+
let tables_before = tree.table_count();
19+
let sealed_before = tree.sealed_memtable_count();
20+
assert_eq!(sealed_before, 0);
21+
22+
// Start ingestion (should auto-flush active)
23+
tree.ingestion()?.with_seqno(10).finish()?;
24+
25+
// After ingestion, data is in tables; no sealed memtables
26+
assert_eq!(tree.sealed_memtable_count(), 0);
27+
assert!(tree.table_count() >= tables_before + 1);
28+
29+
// Reads must succeed from tables
30+
for i in 0..10u32 {
31+
let k = format!("a{:03}", i);
32+
assert_eq!(
33+
tree.get(k.as_bytes(), SeqNo::MAX)?,
34+
Some(b"v".as_slice().into())
35+
);
36+
}
37+
38+
Ok(())
39+
}
40+
41+
#[test]
42+
fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> {
43+
let folder = tempfile::tempdir()?;
44+
let tree = Config::new(&folder, Default::default()).open()?;
45+
46+
// Put items into active and seal them
47+
for i in 0..8u32 {
48+
let k = format!("s{:03}", i);
49+
tree.insert(k.as_bytes(), b"x", 1);
50+
}
51+
assert!(tree.rotate_memtable().is_some());
52+
assert!(tree.sealed_memtable_count() > 0);
53+
54+
let tables_before = tree.table_count();
55+
56+
// Ingestion should flush sealed memtables and register resulting tables
57+
tree.ingestion()?.with_seqno(20).finish()?;
58+
59+
assert_eq!(tree.sealed_memtable_count(), 0);
60+
assert!(tree.table_count() >= tables_before + 1);
61+
62+
for i in 0..8u32 {
63+
let k = format!("s{:03}", i);
64+
assert_eq!(
65+
tree.get(k.as_bytes(), SeqNo::MAX)?,
66+
Some(b"x".as_slice().into())
67+
);
68+
}
69+
70+
Ok(())
71+
}
72+
73+
#[test]
74+
fn ingestion_blocks_memtable_writes_until_finish() -> lsm_tree::Result<()> {
75+
let folder = tempfile::tempdir()?;
76+
let tree = Config::new(&folder, Default::default()).open()?;
77+
78+
// Acquire ingestion and hold it to block writes
79+
let ingest = tree.ingestion()?.with_seqno(5);
80+
81+
let (started_tx, started_rx) = mpsc::channel();
82+
let (done_tx, done_rx) = mpsc::channel();
83+
let tree2 = tree.clone();
84+
85+
let handle = thread::spawn(move || {
86+
started_tx.send(()).ok();
87+
// This insert should block until ingestion finishes (ingestion_lock is held)
88+
tree2.insert(b"k_block", b"v", 6);
89+
done_tx.send(()).ok();
90+
});
91+
92+
// Wait for the writer thread to start the attempt
93+
started_rx.recv().unwrap();
94+
95+
// Give it a moment; it should still be blocked
96+
thread::sleep(Duration::from_millis(100));
97+
assert!(
98+
done_rx.try_recv().is_err(),
99+
"insert should still be blocked"
100+
);
101+
102+
// Finish ingestion to release the lock
103+
ingest.finish()?;
104+
105+
// Now the insert should complete
106+
handle.join().unwrap();
107+
done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
108+
109+
// Verify the write landed
110+
assert_eq!(
111+
tree.get(b"k_block", SeqNo::MAX)?,
112+
Some(b"v".as_slice().into())
113+
);
114+
115+
Ok(())
116+
}
117+
118+
#[test]
119+
fn blob_ingestion_honors_invariants_and_blocks_writes() -> lsm_tree::Result<()> {
120+
let folder = tempfile::tempdir()?;
121+
let tree = Config::new(&folder, Default::default())
122+
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
123+
.open()?;
124+
125+
// Write small values into memtable and then start blob ingestion
126+
for i in 0..4u32 {
127+
let k = format!("b{:03}", i);
128+
tree.insert(k.as_bytes(), b"y", 1);
129+
}
130+
131+
let (started_tx, started_rx) = mpsc::channel();
132+
let (done_tx, done_rx) = mpsc::channel();
133+
let tree2 = tree.clone();
134+
135+
let ingest = tree.ingestion()?.with_seqno(30);
136+
137+
// Concurrent write should block while ingestion is active
138+
let handle = thread::spawn(move || {
139+
started_tx.send(()).ok();
140+
tree2.insert(b"b999", b"z", 31);
141+
done_tx.send(()).ok();
142+
});
143+
144+
started_rx.recv().unwrap();
145+
thread::sleep(Duration::from_millis(100));
146+
assert!(done_rx.try_recv().is_err());
147+
148+
ingest.finish()?;
149+
handle.join().unwrap();
150+
done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
151+
152+
// Data visible after ingestion
153+
for i in 0..4u32 {
154+
let k = format!("b{:03}", i);
155+
assert_eq!(
156+
tree.get(k.as_bytes(), SeqNo::MAX)?,
157+
Some(b"y".as_slice().into())
158+
);
159+
}
160+
assert_eq!(tree.get(b"b999", SeqNo::MAX)?, Some(b"z".as_slice().into()));
161+
162+
Ok(())
163+
}

0 commit comments

Comments
 (0)