Skip to content

Commit 36b2651

Browse files
committed
Expose Ingestion API with Inversion of Control
1 parent 7357d29 commit 36b2651

File tree

12 files changed

+887
-253
lines changed

12 files changed

+887
-253
lines changed

src/abstract.rs

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use crate::{
6-
iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
7-
Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
8-
UserKey, UserValue,
6+
blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
7+
iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version,
8+
vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
9+
TableId, Tree, TreeId, UserKey, UserValue,
910
};
1011
use std::{
1112
ops::RangeBounds,
@@ -137,27 +138,6 @@ pub trait AbstractTree {
137138
index: Option<Arc<Memtable>>,
138139
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
139140

140-
/// Ingests a sorted stream of key-value pairs into the tree.
141-
///
142-
/// Can only be called on a new fresh, empty tree.
143-
///
144-
/// # Errors
145-
///
146-
/// Will return `Err` if an IO error occurs.
147-
///
148-
/// # Panics
149-
///
150-
/// Panics if the tree is **not** initially empty.
151-
///
152-
/// Will panic if the input iterator is not sorted in ascending order.
153-
#[doc(hidden)]
154-
fn ingest(
155-
&self,
156-
iter: impl Iterator<Item = (UserKey, UserValue)>,
157-
seqno_generator: &SequenceNumberCounter,
158-
visible_seqno: &SequenceNumberCounter,
159-
) -> crate::Result<()>;
160-
161141
/// Returns the approximate number of tombstones in the tree.
162142
fn tombstone_count(&self) -> u64;
163143

src/any_tree.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// This source code is licensed under both the Apache 2.0 and MIT License
33
// (found in the LICENSE-* files in the repository)
44

5-
use crate::{BlobTree, Tree};
5+
use crate::{
6+
blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey,
7+
UserValue,
8+
};
69
use enum_dispatch::enum_dispatch;
710

811
/// May be a standard [`Tree`] or a [`BlobTree`]
@@ -15,3 +18,60 @@ pub enum AnyTree {
1518
/// Key-value separated LSM-tree, see [`BlobTree`]
1619
Blob(BlobTree),
1720
}
21+
22+
/// Unified ingestion builder over `AnyTree`
23+
// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
24+
// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
25+
// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
26+
#[allow(clippy::large_enum_variant)]
27+
pub enum AnyIngestion<'a> {
28+
/// Ingestion for a standard LSM-tree
29+
Standard(Ingestion<'a>),
30+
/// Ingestion for a [`BlobTree`] with KV separation
31+
Blob(BlobIngestion<'a>),
32+
}
33+
34+
impl<'a> AnyIngestion<'a> {
35+
#[must_use]
36+
/// Sets the sequence number used for subsequent writes
37+
pub fn with_seqno(self, seqno: SeqNo) -> Self {
38+
match self {
39+
AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)),
40+
AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)),
41+
}
42+
}
43+
44+
/// Writes a key-value pair
45+
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
46+
match self {
47+
AnyIngestion::Standard(i) => i.write(key, value),
48+
AnyIngestion::Blob(b) => b.write(key, value),
49+
}
50+
}
51+
52+
/// Writes a tombstone for a key
53+
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
54+
match self {
55+
AnyIngestion::Standard(i) => i.write_tombstone(key),
56+
AnyIngestion::Blob(b) => b.write_tombstone(key),
57+
}
58+
}
59+
60+
/// Finalizes ingestion and registers created tables (and blob files if present)
61+
pub fn finish(self) -> crate::Result<()> {
62+
match self {
63+
AnyIngestion::Standard(i) => i.finish(),
64+
AnyIngestion::Blob(b) => b.finish(),
65+
}
66+
}
67+
}
68+
69+
impl AnyTree {
70+
/// Starts an ingestion for any tree type (standard or blob)
71+
pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
72+
match self {
73+
AnyTree::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)),
74+
AnyTree::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)),
75+
}
76+
}
77+
}

src/blob_tree/ingest.rs

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
use crate::{
2+
blob_tree::handle::BlobIndirection,
3+
file::BLOBS_FOLDER,
4+
table::Table,
5+
tree::ingest::Ingestion as TableIngestion,
6+
vlog::{BlobFileWriter, ValueHandle},
7+
SeqNo, UserKey, UserValue,
8+
};
9+
10+
/// Bulk ingestion for BlobTree
11+
///
12+
/// Items NEED to be added in ascending key order.
13+
///
14+
/// Uses table ingestion for the index and a blob file writer for large
15+
/// values so both streams advance together.
16+
pub struct BlobIngestion<'a> {
17+
tree: &'a crate::BlobTree,
18+
pub(crate) table: TableIngestion<'a>,
19+
pub(crate) blob: BlobFileWriter,
20+
seqno: SeqNo,
21+
separation_threshold: u32,
22+
last_key: Option<UserKey>,
23+
}
24+
25+
impl<'a> BlobIngestion<'a> {
26+
/// Creates a new ingestion.
27+
///
28+
/// # Errors
29+
///
30+
/// Will return `Err` if an IO error occurs.
31+
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
32+
let kv = tree
33+
.index
34+
.config
35+
.kv_separation_opts
36+
.as_ref()
37+
.expect("kv separation options should exist");
38+
39+
let blob_file_size = kv.file_target_size;
40+
41+
let table = TableIngestion::new(&tree.index)?;
42+
let blob = BlobFileWriter::new(
43+
tree.index.0.blob_file_id_counter.clone(),
44+
blob_file_size,
45+
tree.index.config.path.join(BLOBS_FOLDER),
46+
)?
47+
.use_compression(kv.compression);
48+
49+
let separation_threshold = kv.separation_threshold;
50+
51+
Ok(Self {
52+
tree,
53+
table,
54+
blob,
55+
seqno: 0,
56+
separation_threshold,
57+
last_key: None,
58+
})
59+
}
60+
61+
/// Sets the ingestion seqno.
62+
#[must_use]
63+
pub fn with_seqno(mut self, seqno: SeqNo) -> Self {
64+
self.seqno = seqno;
65+
self.table = self.table.with_seqno(seqno);
66+
self
67+
}
68+
69+
/// Writes a key-value pair.
70+
///
71+
/// # Errors
72+
///
73+
/// Will return `Err` if an IO error occurs.
74+
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
75+
// Check order before any blob I/O to avoid partial writes on failure
76+
if let Some(prev) = &self.last_key {
77+
assert!(
78+
key > *prev,
79+
"next key in ingestion must be greater than last key"
80+
);
81+
}
82+
83+
#[allow(clippy::cast_possible_truncation)]
84+
let value_size = value.len() as u32;
85+
86+
if value_size >= self.separation_threshold {
87+
let offset = self.blob.offset();
88+
let blob_file_id = self.blob.blob_file_id();
89+
let on_disk_size = self.blob.write(&key, self.seqno, &value)?;
90+
91+
let indirection = BlobIndirection {
92+
vhandle: ValueHandle {
93+
blob_file_id,
94+
offset,
95+
on_disk_size,
96+
},
97+
size: value_size,
98+
};
99+
100+
let cloned_key = key.clone();
101+
let res = self.table.write_indirection(key, indirection);
102+
if res.is_ok() {
103+
self.last_key = Some(cloned_key);
104+
}
105+
res
106+
} else {
107+
let cloned_key = key.clone();
108+
let res = self.table.write(key, value);
109+
if res.is_ok() {
110+
self.last_key = Some(cloned_key);
111+
}
112+
res
113+
}
114+
}
115+
116+
/// Writes a tombstone for a key.
117+
///
118+
/// # Errors
119+
///
120+
/// Will return `Err` if an IO error occurs.
121+
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
122+
if let Some(prev) = &self.last_key {
123+
assert!(
124+
key > *prev,
125+
"next key in ingestion must be greater than last key"
126+
);
127+
}
128+
129+
let cloned_key = key.clone();
130+
let res = self.table.write_tombstone(key);
131+
if res.is_ok() {
132+
self.last_key = Some(cloned_key);
133+
}
134+
res
135+
}
136+
137+
/// Finishes the ingestion.
138+
///
139+
/// # Errors
140+
///
141+
/// Will return `Err` if an IO error occurs.
142+
pub fn finish(self) -> crate::Result<()> {
143+
use crate::AbstractTree;
144+
145+
// Capture required handles before consuming fields during finalization
146+
let index = self.index().clone();
147+
let tree = self.tree.clone();
148+
149+
// Finalize both value log and index writer so the index sees a
150+
// consistent set of blob files.
151+
let blob_files = self.blob.finish()?;
152+
let results = self.table.writer.finish()?;
153+
154+
let pin_filter = index.config.filter_block_pinning_policy.get(0);
155+
let pin_index = index.config.index_block_pinning_policy.get(0);
156+
157+
let created_tables = results
158+
.into_iter()
159+
.map(|(table_id, checksum)| -> crate::Result<Table> {
160+
Table::recover(
161+
index
162+
.config
163+
.path
164+
.join(crate::file::TABLES_FOLDER)
165+
.join(table_id.to_string()),
166+
checksum,
167+
index.id,
168+
index.config.cache.clone(),
169+
index.config.descriptor_table.clone(),
170+
pin_filter,
171+
pin_index,
172+
#[cfg(feature = "metrics")]
173+
index.metrics.clone(),
174+
)
175+
})
176+
.collect::<crate::Result<Vec<_>>>()?;
177+
178+
// Blob ingestion only appends new tables and blob files; sealed
179+
// memtables remain unchanged and GC watermark stays at its
180+
// neutral value for this operation.
181+
tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;
182+
183+
Ok(())
184+
}
185+
186+
#[inline]
187+
fn index(&self) -> &crate::Tree {
188+
&self.tree.index
189+
}
190+
}

0 commit comments

Comments
 (0)