Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 4 additions & 24 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
// (found in the LICENSE-* files in the repository)

use crate::{
iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
UserKey, UserValue,
blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version,
vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
TableId, Tree, TreeId, UserKey, UserValue,
};
use std::{
ops::RangeBounds,
Expand Down Expand Up @@ -137,27 +138,6 @@ pub trait AbstractTree {
index: Option<Arc<Memtable>>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;

/// Ingests a sorted stream of key-value pairs into the tree.
///
/// Can only be called on a new fresh, empty tree.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
///
/// # Panics
///
/// Panics if the tree is **not** initially empty.
///
/// Will panic if the input iterator is not sorted in ascending order.
#[doc(hidden)]
fn ingest(
&self,
iter: impl Iterator<Item = (UserKey, UserValue)>,
seqno_generator: &SequenceNumberCounter,
visible_seqno: &SequenceNumberCounter,
) -> crate::Result<()>;

/// Returns the approximate number of tombstones in the tree.
fn tombstone_count(&self) -> u64;

Expand Down
62 changes: 61 additions & 1 deletion src/any_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{BlobTree, Tree};
use crate::{
blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey,
UserValue,
};
use enum_dispatch::enum_dispatch;

/// May be a standard [`Tree`] or a [`BlobTree`]
Expand All @@ -15,3 +18,60 @@ pub enum AnyTree {
/// Key-value separated LSM-tree, see [`BlobTree`]
Blob(BlobTree),
}

/// Unified ingestion builder over [`AnyTree`].
///
/// Wraps either a standard-tree ingestion or a blob-tree ingestion so callers
/// can bulk-load through a single API regardless of tree type.
// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
#[allow(clippy::large_enum_variant)]
pub enum AnyIngestion<'a> {
    /// Ingestion for a standard LSM-tree (no key-value separation)
    Standard(Ingestion<'a>),
    /// Ingestion for a [`BlobTree`] with KV separation
    Blob(BlobIngestion<'a>),
}

impl<'a> AnyIngestion<'a> {
#[must_use]
/// Sets the sequence number used for subsequent writes
pub fn with_seqno(self, seqno: SeqNo) -> Self {
match self {
AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)),
AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)),
}
}

/// Writes a key-value pair
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
match self {
AnyIngestion::Standard(i) => i.write(key, value),
AnyIngestion::Blob(b) => b.write(key, value),
}
}

/// Writes a tombstone for a key
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
match self {
AnyIngestion::Standard(i) => i.write_tombstone(key),
AnyIngestion::Blob(b) => b.write_tombstone(key),
}
}

/// Finalizes ingestion and registers created tables (and blob files if present)
pub fn finish(self) -> crate::Result<()> {
match self {
AnyIngestion::Standard(i) => i.finish(),
AnyIngestion::Blob(b) => b.finish(),
}
}
}

impl AnyTree {
    /// Starts an ingestion for any tree type (standard or blob).
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs while setting up the writers.
    pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
        Ok(match self {
            Self::Standard(tree) => AnyIngestion::Standard(Ingestion::new(tree)?),
            Self::Blob(tree) => AnyIngestion::Blob(BlobIngestion::new(tree)?),
        })
    }
}
190 changes: 190 additions & 0 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use crate::{
blob_tree::handle::BlobIndirection,
file::BLOBS_FOLDER,
table::Table,
tree::ingest::Ingestion as TableIngestion,
vlog::{BlobFileWriter, ValueHandle},
SeqNo, UserKey, UserValue,
};

/// Bulk ingestion for [`BlobTree`]
///
/// Items NEED to be added in ascending key order.
///
/// Uses table ingestion for the index and a blob file writer for large
/// values so both streams advance together.
pub struct BlobIngestion<'a> {
    // Tree being ingested into; needed at `finish` to register new tables/blob files
    tree: &'a crate::BlobTree,
    // Writes the index entries (inline values and blob indirections)
    pub(crate) table: TableIngestion<'a>,
    // Writes separated (large) values into blob files
    pub(crate) blob: BlobFileWriter,
    // Sequence number stamped on every written value (0 unless overridden via `with_seqno`)
    seqno: SeqNo,
    // Values of at least this size (in bytes) go to the blob file instead of the index
    separation_threshold: u32,
    // Last successfully written key, used to enforce ascending-order input
    last_key: Option<UserKey>,
}

impl<'a> BlobIngestion<'a> {
/// Creates a new ingestion.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
let kv = tree
.index
.config
.kv_separation_opts
.as_ref()
.expect("kv separation options should exist");

let blob_file_size = kv.file_target_size;

let table = TableIngestion::new(&tree.index)?;
let blob = BlobFileWriter::new(
tree.index.0.blob_file_id_counter.clone(),
blob_file_size,
tree.index.config.path.join(BLOBS_FOLDER),
)?
.use_compression(kv.compression);

let separation_threshold = kv.separation_threshold;

Ok(Self {
tree,
table,
blob,
seqno: 0,
separation_threshold,
last_key: None,
})
}

/// Sets the ingestion seqno.
#[must_use]
pub fn with_seqno(mut self, seqno: SeqNo) -> Self {
self.seqno = seqno;
self.table = self.table.with_seqno(seqno);
self
}

/// Writes a key-value pair.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
// Check order before any blob I/O to avoid partial writes on failure
if let Some(prev) = &self.last_key {
assert!(
key > *prev,
"next key in ingestion must be greater than last key"
);
}

#[allow(clippy::cast_possible_truncation)]
let value_size = value.len() as u32;

if value_size >= self.separation_threshold {
let offset = self.blob.offset();
let blob_file_id = self.blob.blob_file_id();
let on_disk_size = self.blob.write(&key, self.seqno, &value)?;

let indirection = BlobIndirection {
vhandle: ValueHandle {
blob_file_id,
offset,
on_disk_size,
},
size: value_size,
};

let cloned_key = key.clone();
let res = self.table.write_indirection(key, indirection);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
} else {
let cloned_key = key.clone();
let res = self.table.write(key, value);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
}
}

/// Writes a tombstone for a key.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
if let Some(prev) = &self.last_key {
assert!(
key > *prev,
"next key in ingestion must be greater than last key"
);
}

let cloned_key = key.clone();
let res = self.table.write_tombstone(key);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
}

/// Finishes the ingestion.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn finish(self) -> crate::Result<()> {
use crate::AbstractTree;

// Capture required handles before consuming fields during finalization
let index = self.index().clone();
let tree = self.tree.clone();

// Finalize both value log and index writer so the index sees a
// consistent set of blob files.
let blob_files = self.blob.finish()?;
let results = self.table.writer.finish()?;

let created_tables = results
.into_iter()
.map(|(table_id, checksum)| -> crate::Result<Table> {
// Do not pin ingestion output tables here. Large ingests are
// typically placed in level 1 and would otherwise keep all
// filter and index blocks pinned, increasing memory pressure.
Table::recover(
index
.config
.path
.join(crate::file::TABLES_FOLDER)
.join(table_id.to_string()),
checksum,
index.id,
index.config.cache.clone(),
index.config.descriptor_table.clone(),
false,
false,
#[cfg(feature = "metrics")]
index.metrics.clone(),
)
})
.collect::<crate::Result<Vec<_>>>()?;

// Blob ingestion only appends new tables and blob files; sealed
// memtables remain unchanged and GC watermark stays at its
// neutral value for this operation.
tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;

Ok(())
}

#[inline]
fn index(&self) -> &crate::Tree {
&self.tree.index
}
}
Loading