Skip to content

Commit 044b8a1

Browse files
committed
fix snapshot isolation in point lookups
1 parent eaa4c49 commit 044b8a1

File tree

2 files changed

+165
-12
lines changed

2 files changed

+165
-12
lines changed

src/tree/mod.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,26 +84,37 @@ impl AbstractTree for Tree {
8484
let version_history_lock = self.version_history.read().expect("lock is poisoned");
8585
let super_version = version_history_lock.get_version_for_snapshot(seqno);
8686

87-
// Active memtable has the newest writes; if present, it's definitive.
88-
if let Some(entry) = super_version.active_memtable.get(key, seqno) {
89-
return Ok(ignore_tombstone_value(entry));
87+
// Track best candidate by seqno across all sources
88+
let mut best: Option<InternalValue> = super_version.active_memtable.get(key, seqno);
89+
// If the memtable hit matches the snapshot seqno, it's the maximal visible version
90+
if let Some(ref entry) = best {
91+
if entry.key.seqno == seqno {
92+
return Ok(ignore_tombstone_value(entry.clone()));
93+
}
9094
}
9195

92-
// Sealed memtables are newer than any tables; return on first hit.
93-
if let Some(entry) =
94-
Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
95-
{
96-
return Ok(ignore_tombstone_value(entry));
96+
// Sealed memtables are older than active; only consult them on miss
97+
if best.is_none() {
98+
if let Some(entry) =
99+
Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
100+
{
101+
best = Some(entry);
102+
}
97103
}
98104

99-
// Otherwise consult tables (levels top-down, runs newest-first) and return first hit.
100105
if let Some(entry) =
101106
self.get_internal_entry_from_tables(&super_version.version, key, seqno)?
102107
{
108+
if let Some(b) = best {
109+
if b.key.seqno >= entry.key.seqno {
110+
return Ok(ignore_tombstone_value(b));
111+
}
112+
}
103113
return Ok(ignore_tombstone_value(entry));
104114
}
105115

106-
Ok(None)
116+
// Apply tombstone semantics after selecting the newest version
117+
Ok(best.and_then(ignore_tombstone_value))
107118
}
108119

109120
fn current_version(&self) -> Version {
@@ -650,7 +661,6 @@ impl Tree {
650661
log::debug!("Finalized table write at {}", table_file_path.display());
651662

652663
let pin_filter = self.config.filter_block_pinning_policy.get(0);
653-
// Index blocks use the index pinning policy
654664
let pin_index = self.config.index_block_pinning_policy.get(0);
655665

656666
let created_table = Table::recover(

tests/ingestion_api.rs

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> {
99
let key = format!("k{:03}", i);
1010
tree.insert(key.as_bytes(), b"v", 0);
1111
}
12-
1312
let mut ingest = tree.ingestion()?.with_seqno(10);
1413
for i in 0..10u32 {
1514
let key = format!("k{:03}", i);
@@ -26,6 +25,150 @@ fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> {
2625
Ok(())
2726
}
2827

28+
#[test]
29+
fn sealed_memtable_value_overrides_table_value() -> lsm_tree::Result<()> {
30+
use lsm_tree::AbstractTree;
31+
let folder = tempfile::tempdir()?;
32+
let tree = lsm_tree::Config::new(folder, Default::default()).open()?;
33+
34+
// Older table value via ingestion (seqno 1)
35+
{
36+
let mut ingest = tree.ingestion()?.with_seqno(1);
37+
ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?;
38+
ingest.finish()?;
39+
}
40+
41+
// Newer value in memtable (seqno 2), then seal it
42+
tree.insert(b"k", b"new", 2);
43+
let _ = tree.rotate_memtable(); // move active -> sealed
44+
45+
// Read should return the sealed memtable value
46+
assert_eq!(
47+
tree.get(b"k", lsm_tree::SeqNo::MAX)?,
48+
Some(b"new".as_slice().into())
49+
);
50+
Ok(())
51+
}
52+
53+
#[test]
54+
fn sealed_memtable_tombstone_overrides_table_value() -> lsm_tree::Result<()> {
55+
use lsm_tree::AbstractTree;
56+
let folder = tempfile::tempdir()?;
57+
let tree = lsm_tree::Config::new(folder, Default::default()).open()?;
58+
59+
// Older table value via ingestion (seqno 1)
60+
{
61+
let mut ingest = tree.ingestion()?.with_seqno(1);
62+
ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?;
63+
ingest.finish()?;
64+
}
65+
66+
// Newer tombstone in memtable (seqno 2), then seal it
67+
tree.remove(b"k", 2);
68+
let _ = tree.rotate_memtable();
69+
70+
// Read should see the delete from sealed memtable
71+
assert!(tree.get(b"k", lsm_tree::SeqNo::MAX)?.is_none());
72+
Ok(())
73+
}
74+
75+
#[test]
76+
fn tables_newest_first_returns_highest_seqno() -> lsm_tree::Result<()> {
77+
use lsm_tree::AbstractTree;
78+
let folder = tempfile::tempdir()?;
79+
let tree = lsm_tree::Config::new(folder, Default::default()).open()?;
80+
81+
// Two separate ingestions create two tables containing the same key at different seqnos
82+
{
83+
let mut ingest = tree.ingestion()?.with_seqno(1);
84+
ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?;
85+
ingest.finish()?;
86+
}
87+
{
88+
let mut ingest = tree.ingestion()?.with_seqno(2);
89+
ingest.write(b"k".as_slice().into(), b"v2".as_slice().into())?;
90+
ingest.finish()?;
91+
}
92+
93+
// With memtables empty, read should return the value from the newest table run (seqno 2)
94+
assert_eq!(
95+
tree.get(b"k", lsm_tree::SeqNo::MAX)?,
96+
Some(b"v2".as_slice().into())
97+
);
98+
Ok(())
99+
}
100+
101+
#[test]
102+
#[should_panic(expected = "next key in ingestion must be greater than last key")]
103+
fn ingestion_enforces_order_standard_panics() {
104+
let folder = tempfile::tempdir().unwrap();
105+
let tree = lsm_tree::Config::new(folder, Default::default())
106+
.open()
107+
.unwrap();
108+
109+
let mut ingest = tree.ingestion().unwrap().with_seqno(1);
110+
// First write higher key, then lower to trigger ordering assertion
111+
ingest
112+
.write(b"k2".as_slice().into(), b"v".as_slice().into())
113+
.unwrap();
114+
// Panics here
115+
let _ = ingest.write(b"k1".as_slice().into(), b"v".as_slice().into());
116+
}
117+
118+
#[test]
119+
fn blob_ingestion_out_of_order_panics_without_blob_write() -> lsm_tree::Result<()> {
120+
let folder = tempfile::tempdir()?;
121+
let tree = lsm_tree::Config::new(folder, Default::default())
122+
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8)))
123+
.open()?;
124+
125+
let before = tree.blob_file_count();
126+
127+
// Use a small value for the first write to avoid blob I/O
128+
let result = std::panic::catch_unwind(|| {
129+
let mut ingest = tree.ingestion().unwrap().with_seqno(1);
130+
ingest
131+
.write(b"k2".as_slice().into(), b"x".as_slice().into())
132+
.unwrap();
133+
// Second write would require blob I/O, but ordering check should fire before any blob write
134+
let _ = ingest.write(b"k1".as_slice().into(), vec![1u8; 16].into());
135+
});
136+
assert!(result.is_err());
137+
138+
let after = tree.blob_file_count();
139+
assert_eq!(before, after);
140+
Ok(())
141+
}
142+
143+
#[test]
144+
fn memtable_put_overrides_table_tombstone() -> lsm_tree::Result<()> {
145+
use lsm_tree::AbstractTree;
146+
let folder = tempfile::tempdir()?;
147+
let tree = lsm_tree::Config::new(folder, Default::default()).open()?;
148+
149+
// Older put written via ingestion to tables (seqno 1)
150+
{
151+
let mut ingest = tree.ingestion()?.with_seqno(1);
152+
ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?;
153+
ingest.finish()?;
154+
}
155+
156+
// Newer tombstone written via ingestion to tables (seqno 2)
157+
{
158+
let mut ingest = tree.ingestion()?.with_seqno(2);
159+
ingest.write_tombstone(b"k".as_slice().into())?;
160+
ingest.finish()?;
161+
}
162+
163+
// Newest put in active memtable (seqno 3) should override table tombstone
164+
tree.insert(b"k", b"v3", 3);
165+
assert_eq!(
166+
tree.get(b"k", lsm_tree::SeqNo::MAX)?,
167+
Some(b"v3".as_slice().into())
168+
);
169+
Ok(())
170+
}
171+
29172
#[test]
30173
fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> {
31174
let folder = tempfile::tempdir()?;

0 commit comments

Comments
 (0)