From fc17be23004ea94482363f154c56976297aee54e Mon Sep 17 00:00:00 2001 From: zaidoon Date: Mon, 1 Sep 2025 09:02:56 -0400 Subject: [PATCH] Add prefix filter support --- benches/run_reader.rs | 267 +++ src/compaction/flavour.rs | 3 +- src/config/mod.rs | 19 +- src/lib.rs | 3 + src/prefix.rs | 301 +++ src/range.rs | 35 +- src/run_reader.rs | 281 ++- src/table/filter/block.rs | 11 + src/table/filter/standard_bloom/mod.rs | 23 + src/table/inner.rs | 4 + src/table/meta.rs | 8 + src/table/mod.rs | 221 +- src/table/multi_writer.rs | 17 + src/table/writer/filter/full.rs | 5 + src/table/writer/filter/mod.rs | 4 + src/table/writer/filter/partitioned.rs | 25 +- src/table/writer/mod.rs | 98 +- src/tree/ingest.rs | 4 + src/tree/mod.rs | 96 +- tests/prefix_filter.rs | 2594 ++++++++++++++++++++++++ 20 files changed, 3888 insertions(+), 131 deletions(-) create mode 100644 benches/run_reader.rs create mode 100644 src/prefix.rs create mode 100644 tests/prefix_filter.rs diff --git a/benches/run_reader.rs b/benches/run_reader.rs new file mode 100644 index 00000000..6f15d6ac --- /dev/null +++ b/benches/run_reader.rs @@ -0,0 +1,267 @@ +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use lsm_tree::prefix::FixedPrefixExtractor; +use lsm_tree::{AbstractTree, Config}; +use std::sync::Arc; +use std::time::Instant; +use tempfile::TempDir; + +fn create_tree_with_segments( + segment_count: usize, + with_prefix_extractor: bool, +) -> (TempDir, lsm_tree::Tree) { + let tempdir = tempfile::tempdir().unwrap(); + + let mut config = Config::new(&tempdir); + if with_prefix_extractor { + config = config.prefix_extractor(Arc::new(FixedPrefixExtractor::new(8))); + } + + let tree = config.open().unwrap(); + + // Create segments with distinct prefixes + for segment_idx in 0..segment_count { + let prefix = format!("seg{:04}", segment_idx); + + // Add 100 keys per segment + for key_idx in 0..100 { + let key = format!("{}_{:04}", prefix, key_idx); + tree.insert(key.as_bytes(), vec![0u8; 100], 0); + } + + // Flush to create a segment + tree.flush_active_memtable(0).unwrap(); + } + + (tempdir, tree) +} + +fn benchmark_range_query(c: &mut Criterion) { + let mut group = c.benchmark_group("range_query"); + + // Test different segment counts + for segment_count in [10, 100, 500, 1000] { + // Benchmark without prefix extractor + group.bench_with_input( + BenchmarkId::new("no_prefix", segment_count), + &segment_count, + |b, &count| { + let (_tempdir, tree) = create_tree_with_segments(count, false); + + b.iter(|| { + // Query for a range that doesn't exist + let start: &[u8] = b"zzz_0000"; + let end: &[u8] = b"zzz_9999"; + let iter = tree.range(start..=end, 0, None); + // Force evaluation by counting + let count = iter.count(); + black_box(count); + }); + }, + ); + + // Benchmark with prefix extractor + group.bench_with_input( + BenchmarkId::new("with_prefix", segment_count), + &segment_count, + |b, &count| { + let (_tempdir, tree) = create_tree_with_segments(count, true); + + b.iter(|| { + // Query for a range that doesn't exist (will check filters) + let start: &[u8] = b"zzz_0000"; + let end: &[u8] = b"zzz_9999"; + let iter = tree.range(start..=end, 0, None); + // Force evaluation by counting + let count = iter.count(); + black_box(count); + }); + }, + ); + + // Benchmark with prefix extractor - existing prefix + group.bench_with_input( + BenchmarkId::new("with_prefix_exists", segment_count), + &segment_count, + |b, &count| { + let (_tempdir, tree) = create_tree_with_segments(count, true); + + b.iter(|| 
{ + // Query for a range that exists in the middle + let mid = count / 2; + let prefix = format!("seg{:04}", mid); + let start_str = format!("{}_0000", prefix); + let end_str = format!("{}_0099", prefix); + let start: &[u8] = start_str.as_bytes(); + let end: &[u8] = end_str.as_bytes(); + let iter = tree.range(start..=end, 0, None); + // Force evaluation by counting + let count = iter.count(); + black_box(count); + }); + }, + ); + } + + group.finish(); +} + +fn benchmark_timing_comparison(_c: &mut Criterion) { + println!("\n=== RunReader Performance Benchmark ==="); + println!("Testing impact of prefix filter checks on large runs\n"); + + for segment_count in [100, 500, 1000] { + println!("\n--- Testing with {} segments ---", segment_count); + + // Test without prefix extractor + let (_tempdir_no_prefix, tree_no_prefix) = create_tree_with_segments(segment_count, false); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_no_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let no_prefix_time = start.elapsed(); + let avg_no_prefix = no_prefix_time.as_nanos() / 100; + + println!(" Without prefix extractor: {:>8} ns/query", avg_no_prefix); + + // Test with prefix extractor + let (_tempdir_with_prefix, tree_with_prefix) = + create_tree_with_segments(segment_count, true); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_with_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let with_prefix_time = start.elapsed(); + let avg_with_prefix = with_prefix_time.as_nanos() / 100; + + println!( + " With prefix extractor: {:>8} ns/query", + avg_with_prefix + ); + + if avg_with_prefix > avg_no_prefix { + let overhead = avg_with_prefix - avg_no_prefix; + println!( + " Overhead: {} ns ({:.1}%)", + overhead, + (overhead as f64 / avg_no_prefix as f64) * 100.0 + ); + } else { + let savings = avg_no_prefix - avg_with_prefix; + println!( + " Savings: {} ns ({:.1}%)", + savings, + (savings as f64 / avg_no_prefix as f64) * 100.0 + ); + } + + // Check CPU cost per segment + if segment_count > 0 { + let per_segment_overhead = if avg_with_prefix > avg_no_prefix { + (avg_with_prefix - avg_no_prefix) / segment_count as u128 + } else { + 0 + }; + println!(" Per-segment overhead: ~{} ns", per_segment_overhead); + } + } + + println!("\n=== Summary ==="); + println!("MAX_UPFRONT_CHECKS optimization limits overhead to checking at most 10 segments."); + println!( + "For runs with >10 segments, remaining segments are filtered lazily during iteration.\n" + ); +} + +fn run_timing_benchmark() { + println!("\n=== RunReader Performance Benchmark ==="); + println!("Testing impact of prefix filter checks on large runs\n"); + + for segment_count in [100, 500, 1000] { + println!("\n--- Testing with {} segments ---", segment_count); + + // Test without prefix extractor + let (_tempdir_no_prefix, tree_no_prefix) = create_tree_with_segments(segment_count, false); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_no_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let no_prefix_time = start.elapsed(); + let avg_no_prefix = no_prefix_time.as_nanos() / 100; + + println!(" Without prefix extractor: {:>8} ns/query", avg_no_prefix); + + // Test with prefix extractor + let (_tempdir_with_prefix, 
tree_with_prefix) = + create_tree_with_segments(segment_count, true); + + let start = Instant::now(); + for _ in 0..100 { + let start_key: &[u8] = b"zzz_0000"; + let end_key: &[u8] = b"zzz_9999"; + let iter = tree_with_prefix.range(start_key..=end_key, 0, None); + let _ = iter.count(); + } + let with_prefix_time = start.elapsed(); + let avg_with_prefix = with_prefix_time.as_nanos() / 100; + + println!( + " With prefix extractor: {:>8} ns/query", + avg_with_prefix + ); + + if avg_with_prefix > avg_no_prefix { + let overhead = avg_with_prefix - avg_no_prefix; + println!( + " Overhead: {} ns ({:.1}%)", + overhead, + (overhead as f64 / avg_no_prefix as f64) * 100.0 + ); + } else { + let savings = avg_no_prefix - avg_with_prefix; + println!( + " Savings: {} ns ({:.1}%)", + savings, + (savings as f64 / avg_no_prefix as f64) * 100.0 + ); + } + + // Check CPU cost per segment + if segment_count > 0 { + let per_segment_overhead = if avg_with_prefix > avg_no_prefix { + (avg_with_prefix - avg_no_prefix) / segment_count as u128 + } else { + 0 + }; + println!(" Per-segment overhead: ~{} ns", per_segment_overhead); + } + } + + println!("\n=== Summary ==="); + println!("MAX_UPFRONT_CHECKS optimization limits overhead to checking at most 10 segments."); + println!( + "For runs with >10 segments, remaining segments are filtered lazily during iteration.\n" + ); +} + +fn benchmark_all(c: &mut Criterion) { + // Run standard benchmarks + benchmark_range_query(c); + + // Run the detailed timing comparison + run_timing_benchmark(); +} + +criterion_group!(benches, benchmark_range_query); +criterion_main!(benches); diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index 0e6d2884..a31f24c4 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -86,7 +86,8 @@ pub(super) fn prepare_table_writer( None => BloomConstructionPolicy::BitsPerKey(0.0), } } - })) + }) + .use_prefix_extractor(opts.config.prefix_extractor.clone())) } // TODO: find a better name diff --git a/src/config/mod.rs b/src/config/mod.rs index 9fc4a393..19c04d98 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -20,8 +20,8 @@ pub use restart_interval::RestartIntervalPolicy; pub type PartioningPolicy = PinningPolicy; use crate::{ - path::absolute_path, version::DEFAULT_LEVEL_COUNT, AnyTree, BlobTree, Cache, CompressionType, - DescriptorTable, SequenceNumberCounter, Tree, + path::absolute_path, prefix::SharedPrefixExtractor, version::DEFAULT_LEVEL_COUNT, AnyTree, + BlobTree, Cache, CompressionType, DescriptorTable, SequenceNumberCounter, Tree, }; use std::{ path::{Path, PathBuf}, @@ -227,6 +227,11 @@ pub struct Config { /// Filter construction policy pub filter_policy: FilterPolicy, + /// Optional prefix extractor used to construct prefix-aware filters. + /// When set, the table writer will add extracted prefixes (in addition to full keys) + /// to the Bloom filter and persist the extractor name in table metadata for compatibility checks. + pub prefix_extractor: Option, + #[doc(hidden)] pub kv_separation_opts: Option, @@ -284,6 +289,8 @@ impl Default for Config { BloomConstructionPolicy::BitsPerKey(10.0), )), + prefix_extractor: None, + expect_point_read_hits: false, kv_separation_opts: None, @@ -301,6 +308,14 @@ impl Config { } } + /// Sets the prefix extractor for building prefix-aware Bloom filters. + /// If set, extracted prefixes are added to filters and the extractor name is stored in table metadata. 
+ #[must_use] + pub fn prefix_extractor(mut self, extractor: SharedPrefixExtractor) -> Self { + self.prefix_extractor = Some(extractor); + self + } + /// Sets the global cache. /// /// You can create a global [`Cache`] and share it between multiple diff --git a/src/lib.rs b/src/lib.rs index 2c7e49d9..2b634ddf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -179,6 +179,9 @@ pub mod mvcc_stream; mod path; +/// Prefix extraction for filters +pub mod prefix; + #[doc(hidden)] pub mod range; diff --git a/src/prefix.rs b/src/prefix.rs new file mode 100644 index 00000000..674381fc --- /dev/null +++ b/src/prefix.rs @@ -0,0 +1,301 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use std::sync::Arc; + +/// Trait for extracting prefixes from keys for prefix filters. +/// +/// A prefix extractor allows the filter to index prefixes of keys +/// instead of (or in addition to) the full keys. This enables efficient +/// filtering for prefix-based queries. +/// +/// # Examples +/// +/// ## Simple fixed-length prefix: +/// ``` +/// use lsm_tree::prefix::PrefixExtractor; +/// +/// struct FixedPrefixExtractor(usize); +/// +/// impl PrefixExtractor for FixedPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// Box::new(std::iter::once(key.get(0..self.0).unwrap_or(key))) +/// } +/// +/// fn name(&self) -> &str { +/// "fixed_prefix" +/// } +/// } +/// ``` +/// +/// ## Segmented prefixes (e.g., `account_id#user_id)`: +/// ``` +/// use lsm_tree::prefix::PrefixExtractor; +/// +/// struct SegmentedPrefixExtractor; +/// +/// impl PrefixExtractor for SegmentedPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// let mut prefixes = vec![]; +/// let mut end = 0; +/// for (i, &byte) in key.iter().enumerate() { +/// if byte == b'#' { +/// prefixes.push(&key[0..i]); +/// end = i; +/// } +/// } +/// if end < key.len() { +/// prefixes.push(key); +/// } +/// Box::new(prefixes.into_iter()) +/// } +/// +/// fn name(&self) -> &str { +/// "segmented_prefix" +/// } +/// } +/// ``` +pub trait PrefixExtractor: Send + Sync { + /// Extracts zero or more prefixes from a key. + /// + /// All prefixes will be added to the filter during segment construction. + /// + /// An empty iterator means the key is "out of domain" and won't be added to the filter. + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a>; + + /// Returns a unique name for this prefix extractor. + fn name(&self) -> &str; +} + +/// A prefix extractor that returns the full key. +/// +/// This is the default behavior if no prefix extractor is specified. +pub struct FullKeyExtractor; + +impl PrefixExtractor for FullKeyExtractor { + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { + Box::new(std::iter::once(key)) + } + + fn name(&self) -> &'static str { + "full_key" + } +} + +/// A prefix extractor that returns a fixed-length prefix. +/// +/// If the key is shorter than the prefix length, returns the full key. +pub struct FixedPrefixExtractor { + length: usize, +} + +impl FixedPrefixExtractor { + /// Creates a new fixed-length prefix extractor. 
+ #[must_use] + pub fn new(length: usize) -> Self { + Self { length } + } +} + +impl PrefixExtractor for FixedPrefixExtractor { + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { + if key.len() <= self.length { + Box::new(std::iter::once(key)) + } else if let Some(prefix) = key.get(0..self.length) { + Box::new(std::iter::once(prefix)) + } else { + Box::new(std::iter::empty()) + } + } + + fn name(&self) -> &'static str { + "fixed_prefix" + } +} + +/// A prefix extractor that requires keys to be at least a certain length. +/// +/// Keys shorter than the required length are considered "out of domain" +/// and won't be added to the filter. This matches `RocksDB`'s behavior. +pub struct FixedLengthExtractor { + length: usize, +} + +impl FixedLengthExtractor { + /// Creates a new fixed-length extractor. + #[must_use] + pub fn new(length: usize) -> Self { + Self { length } + } +} + +impl PrefixExtractor for FixedLengthExtractor { + fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { + if key.len() < self.length { + // Key is too short - out of domain + Box::new(std::iter::empty()) + } else if let Some(prefix) = key.get(0..self.length) { + Box::new(std::iter::once(prefix)) + } else { + Box::new(std::iter::empty()) + } + } + + fn name(&self) -> &'static str { + "fixed_length" + } +} + +/// Examples of custom multi-prefix extractors. +/// +/// Users can implement their own prefix extractors that return multiple prefixes. +/// The filter will include all returned prefixes. +/// +/// # Example +/// +/// ```ignore +/// use lsm_tree::prefix::PrefixExtractor; +/// use std::sync::Arc; +/// +/// // Example 1: Hierarchical prefix extractor based on delimiter +/// // For key "user/123/data" with delimiter '/', generates: +/// // - "user" +/// // - "user/123" +/// // - "user/123/data" (full key) +/// struct HierarchicalPrefixExtractor { +/// delimiter: u8, +/// } +/// +/// impl PrefixExtractor for HierarchicalPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// let delimiter = self.delimiter; +/// let mut prefixes = Vec::new(); +/// +/// // Generate all prefixes up to each delimiter +/// for (i, &byte) in key.iter().enumerate() { +/// if byte == delimiter { +/// prefixes.push(&key[0..i]); +/// } +/// } +/// +/// // Always include the full key +/// prefixes.push(key); +/// +/// Box::new(prefixes.into_iter()) +/// } +/// +/// fn name(&self) -> &str { +/// "hierarchical_prefix" +/// } +/// } +/// +/// // Example 2: Extract domain prefix for flipped email keys +/// // For "example.com@user", this extracts: +/// // - "example.com" (domain prefix for range scans) +/// // - "example.com@user" (full key for point lookups) +/// struct DomainPrefixExtractor; +/// +/// impl PrefixExtractor for DomainPrefixExtractor { +/// fn extract<'a>(&self, key: &'a [u8]) -> Box + 'a> { +/// if let Ok(key_str) = std::str::from_utf8(key) { +/// if let Some(at_pos) = key_str.find('@') { +/// // Return both domain prefix and full key +/// let domain_prefix = &key[..at_pos]; +/// return Box::new(vec![domain_prefix, key].into_iter()); +/// } +/// } +/// // If not a flipped email format, just return the full key +/// Box::new(std::iter::once(key)) +/// } +/// +/// fn name(&self) -> &str { +/// "domain_prefix" +/// } +/// } +/// +/// // Usage: +/// # let path = tempfile::tempdir()?; +/// // Example usage (API may vary by version): +/// // let tree = lsm_tree::Config::new(path, seqno) +/// // .prefix_extractor(Arc::new(HierarchicalPrefixExtractor { delimiter: b'/' })) +/// // .open()?; +/// # Ok::<(), Box>(()) 
+/// ``` +/// Type alias for a shared prefix extractor +pub type SharedPrefixExtractor = Arc; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_full_key_extractor() { + let extractor = FullKeyExtractor; + let key = b"test_key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"test_key".as_ref())); + } + + #[test] + fn test_fixed_prefix_extractor() { + let extractor = FixedPrefixExtractor::new(5); + + // Key longer than prefix + let key = b"longer_key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"longe".as_ref())); + + // Key shorter than prefix + let key = b"key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"key".as_ref())); + + // Key exactly prefix length + let key = b"exact"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"exact".as_ref())); + } + + #[test] + fn test_empty_key() { + let full_key = FullKeyExtractor; + let fixed = FixedPrefixExtractor::new(5); + + let key = b""; + + let prefixes: Vec<_> = full_key.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"".as_ref())); + + let prefixes: Vec<_> = fixed.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"".as_ref())); + } + + #[test] + fn test_fixed_length_extractor() { + let extractor = FixedLengthExtractor::new(5); + + // Key shorter than required length - out of domain + let key = b"abc"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 0); // Empty iterator + + // Key exactly required length + let key = b"exact"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"exact".as_ref())); + + // Key longer than required length + let key = b"longer_key"; + let prefixes: Vec<_> = extractor.extract(key).collect(); + assert_eq!(prefixes.len(), 1); + assert_eq!(prefixes.first(), Some(&b"longe".as_ref())); + } +} diff --git a/src/range.rs b/src/range.rs index a6be0cb6..92515dea 100644 --- a/src/range.rs +++ b/src/range.rs @@ -7,6 +7,7 @@ use crate::{ memtable::Memtable, merge::Merger, mvcc_stream::MvccStream, + prefix::SharedPrefixExtractor, run_reader::RunReader, value::{SeqNo, UserKey}, version::SuperVersion, @@ -65,6 +66,7 @@ pub fn prefix_to_range(prefix: &[u8]) -> (Bound, Bound) { pub struct IterState { pub(crate) version: SuperVersion, pub(crate) ephemeral: Option>, + pub(crate) prefix_extractor: Option, } type BoxedMerge<'a> = Box> + Send + 'a>; @@ -163,15 +165,29 @@ impl TreeIter { range.start_bound().map(|x| &*x.user_key), range.end_bound().map(|x| &*x.user_key), )) { - let reader = table.range(( - range.start_bound().map(|x| &x.user_key).cloned(), - range.end_bound().map(|x| &x.user_key).cloned(), - )); - - iters.push(Box::new(reader.filter(move |item| match item { - Ok(item) => seqno_filter(item.key.seqno, seqno), - Err(_) => true, - }))); + let mut skip = false; + if let Some(ex) = lock.prefix_extractor.as_ref() { + let tmp_range = ( + range.start_bound().map(|x| &x.user_key).cloned(), + range.end_bound().map(|x| &x.user_key).cloned(), + ); + if table.should_skip_range_by_prefix_filter(&tmp_range, ex.as_ref()) + { + skip = true; + } + } + + if !skip { + let reader = 
table.range(( + range.start_bound().map(|x| &x.user_key).cloned(), + range.end_bound().map(|x| &x.user_key).cloned(), + )); + + iters.push(Box::new(reader.filter(move |item| match item { + Ok(item) => seqno_filter(item.key.seqno, seqno), + Err(_) => true, + }))); + } } } _ => { @@ -181,6 +197,7 @@ impl TreeIter { range.start_bound().map(|x| &x.user_key).cloned(), range.end_bound().map(|x| &x.user_key).cloned(), ), + lock.prefix_extractor.clone(), ) { iters.push(Box::new(reader.filter(move |item| match item { Ok(item) => seqno_filter(item.key.seqno, seqno), diff --git a/src/run_reader.rs b/src/run_reader.rs index 27bcb3f3..28e54bf8 100644 --- a/src/run_reader.rs +++ b/src/run_reader.rs @@ -2,7 +2,9 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{version::Run, BoxedIterator, InternalValue, Table, UserKey}; +use crate::{ + prefix::SharedPrefixExtractor, version::Run, BoxedIterator, InternalValue, Table, UserKey, +}; use std::{ ops::{Deref, RangeBounds}, sync::Arc, @@ -15,38 +17,146 @@ pub struct RunReader { hi: usize, lo_reader: Option>, hi_reader: Option>, + // Owned range bounds for creating new per-table readers during iteration + range_start: std::ops::Bound, + range_end: std::ops::Bound, + // Optional extractor for prefix-aware pruning during lazy advancement + extractor: Option, } impl RunReader { + /// Creates a run reader over a disjoint set of tables. Returns None when up-front + /// prefix filter pruning determines that no table in the run may contain keys for the range. + /// Uses common-prefix pruning only; per-table skipping happens lazily during iteration. #[must_use] pub fn new + Clone + Send + 'static>( run: Arc>, range: R, + extractor: Option, ) -> Option { assert!(!run.is_empty(), "level reader cannot read empty level"); let (lo, hi) = run.range_overlap_indexes(&range)?; - Some(Self::culled(run, range, (Some(lo), Some(hi)))) + // Compute pruning prefix: only when both bounds' first extracted prefixes exist and are equal. + let mut common_prefix: Option> = None; + if let Some(ex) = extractor.as_ref() { + use std::ops::Bound; + let start_first = match range.start_bound() { + Bound::Included(uk) | Bound::Excluded(uk) => { + ex.extract(uk.as_ref()).next().map(|p| p.to_vec()) + } + Bound::Unbounded => None, + }; + let end_first = match range.end_bound() { + Bound::Included(uk) | Bound::Excluded(uk) => { + ex.extract(uk.as_ref()).next().map(|p| p.to_vec()) + } + Bound::Unbounded => None, + }; + common_prefix = match (start_first, end_first) { + (Some(s), Some(e)) if s == e => Some(s), + _ => None, + }; + } + + // Early optimization + if let Some(ex) = extractor.clone() { + // Compute start bound key once + let start_key = match range.start_bound() { + std::ops::Bound::Included(k) | std::ops::Bound::Excluded(k) => Some(k.as_ref()), + std::ops::Bound::Unbounded => None, + }; + + // Common-prefix pruning when bounds share the same extracted prefix + if common_prefix.is_some() { + const MAX_UPFRONT_CHECKS: usize = 10; + let mut checks = 0usize; + let mut has_potential_match = false; + + for idx in lo..=hi { + let table = run.deref().get(idx).expect("should exist"); + let bounds = ( + range.start_bound().map(|k| k.as_ref()), + range.end_bound().map(|k| k.as_ref()), + ); + if !table.check_key_range_overlap(&bounds) { + continue; + } + + // Use a bound key as the probe so the extractor can derive the prefix and + // the filter index selection remains consistent. 
+ let probe = start_key.or_else(|| match range.end_bound() { + std::ops::Bound::Included(k) | std::ops::Bound::Excluded(k) => { + Some(k.as_ref()) + } + std::ops::Bound::Unbounded => None, + }); + if let Some(probe) = probe { + match table.maybe_contains_prefix(probe, ex.as_ref()) { + Ok(Some(false)) => { /* keep checking other tables */ } + _ => { + has_potential_match = true; + break; + } + } + } else { + // Without a concrete probe key, we cannot consult; treat as potential match + has_potential_match = true; + break; + } + + checks += 1; + if checks >= MAX_UPFRONT_CHECKS { + has_potential_match = true; + break; + } + } + + if !has_potential_match { + return None; + } + } + } + + Some(Self::culled(run, range, (Some(lo), Some(hi)), extractor)) } + /// Creates a run reader with precomputed overlap indices. + /// + /// This variant assumes the caller already determined the overlapping table + /// indices. It initializes boundary table readers and + /// performs lazy per-table prefix-filter skipping during iteration. #[must_use] pub fn culled + Clone + Send + 'static>( run: Arc>, range: R, (lo, hi): (Option, Option), + extractor: Option, ) -> Self { let lo = lo.unwrap_or_default(); let hi = hi.unwrap_or(run.len() - 1); - // TODO: lazily init readers? + // Materialize owned range bounds for reuse when creating readers for other tables + use std::ops::Bound::{Excluded, Included, Unbounded}; + let owned_start: std::ops::Bound = match range.start_bound() { + Included(k) => Included(k.clone()), + Excluded(k) => Excluded(k.clone()), + Unbounded => Unbounded, + }; + let owned_end: std::ops::Bound = match range.end_bound() { + Included(k) => Included(k.clone()), + Excluded(k) => Excluded(k.clone()), + Unbounded => Unbounded, + }; + + // Init readers for boundary tables with proper range let lo_table = run.deref().get(lo).expect("should exist"); - let lo_reader = lo_table.range(range.clone()); + let lo_reader = lo_table.range((owned_start.clone(), owned_end.clone())); - // TODO: lazily init readers? 
let hi_reader = if hi > lo { let hi_table = run.deref().get(hi).expect("should exist"); - Some(hi_table.range(range)) + Some(hi_table.range((owned_start.clone(), owned_end.clone()))) } else { None }; @@ -57,6 +167,9 @@ impl RunReader { hi, lo_reader: Some(Box::new(lo_reader)), hi_reader: hi_reader.map(|x| Box::new(x) as BoxedIterator), + range_start: owned_start, + range_end: owned_end, + extractor, } } } @@ -76,9 +189,32 @@ impl Iterator for RunReader { self.lo += 1; if self.lo < self.hi { - self.lo_reader = Some(Box::new( - self.run.get(self.lo).expect("should exist").iter(), - )); + // Lazily advance to the next table that overlaps the key range + loop { + if self.lo >= self.hi { + break; + } + let table = self.run.get(self.lo).expect("should exist"); + let bounds = ( + self.range_start.as_ref().map(|k| k.as_ref()), + self.range_end.as_ref().map(|k| k.as_ref()), + ); + if table.check_key_range_overlap(&bounds) { + if let Some(ex) = &self.extractor { + let tmp_range = (self.range_start.clone(), self.range_end.clone()); + if table.should_skip_range_by_prefix_filter(&tmp_range, ex.as_ref()) + { + self.lo += 1; + continue; + } + } + let reader = + table.range((self.range_start.clone(), self.range_end.clone())); + self.lo_reader = Some(Box::new(reader)); + break; + } + self.lo += 1; + } } } else if let Some(hi_reader) = &mut self.hi_reader { // NOTE: We reached the hi marker, so consume from it instead @@ -105,9 +241,32 @@ impl DoubleEndedIterator for RunReader { self.hi -= 1; if self.lo < self.hi { - self.hi_reader = Some(Box::new( - self.run.get(self.hi).expect("should exist").iter(), - )); + // Lazily move to previous table that overlaps the key range + loop { + if self.hi <= self.lo { + break; + } + let table = self.run.get(self.hi).expect("should exist"); + let bounds = ( + self.range_start.as_ref().map(|k| k.as_ref()), + self.range_end.as_ref().map(|k| k.as_ref()), + ); + if table.check_key_range_overlap(&bounds) { + if let Some(ex) = &self.extractor { + let tmp_range = (self.range_start.clone(), self.range_end.clone()); + if table.should_skip_range_by_prefix_filter(&tmp_range, ex.as_ref()) + { + self.hi -= 1; + continue; + } + } + let reader = + table.range((self.range_start.clone(), self.range_end.clone())); + self.hi_reader = Some(Box::new(reader)); + break; + } + self.hi -= 1; + } } } else if let Some(lo_reader) = &mut self.lo_reader { // NOTE: We reached the lo marker, so consume from it instead @@ -125,7 +284,10 @@ impl DoubleEndedIterator for RunReader { #[expect(clippy::expect_used)] mod tests { use super::*; - use crate::{AbstractTree, SequenceNumberCounter, Slice}; + use crate::prefix::{FixedLengthExtractor, SharedPrefixExtractor}; + use crate::{range::prefix_upper_range, AbstractTree, SequenceNumberCounter, Slice}; + use std::ops::Bound; + use std::sync::Arc; use test_log::test; #[test] @@ -155,9 +317,11 @@ mod tests { let level = Arc::new(Run::new(tables)); - assert!(RunReader::new(level.clone(), UserKey::from("y")..=UserKey::from("z"),).is_none()); + assert!( + RunReader::new(level.clone(), UserKey::from("y")..=UserKey::from("z"), None).is_none() + ); - assert!(RunReader::new(level, UserKey::from("y")..).is_none()); + assert!(RunReader::new(level, UserKey::from("y").., None).is_none()); Ok(()) } @@ -191,7 +355,7 @@ mod tests { let level = Arc::new(Run::new(tables)); { - let multi_reader = RunReader::new(level.clone(), ..).unwrap(); + let multi_reader = RunReader::new(level.clone(), .., None).unwrap(); let mut iter = multi_reader.flatten(); @@ -210,7 +374,7 @@ mod tests { } 
{ - let multi_reader = RunReader::new(level.clone(), ..).unwrap(); + let multi_reader = RunReader::new(level.clone(), .., None).unwrap(); let mut iter = multi_reader.rev().flatten(); @@ -229,7 +393,7 @@ mod tests { } { - let multi_reader = RunReader::new(level.clone(), ..).unwrap(); + let multi_reader = RunReader::new(level.clone(), .., None).unwrap(); let mut iter = multi_reader.flatten(); @@ -248,7 +412,7 @@ mod tests { } { - let multi_reader = RunReader::new(level.clone(), UserKey::from("g")..).unwrap(); + let multi_reader = RunReader::new(level.clone(), UserKey::from("g").., None).unwrap(); let mut iter = multi_reader.flatten(); @@ -261,7 +425,7 @@ mod tests { } { - let multi_reader = RunReader::new(level, UserKey::from("g")..).unwrap(); + let multi_reader = RunReader::new(level, UserKey::from("g").., None).unwrap(); let mut iter = multi_reader.flatten().rev(); @@ -275,4 +439,81 @@ mod tests { Ok(()) } + + #[test] + fn run_reader_prefix_range_pruning_absent() -> crate::Result<()> { + let tempdir = tempfile::tempdir()?; + let seqno = SequenceNumberCounter::default(); + let ex: SharedPrefixExtractor = Arc::new(FixedLengthExtractor::new(3)); + let tree = crate::Config::new(&tempdir, seqno.clone()) + .prefix_extractor(ex.clone()) + .open()?; + + // Create multiple tables with prefixes "aaa" and "bbb" + for p in [b"aaa", b"bbb"] { + for i in 0..10u32 { + let mut k = p.to_vec(); + k.extend_from_slice(format!("{:04}", i).as_bytes()); + tree.insert(k, b"v", seqno.next()); + } + tree.flush_active_memtable(0)?; + } + + let tables = tree + .current_version() + .iter_tables() + .cloned() + .collect::>(); + let level = std::sync::Arc::new(Run::new(tables)); + + // Query a prefix range for a non-existent prefix "zzz" + let prefix = b"zzz".to_vec(); + let start = Bound::Included(UserKey::from(prefix.clone())); + let end = prefix_upper_range(&prefix); + let ex = Some(ex); + + // All overlapped tables report Some(false) -> should prune (None) + let reader = RunReader::new(level, (start, end), ex); + assert!(reader.is_none()); + + Ok(()) + } + + #[test] + fn run_reader_prefix_range_no_pruning_when_possible_hit() -> crate::Result<()> { + let tempdir = tempfile::tempdir()?; + let seqno = SequenceNumberCounter::default(); + let ex: SharedPrefixExtractor = Arc::new(FixedLengthExtractor::new(3)); + let tree = crate::Config::new(&tempdir, seqno.clone()) + .prefix_extractor(ex.clone()) + .open()?; + + // Tables with prefixes: "aaa" and "zzz" + for p in [b"aaa", b"zzz"] { + for i in 0..5u32 { + let mut k = p.to_vec(); + k.extend_from_slice(format!("{:02}", i).as_bytes()); + tree.insert(k, b"v", seqno.next()); + } + tree.flush_active_memtable(0)?; + } + + let tables = tree + .current_version() + .iter_tables() + .cloned() + .collect::>(); + let level = std::sync::Arc::new(Run::new(tables)); + + // Query a prefix range for existing prefix "zzz" + let prefix = b"zzz".to_vec(); + let start = Bound::Included(UserKey::from(prefix.clone())); + let end = prefix_upper_range(&prefix); + let ex = Some(ex); + + let reader = RunReader::new(level, (start, end), ex); + assert!(reader.is_some()); + + Ok(()) + } } diff --git a/src/table/filter/block.rs b/src/table/filter/block.rs index 46f759e2..6c2df23c 100644 --- a/src/table/filter/block.rs +++ b/src/table/filter/block.rs @@ -17,6 +17,17 @@ impl FilterBlock { Ok(StandardBloomFilterReader::new(&self.0.data)?.contains_hash(hash)) } + /// Returns Ok(Some(true)) if any extracted prefix of `key` may be contained. 
+ /// Returns Ok(Some(false)) if the filter indicates no extracted prefix is present. + /// Returns Ok(None) if the key is out of the extractor's domain (no prefixes). + pub fn maybe_contains_prefix( + &self, + key: &[u8], + extractor: &dyn crate::prefix::PrefixExtractor, + ) -> crate::Result> { + Ok(StandardBloomFilterReader::new(&self.0.data)?.contains_prefix(key, extractor)) + } + /// Returns the block size in bytes. #[must_use] pub fn size(&self) -> usize { diff --git a/src/table/filter/standard_bloom/mod.rs b/src/table/filter/standard_bloom/mod.rs index 518f03d7..c422cbdc 100644 --- a/src/table/filter/standard_bloom/mod.rs +++ b/src/table/filter/standard_bloom/mod.rs @@ -107,6 +107,29 @@ impl<'a> StandardBloomFilterReader<'a> { self.contains_hash(Self::get_hash(key)) } + /// Returns `true` if any prefix of the key may be contained. + /// + /// Returns `None` if the key is out of domain. + #[must_use] + pub fn contains_prefix( + &self, + key: &[u8], + extractor: &dyn crate::prefix::PrefixExtractor, + ) -> Option { + let mut prefixes = extractor.extract(key); + + // Check if iterator is empty (out of domain) + let first = prefixes.next()?; + + // Check first prefix + if self.contains_hash(Self::get_hash(first)) { + return Some(true); + } + + // Check remaining prefixes + Some(prefixes.any(|prefix| self.contains_hash(Self::get_hash(prefix)))) + } + /// Returns `true` if the bit at `idx` is `1`. fn has_bit(&self, idx: usize) -> bool { self.inner.get(idx) diff --git a/src/table/inner.rs b/src/table/inner.rs index b77c1206..04845487 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -49,6 +49,10 @@ pub struct Inner { /// Pinned AMQ filter pub pinned_filter_block: Option, + /// Name of the prefix extractor used when this table was created, if any. + /// Used to determine compatibility with the current extractor for prefix-aware filtering. + pub prefix_extractor_name: Option, + pub is_deleted: AtomicBool, pub(super) checksum: Checksum, diff --git a/src/table/meta.rs b/src/table/meta.rs index abfd43fa..e14d139b 100644 --- a/src/table/meta.rs +++ b/src/table/meta.rs @@ -47,6 +47,9 @@ pub struct ParsedMeta { pub data_block_compression: CompressionType, pub index_block_compression: CompressionType, + + /// Optional name of the prefix extractor used when this table was created. + pub prefix_extractor_name: Option, } macro_rules! read_u8 { @@ -205,6 +208,10 @@ impl ParsedMeta { CompressionType::decode_from(&mut bytes)? }; + let prefix_extractor_name = block + .point_read(b"prefix_extractor", SeqNo::MAX) + .and_then(|v| String::from_utf8(v.value.to_vec()).ok()); + Ok(Self { id, created_at, @@ -219,6 +226,7 @@ impl ParsedMeta { weak_tombstone_reclaimable, data_block_compression, index_block_compression, + prefix_extractor_name, }) } } diff --git a/src/table/mod.rs b/src/table/mod.rs index e1da0e13..90f3e893 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -43,7 +43,6 @@ use block_index::BlockIndexImpl; use inner::Inner; use iter::Iter; use std::{ - borrow::Cow, fs::File, io::{BufReader, Read, Seek}, ops::{Bound, RangeBounds}, @@ -97,6 +96,69 @@ impl std::fmt::Debug for Table { } impl Table { + /// Returns true if the table's stored prefix extractor configuration is compatible + /// with the currently configured extractor name. This is used to decide whether + /// prefix-aware filtering is allowed. 
+ pub(crate) fn prefix_filter_allowed(&self, current_extractor_name: Option<&str>) -> bool { + match ( + self.prefix_extractor_name.as_deref(), + current_extractor_name, + ) { + (Some(a), Some(b)) => a == b, + (Some(_), None) => false, + (None, Some(_)) => false, + (None, None) => true, + } + } + + /// Loads the filter block corresponding to `key`, if any. This unifies the logic used by + /// both get() and maybe_contains_prefix(). + fn load_filter_block_for_key( + &self, + key: &[u8], + ) -> crate::Result>> { + if let Some(block) = &self.pinned_filter_block { + return Ok(Some(std::borrow::Cow::Borrowed(block))); + } + + if let Some(filter_idx) = &self.pinned_filter_index { + let mut iter = filter_idx.iter(); + iter.seek(key); + + if let Some(filter_block_handle) = iter.next() { + let filter_block_handle = filter_block_handle.materialize(filter_idx.as_slice()); + + let block = self.load_block( + &filter_block_handle.into_inner(), + BlockType::Filter, + CompressionType::None, + )?; + let block = FilterBlock::new(block); + + return Ok(Some(std::borrow::Cow::Owned(block))); + } else { + return Ok(None); + } + } + + if let Some(_filter_tli_handle) = &self.regions.filter_tli { + // Unpinned filter TLI not supported yet + return Ok(None); + } + + if let Some(filter_block_handle) = &self.regions.filter { + let block = self.load_block( + filter_block_handle, + BlockType::Filter, + CompressionType::None, + )?; + let block = FilterBlock::new(block); + + return Ok(Some(std::borrow::Cow::Owned(block))); + } + + Ok(None) + } pub fn referenced_blob_bytes(&self) -> crate::Result { if let Some(v) = self.0.cached_blob_bytes.get() { return Ok(*v); @@ -216,6 +278,25 @@ impl Table { self.metadata.file_size } + pub(crate) fn get_without_filter( + &self, + key: &[u8], + seqno: SeqNo, + ) -> crate::Result> { + self.point_read(key, seqno) + } + + /// Looks up `key` at or below `seqno`, returning the newest visible value if present. + /// + /// Behavior: + /// - If a compatible filter block is present, performs a single consult. A definite + /// negative avoids data block I/O and increments filter metrics accordingly. + /// - When a prefix extractor is configured and compatible with this table, prefix-aware + /// filtering is used; otherwise hash-based filtering is used (only if this table was + /// also created without a prefix extractor). Incompatible configurations bypass filtering. + /// - Returns Ok(None) when the key is not present or shadowed by sequence rules. + /// + /// Errors reflect I/O or decoding failures when loading index or data blocks. 
pub fn get( &self, key: &[u8], @@ -229,40 +310,7 @@ impl Table { return Ok(None); } - let filter_block = if let Some(block) = &self.pinned_filter_block { - Some(Cow::Borrowed(block)) - } else if let Some(filter_idx) = &self.pinned_filter_index { - let mut iter = filter_idx.iter(); - iter.seek(key); - - if let Some(filter_block_handle) = iter.next() { - let filter_block_handle = filter_block_handle.materialize(filter_idx.as_slice()); - - let block = self.load_block( - &filter_block_handle.into_inner(), - BlockType::Filter, - CompressionType::None, // NOTE: We never write a filter block with compression - )?; - let block = FilterBlock::new(block); - - Some(Cow::Owned(block)) - } else { - None - } - } else if let Some(_filter_tli_handle) = &self.regions.filter_tli { - unimplemented!("unpinned filter TLI not supported"); - } else if let Some(filter_block_handle) = &self.regions.filter { - let block = self.load_block( - filter_block_handle, - BlockType::Filter, - CompressionType::None, // NOTE: We never write a filter block with compression - )?; - let block = FilterBlock::new(block); - - Some(Cow::Owned(block)) - } else { - None - }; + let filter_block = self.load_filter_block_for_key(key)?; if let Some(filter_block) = filter_block { #[cfg(feature = "metrics")] @@ -271,7 +319,6 @@ impl Table { if !filter_block.maybe_contains_hash(key_hash)? { #[cfg(feature = "metrics")] self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed); - return Ok(None); } } @@ -279,6 +326,40 @@ impl Table { self.point_read(key, seqno) } + /// Checks via the filter whether any extracted prefix of `key` may be present in this table. + /// Returns: + /// - Ok(Some(true)) if the filter indicates a possible match + /// - Ok(Some(false)) if the filter indicates no match + /// - Ok(None) if the key is out of the extractor's domain + /// If no filter is available for this table, returns Ok(Some(true)). + pub fn maybe_contains_prefix( + &self, + key: &[u8], + extractor: &dyn crate::prefix::PrefixExtractor, + ) -> crate::Result> { + // Only consult the prefix-aware filter if the table's stored extractor + // configuration is compatible with the current one. + if !self.prefix_filter_allowed(Some(extractor.name())) { + return Ok(Some(true)); + } + + let filter_block = self.load_filter_block_for_key(key)?; + + if let Some(filter_block) = filter_block { + #[cfg(feature = "metrics")] + { + use std::sync::atomic::Ordering::Relaxed; + self.metrics.filter_queries.fetch_add(1, Relaxed); + } + + let res = filter_block.maybe_contains_prefix(key, extractor)?; + return Ok(res); + } + + // No filter available => cannot exclude + Ok(Some(true)) + } + // TODO: maybe we can skip Fuse costs of the user key // TODO: because we just want to return the value // TODO: we would need to return something like ValueType + Value @@ -510,6 +591,9 @@ impl Table { log::trace!("Table #{} recovered", metadata.id); + // Extract optional prefix extractor name for compatibility checks before moving metadata. + let recovered_prefix_extractor_name = metadata.prefix_extractor_name.clone(); + Ok(Self(Arc::new(Inner { path: Arc::new(file_path), tree_id, @@ -527,6 +611,8 @@ impl Table { pinned_filter_block, + prefix_extractor_name: recovered_prefix_extractor_name, + is_deleted: AtomicBool::default(), checksum, @@ -564,6 +650,69 @@ impl Table { self.metadata.seqnos.1 } + /// Returns the minimum user key in this table's key range. 
+ #[must_use] + pub fn min_key(&self) -> &UserKey { + self.metadata.key_range.min() + } + + /// Determines if this table can be skipped for a given user range by consulting the prefix filter. + /// + /// Behavior: + /// - If both bounds share the same extracted prefix, consult once using a bound key. + /// - If no common prefix, but the start bound's first extracted prefix matches this table's + /// minimum key prefix, consult once using the start key. + /// - A definite negative (Ok(Some(false))) means the table can be skipped; otherwise do not skip. + /// - If the table's stored extractor is incompatible with the provided extractor, do not skip. + pub(crate) fn should_skip_range_by_prefix_filter>( + &self, + range: &R, + extractor: &dyn crate::prefix::PrefixExtractor, + ) -> bool { + if !self.prefix_filter_allowed(Some(extractor.name())) { + return false; + } + + use std::ops::Bound; + + let start_key = match range.start_bound() { + Bound::Included(k) | Bound::Excluded(k) => Some(k.as_ref()), + Bound::Unbounded => None, + }; + let end_key = match range.end_bound() { + Bound::Included(k) | Bound::Excluded(k) => Some(k.as_ref()), + Bound::Unbounded => None, + }; + + let start_pref = start_key.and_then(|k| extractor.extract(k).next()); + let end_pref = end_key.and_then(|k| extractor.extract(k).next()); + + if let (Some(sp), Some(ep)) = (start_pref, end_pref) { + if sp == ep { + if let Some(sk) = start_key { + if let Ok(Some(false)) = self.maybe_contains_prefix(sk, extractor) { + return true; + } + } + return false; + } + } + + if let Some(sp) = start_pref { + if let Some(mp) = extractor.extract(self.min_key().as_ref()).next() { + if sp == mp { + if let Some(sk) = start_key { + if let Ok(Some(false)) = self.maybe_contains_prefix(sk, extractor) { + return true; + } + } + } + } + } + + false + } + /// Returns the number of tombstone markers in the `Table`. #[must_use] #[doc(hidden)] diff --git a/src/table/multi_writer.rs b/src/table/multi_writer.rs index 341561bc..e02559e3 100644 --- a/src/table/multi_writer.rs +++ b/src/table/multi_writer.rs @@ -49,6 +49,9 @@ pub struct MultiWriter { /// Level the tables are written to initial_level: u8, + + /// Optional prefix extractor to register prefixes in Bloom filters. 
+ prefix_extractor: Option, } impl MultiWriter { @@ -93,6 +96,8 @@ impl MultiWriter { current_key: None, linked_blobs: HashMap::default(), + + prefix_extractor: None, }) } @@ -125,6 +130,16 @@ impl MultiWriter { self } + #[must_use] + pub fn use_prefix_extractor( + mut self, + extractor: Option, + ) -> Self { + self.prefix_extractor = extractor.clone(); + self.writer = self.writer.use_prefix_extractor(extractor); + self + } + #[must_use] pub fn use_data_block_restart_interval(mut self, interval: u8) -> Self { self.data_block_restart_interval = interval; @@ -194,6 +209,8 @@ impl MultiWriter { .use_bloom_policy(self.bloom_policy) .use_data_block_hash_ratio(self.data_block_hash_ratio); + new_writer = new_writer.use_prefix_extractor(self.prefix_extractor.clone()); + if self.use_partitioned_index { new_writer = new_writer.use_partitioned_index(); } diff --git a/src/table/writer/filter/full.rs b/src/table/writer/filter/full.rs index 196598a4..ee575db5 100644 --- a/src/table/writer/filter/full.rs +++ b/src/table/writer/filter/full.rs @@ -43,6 +43,11 @@ impl FilterWriter for FullFilterWriter { Ok(()) } + fn register_bytes(&mut self, bytes: &[u8]) -> crate::Result<()> { + self.bloom_hash_buffer.push(Builder::get_hash(bytes)); + Ok(()) + } + fn finish(self: Box, file_writer: &mut sfa::Writer) -> crate::Result<()> { if self.bloom_hash_buffer.is_empty() { log::trace!("Filter write has no buffered hashes - not building filter"); diff --git a/src/table/writer/filter/mod.rs b/src/table/writer/filter/mod.rs index dfcd7551..6155121a 100644 --- a/src/table/writer/filter/mod.rs +++ b/src/table/writer/filter/mod.rs @@ -16,6 +16,10 @@ pub trait FilterWriter { /// Registers a key in the block index. fn register_key(&mut self, key: &UserKey) -> crate::Result<()>; + /// Registers arbitrary bytes into the filter (used for prefix entries). + /// Implementations should hash the bytes identically to full keys. + fn register_bytes(&mut self, bytes: &[u8]) -> crate::Result<()>; + /// Writes the filter to a file. 
fn finish(self: Box, file_writer: &mut sfa::Writer) -> crate::Result<()>; diff --git a/src/table/writer/filter/partitioned.rs b/src/table/writer/filter/partitioned.rs index b6e6f4e3..742600cd 100644 --- a/src/table/writer/filter/partitioned.rs +++ b/src/table/writer/filter/partitioned.rs @@ -150,9 +150,10 @@ impl FilterWriter for PartitionedFilterWri fn register_key(&mut self, key: &UserKey) -> crate::Result<()> { self.bloom_hash_buffer.push(Builder::get_hash(key)); - self.approx_filter_size += - self.bloom_policy - .estimated_key_bits(self.bloom_hash_buffer.len()) as usize; + // Approximate filter size in BYTES = bits_per_key(n) * n / 8 + let n = self.bloom_hash_buffer.len(); + let bpk = self.bloom_policy.estimated_key_bits(n) as f64; + self.approx_filter_size = ((bpk * n as f64) / 8.0) as usize; self.last_key = Some(key.clone()); @@ -163,6 +164,24 @@ impl FilterWriter for PartitionedFilterWri Ok(()) } + fn register_bytes(&mut self, bytes: &[u8]) -> crate::Result<()> { + self.bloom_hash_buffer.push(Builder::get_hash(bytes)); + + // Approximate filter size in BYTES = bits_per_key(n) * n / 8 + let n = self.bloom_hash_buffer.len(); + let bpk = self.bloom_policy.estimated_key_bits(n) as f64; + self.approx_filter_size = ((bpk * n as f64) / 8.0) as usize; + + if self.approx_filter_size >= self.partition_size as usize { + // We register prefixes only after a key has been seen, so last_key should exist + if let Some(key) = self.last_key.clone() { + self.spill_filter_partition(&key)?; + } + } + + Ok(()) + } + fn finish(mut self: Box, file_writer: &mut sfa::Writer) -> crate::Result<()> { if !self.bloom_hash_buffer.is_empty() { let last_key = self.last_key.take().expect("last key should exist"); diff --git a/src/table/writer/mod.rs b/src/table/writer/mod.rs index fdb85eec..765e68d5 100644 --- a/src/table/writer/mod.rs +++ b/src/table/writer/mod.rs @@ -86,6 +86,10 @@ pub struct Writer { linked_blob_files: Vec, initial_level: u8, + + /// Optional prefix extractor used to register extracted prefixes into the Bloom filter. + /// When present, we will register extracted prefixes in addition to the full key. + prefix_extractor: Option, } impl Writer { @@ -132,6 +136,8 @@ impl Writer { previous_item: None, linked_blob_files: Vec::new(), + + prefix_extractor: None, }) } @@ -213,6 +219,16 @@ impl Writer { self } + /// Sets the prefix extractor to enable prefix-aware Bloom filter construction. + #[must_use] + pub fn use_prefix_extractor( + mut self, + extractor: Option, + ) -> Self { + self.prefix_extractor = extractor; + self + } + /// Writes an item. /// /// # Note @@ -253,7 +269,16 @@ impl Writer { // of the same key if self.bloom_policy.is_active() { - self.filter_writer.register_key(&user_key)?; + // With a configured prefix extractor, register extracted prefixes only. + // Point reads perform a prefix-aware pre-check and bypass the full-key Bloom filter. + if let Some(ref extractor) = self.prefix_extractor { + for prefix in extractor.extract(user_key.as_ref()) { + self.filter_writer.register_bytes(prefix)?; + } + } else { + // Without an extractor, fall back to classic full-key Bloom. 
+ self.filter_writer.register_key(&user_key)?; + } } } @@ -391,7 +416,7 @@ impl Writer { InternalValue::from_components(key, value, 0, crate::ValueType::Value) } - let meta_items = [ + let mut meta_items = vec![ meta("checksum_type", b"xxh3"), meta( "compression#data", @@ -422,46 +447,55 @@ impl Writer { meta("item_count", &(self.meta.item_count as u64).to_le_bytes()), meta( "key#max", - // NOTE: At the beginning we check that we have written at least 1 item, so last_key must exist #[expect(clippy::expect_used)] self.meta.last_key.as_ref().expect("should exist"), ), meta( "key#min", - // NOTE: At the beginning we check that we have written at least 1 item, so first_key must exist #[expect(clippy::expect_used)] self.meta.first_key.as_ref().expect("should exist"), ), meta("key_count", &(self.meta.key_count as u64).to_le_bytes()), - meta("prefix_truncation#data", &[1]), // NOTE: currently prefix truncation can not be disabled - meta("prefix_truncation#index", &[1]), // NOTE: currently prefix truncation can not be disabled - meta( - "restart_interval#data", - &self.data_block_restart_interval.to_le_bytes(), - ), - meta( - "restart_interval#index", - &self.index_block_restart_interval.to_le_bytes(), - ), - meta("seqno#max", &self.meta.highest_seqno.to_le_bytes()), - meta("seqno#min", &self.meta.lowest_seqno.to_le_bytes()), - meta("table_version", &[3u8]), - meta( - "tombstone_count", - &(self.meta.tombstone_count as u64).to_le_bytes(), - ), - meta("user_data_size", &self.meta.uncompressed_size.to_le_bytes()), - meta( - "weak_tombstone_count", - &(self.meta.weak_tombstone_count as u64).to_le_bytes(), - ), - meta( - "weak_tombstone_reclaimable", - &(self.meta.weak_tombstone_reclaimable_count as u64).to_le_bytes(), - ), + meta("prefix_truncation#data", &[1]), + meta("prefix_truncation#index", &[1]), ]; + // Persist the extractor name so recovery can compare it to the current extractor. + // If names differ, disable prefix-based pruning for this table to avoid false negatives. + // Place between prefix_truncation and restart_interval for stable ordering. 
+ if let Some(ref extractor) = self.prefix_extractor { + meta_items.push(meta("prefix_extractor", extractor.name().as_bytes())); + } + meta_items.push(meta( + "restart_interval#data", + &self.data_block_restart_interval.to_le_bytes(), + )); + meta_items.push(meta( + "restart_interval#index", + &self.index_block_restart_interval.to_le_bytes(), + )); + meta_items.push(meta("seqno#max", &self.meta.highest_seqno.to_le_bytes())); + meta_items.push(meta("seqno#min", &self.meta.lowest_seqno.to_le_bytes())); + meta_items.push(meta("table_version", &[3u8])); + meta_items.push(meta( + "tombstone_count", + &(self.meta.tombstone_count as u64).to_le_bytes(), + )); + meta_items.push(meta( + "user_data_size", + &self.meta.uncompressed_size.to_le_bytes(), + )); + meta_items.push(meta( + "weak_tombstone_count", + &(self.meta.weak_tombstone_count as u64).to_le_bytes(), + )); + meta_items.push(meta( + "weak_tombstone_reclaimable", + &(self.meta.weak_tombstone_reclaimable_count as u64).to_le_bytes(), + )); + + // Ensure deterministic ordering for metadata entries without cloning keys + meta_items.sort_by(|a, b| a.key.cmp(&b.key)); - // NOTE: Just to make sure the items are definitely sorted #[cfg(debug_assertions)] { let is_sorted = meta_items.iter().is_sorted_by_key(|kv| &kv.key); @@ -470,7 +504,7 @@ impl Writer { self.block_buffer.clear(); - // TODO: no binary index + // meta_items have been sorted above for deterministic encoding order DataBlock::encode_into(&mut self.block_buffer, &meta_items, 1, 0.0)?; Block::write_into( diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 39f2eaf3..de2c8f84 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -88,6 +88,10 @@ impl<'a> Ingestion<'a> { .get(INITIAL_CANONICAL_LEVEL), ); + // Propagate the configured prefix extractor so writers can register extracted + // prefixes and persist the extractor name in table metadata. + writer = writer.use_prefix_extractor(tree.config.prefix_extractor.clone()); + if index_partitioning { writer = writer.use_partitioned_index(); } diff --git a/src/tree/mod.rs b/src/tree/mod.rs index ad1a4d3a..c0a9184c 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -349,7 +349,11 @@ impl AbstractTree for Tree { Bloom(policy) => policy, None => BloomConstructionPolicy::BitsPerKey(0.0), } - }); + }) + // Ensure tables built during flush carry the configured extractor. + // This lets writers register prefixes and persist the extractor name in metadata + // for compatibility checks at read time. + .use_prefix_extractor(self.config.prefix_extractor.clone()); if index_partitioning { table_writer = table_writer.use_partitioned_index(); @@ -732,6 +736,48 @@ impl Tree { None } + /// Centralized point-read from a single table with prefix-aware pre-checks and + /// compatibility gating. Returns Ok(None) if the prefix filter definitively excludes + /// the key or if the table lookup returns no match. + fn point_read_from_table( + &self, + table: &Table, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + ) -> crate::Result> { + // Determine compatibility of table's stored extractor with current config + let current = self.config.prefix_extractor.as_ref().map(|e| e.name()); + let allow_filter = table.prefix_filter_allowed(current); + + // If prefix filtering is allowed and an extractor is configured, consult the + // prefix-aware filter first and skip on a definite negative. + if allow_filter { + if let Some(ex) = self.config.prefix_extractor.as_ref() { + match table.maybe_contains_prefix(key, ex.as_ref())? 
{ + Some(false) => return Ok(None), + _ => { /* proceed */ } + } + } + } + + let item = if allow_filter { + if self.config.prefix_extractor.is_some() { + // Compatible extractor configured: we've consulted the prefix filter; + // bypass full-key Bloom. + table.get_without_filter(key, seqno)? + } else { + // No extractor configured: rely on full-key Bloom as usual. + table.get(key, seqno, key_hash)? + } + } else { + // Incompatible extractor or mismatch: never trust the filter. + table.get_without_filter(key, seqno)? + }; + + Ok(item) + } + fn get_internal_entry_from_tables( &self, version: &Version, @@ -747,7 +793,9 @@ impl Tree { // NOTE: Based on benchmarking, binary search is only worth it with ~4 tables if run.len() >= 4 { if let Some(table) = run.get_for_key(key) { - if let Some(item) = table.get(key, seqno, key_hash)? { + if let Some(item) = + self.point_read_from_table(table, key, seqno, key_hash)? + { return Ok(ignore_tombstone_value(item)); } } @@ -758,7 +806,9 @@ impl Tree { continue; } - if let Some(item) = table.get(key, seqno, key_hash)? { + if let Some(item) = + self.point_read_from_table(table, key, seqno, key_hash)? + { return Ok(ignore_tombstone_value(item)); } } @@ -822,7 +872,13 @@ impl Tree { let version = self.get_version_for_snapshot(seqno); - let iter_state = { IterState { version, ephemeral } }; + let iter_state = { + IterState { + version, + ephemeral, + prefix_extractor: self.config.prefix_extractor.clone(), + } + }; TreeIter::create_range(iter_state, bounds, seqno) } @@ -991,32 +1047,16 @@ impl Tree { let recovery = recover(tree_path)?; - let table_map = { - let mut result: crate::HashMap = - crate::HashMap::default(); - - for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() { - for run in table_ids { - for &(table_id, checksum) in run { - #[expect( - clippy::expect_used, - reason = "there are always less than 256 levels" - )] - result.insert( - table_id, - ( - level_idx - .try_into() - .expect("there are less than 256 levels"), - checksum, - ), - ); - } + // Map TableId -> (level_idx, checksum) + let mut table_map: std::collections::HashMap = + std::collections::HashMap::new(); + for (level_idx, level) in recovery.table_ids.iter().enumerate() { + for run in level { + for (id, checksum) in run { + table_map.insert(*id, (level_idx, *checksum)); } } - - result - }; + } let cnt = table_map.len(); diff --git a/tests/prefix_filter.rs b/tests/prefix_filter.rs new file mode 100644 index 00000000..8fd00227 --- /dev/null +++ b/tests/prefix_filter.rs @@ -0,0 +1,2594 @@ +use lsm_tree::config::{BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry}; +use lsm_tree::Guard; +use lsm_tree::SequenceNumberCounter; +use lsm_tree::{ + prefix::{FixedLengthExtractor, FixedPrefixExtractor, FullKeyExtractor, PrefixExtractor}, + AbstractTree, Config, +}; +use std::sync::Arc; + +// Helper function to generate test keys with prefixes +fn generate_test_key(prefix: &str, suffix: &str) -> Vec { + format!("{}{}", prefix, suffix).into_bytes() +} + +#[test] +fn test_prefix_filter_range_start_only_prefix_no_upfront_prune() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 5; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + for i in 0..50u32 { + let key = format!("zebra_{:04}", i); + tree.insert(key.as_bytes(), b"v", 0); + } + tree.flush_active_memtable(0)?; + + for i in 0..50u32 { + let key = format!("zulu_{:04}", i); + 
tree.insert(key.as_bytes(), b"v", 0); + } + tree.flush_active_memtable(0)?; + + let count = tree + .range::<&[u8], std::ops::RangeFrom<&[u8]>>((&b"user1_0000"[..]).., u64::MAX, None) + .count(); + assert_eq!(count, 100); + + Ok(()) +} + +#[test] +fn test_prefix_filter_with_fixed_prefix() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 8; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert keys with common prefixes + let prefix1 = "prefix01"; + let prefix2 = "prefix02"; + + for i in 0..100 { + let key1 = generate_test_key(prefix1, &format!("_{:04}", i)); + let key2 = generate_test_key(prefix2, &format!("_{:04}", i)); + + tree.insert(key1, b"value1", 0); + tree.insert(key2, b"value2", 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Test that keys with matching prefixes are found + for i in 0..100 { + let key1 = generate_test_key(prefix1, &format!("_{:04}", i)); + let key2 = generate_test_key(prefix2, &format!("_{:04}", i)); + + assert!(tree.contains_key(&key1, u64::MAX)?); + assert!(tree.contains_key(&key2, u64::MAX)?); + } + + // Test that keys with non-matching prefixes work correctly + let non_existent_key = generate_test_key("prefix99", "_0000"); + assert!(!tree.contains_key(&non_existent_key, u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // We should have at least 201 filter queries (200 existing keys + 1 non-existent) + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_with_fixed_length() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let required_len = 10; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedLengthExtractor::new(required_len))) + .open()?; + + // Insert keys with exactly the required length prefix + for i in 0..50 { + let key = format!("exactlen{:02}_suffix_{}", i, i); + tree.insert(key.as_bytes(), b"value", 0); + } + + // Insert keys that are too short (out of domain) + for i in 0..20 { + let short_key = format!("key{}", i); + tree.insert(short_key.as_bytes(), b"short_value", 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Verify keys with matching length are found + for i in 0..50 { + let key = format!("exactlen{:02}_suffix_{}", i, i); + assert!(tree.contains_key(key.as_bytes(), u64::MAX)?); + } + + // Verify short keys are also found (they're stored but not in filter) + for i in 0..20 { + let short_key = format!("key{}", i); + assert!(tree.contains_key(short_key.as_bytes(), u64::MAX)?); + } + + // Verify non-existent prefix is quickly rejected + // Use a key that matches the required length to ensure it's in-domain + let range = tree.range("nonexist00".."nonexist99", u64::MAX, None); + assert_eq!(range.count(), 0); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + // Should have filter queries for all lookups + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_full_key() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + // Using FullKeyExtractor (default behavior) + let 
tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FullKeyExtractor)) + .open()?; + + // Insert various keys + let keys = vec![ + b"apple".to_vec(), + b"banana".to_vec(), + b"cherry".to_vec(), + b"date".to_vec(), + b"elderberry".to_vec(), + ]; + + for key in &keys { + tree.insert(key.clone(), b"value", 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // All keys should be found + for key in &keys { + assert!(tree.contains_key(key, u64::MAX)?); + } + + // Non-existent key test + assert!(!tree.contains_key(b"fig", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // Should have filter queries for in-domain keys + assert!( + final_queries > initial_queries, + "filter queries should increase for in-domain keys" + ); + } + assert!(!tree.contains_key(b"kiwi", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // Should have queries for all lookups (5 existing + 2 non-existent) + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_range_queries() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 5; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert keys with common prefixes + let prefixes = vec!["user_", "post_", "comm_"]; + + for prefix in &prefixes { + for i in 0..20 { + let key = format!("{}{:04}", prefix, i); + tree.insert(key.as_bytes(), format!("value_{}", i).as_bytes(), 0); + } + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Test prefix iteration + for prefix in &prefixes { + let start_key = prefix.to_string(); + let end_key = format!("{}~", prefix); // '~' is after all digits and letters + + let count = tree + .range(start_key.as_bytes()..end_key.as_bytes(), u64::MAX, None) + .count(); + assert_eq!(count, 20); + } + + // Test non-existent prefix range + let count = tree + .range(&b"none_"[..]..&b"none~"[..], u64::MAX, None) + .count(); + assert_eq!(count, 0); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + assert!( + final_queries > initial_queries, + "filter queries should have increased" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_after_compaction() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 6; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert first batch of keys + for i in 0..50 { + let key = format!("batch1_{:04}", i); + tree.insert(key.as_bytes(), b"value1", 0); + } + + tree.flush_active_memtable(0)?; + + // Insert second batch with overlapping keys + for i in 25..75 { + let key = format!("batch1_{:04}", i); + tree.insert(key.as_bytes(), b"value2", 0); + } + + tree.flush_active_memtable(0)?; + + // Force compaction + use lsm_tree::compaction::Leveled; + tree.compact(Arc::new(Leveled::default()), 0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + #[cfg(feature = "metrics")] + let initial_hits = tree.metrics().io_skipped_by_filter(); + + // All keys should still be found after 
compaction
+    for i in 0..75 {
+        let key = format!("batch1_{:04}", i);
+        assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // Should have filter queries for post-compaction lookups
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased after compaction"
+        );
+
+        // All keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_with_deletions() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let prefix_len = 7;
+
+    let tree = Config::new(&folder, SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len)))
+        .open()?;
+
+    // Insert keys
+    for i in 0..100 {
+        let key = format!("deltest_{:04}", i);
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    // Delete some keys
+    for i in (0..100).step_by(2) {
+        let key = format!("deltest_{:04}", i);
+        tree.remove(key.as_bytes(), 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    // Verify deletions
+    for i in 0..100 {
+        let key = format!("deltest_{:04}", i);
+        if i % 2 == 0 {
+            assert!(!tree.contains_key(key.as_bytes(), u64::MAX)?);
+        } else {
+            assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+        }
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // Should have filter queries for all lookups after deletions
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for deletion checks"
+        );
+
+        // Deleted keys still pass filter (tombstones), so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase (deleted keys still in filter)"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_edge_cases() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Test with prefix length of 1
+    let tree = Config::new(
+        folder.path().join("test1"),
+        SequenceNumberCounter::default(),
+    )
+    .prefix_extractor(Arc::new(FixedPrefixExtractor::new(1)))
+    .open()?;
+
+    tree.insert(b"a", b"value", 0);
+    tree.insert(b"b", b"value", 0);
+    tree.insert(b"ab", b"value", 0);
+    tree.insert(b"ba", b"value", 0);
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    assert!(tree.contains_key(b"a", u64::MAX)?);
+    assert!(tree.contains_key(b"b", u64::MAX)?);
+    assert!(tree.contains_key(b"ab", u64::MAX)?);
+    assert!(tree.contains_key(b"ba", u64::MAX)?);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+
+        // Should have queries for the point lookups above
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for point lookups"
+        );
+    }
+
+    // Test with keys shorter than the prefix length
+    let tree2 = Config::new(
+        folder.path().join("test2"),
+        SequenceNumberCounter::default(),
+    )
+    .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5)))
+    .open()?;
+
+    tree2.insert(b"test", b"short_key", 0);
+    tree2.insert(b"longer_key", b"long_key", 0);
+
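+    // NOTE: "test" is only 4 bytes, shorter than the 5-byte prefix, so it is
+    // presumably out of the extractor's domain (or falls back to the full key);
+    // either way, lookups for such keys cannot be rejected by the prefix filter,
+    // which is what the contains_key checks below rely on.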
tree2.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries2 = tree2.metrics().filter_queries(); + + assert!(tree2.contains_key(b"test", u64::MAX)?); + assert!(tree2.contains_key(b"longer_key", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries2 = tree2.metrics().filter_queries(); + assert!( + final_queries2 > initial_queries2, + "filter queries should have increased for short/long key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_large_dataset() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 12; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert a large number of keys with various prefixes + let prefixes = vec![ + "transaction_", + "userprofile_", + "sessiondata_", + "logentryval_", + ]; + + for prefix in &prefixes { + for i in 0..1000 { + let key = format!("{}{:08}", prefix, i); + let value = format!("data_{}", i); + tree.insert(key.as_bytes(), value.as_bytes(), 0); + + // Flush periodically to create multiple segments + if i % 250 == 249 { + tree.flush_active_memtable(0)?; + } + } + } + + // Final flush + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Verify all keys are found + for prefix in &prefixes { + for i in 0..1000 { + let key = format!("{}{:08}", prefix, i); + assert!( + tree.contains_key(key.as_bytes(), u64::MAX)?, + "Key {} not found", + key + ); + } + } + + // Test non-existent keys with matching prefixes + for prefix in &prefixes { + let non_existent_key = format!("{}{:08}", prefix, 9999); + assert!(!tree.contains_key(non_existent_key.as_bytes(), u64::MAX)?); + } + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // With multiple segments, we should have many filter queries + assert!( + final_queries > initial_queries, + "filter queries should have increased for large dataset" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_recovery() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 10; + + // Create and populate tree + { + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + for i in 0..100 { + let key = format!("persistent_{:04}", i); + tree.insert(key.as_bytes(), b"value", 0); + } + + tree.flush_active_memtable(0)?; + } + + // Reopen tree and verify filter still works + { + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + for i in 0..100 { + let key = format!("persistent_{:04}", i); + assert!(tree.contains_key(key.as_bytes(), u64::MAX)?); + } + + // Non-existent keys should still be filtered + let non_existent = b"persistent_9999"; + assert!(!tree.contains_key(non_existent, u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // After recovery, filters should still be working + assert!( + final_queries > initial_queries, + "filter queries should work after recovery" + ); + } + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_concurrent_access() -> lsm_tree::Result<()> { + use std::thread; + + let folder = tempfile::tempdir()?; + let prefix_len = 8; + + let 
tree = Arc::new( + Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?, + ); + + // Spawn multiple threads to insert data + let handles: Vec<_> = (0..4) + .map(|thread_id| { + let tree = Arc::clone(&tree); + thread::spawn(move || { + for i in 0..250 { + let key = format!("thread{:02}_{:04}", thread_id, i); + tree.insert(key.as_bytes(), b"value", 0); + } + }) + }) + .collect(); + + // Wait for all threads to complete + for handle in handles { + handle.join().unwrap(); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + #[cfg(feature = "metrics")] + let initial_hits = tree.metrics().io_skipped_by_filter(); + + // Verify all keys from all threads + for thread_id in 0..4 { + for i in 0..250 { + let key = format!("thread{:02}_{:04}", thread_id, i); + assert!(tree.contains_key(key.as_bytes(), u64::MAX)?); + } + } + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + let final_hits = tree.metrics().io_skipped_by_filter(); + + // Should have filter queries for all concurrent lookups + assert!( + final_queries > initial_queries, + "filter queries should have increased for concurrent access" + ); + + // All keys exist, so hits should not increase + assert_eq!( + final_hits, initial_hits, + "filter hits should not increase for existing keys" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_sequence_consistency() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 9; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert initial data with sequence number 0-49 + for i in 0..50 { + let key = format!("seqtest1_{:04}", i); + tree.insert(key.as_bytes(), b"v1", i as u64); + } + + tree.flush_active_memtable(0)?; + + // Insert more data with sequence numbers 50-99 + for i in 50..100 { + let key = format!("seqtest1_{:04}", i); + tree.insert(key.as_bytes(), b"v2", i as u64); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Verify that at sequence number 50, only the first 50 keys are visible + // (keys inserted at seqno 0-49 are visible at seqno >= their insert seqno) + for i in 0..50 { + let key = format!("seqtest1_{:04}", i); + assert!(tree.contains_key(key.as_bytes(), 50)?); + } + + for i in 50..100 { + let key = format!("seqtest1_{:04}", i); + assert!(!tree.contains_key(key.as_bytes(), 50)?); + } + + // Verify tree sees all data at max sequence number + for i in 0..100 { + let key = format!("seqtest1_{:04}", i); + assert!(tree.contains_key(key.as_bytes(), u64::MAX)?); + } + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // filter should be used for all lookups + assert!( + final_queries > initial_queries, + "filter queries should have increased for sequence consistency checks" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_seek_optimization() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let prefix_len = 8; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(prefix_len))) + .open()?; + + // Insert keys with specific prefixes + for i in 0..100 { + let key = format!("prefix_a_{:04}", i); + tree.insert(key.as_bytes(), b"value_a", 0); + } 
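+    // NOTE: with the 8-byte extractor these two batches register the distinct
+    // prefixes "prefix_a" and "prefix_b"; that is what lets the "prefix_c" range
+    // below be rejected by the filter without touching the table.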
+
+    for i in 0..100 {
+        let key = format!("prefix_b_{:04}", i);
+        tree.insert(key.as_bytes(), b"value_b", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // Seek with existing prefix should find keys
+    let range_a = tree.range("prefix_a_0000".."prefix_a_9999", u64::MAX, None);
+    assert_eq!(range_a.count(), 100);
+
+    // Seek with non-existent prefix should return empty (optimized via filter)
+    let range_c = tree.range("prefix_c_0000".."prefix_c_9999", u64::MAX, None);
+    assert_eq!(range_c.count(), 0);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+
+        // Range queries should trigger filter checks
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should have increased for range operations"
+        );
+    }
+
+    // Verify partial prefix matches work
+    let range_partial = tree.range("prefix_a_0050".."prefix_a_0060", u64::MAX, None);
+    assert_eq!(range_partial.count(), 10);
+
+    Ok(())
+}
+
+#[test]
+fn test_no_prefix_extractor() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Create tree without prefix extractor (default behavior)
+    let tree = Config::new(&folder, SequenceNumberCounter::default()).open()?;
+
+    // Insert various keys
+    for i in 0..100 {
+        let key = format!("noprefix_{:04}", i);
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    // All keys should be found (full key matching)
+    for i in 0..100 {
+        let key = format!("noprefix_{:04}", i);
+        assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // Should still have filter queries even without prefix extractor (uses full key)
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should work without prefix extractor"
+        );
+
+        // All keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+// Custom segmented prefix extractor for account_id#user_id pattern
+struct SegmentedPrefixExtractor {
+    delimiter: u8,
+}
+
+impl SegmentedPrefixExtractor {
+    fn new(delimiter: u8) -> Self {
+        Self { delimiter }
+    }
+}
+
+impl PrefixExtractor for SegmentedPrefixExtractor {
+    fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+        let mut prefixes = Vec::new();
+
+        // Find the first delimiter position
+        if let Some(first_delim_pos) = key.iter().position(|&b| b == self.delimiter) {
+            // Add the prefix up to the first delimiter (account_id)
+            prefixes.push(&key[..first_delim_pos]);
+
+            // Find the second delimiter position
+            if let Some(second_delim_pos) = key[first_delim_pos + 1..]
+ .iter() + .position(|&b| b == self.delimiter) + { + // Add the prefix up to the second delimiter (account_id#user_id) + let full_prefix_end = first_delim_pos + 1 + second_delim_pos; + prefixes.push(&key[..full_prefix_end]); + } else { + // If no second delimiter, use the entire key as prefix + prefixes.push(key); + } + } else { + // No delimiter found, use the entire key + prefixes.push(key); + } + + Box::new(prefixes.into_iter()) + } + + fn name(&self) -> &str { + "SegmentedPrefixExtractor" + } +} + +#[test] +fn test_prefix_filter_segmented_extractor() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let delimiter = b'#'; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(SegmentedPrefixExtractor::new(delimiter))) + .open()?; + + // Insert keys with account_id#user_id#data pattern + let account1 = "acc001"; + let account2 = "acc002"; + + // Insert users for account1 + for user_id in 1..=5 { + for data_id in 1..=10 { + let key = format!("{}#user{:03}#data{:04}", account1, user_id, data_id); + let value = format!("value_{}_{}", user_id, data_id); + tree.insert(key.as_bytes(), value.as_bytes(), 0); + } + } + + // Insert users for account2 + for user_id in 1..=3 { + for data_id in 1..=10 { + let key = format!("{}#user{:03}#data{:04}", account2, user_id, data_id); + let value = format!("value_{}_{}", user_id, data_id); + tree.insert(key.as_bytes(), value.as_bytes(), 0); + } + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Test 1: Query for specific user within account1 + let user_key = format!("{}#user002#data0005", account1); + assert!(tree.contains_key(user_key.as_bytes(), u64::MAX)?); + + // Test 2: Query for all data of a specific user (prefix range query) + let user_prefix_start = format!("{}#user002#", account1); + let user_prefix_end = format!("{}#user002~", account1); // ~ is after # + let user_range = tree.range( + user_prefix_start.as_bytes()..user_prefix_end.as_bytes(), + u64::MAX, + None, + ); + assert_eq!(user_range.count(), 10); // Should find 10 data items for this user + + // Test 3: Query for all users in account1 (account-level prefix) + let account_prefix_start = format!("{}#", account1); + let account_prefix_end = format!("{}~", account1); // ~ is after # + let account_range = tree.range( + account_prefix_start.as_bytes()..account_prefix_end.as_bytes(), + u64::MAX, + None, + ); + assert_eq!(account_range.count(), 50); // 5 users * 10 data items + + // Test 4: Query for non-existent account + let non_existent_start = "acc999#"; + let non_existent_end = "acc999~"; + let non_existent_range = tree.range( + non_existent_start.as_bytes()..non_existent_end.as_bytes(), + u64::MAX, + None, + ); + assert_eq!(non_existent_range.count(), 0); + + // Test 5: Query for non-existent user in existing account + let non_user_key = format!("{}#user999#data0001", account1); + assert!(!tree.contains_key(non_user_key.as_bytes(), u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + // Should have filter queries for all lookups + assert!( + final_queries > initial_queries, + "filter queries should have increased for segmented lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_single_byte_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + 
.prefix_extractor(Arc::new(FixedPrefixExtractor::new(2))) + .open()?; + + // Insert single-byte keys + for i in 0u8..10 { + tree.insert(&[i], format!("value_{}", i).as_bytes(), 0); + } + + // Insert two-byte keys + for i in 0u8..10 { + tree.insert(&[i, i], format!("value_{}{}", i, i).as_bytes(), 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // All keys should be found + for i in 0u8..10 { + assert!(tree.contains_key(&[i], u64::MAX)?); + assert!(tree.contains_key(&[i, i], u64::MAX)?); + } + + // Non-existent single-byte key + assert!(!tree.contains_key(&[255], u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // Should have queries for all lookups + assert!( + final_queries > initial_queries, + "filter queries should increase for single/two-byte key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_null_bytes() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3))) + .open()?; + + // Insert keys with null bytes + tree.insert(b"\0\0\0data", b"null_prefix", 0); + tree.insert(b"pre\0fix", b"null_middle", 0); + tree.insert(b"suffix\0", b"null_end", 0); + tree.insert(b"\0", b"single_null", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // All keys should be found + assert!(tree.contains_key(b"\0\0\0data", u64::MAX)?); + assert!(tree.contains_key(b"pre\0fix", u64::MAX)?); + assert!(tree.contains_key(b"suffix\0", u64::MAX)?); + assert!(tree.contains_key(b"\0", u64::MAX)?); + + // Non-existent keys with null bytes + assert!(!tree.contains_key(b"\0\0\0missing", u64::MAX)?); + assert!(!tree.contains_key(b"pre\0missing", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for null byte key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_non_ascii() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(6))) + .open()?; + + // Insert keys with UTF-8 characters + tree.insert("prefix_测试_data".as_bytes(), b"chinese", 0); + tree.insert("prefix_тест_data".as_bytes(), b"cyrillic", 0); + tree.insert("prefix_🦀_data".as_bytes(), b"emoji", 0); + tree.insert("prefix_café".as_bytes(), b"accented", 0); + + // Insert binary keys (non-UTF8) + tree.insert(&[0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA], b"binary", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // All keys should be found + assert!(tree.contains_key("prefix_测试_data".as_bytes(), u64::MAX)?); + assert!(tree.contains_key("prefix_тест_data".as_bytes(), u64::MAX)?); + assert!(tree.contains_key("prefix_🦀_data".as_bytes(), u64::MAX)?); + assert!(tree.contains_key("prefix_café".as_bytes(), u64::MAX)?); + assert!(tree.contains_key(&[0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA], u64::MAX)?); + + // Non-existent keys + assert!(!tree.contains_key("prefix_missing".as_bytes(), u64::MAX)?); + assert!(!tree.contains_key(&[0xFF, 0xFE, 0xFD, 0x00, 0x00, 0x00], u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = 
tree.metrics().filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for non-ASCII key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_keys_as_prefixes() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4))) + .open()?; + + // Insert keys where some are prefixes of others + tree.insert(b"a", b"value1", 0); + tree.insert(b"ab", b"value2", 0); + tree.insert(b"abc", b"value3", 0); + tree.insert(b"abcd", b"value4", 0); + tree.insert(b"abcde", b"value5", 0); + tree.insert(b"abcdef", b"value6", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // All keys should be found regardless of prefix relationships + assert!(tree.contains_key(b"a", u64::MAX)?); + assert!(tree.contains_key(b"ab", u64::MAX)?); + assert!(tree.contains_key(b"abc", u64::MAX)?); + assert!(tree.contains_key(b"abcd", u64::MAX)?); + assert!(tree.contains_key(b"abcde", u64::MAX)?); + assert!(tree.contains_key(b"abcdef", u64::MAX)?); + + // Non-existent keys with same prefix + assert!(!tree.contains_key(b"abcdx", u64::MAX)?); + assert!(!tree.contains_key(b"abx", u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for prefix-related key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_very_long_keys() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(10))) + .open()?; + + // Create very long keys + let long_key1 = vec![b'a'; 10000]; + let long_key2 = vec![b'b'; 10000]; + let mut long_key3 = vec![b'c'; 5000]; + long_key3.extend(vec![b'd'; 5000]); + + tree.insert(&long_key1, b"long1", 0); + tree.insert(&long_key2, b"long2", 0); + tree.insert(&long_key3, b"long3", 0); + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // All long keys should be found + assert!(tree.contains_key(&long_key1, u64::MAX)?); + assert!(tree.contains_key(&long_key2, u64::MAX)?); + assert!(tree.contains_key(&long_key3, u64::MAX)?); + + // Non-existent long key + let non_existent = vec![b'x'; 10000]; + assert!(!tree.contains_key(&non_existent, u64::MAX)?); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for very long key lookups" + ); + } + + Ok(()) +} + +#[test] +fn test_prefix_filter_all_same_byte() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5))) + .open()?; + + // Insert keys that are all the same byte + for len in 1..=10 { + let key = vec![b'x'; len]; + tree.insert(&key, format!("value_{}", len).as_bytes(), 0); + } + + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // All keys should be found + for len in 1..=10 { + let key = vec![b'x'; len]; + assert!(tree.contains_key(&key, u64::MAX)?); + } + + // Non-existent key with same pattern + assert!(!tree.contains_key(vec![b'x'; 15], 
u64::MAX)?);
+    assert!(!tree.contains_key(vec![b'y'; 5], u64::MAX)?);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for same-byte key lookups"
+        );
+    }
+
+    Ok(())
+}
+
+// Custom extractor that returns many prefixes for stress testing
+struct ManyPrefixExtractor;
+
+impl PrefixExtractor for ManyPrefixExtractor {
+    fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+        let mut prefixes = Vec::new();
+
+        // Generate all possible prefixes (up to 20 or key length)
+        for i in 1..=key.len().min(20) {
+            prefixes.push(&key[0..i]);
+        }
+
+        // Also add the full key
+        if !prefixes.is_empty() {
+            prefixes.push(key);
+        }
+
+        Box::new(prefixes.into_iter())
+    }
+
+    fn name(&self) -> &str {
+        "ManyPrefixExtractor"
+    }
+}
+
+#[test]
+fn test_prefix_filter_many_prefixes() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(&folder, SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(ManyPrefixExtractor))
+        .open()?;
+
+    // Insert keys that will generate many prefixes
+    tree.insert(b"this_is_a_very_long_key_for_testing", b"value1", 0);
+    tree.insert(b"another_long_key_with_many_prefixes", b"value2", 0);
+    tree.insert(b"short", b"value3", 0);
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // All keys should be found
+    assert!(tree.contains_key(b"this_is_a_very_long_key_for_testing", u64::MAX)?);
+    assert!(tree.contains_key(b"another_long_key_with_many_prefixes", u64::MAX)?);
+    assert!(tree.contains_key(b"short", u64::MAX)?);
+
+    // Test non-existent key
+    assert!(!tree.contains_key(b"non_existent_key_with_many_prefixes", u64::MAX)?);
+
+    // Range queries should work with many prefixes
+    let range = tree.range(b"this".as_ref().., u64::MAX, None);
+    assert!(range.count() > 0);
+
+    let range = tree.range(b"anot".as_ref().., u64::MAX, None);
+    assert!(range.count() > 0);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for many-prefix extractor"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_disabled() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Create tree with filter disabled
+    let tree = Config::new(&folder, SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5)))
+        .filter_policy(FilterPolicy::all(FilterPolicyEntry::None)) // Disable filter
+        .open()?;
+
+    // Insert some keys
+    for i in 0..100 {
+        let key = format!("disabled_{:04}", i);
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // Keys should still be found (via actual disk lookups)
+    for i in 0..100 {
+        let key = format!("disabled_{:04}", i);
+        assert!(tree.contains_key(key.as_bytes(), u64::MAX)?);
+    }
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        // Should have no filter queries when disabled
+        assert_eq!(
+            final_queries, initial_queries,
+            "No filter queries when disabled"
+        );
+    }
+
+    Ok(())
+}
+
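For context on the false-positive bound asserted in the next test: under the standard Bloom-filter approximation with an optimal hash count (k ≈ b·ln 2 for b bits per key), the expected false-positive rate is roughly 0.6185^b. A quick self-contained check, assuming this filter matches the standard model:

```rust
// Standard Bloom approximation: fp ≈ 0.6185^b for b bits per key, optimal k.
fn expected_fp_rate(bits_per_key: f64) -> f64 {
    0.6185_f64.powf(bits_per_key)
}

fn main() {
    // ~6.7e-5 at 20 bits/key: orders of magnitude below the 1% threshold
    // the test below asserts.
    assert!(expected_fp_rate(20.0) < 0.01);
    println!("{:e}", expected_fp_rate(20.0));
}
```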
+#[test]
+fn test_prefix_filter_false_positive_rate() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use higher bits per key for lower false positive rate
+    let tree = Config::new(&folder, SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(8)))
+        .filter_policy(FilterPolicy::all(FilterPolicyEntry::Bloom(
+            BloomConstructionPolicy::BitsPerKey(20.0),
+        ))) // Higher bits for lower FP rate
+        .open()?;
+
+    // Insert a specific set of keys
+    for i in 0..1000 {
+        let key = format!("fptest_{:06}", i * 2); // Even numbers only
+        tree.insert(key.as_bytes(), b"value", 0);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    let mut false_positives = 0;
+    let total_checks = 1000;
+
+    // Check for non-existent keys (odd numbers)
+    for i in 0..total_checks {
+        let key = format!("fptest_{:06}", i * 2 + 1);
+        if tree.contains_key(key.as_bytes(), u64::MAX)? {
+            false_positives += 1;
+        }
+    }
+
+    // With 20 bits per key, false positive rate should be very low
+    let fp_rate = false_positives as f64 / total_checks as f64;
+    assert!(
+        fp_rate < 0.01,
+        "False positive rate {} should be less than 1% with 20 bits per key",
+        fp_rate
+    );
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // Should have queries for all lookups
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for false-positive rate test"
+        );
+
+        // Every probe key shares its 8-byte prefix ("fptest_0") with existing
+        // keys, so the filter should rarely reject anything here; skipped-IO
+        // should stay near zero, with slack for the measured false positives.
+        assert!(
+            final_hits <= initial_hits + (false_positives as usize) + 10,
+            "filter hits should stay within the false-positive allowance"
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_prefix_filter_mixed_domain_keys() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(&folder, SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedLengthExtractor::new(8)))
+        .open()?;
+
+    // Mix of in-domain and out-of-domain keys
+    tree.insert(b"12345678_data", b"in_domain", 0); // In domain
+    tree.insert(b"short", b"out_of_domain", 0); // Out of domain
+    tree.insert(b"12345678", b"exact_length", 0); // Exact length
+    tree.insert(b"1234567", b"too_short", 0); // Out of domain
+    tree.insert(b"123456789", b"longer", 0); // In domain
+
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // All keys should be found
+    assert!(tree.contains_key(b"12345678_data", u64::MAX)?);
+    assert!(tree.contains_key(b"short", u64::MAX)?);
+    assert!(tree.contains_key(b"12345678", u64::MAX)?);
+    assert!(tree.contains_key(b"1234567", u64::MAX)?);
+    assert!(tree.contains_key(b"123456789", u64::MAX)?);
+
+    // Non-existent keys with different domain status
+    assert!(!tree.contains_key(b"12345678_missing", u64::MAX)?); // Would be in domain
+    assert!(!tree.contains_key(b"tiny", u64::MAX)?); // Would be out of domain
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for mixed domain key lookups"
+        );
+    }
+
+    Ok(())
+}
+
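The in-domain/out-of-domain split exercised above can be stated in a couple of lines. A minimal sketch, assuming `FixedLengthExtractor(8)` emits a prefix only for keys of at least 8 bytes (`extract_fixed_len` is a hypothetical stand-in, not the crate's API):

```rust
// Hypothetical stand-in for FixedLengthExtractor's domain rule.
fn extract_fixed_len(key: &[u8], len: usize) -> Option<&[u8]> {
    // Out-of-domain keys are stored but not added to the prefix filter,
    // so lookups for them can never be rejected by it.
    (key.len() >= len).then(|| &key[..len])
}

fn main() {
    assert_eq!(extract_fixed_len(b"12345678_data", 8), Some(&b"12345678"[..]));
    assert_eq!(extract_fixed_len(b"1234567", 8), None); // too short: out of domain
}
```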
+/// Test that range queries don't incorrectly skip segments when the start bound
+/// doesn't exist in the filter but other keys in the range do exist
+#[test]
+fn test_prefix_filter_range_with_missing_start_bound() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use full key as prefix (FullKeyExtractor)
+    let tree = Config::new(&folder, SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FullKeyExtractor))
+        .open()?;
+
+    // Insert keys b and c, but not a
+    tree.insert(b"b", b"value_b", 0);
+    tree.insert(b"c", b"value_c", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // Query range a..=c
+    // Extract common prefix from both bounds (empty for "a" and "c")
+    // But now we check if start bound "a" exists - it doesn't, but segment starts with "b"
+    // So we can't skip the segment (different prefixes)
+    let mut results = Vec::new();
+    for item in tree.range(&b"a"[..]..=&b"c"[..], u64::MAX, None) {
+        results.push(item.key()?.to_vec());
+    }
+
+    // Should return b and c (even though a doesn't exist)
+    assert_eq!(results.len(), 2);
+    assert_eq!(results[0], b"b");
+    assert_eq!(results[1], b"c");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+
+        assert_eq!(
+            final_queries, initial_queries,
+            "filter should not be queried"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test the new optimization: when range has no common prefix but start bound prefix doesn't exist
+#[test]
+fn test_prefix_filter_range_start_prefix_optimization() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use a fixed prefix extractor
+    let tree = Config::new(&folder, SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
+        .open()?;
+
+    // Insert keys that all start with "bbb"
+    tree.insert(b"bbb_1", b"value1", 0);
+    tree.insert(b"bbb_2", b"value2", 0);
+    tree.insert(b"bbb_3", b"value3", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // Query range aaa..zzz (no common prefix)
+    // The segment starts with "bbb" and "aaa" doesn't exist
+    // But since segment min ("bbb") != start prefix ("aaa"), we can't skip
+    let mut results = Vec::new();
+    for item in tree.range(&b"aaa"[..]..&b"zzz"[..], u64::MAX, None) {
+        results.push(item.key()?.to_vec());
+    }
+    assert_eq!(results.len(), 3, "Should find all bbb keys");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+
+        assert_eq!(
+            final_queries, initial_queries,
+            "filter should not be queried"
+        );
+    }
+
+    // Now test where we CAN skip: segment that starts with same prefix as missing start bound
+    let tree2 = Config::new(
+        folder.path().join("test2"),
+        SequenceNumberCounter::default(),
+    )
+    .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
+    .open()?;
+
+    // Create a tree with keys having prefix "aaa" and "aac" but not "aab"
+    tree2.insert(b"aaa_1", b"value1", 0);
+    tree2.insert(b"aaa_2", b"value2", 0);
+    tree2.insert(b"aac_1", b"value3", 0);
+    tree2.insert(b"aac_2", b"value4", 0);
+    tree2.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree2.metrics().filter_queries();
+
+    // First verify the tree has data
+    assert!(tree2.contains_key(b"aaa_1", u64::MAX)?);
+    assert!(tree2.contains_key(b"aac_1", u64::MAX)?);
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree2.metrics().filter_queries();
+
+        assert!(
+            final_queries > initial_queries,
+            "point lookups on tree2 should query the filter"
+        );
+    }
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree2.metrics().filter_queries();
+
+    // Query for range with common prefix "aab" - no keys exist with this prefix
+    // Range:
aab_1..aab_9 has common prefix "aab" + // The segment contains "aaa" and "aac" keys, so it overlaps the range + // filter will be checked for "aab" and should indicate it doesn't exist + let range_iter = tree2.range(&b"aab_1"[..]..&b"aab_9"[..], u64::MAX, None); + let results: Vec<_> = range_iter.collect(); + assert_eq!( + results.len(), + 0, + "No keys should match since aab prefix doesn't exist" + ); + + #[cfg(feature = "metrics")] + { + let final_queries = tree2.metrics().filter_queries(); + + assert!( + final_queries > initial_queries, + "filter queries should increase for range operations" + ); + } + + Ok(()) +} + +/// Test that range queries correctly handle different prefix scenarios: +/// same prefix, different prefixes, and non-existent prefixes +#[test] +fn test_prefix_filter_range_across_different_prefixes() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path(), SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(5))) + .open()?; + + // Store keys with same prefix + tree.insert("user1_a", "v1", 0); + tree.insert("user1_b", "v2", 0); + tree.flush_active_memtable(0)?; + + tree.insert("user2_a", "v3", 1); + tree.insert("user2_b", "v4", 1); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Query with common prefix "user1" - should find entries + let count = tree.range("user1_a"..="user1_z", u64::MAX, None).count(); + assert_eq!(count, 2, "Should find user1 entries"); + + // Query with non-existent prefix - should return nothing + let count = tree.range("user3_a"..="user3_z", u64::MAX, None).count(); + assert_eq!(count, 0, "Should find no user3 entries"); + + // Query across different prefixes - no common prefix + let count = tree.range("user1_a"..="user2_b", u64::MAX, None).count(); + assert_eq!(count, 4, "Should find all entries when no common prefix"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // Range queries with common prefix should trigger filter + assert!( + final_queries > initial_queries, + "filter queries should increase for range operations" + ); + } + + Ok(()) +} + +/// Test range queries with reversed bounds (should return empty) +#[test] +fn test_prefix_filter_range_reversed_bounds() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path(), SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FullKeyExtractor)) + .open()?; + + // Insert some keys + tree.insert("a", "value_a", 0); + tree.insert("b", "value_b", 0); + tree.insert("c", "value_c", 0); + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Query with reversed bounds - should return empty + let count = tree.range("c".."a", u64::MAX, None).count(); + assert_eq!(count, 0, "Reversed bounds should return empty"); + + // Also test with excluded bounds reversed + use std::ops::Bound; + let count = tree + .range::<&str, _>((Bound::Excluded("c"), Bound::Included("a")), u64::MAX, None) + .count(); + assert_eq!(count, 0, "Reversed excluded bounds should return empty"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + // Reversed bounds may skip filter entirely + assert_eq!( + final_queries, initial_queries, + "filter should not be queried for reversed (empty) ranges" + ); + } + + Ok(()) +} + +/// Test range with same key but 
different bound types
+#[test]
+fn test_prefix_filter_range_same_key_different_bounds() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FullKeyExtractor))
+        .open()?;
+
+    tree.insert("key", "value", 0);
+    tree.insert("key2", "value2", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    // Included..Excluded with same key (empty range)
+    use std::ops::Bound;
+    let count = tree
+        .range::<&str, _>(
+            (Bound::Included("key"), Bound::Excluded("key")),
+            u64::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 0, "Included..Excluded same key should be empty");
+
+    // Excluded..Included with same key (empty range)
+    let count = tree
+        .range::<&str, _>(
+            (Bound::Excluded("key"), Bound::Included("key")),
+            u64::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 0, "Excluded..Included same key should be empty");
+
+    // Included..Included with same key (single item)
+    let count = tree
+        .range::<&str, _>(
+            (Bound::Included("key"), Bound::Included("key")),
+            u64::MAX,
+            None,
+        )
+        .count();
+    assert_eq!(count, 1, "Included..Included same key should return 1");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // Range queries should use filter even with same key bounds
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for same-key range operations"
+        );
+
+        // Keys exist, hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with non-consecutive keys having common prefix
+#[test]
+fn test_prefix_filter_range_non_consecutive_keys() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
+        .open()?;
+
+    // Insert non-consecutive keys with same prefix
+    tree.insert("app_1", "v1", 0);
+    tree.insert("app_3", "v3", 0);
+    tree.insert("app_5", "v5", 0);
+    tree.insert("app_7", "v7", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    // Query for range that includes missing keys
+    let count = tree.range("app_2"..="app_6", u64::MAX, None).count();
+    assert_eq!(count, 2, "Should find app_3 and app_5");
+
+    // Query for range entirely between existing keys
+    let count = tree.range("app_4".."app_5", u64::MAX, None).count();
+    assert_eq!(count, 0, "No keys in range app_4..app_5");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // Ranges over the shared "app" prefix should consult the filter
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for non-consecutive key ranges"
+        );
+
+        // The "app" prefix exists in the table, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase (the queried prefix exists)"
+        );
+    }
+
+    Ok(())
+}
+
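Several of these range tests hinge on the longest common prefix of the two bounds: only when it is at least as long as the extracted prefix can the filter be probed. A minimal sketch of that derivation (`common_prefix` is illustrative, not the patch's implementation):

```rust
// Illustrative: longest common prefix of two range bounds.
fn common_prefix<'a>(a: &'a [u8], b: &[u8]) -> &'a [u8] {
    let n = a.iter().zip(b).take_while(|(x, y)| x == y).count();
    &a[..n]
}

fn main() {
    // "app_2"..="app_6" shares "app": long enough for a 3-byte extractor,
    // so the filter can be probed for the prefix "app".
    assert_eq!(common_prefix(b"app_2", b"app_6"), b"app");
    // "aay_1"..="abb_1" shares only "a": shorter than the extractor length,
    // so the filter is bypassed (see test_prefix_filter_range_prefix_boundary).
    assert_eq!(common_prefix(b"aay_1", b"abb_1"), b"a");
}
```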
+/// Test range queries across multiple segments with different prefixes
+#[test]
+fn test_prefix_filter_range_multiple_segments() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4)))
+        .open()?;
+
+    // Create first segment with user prefix
+    tree.insert("user_001", "v1", 0);
+    tree.insert("user_002", "v2", 0);
+    tree.flush_active_memtable(0)?;
+
+    // Create second segment with item prefix
+    tree.insert("item_001", "v3", 1);
+    tree.insert("item_002", "v4", 1);
+    tree.flush_active_memtable(0)?;
+
+    // Create third segment with both prefixes
+    tree.insert("user_003", "v5", 2);
+    tree.insert("item_003", "v6", 2);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // Query across all segments
+    let count = tree.range("item_001"..="user_003", u64::MAX, None).count();
+    assert_eq!(count, 6, "Should find all items and users in range");
+
+    // Query for non-existent prefix across segments
+    let count = tree.range("test_001"..="test_999", u64::MAX, None).count();
+    assert_eq!(count, 0, "Non-existent prefix should return nothing");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        // Queries across multiple segments should check filters
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for multi-segment range queries"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with keys where prefix changes at segment boundary
+#[test]
+fn test_prefix_filter_range_prefix_boundary() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(3)))
+        .open()?;
+
+    // First segment ends with "aaz"
+    tree.insert("aax_1", "v1", 0);
+    tree.insert("aay_1", "v2", 0);
+    tree.insert("aaz_1", "v3", 0);
+    tree.flush_active_memtable(0)?;
+
+    // Second segment starts with "aba" (different prefix)
+    tree.insert("aba_1", "v4", 1);
+    tree.insert("abb_1", "v5", 1);
+    tree.insert("abc_1", "v6", 1);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // Query across the boundary
+    let count = tree.range("aay_1"..="abb_1", u64::MAX, None).count();
+    assert_eq!(count, 4, "Should find keys from both segments");
+
+    // Query that spans missing prefix between segments
+    let count = tree.range("aaz_2"..="aba_0", u64::MAX, None).count();
+    assert_eq!(count, 0, "No keys in the gap between segments");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        // The bounds share only a 1-byte common prefix ("a"), shorter than the
+        // 3-byte extractor, so the filter is bypassed for these range queries
+        assert_eq!(
+            final_queries, initial_queries,
+            "filter should be bypassed when common prefix is shorter than extractor"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with no prefix extractor (should not use filter optimization)
+#[test]
+fn test_prefix_filter_range_no_extractor() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Create tree without prefix extractor
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default()).open()?;
+
+    // Insert various keys
+    tree.insert("a", "v1", 0);
+    tree.insert("b", "v2", 0);
+    tree.insert("c", "v3", 0);
+    tree.insert("d", "v4", 0);
+    tree.insert("e", "v5", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries =
tree.metrics().filter_queries(); + + // Range queries should work normally without filter optimization + let count = tree.range("a"..="c", u64::MAX, None).count(); + assert_eq!(count, 3, "Should find a, b, c"); + + let count = tree.range("b"..="d", u64::MAX, None).count(); + assert_eq!(count, 3, "Should find b, c, d"); + + // Empty range + let count = tree.range("f"..="z", u64::MAX, None).count(); + assert_eq!(count, 0, "Should find nothing"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + // Without prefix extractor, filter optimization is not used for ranges + assert_eq!( + final_queries, initial_queries, + "filter should not be used for ranges without prefix extractor" + ); + } + + Ok(()) +} + +/// Test range with both bounds excluded +#[test] +fn test_prefix_filter_range_both_excluded() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path(), SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FullKeyExtractor)) + .open()?; + + // Insert keys + for key in ["a", "b", "c", "d", "e"] { + tree.insert(key, "value", 0); + } + tree.flush_active_memtable(0)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + #[cfg(feature = "metrics")] + let initial_hits = tree.metrics().io_skipped_by_filter(); + + // Test with both bounds excluded + use std::ops::Bound; + let count = tree + .range::<&str, _>((Bound::Excluded("a"), Bound::Excluded("e")), u64::MAX, None) + .count(); + assert_eq!(count, 3, "Should return b, c, d"); + + // Edge case: adjacent keys with both excluded + let count = tree + .range::<&str, _>((Bound::Excluded("b"), Bound::Excluded("c")), u64::MAX, None) + .count(); + assert_eq!(count, 0, "No keys between adjacent excluded bounds"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + let final_hits = tree.metrics().io_skipped_by_filter(); + + // Range queries with excluded bounds may or may not use filter + // depending on prefix extraction logic + assert!( + final_queries >= initial_queries, + "filter queries should not decrease for excluded bound ranges" + ); + + // All keys exist, hits should not increase + assert_eq!( + final_hits, initial_hits, + "filter hits should not increase for existing keys" + ); + } + + Ok(()) +} + +/// Test range after compaction with prefix filters +#[test] +fn test_prefix_filter_range_after_compaction() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder.path(), SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4))) + .open()?; + + // Create multiple segments + for i in 0..3 { + tree.insert(format!("user_{}", i), format!("v{}", i), i); + tree.insert(format!("item_{}", i), format!("i{}", i), i); + tree.flush_active_memtable(0)?; + } + + // Skip compaction test since it's not implemented + // tree.major_compact(u64::MAX)?; + + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Verify range queries still work after compaction + let count = tree.range("user_0"..="user_2", u64::MAX, None).count(); + assert_eq!(count, 3, "Should find all user keys after compaction"); + + let count = tree.range("item_0"..="item_2", u64::MAX, None).count(); + assert_eq!(count, 3, "Should find all item keys after compaction"); + + // Query across prefixes + let count = tree.range("item_1"..="user_1", u64::MAX, None).count(); + assert_eq!(count, 4, "Should find mixed keys 
after compaction");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        // Range queries with common prefix should use filter
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for range operations after compaction"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test range with Unicode/UTF-8 prefix boundaries
+#[test]
+fn test_prefix_filter_range_utf8_boundaries() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(6))) // 6 bytes covers the first emoji plus half of the second
+        .open()?;
+
+    // Insert keys with emoji prefixes (each of these emojis is 4 bytes in UTF-8)
+    tree.insert("🎈🎈_001", "v1", 0);
+    tree.insert("🎈🎈_002", "v2", 0);
+    tree.insert("🎉🎉_001", "v3", 0);
+    tree.insert("🎉🎉_002", "v4", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+
+    // Query within same emoji prefix
+    let count = tree.range("🎈🎈_001"..="🎈🎈_002", u64::MAX, None).count();
+    assert_eq!(count, 2, "Should find keys with balloon prefix");
+
+    // Query across different emoji prefixes
+    let count = tree.range("🎈🎈_002"..="🎉🎉_001", u64::MAX, None).count();
+    assert_eq!(count, 2, "Should find keys across emoji boundaries");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        // Emoji prefixes should trigger filter checks
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for UTF-8 boundary range queries"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test with custom extractor returning multiple prefixes
+#[test]
+fn test_prefix_filter_range_multi_prefix_extractor() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Custom extractor that returns multiple prefixes
+    struct MultiPrefixExtractor;
+    impl PrefixExtractor for MultiPrefixExtractor {
+        fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+            if key.len() >= 6 {
+                // Return both 3-byte and 6-byte prefixes
+                Box::new(vec![&key[..3], &key[..6]].into_iter())
+            } else if key.len() >= 3 {
+                Box::new(std::iter::once(&key[..3]))
+            } else {
+                Box::new(std::iter::once(key))
+            }
+        }
+        fn name(&self) -> &str {
+            "MultiPrefixExtractor"
+        }
+    }
+
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(MultiPrefixExtractor))
+        .open()?;
+
+    tree.insert("abc123_data", "v1", 0);
+    tree.insert("abc456_data", "v2", 0);
+    tree.insert("def123_data", "v3", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    // Query should work with common 3-byte prefix
+    let count = tree.range("abc000"..="abc999", u64::MAX, None).count();
+    assert_eq!(count, 2, "Should find keys with abc prefix");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // A multi-prefix extractor should still route range queries through the filter
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should work with a multi-prefix extractor"
+        );
+
+        // All keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
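The next test relies on prefixes being raw byte slices rather than characters, so an extractor may legally cut a multi-byte UTF-8 sequence in half. A small std-only demonstration:

```rust
fn main() {
    // "中" encodes as three bytes: E4 B8 AD.
    let key = "中文_1".as_bytes();
    let prefix = &key[..2]; // cuts the first character in half
    assert_eq!(prefix, &[0xE4u8, 0xB8]);
    // "中文_1" and "中文_2" still share this 2-byte prefix, so a range query
    // over them can meaningfully consult the prefix filter.
    assert_eq!(&"中文_2".as_bytes()[..2], prefix);
}
```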
test_prefix_filter_range_utf8_split() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    // Use a fixed byte extractor that may split UTF-8 characters
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FixedPrefixExtractor::new(2)))
+        .open()?;
+
+    // Insert keys with multi-byte UTF-8 characters
+    tree.insert("中文_1", "v1", 0);
+    tree.insert("中文_2", "v2", 0);
+    tree.insert("日本_1", "v3", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    // The prefix is the first 2 bytes, which splits the 3-byte UTF-8 character.
+    // This tests that the implementation handles partial UTF-8 correctly.
+    let count = tree.range("中文_1"..="中文_2", u64::MAX, None).count();
+    assert_eq!(count, 2, "Should find keys despite UTF-8 splitting");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // Range query should use the filter even with UTF-8 split
+        assert!(
+            final_queries > initial_queries,
+            "filter queries should increase for UTF-8 split range"
+        );
+
+        // Keys exist, so hits should not increase
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not increase for existing keys"
+        );
+    }
+
+    Ok(())
+}
+
+/// Test empty range (start > end after normalization)
+#[test]
+fn test_prefix_filter_empty_normalized_range() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+
+    let tree = Config::new(folder.path(), SequenceNumberCounter::default())
+        .prefix_extractor(Arc::new(FullKeyExtractor))
+        .open()?;
+
+    tree.insert("b", "value", 0);
+    tree.flush_active_memtable(0)?;
+
+    #[cfg(feature = "metrics")]
+    let initial_queries = tree.metrics().filter_queries();
+    #[cfg(feature = "metrics")]
+    let initial_hits = tree.metrics().io_skipped_by_filter();
+
+    // Create a range that becomes empty after normalization
+    use std::ops::Bound;
+    let count = tree
+        .range::<&str, _>((Bound::Excluded("b"), Bound::Excluded("b")), u64::MAX, None)
+        .count();
+    assert_eq!(count, 0, "Empty normalized range should return nothing");
+
+    #[cfg(feature = "metrics")]
+    {
+        let final_queries = tree.metrics().filter_queries();
+        let final_hits = tree.metrics().io_skipped_by_filter();
+
+        // An empty normalized range should skip the filter entirely
+        assert_eq!(
+            final_queries, initial_queries,
+            "filter should not be queried for empty normalized range"
+        );
+
+        assert_eq!(
+            final_hits, initial_hits,
+            "filter hits should not change for empty range"
+        );
+    }
+
+    Ok(())
+}
+
+/// A test prefix extractor that extracts a fixed-length prefix and reports a custom name
+struct TestPrefixExtractor {
+    length: usize,
+    name: String,
+}
+
+impl TestPrefixExtractor {
+    fn new(length: usize, name: &str) -> Self {
+        Self {
+            length,
+            name: name.to_string(),
+        }
+    }
+}
+
+impl PrefixExtractor for TestPrefixExtractor {
+    fn extract<'a>(&self, key: &'a [u8]) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
+        if key.len() >= self.length {
+            Box::new(std::iter::once(&key[..self.length]))
+        } else {
+            Box::new(std::iter::once(key))
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[test]
+fn test_same_extractor_compatibility() -> lsm_tree::Result<()> {
+    let temp_dir = tempfile::tempdir()?;
+    let path = temp_dir.path();
+
+    let extractor = Arc::new(TestPrefixExtractor::new(4, "test_extractor"));
+
+    // Create a tree with prefix extractor
+    {
+        let tree = Config::new(path, 
SequenceNumberCounter::default()) + .prefix_extractor(extractor.clone()) + .open()?; + + tree.insert("user_key1", "value1", 0); + tree.insert("user_key2", "value2", 0); + tree.insert("data_key1", "value3", 0); + tree.flush_active_memtable(0)?; + } + + // Reopen with the same extractor - should work fine with prefix filtering + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor) + .open()?; + + // Should be able to use prefix filtering + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + assert_eq!(&*tree.get("user_key1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("user_key2", u64::MAX)?.unwrap(), b"value2"); + assert_eq!(&*tree.get("data_key1", u64::MAX)?.unwrap(), b"value3"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + // Should have incremented filter queries since extractor is compatible + assert!( + final_queries > initial_queries, + "Compatible extractor should increment filter queries: {} -> {}", + initial_queries, + final_queries + ); + } + + // Test range queries with prefix filtering optimization + let items: Vec<_> = tree.range("user"..="user_zzzz", u64::MAX, None).collect(); + assert_eq!(items.len(), 2); + } + + Ok(()) +} + +#[test] +fn test_different_extractor_incompatible() -> lsm_tree::Result<()> { + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path(); + + let extractor1 = Arc::new(TestPrefixExtractor::new(4, "test_extractor_v1")); + let extractor2 = Arc::new(TestPrefixExtractor::new(4, "test_extractor_v2")); + + // Create a tree with first extractor + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor1) + .open()?; + + tree.insert("user_key1", "value1", 0); + tree.insert("user_key2", "value2", 0); + tree.insert("data_key1", "value3", 0); + tree.flush_active_memtable(0)?; + } + + // Reopen with different extractor - should disable prefix filtering for old segments + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor2) + .open()?; + + // Should still work, but without prefix filtering optimization for old segments + // The incompatible extractor means filter is completely bypassed + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + assert_eq!(&*tree.get("user_key1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("user_key2", u64::MAX)?.unwrap(), b"value2"); + assert_eq!(&*tree.get("data_key1", u64::MAX)?.unwrap(), b"value3"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + // Should NOT have incremented filter queries since extractor is incompatible + assert_eq!( + final_queries, initial_queries, + "Incompatible extractor should not increment filter queries: {} -> {}", + initial_queries, final_queries + ); + } + + // Range queries should still work correctly (but without optimization for old segments) + let items: Vec<_> = tree.range("user"..="user_zzzz", u64::MAX, None).collect(); + assert_eq!(items.len(), 2); + + // New writes should use the new extractor + tree.insert("test_key1", "value4", 1); + tree.flush_active_memtable(0)?; + + assert_eq!(&*tree.get("test_key1", u64::MAX)?.unwrap(), b"value4"); + } + + Ok(()) +} + +#[test] +fn test_no_extractor_to_extractor() -> lsm_tree::Result<()> { + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path(); + + // Create a tree without prefix extractor + { + let tree = 
Config::new(path, SequenceNumberCounter::default()).open()?; + + tree.insert("user_key1", "value1", 0); + tree.insert("user_key2", "value2", 0); + tree.insert("data_key1", "value3", 0); + tree.flush_active_memtable(0)?; + } + + // Reopen with prefix extractor - should disable prefix filtering for old segments + { + let extractor = Arc::new(TestPrefixExtractor::new(4, "test_extractor")); + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor) + .open()?; + + // Should still work, but old segments won't use prefix filtering + assert_eq!(&*tree.get("user_key1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("user_key2", u64::MAX)?.unwrap(), b"value2"); + assert_eq!(&*tree.get("data_key1", u64::MAX)?.unwrap(), b"value3"); + + // New writes should use prefix extractor + tree.insert("test_key1", "value4", 1); + tree.flush_active_memtable(0)?; + + assert_eq!(&*tree.get("test_key1", u64::MAX)?.unwrap(), b"value4"); + } + + Ok(()) +} + +#[test] +fn test_extractor_to_no_extractor() -> lsm_tree::Result<()> { + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path(); + + let extractor = Arc::new(TestPrefixExtractor::new(4, "test_extractor")); + + // Create a tree with prefix extractor + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor) + .open()?; + + tree.insert("user_key1", "value1", 0); + tree.insert("user_key2", "value2", 0); + tree.insert("data_key1", "value3", 0); + tree.flush_active_memtable(0)?; + } + + // Reopen without prefix extractor - should disable prefix filtering for old segments + { + let tree = Config::new(path, SequenceNumberCounter::default()).open()?; + + // Should still work, but old segments won't use prefix filtering + assert_eq!(&*tree.get("user_key1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("user_key2", u64::MAX)?.unwrap(), b"value2"); + assert_eq!(&*tree.get("data_key1", u64::MAX)?.unwrap(), b"value3"); + + // Range queries should still work + let items: Vec<_> = tree.range("user"..="user_zzzz", u64::MAX, None).collect(); + assert_eq!(items.len(), 2); + } + + Ok(()) +} + +#[test] +fn test_builtin_extractors_compatibility() -> lsm_tree::Result<()> { + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path(); + + // Create with FixedPrefixExtractor + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4))) + .open()?; + + tree.insert("user_key1", "value1", 0); + tree.insert("user_key2", "value2", 0); + tree.flush_active_memtable(0)?; + } + + // Reopen with FixedLengthExtractor (different name) - should be incompatible + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedLengthExtractor::new(4))) + .open()?; + + // Should work but without prefix filtering for old segments + assert_eq!(&*tree.get("user_key1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("user_key2", u64::MAX)?.unwrap(), b"value2"); + } + + // Reopen with same type (FixedPrefixExtractor) - should be compatible + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(Arc::new(FixedPrefixExtractor::new(4))) + .open()?; + + // Should work with prefix filtering for old segments + assert_eq!(&*tree.get("user_key1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("user_key2", u64::MAX)?.unwrap(), b"value2"); + } + + Ok(()) +} + +#[test] +fn test_new_segments_use_new_extractor() -> lsm_tree::Result<()> { + let temp_dir 
= tempfile::tempdir()?; + let path = temp_dir.path(); + + let extractor1 = Arc::new(TestPrefixExtractor::new(4, "old_extractor")); + let extractor2 = Arc::new(TestPrefixExtractor::new(4, "new_extractor")); + + // Create first segment with old extractor + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor1) + .open()?; + + tree.insert("old_key1", "value1", 0); + tree.insert("old_key2", "value2", 0); + tree.flush_active_memtable(0)?; + } + + // Reopen with new extractor and create new segment + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor2) + .open()?; + + // Add data to create a new segment with the new extractor + tree.insert("new_key1", "value3", 1); + tree.insert("new_key2", "value4", 1); + tree.flush_active_memtable(0)?; + + // Test that old segment uses no filtering (extractor incompatible) + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // Query old keys - should NOT increment filter queries (incompatible extractor) + assert_eq!(&*tree.get("old_key1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("old_key2", u64::MAX)?.unwrap(), b"value2"); + + #[cfg(feature = "metrics")] + let after_old_queries = tree.metrics().filter_queries(); + + // Query new keys - SHOULD increment filter queries (compatible extractor) + assert_eq!(&*tree.get("new_key1", u64::MAX)?.unwrap(), b"value3"); + assert_eq!(&*tree.get("new_key2", u64::MAX)?.unwrap(), b"value4"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + + // Old keys should not have incremented filter queries + assert_eq!( + after_old_queries, initial_queries, + "Old keys should not increment filter queries due to incompatible extractor" + ); + + // New keys should have incremented filter queries + assert!( + final_queries > after_old_queries, + "New keys should increment filter queries with compatible extractor: {} -> {}", + after_old_queries, + final_queries + ); + } + } + + Ok(()) +} + +#[test] +fn test_multiple_extractor_changes() -> lsm_tree::Result<()> { + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path(); + + let extractor1 = Arc::new(TestPrefixExtractor::new(2, "v1")); + let extractor2 = Arc::new(TestPrefixExtractor::new(2, "v2")); + let extractor3 = Arc::new(TestPrefixExtractor::new(2, "v3")); + + // Create segments with different extractors over time + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor1) + .open()?; + tree.insert("aa_data1", "value1", 0); + tree.flush_active_memtable(0)?; + } + + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor2) + .open()?; + tree.insert("bb_data2", "value2", 0); + tree.flush_active_memtable(0)?; + } + + { + let tree = Config::new(path, SequenceNumberCounter::default()) + .prefix_extractor(extractor3) + .open()?; + tree.insert("cc_data3", "value3", 0); + tree.flush_active_memtable(0)?; + + // Only the last segment should use filtering + #[cfg(feature = "metrics")] + let initial_queries = tree.metrics().filter_queries(); + + // These should not increment filter queries (incompatible) + assert_eq!(&*tree.get("aa_data1", u64::MAX)?.unwrap(), b"value1"); + assert_eq!(&*tree.get("bb_data2", u64::MAX)?.unwrap(), b"value2"); + + #[cfg(feature = "metrics")] + let middle_queries = tree.metrics().filter_queries(); + + // This should increment filter queries (compatible) + assert_eq!(&*tree.get("cc_data3", 
u64::MAX)?.unwrap(), b"value3"); + + #[cfg(feature = "metrics")] + { + let final_queries = tree.metrics().filter_queries(); + assert_eq!( + middle_queries, initial_queries, + "Old segments should not increment metrics" + ); + assert!( + final_queries > middle_queries, + "New segment should increment metrics" + ); + } + } + + Ok(()) +}