Skip to content

Commit fda5553

Browse files
committed
unify ingestion API across tree types
1 parent 9252a20 commit fda5553

File tree

6 files changed

+83
-100
lines changed

6 files changed

+83
-100
lines changed

src/any_tree.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// This source code is licensed under both the Apache 2.0 and MIT License
33
// (found in the LICENSE-* files in the repository)
44

5-
use crate::{BlobTree, Tree};
5+
use crate::{
6+
blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey,
7+
UserValue,
8+
};
69
use enum_dispatch::enum_dispatch;
710

811
/// May be a standard [`Tree`] or a [`BlobTree`]
@@ -15,3 +18,56 @@ pub enum AnyTree {
1518
/// Key-value separated LSM-tree, see [`BlobTree`]
1619
Blob(BlobTree),
1720
}
21+
22+
/// Unified ingestion builder over [`AnyTree`]
23+
pub enum AnyIngestion<'a> {
24+
/// Ingestion for a standard LSM-tree
25+
Standard(Ingestion<'a>),
26+
/// Ingestion for a [`BlobTree`] with KV separation
27+
Blob(BlobIngestion<'a>),
28+
}
29+
30+
impl<'a> AnyIngestion<'a> {
31+
#[must_use]
32+
/// Sets the sequence number used for subsequent writes
33+
pub fn with_seqno(self, seqno: SeqNo) -> Self {
34+
match self {
35+
AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)),
36+
AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)),
37+
}
38+
}
39+
40+
/// Writes a key-value pair
41+
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
42+
match self {
43+
AnyIngestion::Standard(i) => i.write(key, value),
44+
AnyIngestion::Blob(b) => b.write(key, value),
45+
}
46+
}
47+
48+
/// Writes a tombstone for a key
49+
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
50+
match self {
51+
AnyIngestion::Standard(i) => i.write_tombstone(key),
52+
AnyIngestion::Blob(b) => b.write_tombstone(key),
53+
}
54+
}
55+
56+
/// Finalizes ingestion and registers created tables (and blob files if present)
57+
pub fn finish(self) -> crate::Result<()> {
58+
match self {
59+
AnyIngestion::Standard(i) => i.finish(),
60+
AnyIngestion::Blob(b) => b.finish(),
61+
}
62+
}
63+
}
64+
65+
impl AnyTree {
66+
/// Starts an ingestion for any tree type (standard or blob)
67+
pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
68+
match self {
69+
AnyTree::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)),
70+
AnyTree::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)),
71+
}
72+
}
73+
}

src/blob_tree/ingest.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,36 +26,24 @@ impl<'a> BlobIngestion<'a> {
2626
///
2727
/// Will return `Err` if an IO error occurs.
2828
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
29-
let blob_file_size = tree
29+
let kv = tree
3030
.index
3131
.config
3232
.kv_separation_opts
3333
.as_ref()
34-
.expect("kv separation options should exist")
35-
.file_target_size;
34+
.expect("kv separation options should exist");
35+
36+
let blob_file_size = kv.file_target_size;
3637

3738
let table = TableIngestion::new(&tree.index)?;
3839
let blob = BlobFileWriter::new(
3940
tree.index.0.blob_file_id_generator.clone(),
4041
blob_file_size,
4142
tree.index.config.path.join(BLOBS_FOLDER),
4243
)?
43-
.use_compression(
44-
tree.index
45-
.config
46-
.kv_separation_opts
47-
.as_ref()
48-
.expect("blob options should exist")
49-
.compression,
50-
);
51-
52-
let separation_threshold = tree
53-
.index
54-
.config
55-
.kv_separation_opts
56-
.as_ref()
57-
.expect("kv separation options should exist")
58-
.separation_threshold;
44+
.use_compression(kv.compression);
45+
46+
let separation_threshold = kv.separation_threshold;
5947

6048
Ok(Self {
6149
tree,

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ pub use {
226226
};
227227

228228
pub use {
229-
any_tree::AnyTree,
229+
any_tree::{AnyIngestion, AnyTree},
230230
blob_tree::ingest::BlobIngestion,
231231
blob_tree::BlobTree,
232232
cache::Cache,

src/tree/mod.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ impl Tree {
701701
None
702702
}
703703

704-
// Look through runs/tables and keep the newest visible version for the key
704+
// Scan levels top-down and runs newest-first; return the first table hit
705705
fn get_internal_entry_from_tables(
706706
&self,
707707
version: &Version,
@@ -712,18 +712,13 @@ impl Tree {
712712
// https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
713713
let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
714714

715-
let mut best: Option<InternalValue> = None;
716-
717715
for level in version.iter_levels() {
718716
for run in level.iter() {
719717
// NOTE: Based on benchmarking, binary search is only worth it with ~4 tables
720718
if run.len() >= 4 {
721719
if let Some(table) = run.get_for_key(key) {
722720
if let Some(item) = table.get(key, seqno, key_hash)? {
723-
match &best {
724-
Some(b) if b.key.seqno >= item.key.seqno => {}
725-
_ => best = Some(item),
726-
}
721+
return Ok(Some(item));
727722
}
728723
}
729724
} else {
@@ -734,17 +729,14 @@ impl Tree {
734729
}
735730

736731
if let Some(item) = table.get(key, seqno, key_hash)? {
737-
match &best {
738-
Some(b) if b.key.seqno >= item.key.seqno => {}
739-
_ => best = Some(item),
740-
}
732+
return Ok(Some(item));
741733
}
742734
}
743735
}
744736
}
745737
}
746738

747-
Ok(best)
739+
Ok(None)
748740
}
749741

750742
fn inner_compact(

tests/ingestion_api.rs

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
1-
use lsm_tree::{
2-
AbstractTree, AnyTree, BlobIngestion, Config, Ingestion, KvSeparationOptions, SeqNo,
3-
};
1+
use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo};
42

53
#[test]
64
fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> {
75
let folder = tempfile::tempdir()?;
86
let tree = Config::new(folder, Default::default()).open()?;
9-
let tree = match tree {
10-
AnyTree::Standard(t) => t,
11-
_ => unreachable!(),
12-
};
137

148
for i in 0..10u32 {
159
let key = format!("k{:03}", i);
1610
tree.insert(key.as_bytes(), b"v", 0);
1711
}
1812

19-
let mut ingest = Ingestion::new(&tree)?.with_seqno(10);
13+
let mut ingest = tree.ingestion()?.with_seqno(10);
2014
for i in 0..10u32 {
2115
let key = format!("k{:03}", i);
2216
ingest.write_tombstone(key.as_bytes().into())?;
@@ -38,17 +32,13 @@ fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()>
3832
let tree = Config::new(folder, Default::default())
3933
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
4034
.open()?;
41-
let tree = match tree {
42-
AnyTree::Blob(t) => t,
43-
_ => unreachable!(),
44-
};
4535

4636
for i in 0..8u32 {
4737
let key = format!("b{:03}", i);
4838
tree.insert(key.as_bytes(), b"x", 0);
4939
}
5040

51-
let mut ingest = BlobIngestion::new(&tree)?.with_seqno(10);
41+
let mut ingest = tree.ingestion()?.with_seqno(10);
5242
for i in 0..8u32 {
5343
let key = format!("b{:03}", i);
5444
ingest.write_tombstone(key.as_bytes().into())?;
@@ -68,13 +58,9 @@ fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()>
6858
fn tree_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> {
6959
let folder = tempfile::tempdir()?;
7060
let tree = Config::new(folder, Default::default()).open()?;
71-
let tree = match tree {
72-
AnyTree::Standard(t) => t,
73-
_ => unreachable!(),
74-
};
7561

7662
let before_tables = tree.table_count();
77-
Ingestion::new(&tree)?.finish()?;
63+
tree.ingestion()?.finish()?;
7864
let after_tables = tree.table_count();
7965

8066
assert_eq!(before_tables, after_tables);
@@ -89,10 +75,6 @@ fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Resu
8975
let tree = Config::new(folder, Default::default())
9076
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
9177
.open()?;
92-
let tree = match tree {
93-
AnyTree::Blob(t) => t,
94-
_ => unreachable!(),
95-
};
9678

9779
for i in 0..5u32 {
9880
let key = format!("d{:03}", i);
@@ -101,7 +83,7 @@ fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Resu
10183

10284
let before_blobs = tree.blob_file_count();
10385

104-
let mut ingest = BlobIngestion::new(&tree)?.with_seqno(10);
86+
let mut ingest = tree.ingestion()?.with_seqno(10);
10587
for i in 0..5u32 {
10688
let key = format!("d{:03}", i);
10789
ingest.write_tombstone(key.as_bytes().into())?;
@@ -125,15 +107,11 @@ fn blob_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> {
125107
let tree = Config::new(folder, Default::default())
126108
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
127109
.open()?;
128-
let tree = match tree {
129-
AnyTree::Blob(t) => t,
130-
_ => unreachable!(),
131-
};
132110

133111
let before_tables = tree.table_count();
134112
let before_blobs = tree.blob_file_count();
135113

136-
BlobIngestion::new(&tree)?.finish()?;
114+
tree.ingestion()?.finish()?;
137115

138116
let after_tables = tree.table_count();
139117
let after_blobs = tree.blob_file_count();
@@ -151,12 +129,8 @@ fn blob_ingestion_separates_large_values_and_reads_ok() -> lsm_tree::Result<()>
151129
let tree = Config::new(folder, Default::default())
152130
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8)))
153131
.open()?;
154-
let tree = match tree {
155-
AnyTree::Blob(t) => t,
156-
_ => unreachable!(),
157-
};
158132

159-
let mut ingest = BlobIngestion::new(&tree)?.with_seqno(1);
133+
let mut ingest = tree.ingestion()?.with_seqno(1);
160134
ingest.write("k_big1".as_bytes().into(), vec![1u8; 16].into())?;
161135
ingest.write("k_big2".as_bytes().into(), vec![2u8; 32].into())?;
162136
ingest.write("k_small".as_bytes().into(), b"abc".as_slice().into())?;

tests/tree_bulk_ingest.rs

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use lsm_tree::{
2-
AbstractTree, AnyTree, BlobIngestion, Config, Guard, Ingestion, KvSeparationOptions, SeqNo,
3-
SequenceNumberCounter,
4-
};
1+
use lsm_tree::{AbstractTree, Config, Guard, KvSeparationOptions, SeqNo, SequenceNumberCounter};
52
use test_log::test;
63

74
const ITEM_COUNT: usize = 100_000;
@@ -14,13 +11,9 @@ fn tree_bulk_ingest() -> lsm_tree::Result<()> {
1411
let visible_seqno = SequenceNumberCounter::default();
1512

1613
let tree = Config::new(folder, seqno.clone()).open()?;
17-
let tree = match tree {
18-
AnyTree::Standard(t) => t,
19-
_ => unreachable!(),
20-
};
2114

2215
let seq = seqno.next();
23-
let mut ingestion = Ingestion::new(&tree)?.with_seqno(seq);
16+
let mut ingestion = tree.ingestion()?.with_seqno(seq);
2417
for x in 0..ITEM_COUNT as u64 {
2518
let k = x.to_be_bytes();
2619
let v = nanoid::nanoid!();
@@ -53,13 +46,9 @@ fn tree_copy() -> lsm_tree::Result<()> {
5346
let visible_seqno = SequenceNumberCounter::default();
5447

5548
let src = Config::new(folder, seqno.clone()).open()?;
56-
let src = match src {
57-
AnyTree::Standard(t) => t,
58-
_ => unreachable!(),
59-
};
6049

6150
let seq = seqno.next();
62-
let mut ingestion = Ingestion::new(&src)?.with_seqno(seq);
51+
let mut ingestion = src.ingestion()?.with_seqno(seq);
6352
for x in 0..ITEM_COUNT as u64 {
6453
let k = x.to_be_bytes();
6554
let v = nanoid::nanoid!();
@@ -83,13 +72,9 @@ fn tree_copy() -> lsm_tree::Result<()> {
8372

8473
let folder = tempfile::tempdir()?;
8574
let dest = Config::new(folder, seqno.clone()).open()?;
86-
let dest = match dest {
87-
AnyTree::Standard(t) => t,
88-
_ => unreachable!(),
89-
};
9075

9176
let seq = seqno.next();
92-
let mut ingestion = Ingestion::new(&dest)?.with_seqno(seq);
77+
let mut ingestion = dest.ingestion()?.with_seqno(seq);
9378
for item in src.iter(SeqNo::MAX, None) {
9479
let (k, v) = item.into_inner().unwrap();
9580
ingestion.write(k, v)?;
@@ -123,13 +108,9 @@ fn blob_tree_bulk_ingest() -> lsm_tree::Result<()> {
123108
let tree = Config::new(folder, seqno.clone())
124109
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
125110
.open()?;
126-
let tree = match tree {
127-
AnyTree::Blob(t) => t,
128-
_ => unreachable!(),
129-
};
130111

131112
let seq = seqno.next();
132-
let mut ingestion = BlobIngestion::new(&tree)?.with_seqno(seq);
113+
let mut ingestion = tree.ingestion()?.with_seqno(seq);
133114
for x in 0..ITEM_COUNT as u64 {
134115
let k = x.to_be_bytes();
135116
let v = nanoid::nanoid!();
@@ -165,13 +146,9 @@ fn blob_tree_copy() -> lsm_tree::Result<()> {
165146
let src = Config::new(folder, seqno.clone())
166147
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
167148
.open()?;
168-
let src = match src {
169-
AnyTree::Blob(t) => t,
170-
_ => unreachable!(),
171-
};
172149

173150
let seq = seqno.next();
174-
let mut ingestion = BlobIngestion::new(&src)?.with_seqno(seq);
151+
let mut ingestion = src.ingestion()?.with_seqno(seq);
175152
for x in 0..ITEM_COUNT as u64 {
176153
let k = x.to_be_bytes();
177154
let v = nanoid::nanoid!();
@@ -198,13 +175,9 @@ fn blob_tree_copy() -> lsm_tree::Result<()> {
198175
let dest = Config::new(folder, seqno.clone())
199176
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
200177
.open()?;
201-
let dest = match dest {
202-
AnyTree::Blob(t) => t,
203-
_ => unreachable!(),
204-
};
205178

206179
let seq = seqno.next();
207-
let mut ingestion = BlobIngestion::new(&dest)?.with_seqno(seq);
180+
let mut ingestion = dest.ingestion()?.with_seqno(seq);
208181
for item in src.iter(SeqNo::MAX, None) {
209182
let (k, v) = item.into_inner().unwrap();
210183
ingestion.write(k, v)?;

0 commit comments

Comments
 (0)