diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..226dec96 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[target.wasm32-unknown-unknown] +runner = "wasm-bindgen-test-runner" +rustflags = ['--cfg', 'getrandom_backend="wasm_js"'] diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f1df4405..94dfa960 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -2,7 +2,7 @@ name: CI on: pull_request: - types: [ 'labeled', 'unlabeled', 'opened', 'synchronize', 'reopened' ] + types: ["labeled", "unlabeled", "opened", "synchronize", "reopened"] merge_group: push: branches: @@ -24,7 +24,7 @@ jobs: tests: name: CI Test Suite if: "github.event_name != 'pull_request' || ! contains(github.event.pull_request.labels.*.name, 'flaky-test')" - uses: './.github/workflows/tests.yaml' + uses: "./.github/workflows/tests.yaml" cross_build: name: Cross Build Only @@ -35,8 +35,8 @@ jobs: fail-fast: false matrix: target: - # cross tests are currently broken vor armv7 and aarch64 - # see https://github.com/cross-rs/cross/issues/1311 + # cross tests are currently broken for armv7 and aarch64 + # see https://github.com/cross-rs/cross/issues/1311 # - armv7-linux-androideabi # - aarch64-linux-android # Freebsd execution fails in cross # - x86_64-unknown-freebsd # Netbsd execution fails to link in cross # - x86_64-unknown-netbsd steps: - - name: Checkout - uses: actions/checkout@v4 - with: - submodules: recursive - - - name: Install rust stable - uses: dtolnay/rust-toolchain@stable - - - name: Cleanup Docker - continue-on-error: true - run: | - docker kill $(docker ps -q) - - # See https://github.com/cross-rs/cross/issues/1222 - - uses: taiki-e/install-action@cross - - - name: build - # cross tests are currently broken vor armv7 and aarch64 - # see https://github.com/cross-rs/cross/issues/1311. So on - # those platforms we only build but do not run tests. - run: cross build --all --target ${{ matrix.target }} - env: - RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG'}} + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Install rust stable + uses: dtolnay/rust-toolchain@stable + + - name: Cleanup Docker + continue-on-error: true + run: | + docker kill $(docker ps -q) + + # See https://github.com/cross-rs/cross/issues/1222 + - uses: taiki-e/install-action@cross + + - name: build + # cross tests are currently broken for armv7 and aarch64 + # see https://github.com/cross-rs/cross/issues/1311. So on + # those platforms we only build but do not run tests. 
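A note on the getrandom cfg introduced in `.cargo/config.toml` above: getrandom 0.3 refuses to compile for `wasm32-unknown-unknown` unless a backend is selected explicitly, which is why the same `--cfg getrandom_backend="wasm_js"` flag reappears in the `wasm_build` job's `RUSTFLAGS` further down in this diff. A minimal sketch of the kind of code this unblocks (the function name is illustrative, and it assumes getrandom's `wasm_js` feature is enabled somewhere in the dependency tree):

```rust
// On wasm32-unknown-unknown, any request for OS entropy goes through
// getrandom; without the wasm_js backend cfg this fails at compile
// time with a "no backend selected" error rather than at runtime.
use rand::RngCore;

fn random_nonce() -> [u8; 16] {
    let mut buf = [0u8; 16];
    rand::rng().fill_bytes(&mut buf); // rand 0.9 API, as pinned in Cargo.toml
    buf
}
```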
+ run: cross build --all --target ${{ matrix.target }} + env: + RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG'}} android_build: name: Android Build Only @@ -82,38 +82,38 @@ jobs: - aarch64-linux-android - armv7-linux-androideabi steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Set up Rust - uses: dtolnay/rust-toolchain@stable - with: - target: ${{ matrix.target }} - - name: Install rustup target - run: rustup target add ${{ matrix.target }} - - - name: Setup Java - uses: actions/setup-java@v4 - with: - distribution: 'temurin' - java-version: '17' - - - name: Setup Android SDK - uses: android-actions/setup-android@v3 - - - name: Setup Android NDK - uses: arqu/setup-ndk@main - id: setup-ndk - with: - ndk-version: r23 - add-to-path: true - - - name: Build - env: - ANDROID_NDK_HOME: ${{ steps.setup-ndk.outputs.ndk-path }} - run: | - cargo install --version 3.5.4 cargo-ndk - cargo ndk --target ${{ matrix.target }} build + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Rust + uses: dtolnay/rust-toolchain@stable + with: + target: ${{ matrix.target }} + - name: Install rustup target + run: rustup target add ${{ matrix.target }} + + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: "temurin" + java-version: "17" + + - name: Setup Android SDK + uses: android-actions/setup-android@v3 + + - name: Setup Android NDK + uses: arqu/setup-ndk@main + id: setup-ndk + with: + ndk-version: r23 + add-to-path: true + + - name: Build + env: + ANDROID_NDK_HOME: ${{ steps.setup-ndk.outputs.ndk-path }} + run: | + cargo install --version 3.5.4 cargo-ndk + cargo ndk --target ${{ matrix.target }} build cross_test: name: Cross Test @@ -126,26 +126,26 @@ jobs: target: - i686-unknown-linux-gnu steps: - - name: Checkout - uses: actions/checkout@v4 - with: - submodules: recursive + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive - - name: Install rust stable - uses: dtolnay/rust-toolchain@stable + - name: Install rust stable + uses: dtolnay/rust-toolchain@stable - - name: Cleanup Docker - continue-on-error: true - run: | - docker kill $(docker ps -q) + - name: Cleanup Docker + continue-on-error: true + run: | + docker kill $(docker ps -q) - # See https://github.com/cross-rs/cross/issues/1222 - - uses: taiki-e/install-action@cross + # See https://github.com/cross-rs/cross/issues/1222 + - uses: taiki-e/install-action@cross - - name: test - run: cross test --all --target ${{ matrix.target }} -- --test-threads=12 - env: - RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }} + - name: test + run: cross test --all --target ${{ matrix.target }} -- --test-threads=12 + env: + RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }} check_semver: runs-on: ubuntu-latest @@ -186,13 +186,13 @@ jobs: RUSTC_WRAPPER: "sccache" SCCACHE_GHA_ENABLED: "on" steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - components: rustfmt - - uses: mozilla-actions/sccache-action@v0.0.9 - - uses: taiki-e/install-action@cargo-make - - run: cargo make format-check + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + - uses: mozilla-actions/sccache-action@v0.0.9 + - uses: taiki-e/install-action@cargo-make + - run: cargo make format-check check_docs: timeout-minutes: 30 @@ -202,17 +202,17 @@ jobs: RUSTC_WRAPPER: "sccache" SCCACHE_GHA_ENABLED: "on" steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@master - with: - toolchain: nightly-2025-10-09 - - name: Install sccache - uses: 
mozilla-actions/sccache-action@v0.0.9 - - - name: Docs - run: cargo doc --workspace --all-features --no-deps --document-private-items - env: - RUSTDOCFLAGS: --cfg docsrs + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@master + with: + toolchain: nightly-2025-10-09 + - name: Install sccache + uses: mozilla-actions/sccache-action@v0.0.9 + + - name: Docs + run: cargo doc --workspace --all-features --no-deps --document-private-items + env: + RUSTDOCFLAGS: --cfg docsrs clippy_check: timeout-minutes: 30 @@ -221,23 +221,23 @@ jobs: RUSTC_WRAPPER: "sccache" SCCACHE_GHA_ENABLED: "on" steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable - with: - components: clippy - - name: Install sccache - uses: mozilla-actions/sccache-action@v0.0.9 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: clippy + - name: Install sccache + uses: mozilla-actions/sccache-action@v0.0.9 - # TODO: We have a bunch of platform-dependent code so should - # probably run this job on the full platform matrix - - name: clippy check (all features) - run: cargo clippy --workspace --all-features --all-targets --bins --tests --benches + # TODO: We have a bunch of platform-dependent code so should + # probably run this job on the full platform matrix + - name: clippy check (all features) + run: cargo clippy --workspace --all-features --all-targets --bins --tests --benches - - name: clippy check (no features) - run: cargo clippy --workspace --no-default-features --lib --bins --tests + - name: clippy check (no features) + run: cargo clippy --workspace --no-default-features --lib --bins --tests - - name: clippy check (default features) - run: cargo clippy --workspace --all-targets + - name: clippy check (default features) + run: cargo clippy --workspace --all-targets msrv: if: "github.event_name != 'pull_request' || ! 
contains(github.event.pull_request.labels.*.name, 'flaky-test')" @@ -248,17 +248,17 @@ jobs: RUSTC_WRAPPER: "sccache" SCCACHE_GHA_ENABLED: "on" steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@master - with: - toolchain: ${{ env.MSRV }} - - name: Install sccache - uses: mozilla-actions/sccache-action@v0.0.9 - - - name: Check MSRV all features - continue-on-error: true - run: | - cargo +$MSRV check --workspace --all-targets + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ env.MSRV }} + - name: Install sccache + uses: mozilla-actions/sccache-action@v0.0.9 + + - name: Check MSRV all features + continue-on-error: true + run: | + cargo +$MSRV check --workspace --all-targets cargo_deny: timeout-minutes: 30 @@ -276,6 +276,43 @@ jobs: timeout-minutes: 30 runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - run: pip install --user codespell[toml] - - run: codespell --ignore-words-list=ans,atmost,crate,inout,ratatui,ser,stayin,swarmin,worl --skip=CHANGELOG.md + - uses: actions/checkout@v4 + - run: pip install --user codespell[toml] + - run: codespell --ignore-words-list=ans,atmost,crate,inout,ratatui,ser,stayin,swarmin,worl --skip=CHANGELOG.md + + wasm_build: + name: Build & test wasm32 + runs-on: ubuntu-latest + env: + RUSTFLAGS: '--cfg getrandom_backend="wasm_js"' + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install Node.js + uses: actions/setup-node@v4 + with: + node-version: 20 + + - name: Install stable toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Add wasm target + run: rustup target add wasm32-unknown-unknown + + - name: Install wasm-tools + uses: bytecodealliance/actions/wasm-tools/setup@v1 + + - name: Install wasm-pack + uses: taiki-e/install-action@v2 + with: + tool: wasm-bindgen,wasm-pack + + - name: wasm32 build + run: cargo build --target wasm32-unknown-unknown --no-default-features + + # If the Wasm file contains any 'import "env"' declarations, then + # some non-Wasm-compatible code made it into the final code. + - name: Ensure no 'import "env"' in wasm + run: | + ! 
wasm-tools print --skeleton target/wasm32-unknown-unknown/debug/iroh_docs.wasm | grep 'import "env"' diff --git a/Cargo.lock b/Cargo.lock index 55ed5dab..14fea3eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -166,7 +166,7 @@ checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "synstructure", ] @@ -178,7 +178,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -214,7 +214,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -424,9 +424,9 @@ dependencies = [ [[package]] name = "block-buffer" -version = "0.11.0-rc.5" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9ef36a6fcdb072aa548f3da057640ec10859eb4e91ddf526ee648d50c76a949" +checksum = "96eb4cdd6cf1b31d671e9efe75c5d1ec614776856cefbe109ca373554a6d514f" dependencies = [ "hybrid-array", "zeroize", @@ -492,9 +492,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.44" +version = "1.2.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37521ac7aabe3d13122dc382493e20c9416f299d2ccd5b3a5340a2570cdeb0f3" +checksum = "35900b6c8d709fb1d854671ae27aeaa9eec2f8b01b364e1619a40da3e6fe2afe" dependencies = [ "find-msvc-tools", "shlex", @@ -587,7 +587,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -796,7 +796,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -821,9 +821,9 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" [[package]] name = "der" -version = "0.8.0-rc.9" +version = "0.8.0-rc.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9d8dd2f26c86b27a2a8ea2767ec7f9df7a89516e4794e54ac01ee618dda3aa4" +checksum = "02c1d73e9668ea6b6a28172aa55f3ebec38507131ce179051c8033b5c6037653" dependencies = [ "const-oid", "pem-rfc7468", @@ -862,7 +862,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -891,7 +891,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "unicode-xid", ] @@ -903,7 +903,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "unicode-xid", ] @@ -932,7 +932,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -963,9 +963,9 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" [[package]] name = "ed25519" -version = "3.0.0-rc.1" +version = "3.0.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef49c0b20c0ad088893ad2a790a29c06a012b3f05bcfc66661fd22a94b32129" +checksum = "594435fe09e345ee388e4e8422072ff7dfeca8729389fbd997b3f5504c44cd47" dependencies = [ "pkcs8", "serde", @@ -1009,7 +1009,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1191,7 +1191,7 @@ checksum = 
"162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1496,9 +1496,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" +checksum = "1744436df46f0bde35af3eda22aeaba453aada65d8f1c171cd8a5f59030bd69f" dependencies = [ "atomic-waker", "bytes", @@ -1795,7 +1795,7 @@ dependencies = [ "igd-next", "instant", "iroh-base", - "iroh-metrics", + "iroh-metrics 0.37.0", "iroh-quinn", "iroh-quinn-proto", "iroh-quinn-udp", @@ -1850,8 +1850,7 @@ dependencies = [ [[package]] name = "iroh-blobs" version = "0.97.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c901304c1c28f257fcf9aae8c9149e54e0baf62f5eb2788cecde3bf1206a04e6" +source = "git+https://github.com/n0-computer/iroh-blobs.git?branch=b5%2Ffix-std-time#74175f18714e24356e9fa9171fd36cb96777420e" dependencies = [ "anyhow", "arrayvec", @@ -1867,7 +1866,7 @@ dependencies = [ "iroh", "iroh-base", "iroh-io", - "iroh-metrics", + "iroh-metrics 0.37.0", "iroh-quinn", "iroh-tickets", "irpc", @@ -1897,6 +1896,7 @@ dependencies = [ "async-channel", "blake3", "bytes", + "cfg_aliases", "data-encoding", "derive_more 2.0.1", "ed25519-dalek", @@ -1907,7 +1907,7 @@ dependencies = [ "iroh", "iroh-blobs", "iroh-gossip", - "iroh-metrics", + "iroh-metrics 0.36.2", "iroh-quinn", "iroh-tickets", "irpc", @@ -1956,7 +1956,7 @@ dependencies = [ "indexmap", "iroh", "iroh-base", - "iroh-metrics", + "iroh-metrics 0.37.0", "irpc", "n0-error", "n0-future", @@ -1981,6 +1981,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "iroh-metrics" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c84c167b59ae22f940e78eb347ca5f02aa25608e994cb5a7cc016ac2d5eada18" +dependencies = [ + "iroh-metrics-derive 0.3.1", + "itoa", + "postcard", + "ryu", + "serde", + "snafu", + "tracing", +] + [[package]] name = "iroh-metrics" version = "0.37.0" @@ -1990,7 +2005,7 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", - "iroh-metrics-derive", + "iroh-metrics-derive 0.4.0", "itoa", "n0-error", "postcard", @@ -2001,6 +2016,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "iroh-metrics-derive" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "748d380f26f7c25307c0a7acd181b84b977ddc2a1b7beece1e5998623c323aa1" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "iroh-metrics-derive" version = "0.4.0" @@ -2010,7 +2037,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2089,7 +2116,7 @@ dependencies = [ "hyper", "hyper-util", "iroh-base", - "iroh-metrics", + "iroh-metrics 0.37.0", "iroh-quinn", "iroh-quinn-proto", "lru 0.16.2", @@ -2172,7 +2199,7 @@ checksum = "58148196d2230183c9679431ac99b57e172000326d664e8456fa2cd27af6505a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2401,14 +2428,14 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] name = "n0-future" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439e746b307c1fd0c08771c3cafcd1746c3ccdb0d9c7b859d3caded366b6da76" +checksum = 
"8c0709ac8235ce13b82bc4d180ee3c42364b90c1a8a628c3422d991d75a728b5" dependencies = [ "cfg_aliases", "derive_more 1.0.0", @@ -2428,8 +2455,7 @@ dependencies = [ [[package]] name = "n0-snafu" version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1815107e577a95bfccedb4cfabc73d709c0db6d12de3f14e0f284a8c5036dc4f" +source = "git+https://github.com/n0-computer/n0-snafu.git?branch=b5%2Fwasm#fbee015d3789c8e7d86647399e9021f9e651d7d6" dependencies = [ "anyhow", "btparse", @@ -2458,7 +2484,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2656,7 +2682,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2746,9 +2772,9 @@ dependencies = [ [[package]] name = "pem-rfc7468" -version = "1.0.0-rc.3" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8e58fab693c712c0d4e88f8eb3087b6521d060bcaf76aeb20cb192d809115ba" +checksum = "a6305423e0e7738146434843d1694d621cce767262b2a86910beab705e4493d9" dependencies = [ "base64ct", ] @@ -2786,7 +2812,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2834,9 +2860,9 @@ dependencies = [ [[package]] name = "pkcs8" -version = "0.11.0-rc.7" +version = "0.11.0-rc.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93eac55f10aceed84769df670ea4a32d2ffad7399400d41ee1c13b1cd8e1b478" +checksum = "77089aec8290d0b7bb01b671b091095cf1937670725af4fd73d47249f03b12c0" dependencies = [ "der", "spki", @@ -2871,7 +2897,7 @@ dependencies = [ "futures-util", "hyper-util", "igd-next", - "iroh-metrics", + "iroh-metrics 0.37.0", "libc", "n0-error", "netwatch", @@ -2920,7 +2946,7 @@ checksum = "e0232bd009a197ceec9cc881ba46f727fcd8060a2d8d6a9dde7a69030a6fe2bb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3079,9 +3105,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.41" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -3222,7 +3248,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3649,7 +3675,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3814,7 +3840,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3845,7 +3871,7 @@ checksum = "c87e960f4dca2788eeb86bbdde8dd246be8948790b7618d656e68f9b720a86e8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3894,7 +3920,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3905,7 +3931,7 @@ checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3936,7 +3962,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3948,7 
+3974,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3970,9 +3996,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.109" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -4007,7 +4033,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4083,7 +4109,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4132,7 +4158,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4143,7 +4169,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4236,7 +4262,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4442,7 +4468,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4476,6 +4502,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.20" @@ -4486,12 +4522,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -4512,7 +4551,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" dependencies = [ "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4702,7 +4741,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "wasm-bindgen-shared", ] @@ -4745,6 +4784,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" dependencies = [ "js-sys", + "serde", "wasm-bindgen", ] @@ -4922,7 +4962,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4933,7 +4973,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5445,7 +5485,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "synstructure", ] @@ -5472,7 +5512,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5492,7 +5532,7 @@ checksum = 
"d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "synstructure", ] @@ -5513,7 +5553,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5546,5 +5586,5 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] diff --git a/Cargo.toml b/Cargo.toml index a980c26e..d745b0b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,20 +31,20 @@ futures-buffered = "0.2.4" futures-lite = "2.3.0" futures-util = { version = "0.3.25" } hex = "0.4" -iroh = { version = "0.95" } +iroh = { version = "0.95", default-features = false } iroh-tickets = { version = "0.2"} -iroh-blobs = { version = "0.97" } -iroh-gossip = { version = "0.95", features = ["net"] } -iroh-metrics = { version = "0.37", default-features = false } -irpc = { version = "0.11.0" } -n0-future = "0.3" +iroh-blobs = { version = "0.97", default-features = false, git = "https://github.com/n0-computer/iroh-blobs.git", branch = "b5/fix-std-time" } +iroh-gossip = { version = "0.95", features = ["net"], default-features = false } +iroh-metrics = { version = "0.36", default-features = false } +irpc = { version = "0.11", default-features = false } +n0-future = { version = "0.3.1", features = ["serde"] } num_enum = "0.7" postcard = { version = "1", default-features = false, features = [ "alloc", "use-std", "experimental-derive", ] } -quinn = { package = "iroh-quinn", version = "0.14.0" } +quinn = { package = "iroh-quinn", version = "0.14.0", optional = true } rand = "0.9.2" redb = { version = "2.6.3" } self_cell = "1.0.3" @@ -71,12 +71,19 @@ test-strategy = "0.4" testdir = "0.7" testresult = "0.4.1" tokio = { version = "1", features = ["sync", "macros"] } -tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } tracing-test = "0.2.5" + +tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +tracing-subscriber = { version = "0.3.20", default-features = false, features = ["env-filter", "fmt", "json", "registry"] } + [features] -default = ["metrics"] +default = ["metrics", "rpc", "fs-store"] metrics = ["iroh-metrics/metrics", "iroh/metrics"] +rpc = ["dep:quinn", "irpc/rpc", "iroh-blobs/rpc"] +fs-store = ["iroh-blobs/fs-store"] [package.metadata.docs.rs] all-features = true @@ -94,3 +101,6 @@ missing_debug_implementations = "warn" # do. To enable for a crate set `#![cfg_attr(iroh_docsrs, # feature(doc_cfg))]` in the crate. unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)"] } + +[build-dependencies] +cfg_aliases = "0.2.1" diff --git a/build.rs b/build.rs new file mode 100644 index 00000000..7aae5682 --- /dev/null +++ b/build.rs @@ -0,0 +1,9 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + // Setup cfg aliases + cfg_aliases! 
{ + // Convenience aliases + wasm_browser: { all(target_family = "wasm", target_os = "unknown") }, + } +} diff --git a/src/actor.rs b/src/actor.rs index 7b9f9241..9969832c 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -4,8 +4,6 @@ use std::{ collections::{hash_map, HashMap}, num::NonZeroU64, sync::Arc, - thread::JoinHandle, - time::Duration, }; use anyhow::{anyhow, Context, Result}; @@ -13,8 +11,11 @@ use bytes::Bytes; use futures_util::FutureExt; use iroh_blobs::Hash; use irpc::channel::mpsc; +use n0_future::{task::JoinSet, time::Duration}; use serde::{Deserialize, Serialize}; -use tokio::{sync::oneshot, task::JoinSet}; +use tokio::sync::oneshot; +#[cfg(wasm_browser)] +use tracing::Instrument; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -226,7 +227,11 @@ struct OpenReplica { #[derive(Debug, Clone)] pub struct SyncHandle { tx: async_channel::Sender, - join_handle: Arc>>, + #[cfg(wasm_browser)] + #[allow(unused)] + join_handle: Arc>>, + #[cfg(not(wasm_browser))] + join_handle: Arc>>, metrics: Arc, } @@ -270,17 +275,23 @@ impl SyncHandle { tasks: Default::default(), metrics: metrics.clone(), }; + + let span = error_span!("sync", %me); + #[cfg(wasm_browser)] + let join_handle = n0_future::task::spawn(actor.run_async().instrument(span)); + + #[cfg(not(wasm_browser))] let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) .spawn(move || { - let span = error_span!("sync", %me); let _enter = span.enter(); - if let Err(err) = actor.run() { + if let Err(err) = actor.run_in_thread() { error!("Sync actor closed with error: {err:?}"); } }) .expect("failed to spawn thread"); + let join_handle = Arc::new(Some(join_handle)); SyncHandle { tx: action_tx, @@ -594,14 +605,26 @@ impl SyncHandle { impl Drop for SyncHandle { fn drop(&mut self) { // this means we're dropping the last reference + #[allow(unused)] if let Some(handle) = Arc::get_mut(&mut self.join_handle) { - // this call is the reason tx can not be a tokio mpsc channel. - // we have no control about where drop is called, yet tokio send_blocking panics - // when called from inside a tokio runtime. - self.tx.send_blocking(Action::Shutdown { reply: None }).ok(); - let handle = handle.take().expect("this can only run once"); - if let Err(err) = handle.join() { - warn!(?err, "Failed to join sync actor"); + #[cfg(wasm_browser)] + { + let tx = self.tx.clone(); + n0_future::task::spawn(async move { + tx.send(Action::Shutdown { reply: None }).await.ok(); + }); + } + #[cfg(not(wasm_browser))] + { + // this call is the reason tx can not be a tokio mpsc channel. + // we have no control about where drop is called, yet tokio send_blocking panics + // when called from inside a tokio runtime. + self.tx.send_blocking(Action::Shutdown { reply: None }).ok(); + let handle = handle.take().expect("this can only run once"); + + if let Err(err) = handle.join() { + warn!(?err, "Failed to join sync actor"); + } } } } @@ -617,7 +640,8 @@ struct Actor { } impl Actor { - fn run(self) -> Result<()> { + #[cfg(not(wasm_browser))] + fn run_in_thread(self) -> Result<()> { let rt = tokio::runtime::Builder::new_current_thread() .enable_time() .build()?; @@ -628,7 +652,7 @@ impl Actor { async fn run_async(mut self) { let reply = loop { - let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + let timeout = n0_future::time::sleep(MAX_COMMIT_DELAY); tokio::pin!(timeout); let action = tokio::select! 
{ _ = &mut timeout => { @@ -764,37 +788,46 @@ impl Actor { hash, len, reply, - } => send_reply_with(reply, self, move |this| { - let author = get_author(&mut this.store, &author)?; - let mut replica = this.states.replica(namespace, &mut this.store)?; - replica.insert(&key, &author, hash, len)?; - this.metrics.new_entries_local.inc(); - this.metrics.new_entries_local_size.inc_by(len); - Ok(()) - }), + } => { + send_reply_with_async(reply, self, async move |this| { + let author = get_author(&mut this.store, &author)?; + let mut replica = this.states.replica(namespace, &mut this.store)?; + replica.insert(&key, &author, hash, len).await?; + this.metrics.new_entries_local.inc(); + this.metrics.new_entries_local_size.inc_by(len); + Ok(()) + }) + .await + } ReplicaAction::DeletePrefix { author, key, reply } => { - send_reply_with(reply, self, |this| { + send_reply_with_async(reply, self, async |this| { let author = get_author(&mut this.store, &author)?; let mut replica = this.states.replica(namespace, &mut this.store)?; - let res = replica.delete_prefix(&key, &author)?; + let res = replica.delete_prefix(&key, &author).await?; Ok(res) }) + .await } ReplicaAction::InsertRemote { entry, from, content_status, reply, - } => send_reply_with(reply, self, move |this| { - let mut replica = this - .states - .replica_if_syncing(&namespace, &mut this.store)?; - let len = entry.content_len(); - replica.insert_remote_entry(entry, from, content_status)?; - this.metrics.new_entries_remote.inc(); - this.metrics.new_entries_remote_size.inc_by(len); - Ok(()) - }), + } => { + send_reply_with_async(reply, self, async move |this| { + let mut replica = this + .states + .replica_if_syncing(&namespace, &mut this.store)?; + let len = entry.content_len(); + replica + .insert_remote_entry(entry, from, content_status) + .await?; + this.metrics.new_entries_remote.inc(); + this.metrics.new_entries_remote_size.inc_by(len); + Ok(()) + }) + .await + } ReplicaAction::SyncInitialMessage { reply } => { send_reply_with(reply, self, move |this| { @@ -1049,6 +1082,14 @@ fn send_reply_with( sender.send(f(this)).map_err(send_reply_error) } +async fn send_reply_with_async( + sender: oneshot::Sender>, + this: &mut Actor, + f: impl AsyncFnOnce(&mut Actor) -> Result, +) -> Result<(), SendReplyError> { + sender.send(f(this).await).map_err(send_reply_error) +} + fn send_reply_error(_err: T) -> SendReplyError { SendReplyError } diff --git a/src/api.rs b/src/api.rs index fc5e9c07..b0b3ac45 100644 --- a/src/api.rs +++ b/src/api.rs @@ -4,7 +4,6 @@ use std::{ future::Future, - net::SocketAddr, path::Path, pin::Pin, sync::{ @@ -14,18 +13,14 @@ use std::{ task::{ready, Poll}, }; -use anyhow::{Context, Result}; +use anyhow::Result; use bytes::Bytes; use iroh::EndpointAddr; use iroh_blobs::{ api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress}, Hash, }; -use irpc::rpc::Handler; -use n0_future::{ - task::{self, AbortOnDropHandle}, - FutureExt, Stream, StreamExt, -}; +use n0_future::{FutureExt, Stream, StreamExt}; use self::{ actor::RpcActor, @@ -67,19 +62,25 @@ impl DocsApi { } /// Connect to a remote docs service - pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result { + #[cfg(feature = "rpc")] + pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Result { Ok(DocsApi { inner: Client::quinn(endpoint, addr), }) } /// Listen for incoming RPC connections - pub fn listen(&self, endpoint: quinn::Endpoint) -> Result> { + #[cfg(feature = "rpc")] + pub fn listen( + &self, + endpoint: 
quinn::Endpoint, + ) -> Result> { + use anyhow::Context; let local = self .inner .as_local() .context("cannot listen on remote API")?; - let handler: Handler = Arc::new(move |msg, _rx, tx| { + let handler: irpc::rpc::Handler = Arc::new(move |msg, _rx, tx| { let local = local.clone(); Box::pin(async move { match msg { @@ -114,8 +115,8 @@ impl DocsApi { } }) }); - let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler)); - Ok(AbortOnDropHandle::new(join_handle)) + let join_handle = n0_future::task::spawn(irpc::rpc::listen(endpoint, handler)); + Ok(n0_future::task::AbortOnDropHandle::new(join_handle)) } /// Creates a new document author. diff --git a/src/api/actor.rs b/src/api/actor.rs index 5a8fce4b..9d365c10 100644 --- a/src/api/actor.rs +++ b/src/api/actor.rs @@ -429,7 +429,7 @@ impl RpcActor { return; } }; - tokio::task::spawn(async move { + n0_future::task::spawn(async move { loop { tokio::select! { msg = stream.next() => { diff --git a/src/api/protocol.rs b/src/api/protocol.rs index 1575474a..ca77fea9 100644 --- a/src/api/protocol.rs +++ b/src/api/protocol.rs @@ -304,7 +304,7 @@ pub struct AuthorDeleteResponse; // Use the macro to generate both the DocsProtocol and DocsMessage enums // plus implement Channels for each type -#[rpc_requests(message = DocsMessage)] +#[rpc_requests(message = DocsMessage, rpc_feature = "rpc")] #[derive(Serialize, Deserialize, Debug)] pub enum DocsProtocol { #[rpc(tx = oneshot::Sender>)] diff --git a/src/engine.rs b/src/engine.rs index 22831d41..3a3833da 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -2,13 +2,9 @@ //! //! [`crate::Replica`] is also called documents here. -use std::{ - path::PathBuf, - str::FromStr, - sync::{Arc, RwLock}, -}; +use std::sync::{Arc, RwLock}; -use anyhow::{bail, Context, Result}; +use anyhow::{bail, Result}; use futures_lite::{Stream, StreamExt}; use iroh::{Endpoint, EndpointAddr, PublicKey}; use iroh_blobs::{ @@ -17,9 +13,9 @@ use iroh_blobs::{ Hash, }; use iroh_gossip::net::Gossip; +use n0_future::task::AbortOnDropHandle; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; -use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error, error_span, Instrument}; use self::live::{LiveActor, ToLiveActor}; @@ -128,7 +124,7 @@ impl Engine { live_actor_tx.clone(), sync.metrics().clone(), ); - let actor_handle = tokio::task::spawn( + let actor_handle = n0_future::task::spawn( async move { if let Err(err) = actor.run().await { error!("sync actor failed: {err:?}"); @@ -355,7 +351,8 @@ pub enum DefaultAuthorStorage { /// Memory storage. Mem, /// File based persistent storage. - Persistent(PathBuf), + #[cfg(feature = "fs-store")] + Persistent(std::path::PathBuf), } impl DefaultAuthorStorage { @@ -373,7 +370,11 @@ impl DefaultAuthorStorage { docs_store.import_author(author).await?; Ok(author_id) } + #[cfg(feature = "fs-store")] Self::Persistent(ref path) => { + use std::str::FromStr; + + use anyhow::Context; if path.exists() { let data = tokio::fs::read_to_string(path).await.with_context(|| { format!( @@ -407,12 +408,14 @@ impl DefaultAuthorStorage { } /// Save a new default author. - pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> { + pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> { match self { Self::Mem => { // persistence is not possible for the mem storage so this is a noop. 
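Stepping back to the `SyncHandle::drop` change in `src/actor.rs` earlier in this patch: on native targets the handle still shuts the actor down with a blocking send from `Drop`, while in the browser there is no blocking at all, so the shutdown message is fired from a spawned task instead. A self-contained sketch of that split, with a hypothetical `Handle` type, a plain `()` message in place of `Action::Shutdown`, and `wasm_bindgen_futures::spawn_local` standing in for `n0_future::task::spawn`:

```rust
struct Handle {
    tx: async_channel::Sender<()>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Native: Drop may run with or without a runtime on the stack.
        // async_channel's send_blocking is safe in both cases, which is
        // exactly why tx is not a tokio mpsc sender: tokio's
        // blocking_send panics when called inside a runtime.
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        self.tx.send_blocking(()).ok();

        // Browser (the wasm_browser alias from build.rs): blocking is
        // not an option, so hand the message to a spawned task instead.
        #[cfg(all(target_family = "wasm", target_os = "unknown"))]
        {
            let tx = self.tx.clone();
            wasm_bindgen_futures::spawn_local(async move {
                tx.send(()).await.ok();
            });
        }
    }
}
```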
} + #[cfg(feature = "fs-store")] Self::Persistent(ref path) => { + use anyhow::Context; tokio::fs::write(path, author_id.to_string()) .await .with_context(|| { diff --git a/src/engine/gossip.rs b/src/engine/gossip.rs index bb892287..bfaf0324 100644 --- a/src/engine/gossip.rs +++ b/src/engine/gossip.rs @@ -9,10 +9,8 @@ use iroh_gossip::{ api::{Event, GossipReceiver, GossipSender, JoinOptions}, net::Gossip, }; -use tokio::{ - sync::mpsc, - task::{AbortHandle, JoinSet}, -}; +use n0_future::task::{AbortHandle, JoinSet}; +use tokio::sync::mpsc; use tracing::{debug, instrument, warn}; use super::live::{Op, ToLiveActor}; diff --git a/src/engine/live.rs b/src/engine/live.rs index 9a4b11f1..b8355344 100644 --- a/src/engine/live.rs +++ b/src/engine/live.rs @@ -3,7 +3,6 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, - time::SystemTime, }; use anyhow::{Context, Result}; @@ -20,11 +19,9 @@ use iroh_blobs::{ Hash, HashAndFormat, }; use iroh_gossip::net::Gossip; +use n0_future::{task::JoinSet, time::SystemTime}; use serde::{Deserialize, Serialize}; -use tokio::{ - sync::{self, mpsc, oneshot}, - task::JoinSet, -}; +use tokio::sync::{self, mpsc, oneshot}; use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span}; // use super::gossip::{GossipActor, ToGossipActor}; diff --git a/src/engine/state.rs b/src/engine/state.rs index c210610c..6e040d26 100644 --- a/src/engine/state.rs +++ b/src/engine/state.rs @@ -1,10 +1,8 @@ -use std::{ - collections::BTreeMap, - time::{Instant, SystemTime}, -}; +use std::collections::BTreeMap; use anyhow::Result; use iroh::EndpointId; +use n0_future::time::{Instant, SystemTime}; use serde::{Deserialize, Serialize}; use tracing::{debug, warn}; diff --git a/src/net.rs b/src/net.rs index 9c4757f7..771284e0 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,11 +1,9 @@ //! 
Network implementation of the iroh-docs protocol -use std::{ - future::Future, - time::{Duration, Instant}, -}; +use std::future::Future; use iroh::{Endpoint, EndpointAddr, PublicKey}; +use n0_future::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; use tracing::{debug, error_span, trace, Instrument}; diff --git a/src/net/codec.rs b/src/net/codec.rs index 81dbc0ef..c2ed08b5 100644 --- a/src/net/codec.rs +++ b/src/net/codec.rs @@ -322,6 +322,7 @@ mod tests { let alice_replica_id = alice_replica.id(); alice_replica .hash_and_insert("hello bob", &author, "from alice") + .await .unwrap(); let mut bob_store = store::Store::memory(); @@ -329,6 +330,7 @@ mod tests { let bob_replica_id = bob_replica.id(); bob_replica .hash_and_insert("hello alice", &author, "from bob") + .await .unwrap(); assert_eq!( @@ -429,6 +431,7 @@ mod tests { #[tokio::test] #[traced_test] + #[cfg(feature = "fs-store")] async fn test_sync_many_authors_fs() -> Result<()> { let tmpdir = tempfile::tempdir()?; let alice_store = store::fs::Store::persistent(tmpdir.path().join("a.db"))?; @@ -438,9 +441,9 @@ mod tests { type Message = (AuthorId, Vec, Hash); - fn insert_messages( + async fn insert_messages( mut rng: impl CryptoRng, - replica: &mut crate::sync::Replica, + replica: &mut crate::sync::Replica<'_>, num_authors: usize, msgs_per_author: usize, key_value_fn: impl Fn(&AuthorId, usize) -> (String, String), @@ -453,7 +456,10 @@ mod tests { for i in 0..msgs_per_author { for author in authors.iter() { let (key, value) = key_value_fn(&author.id(), i); - let hash = replica.hash_and_insert(key.clone(), author, value).unwrap(); + let hash = replica + .hash_and_insert(key.clone(), author, value) + .await + .unwrap(); res.push((author.id(), key.as_bytes().to_vec(), hash)); } } @@ -509,7 +515,8 @@ mod tests { format!("from alice by {author}: {i}"), ) }, - ); + ) + .await; all_messages.extend_from_slice(&alice_messages); let mut bob_replica = bob_store.new_replica(namespace.clone()).unwrap(); @@ -524,7 +531,8 @@ mod tests { format!("from bob by {author}: {i}"), ) }, - ); + ) + .await; all_messages.extend_from_slice(&bob_messages); all_messages.sort(); @@ -622,6 +630,7 @@ mod tests { #[tokio::test] #[traced_test] + #[cfg(feature = "fs-store")] async fn test_sync_timestamps_fs() -> Result<()> { let tmpdir = tempfile::tempdir()?; let alice_store = store::fs::Store::persistent(tmpdir.path().join("a.db"))?; @@ -646,10 +655,12 @@ mod tests { // Insert into alice let hash_alice = alice_replica .hash_and_insert(&key, &author, &value_alice) + .await .unwrap(); // Insert into bob let hash_bob = bob_replica .hash_and_insert(&key, &author, &value_bob) + .await .unwrap(); assert_eq!( diff --git a/src/protocol.rs b/src/protocol.rs index e07568ea..08ebb8bf 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,6 +1,6 @@ //! [`ProtocolHandler`] implementation for the docs [`Engine`]. -use std::{path::PathBuf, sync::Arc}; +use std::sync::Arc; use anyhow::Result; use iroh::{endpoint::Connection, protocol::ProtocolHandler, Endpoint}; @@ -13,6 +13,14 @@ use crate::{ store::Store, }; +#[derive(Default, Debug)] +enum Storage { + #[default] + Memory, + #[cfg(feature = "fs-store")] + Persistent(std::path::PathBuf), +} + /// Docs protocol. #[derive(Debug, Clone)] pub struct Docs { @@ -28,9 +36,10 @@ impl Docs { /// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage /// in the given directory. 
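The `Storage` enum introduced just above mirrors `DefaultAuthorStorage` in `src/engine.rs`: the persistent variant only exists when the `fs-store` feature is compiled in, so every `match` over the type has to gate the corresponding arm as well, as does the `persistent` constructor below. A reduced, self-contained illustration of the pattern (the `describe` helper is made up, and it assumes a Cargo feature named `fs-store`):

```rust
// #[default] keeps Builder::default() working in builds where the
// Persistent variant is compiled out.
#[derive(Default, Debug)]
enum Storage {
    #[default]
    Memory,
    #[cfg(feature = "fs-store")]
    Persistent(std::path::PathBuf),
}

fn describe(storage: &Storage) -> String {
    match storage {
        Storage::Memory => "in-memory".to_string(),
        // Without this cfg, the arm would name a variant that does not
        // exist in no-feature builds and compilation would fail.
        #[cfg(feature = "fs-store")]
        Storage::Persistent(path) => format!("persistent at {}", path.display()),
    }
}
```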
- pub fn persistent(path: PathBuf) -> Builder { + #[cfg(feature = "fs-store")] + pub fn persistent(path: std::path::PathBuf) -> Builder { Builder { - path: Some(path), + storage: Storage::Persistent(path), protect_cb: None, } } @@ -75,7 +84,7 @@ impl ProtocolHandler for Docs { /// Builder for the docs protocol. #[derive(Debug, Default)] pub struct Builder { - path: Option, + storage: Storage, protect_cb: Option, } @@ -95,13 +104,17 @@ impl Builder { blobs: BlobsStore, gossip: Gossip, ) -> anyhow::Result { - let replica_store = match self.path { - Some(ref path) => Store::persistent(path.join("docs.redb"))?, - None => Store::memory(), + let replica_store = match &self.storage { + Storage::Memory => Store::memory(), + #[cfg(feature = "fs-store")] + Storage::Persistent(path) => Store::persistent(path.join("docs.redb"))?, }; - let author_store = match self.path { - Some(ref path) => DefaultAuthorStorage::Persistent(path.join("default-author")), - None => DefaultAuthorStorage::Mem, + let author_store = match &self.storage { + Storage::Memory => DefaultAuthorStorage::Mem, + #[cfg(feature = "fs-store")] + Storage::Persistent(path) => { + DefaultAuthorStorage::Persistent(path.join("default-author")) + } }; let downloader = blobs.downloader(&endpoint); let engine = Engine::spawn( diff --git a/src/ranger.rs b/src/ranger.rs index c520eca1..6d5ef87c 100644 --- a/src/ranger.rs +++ b/src/ranger.rs @@ -1,7 +1,7 @@ //! Implementation of Set Reconcilliation based on //! "Range-Based Set Reconciliation" by Aljoscha Meyer. -use std::{fmt::Debug, pin::Pin}; +use std::fmt::Debug; use n0_future::StreamExt; use serde::{Deserialize, Serialize}; @@ -288,6 +288,7 @@ pub trait Store: Sized { /// /// This will remove just the entry with the given key, but will not perform prefix deletion. #[cfg(test)] + #[allow(unused)] fn entry_remove(&mut self, key: &E::Key) -> Result, Self::Error>; /// Remove all entries whose key start with a prefix and for which the `predicate` callback @@ -330,11 +331,8 @@ pub trait Store: Sized { ) -> Result>, Self::Error> where F: Fn(&Self, &E, ContentStatus) -> bool, - F2: FnMut(&Self, E, ContentStatus), - F3: for<'a> Fn( - &'a E, - ) - -> Pin + Send + 'a>>, + F2: AsyncFnMut(&Self, E, ContentStatus), + F3: for<'a> AsyncFn(&'a E) -> ContentStatus, { let mut out = Vec::new(); @@ -397,7 +395,7 @@ pub trait Store: Sized { // TODO: Get rid of the clone? let outcome = self.put(entry.clone())?; if let InsertOutcome::Inserted { .. 
} = outcome { - on_insert_cb(self, entry, content_status); + on_insert_cb(self, entry, content_status).await; } } } @@ -1419,8 +1417,8 @@ mod tests { &Default::default(), msg, &bob_validate_cb, - |_, _, _| (), - |_| Box::pin(async move { ContentStatus::Complete }), + async |_, _, _| (), + async |_| ContentStatus::Complete, ) .await .unwrap() @@ -1431,8 +1429,8 @@ mod tests { &Default::default(), msg, &alice_validate_cb, - |_, _, _| (), - |_| Box::pin(async move { ContentStatus::Complete }), + async |_, _, _| (), + async |_| ContentStatus::Complete, ) .await .unwrap(); diff --git a/src/store/fs.rs b/src/store/fs.rs index 6a0f6c54..e05223df 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -6,15 +6,15 @@ use std::{ iter::{Chain, Flatten}, num::NonZeroU64, ops::Bound, - path::Path, }; use anyhow::{anyhow, Result}; use ed25519_dalek::{SignatureError, VerifyingKey}; use iroh_blobs::Hash; +use n0_future::time::SystemTime; use rand::CryptoRng; -use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable}; -use tracing::{info, warn}; +use redb::{Database, ReadableMultimapTable, ReadableTable}; +use tracing::warn; use super::{ pubkeys::MemPublicKeyStore, DownloadPolicy, ImportNamespaceOutcome, OpenError, PublicKeyStore, @@ -98,16 +98,17 @@ impl Store { /// Create or open a store from a `path` to a database file. /// /// The file will be created if it does not exist, otherwise it will be opened. - pub fn persistent(path: impl AsRef) -> Result { + #[cfg(feature = "fs-store")] + pub fn persistent(path: impl AsRef) -> Result { let mut db = match Database::create(&path) { Ok(db) => db, - Err(DatabaseError::UpgradeRequired(1)) => return Err( + Err(redb::DatabaseError::UpgradeRequired(1)) => return Err( anyhow!("Opening the database failed: Upgrading from old format is no longer supported. Use iroh-docs 0.92 to perform the upgrade, then upgrade to the latest release again.") ), Err(err) => return Err(err.into()), }; match db.upgrade() { - Ok(true) => info!("Database was upgraded to redb v3 compatible format"), + Ok(true) => tracing::info!("Database was upgraded to redb v3 compatible format"), Ok(false) => {} Err(err) => warn!("Database upgrade to redb v3 compatible format failed: {err:#}"), } @@ -487,7 +488,7 @@ impl Store { let peer = &peer; let namespace = namespace.as_bytes(); // calculate nanos since UNIX_EPOCH for a time measurement - let nanos = std::time::UNIX_EPOCH + let nanos = SystemTime::UNIX_EPOCH .elapsed() .map(|duration| duration.as_nanos() as u64)?; self.modify(|tables| { @@ -984,13 +985,13 @@ fn into_entry(key: RecordsId, value: RecordsValue) -> SignedEntry { SignedEntry::new(entry_signature, entry) } -#[cfg(test)] +#[cfg(all(test, feature = "fs-store"))] mod tests { - use super::{tables::LATEST_PER_AUTHOR_TABLE, *}; + use super::*; use crate::ranger::Store as _; - #[test] - fn test_ranges() -> Result<()> { + #[tokio::test] + async fn test_ranges() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let mut store = Store::persistent(dbfile.path())?; @@ -1001,8 +1002,8 @@ mod tests { // test author prefix relation for all-255 keys let key1 = vec![255, 255]; let key2 = vec![255, 255, 255]; - replica.hash_and_insert(&key1, &author, b"v1")?; - replica.hash_and_insert(&key2, &author, b"v2")?; + replica.hash_and_insert(&key1, &author, b"v1").await?; + replica.hash_and_insert(&key2, &author, b"v2").await?; let res = store .get_many(namespace.id(), Query::author(author.id()).key_prefix([255]))? 
.collect::>>()?; @@ -1094,7 +1095,7 @@ mod tests { } fn copy_and_modify( - source: &Path, + source: &std::path::Path, modify: impl Fn(&redb::WriteTransaction) -> Result<()>, ) -> Result { let dbfile = tempfile::NamedTempFile::new()?; @@ -1107,8 +1108,10 @@ mod tests { Ok(dbfile) } - #[test] - fn test_migration_001_populate_latest_table() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_migration_001_populate_latest_table() -> Result<()> { + use super::tables::LATEST_PER_AUTHOR_TABLE; let dbfile = tempfile::NamedTempFile::new()?; let namespace = NamespaceSecret::new(&mut rand::rng()); @@ -1118,9 +1121,9 @@ mod tests { let author1 = store.new_author(&mut rand::rng())?; let author2 = store.new_author(&mut rand::rng())?; let mut replica = store.new_replica(namespace.clone())?; - replica.hash_and_insert(b"k1", &author1, b"v1")?; - replica.hash_and_insert(b"k2", &author2, b"v1")?; - replica.hash_and_insert(b"k3", &author1, b"v1")?; + replica.hash_and_insert(b"k1", &author1, b"v1").await?; + replica.hash_and_insert(b"k2", &author2, b"v1").await?; + replica.hash_and_insert(b"k3", &author1, b"v1").await?; let expected = store .get_latest_for_each_author(namespace.id())? @@ -1151,6 +1154,7 @@ mod tests { } #[test] + #[cfg(feature = "fs-store")] fn test_migration_004_populate_by_key_index() -> Result<()> { use redb::ReadableTableMetadata; let dbfile = tempfile::NamedTempFile::new()?; diff --git a/src/store/fs/tables.rs b/src/store/fs/tables.rs index d683d038..34744162 100644 --- a/src/store/fs/tables.rs +++ b/src/store/fs/tables.rs @@ -1,9 +1,8 @@ #![allow(missing_docs)] // Table Definitions -use std::time::Instant; - use bytes::Bytes; +use n0_future::time::Instant; use redb::{ MultimapTable, MultimapTableDefinition, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, Table, TableDefinition, WriteTransaction, @@ -61,7 +60,7 @@ pub type RecordsByKeyIdOwned = ([u8; 32], Bytes, [u8; 32]); /// Value: `(u64, [u8; 32])` # ([`Nanos`], &[`PeerIdBytes`]) representing the last time a peer was used. pub const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> = MultimapTableDefinition::new("sync-peers-1"); -/// Number of seconds elapsed since [`std::time::SystemTime::UNIX_EPOCH`]. Used to register the +/// Number of seconds elapsed since [`n0_future::time::SystemTime::UNIX_EPOCH`]. Used to register the /// last time a peer was useful in a document. 
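The `NAMESPACE_PEERS_TABLE` values above carry a `Nanos` timestamp, and the NOTE that follows claims u64 nanoseconds give roughly 500 years of headroom from the unix epoch (the doc line's "Number of seconds" should be read as nanoseconds, as the NOTE and the u64 type indicate). The arithmetic checks out at about 584 years:

```rust
fn main() {
    // u64::MAX nanoseconds expressed in years of 365.25 days.
    let nanos_per_year: u64 = 1_000_000_000 * 60 * 60 * 24 * 36525 / 100;
    println!("{} years until overflow", u64::MAX / nanos_per_year); // 584
}
```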
// NOTE: resolution is nanoseconds, stored as a u64 since this covers ~500years from unix epoch, // which should be more than enough diff --git a/src/sync.rs b/src/sync.rs index ddb140b6..87d1c431 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -11,12 +11,15 @@ use std::{ fmt::Debug, ops::{Deref, DerefMut}, sync::Arc, - time::{Duration, SystemTime}, }; use bytes::{Bytes, BytesMut}; use ed25519_dalek::{Signature, SignatureError}; use iroh_blobs::Hash; +use n0_future::{ + time::{Duration, SystemTime}, + IterExt, +}; use serde::{Deserialize, Serialize}; pub use crate::heads::AuthorHeads; @@ -129,16 +132,22 @@ impl Subscribers { pub fn unsubscribe(&mut self, sender: &async_channel::Sender) { self.0.retain(|s| !same_channel(s, sender)); } - pub fn send(&mut self, event: Event) { - self.0 - .retain(|sender| sender.send_blocking(event.clone()).is_ok()) + pub async fn send(&mut self, event: Event) { + self.0 = std::mem::take(&mut self.0) + .into_iter() + .map(async |tx| tx.send(event.clone()).await.ok().map(|_| tx)) + .join_all() + .await + .into_iter() + .flatten() + .collect(); } pub fn len(&self) -> usize { self.0.len() } - pub fn send_with(&mut self, f: impl FnOnce() -> Event) { + pub async fn send_with(&mut self, f: impl FnOnce() -> Event) { if !self.0.is_empty() { - self.send(f()) + self.send(f()).await } } } @@ -361,7 +370,7 @@ where /// /// Returns the number of entries removed as a consequence of this insertion, /// or an error either if the entry failed to validate or if a store operation failed. - pub fn insert( + pub async fn insert( &mut self, key: impl AsRef<[u8]>, author: &Author, @@ -377,7 +386,7 @@ where let entry = Entry::new(id, record); let secret = self.secret_key()?; let signed_entry = entry.sign(secret, author); - self.insert_entry(signed_entry, InsertOrigin::Local) + self.insert_entry(signed_entry, InsertOrigin::Local).await } /// Delete entries that match the given `author` and key `prefix`. @@ -386,7 +395,7 @@ where /// entries whose key starts with or is equal to the given `prefix`. /// /// Returns the number of entries deleted. - pub fn delete_prefix( + pub async fn delete_prefix( &mut self, prefix: impl AsRef<[u8]>, author: &Author, @@ -395,7 +404,7 @@ where let id = RecordIdentifier::new(self.id(), author.id(), prefix); let entry = Entry::new_empty(id); let signed_entry = entry.sign(self.secret_key()?, author); - self.insert_entry(signed_entry, InsertOrigin::Local) + self.insert_entry(signed_entry, InsertOrigin::Local).await } /// Insert an entry into this replica which was received from a remote peer. @@ -405,7 +414,7 @@ where /// /// Returns the number of entries removed as a consequence of this insertion, /// or an error if the entry failed to validate or if a store operation failed. - pub fn insert_remote_entry( + pub async fn insert_remote_entry( &mut self, entry: SignedEntry, received_from: PeerIdBytes, @@ -417,13 +426,13 @@ where from: received_from, remote_content_status: content_status, }; - self.insert_entry(entry, origin) + self.insert_entry(entry, origin).await } /// Insert a signed entry into the database. /// /// Returns the number of entries removed as a consequence of this insertion. - fn insert_entry( + async fn insert_entry( &mut self, entry: SignedEntry, origin: InsertOrigin, @@ -462,7 +471,7 @@ where } }; - self.info.subscribers.send(insert_event); + self.info.subscribers.send(insert_event).await; Ok(removed_count) } @@ -471,7 +480,7 @@ where /// /// This does not store the content, just the record of it. /// Returns the calculated hash. 
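For readers skimming the now-async `hash_and_insert` below: the value bytes never enter the replica store. Conceptually the call reduces to hashing plus a record insert, roughly as follows (a sketch with a made-up `record_parts` helper, built on the same `iroh_blobs::Hash::new` call visible in the method body):

```rust
use iroh_blobs::Hash;

// Only (key, author, hash, len) is persisted in the replica; the
// content itself is expected to live in the blob store.
fn record_parts(data: &[u8]) -> (Hash, u64) {
    (Hash::new(data), data.len() as u64)
}
```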
- pub fn hash_and_insert( + pub async fn hash_and_insert( &mut self, key: impl AsRef<[u8]>, author: &Author, @@ -480,7 +489,7 @@ where self.info.ensure_open()?; let len = data.as_ref().len() as u64; let hash = Hash::new(data); - self.insert(key, author, hash, len)?; + self.insert(key, author, hash, len).await?; Ok(hash) } @@ -537,29 +546,29 @@ where validate_entry(now, store, my_namespace, entry, &origin).is_ok() }, // on_insert callback: is called when an entry was actually inserted in the store - |_store, entry, content_status| { + async |_store, entry, content_status| { // We use `send_with` to only clone the entry if we have active subscriptions. - self.info.subscribers.send_with(|| { - let should_download = download_policy.matches(entry.entry()); - Event::RemoteInsert { - from: from_peer, - namespace: my_namespace, - entry: entry.clone(), - should_download, - remote_content_status: content_status, - } - }) + self.info + .subscribers + .send_with(|| { + let should_download = download_policy.matches(entry.entry()); + Event::RemoteInsert { + from: from_peer, + namespace: my_namespace, + entry: entry.clone(), + should_download, + remote_content_status: content_status, + } + }) + .await }, // content_status callback: get content status for outgoing entries - move |entry| { - let cb = cb.clone(); - Box::pin(async move { - if let Some(cb) = cb.as_ref() { - cb(entry.content_hash()).await - } else { - ContentStatus::Missing - } - }) + async move |entry| { + if let Some(cb) = cb.as_ref() { + cb(entry.content_hash()).await + } else { + ContentStatus::Missing + } }, ) .await?; @@ -1205,23 +1214,24 @@ mod tests { store::{OpenError, Query, SortBy, SortDirection, Store}, }; - #[test] - fn test_basics_memory() -> Result<()> { + #[tokio::test] + async fn test_basics_memory() -> Result<()> { let store = store::Store::memory(); - test_basics(store)?; + test_basics(store).await?; Ok(()) } - #[test] - fn test_basics_fs() -> Result<()> { + #[tokio::test] + #[cfg(feature = "fs-store")] + async fn test_basics_fs() -> Result<()> { let dbfile = tempfile::NamedTempFile::new()?; let store = store::fs::Store::persistent(dbfile.path())?; - test_basics(store)?; + test_basics(store).await?; Ok(()) } - fn test_basics(mut store: Store) -> Result<()> { + async fn test_basics(mut store: Store) -> Result<()> { let mut rng = rand::rng(); let alice = Author::new(&mut rng); let bob = Author::new(&mut rng); @@ -1235,11 +1245,9 @@ mod tests { let mut my_replica = store.new_replica(myspace.clone())?; for i in 0..10 { - my_replica.hash_and_insert( - format!("/{i}"), - &alice, - format!("{i}: hello from alice"), - )?; + my_replica + .hash_and_insert(format!("/{i}"), &alice, format!("{i}: hello from alice")) + .await?; } for i in 0..10 { @@ -1253,13 +1261,17 @@ mod tests { // Test multiple records for the same key let mut my_replica = store.new_replica(myspace.clone())?; - my_replica.hash_and_insert("/cool/path", &alice, "round 1")?; + my_replica + .hash_and_insert("/cool/path", &alice, "round 1") + .await?; let _entry = store .get_exact(myspace.id(), alice.id(), "/cool/path", false)? .unwrap(); // Second let mut my_replica = store.new_replica(myspace.clone())?; - my_replica.hash_and_insert("/cool/path", &alice, "round 2")?; + my_replica + .hash_and_insert("/cool/path", &alice, "round 2") + .await?; let _entry = store .get_exact(myspace.id(), alice.id(), "/cool/path", false)? 
             .unwrap();
@@ -1290,7 +1302,9 @@ mod tests {

         // insert record from different author
         let mut my_replica = store.new_replica(myspace.clone())?;
-        let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
+        let _entry = my_replica
+            .hash_and_insert("/cool/path", &bob, "bob round 1")
+            .await?;

         // Get All by author
         let entries: Vec<_> = store
@@ -1392,20 +1406,21 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_content_hashes_iterator_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_content_hashes_iterator_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_content_hashes_iterator(store)
+        test_content_hashes_iterator(store).await
     }

-    #[test]
-    fn test_content_hashes_iterator_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_content_hashes_iterator_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_content_hashes_iterator(store)
+        test_content_hashes_iterator(store).await
     }

-    fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
+    async fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let mut expected = HashSet::new();
         let n_replicas = 3;
@@ -1417,7 +1432,7 @@ mod tests {
             for j in 0..n_entries {
                 let key = format!("{j}");
                 let data = format!("{i}:{j}");
-                let hash = replica.hash_and_insert(key, &author, data)?;
+                let hash = replica.hash_and_insert(key, &author, data).await?;
                 expected.insert(hash);
             }
         }
@@ -1517,23 +1532,24 @@ mod tests {
         }
     }

-    #[test]
-    fn test_timestamps_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_timestamps_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_timestamps(store)?;
+        test_timestamps(store).await?;
         Ok(())
     }

-    #[test]
-    fn test_timestamps_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_timestamps_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_timestamps(store)?;
+        test_timestamps(store).await?;
         Ok(())
     }

-    fn test_timestamps(mut store: Store) -> Result<()> {
+    async fn test_timestamps(mut store: Store) -> Result<()> {
         let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
         let namespace = NamespaceSecret::new(&mut rng);
         let _replica = store.new_replica(namespace.clone())?;
@@ -1552,6 +1568,7 @@ mod tests {

         replica
             .insert_entry(entry.clone(), InsertOrigin::Local)
+            .await
             .unwrap();
         store.close_replica(namespace.id());
         let res = store
@@ -1567,7 +1584,7 @@ mod tests {
         };

         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.insert_entry(entry2, InsertOrigin::Local);
+        let res = replica.insert_entry(entry2, InsertOrigin::Local).await;
         store.close_replica(namespace.id());
         assert!(matches!(res, Err(InsertError::NewerEntryExists)));
         let res = store
@@ -1588,6 +1605,7 @@ mod tests {
     }

     #[tokio::test]
+    #[cfg(feature = "fs-store")]
     async fn test_replica_sync_fs() -> Result<()> {
         let alice_dbfile = tempfile::NamedTempFile::new()?;
         let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
@@ -1607,12 +1625,12 @@ mod tests {
         let myspace = NamespaceSecret::new(&mut rng);
         let mut alice = alice_store.new_replica(myspace.clone())?;
         for el in &alice_set {
-            alice.hash_and_insert(el, &author, el.as_bytes())?;
+            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
         }

         let mut bob = bob_store.new_replica(myspace.clone())?;
         for el in &bob_set {
-            bob.hash_and_insert(el, &author, el.as_bytes())?;
+            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
         }

         let (alice_out, bob_out) = sync(&mut alice, &mut bob).await?;
@@ -1641,6 +1659,7 @@ mod tests {
     }

     #[tokio::test]
+    #[cfg(feature = "fs-store")]
     async fn test_replica_timestamp_sync_fs() -> Result<()> {
         let alice_dbfile = tempfile::NamedTempFile::new()?;
         let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
@@ -1664,9 +1683,9 @@ mod tests {
         let key = b"key";
         let alice_value = b"alice";
         let bob_value = b"bob";
-        let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
+        let _alice_hash = alice.hash_and_insert(key, &author, alice_value).await?;
         // system time increased - sync should overwrite
-        let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
+        let bob_hash = bob.hash_and_insert(key, &author, bob_value).await?;
         sync(&mut alice, &mut bob).await?;
         assert_eq!(
             get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
@@ -1682,8 +1701,8 @@ mod tests {

         let alice_value_2 = b"alice2";
         // system time increased - sync should overwrite
-        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
-        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
+        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value).await?;
+        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2).await?;
         sync(&mut alice, &mut bob).await?;
         assert_eq!(
             get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
@@ -1698,8 +1717,8 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_future_timestamp() -> Result<()> {
+    #[tokio::test]
+    async fn test_future_timestamp() -> Result<()> {
         let mut rng = rand::rng();
         let mut store = store::Store::memory();
         let author = Author::new(&mut rng);
@@ -1710,7 +1729,9 @@ mod tests {
         let t = system_time_now();
         let record = Record::from_data(b"1", t);
         let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
-        replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
+        replica
+            .insert_entry(entry0.clone(), InsertOrigin::Local)
+            .await?;

         assert_eq!(
             get_entry(&mut store, namespace.id(), author.id(), key)?,
@@ -1721,7 +1742,9 @@ mod tests {
         let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
         let record = Record::from_data(b"2", t);
         let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
-        replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
+        replica
+            .insert_entry(entry1.clone(), InsertOrigin::Local)
+            .await?;
         assert_eq!(
             get_entry(&mut store, namespace.id(), author.id(), key)?,
             entry1
@@ -1731,7 +1754,9 @@ mod tests {
         let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
         let record = Record::from_data(b"2", t);
         let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
-        replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
+        replica
+            .insert_entry(entry2.clone(), InsertOrigin::Local)
+            .await?;
         assert_eq!(
             get_entry(&mut store, namespace.id(), author.id(), key)?,
             entry2
@@ -1741,7 +1766,7 @@ mod tests {
         let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
         let record = Record::from_data(b"2", t);
         let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
-        let res = replica.insert_entry(entry3, InsertOrigin::Local);
+        let res = replica.insert_entry(entry3, InsertOrigin::Local).await;
         assert!(matches!(
             res,
             Err(InsertError::Validation(
@@ -1756,42 +1781,43 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_insert_empty() -> Result<()> {
+    #[tokio::test]
+    async fn test_insert_empty() -> Result<()> {
         let mut store = store::Store::memory();
         let mut rng = rand::rng();
         let alice = Author::new(&mut rng);
         let myspace = NamespaceSecret::new(&mut rng);
         let mut replica = store.new_replica(myspace.clone())?;
         let hash = Hash::new(b"");
-        let res = replica.insert(b"foo", &alice, hash, 0);
+        let res = replica.insert(b"foo", &alice, hash, 0).await;
         assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
         store.flush()?;
         Ok(())
     }

-    #[test]
-    fn test_prefix_delete_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_prefix_delete_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_prefix_delete(store)?;
+        test_prefix_delete(store).await?;
         Ok(())
     }

-    #[test]
-    fn test_prefix_delete_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_prefix_delete_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_prefix_delete(store)?;
+        test_prefix_delete(store).await?;
         Ok(())
     }

-    fn test_prefix_delete(mut store: Store) -> Result<()> {
+    async fn test_prefix_delete(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let alice = Author::new(&mut rng);
         let myspace = NamespaceSecret::new(&mut rng);
         let mut replica = store.new_replica(myspace.clone())?;
-        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
-        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
+        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello").await?;
+        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world").await?;

         // sanity checks
         assert_eq!(
@@ -1805,7 +1831,7 @@ mod tests {

         // delete
         let mut replica = store.new_replica(myspace.clone())?;
-        let deleted = replica.delete_prefix(b"foo", &alice)?;
+        let deleted = replica.delete_prefix(b"foo", &alice).await?;
         assert_eq!(deleted, 2);
         assert_eq!(
             store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
@@ -1832,6 +1858,7 @@ mod tests {
     }

     #[tokio::test]
+    #[cfg(feature = "fs-store")]
     async fn test_replica_sync_delete_fs() -> Result<()> {
         let alice_dbfile = tempfile::NamedTempFile::new()?;
         let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
@@ -1849,12 +1876,12 @@ mod tests {
         let myspace = NamespaceSecret::new(&mut rng);
         let mut alice = alice_store.new_replica(myspace.clone())?;
         for el in &alice_set {
-            alice.hash_and_insert(el, &author, el.as_bytes())?;
+            alice.hash_and_insert(el, &author, el.as_bytes()).await?;
         }

         let mut bob = bob_store.new_replica(myspace.clone())?;
         for el in &bob_set {
-            bob.hash_and_insert(el, &author, el.as_bytes())?;
+            bob.hash_and_insert(el, &author, el.as_bytes()).await?;
         }

         sync(&mut alice, &mut bob).await?;
@@ -1866,8 +1893,9 @@ mod tests {

         let mut alice = alice_store.new_replica(myspace.clone())?;
         let mut bob = bob_store.new_replica(myspace.clone())?;
-        alice.delete_prefix("foo", &author)?;
-        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
+        alice.delete_prefix("foo", &author).await?;
+        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())
+            .await?;
         sync(&mut alice, &mut bob).await?;
         check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
         check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
@@ -1876,27 +1904,28 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_replica_remove_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_replica_remove_memory() -> Result<()> {
         let alice_store = store::Store::memory();
-        test_replica_remove(alice_store)
+        test_replica_remove(alice_store).await
     }

-    #[test]
-    fn test_replica_remove_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_replica_remove_fs() -> Result<()> {
         let alice_dbfile = tempfile::NamedTempFile::new()?;
         let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
-        test_replica_remove(alice_store)
+        test_replica_remove(alice_store).await
     }

-    fn test_replica_remove(mut store: Store) -> Result<()> {
+    async fn test_replica_remove(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let namespace = NamespaceSecret::new(&mut rng);
         let author = Author::new(&mut rng);
         let mut replica = store.new_replica(namespace.clone())?;

         // insert entry
-        let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
+        let hash = replica.hash_and_insert(b"foo", &author, b"bar").await?;
         let res = store
             .get_many(namespace.id(), Query::all())?
             .collect::<Vec<_>>();
@@ -1919,7 +1948,7 @@ mod tests {

         // may recreate replica
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.insert(b"foo", &author, hash, 3)?;
+        replica.insert(b"foo", &author, hash, 3).await?;
         let res = store
             .get_many(namespace.id(), Query::all())?
             .collect::<Vec<_>>();
@@ -1928,20 +1957,21 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_replica_delete_edge_cases_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_replica_delete_edge_cases_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_replica_delete_edge_cases(store)
+        test_replica_delete_edge_cases(store).await
     }

-    #[test]
-    fn test_replica_delete_edge_cases_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_replica_delete_edge_cases_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_replica_delete_edge_cases(store)
+        test_replica_delete_edge_cases(store).await
     }

-    fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
+    async fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let author = Author::new(&mut rng);
         let namespace = NamespaceSecret::new(&mut rng);
@@ -1956,23 +1986,23 @@ mod tests {
             for suffix in edgecases {
                 let key = [prefix, suffix].to_vec();
                 expected.push(key.clone());
-                replica.insert(&key, &author, hash, len)?;
+                replica.insert(&key, &author, hash, len).await?;
             }
             assert_keys(&mut store, namespace.id(), expected);
             let mut replica = store.new_replica(namespace.clone())?;
-            replica.delete_prefix([prefix], &author)?;
+            replica.delete_prefix([prefix], &author).await?;
             assert_keys(&mut store, namespace.id(), vec![]);
         }

         let mut replica = store.new_replica(namespace.clone())?;
         let key = vec![1u8, 0u8];
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         let key = vec![1u8, 1u8];
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         let key = vec![1u8, 2u8];
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         let prefix = vec![1u8, 1u8];
-        replica.delete_prefix(prefix, &author)?;
+        replica.delete_prefix(prefix, &author).await?;
         assert_keys(
             &mut store,
             namespace.id(),
@@ -1981,11 +2011,11 @@ mod tests {

         let mut replica = store.new_replica(namespace.clone())?;
         let key = vec![0u8, 255u8];
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         let key = vec![0u8, 0u8];
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         let prefix = vec![0u8];
-        replica.delete_prefix(prefix, &author)?;
+        replica.delete_prefix(prefix, &author).await?;
         assert_keys(
             &mut store,
             namespace.id(),
@@ -1995,27 +2025,28 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_latest_iter_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_latest_iter_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_latest_iter(store)
+        test_latest_iter(store).await
    }

-    #[test]
-    fn test_latest_iter_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_latest_iter_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_latest_iter(store)
+        test_latest_iter(store).await
     }

-    fn test_latest_iter(mut store: Store) -> Result<()> {
+    async fn test_latest_iter(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let author0 = Author::new(&mut rng);
         let author1 = Author::new(&mut rng);
         let namespace = NamespaceSecret::new(&mut rng);
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
+        replica.hash_and_insert(b"a0.1", &author0, b"hi").await?;
         let latest = store
             .get_latest_for_each_author(namespace.id())?
             .collect::<Result<Vec<_>>>()?;
         assert_eq!(latest.len(), 1);
         assert_eq!(latest[0].2, b"a0.1".to_vec());

         let mut replica = store.new_replica(namespace.clone())?;
-        replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
-        replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
+        replica.hash_and_insert(b"a1.1", &author1, b"hi").await?;
+        replica.hash_and_insert(b"a0.2", &author0, b"hi").await?;
         let latest = store
             .get_latest_for_each_author(namespace.id())?
             .collect::<Result<Vec<_>>>()?;
@@ -2035,24 +2066,25 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_replica_byte_keys_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_replica_byte_keys_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_replica_byte_keys(store)?;
+        test_replica_byte_keys(store).await?;
         Ok(())
     }

-    #[test]
-    fn test_replica_byte_keys_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_replica_byte_keys_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_replica_byte_keys(store)?;
+        test_replica_byte_keys(store).await?;
         Ok(())
     }

-    fn test_replica_byte_keys(mut store: Store) -> Result<()> {
+    async fn test_replica_byte_keys(mut store: Store) -> Result<()> {
         let mut rng = rand::rng();
         let author = Author::new(&mut rng);
         let namespace = NamespaceSecret::new(&mut rng);
@@ -2062,11 +2094,11 @@ mod tests {

         let key = vec![1u8, 0u8];
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
         let key = vec![1u8, 2u8];
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         assert_keys(
             &mut store,
             namespace.id(),
@@ -2075,7 +2107,7 @@ mod tests {

         let key = vec![0u8, 255u8];
         let mut replica = store.new_replica(namespace.clone())?;
-        replica.insert(key, &author, hash, len)?;
+        replica.insert(key, &author, hash, len).await?;
         assert_keys(
             &mut store,
             namespace.id(),
@@ -2085,21 +2117,22 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_replica_capability_memory() -> Result<()> {
+    #[tokio::test]
+    async fn test_replica_capability_memory() -> Result<()> {
         let store = store::Store::memory();
-        test_replica_capability(store)
+        test_replica_capability(store).await
     }

-    #[test]
-    fn test_replica_capability_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_replica_capability_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_replica_capability(store)
+        test_replica_capability(store).await
     }

     #[allow(clippy::redundant_pattern_matching)]
-    fn test_replica_capability(mut store: Store) -> Result<()> {
+    async fn test_replica_capability(mut store: Store) -> Result<()> {
         let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
         let author = store.new_author(&mut rng)?;
         let namespace = NamespaceSecret::new(&mut rng);
@@ -2108,18 +2141,18 @@ mod tests {
         let capability = Capability::Read(namespace.id());
         store.import_namespace(capability)?;
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(matches!(res, Err(InsertError::ReadOnly)));

         // import write capability - insert must succeed
         let capability = Capability::Write(namespace.clone());
         store.import_namespace(capability)?;
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(matches!(res, Ok(_)));
         store.close_replica(namespace.id());
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(res.is_ok());

         // import read capability again - insert must still succeed
@@ -2127,7 +2160,7 @@ mod tests {
         store.import_namespace(capability)?;
         store.close_replica(namespace.id());
         let mut replica = store.open_replica(&namespace.id())?;
-        let res = replica.hash_and_insert(b"foo", &author, b"bar");
+        let res = replica.hash_and_insert(b"foo", &author, b"bar").await;
         assert!(res.is_ok());
         store.flush()?;
         Ok(())
@@ -2140,6 +2173,7 @@ mod tests {
     }

     #[tokio::test]
+    #[cfg(feature = "fs-store")]
     async fn test_actor_capability_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
@@ -2217,7 +2251,7 @@ mod tests {
         replica1.info.subscribe(events1_sender);
         replica2.info.subscribe(events2_sender);

-        replica1.hash_and_insert(b"foo", &author, b"init")?;
+        replica1.hash_and_insert(b"foo", &author, b"init").await?;

         let from1 = replica1.sync_initial_message()?;
         let from2 = replica2
@@ -2233,7 +2267,7 @@ mod tests {
         // now we will receive the entry from replica1. we will insert a newer entry now, while the
         // sync is already running. this means the entry from replica1 will be rejected. we make
         // sure that no InsertRemote event is emitted for this entry.
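For context on the assertions around this point: `events1_sender` and `events2_sender` are plain `async-channel` senders registered via `info.subscribe`, so the test can drain the receiver to check exactly which events a write produced. A minimal sketch of that flow (the `Event::LocalInsert` variant for local writes is assumed here, mirroring the `RemoteInsert` variant shown earlier):

let (events_tx, events_rx) = async_channel::bounded::<Event>(8);
replica.info.subscribe(events_tx);

// A local write should emit exactly one event on the channel ...
replica.hash_and_insert(b"foo", &author, b"init").await?;
assert!(matches!(events_rx.try_recv(), Ok(Event::LocalInsert { .. })));
// ... and nothing more; a rejected remote entry must likewise emit nothing,
// which is what the assertions in the test verify.
assert!(events_rx.try_recv().is_err());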
-        replica2.hash_and_insert(b"foo", &author, b"update")?;
+        replica2.hash_and_insert(b"foo", &author, b"update").await?;
         let from2 = replica2
             .sync_process_message(from1, peer1, &mut state2)
             .await
@@ -2254,24 +2288,25 @@ mod tests {
         Ok(())
     }

-    #[test]
-    fn test_replica_queries_mem() -> Result<()> {
+    #[tokio::test]
+    async fn test_replica_queries_mem() -> Result<()> {
         let store = store::Store::memory();
-        test_replica_queries(store)?;
+        test_replica_queries(store).await?;
         Ok(())
     }

-    #[test]
-    fn test_replica_queries_fs() -> Result<()> {
+    #[tokio::test]
+    #[cfg(feature = "fs-store")]
+    async fn test_replica_queries_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let store = store::fs::Store::persistent(dbfile.path())?;
-        test_replica_queries(store)?;
+        test_replica_queries(store).await?;
         Ok(())
     }

-    fn test_replica_queries(mut store: Store) -> Result<()> {
+    async fn test_replica_queries(mut store: Store) -> Result<()> {
         let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
         let namespace = NamespaceSecret::new(&mut rng);
         let namespace_id = namespace.id();
@@ -2287,10 +2322,10 @@ mod tests {
         );

         let mut replica = store.new_replica(namespace.clone())?;
-        replica.hash_and_insert("hi/world", &a2, "a2")?;
-        replica.hash_and_insert("hi/world", &a1, "a1")?;
-        replica.hash_and_insert("hi/moon", &a2, "a1")?;
-        replica.hash_and_insert("hi", &a3, "a3")?;
+        replica.hash_and_insert("hi/world", &a2, "a2").await?;
+        replica.hash_and_insert("hi/world", &a1, "a1").await?;
+        replica.hash_and_insert("hi/moon", &a2, "a1").await?;
+        replica.hash_and_insert("hi", &a3, "a3").await?;

         struct QueryTester<'a> {
             store: &'a mut Store,
@@ -2420,7 +2455,7 @@ mod tests {
         );

         let mut replica = store.new_replica(namespace)?;
-        replica.delete_prefix("hi/world", &a2)?;
+        replica.delete_prefix("hi/world", &a2).await?;
         let mut qt = QueryTester {
             store: &mut store,
             namespace: namespace_id,
@@ -2451,6 +2486,7 @@ mod tests {
     }

     #[test]
+    #[cfg(feature = "fs-store")]
     fn test_dl_policies_fs() -> Result<()> {
         let dbfile = tempfile::NamedTempFile::new()?;
         let mut store = store::fs::Store::persistent(dbfile.path())?;
diff --git a/tests/client.rs b/tests/client.rs
index ed93a949..6ba45c6b 100644
--- a/tests/client.rs
+++ b/tests/client.rs
@@ -31,7 +31,7 @@ async fn test_doc_close() -> Result<()> {
     // dropping doc1 will close the doc if not already closed
     // wait a bit because the close-on-drop spawns a task for which we cannot track completion.
     drop(doc1);
-    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    n0_future::time::sleep(n0_future::time::Duration::from_millis(100)).await;

     // operations on doc2 still succeed
     doc2.set_bytes(author, "foo", "bar").await?;
@@ -167,6 +167,7 @@ async fn test_default_author_memory() -> Result<()> {

 #[tokio::test]
 #[traced_test]
+#[cfg(feature = "fs-store")]
 async fn test_default_author_persist() -> TestResult<()> {
     let iroh_root_dir = tempfile::TempDir::new()?;
     let iroh_root = iroh_root_dir.path();
@@ -216,7 +217,7 @@ async fn test_default_author_persist() -> TestResult<()> {
     // somehow the blob store is not shut down correctly (yet?) on macos.
     // so we give it some time until we find a proper fix.
#[cfg(target_os = "macos")] - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + n0_future::time::sleep(std::time::Duration::from_secs(1)).await; tokio::fs::remove_file(iroh_root.join("default-author")).await?; drop(iroh); diff --git a/tests/gc.rs b/tests/gc.rs index df726a75..0f28e251 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -1,9 +1,12 @@ -use std::{path::PathBuf, time::Duration}; +#![allow(unused)] + +use std::path::PathBuf; use anyhow::Result; use bytes::Bytes; use futures_lite::StreamExt; use iroh_blobs::api::blobs::ImportMode; +use n0_future::time::Duration; use rand::RngCore; use testdir::testdir; use util::Node; @@ -18,6 +21,7 @@ pub fn create_test_data(size: usize) -> Bytes { } /// Wrap a bao store in a node that has gc enabled. +#[cfg(feature = "fs-store")] async fn persistent_node( path: PathBuf, gc_period: Duration, @@ -35,6 +39,7 @@ async fn persistent_node( } #[tokio::test] +#[cfg(feature = "fs-store")] async fn redb_doc_import_stress() -> Result<()> { let _ = tracing_subscriber::fmt::try_init(); let dir = testdir!(); diff --git a/tests/sync.rs b/tests/sync.rs index 32304fc7..dc10d021 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -1,9 +1,4 @@ -use std::{ - collections::HashMap, - future::Future, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{collections::HashMap, future::Future, sync::Arc}; use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; @@ -20,7 +15,9 @@ use iroh_docs::{ store::{DownloadPolicy, FilterKind, Query}, AuthorId, ContentStatus, Entry, }; +use n0_future::time::{Duration, Instant}; use rand::{CryptoRng, Rng, SeedableRng}; +#[cfg(feature = "fs-store")] use tempfile::tempdir; use tracing::{debug, error_span, info, Instrument}; use tracing_test::traced_test; @@ -140,7 +137,7 @@ async fn sync_subscribe_no_sync() -> Result<()> { let mut sub = doc.subscribe().await?; let author = client.docs().author_create().await?; doc.set_bytes(author, b"k".to_vec(), b"v".to_vec()).await?; - let event = tokio::time::timeout(Duration::from_millis(100), sub.next()).await?; + let event = n0_future::time::timeout(Duration::from_millis(100), sub.next()).await?; assert!( matches!(event, Some(Ok(LiveEvent::InsertLocal { .. }))), "expected InsertLocal but got {event:?}" @@ -588,6 +585,7 @@ async fn test_sync_via_relay() -> Result<()> { #[tokio::test] #[traced_test] #[ignore = "flaky"] +#[cfg(feature = "fs-store")] async fn sync_restart_node() -> Result<()> { let mut rng = test_rng(b"sync_restart_node"); let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await?; @@ -657,7 +655,7 @@ async fn sync_restart_node() -> Result<()> { info!(me = %id1.fmt_short(), "node1 down"); info!(me = %id1.fmt_short(), "sleep 1s"); - tokio::time::sleep(Duration::from_secs(1)).await; + n0_future::time::sleep(Duration::from_secs(1)).await; info!(me = %id2.fmt_short(), "node2 set b"); let hash_b = doc2.set_bytes(author2, "n2/b", "b").await?; @@ -843,7 +841,7 @@ async fn test_download_policies() -> Result<()> { (downloaded_a, downloaded_b) }; - let (downloaded_a, mut downloaded_b) = tokio::time::timeout(TIMEOUT, fut) + let (downloaded_a, mut downloaded_b) = n0_future::time::timeout(TIMEOUT, fut) .await .context("timeout elapsed")?; @@ -874,7 +872,7 @@ async fn sync_big() -> Result<()> { tokio::task::spawn(async move { for i in 0.. 
-            tokio::time::sleep(Duration::from_secs(1)).await;
+            n0_future::time::sleep(Duration::from_secs(1)).await;
             info!("tick {i}");
         }
     });
@@ -1012,7 +1010,7 @@ async fn test_list_docs_stream() -> testresult::TestResult<()> {
         }
     };

-    tokio::time::timeout(Duration::from_secs(2), fut)
+    n0_future::time::timeout(Duration::from_secs(2), fut)
         .await
         .expect("not to timeout");

@@ -1081,7 +1079,7 @@ async fn wait_for_events(
     matcher: impl Fn(&LiveEvent) -> bool,
 ) -> anyhow::Result<Vec<LiveEvent>> {
     let mut res = Vec::with_capacity(count);
-    let sleep = tokio::time::sleep(timeout);
+    let sleep = n0_future::time::sleep(timeout);
     tokio::pin!(sleep);
     while res.len() < count {
         tokio::select! {
@@ -1160,6 +1158,7 @@ impl PartialEq<ExpectedEntry> for (Entry, Bytes) {

 #[tokio::test]
 #[traced_test]
+#[cfg(feature = "fs-store")]
 async fn doc_delete() -> Result<()> {
     let tempdir = tempdir()?;
     // TODO(Frando): iroh-blobs has gc only for fs store atm, change test to test both
@@ -1184,7 +1183,7 @@ async fn doc_delete() -> Result<()> {

     // wait for gc
     // TODO: allow to manually trigger gc
-    tokio::time::sleep(Duration::from_secs(2)).await;
+    n0_future::time::sleep(Duration::from_secs(2)).await;
     let bytes = client.blobs().get_bytes(hash).await;
     assert!(bytes.is_err());
     node.shutdown().await?;
@@ -1289,7 +1288,7 @@ async fn assert_next(
         }
         items
     };
-    let res = tokio::time::timeout(timeout, fut).await;
+    let res = n0_future::time::timeout(timeout, fut).await;
     res.expect("timeout reached")
 }

@@ -1357,7 +1356,7 @@ async fn assert_next_unordered_with_optionals(
         Ok(())
     };
     tokio::pin!(fut);
-    let res = tokio::time::timeout(timeout, fut)
+    let res = n0_future::time::timeout(timeout, fut)
         .await
         .map_err(|_| anyhow!("Timeout reached ({timeout:?})"))
         .and_then(|res| res);
diff --git a/tests/util.rs b/tests/util.rs
index a16204fd..28464af3 100644
--- a/tests/util.rs
+++ b/tests/util.rs
@@ -1,4 +1,5 @@
-#![allow(dead_code)]
+#![allow(unused)]
+
 use std::{
     net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
     ops::Deref,
@@ -6,7 +7,7 @@ use std::{
 };

 use iroh::{discovery::IntoDiscovery, dns::DnsResolver, EndpointId, RelayMode, SecretKey};
-use iroh_blobs::store::{fs::options::Options, GcConfig};
+use iroh_blobs::store::GcConfig;
 use iroh_docs::{engine::ProtectCallbackHandler, protocol::Docs};
 use iroh_gossip::net::Gossip;

@@ -64,9 +65,9 @@ impl Client {
 pub struct Builder {
     endpoint: iroh::endpoint::Builder,
     use_n0_discovery: bool,
-    path: Option<PathBuf>,
+    storage: Storage,
     // node_discovery: Option<Box<dyn Discovery>>,
-    gc_interval: Option<std::time::Duration>,
+    gc_interval: Option<n0_future::time::Duration>,
     #[debug(skip)]
     register_gc_done_cb: Option<Box<dyn Fn() + Send + 'static>>,
     bind_random_port: bool,
@@ -97,9 +98,10 @@ impl Builder {
         let endpoint = builder.bind().await?;
         let mut router = iroh::protocol::Router::builder(endpoint.clone());
         let gossip = Gossip::builder().spawn(endpoint.clone());
-        let mut docs_builder = match self.path {
-            Some(ref path) => Docs::persistent(path.to_path_buf()),
-            None => Docs::memory(),
+        let mut docs_builder = match self.storage {
+            Storage::Memory => Docs::memory(),
+            #[cfg(feature = "fs-store")]
+            Storage::Persistent(ref path) => Docs::persistent(path.to_path_buf()),
         };
         if let Some(protect_cb) = protect_cb {
             docs_builder = docs_builder.protect_handler(protect_cb);
@@ -163,7 +165,7 @@ impl Builder {
         self
     }

-    pub fn gc_interval(mut self, value: Option<std::time::Duration>) -> Self {
+    pub fn gc_interval(mut self, value: Option<n0_future::time::Duration>) -> Self {
         self.gc_interval = value;
         self
     }
@@ -183,11 +185,11 @@ impl Builder {
         self
     }

-    fn new(path: Option<PathBuf>) -> Self {
+    fn new(storage: Storage) -> Self {
         Self {
             endpoint: iroh::Endpoint::empty_builder(RelayMode::Disabled),
             use_n0_discovery: true,
-            path,
+            storage,
             gc_interval: None,
             bind_random_port: false,
             // node_discovery: None,
@@ -197,30 +199,38 @@ impl Builder {
     }
 }

+#[derive(Debug)]
+enum Storage {
+    Memory,
+    #[cfg(feature = "fs-store")]
+    Persistent(PathBuf),
+}
+
 impl Node {
     /// Creates a new node with memory storage
     pub fn memory() -> Builder {
-        Builder::new(None)
+        Builder::new(Storage::Memory)
     }

     /// Creates a new node with persistent storage
+    #[cfg(feature = "fs-store")]
     pub fn persistent(path: impl AsRef<Path>) -> Builder {
-        let path = Some(path.as_ref().to_owned());
-        Builder::new(path)
+        Builder::new(Storage::Persistent(path.as_ref().to_owned()))
     }
 }

 impl Builder {
     /// Spawns the node
     pub async fn spawn(self) -> anyhow::Result<Node> {
-        let (store, protect_handler) = match self.path {
-            None => {
+        let (store, protect_handler) = match self.storage {
+            Storage::Memory => {
                 let store = iroh_blobs::store::mem::MemStore::new();
                 ((*store).clone(), None)
             }
-            Some(ref path) => {
+            #[cfg(feature = "fs-store")]
+            Storage::Persistent(ref path) => {
                 let db_path = path.join("blobs.db");
-                let mut opts = Options::new(path);
+                let mut opts = iroh_blobs::store::fs::options::Options::new(path);
                 let protect_handler = if let Some(interval) = self.gc_interval {
                     let (handler, cb) = ProtectCallbackHandler::new();
                     opts.gc = Some(GcConfig {
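The `Storage` enum introduced above is what lets every persistent code path compile out when the `fs-store` feature is disabled: the variant and each `match` arm carry the same `cfg`, so the `Persistent` branch is simply absent from a memory-only build. A self-contained sketch of the pattern (the `describe` function is illustrative, not part of the change):

#[derive(Debug)]
enum Storage {
    Memory,
    #[cfg(feature = "fs-store")]
    Persistent(std::path::PathBuf),
}

fn describe(storage: &Storage) -> &'static str {
    // The cfg on the match arm mirrors the cfg on the variant, so this
    // matches exhaustively whether or not fs-store is enabled.
    match storage {
        Storage::Memory => "in-memory",
        #[cfg(feature = "fs-store")]
        Storage::Persistent(_) => "on-disk",
    }
}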