Skip to content

Commit 1af202a

Browse files
committed
Ingestion API IoC, remove ingestion lock, and align with new flush semantics
1 parent 7357d29 commit 1af202a

File tree

12 files changed

+870
-253
lines changed

12 files changed

+870
-253
lines changed

src/abstract.rs

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use crate::{
6-
iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
7-
Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
8-
UserKey, UserValue,
6+
blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
7+
iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version,
8+
vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
9+
TableId, Tree, TreeId, UserKey, UserValue,
910
};
1011
use std::{
1112
ops::RangeBounds,
@@ -137,27 +138,6 @@ pub trait AbstractTree {
137138
index: Option<Arc<Memtable>>,
138139
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
139140

140-
/// Ingests a sorted stream of key-value pairs into the tree.
141-
///
142-
/// Can only be called on a new fresh, empty tree.
143-
///
144-
/// # Errors
145-
///
146-
/// Will return `Err` if an IO error occurs.
147-
///
148-
/// # Panics
149-
///
150-
/// Panics if the tree is **not** initially empty.
151-
///
152-
/// Will panic if the input iterator is not sorted in ascending order.
153-
#[doc(hidden)]
154-
fn ingest(
155-
&self,
156-
iter: impl Iterator<Item = (UserKey, UserValue)>,
157-
seqno_generator: &SequenceNumberCounter,
158-
visible_seqno: &SequenceNumberCounter,
159-
) -> crate::Result<()>;
160-
161141
/// Returns the approximate number of tombstones in the tree.
162142
fn tombstone_count(&self) -> u64;
163143

src/any_tree.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// This source code is licensed under both the Apache 2.0 and MIT License
33
// (found in the LICENSE-* files in the repository)
44

5-
use crate::{BlobTree, Tree};
5+
use crate::{
6+
blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey,
7+
UserValue,
8+
};
69
use enum_dispatch::enum_dispatch;
710

811
/// May be a standard [`Tree`] or a [`BlobTree`]
@@ -15,3 +18,60 @@ pub enum AnyTree {
1518
/// Key-value separated LSM-tree, see [`BlobTree`]
1619
Blob(BlobTree),
1720
}
21+
22+
/// Unified ingestion builder over `AnyTree`
23+
// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
24+
// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
25+
// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
26+
#[allow(clippy::large_enum_variant)]
27+
pub enum AnyIngestion<'a> {
28+
/// Ingestion for a standard LSM-tree
29+
Standard(Ingestion<'a>),
30+
/// Ingestion for a [`BlobTree`] with KV separation
31+
Blob(BlobIngestion<'a>),
32+
}
33+
34+
impl<'a> AnyIngestion<'a> {
35+
#[must_use]
36+
/// Sets the sequence number used for subsequent writes
37+
pub fn with_seqno(self, seqno: SeqNo) -> Self {
38+
match self {
39+
AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)),
40+
AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)),
41+
}
42+
}
43+
44+
/// Writes a key-value pair
45+
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
46+
match self {
47+
AnyIngestion::Standard(i) => i.write(key, value),
48+
AnyIngestion::Blob(b) => b.write(key, value),
49+
}
50+
}
51+
52+
/// Writes a tombstone for a key
53+
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
54+
match self {
55+
AnyIngestion::Standard(i) => i.write_tombstone(key),
56+
AnyIngestion::Blob(b) => b.write_tombstone(key),
57+
}
58+
}
59+
60+
/// Finalizes ingestion and registers created tables (and blob files if present)
61+
pub fn finish(self) -> crate::Result<()> {
62+
match self {
63+
AnyIngestion::Standard(i) => i.finish(),
64+
AnyIngestion::Blob(b) => b.finish(),
65+
}
66+
}
67+
}
68+
69+
impl AnyTree {
70+
/// Starts an ingestion for any tree type (standard or blob)
71+
pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
72+
match self {
73+
AnyTree::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)),
74+
AnyTree::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)),
75+
}
76+
}
77+
}

src/blob_tree/ingest.rs

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
use crate::{
2+
blob_tree::handle::BlobIndirection,
3+
file::BLOBS_FOLDER,
4+
table::Table,
5+
tree::ingest::Ingestion as TableIngestion,
6+
vlog::{BlobFileWriter, ValueHandle},
7+
SeqNo, UserKey, UserValue,
8+
};
9+
10+
/// Bulk ingestion for BlobTree
11+
///
12+
/// Items NEED to be added in ascending key order.
13+
pub struct BlobIngestion<'a> {
14+
tree: &'a crate::BlobTree,
15+
pub(crate) table: TableIngestion<'a>,
16+
pub(crate) blob: BlobFileWriter,
17+
seqno: SeqNo,
18+
separation_threshold: u32,
19+
last_key: Option<UserKey>,
20+
}
21+
22+
impl<'a> BlobIngestion<'a> {
23+
/// Creates a new ingestion.
24+
///
25+
/// # Errors
26+
///
27+
/// Will return `Err` if an IO error occurs.
28+
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
29+
let kv = tree
30+
.index
31+
.config
32+
.kv_separation_opts
33+
.as_ref()
34+
.expect("kv separation options should exist");
35+
36+
let blob_file_size = kv.file_target_size;
37+
38+
let table = TableIngestion::new(&tree.index)?;
39+
let blob = BlobFileWriter::new(
40+
tree.index.0.blob_file_id_counter.clone(),
41+
blob_file_size,
42+
tree.index.config.path.join(BLOBS_FOLDER),
43+
)?
44+
.use_compression(kv.compression);
45+
46+
let separation_threshold = kv.separation_threshold;
47+
48+
Ok(Self {
49+
tree,
50+
table,
51+
blob,
52+
seqno: 0,
53+
separation_threshold,
54+
last_key: None,
55+
})
56+
}
57+
58+
/// Sets the ingestion seqno.
59+
#[must_use]
60+
pub fn with_seqno(mut self, seqno: SeqNo) -> Self {
61+
self.seqno = seqno;
62+
self.table = self.table.with_seqno(seqno);
63+
self
64+
}
65+
66+
/// Writes a key-value pair.
67+
///
68+
/// # Errors
69+
///
70+
/// Will return `Err` if an IO error occurs.
71+
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
72+
// Check order before any blob I/O to avoid partial writes on failure
73+
if let Some(prev) = &self.last_key {
74+
assert!(
75+
key > *prev,
76+
"next key in ingestion must be greater than last key"
77+
);
78+
}
79+
80+
#[allow(clippy::cast_possible_truncation)]
81+
let value_size = value.len() as u32;
82+
83+
if value_size >= self.separation_threshold {
84+
let offset = self.blob.offset();
85+
let blob_file_id = self.blob.blob_file_id();
86+
let on_disk_size = self.blob.write(&key, self.seqno, &value)?;
87+
88+
let indirection = BlobIndirection {
89+
vhandle: ValueHandle {
90+
blob_file_id,
91+
offset,
92+
on_disk_size,
93+
},
94+
size: value_size,
95+
};
96+
97+
let cloned_key = key.clone();
98+
let res = self.table.write_indirection(key, indirection);
99+
if res.is_ok() {
100+
self.last_key = Some(cloned_key);
101+
}
102+
res
103+
} else {
104+
let cloned_key = key.clone();
105+
let res = self.table.write(key, value);
106+
if res.is_ok() {
107+
self.last_key = Some(cloned_key);
108+
}
109+
res
110+
}
111+
}
112+
113+
/// Writes a tombstone for a key.
114+
///
115+
/// # Errors
116+
///
117+
/// Will return `Err` if an IO error occurs.
118+
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
119+
if let Some(prev) = &self.last_key {
120+
assert!(
121+
key > *prev,
122+
"next key in ingestion must be greater than last key"
123+
);
124+
}
125+
126+
let cloned_key = key.clone();
127+
let res = self.table.write_tombstone(key);
128+
if res.is_ok() {
129+
self.last_key = Some(cloned_key);
130+
}
131+
res
132+
}
133+
134+
/// Finishes the ingestion.
135+
///
136+
/// # Errors
137+
///
138+
/// Will return `Err` if an IO error occurs.
139+
pub fn finish(self) -> crate::Result<()> {
140+
use crate::AbstractTree;
141+
142+
// Capture required handles before consuming fields during finalization
143+
let index = self.index().clone();
144+
let tree = self.tree.clone();
145+
146+
let blob_files = self.blob.finish()?;
147+
let results = self.table.writer.finish()?;
148+
149+
let pin_filter = index.config.filter_block_pinning_policy.get(0);
150+
let pin_index = index.config.index_block_pinning_policy.get(0);
151+
152+
let created_tables = results
153+
.into_iter()
154+
.map(|(table_id, checksum)| -> crate::Result<Table> {
155+
Table::recover(
156+
index
157+
.config
158+
.path
159+
.join(crate::file::TABLES_FOLDER)
160+
.join(table_id.to_string()),
161+
checksum,
162+
index.id,
163+
index.config.cache.clone(),
164+
index.config.descriptor_table.clone(),
165+
pin_filter,
166+
pin_index,
167+
#[cfg(feature = "metrics")]
168+
index.metrics.clone(),
169+
)
170+
})
171+
.collect::<crate::Result<Vec<_>>>()?;
172+
173+
tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;
174+
175+
Ok(())
176+
}
177+
178+
#[inline]
179+
fn index(&self) -> &crate::Tree {
180+
&self.tree.index
181+
}
182+
}

0 commit comments

Comments
 (0)