diff --git a/src/abstract.rs b/src/abstract.rs
index 6b1bbf6e..4a45cd00 100644
--- a/src/abstract.rs
+++ b/src/abstract.rs
@@ -3,9 +3,10 @@
 // (found in the LICENSE-* files in the repository)

 use crate::{
-    iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
-    Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
-    UserKey, UserValue,
+    blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
+    iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version,
+    vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
+    TableId, Tree, TreeId, UserKey, UserValue,
 };
 use std::{
     ops::RangeBounds,
@@ -137,27 +138,6 @@ pub trait AbstractTree {
         index: Option<Arc<Memtable>>,
     ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;

-    /// Ingests a sorted stream of key-value pairs into the tree.
-    ///
-    /// Can only be called on a new fresh, empty tree.
-    ///
-    /// # Errors
-    ///
-    /// Will return `Err` if an IO error occurs.
-    ///
-    /// # Panics
-    ///
-    /// Panics if the tree is **not** initially empty.
-    ///
-    /// Will panic if the input iterator is not sorted in ascending order.
-    #[doc(hidden)]
-    fn ingest(
-        &self,
-        iter: impl Iterator<Item = (UserKey, UserValue)>,
-        seqno_generator: &SequenceNumberCounter,
-        visible_seqno: &SequenceNumberCounter,
-    ) -> crate::Result<()>;
-
     /// Returns the approximate number of tombstones in the tree.
     fn tombstone_count(&self) -> u64;

diff --git a/src/any_tree.rs b/src/any_tree.rs
index 29cfbaf6..e39a35ea 100644
--- a/src/any_tree.rs
+++ b/src/any_tree.rs
@@ -2,7 +2,10 @@
 // This source code is licensed under both the Apache 2.0 and MIT License
 // (found in the LICENSE-* files in the repository)

-use crate::{BlobTree, Tree};
+use crate::{
+    blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey,
+    UserValue,
+};
 use enum_dispatch::enum_dispatch;

 /// May be a standard [`Tree`] or a [`BlobTree`]
@@ -15,3 +18,60 @@ pub enum AnyTree {
     /// Key-value separated LSM-tree, see [`BlobTree`]
     Blob(BlobTree),
 }
+
+/// Unified ingestion builder over `AnyTree`
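+///
+/// A minimal usage sketch (assumes `tree` is an `AnyTree`; error handling elided):
+///
+/// ```ignore
+/// let mut ingestion = tree.ingestion()?.with_seqno(1);
+/// ingestion.write(b"a".as_slice().into(), b"v".as_slice().into())?;
+/// ingestion.write_tombstone(b"b".as_slice().into())?;
+/// ingestion.finish()?;
+/// ```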
+// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
+// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
+// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
+#[allow(clippy::large_enum_variant)]
+pub enum AnyIngestion<'a> {
+    /// Ingestion for a standard LSM-tree
+    Standard(Ingestion<'a>),
+    /// Ingestion for a [`BlobTree`] with KV separation
+    Blob(BlobIngestion<'a>),
+}
+
+impl<'a> AnyIngestion<'a> {
+    /// Sets the sequence number used for subsequent writes
+    #[must_use]
+    pub fn with_seqno(self, seqno: SeqNo) -> Self {
+        match self {
+            AnyIngestion::Standard(i) => AnyIngestion::Standard(i.with_seqno(seqno)),
+            AnyIngestion::Blob(b) => AnyIngestion::Blob(b.with_seqno(seqno)),
+        }
+    }
+
+    /// Writes a key-value pair
+    pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
+        match self {
+            AnyIngestion::Standard(i) => i.write(key, value),
+            AnyIngestion::Blob(b) => b.write(key, value),
+        }
+    }
+
+    /// Writes a tombstone for a key
+    pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
+        match self {
+            AnyIngestion::Standard(i) => i.write_tombstone(key),
+            AnyIngestion::Blob(b) => b.write_tombstone(key),
+        }
+    }
+
+    /// Finalizes ingestion and registers created tables (and blob files if present)
+    pub fn finish(self) -> crate::Result<()> {
+        match self {
+            AnyIngestion::Standard(i) => i.finish(),
+            AnyIngestion::Blob(b) => b.finish(),
+        }
+    }
+}
+
+impl AnyTree {
+    /// Starts an ingestion for any tree type (standard or blob)
+    pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
+        match self {
+            AnyTree::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)),
+            AnyTree::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)),
+        }
+    }
+}
diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs
new file mode 100644
index 00000000..8372d3f1
--- /dev/null
+++ b/src/blob_tree/ingest.rs
@@ -0,0 +1,190 @@
+use crate::{
+    blob_tree::handle::BlobIndirection,
+    file::BLOBS_FOLDER,
+    table::Table,
+    tree::ingest::Ingestion as TableIngestion,
+    vlog::{BlobFileWriter, ValueHandle},
+    SeqNo, UserKey, UserValue,
+};
+
+/// Bulk ingestion for [`BlobTree`]
+///
+/// Items NEED to be added in ascending key order.
+///
+/// Uses table ingestion for the index and a blob file writer for large
+/// values so both streams advance together.
+pub struct BlobIngestion<'a> {
+    tree: &'a crate::BlobTree,
+    pub(crate) table: TableIngestion<'a>,
+    pub(crate) blob: BlobFileWriter,
+    seqno: SeqNo,
+    separation_threshold: u32,
+    last_key: Option<UserKey>,
+}
+
+impl<'a> BlobIngestion<'a> {
+    /// Creates a new ingestion.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
+        let kv = tree
+            .index
+            .config
+            .kv_separation_opts
+            .as_ref()
+            .expect("kv separation options should exist");
+
+        let blob_file_size = kv.file_target_size;
+
+        let table = TableIngestion::new(&tree.index)?;
+        let blob = BlobFileWriter::new(
+            tree.index.0.blob_file_id_counter.clone(),
+            blob_file_size,
+            tree.index.config.path.join(BLOBS_FOLDER),
+        )?
+        .use_compression(kv.compression);
+
+        let separation_threshold = kv.separation_threshold;
+
+        Ok(Self {
+            tree,
+            table,
+            blob,
+            seqno: 0,
+            separation_threshold,
+            last_key: None,
+        })
+    }
+
+    /// Sets the ingestion seqno.
+    #[must_use]
+    pub fn with_seqno(mut self, seqno: SeqNo) -> Self {
+        self.seqno = seqno;
+        self.table = self.table.with_seqno(seqno);
+        self
+    }
+
+    /// Writes a key-value pair.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
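+    ///
+    /// Values whose size is at least the configured separation threshold are
+    /// written to a blob file and only a small indirection is stored in the
+    /// index table; smaller values are stored inline.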
+    pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
+        // Check order before any blob I/O to avoid partial writes on failure
+        if let Some(prev) = &self.last_key {
+            assert!(
+                key > *prev,
+                "next key in ingestion must be greater than last key"
+            );
+        }
+
+        #[allow(clippy::cast_possible_truncation)]
+        let value_size = value.len() as u32;
+
+        if value_size >= self.separation_threshold {
+            let offset = self.blob.offset();
+            let blob_file_id = self.blob.blob_file_id();
+            let on_disk_size = self.blob.write(&key, self.seqno, &value)?;
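+
+            // The value handle records the blob file and byte range that were
+            // just written; reads resolve this indirection through the value log.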
+            let indirection = BlobIndirection {
+                vhandle: ValueHandle {
+                    blob_file_id,
+                    offset,
+                    on_disk_size,
+                },
+                size: value_size,
+            };
+
+            let cloned_key = key.clone();
+            let res = self.table.write_indirection(key, indirection);
+            if res.is_ok() {
+                self.last_key = Some(cloned_key);
+            }
+            res
+        } else {
+            let cloned_key = key.clone();
+            let res = self.table.write(key, value);
+            if res.is_ok() {
+                self.last_key = Some(cloned_key);
+            }
+            res
+        }
+    }
+
+    /// Writes a tombstone for a key.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
+        if let Some(prev) = &self.last_key {
+            assert!(
+                key > *prev,
+                "next key in ingestion must be greater than last key"
+            );
+        }
+
+        let cloned_key = key.clone();
+        let res = self.table.write_tombstone(key);
+        if res.is_ok() {
+            self.last_key = Some(cloned_key);
+        }
+        res
+    }
+
+    /// Finishes the ingestion.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    pub fn finish(self) -> crate::Result<()> {
+        use crate::AbstractTree;
+
+        // Capture required handles before consuming fields during finalization
+        let index = self.index().clone();
+        let tree = self.tree.clone();
+
+        // Finalize both value log and index writer so the index sees a
+        // consistent set of blob files.
+        let blob_files = self.blob.finish()?;
+        let results = self.table.writer.finish()?;
+
+        let created_tables = results
+            .into_iter()
+            .map(|(table_id, checksum)| -> crate::Result<Table> {
+                // Do not pin ingestion output tables here. Large ingests are
+                // typically placed in level 1 and would otherwise keep all
+                // filter and index blocks pinned, increasing memory pressure.
+                Table::recover(
+                    index
+                        .config
+                        .path
+                        .join(crate::file::TABLES_FOLDER)
+                        .join(table_id.to_string()),
+                    checksum,
+                    index.id,
+                    index.config.cache.clone(),
+                    index.config.descriptor_table.clone(),
+                    false,
+                    false,
+                    #[cfg(feature = "metrics")]
+                    index.metrics.clone(),
+                )
+            })
+            .collect::<crate::Result<Vec<_>>>()?;
+
+        // Blob ingestion only appends new tables and blob files; sealed
+        // memtables remain unchanged and the GC watermark stays at its
+        // neutral value for this operation.
+        tree.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;
+
+        Ok(())
+    }
+
+    #[inline]
+    fn index(&self) -> &crate::Tree {
+        &self.tree.index
+    }
+}
diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs
index 1d04aa12..bc036147 100644
--- a/src/blob_tree/mod.rs
+++ b/src/blob_tree/mod.rs
@@ -4,6 +4,7 @@

 mod gc;
 pub mod handle;
+pub mod ingest;

 #[doc(hidden)]
 pub use gc::{FragmentationEntry, FragmentationMap};
@@ -264,119 +265,6 @@ impl AbstractTree for BlobTree {
         self.index.drop_range(range)
     }

-    fn ingest(
-        &self,
-        iter: impl Iterator<Item = (UserKey, UserValue)>,
-        seqno_generator: &SequenceNumberCounter,
-        visible_seqno: &SequenceNumberCounter,
-    ) -> crate::Result<()> {
-        use crate::tree::ingest::Ingestion;
-        use std::time::Instant;
-
-        let seqno = seqno_generator.next();
-
-        let blob_file_size = self
-            .index
-            .config
-            .kv_separation_opts
-            .as_ref()
-            .expect("kv separation options should exist")
-            .file_target_size;
-
-        let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
-        let mut blob_writer = BlobFileWriter::new(
-            self.index.0.blob_file_id_counter.clone(),
-            blob_file_size,
-            self.index.config.path.join(BLOBS_FOLDER),
-        )?
-        .use_compression(
-            self.index
-                .config
-                .kv_separation_opts
-                .as_ref()
-                .expect("blob options should exist")
-                .compression,
-        );
-
-        let start = Instant::now();
-        let mut count = 0;
-        let mut last_key = None;
-
-        let separation_threshold = self
-            .index
-            .config
-            .kv_separation_opts
-            .as_ref()
-            .expect("kv separation options should exist")
-            .separation_threshold;
-
-        for (key, value) in iter {
-            if let Some(last_key) = &last_key {
-                assert!(
-                    key > last_key,
-                    "next key in bulk ingest was not greater than last key",
-                );
-            }
-            last_key = Some(key.clone());
-
-            #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
-            let value_size = value.len() as u32;
-
-            if value_size >= separation_threshold {
-                let offset = blob_writer.offset();
-                let blob_file_id = blob_writer.blob_file_id();
-                let on_disk_size = blob_writer.write(&key, seqno, &value)?;
-
-                let indirection = BlobIndirection {
-                    vhandle: ValueHandle {
-                        blob_file_id,
-                        offset,
-                        on_disk_size,
-                    },
-                    size: value_size,
-                };
-
-                table_writer.write_indirection(key, indirection)?;
-            } else {
-                table_writer.write(key, value)?;
-            }
-
-            count += 1;
-        }
-
-        let blob_files = blob_writer.finish()?;
-        let results = table_writer.writer.finish()?;
-
-        let created_tables = results
-            .into_iter()
-            .map(|(table_id, checksum)| -> crate::Result<Table> {
-                Table::recover(
-                    self.index
-                        .config
-                        .path
-                        .join(crate::file::TABLES_FOLDER)
-                        .join(table_id.to_string()),
-                    checksum,
-                    self.index.id,
-                    self.index.config.cache.clone(),
-                    self.index.config.descriptor_table.clone(),
-                    false,
-                    false,
-                    #[cfg(feature = "metrics")]
-                    self.index.metrics.clone(),
-                )
-            })
-            .collect::<crate::Result<Vec<_>>>()?;
-
-        self.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;
-
-        visible_seqno.fetch_max(seqno + 1);
-
-        log::info!("Ingested {count} items in {:?}", start.elapsed());
-
-        Ok(())
-    }
-
     fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
         self.index.major_compact(target_size, seqno_threshold)
     }
diff --git a/src/lib.rs b/src/lib.rs
index 8041a7fa..844cf69f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -169,14 +169,14 @@ pub use {
     merge::BoxedIterator,
     slice::Builder,
     table::{GlobalTableId, Table, TableId},
-    tree::ingest::Ingestion,
     tree::inner::TreeId,
     tree::Guard as StandardGuard,
     value::InternalValue,
 };

 pub use {
-    any_tree::AnyTree,
+    any_tree::{AnyIngestion, AnyTree},
+    blob_tree::ingest::BlobIngestion,
     blob_tree::BlobTree,
     cache::Cache,
     compression::CompressionType,
@@ -189,6 +189,7 @@ pub use {
     r#abstract::AbstractTree,
     seqno::SequenceNumberCounter,
     slice::Slice,
+    tree::ingest::Ingestion,
     tree::Tree,
     value::SeqNo,
     value_type::ValueType,
diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs
index ee3d09ac..4430b926 100644
--- a/src/tree/ingest.rs
+++ b/src/tree/ingest.rs
@@ -4,21 +4,25 @@
 use super::Tree;
 use crate::{
-    compaction::MoveDown, config::FilterPolicyEntry, table::multi_writer::MultiWriter,
-    AbstractTree, BlobIndirection, SeqNo, UserKey, UserValue,
+    config::FilterPolicyEntry, table::multi_writer::MultiWriter, AbstractTree, BlobIndirection,
+    SeqNo, UserKey, UserValue,
 };
-use std::{path::PathBuf, sync::Arc};
+use std::path::PathBuf;

 pub const INITIAL_CANONICAL_LEVEL: usize = 1;

 /// Bulk ingestion
 ///
 /// Items NEED to be added in ascending key order.
+///
+/// Ingested data bypasses memtables and is written directly into new tables,
+/// using the same table writer configuration that is used for flush and compaction.
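+///
+/// A minimal usage sketch (error handling elided; assumes a standard `Tree` named `tree`):
+///
+/// ```ignore
+/// let mut ingestion = Ingestion::new(&tree)?.with_seqno(seqno);
+/// ingestion.write(key, value)?; // keys must arrive in ascending order
+/// ingestion.finish()?;
+/// ```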
 pub struct Ingestion<'a> {
     folder: PathBuf,
     tree: &'a Tree,
     pub(crate) writer: MultiWriter,
     seqno: SeqNo,
+    last_key: Option<UserKey>,
 }

 impl<'a> Ingestion<'a> {
@@ -28,6 +32,12 @@ impl<'a> Ingestion<'a> {
     ///
     /// Will return `Err` if an IO error occurs.
     pub fn new(tree: &'a Tree) -> crate::Result<Self> {
+        // Use the shared flush helper so ingestion participates in the same
+        // path as normal writes: any dirty memtable content is moved into
+        // tables before building new tables from the ingestion stream.
+        // This keeps the lookup path ordered as active > sealed > tables.
+        tree.flush_active_memtable(SeqNo::MAX)?;
+
         let folder = tree.config.path.join(crate::file::TABLES_FOLDER);
         log::debug!("Ingesting into tables in {}", folder.display());

@@ -100,6 +110,7 @@ impl<'a> Ingestion<'a> {
             tree,
             writer,
             seqno: 0,
+            last_key: None,
         })
     }

@@ -122,6 +133,14 @@ impl<'a> Ingestion<'a> {
     ) -> crate::Result<()> {
         use crate::coding::Encode;

+        if let Some(prev) = &self.last_key {
+            assert!(
+                key > *prev,
+                "next key in ingestion must be greater than last key"
+            );
+        }
+
+        let cloned_key = key.clone();
         self.writer.write(crate::InternalValue::from_components(
             key,
             indirection.encode_into_vec(),
@@ -131,6 +150,8 @@ impl<'a> Ingestion<'a> {

         self.writer.register_blob(indirection);

+        // Remember the last user key to validate the next call's ordering
+        self.last_key = Some(cloned_key);
         Ok(())
     }

@@ -140,12 +161,24 @@ impl<'a> Ingestion<'a> {
     /// # Errors
     ///
     /// Will return `Err` if an IO error occurs.
     pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
-        self.writer.write(crate::InternalValue::from_components(
+        if let Some(prev) = &self.last_key {
+            assert!(
+                key > *prev,
+                "next key in ingestion must be greater than last key"
+            );
+        }
+
+        let cloned_key = key.clone();
+        let res = self.writer.write(crate::InternalValue::from_components(
             key,
             value,
             self.seqno,
             crate::ValueType::Value,
-        ))
+        ));
+        if res.is_ok() {
+            self.last_key = Some(cloned_key);
+        }
+        res
     }

@@ -153,14 +186,25 @@ impl<'a> Ingestion<'a> {
-    /// Writes a key-value pair.
+    /// Writes a tombstone for a key.
     ///
     /// # Errors
     ///
     /// Will return `Err` if an IO error occurs.
-    #[doc(hidden)]
     pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
-        self.writer.write(crate::InternalValue::from_components(
+        if let Some(prev) = &self.last_key {
+            assert!(
+                key > *prev,
+                "next key in ingestion must be greater than last key"
+            );
+        }
+
+        let cloned_key = key.clone();
+        let res = self.writer.write(crate::InternalValue::from_components(
             key,
             crate::UserValue::empty(),
             self.seqno,
             crate::ValueType::Tombstone,
-        ))
+        ));
+        if res.is_ok() {
+            self.last_key = Some(cloned_key);
+        }
+        res
     }

     /// Finishes the ingestion.
@@ -175,6 +219,8 @@ impl<'a> Ingestion<'a> {

         log::info!("Finished ingestion writer");

+        // Turn the writer output into fully recovered tables that can be
+        // registered as a fresh L0 run.
         let created_tables = results
             .into_iter()
             .map(|(table_id, checksum)| -> crate::Result<Table> {
@@ -184,6 +230,9 @@ impl<'a> Ingestion<'a> {
                 //     .with_metrics(metrics)
                 //     .run(path, tree_id, cache, descriptor_table);

+                // Do not pin ingestion output tables here. Large ingests are
+                // typically placed in level 1 and would otherwise keep all
+                // filter and index blocks pinned, increasing memory pressure.
                 Table::recover(
                     self.folder.join(table_id.to_string()),
                     checksum,
@@ -198,6 +247,9 @@ impl<'a> Ingestion<'a> {
             })
             .collect::<crate::Result<Vec<_>>>()?;

+        // Ingestion produces new tables only and does not touch sealed
+        // memtables directly, so the deletion set is empty and the
+        // watermark is left at its neutral value.
         self.tree
             .register_tables(&created_tables, None, None, &[], 0)?;
diff --git a/src/tree/inner.rs b/src/tree/inner.rs
index 02dc5c26..4cc05830 100644
--- a/src/tree/inner.rs
+++ b/src/tree/inner.rs
@@ -63,8 +63,8 @@ pub struct TreeInner {
     /// can be concurrent next to each other.
     pub(crate) major_compaction_lock: RwLock<()>,

+    /// Serializes flush operations.
     pub(crate) flush_lock: Mutex<()>,

-    #[doc(hidden)]
     #[cfg(feature = "metrics")]
     pub metrics: Arc<Metrics>,
diff --git a/src/tree/mod.rs b/src/tree/mod.rs
index 49691945..25d920d2 100644
--- a/src/tree/mod.rs
+++ b/src/tree/mod.rs
@@ -7,7 +7,7 @@
 pub mod inner;
 pub mod sealed;

 use crate::{
-    compaction::{drop_range::OwnedBounds, CompactionStrategy},
+    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
     config::Config,
     format_version::FormatVersion,
     iter_guard::{IterGuard, IterGuardImpl},
@@ -103,8 +103,14 @@ impl AbstractTree for Tree {
     #[expect(clippy::significant_drop_tightening)]
     fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
+        // Returns the newest visible version (`entry.seqno < seqno`) by consulting sources
+        // in recency order and returning on first hit: active memtable, sealed memtables,
+        // then tables (each scanned newest-first).
+
         let version_history_lock = self.version_history.read().expect("lock is poisoned");
         let super_version = version_history_lock.get_version_for_snapshot(seqno);
+        // Avoid holding the read lock across potential I/O in table lookups
+        drop(version_history_lock);

         if let Some(entry) = super_version.active_memtable.get(key, seqno) {
             return Ok(ignore_tombstone_value(entry));
@@ -118,7 +124,13 @@ impl AbstractTree for Tree {
         }

         // Now look in tables... this may involve disk I/O
-        self.get_internal_entry_from_tables(&super_version.version, key, seqno)
+        if let Some(entry) =
+            self.get_internal_entry_from_tables(&super_version.version, key, seqno)?
+        {
+            return Ok(ignore_tombstone_value(entry));
+        }
+
+        Ok(None)
     }

     fn current_version(&self) -> Version {
@@ -193,48 +205,6 @@ impl AbstractTree for Tree {
             .sum()
     }

-    fn ingest(
-        &self,
-        iter: impl Iterator<Item = (UserKey, UserValue)>,
-        seqno_generator: &SequenceNumberCounter,
-        visible_seqno: &SequenceNumberCounter,
-    ) -> crate::Result<()> {
-        use crate::tree::ingest::Ingestion;
-        use std::time::Instant;
-
-        let seqno = seqno_generator.next();
-
-        // TODO: allow ingestion always, by flushing memtable
-
-        let mut writer = Ingestion::new(self)?.with_seqno(seqno);
-
-        let start = Instant::now();
-        let mut count = 0;
-        let mut last_key = None;
-
-        for (key, value) in iter {
-            if let Some(last_key) = &last_key {
-                assert!(
-                    key > last_key,
-                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
-                );
-            }
-            last_key = Some(key.clone());
-
-            writer.write(key, value)?;
-
-            count += 1;
-        }
-
-        writer.finish()?;
-
-        visible_seqno.fetch_max(seqno + 1);
-
-        log::info!("Ingested {count} items in {:?}", start.elapsed());
-
-        Ok(())
-    }
-
     fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
         let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

@@ -545,7 +515,6 @@ impl AbstractTree for Tree {
         self.current_version().level(idx).map(|x| x.table_count())
     }

-    #[expect(clippy::significant_drop_tightening)]
     fn approximate_len(&self) -> usize {
         let super_version = self
             .version_history
@@ -578,7 +547,6 @@ impl AbstractTree for Tree {
             .sum()
     }

-    #[expect(clippy::significant_drop_tightening)]
     fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
         let version = self
             .version_history
@@ -709,6 +677,38 @@ impl Tree {
         Ok(tree)
     }

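+    /// Finalizes a table writer and recovers the resulting table.
+    ///
+    /// Returns `Ok(None)` if the writer produced no output. Filter and index
+    /// blocks are pinned according to the level-0 pinning policies, since
+    /// freshly flushed tables start out in level 0.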
+ { + return Ok(ignore_tombstone_value(entry)); + } + + Ok(None) } fn current_version(&self) -> Version { @@ -193,48 +205,6 @@ impl AbstractTree for Tree { .sum() } - fn ingest( - &self, - iter: impl Iterator, - seqno_generator: &SequenceNumberCounter, - visible_seqno: &SequenceNumberCounter, - ) -> crate::Result<()> { - use crate::tree::ingest::Ingestion; - use std::time::Instant; - - let seqno = seqno_generator.next(); - - // TODO: allow ingestion always, by flushing memtable - - let mut writer = Ingestion::new(self)?.with_seqno(seqno); - - let start = Instant::now(); - let mut count = 0; - let mut last_key = None; - - for (key, value) in iter { - if let Some(last_key) = &last_key { - assert!( - key > last_key, - "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}", - ); - } - last_key = Some(key.clone()); - - writer.write(key, value)?; - - count += 1; - } - - writer.finish()?; - - visible_seqno.fetch_max(seqno + 1); - - log::info!("Ingested {count} items in {:?}", start.elapsed()); - - Ok(()) - } - fn drop_range, R: RangeBounds>(&self, range: R) -> crate::Result<()> { let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range); @@ -545,7 +515,6 @@ impl AbstractTree for Tree { self.current_version().level(idx).map(|x| x.table_count()) } - #[expect(clippy::significant_drop_tightening)] fn approximate_len(&self) -> usize { let super_version = self .version_history @@ -578,7 +547,6 @@ impl AbstractTree for Tree { .sum() } - #[expect(clippy::significant_drop_tightening)] fn get_highest_memtable_seqno(&self) -> Option { let version = self .version_history @@ -709,6 +677,38 @@ impl Tree { Ok(tree) } + pub(crate) fn consume_writer( + &self, + writer: crate::table::Writer, + ) -> crate::Result> { + let table_file_path = writer.path.clone(); + + let Some((_, checksum)) = writer.finish()? else { + return Ok(None); + }; + + log::debug!("Finalized table write at {}", table_file_path.display()); + + let pin_filter = self.config.filter_block_pinning_policy.get(0); + let pin_index = self.config.index_block_pinning_policy.get(0); + + let created_table = Table::recover( + table_file_path, + checksum, + self.id, + self.config.cache.clone(), + self.config.descriptor_table.clone(), + pin_filter, + pin_index, + #[cfg(feature = "metrics")] + self.metrics.clone(), + )?; + + log::debug!("Flushed table to {:?}", created_table.path); + + Ok(Some(created_table)) + } + /// Returns `true` if there are some tables that are being compacted. #[doc(hidden)] #[must_use] @@ -731,10 +731,10 @@ impl Tree { return Some(entry); } } - None } + // Scan levels top-down and runs newest-first; return the first table hit fn get_internal_entry_from_tables( &self, version: &Version, @@ -751,7 +751,7 @@ impl Tree { if run.len() >= 4 { if let Some(table) = run.get_for_key(key) { if let Some(item) = table.get(key, seqno, key_hash)? { - return Ok(ignore_tombstone_value(item)); + return Ok(Some(item)); } } } else { @@ -762,7 +762,7 @@ impl Tree { } if let Some(item) = table.get(key, seqno, key_hash)? 
-                    return Ok(ignore_tombstone_value(item));
+                    return Ok(Some(item));
                 }
             }
         }
@@ -931,9 +931,7 @@ impl Tree {
             config,
             major_compaction_lock: RwLock::default(),
             flush_lock: Mutex::default(),
-            compaction_state: Arc::new(Mutex::new(
-                crate::compaction::state::CompactionState::default(),
-            )),
+            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

             #[cfg(feature = "metrics")]
             metrics,
diff --git a/tests/ingestion_api.rs b/tests/ingestion_api.rs
new file mode 100644
index 00000000..c790d01f
--- /dev/null
+++ b/tests/ingestion_api.rs
@@ -0,0 +1,298 @@
+use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo};
+
+#[test]
+fn tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(folder, Default::default()).open()?;
+
+    for i in 0..10u32 {
+        let key = format!("k{:03}", i);
+        tree.insert(key.as_bytes(), b"v", 0);
+    }
+
+    let mut ingest = tree.ingestion()?.with_seqno(10);
+    for i in 0..10u32 {
+        let key = format!("k{:03}", i);
+        ingest.write_tombstone(key.as_bytes().into())?;
+    }
+    ingest.finish()?;
+
+    for i in 0..10u32 {
+        let key = format!("k{:03}", i);
+        assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none());
+    }
+    assert_eq!(tree.tombstone_count(), 10);
+
+    Ok(())
+}
+
+#[test]
+fn sealed_memtable_value_overrides_table_value() -> lsm_tree::Result<()> {
+    use lsm_tree::AbstractTree;
+    let folder = tempfile::tempdir()?;
+    let tree = lsm_tree::Config::new(folder, Default::default()).open()?;
+
+    // Older table value via ingestion (seqno 1)
+    {
+        let mut ingest = tree.ingestion()?.with_seqno(1);
+        ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?;
+        ingest.finish()?;
+    }
+
+    // Newer value in memtable (seqno 2), then seal it
+    tree.insert(b"k", b"new", 2);
+    let _ = tree.rotate_memtable(); // move active -> sealed
+
+    // Read should return the sealed memtable value
+    assert_eq!(
+        tree.get(b"k", lsm_tree::SeqNo::MAX)?,
+        Some(b"new".as_slice().into())
+    );
+    Ok(())
+}
+
+#[test]
+fn sealed_memtable_tombstone_overrides_table_value() -> lsm_tree::Result<()> {
+    use lsm_tree::AbstractTree;
+    let folder = tempfile::tempdir()?;
+    let tree = lsm_tree::Config::new(folder, Default::default()).open()?;
+
+    // Older table value via ingestion (seqno 1)
+    {
+        let mut ingest = tree.ingestion()?.with_seqno(1);
+        ingest.write(b"k".as_slice().into(), b"old".as_slice().into())?;
+        ingest.finish()?;
+    }
+
+    // Newer tombstone in memtable (seqno 2), then seal it
+    tree.remove(b"k", 2);
+    let _ = tree.rotate_memtable();
+
+    // Read should see the delete from the sealed memtable
+    assert!(tree.get(b"k", lsm_tree::SeqNo::MAX)?.is_none());
+    Ok(())
+}
+
+#[test]
+fn tables_newest_first_returns_highest_seqno() -> lsm_tree::Result<()> {
+    use lsm_tree::AbstractTree;
+    let folder = tempfile::tempdir()?;
+    let tree = lsm_tree::Config::new(folder, Default::default()).open()?;
+
+    // Two separate ingestions create two tables containing the same key at different seqnos
+    {
+        let mut ingest = tree.ingestion()?.with_seqno(1);
+        ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?;
+        ingest.finish()?;
+    }
+    {
+        let mut ingest = tree.ingestion()?.with_seqno(2);
+        ingest.write(b"k".as_slice().into(), b"v2".as_slice().into())?;
+        ingest.finish()?;
+    }
+
+    // With memtables empty, the read should return the value from the newest table run (seqno 2)
+    assert_eq!(
+        tree.get(b"k", lsm_tree::SeqNo::MAX)?,
+        Some(b"v2".as_slice().into())
+    );
+    Ok(())
+}
+
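+// The ordering assertions below rely on the panic message defined in
+// `src/tree/ingest.rs`: each ingestion tracks its last written key and
+// rejects non-ascending writes.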
be greater than last key")] +fn ingestion_enforces_order_standard_panics() { + let folder = tempfile::tempdir().unwrap(); + let tree = lsm_tree::Config::new(folder, Default::default()) + .open() + .unwrap(); + + let mut ingest = tree.ingestion().unwrap().with_seqno(1); + // First write higher key, then lower to trigger ordering assertion + ingest + .write(b"k2".as_slice().into(), b"v".as_slice().into()) + .unwrap(); + // Panics here + let _ = ingest.write(b"k1".as_slice().into(), b"v".as_slice().into()); +} + +#[test] +fn blob_ingestion_out_of_order_panics_without_blob_write() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8))) + .open()?; + + let before = tree.blob_file_count(); + + // Use a small value for the first write to avoid blob I/O + let result = std::panic::catch_unwind(|| { + let mut ingest = tree.ingestion().unwrap().with_seqno(1); + ingest + .write(b"k2".as_slice().into(), b"x".as_slice().into()) + .unwrap(); + // Second write would require blob I/O, but ordering check should fire before any blob write + let _ = ingest.write(b"k1".as_slice().into(), vec![1u8; 16].into()); + }); + assert!(result.is_err()); + + let after = tree.blob_file_count(); + assert_eq!(before, after); + Ok(()) +} + +#[test] +fn memtable_put_overrides_table_tombstone() -> lsm_tree::Result<()> { + use lsm_tree::AbstractTree; + let folder = tempfile::tempdir()?; + let tree = lsm_tree::Config::new(folder, Default::default()).open()?; + + // Older put written via ingestion to tables (seqno 1) + { + let mut ingest = tree.ingestion()?.with_seqno(1); + ingest.write(b"k".as_slice().into(), b"v1".as_slice().into())?; + ingest.finish()?; + } + + // Newer tombstone written via ingestion to tables (seqno 2) + { + let mut ingest = tree.ingestion()?.with_seqno(2); + ingest.write_tombstone(b"k".as_slice().into())?; + ingest.finish()?; + } + + // Newest put in active memtable (seqno 3) should override table tombstone + tree.insert(b"k", b"v3", 3); + assert_eq!( + tree.get(b"k", lsm_tree::SeqNo::MAX)?, + Some(b"v3".as_slice().into()) + ); + Ok(()) +} + +#[test] +fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for i in 0..8u32 { + let key = format!("b{:03}", i); + tree.insert(key.as_bytes(), b"x", 0); + } + + let mut ingest = tree.ingestion()?.with_seqno(10); + for i in 0..8u32 { + let key = format!("b{:03}", i); + ingest.write_tombstone(key.as_bytes().into())?; + } + ingest.finish()?; + + for i in 0..8u32 { + let key = format!("b{:03}", i); + assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none()); + } + assert_eq!(tree.tombstone_count(), 8); + + Ok(()) +} + +#[test] +fn tree_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()).open()?; + + let before_tables = tree.table_count(); + tree.ingestion()?.finish()?; + let after_tables = tree.table_count(); + + assert_eq!(before_tables, after_tables); + assert!(tree.is_empty(SeqNo::MAX, None)?); + + Ok(()) +} + +#[test] +fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = Config::new(folder, Default::default()) + 
+
+#[test]
+fn blob_tree_ingestion_tombstones_delete_existing_keys() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(folder, Default::default())
+        .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+        .open()?;
+
+    for i in 0..8u32 {
+        let key = format!("b{:03}", i);
+        tree.insert(key.as_bytes(), b"x", 0);
+    }
+
+    let mut ingest = tree.ingestion()?.with_seqno(10);
+    for i in 0..8u32 {
+        let key = format!("b{:03}", i);
+        ingest.write_tombstone(key.as_bytes().into())?;
+    }
+    ingest.finish()?;
+
+    for i in 0..8u32 {
+        let key = format!("b{:03}", i);
+        assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none());
+    }
+    assert_eq!(tree.tombstone_count(), 8);
+
+    Ok(())
+}
+
+#[test]
+fn tree_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(folder, Default::default()).open()?;
+
+    let before_tables = tree.table_count();
+    tree.ingestion()?.finish()?;
+    let after_tables = tree.table_count();
+
+    assert_eq!(before_tables, after_tables);
+    assert!(tree.is_empty(SeqNo::MAX, None)?);
+
+    Ok(())
+}
+
+#[test]
+fn blob_ingestion_only_tombstones_does_not_create_blob_files() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(folder, Default::default())
+        .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+        .open()?;
+
+    for i in 0..5u32 {
+        let key = format!("d{:03}", i);
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    let before_blobs = tree.blob_file_count();
+
+    let mut ingest = tree.ingestion()?.with_seqno(10);
+    for i in 0..5u32 {
+        let key = format!("d{:03}", i);
+        ingest.write_tombstone(key.as_bytes().into())?;
+    }
+    ingest.finish()?;
+
+    let after_blobs = tree.blob_file_count();
+    assert_eq!(before_blobs, after_blobs);
+
+    for i in 0..5u32 {
+        let key = format!("d{:03}", i);
+        assert!(tree.get(key.as_bytes(), SeqNo::MAX)?.is_none());
+    }
+
+    Ok(())
+}
+
+#[test]
+fn blob_ingestion_finish_no_writes_noop() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(folder, Default::default())
+        .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+        .open()?;
+
+    let before_tables = tree.table_count();
+    let before_blobs = tree.blob_file_count();
+
+    tree.ingestion()?.finish()?;
+
+    let after_tables = tree.table_count();
+    let after_blobs = tree.blob_file_count();
+
+    assert_eq!(before_tables, after_tables);
+    assert_eq!(before_blobs, after_blobs);
+    assert!(tree.is_empty(SeqNo::MAX, None)?);
+
+    Ok(())
+}
+
+#[test]
+fn blob_ingestion_separates_large_values_and_reads_ok() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(folder, Default::default())
+        .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(8)))
+        .open()?;
+
+    let mut ingest = tree.ingestion()?.with_seqno(1);
+    ingest.write("k_big1".as_bytes().into(), vec![1u8; 16].into())?;
+    ingest.write("k_big2".as_bytes().into(), vec![2u8; 32].into())?;
+    ingest.write("k_small".as_bytes().into(), b"abc".as_slice().into())?;
+    ingest.finish()?;
+
+    assert!(tree.blob_file_count() >= 1);
+
+    assert_eq!(
+        tree.get("k_small", SeqNo::MAX)?,
+        Some(b"abc".as_slice().into())
+    );
+    assert_eq!(
+        tree.get("k_big1", SeqNo::MAX)?.as_deref().map(|s| s.len()),
+        Some(16)
+    );
+    assert_eq!(
+        tree.get("k_big2", SeqNo::MAX)?.as_deref().map(|s| s.len()),
+        Some(32)
+    );
+
+    Ok(())
+}
diff --git a/tests/ingestion_invariants.rs b/tests/ingestion_invariants.rs
new file mode 100644
index 00000000..8f6c689a
--- /dev/null
+++ b/tests/ingestion_invariants.rs
@@ -0,0 +1,153 @@
+use std::sync::mpsc;
+use std::thread;
+use std::time::Duration;
+
+use lsm_tree::{AbstractTree, Config, KvSeparationOptions, SeqNo};
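+
+// These tests pin down how ingestion interacts with memtables: starting an
+// ingestion flushes the active (and any sealed) memtables up front, and
+// ingestion does not block concurrent writers.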
+
+#[test]
+fn ingestion_autoflushes_active_memtable() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default()).open()?;
+
+    // Write to the active memtable
+    for i in 0..10u32 {
+        let k = format!("a{:03}", i);
+        tree.insert(k.as_bytes(), b"v", 1);
+    }
+
+    let tables_before = tree.table_count();
+    let sealed_before = tree.sealed_memtable_count();
+    assert_eq!(sealed_before, 0);
+
+    // Start an ingestion (should auto-flush the active memtable)
+    tree.ingestion()?.with_seqno(10).finish()?;
+
+    // After ingestion, the data is in tables; no sealed memtables remain
+    assert_eq!(tree.sealed_memtable_count(), 0);
+    assert!(tree.table_count() >= tables_before + 1);
+
+    // Reads must succeed from tables
+    for i in 0..10u32 {
+        let k = format!("a{:03}", i);
+        assert_eq!(
+            tree.get(k.as_bytes(), SeqNo::MAX)?,
+            Some(b"v".as_slice().into())
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn ingestion_flushes_sealed_memtables() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default()).open()?;
+
+    // Put items into the active memtable, then seal it
+    for i in 0..8u32 {
+        let k = format!("s{:03}", i);
+        tree.insert(k.as_bytes(), b"x", 1);
+    }
+    assert!(tree.rotate_memtable().is_some());
+    assert!(tree.sealed_memtable_count() > 0);
+
+    let tables_before = tree.table_count();
+
+    // Ingestion should flush sealed memtables and register the resulting tables
+    tree.ingestion()?.with_seqno(20).finish()?;
+
+    assert_eq!(tree.sealed_memtable_count(), 0);
+    assert!(tree.table_count() >= tables_before + 1);
+
+    for i in 0..8u32 {
+        let k = format!("s{:03}", i);
+        assert_eq!(
+            tree.get(k.as_bytes(), SeqNo::MAX)?,
+            Some(b"x".as_slice().into())
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn ingestion_does_not_block_memtable_writes() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default()).open()?;
+
+    // Acquire an ingestion and keep it active while another thread performs writes
+    let ingest = tree.ingestion()?.with_seqno(5);
+
+    let (started_tx, started_rx) = mpsc::channel();
+    let (done_tx, done_rx) = mpsc::channel();
+    let tree2 = tree.clone();
+
+    let handle = thread::spawn(move || {
+        started_tx.send(()).ok();
+        tree2.insert(b"k_block", b"v", 6);
+        done_tx.send(()).ok();
+    });
+
+    // Wait for the writer thread to start the attempt
+    started_rx.recv().unwrap();
+
+    // Give it a moment; the insert should complete and not be blocked by the ingestion
+    thread::sleep(Duration::from_millis(100));
+    assert!(done_rx.try_recv().is_ok(), "insert should not be blocked");
+
+    handle.join().unwrap();
+    ingest.finish()?;
+
+    // Verify the write landed and is visible
+    assert_eq!(
+        tree.get(b"k_block", SeqNo::MAX)?,
+        Some(b"v".as_slice().into())
+    );
+
+    Ok(())
+}
+
+#[test]
+fn blob_ingestion_honors_invariants_and_allows_concurrent_writes() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = Config::new(&folder, Default::default())
+        .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+        .open()?;
+
+    // Write small values into the memtable and then start a blob ingestion
+    for i in 0..4u32 {
+        let k = format!("b{:03}", i);
+        tree.insert(k.as_bytes(), b"y", 1);
+    }
+
+    let (started_tx, started_rx) = mpsc::channel();
+    let (done_tx, done_rx) = mpsc::channel();
+    let tree2 = tree.clone();
+
+    let ingest = tree.ingestion()?.with_seqno(30);
+
+    let handle = thread::spawn(move || {
+        started_tx.send(()).ok();
+        tree2.insert(b"b999", b"z", 31);
+        done_tx.send(()).ok();
+    });
+
+    started_rx.recv().unwrap();
+    thread::sleep(Duration::from_millis(100));
+    assert!(done_rx.try_recv().is_ok());
+
+    handle.join().unwrap();
+    ingest.finish()?;
+
+    // Data visible after ingestion, including the concurrent write
+    for i in 0..4u32 {
+        let k = format!("b{:03}", i);
+        assert_eq!(
+            tree.get(k.as_bytes(), SeqNo::MAX)?,
+            Some(b"y".as_slice().into())
+        );
+    }
+    assert_eq!(tree.get(b"b999", SeqNo::MAX)?, Some(b"z".as_slice().into()));
+
+    Ok(())
+}
diff --git a/tests/tree_bulk_ingest.rs b/tests/tree_bulk_ingest.rs
index 0712ed73..268cbe8c 100644
--- a/tests/tree_bulk_ingest.rs
+++ b/tests/tree_bulk_ingest.rs
@@ -12,15 +12,16 @@ fn tree_bulk_ingest() -> lsm_tree::Result<()> {

     let tree = Config::new(folder, seqno.clone()).open()?;

-    tree.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let seq = seqno.next();
+    let mut ingestion = tree.ingestion()?.with_seqno(seq);
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k.into(), v.into())?;
+    }
+    ingestion.finish()?;
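+    // Unlike the removed `ingest()` helper, `finish()` does not publish the
+    // seqno itself, so make the ingested data visible manually: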
+    visible_seqno.fetch_max(seq + 1);

     assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -47,15 +48,16 @@ fn tree_copy() -> lsm_tree::Result<()> {

     let src = Config::new(folder, seqno.clone()).open()?;

-    src.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let seq = seqno.next();
+    let mut ingestion = src.ingestion()?.with_seqno(seq);
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k.into(), v.into())?;
+    }
+    ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);

     assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -73,13 +75,15 @@ fn tree_copy() -> lsm_tree::Result<()> {
     let folder = tempfile::tempdir()?;
     let dest = Config::new(folder, seqno.clone()).open()?;

-    dest.ingest(
-        src.iter(SeqNo::MAX, None)
-            .map(|x| x.into_inner())
-            .map(|x| x.unwrap()),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let seq = seqno.next();
+    let mut ingestion = dest.ingestion()?.with_seqno(seq);
+    for item in src.iter(SeqNo::MAX, None) {
+        let (k, v) = item.into_inner().unwrap();
+        ingestion.write(k, v)?;
+    }
+    ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);

     assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -108,15 +112,16 @@ fn blob_tree_bulk_ingest() -> lsm_tree::Result<()> {
         .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
         .open()?;

-    tree.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let seq = seqno.next();
+    let mut ingestion = tree.ingestion()?.with_seqno(seq);
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k.into(), v.into())?;
+    }
+    ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);

     assert_eq!(tree.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -146,15 +151,16 @@ fn blob_tree_copy() -> lsm_tree::Result<()> {
         .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
         .open()?;

-    src.ingest(
-        (0..ITEM_COUNT as u64).map(|x| {
-            let k = x.to_be_bytes();
-            let v = nanoid::nanoid!();
-            (k.into(), v.into())
-        }),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let seq = seqno.next();
+    let mut ingestion = src.ingestion()?.with_seqno(seq);
+    for x in 0..ITEM_COUNT as u64 {
+        let k = x.to_be_bytes();
+        let v = nanoid::nanoid!();
+        ingestion.write(k.into(), v.into())?;
+    }
+    ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);

     assert_eq!(src.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(
@@ -175,13 +181,15 @@ fn blob_tree_copy() -> lsm_tree::Result<()> {
         .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
         .open()?;

-    dest.ingest(
-        src.iter(SeqNo::MAX, None)
-            .map(|x| x.into_inner())
-            .map(|x| x.unwrap()),
-        &seqno,
-        &visible_seqno,
-    )?;
+    let seq = seqno.next();
+    let mut ingestion = dest.ingestion()?.with_seqno(seq);
+    for item in src.iter(SeqNo::MAX, None) {
+        let (k, v) = item.into_inner().unwrap();
+        ingestion.write(k, v)?;
+    }
+    ingestion.finish()?;
+    visible_seqno.fetch_max(seq + 1);

     assert_eq!(dest.len(SeqNo::MAX, None)?, ITEM_COUNT);
     assert_eq!(