Skip to content

Commit 9252a20

Browse files
committed
Expose Ingestion API with Inversion of Control
1 parent 9f02d16 commit 9252a20

File tree

8 files changed

+541
-259
lines changed

8 files changed

+541
-259
lines changed

src/abstract.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
77
iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version,
88
vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
9-
SequenceNumberCounter, TableId, Tree, TreeId, UserKey, UserValue,
9+
TableId, Tree, TreeId, UserKey, UserValue,
1010
};
1111
use enum_dispatch::enum_dispatch;
1212
use std::{ops::RangeBounds, sync::Arc};
@@ -72,27 +72,6 @@ pub trait AbstractTree {
7272
index: Option<Arc<Memtable>>,
7373
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
7474

75-
/// Ingests a sorted stream of key-value pairs into the tree.
76-
///
77-
/// Can only be called on a new fresh, empty tree.
78-
///
79-
/// # Errors
80-
///
81-
/// Will return `Err` if an IO error occurs.
82-
///
83-
/// # Panics
84-
///
85-
/// Panics if the tree is **not** initially empty.
86-
///
87-
/// Will panic if the input iterator is not sorted in ascending order.
88-
#[doc(hidden)]
89-
fn ingest(
90-
&self,
91-
iter: impl Iterator<Item = (UserKey, UserValue)>,
92-
seqno_generator: &SequenceNumberCounter,
93-
visible_seqno: &SequenceNumberCounter,
94-
) -> crate::Result<()>;
95-
9675
/// Returns the approximate number of tombstones in the tree.
9776
fn tombstone_count(&self) -> u64;
9877

src/blob_tree/ingest.rs

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
use crate::{
2+
blob_tree::handle::BlobIndirection,
3+
file::BLOBS_FOLDER,
4+
table::Table,
5+
tree::ingest::Ingestion as TableIngestion,
6+
vlog::{BlobFileWriter, ValueHandle},
7+
SeqNo, UserKey, UserValue,
8+
};
9+
10+
/// Bulk ingestion for a `BlobTree`.
///
/// Values whose length is at or above the configured separation threshold are
/// written to blob files and referenced from the table through an indirection;
/// smaller values and tombstones are written to the table directly.
/// Nothing is registered with the tree until `finish` is called.
11+
///
12+
/// Items NEED to be added in ascending key order.
///
/// The order is strict: `write` and `write_tombstone` panic when a key is not
/// greater than the last successfully written key.
13+
pub struct BlobIngestion<'a> {
14+
/// Blob tree that receives the created tables and blob files in `finish`.
tree: &'a crate::BlobTree,
15+
/// Ingestion into the index tree (inline values, indirections, tombstones).
pub(crate) table: TableIngestion<'a>,
16+
/// Writer for values that are separated into blob files.
pub(crate) blob: BlobFileWriter,
17+
/// Seqno stamped on blob writes; starts at 0 and is changed via `with_seqno`.
seqno: SeqNo,
18+
/// Values with `len() >= separation_threshold` are written to the blob writer.
separation_threshold: u32,
19+
/// Last key that was written successfully; `None` until the first write.
last_key: Option<UserKey>,
20+
}
21+
22+
impl<'a> BlobIngestion<'a> {
23+
/// Creates a new ingestion.
///
/// Sets up a table ingestion on the tree's index and a blob file writer
/// targeting the `BLOBS_FOLDER` below the configured path. The ingestion
/// seqno starts at 0; use `with_seqno` to change it.
24+
///
25+
/// # Errors
26+
///
27+
/// Will return `Err` if an IO error occurs.
///
/// # Panics
///
/// Panics if the tree's config has no `kv_separation_opts`.
28+
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
29+
// Target size of a single blob file, taken from the KV separation options.
let blob_file_size = tree
30+
.index
31+
.config
32+
.kv_separation_opts
33+
.as_ref()
34+
.expect("kv separation options should exist")
35+
.file_target_size;
36+
37+
let table = TableIngestion::new(&tree.index)?;
38+
// The blob writer shares the tree's blob file ID generator and uses the
// compression configured in the KV separation options.
let blob = BlobFileWriter::new(
39+
tree.index.0.blob_file_id_generator.clone(),
40+
blob_file_size,
41+
tree.index.config.path.join(BLOBS_FOLDER),
42+
)?
43+
.use_compression(
44+
tree.index
45+
.config
46+
.kv_separation_opts
47+
.as_ref()
48+
.expect("blob options should exist")
49+
.compression,
50+
);
51+
52+
// Cached once so `write` does not have to look the option up per item.
let separation_threshold = tree
53+
.index
54+
.config
55+
.kv_separation_opts
56+
.as_ref()
57+
.expect("kv separation options should exist")
58+
.separation_threshold;
59+
60+
Ok(Self {
61+
tree,
62+
table,
63+
blob,
64+
seqno: 0,
65+
separation_threshold,
66+
last_key: None,
67+
})
68+
}
69+
70+
/// Sets the ingestion seqno.
///
/// The seqno is stored for subsequent blob writes and is also forwarded to
/// the underlying table ingestion.
71+
#[must_use]
72+
pub fn with_seqno(mut self, seqno: SeqNo) -> Self {
73+
self.seqno = seqno;
74+
self.table = self.table.with_seqno(seqno);
75+
self
76+
}
77+
78+
/// Writes a key-value pair.
///
/// If the value's length is at or above the separation threshold, the value
/// is written to the blob writer and the table receives a `BlobIndirection`
/// pointing at it; otherwise the value is written to the table inline.
79+
///
80+
/// # Errors
81+
///
82+
/// Will return `Err` if an IO error occurs.
///
/// # Panics
///
/// Panics if `key` is not greater than the last successfully written key.
83+
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
84+
// Check order before any blob I/O to avoid partial writes on failure
85+
if let Some(prev) = &self.last_key {
86+
assert!(
87+
key > *prev,
88+
"next key in ingestion must be greater than last key"
89+
);
90+
}
91+
92+
// NOTE(review): `as u32` silently truncates lengths above `u32::MAX`;
// presumably values are limited to 32-bit lengths elsewhere - confirm.
#[allow(clippy::cast_possible_truncation)]
93+
let value_size = value.len() as u32;
94+
95+
if value_size >= self.separation_threshold {
96+
// Offset and blob file ID are read before the write, presumably so the
// handle refers to where this record starts - confirm against
// `BlobFileWriter`.
let offset = self.blob.offset();
97+
let blob_file_id = self.blob.blob_file_id();
98+
let on_disk_size = self.blob.write(&key, self.seqno, &value)?;
99+
100+
let indirection = BlobIndirection {
101+
vhandle: ValueHandle {
102+
blob_file_id,
103+
offset,
104+
on_disk_size,
105+
},
106+
size: value_size,
107+
};
108+
109+
// `key` is moved into the table write, so keep a copy for `last_key`.
// NOTE(review): if the table write fails here, the blob record written
// above is not rolled back by this function.
let cloned_key = key.clone();
110+
let res = self.table.write_indirection(key, indirection);
111+
// Only advance `last_key` when the write actually succeeded.
if res.is_ok() {
112+
self.last_key = Some(cloned_key);
113+
}
114+
res
115+
} else {
116+
// Small value: stored inline in the table, no blob I/O involved.
let cloned_key = key.clone();
117+
let res = self.table.write(key, value);
118+
if res.is_ok() {
119+
self.last_key = Some(cloned_key);
120+
}
121+
res
122+
}
123+
}
124+
125+
/// Writes a tombstone for a key.
///
/// Tombstones go to the table only; the blob writer is not touched.
126+
///
127+
/// # Errors
128+
///
129+
/// Will return `Err` if an IO error occurs.
///
/// # Panics
///
/// Panics if `key` is not greater than the last successfully written key.
130+
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
131+
// Same strict ordering rule as for regular writes.
if let Some(prev) = &self.last_key {
132+
assert!(
133+
key > *prev,
134+
"next key in ingestion must be greater than last key"
135+
);
136+
}
137+
138+
// `key` is moved into the table write, so keep a copy for `last_key`.
let cloned_key = key.clone();
139+
let res = self.table.write_tombstone(key);
140+
// Only advance `last_key` when the write actually succeeded.
if res.is_ok() {
141+
self.last_key = Some(cloned_key);
142+
}
143+
res
144+
}
145+
146+
/// Finishes the ingestion.
///
/// Finishes the blob writer and the table writer, recovers a `Table` for every
/// table file that was produced, and registers the tables together with the
/// created blob files on the tree.
///
/// NOTE(review): this does not advance any visible-seqno counter (the removed
/// `AbstractTree::ingest` called `visible_seqno.fetch_max(seqno + 1)`);
/// presumably that is now the caller's responsibility - confirm.
147+
///
148+
/// # Errors
149+
///
150+
/// Will return `Err` if an IO error occurs.
151+
pub fn finish(self) -> crate::Result<()> {
152+
use crate::AbstractTree;
153+
154+
// Capture required handles before consuming fields during finalization
155+
let index = self.index().clone();
156+
let tree = self.tree.clone();
157+
158+
// Finish both writers; each `finish` takes its writer by value.
let blob_files = self.blob.finish()?;
159+
let results = self.table.writer.finish()?;
160+
161+
// Pinning policies are looked up with index 0 - presumably the level the
// ingested tables are registered at; confirm against the policy type.
let pin_filter = index.config.filter_block_pinning_policy.get(0);
162+
let pin_index = index.config.index_block_pinning_policy.get(0);
163+
164+
// Re-open every written table file from the tables folder; the first
// recovery error aborts the whole collection.
let created_tables = results
165+
.into_iter()
166+
.map(|(table_id, checksum)| -> crate::Result<Table> {
167+
Table::recover(
168+
index
169+
.config
170+
.path
171+
.join(crate::file::TABLES_FOLDER)
172+
.join(table_id.to_string()),
173+
checksum,
174+
index.id,
175+
index.config.cache.clone(),
176+
index.config.descriptor_table.clone(),
177+
pin_filter,
178+
pin_index,
179+
#[cfg(feature = "metrics")]
180+
index.metrics.clone(),
181+
)
182+
})
183+
.collect::<crate::Result<Vec<_>>>()?;
184+
185+
// Hand the recovered tables and the blob files over to the tree.
tree.register_tables(&created_tables, Some(&blob_files), None)?;
186+
187+
Ok(())
188+
}
189+
190+
/// Returns the index tree (`tree.index`) of the blob tree being ingested into.
#[inline]
191+
fn index(&self) -> &crate::Tree {
192+
&self.tree.index
193+
}
194+
}

src/blob_tree/mod.rs

Lines changed: 1 addition & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
mod gc;
66
pub mod handle;
7+
pub mod ingest;
78

89
#[doc(hidden)]
910
pub use gc::{FragmentationEntry, FragmentationMap};
@@ -251,122 +252,6 @@ impl AbstractTree for BlobTree {
251252
self.index.drop_range(range)
252253
}
253254

254-
fn ingest(
255-
&self,
256-
iter: impl Iterator<Item = (UserKey, UserValue)>,
257-
seqno_generator: &SequenceNumberCounter,
258-
visible_seqno: &SequenceNumberCounter,
259-
) -> crate::Result<()> {
260-
use crate::{compaction::MoveDown, tree::ingest::Ingestion};
261-
use std::time::Instant;
262-
263-
let seqno = seqno_generator.next();
264-
265-
let blob_file_size = self
266-
.index
267-
.config
268-
.kv_separation_opts
269-
.as_ref()
270-
.expect("kv separation options should exist")
271-
.file_target_size;
272-
273-
let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
274-
let mut blob_writer = BlobFileWriter::new(
275-
self.index.0.blob_file_id_generator.clone(),
276-
blob_file_size,
277-
self.index.config.path.join(BLOBS_FOLDER),
278-
)?
279-
.use_compression(
280-
self.index
281-
.config
282-
.kv_separation_opts
283-
.as_ref()
284-
.expect("blob options should exist")
285-
.compression,
286-
);
287-
288-
let start = Instant::now();
289-
let mut count = 0;
290-
let mut last_key = None;
291-
292-
let separation_threshold = self
293-
.index
294-
.config
295-
.kv_separation_opts
296-
.as_ref()
297-
.expect("kv separation options should exist")
298-
.separation_threshold;
299-
300-
for (key, value) in iter {
301-
if let Some(last_key) = &last_key {
302-
assert!(
303-
key > last_key,
304-
"next key in bulk ingest was not greater than last key",
305-
);
306-
}
307-
last_key = Some(key.clone());
308-
309-
#[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
310-
let value_size = value.len() as u32;
311-
312-
if value_size >= separation_threshold {
313-
let offset = blob_writer.offset();
314-
let blob_file_id = blob_writer.blob_file_id();
315-
let on_disk_size = blob_writer.write(&key, seqno, &value)?;
316-
317-
let indirection = BlobIndirection {
318-
vhandle: ValueHandle {
319-
blob_file_id,
320-
offset,
321-
on_disk_size,
322-
},
323-
size: value_size,
324-
};
325-
326-
table_writer.write_indirection(key, indirection)?;
327-
} else {
328-
table_writer.write(key, value)?;
329-
}
330-
331-
count += 1;
332-
}
333-
334-
let blob_files = blob_writer.finish()?;
335-
let results = table_writer.writer.finish()?;
336-
337-
let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
338-
let pin_index = self.index.config.filter_block_pinning_policy.get(0);
339-
340-
let created_tables = results
341-
.into_iter()
342-
.map(|(table_id, checksum)| -> crate::Result<Table> {
343-
Table::recover(
344-
self.index
345-
.config
346-
.path
347-
.join(crate::file::TABLES_FOLDER)
348-
.join(table_id.to_string()),
349-
checksum,
350-
self.index.id,
351-
self.index.config.cache.clone(),
352-
self.index.config.descriptor_table.clone(),
353-
pin_filter,
354-
pin_index,
355-
#[cfg(feature = "metrics")]
356-
self.index.metrics.clone(),
357-
)
358-
})
359-
.collect::<crate::Result<Vec<_>>>()?;
360-
361-
self.register_tables(&created_tables, Some(&blob_files), None)?;
362-
363-
visible_seqno.fetch_max(seqno + 1);
364-
365-
log::info!("Ingested {count} items in {:?}", start.elapsed());
366-
367-
Ok(())
368-
}
369-
370255
fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
371256
self.index.major_compact(target_size, seqno_threshold)
372257
}

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,13 +221,13 @@ pub use {
221221
merge::BoxedIterator,
222222
slice::Builder,
223223
table::{GlobalTableId, Table, TableId},
224-
tree::ingest::Ingestion,
225224
tree::inner::TreeId,
226225
value::InternalValue,
227226
};
228227

229228
pub use {
230229
any_tree::AnyTree,
230+
blob_tree::ingest::BlobIngestion,
231231
blob_tree::BlobTree,
232232
cache::Cache,
233233
compression::CompressionType,
@@ -240,6 +240,7 @@ pub use {
240240
r#abstract::AbstractTree,
241241
seqno::SequenceNumberCounter,
242242
slice::Slice,
243+
tree::ingest::Ingestion,
243244
tree::Tree,
244245
value::SeqNo,
245246
value_type::ValueType,

0 commit comments

Comments
 (0)