Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
36b2651
Expose Ingestion API with Inversion of Control
zaidoon1 Nov 15, 2025
d9d041f
clippy
marvin-j97 Nov 15, 2025
3489b8f
don't pin ingestion output tables
zaidoon1 Nov 15, 2025
25c9cdf
split ingestion initialization from seqno assignment in bulk ingest t…
zaidoon1 Nov 15, 2025
087e5de
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 15, 2025
359a585
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 17, 2025
3314099
Merge remote-tracking branch 'zaidoon1/zaidoon/ingestion-api-inversion-control'
marvin-j97 Nov 19, 2025
96a7f58
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
90b02a6
test: dirty snapshot after ingestion
marvin-j97 Nov 19, 2025
b73aa4e
refactor
marvin-j97 Nov 19, 2025
1877b57
refactor
marvin-j97 Nov 19, 2025
f3adbb7
Merge remote-tracking branch 'zaidoon1/zaidoon/ingestion-api-inversion-control'
marvin-j97 Nov 19, 2025
7849efa
more ergonomic ingestion API, add more tests
marvin-j97 Nov 19, 2025
0725204
refactor
marvin-j97 Nov 19, 2025
f06a70c
apply pinning on recovery
marvin-j97 Nov 19, 2025
8971f4c
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
0aa3ff4
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
fe64606
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
976cf87
change ingestion flush watermark to 0
marvin-j97 Nov 20, 2025
7fe024f
lint
marvin-j97 Nov 20, 2025
19e0876
feat: table global seqno
marvin-j97 Nov 22, 2025
bc0f85f
refactor
marvin-j97 Nov 22, 2025
0be5b17
implement atomic flush and global_seqno coordination
zaidoon1 Nov 22, 2025
f69061a
refactor ingestion to use upgrade_version_with_seqno for explicit seq…
zaidoon1 Nov 22, 2025
51e0fdc
refactor
marvin-j97 Nov 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 4 additions & 24 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
// (found in the LICENSE-* files in the repository)

use crate::{
iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
UserKey, UserValue,
blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version,
vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
TableId, Tree, TreeId, UserKey, UserValue,
};
use std::{
ops::RangeBounds,
Expand Down Expand Up @@ -137,27 +138,6 @@ pub trait AbstractTree {
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;

/// Ingests a sorted stream of key-value pairs into the tree.
///
/// Can only be called on a new fresh, empty tree.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
///
/// # Panics
///
/// Panics if the tree is **not** initially empty.
///
/// Will panic if the input iterator is not sorted in ascending order.
#[doc(hidden)]
fn ingest(
&self,
iter: impl Iterator<Item = (UserKey, UserValue)>,
seqno_generator: &SequenceNumberCounter,
visible_seqno: &SequenceNumberCounter,
) -> crate::Result<()>;

/// Returns the approximate number of tombstones in the tree.
fn tombstone_count(&self) -> u64;

Expand Down
62 changes: 61 additions & 1 deletion src/any_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{BlobTree, Tree};
use crate::{
blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey,
UserValue,
};
use enum_dispatch::enum_dispatch;

/// May be a standard [`Tree`] or a [`BlobTree`]
Expand All @@ -15,3 +18,60 @@ pub enum AnyTree {
/// Key-value separated LSM-tree, see [`BlobTree`]
Blob(BlobTree),
}

/// Unified ingestion builder over `AnyTree`.
///
/// Wraps either a standard-tree [`Ingestion`] or a [`BlobIngestion`] so
/// callers can ingest into an [`AnyTree`] without matching on the tree type.
// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
#[allow(clippy::large_enum_variant)]
pub enum AnyIngestion<'a> {
    /// Ingestion for a standard LSM-tree
    Standard(Ingestion<'a>),
    /// Ingestion for a [`BlobTree`] with KV separation
    Blob(BlobIngestion<'a>),
}

impl<'a> AnyIngestion<'a> {
#[must_use]
/// Sets the sequence number used for subsequent writes
pub fn with_seqno(self, seqno: SeqNo) -> Self {
match self {
AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)),
AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)),
}
}

/// Writes a key-value pair
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
match self {
AnyIngestion::Standard(i) => i.write(key, value),
AnyIngestion::Blob(b) => b.write(key, value),
}
}

/// Writes a tombstone for a key
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
match self {
AnyIngestion::Standard(i) => i.write_tombstone(key),
AnyIngestion::Blob(b) => b.write_tombstone(key),
}
}

/// Finalizes ingestion and registers created tables (and blob files if present)
pub fn finish(self) -> crate::Result<()> {
match self {
AnyIngestion::Standard(i) => i.finish(),
AnyIngestion::Blob(b) => b.finish(),
}
}
}

impl AnyTree {
    /// Starts an ingestion for any tree type (standard or blob).
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
        Ok(match self {
            Self::Standard(tree) => AnyIngestion::Standard(Ingestion::new(tree)?),
            Self::Blob(tree) => AnyIngestion::Blob(BlobIngestion::new(tree)?),
        })
    }
}
190 changes: 190 additions & 0 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use crate::{
blob_tree::handle::BlobIndirection,
file::BLOBS_FOLDER,
table::Table,
tree::ingest::Ingestion as TableIngestion,
vlog::{BlobFileWriter, ValueHandle},
SeqNo, UserKey, UserValue,
};

/// Bulk ingestion for [`BlobTree`]
///
/// Items NEED to be added in ascending key order.
///
/// Uses table ingestion for the index and a blob file writer for large
/// values so both streams advance together.
pub struct BlobIngestion<'a> {
    // Tree being ingested into; used on `finish` to register output
    tree: &'a crate::BlobTree,
    // Ingestion writer for the index tree (small values and indirections)
    pub(crate) table: TableIngestion<'a>,
    // Writer for the value log (large values)
    pub(crate) blob: BlobFileWriter,
    // Seqno stamped on every written item (0 until `with_seqno` is called)
    seqno: SeqNo,
    // Values of at least this size (bytes) are spilled into blob files
    separation_threshold: u32,
    // Last key written; used to enforce ascending key order
    last_key: Option<UserKey>,
}

impl<'a> BlobIngestion<'a> {
/// Creates a new ingestion.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
let kv = tree
.index
.config
.kv_separation_opts
.as_ref()
.expect("kv separation options should exist");

let blob_file_size = kv.file_target_size;

let table = TableIngestion::new(&tree.index)?;
let blob = BlobFileWriter::new(
tree.index.0.blob_file_id_counter.clone(),
blob_file_size,
tree.index.config.path.join(BLOBS_FOLDER),
)?
.use_compression(kv.compression);

let separation_threshold = kv.separation_threshold;

Ok(Self {
tree,
table,
blob,
seqno: 0,
separation_threshold,
last_key: None,
})
}

/// Sets the ingestion seqno.
#[must_use]
pub fn with_seqno(mut self, seqno: SeqNo) -> Self {
self.seqno = seqno;
self.table = self.table.with_seqno(seqno);
self
}

/// Writes a key-value pair.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
// Check order before any blob I/O to avoid partial writes on failure
if let Some(prev) = &self.last_key {
assert!(
key > *prev,
"next key in ingestion must be greater than last key"
);
}

#[allow(clippy::cast_possible_truncation)]
let value_size = value.len() as u32;

if value_size >= self.separation_threshold {
let offset = self.blob.offset();
let blob_file_id = self.blob.blob_file_id();
let on_disk_size = self.blob.write(&key, self.seqno, &value)?;

let indirection = BlobIndirection {
vhandle: ValueHandle {
blob_file_id,
offset,
on_disk_size,
},
size: value_size,
};

let cloned_key = key.clone();
let res = self.table.write_indirection(key, indirection);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
} else {
let cloned_key = key.clone();
let res = self.table.write(key, value);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
}
}

/// Writes a tombstone for a key.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
if let Some(prev) = &self.last_key {
assert!(
key > *prev,
"next key in ingestion must be greater than last key"
);
}

let cloned_key = key.clone();
let res = self.table.write_tombstone(key);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
}

/// Finishes the ingestion.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn finish(self) -> crate::Result<()> {
use crate::AbstractTree;

// Capture required handles before consuming fields during finalization
let index = self.index().clone();
let tree = self.tree.clone();

// Finalize both value log and index writer so the index sees a
// consistent set of blob files.
let blob_files = self.blob.finish()?;
let results = self.table.writer.finish()?;

let created_tables = results
.into_iter()
.map(|(table_id, checksum)| -> crate::Result<Table> {
// Do not pin ingestion output tables here. Large ingests are
// typically placed in level 1 and would otherwise keep all
// filter and index blocks pinned, increasing memory pressure.
Table::recover(
index
.config
.path
.join(crate::file::TABLES_FOLDER)
.join(table_id.to_string()),
checksum,
index.id,
index.config.cache.clone(),
index.config.descriptor_table.clone(),
false,
false,
#[cfg(feature = "metrics")]
index.metrics.clone(),
)
})
.collect::<crate::Result<Vec<_>>>()?;

// Blob ingestion only appends new tables and blob files; sealed
// memtables remain unchanged and GC watermark stays at its
// neutral value for this operation.
tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;

Ok(())
}

#[inline]
fn index(&self) -> &crate::Tree {
&self.tree.index
}
}
Loading