From 478e69649a311833dd44148a0e4982ac5a8e3d73 Mon Sep 17 00:00:00 2001 From: Guillaume Endignoux Date: Fri, 10 Oct 2025 15:48:37 +0200 Subject: [PATCH 1/5] WIP: Add HashSet::par_iter and HashMap::par_iter(_mut) integration with paralight. --- Cargo.toml | 3 +- src/external_trait_impls/mod.rs | 2 + src/external_trait_impls/paralight.rs | 257 ++++++++++++++++++++++++++ 3 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 src/external_trait_impls/paralight.rs diff --git a/Cargo.toml b/Cargo.toml index 45e433ffff..484314994e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ rust-version = "1.65.0" foldhash = { version = "0.2.0", default-features = false, optional = true } # For external trait impls +paralight = { version = "0.0.6", optional = true } rayon = { version = "1.9.0", optional = true } serde_core = { version = "1.0.221", default-features = false, optional = true } @@ -85,5 +86,5 @@ default-hasher = ["dep:foldhash"] inline-more = [] [package.metadata.docs.rs] -features = ["nightly", "rayon", "serde", "raw-entry"] +features = ["nightly", "paralight", "rayon", "serde", "raw-entry"] rustdoc-args = ["--generate-link-to-definition"] diff --git a/src/external_trait_impls/mod.rs b/src/external_trait_impls/mod.rs index ef497836cb..cfea0ffeff 100644 --- a/src/external_trait_impls/mod.rs +++ b/src/external_trait_impls/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "paralight")] +mod paralight; #[cfg(feature = "rayon")] pub(crate) mod rayon; #[cfg(feature = "serde")] diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs new file mode 100644 index 0000000000..592226f7cc --- /dev/null +++ b/src/external_trait_impls/paralight.rs @@ -0,0 +1,257 @@ +use crate::raw::RawTable; +use crate::{HashMap, HashSet}; +use paralight::iter::{ + IntoParallelRefMutSource, IntoParallelRefSource, ParallelSource, SourceCleanup, + SourceDescriptor, +}; + +impl<'data, T: Sync + 'data> IntoParallelRefSource<'data> for HashSet { + type Item = Option<&'data T>; + type Source = HashSetRefParallelSource<'data, T>; + + fn par_iter(&'data self) -> Self::Source { + HashSetRefParallelSource { hash_set: self } + } +} + +#[must_use = "iterator adaptors are lazy"] +pub struct HashSetRefParallelSource<'data, T> { + hash_set: &'data HashSet, +} + +impl<'data, T: Sync> ParallelSource for HashSetRefParallelSource<'data, T> { + type Item = Option<&'data T>; + + fn descriptor(self) -> impl SourceDescriptor + Sync { + HashSetRefSourceDescriptor { + table: &self.hash_set.map.table, + } + } +} + +struct HashSetRefSourceDescriptor<'data, T: Sync> { + table: &'data RawTable<(T, ())>, +} + +impl SourceCleanup for HashSetRefSourceDescriptor<'_, T> { + const NEEDS_CLEANUP: bool = false; + + unsafe fn cleanup_item_range(&self, _range: core::ops::Range) { + // Nothing to cleanup + } +} + +impl<'data, T: Sync> SourceDescriptor for HashSetRefSourceDescriptor<'data, T> { + type Item = Option<&'data T>; + + fn len(&self) -> usize { + self.table.buckets() + } + + unsafe fn fetch_item(&self, index: usize) -> Self::Item { + // SAFETY: TODO + let full = unsafe { self.table.is_bucket_full(index) }; + if full { + // SAFETY: TODO + let bucket = unsafe { self.table.bucket(index) }; + let (t, ()) = unsafe { bucket.as_ref() }; + Some(t) + } else { + None + } + } +} + +impl<'data, K: Sync + 'data, V: Sync + 'data> IntoParallelRefSource<'data> for HashMap { + type Item = Option<&'data (K, V)>; + type Source = HashMapRefParallelSource<'data, K, V>; + + fn par_iter(&'data self) -> Self::Source { + HashMapRefParallelSource { hash_map: self } + } +} + +#[must_use = "iterator adaptors are lazy"] +pub struct HashMapRefParallelSource<'data, K, V> { + hash_map: &'data HashMap, +} + +impl<'data, K: Sync, V: Sync> ParallelSource for HashMapRefParallelSource<'data, K, V> { + type Item = Option<&'data (K, V)>; + + fn descriptor(self) -> impl SourceDescriptor + Sync { + HashMapRefSourceDescriptor { + table: &self.hash_map.table, + } + } +} + +struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync> { + table: &'data RawTable<(K, V)>, +} + +impl SourceCleanup for HashMapRefSourceDescriptor<'_, K, V> { + const NEEDS_CLEANUP: bool = false; + + unsafe fn cleanup_item_range(&self, _range: core::ops::Range) { + // Nothing to cleanup + } +} + +impl<'data, K: Sync, V: Sync> SourceDescriptor for HashMapRefSourceDescriptor<'data, K, V> { + type Item = Option<&'data (K, V)>; + + fn len(&self) -> usize { + self.table.buckets() + } + + unsafe fn fetch_item(&self, index: usize) -> Self::Item { + // SAFETY: TODO + let full = unsafe { self.table.is_bucket_full(index) }; + if full { + // SAFETY: TODO + let bucket = unsafe { self.table.bucket(index) }; + unsafe { Some(bucket.as_ref()) } + } else { + None + } + } +} + +// TODO: Remove Sync requirement on V. +impl<'data, K: Sync + 'data, V: Send + Sync + 'data> IntoParallelRefMutSource<'data> + for HashMap +{ + type Item = Option<(&'data K, &'data mut V)>; + type Source = HashMapRefMutParallelSource<'data, K, V>; + + fn par_iter_mut(&'data mut self) -> Self::Source { + HashMapRefMutParallelSource { hash_map: self } + } +} + +#[must_use = "iterator adaptors are lazy"] +pub struct HashMapRefMutParallelSource<'data, K, V> { + hash_map: &'data mut HashMap, +} + +impl<'data, K: Sync, V: Send + Sync> ParallelSource for HashMapRefMutParallelSource<'data, K, V> { + type Item = Option<(&'data K, &'data mut V)>; + + fn descriptor(self) -> impl SourceDescriptor + Sync { + HashMapRefMutSourceDescriptor { + table: &self.hash_map.table, + } + } +} + +struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send + Sync> { + table: &'data RawTable<(K, V)>, +} + +impl SourceCleanup for HashMapRefMutSourceDescriptor<'_, K, V> { + const NEEDS_CLEANUP: bool = false; + + unsafe fn cleanup_item_range(&self, _range: core::ops::Range) { + // Nothing to cleanup + } +} + +impl<'data, K: Sync, V: Send + Sync> SourceDescriptor + for HashMapRefMutSourceDescriptor<'data, K, V> +{ + type Item = Option<(&'data K, &'data mut V)>; + + fn len(&self) -> usize { + self.table.buckets() + } + + unsafe fn fetch_item(&self, index: usize) -> Self::Item { + // SAFETY: TODO + let full = unsafe { self.table.is_bucket_full(index) }; + if full { + // SAFETY: TODO + let bucket = unsafe { self.table.bucket(index) }; + // SAFETY: TODO + let (key, value) = unsafe { bucket.as_mut() }; + Some((key, value)) + } else { + None + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use alloc::boxed::Box; + use core::ops::Deref; + use paralight::iter::{ParallelIteratorExt, ParallelSourceExt}; + use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder}; + + #[test] + fn test_set_par_iter() { + let mut set = HashSet::new(); + for i in 1..=42 { + set.insert(Box::new(i)); + } + + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy: RangeStrategy::WorkStealing, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + let sum = set + .par_iter() + .with_thread_pool(&mut thread_pool) + .filter_map(|x| x.map(|y| y.deref())) + .sum::(); + assert_eq!(sum, 21 * 43); + } + + #[test] + fn test_map_par_iter() { + let mut map = HashMap::new(); + for i in 1..=42 { + map.insert(Box::new(i), Box::new(i * i)); + } + + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy: RangeStrategy::WorkStealing, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + map.par_iter() + .with_thread_pool(&mut thread_pool) + .filter_map(|x| x) + .for_each(|(k, v)| assert_eq!(**k * **k, **v)); + } + + #[test] + fn test_map_par_iter_mut() { + let mut map = HashMap::new(); + for i in 1..=42 { + map.insert(Box::new(i), Box::new(i)); + } + + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy: RangeStrategy::WorkStealing, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + map.par_iter_mut() + .with_thread_pool(&mut thread_pool) + .filter_map(|x| x) + .for_each(|(k, v)| **v *= **k); + + for (k, v) in map.iter() { + assert_eq!(**k * **k, **v); + } + } +} From 967d88080138e8e61f52235278ab78574a3eb067 Mon Sep 17 00:00:00 2001 From: Guillaume Endignoux Date: Fri, 10 Oct 2025 15:54:42 +0200 Subject: [PATCH 2/5] WIP: Support non-default hasher and allocator parameters. --- src/external_trait_impls/paralight.rs | 77 ++++++++++++++++----------- 1 file changed, 47 insertions(+), 30 deletions(-) diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs index 592226f7cc..065be5458e 100644 --- a/src/external_trait_impls/paralight.rs +++ b/src/external_trait_impls/paralight.rs @@ -1,13 +1,16 @@ -use crate::raw::RawTable; +use crate::raw::{Allocator, RawTable}; use crate::{HashMap, HashSet}; use paralight::iter::{ IntoParallelRefMutSource, IntoParallelRefSource, ParallelSource, SourceCleanup, SourceDescriptor, }; -impl<'data, T: Sync + 'data> IntoParallelRefSource<'data> for HashSet { +// HashSet.par_iter() +impl<'data, T: Sync + 'data, S: 'data, A: Allocator + Sync + 'data> IntoParallelRefSource<'data> + for HashSet +{ type Item = Option<&'data T>; - type Source = HashSetRefParallelSource<'data, T>; + type Source = HashSetRefParallelSource<'data, T, S, A>; fn par_iter(&'data self) -> Self::Source { HashSetRefParallelSource { hash_set: self } @@ -15,11 +18,13 @@ impl<'data, T: Sync + 'data> IntoParallelRefSource<'data> for HashSet { } #[must_use = "iterator adaptors are lazy"] -pub struct HashSetRefParallelSource<'data, T> { - hash_set: &'data HashSet, +pub struct HashSetRefParallelSource<'data, T, S, A: Allocator> { + hash_set: &'data HashSet, } -impl<'data, T: Sync> ParallelSource for HashSetRefParallelSource<'data, T> { +impl<'data, T: Sync, S, A: Allocator + Sync> ParallelSource + for HashSetRefParallelSource<'data, T, S, A> +{ type Item = Option<&'data T>; fn descriptor(self) -> impl SourceDescriptor + Sync { @@ -29,11 +34,11 @@ impl<'data, T: Sync> ParallelSource for HashSetRefParallelSource<'data, T> { } } -struct HashSetRefSourceDescriptor<'data, T: Sync> { - table: &'data RawTable<(T, ())>, +struct HashSetRefSourceDescriptor<'data, T: Sync, A: Allocator> { + table: &'data RawTable<(T, ()), A>, } -impl SourceCleanup for HashSetRefSourceDescriptor<'_, T> { +impl SourceCleanup for HashSetRefSourceDescriptor<'_, T, A> { const NEEDS_CLEANUP: bool = false; unsafe fn cleanup_item_range(&self, _range: core::ops::Range) { @@ -41,7 +46,7 @@ impl SourceCleanup for HashSetRefSourceDescriptor<'_, T> { } } -impl<'data, T: Sync> SourceDescriptor for HashSetRefSourceDescriptor<'data, T> { +impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescriptor<'data, T, A> { type Item = Option<&'data T>; fn len(&self) -> usize { @@ -62,9 +67,12 @@ impl<'data, T: Sync> SourceDescriptor for HashSetRefSourceDescriptor<'data, T> { } } -impl<'data, K: Sync + 'data, V: Sync + 'data> IntoParallelRefSource<'data> for HashMap { +// HashMap.par_iter() +impl<'data, K: Sync + 'data, V: Sync + 'data, S: 'data, A: Allocator + Sync + 'data> + IntoParallelRefSource<'data> for HashMap +{ type Item = Option<&'data (K, V)>; - type Source = HashMapRefParallelSource<'data, K, V>; + type Source = HashMapRefParallelSource<'data, K, V, S, A>; fn par_iter(&'data self) -> Self::Source { HashMapRefParallelSource { hash_map: self } @@ -72,11 +80,13 @@ impl<'data, K: Sync + 'data, V: Sync + 'data> IntoParallelRefSource<'data> for H } #[must_use = "iterator adaptors are lazy"] -pub struct HashMapRefParallelSource<'data, K, V> { - hash_map: &'data HashMap, +pub struct HashMapRefParallelSource<'data, K, V, S, A: Allocator> { + hash_map: &'data HashMap, } -impl<'data, K: Sync, V: Sync> ParallelSource for HashMapRefParallelSource<'data, K, V> { +impl<'data, K: Sync, V: Sync, S, A: Allocator + Sync> ParallelSource + for HashMapRefParallelSource<'data, K, V, S, A> +{ type Item = Option<&'data (K, V)>; fn descriptor(self) -> impl SourceDescriptor + Sync { @@ -86,11 +96,11 @@ impl<'data, K: Sync, V: Sync> ParallelSource for HashMapRefParallelSource<'data, } } -struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync> { - table: &'data RawTable<(K, V)>, +struct HashMapRefSourceDescriptor<'data, K: Sync, V: Sync, A: Allocator> { + table: &'data RawTable<(K, V), A>, } -impl SourceCleanup for HashMapRefSourceDescriptor<'_, K, V> { +impl SourceCleanup for HashMapRefSourceDescriptor<'_, K, V, A> { const NEEDS_CLEANUP: bool = false; unsafe fn cleanup_item_range(&self, _range: core::ops::Range) { @@ -98,7 +108,9 @@ impl SourceCleanup for HashMapRefSourceDescriptor<'_, K, V> { } } -impl<'data, K: Sync, V: Sync> SourceDescriptor for HashMapRefSourceDescriptor<'data, K, V> { +impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor + for HashMapRefSourceDescriptor<'data, K, V, A> +{ type Item = Option<&'data (K, V)>; fn len(&self) -> usize { @@ -118,12 +130,13 @@ impl<'data, K: Sync, V: Sync> SourceDescriptor for HashMapRefSourceDescriptor<'d } } +// HashMap.par_iter_mut() // TODO: Remove Sync requirement on V. -impl<'data, K: Sync + 'data, V: Send + Sync + 'data> IntoParallelRefMutSource<'data> - for HashMap +impl<'data, K: Sync + 'data, V: Send + Sync + 'data, S: 'data, A: Allocator + Sync + 'data> + IntoParallelRefMutSource<'data> for HashMap { type Item = Option<(&'data K, &'data mut V)>; - type Source = HashMapRefMutParallelSource<'data, K, V>; + type Source = HashMapRefMutParallelSource<'data, K, V, S, A>; fn par_iter_mut(&'data mut self) -> Self::Source { HashMapRefMutParallelSource { hash_map: self } @@ -131,11 +144,13 @@ impl<'data, K: Sync + 'data, V: Send + Sync + 'data> IntoParallelRefMutSource<'d } #[must_use = "iterator adaptors are lazy"] -pub struct HashMapRefMutParallelSource<'data, K, V> { - hash_map: &'data mut HashMap, +pub struct HashMapRefMutParallelSource<'data, K, V, S, A: Allocator> { + hash_map: &'data mut HashMap, } -impl<'data, K: Sync, V: Send + Sync> ParallelSource for HashMapRefMutParallelSource<'data, K, V> { +impl<'data, K: Sync, V: Send + Sync, S, A: Allocator + Sync> ParallelSource + for HashMapRefMutParallelSource<'data, K, V, S, A> +{ type Item = Option<(&'data K, &'data mut V)>; fn descriptor(self) -> impl SourceDescriptor + Sync { @@ -145,11 +160,13 @@ impl<'data, K: Sync, V: Send + Sync> ParallelSource for HashMapRefMutParallelSou } } -struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send + Sync> { - table: &'data RawTable<(K, V)>, +struct HashMapRefMutSourceDescriptor<'data, K: Sync, V: Send + Sync, A: Allocator> { + table: &'data RawTable<(K, V), A>, } -impl SourceCleanup for HashMapRefMutSourceDescriptor<'_, K, V> { +impl SourceCleanup + for HashMapRefMutSourceDescriptor<'_, K, V, A> +{ const NEEDS_CLEANUP: bool = false; unsafe fn cleanup_item_range(&self, _range: core::ops::Range) { @@ -157,8 +174,8 @@ impl SourceCleanup for HashMapRefMutSourceDescriptor<'_ } } -impl<'data, K: Sync, V: Send + Sync> SourceDescriptor - for HashMapRefMutSourceDescriptor<'data, K, V> +impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor + for HashMapRefMutSourceDescriptor<'data, K, V, A> { type Item = Option<(&'data K, &'data mut V)>; From ab9bd683729006aeac4efa3aefa68456a32bf9ac Mon Sep 17 00:00:00 2001 From: Guillaume Endignoux Date: Sat, 11 Oct 2025 19:54:23 +0200 Subject: [PATCH 3/5] Add debug assertions and missing SAFETY:TODO comments. --- src/external_trait_impls/paralight.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs index 065be5458e..d0c74ab0b2 100644 --- a/src/external_trait_impls/paralight.rs +++ b/src/external_trait_impls/paralight.rs @@ -54,11 +54,13 @@ impl<'data, T: Sync, A: Allocator> SourceDescriptor for HashSetRefSourceDescript } unsafe fn fetch_item(&self, index: usize) -> Self::Item { + debug_assert!(index < self.len()); // SAFETY: TODO let full = unsafe { self.table.is_bucket_full(index) }; if full { // SAFETY: TODO let bucket = unsafe { self.table.bucket(index) }; + // SAFETY: TODO let (t, ()) = unsafe { bucket.as_ref() }; Some(t) } else { @@ -118,11 +120,13 @@ impl<'data, K: Sync, V: Sync, A: Allocator> SourceDescriptor } unsafe fn fetch_item(&self, index: usize) -> Self::Item { + debug_assert!(index < self.len()); // SAFETY: TODO let full = unsafe { self.table.is_bucket_full(index) }; if full { // SAFETY: TODO let bucket = unsafe { self.table.bucket(index) }; + // SAFETY: TODO unsafe { Some(bucket.as_ref()) } } else { None @@ -184,6 +188,7 @@ impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor } unsafe fn fetch_item(&self, index: usize) -> Self::Item { + debug_assert!(index < self.len()); // SAFETY: TODO let full = unsafe { self.table.is_bucket_full(index) }; if full { From 9e6c86963e16f0339fa9c730a174af491940e38c Mon Sep 17 00:00:00 2001 From: Guillaume Endignoux Date: Sat, 11 Oct 2025 19:57:55 +0200 Subject: [PATCH 4/5] Implement IntoParallelSource for hash tables. --- src/external_trait_impls/paralight.rs | 223 +++++++++++++++++++++++++- 1 file changed, 221 insertions(+), 2 deletions(-) diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs index d0c74ab0b2..2c6f0c746e 100644 --- a/src/external_trait_impls/paralight.rs +++ b/src/external_trait_impls/paralight.rs @@ -1,8 +1,8 @@ use crate::raw::{Allocator, RawTable}; use crate::{HashMap, HashSet}; use paralight::iter::{ - IntoParallelRefMutSource, IntoParallelRefSource, ParallelSource, SourceCleanup, - SourceDescriptor, + IntoParallelRefMutSource, IntoParallelRefSource, IntoParallelSource, ParallelSource, + SourceCleanup, SourceDescriptor, }; // HashSet.par_iter() @@ -203,6 +203,183 @@ impl<'data, K: Sync, V: Send + Sync, A: Allocator> SourceDescriptor } } +// HashSet.into_par_iter() +// TODO: Remove Sync requirement on T. +impl IntoParallelSource for HashSet { + type Item = Option; + type Source = HashSetParallelSource; + + fn into_par_iter(self) -> Self::Source { + HashSetParallelSource { hash_set: self } + } +} + +#[must_use = "iterator adaptors are lazy"] +pub struct HashSetParallelSource { + hash_set: HashSet, +} + +impl ParallelSource for HashSetParallelSource { + type Item = Option; + + fn descriptor(self) -> impl SourceDescriptor + Sync { + HashSetSourceDescriptor { + table: self.hash_set.map.table, + } + } +} + +struct HashSetSourceDescriptor { + table: RawTable<(T, ()), A>, +} + +impl SourceCleanup for HashSetSourceDescriptor { + const NEEDS_CLEANUP: bool = core::mem::needs_drop::(); + + unsafe fn cleanup_item_range(&self, range: core::ops::Range) { + if Self::NEEDS_CLEANUP { + debug_assert!(range.start <= range.end); + debug_assert!(range.start <= self.len()); + debug_assert!(range.end <= self.len()); + for index in range { + // SAFETY: TODO + let full = unsafe { self.table.is_bucket_full(index) }; + if full { + // SAFETY: TODO + let bucket = unsafe { self.table.bucket(index) }; + // SAFETY: TODO + let (t, ()) = unsafe { bucket.read() }; + drop(t); + } + } + } + } +} + +impl SourceDescriptor for HashSetSourceDescriptor { + type Item = Option; + + fn len(&self) -> usize { + self.table.buckets() + } + + unsafe fn fetch_item(&self, index: usize) -> Self::Item { + debug_assert!(index < self.len()); + // SAFETY: TODO + let full = unsafe { self.table.is_bucket_full(index) }; + if full { + // SAFETY: TODO + let bucket = unsafe { self.table.bucket(index) }; + // SAFETY: TODO + let (t, ()) = unsafe { bucket.read() }; + Some(t) + } else { + None + } + } +} + +impl Drop for HashSetSourceDescriptor { + fn drop(&mut self) { + // Paralight already dropped each missing bucket via calls to cleanup_item_range(), so we + // can simply mark all buckets as cleared and let the RawTable destructor do the rest. + // TODO: Optimize this to simply deallocate without touching the control bytes. + self.table.clear_no_drop(); + } +} + +// HashMap.into_par_iter() +// TODO: Remove Sync requirement on K and V. +impl IntoParallelSource + for HashMap +{ + type Item = Option<(K, V)>; + type Source = HashMapParallelSource; + + fn into_par_iter(self) -> Self::Source { + HashMapParallelSource { hash_map: self } + } +} + +#[must_use = "iterator adaptors are lazy"] +pub struct HashMapParallelSource { + hash_map: HashMap, +} + +impl ParallelSource + for HashMapParallelSource +{ + type Item = Option<(K, V)>; + + fn descriptor(self) -> impl SourceDescriptor + Sync { + HashMapSourceDescriptor { + table: self.hash_map.table, + } + } +} + +struct HashMapSourceDescriptor { + table: RawTable<(K, V), A>, +} + +impl SourceCleanup + for HashMapSourceDescriptor +{ + const NEEDS_CLEANUP: bool = core::mem::needs_drop::<(K, V)>(); + + unsafe fn cleanup_item_range(&self, range: core::ops::Range) { + if Self::NEEDS_CLEANUP { + debug_assert!(range.start <= range.end); + debug_assert!(range.start <= self.len()); + debug_assert!(range.end <= self.len()); + for index in range { + // SAFETY: TODO + let full = unsafe { self.table.is_bucket_full(index) }; + if full { + // SAFETY: TODO + let bucket = unsafe { self.table.bucket(index) }; + // SAFETY: TODO + let key_value = unsafe { bucket.read() }; + drop(key_value); + } + } + } + } +} + +impl SourceDescriptor + for HashMapSourceDescriptor +{ + type Item = Option<(K, V)>; + + fn len(&self) -> usize { + self.table.buckets() + } + + unsafe fn fetch_item(&self, index: usize) -> Self::Item { + debug_assert!(index < self.len()); + // SAFETY: TODO + let full = unsafe { self.table.is_bucket_full(index) }; + if full { + // SAFETY: TODO + let bucket = unsafe { self.table.bucket(index) }; + // SAFETY: TODO + unsafe { Some(bucket.read()) } + } else { + None + } + } +} + +impl Drop for HashMapSourceDescriptor { + fn drop(&mut self) { + // Paralight already dropped each missing bucket via calls to cleanup_item_range(), so we + // can simply mark all buckets as cleared and let the RawTable destructor do the rest. + // TODO: Optimize this to simply deallocate without touching the control bytes. + self.table.clear_no_drop(); + } +} + #[cfg(test)] mod test { use super::*; @@ -233,6 +410,28 @@ mod test { assert_eq!(sum, 21 * 43); } + #[test] + fn test_set_into_par_iter() { + let mut set = HashSet::new(); + for i in 1..=42 { + set.insert(Box::new(i)); + } + + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy: RangeStrategy::WorkStealing, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + let sum = set + .into_par_iter() + .with_thread_pool(&mut thread_pool) + .filter_map(|x| x.map(|y| *y)) + .sum::(); + assert_eq!(sum, 21 * 43); + } + #[test] fn test_map_par_iter() { let mut map = HashMap::new(); @@ -276,4 +475,24 @@ mod test { assert_eq!(**k * **k, **v); } } + + #[test] + fn test_map_into_par_iter() { + let mut map = HashMap::new(); + for i in 1..=42 { + map.insert(Box::new(i), Box::new(i * i)); + } + + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy: RangeStrategy::WorkStealing, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + map.into_par_iter() + .with_thread_pool(&mut thread_pool) + .filter_map(|x| x) + .for_each(|(k, v)| assert_eq!(*k * *k, *v)); + } } From 30ded5c55d0661e71641ea0abcc00849afa79ee8 Mon Sep 17 00:00:00 2001 From: Guillaume Endignoux Date: Sat, 11 Oct 2025 20:17:42 +0200 Subject: [PATCH 5/5] Move thread pool building at the start of the tests. --- src/external_trait_impls/paralight.rs | 50 +++++++++++++-------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/external_trait_impls/paralight.rs b/src/external_trait_impls/paralight.rs index 2c6f0c746e..ce01ca5076 100644 --- a/src/external_trait_impls/paralight.rs +++ b/src/external_trait_impls/paralight.rs @@ -390,11 +390,6 @@ mod test { #[test] fn test_set_par_iter() { - let mut set = HashSet::new(); - for i in 1..=42 { - set.insert(Box::new(i)); - } - let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy: RangeStrategy::WorkStealing, @@ -402,6 +397,11 @@ mod test { } .build(); + let mut set = HashSet::new(); + for i in 1..=42 { + set.insert(Box::new(i)); + } + let sum = set .par_iter() .with_thread_pool(&mut thread_pool) @@ -412,11 +412,6 @@ mod test { #[test] fn test_set_into_par_iter() { - let mut set = HashSet::new(); - for i in 1..=42 { - set.insert(Box::new(i)); - } - let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy: RangeStrategy::WorkStealing, @@ -424,6 +419,11 @@ mod test { } .build(); + let mut set = HashSet::new(); + for i in 1..=42 { + set.insert(Box::new(i)); + } + let sum = set .into_par_iter() .with_thread_pool(&mut thread_pool) @@ -434,11 +434,6 @@ mod test { #[test] fn test_map_par_iter() { - let mut map = HashMap::new(); - for i in 1..=42 { - map.insert(Box::new(i), Box::new(i * i)); - } - let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy: RangeStrategy::WorkStealing, @@ -446,6 +441,11 @@ mod test { } .build(); + let mut map = HashMap::new(); + for i in 1..=42 { + map.insert(Box::new(i), Box::new(i * i)); + } + map.par_iter() .with_thread_pool(&mut thread_pool) .filter_map(|x| x) @@ -454,11 +454,6 @@ mod test { #[test] fn test_map_par_iter_mut() { - let mut map = HashMap::new(); - for i in 1..=42 { - map.insert(Box::new(i), Box::new(i)); - } - let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy: RangeStrategy::WorkStealing, @@ -466,6 +461,11 @@ mod test { } .build(); + let mut map = HashMap::new(); + for i in 1..=42 { + map.insert(Box::new(i), Box::new(i)); + } + map.par_iter_mut() .with_thread_pool(&mut thread_pool) .filter_map(|x| x) @@ -478,11 +478,6 @@ mod test { #[test] fn test_map_into_par_iter() { - let mut map = HashMap::new(); - for i in 1..=42 { - map.insert(Box::new(i), Box::new(i * i)); - } - let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy: RangeStrategy::WorkStealing, @@ -490,6 +485,11 @@ mod test { } .build(); + let mut map = HashMap::new(); + for i in 1..=42 { + map.insert(Box::new(i), Box::new(i * i)); + } + map.into_par_iter() .with_thread_pool(&mut thread_pool) .filter_map(|x| x)