Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions fvm/src/state_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ where
self.hamt.into_store()
}

/// Iterates over each KV in the Hamt and runs a function on the values with cache.
pub fn for_each<F>(&self, mut f: F) -> anyhow::Result<()>
where
F: FnMut(Address, &ActorState) -> anyhow::Result<()>,
Expand All @@ -374,4 +375,16 @@ where
})?;
Ok(())
}

/// Iterates over each KV in the Hamt and runs a function on the values without cache.
pub fn for_each_cacheless<F>(&self, mut f: F) -> anyhow::Result<()>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

where
F: FnMut(Address, &ActorState) -> anyhow::Result<()>,
{
self.hamt.for_each_cacheless(|k, v| {
let addr = Address::from_bytes(&k.0)?;
f(addr, v)
})?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions ipld/hamt/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Changes to the reference FVM's HAMT implementation.

## [Unreleased]

- Added `for_each_cacheless` method to iterate over the HAMT without caching the values. This lowers memory usage and is useful for single-pass, read-only operations over large HAMTs.

## 0.10.4 [2025-04-09]

- Updates multiple dependencies (semver breaking internally but not exported).
Expand Down
1 change: 1 addition & 0 deletions ipld/hamt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ unsigned-varint = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
rand = { workspace = true }
itertools = { workspace = true }

[[bench]]
name = "hamt_beckmark"
Expand Down
62 changes: 44 additions & 18 deletions ipld/hamt/benches/hamt_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
use std::hint::black_box;

use criterion::{Criterion, criterion_group, criterion_main};
use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
use fvm_ipld_encoding::tuple::*;
use fvm_ipld_hamt::Hamt;

const BIT_WIDTH: u32 = 5;
const ITEM_COUNT: u8 = 40;

// Struct to simulate a reasonable amount of data per value in the HAMT
Expand Down Expand Up @@ -37,8 +39,8 @@ impl BenchData {
fn insert(c: &mut Criterion) {
c.bench_function("HAMT bulk insert (no flush)", |b| {
b.iter(|| {
let db = fvm_ipld_blockstore::MemoryBlockstore::default();
let mut a = Hamt::<_, _>::new_with_bit_width(&db, 5);
let db = MemoryBlockstore::default();
let mut a = Hamt::<_, _>::new_with_bit_width(&db, BIT_WIDTH);

for i in 0..black_box(ITEM_COUNT) {
a.set(black_box(vec![i; 20].into()), black_box(BenchData::new(i)))
Expand All @@ -51,12 +53,12 @@ fn insert(c: &mut Criterion) {
fn insert_load_flush(c: &mut Criterion) {
c.bench_function("HAMT bulk insert with flushing and loading", |b| {
b.iter(|| {
let db = fvm_ipld_blockstore::MemoryBlockstore::default();
let mut empt = Hamt::<_, ()>::new_with_bit_width(&db, 5);
let db = MemoryBlockstore::default();
let mut empt = Hamt::<_, ()>::new_with_bit_width(&db, BIT_WIDTH);
let mut cid = empt.flush().unwrap();

for i in 0..black_box(ITEM_COUNT) {
let mut a = Hamt::<_, _>::load_with_bit_width(&cid, &db, 5).unwrap();
let mut a = Hamt::<_, _>::load_with_bit_width(&cid, &db, BIT_WIDTH).unwrap();
a.set(black_box(vec![i; 20].into()), black_box(BenchData::new(i)))
.unwrap();
cid = a.flush().unwrap();
Expand All @@ -66,16 +68,13 @@ fn insert_load_flush(c: &mut Criterion) {
}

fn delete(c: &mut Criterion) {
let db = fvm_ipld_blockstore::MemoryBlockstore::default();
let mut a = Hamt::<_, _>::new_with_bit_width(&db, 5);
for i in 0..black_box(ITEM_COUNT) {
a.set(vec![i; 20].into(), BenchData::new(i)).unwrap();
}
let db = MemoryBlockstore::default();
let mut a = setup_hamt(&db);
let cid = a.flush().unwrap();

c.bench_function("HAMT deleting all nodes", |b| {
b.iter(|| {
let mut a = Hamt::<_, BenchData>::load_with_bit_width(&cid, &db, 5).unwrap();
let mut a = Hamt::<_, BenchData>::load_with_bit_width(&cid, &db, BIT_WIDTH).unwrap();
for i in 0..black_box(ITEM_COUNT) {
a.delete(black_box([i; 20].as_ref())).unwrap();
}
Expand All @@ -84,20 +83,47 @@ fn delete(c: &mut Criterion) {
}

fn for_each(c: &mut Criterion) {
let db = fvm_ipld_blockstore::MemoryBlockstore::default();
let mut a = Hamt::<_, _>::new_with_bit_width(&db, 5);
for i in 0..black_box(ITEM_COUNT) {
a.set(vec![i; 20].into(), BenchData::new(i)).unwrap();
}
let db = MemoryBlockstore::default();
let mut a = setup_hamt(&db);
let cid = a.flush().unwrap();

c.bench_function("HAMT for_each function", |b| {
b.iter(|| {
let a = Hamt::<_, _>::load_with_bit_width(&cid, &db, 5).unwrap();
let a = Hamt::<_, _>::load_with_bit_width(&cid, &db, BIT_WIDTH).unwrap();
black_box(a).for_each(|_k, _v: &BenchData| Ok(())).unwrap();
})
});
}

criterion_group!(benches, insert, insert_load_flush, delete, for_each);
/// Benchmarks a complete `for_each_cacheless` pass over a HAMT that is reloaded
/// from the blockstore on every iteration.
fn for_each_cacheless(c: &mut Criterion) {
    // Build and flush the fixture once, outside of the measured closure.
    let store = MemoryBlockstore::default();
    let mut fixture = setup_hamt(&store);
    let root = fixture.flush().unwrap();

    c.bench_function("HAMT for_each_cacheless function", |bencher| {
        bencher.iter(|| {
            let hamt = Hamt::<_, _>::load_with_bit_width(&root, &store, BIT_WIDTH).unwrap();
            black_box(hamt)
                .for_each_cacheless(|_key, _value: &BenchData| Ok(()))
                .unwrap();
        })
    });
}

/// Creates a HAMT backed by `db` and fills it with `ITEM_COUNT` entries, each
/// keyed by a 20-byte key made of the repeated item index.
///
/// The returned HAMT has not been flushed.
fn setup_hamt<BS: Blockstore>(db: &BS) -> Hamt<&BS, BenchData> {
    let mut hamt = Hamt::<_, _>::new_with_bit_width(db, BIT_WIDTH);
    (0..ITEM_COUNT).for_each(|idx| {
        hamt.set(vec![idx; 20].into(), BenchData::new(idx)).unwrap();
    });
    hamt
}

// Collect every HAMT benchmark defined in this file into the `benches` group,
// then hand that group to `criterion_main!`.
criterion_group!(
benches,
insert,
insert_load_flush,
delete,
for_each,
for_each_cacheless
);
criterion_main!(benches);
34 changes: 33 additions & 1 deletion ipld/hamt/src/hamt.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changelog entry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ where
/// map.set(4, 2).unwrap();
///
/// let mut total = 0;
/// map.for_each(|_, v: &u64| {
/// map.for_each(|_, v| {
/// total += v;
/// Ok(())
/// }).unwrap();
Expand All @@ -382,6 +382,38 @@ where
Ok(())
}

/// Visits every key-value pair stored in the Hamt and invokes `f` on each one.
///
/// This is the non-caching counterpart of [`Self::for_each`]. It can be more efficient,
/// especially in terms of memory, for large HAMTs or when the map is only walked once.
///
/// # Errors
///
/// Returns whatever error the traversal of the root node yields, which includes any
/// error returned by `f`.
///
/// # Examples
///
/// ```
/// use fvm_ipld_hamt::Hamt;
///
/// let store = fvm_ipld_blockstore::MemoryBlockstore::default();
///
/// let mut map: Hamt<_, _, usize> = Hamt::new(store);
/// map.set(1, 1).unwrap();
/// map.set(4, 2).unwrap();
///
/// let mut total = 0;
/// map.for_each_cacheless(|_, v| {
///     total += v;
///     Ok(())
/// }).unwrap();
/// assert_eq!(total, 3);
/// ```
pub fn for_each_cacheless<F>(&self, mut f: F) -> Result<(), Error>
where
    K: Clone,
    V: DeserializeOwned + Clone,
    F: FnMut(&K, &V) -> anyhow::Result<()>,
{
    // The node-level traversal takes the callback by mutable reference.
    let visitor = &mut f;
    self.root.for_each_cacheless(&self.store, &self.conf, visitor)
}

/// Iterates over each KV in the Hamt and runs a function on the values. If starting key is
/// provided, iteration will start from that key. If max is provided, iteration will stop after
/// max number of items have been traversed. The number of items that were traversed is
Expand Down
4 changes: 2 additions & 2 deletions ipld/hamt/src/hash_algorithm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ pub enum Identity {}

#[cfg(feature = "identity")]
impl HashAlgorithm for Identity {
fn hash<X: ?Sized>(key: &X) -> HashedKey
fn hash<X>(key: &X) -> HashedKey
where
X: Hash,
X: Hash + ?Sized,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix a clippy warning

{
let mut ident_hasher = IdentityHasher::default();
key.hash(&mut ident_hasher);
Expand Down
2 changes: 1 addition & 1 deletion ipld/hamt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Default for Config {

type HashedKey = [u8; 32];

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct KeyValuePair<K, V>(K, V);

impl<K, V> KeyValuePair<K, V> {
Expand Down
96 changes: 96 additions & 0 deletions ipld/hamt/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ pub(crate) struct Node<K, V, H, Ver = version::V3> {
hash: PhantomData<H>,
}

impl<K, V, H, Ver> Clone for Node<K, V, H, Ver>
where
K: Clone,
V: Clone,
{
fn clone(&self) -> Self {
Self {
bitfield: self.bitfield,
pointers: self.pointers.clone(),
hash: Default::default(),
}
}
}

impl<K: PartialEq, V: PartialEq, H, Ver> PartialEq for Node<K, V, H, Ver> {
fn eq(&self, other: &Self) -> bool {
(self.bitfield == other.bitfield) && (self.pointers == other.pointers)
Expand Down Expand Up @@ -206,6 +220,88 @@ where
self.pointers.is_empty()
}

/// Non-caching iteration over the values in the node.
///
/// Walks this node and all of its descendants depth-first, calling `f` with the key and
/// value of every entry found in a `Pointer::Values` bucket. The walk uses an explicit
/// stack of pointer iterators instead of recursion.
///
/// Children referenced by `Pointer::Link` are always loaded from `bs`; the link's `cache`
/// field is ignored, so it is neither read nor populated. Each loaded node is owned by the
/// iterator pushed onto the stack and is dropped when that level is popped.
///
/// # Errors
///
/// Stops at the first error from `Node::load` or from `f` and returns it.
pub(super) fn for_each_cacheless<S, F>(
&self,
bs: &S,
conf: &Config,
f: &mut F,
) -> Result<(), Error>
where
F: FnMut(&K, &V) -> anyhow::Result<()>,
S: Blockstore,
K: Clone,
V: Clone,
{
// An element produced by a stack level: borrowed when it comes from a node that is
// already in memory, owned when it comes from a node loaded during this traversal.
enum IterItem<'a, T> {
Borrowed(&'a T),
Owned(T),
}

// One level of the traversal: either a borrowing iterator over an in-memory pointer
// list, or a consuming iterator over the pointer list of a freshly loaded node.
enum StackItem<'a, T> {
Iter(std::slice::Iter<'a, T>),
IntoIter(std::vec::IntoIter<T>),
}

impl<'a, V> From<std::slice::Iter<'a, V>> for StackItem<'a, V> {
fn from(value: std::slice::Iter<'a, V>) -> Self {
Self::Iter(value)
}
}

impl<V> From<std::vec::IntoIter<V>> for StackItem<'_, V> {
fn from(value: std::vec::IntoIter<V>) -> Self {
Self::IntoIter(value)
}
}

impl<'a, V> Iterator for StackItem<'a, V> {
type Item = IterItem<'a, V>;

fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Iter(it) => it.next().map(IterItem::Borrowed),
Self::IntoIter(it) => it.next().map(IterItem::Owned),
}
}
}

// Start with this node's own pointers; the top of the stack is the level being visited.
let mut stack: Vec<StackItem<_>> = vec![self.pointers.iter().into()];
loop {
// An empty stack means every level has been fully visited.
let Some(pointers) = stack.last_mut() else {
return Ok(());
};
// The current level is exhausted: drop it and resume its parent.
let Some(pointer) = pointers.next() else {
stack.pop();
continue;
};
match pointer {
// Links are resolved through the blockstore and descended into by value.
// NOTE(review): `stack.len()` is presumably the depth of the node being loaded —
// confirm against `Node::load`.
IterItem::Borrowed(Pointer::Link { cid, cache: _ }) => {
let node = Node::load(conf, bs, cid, stack.len() as u32)?;
stack.push(node.pointers.into_iter().into())
}
IterItem::Owned(Pointer::Link { cid, cache: _ }) => {
let node = Node::load(conf, bs, &cid, stack.len() as u32)?;
stack.push(node.pointers.into_iter().into())
}
// Dirty children are already in memory, so no blockstore load is needed.
IterItem::Borrowed(Pointer::Dirty(node)) => stack.push(node.pointers.iter().into()),
IterItem::Owned(Pointer::Dirty(node)) => {
stack.push(node.pointers.into_iter().into())
}
// Leaf buckets: hand every key-value pair to the callback.
IterItem::Borrowed(Pointer::Values(kvs)) => {
for kv in kvs.iter() {
f(kv.key(), kv.value())?;
}
}
IterItem::Owned(Pointer::Values(kvs)) => {
for kv in kvs.iter() {
f(kv.key(), kv.value())?;
}
}
}
}
}

/// Search for a key.
fn search<Q, S: Blockstore>(
&self,
Expand Down
17 changes: 17 additions & 0 deletions ipld/hamt/src/pointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ pub(crate) enum Pointer<K, V, H, Ver = version::V3> {
Dirty(Box<Node<K, V, H, Ver>>),
}

impl<K, V, H, Ver> Clone for Pointer<K, V, H, Ver>
where
K: Clone,
V: Clone,
{
fn clone(&self) -> Self {
match self {
Self::Values(v) => Self::Values(v.clone()),
Self::Link { cid, cache: _ } => Self::Link {
cid: *cid,
cache: Default::default(),
},
Self::Dirty(n) => Self::Dirty(n.clone()),
}
}
}

impl<K: PartialEq, V: PartialEq, H, Ver> PartialEq for Pointer<K, V, H, Ver> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
Expand Down
Loading
Loading