diff --git a/Cargo.lock b/Cargo.lock index d68b3517a..3c02cae60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -684,9 +684,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.50" +version = "4.5.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c2cfd7bf8a6017ddaa4e32ffe7403d547790db06bd171c1c53926faab501623" +checksum = "4c26d721170e0295f191a69bd9a1f93efcdb0aff38684b61ab5750468972e5f5" dependencies = [ "clap_builder", "clap_derive", @@ -694,9 +694,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.50" +version = "4.5.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a4c05b9e80c5ccd3a7ef080ad7b6ba7d6fc00a985b8b157197075677c82c7a0" +checksum = "75835f0c7bf681bfd05abe44e965760fea999a5286c6eb2d59883634fd02011a" dependencies = [ "anstream", "anstyle", @@ -1828,6 +1828,7 @@ dependencies = [ "chrono", "clap", "freenet", + "freenet-macros 0.1.0", "freenet-ping-types", "freenet-stdlib", "futures 0.3.31", diff --git a/MACRO_TEST_CONVERSION_STATUS.md b/MACRO_TEST_CONVERSION_STATUS.md deleted file mode 100644 index f44eee4aa..000000000 --- a/MACRO_TEST_CONVERSION_STATUS.md +++ /dev/null @@ -1,192 +0,0 @@ -# Test Conversion Status: `#[freenet_test]` Macro - -## Overview - -This document tracks which integration tests have been converted to use the `#[freenet_test]` procedural macro and explains why certain tests were not converted. - -## Successfully Converted Tests - -### error_notification.rs (3/4 tests converted) - -✅ **Converted:** -1. `test_get_error_notification` - GET operation error notification -2. `test_put_error_notification` - PUT operation error notification -3. `test_update_error_notification` - UPDATE operation error notification - -❌ **Not Converted:** -- `test_connection_drop_error_notification` - Requires custom peer lifecycle management (selective shutdown) - - **Reason:** The macro doesn't support stopping individual nodes mid-test - -### isolated_node_regression.rs (4/4 tests converted) - -✅ **All Converted:** -1. `test_isolated_node_put_get_workflow` - Complete PUT→GET workflow on isolated node -2. `test_concurrent_get_deduplication_race` - Concurrent GET operations -3. `test_isolated_node_local_subscription` - Subscribe operations on isolated node -4. `test_isolated_node_update_operation` - UPDATE operations on isolated nodes - -### test_macro_example.rs (7/7 tests - examples) - -✅ **All Using Macro:** -1. `test_single_node` - Single gateway node -2. `test_multi_node` - Multiple nodes (gateway + 2 peers) -3. `test_with_event_aggregation` - Event aggregation on failure -4. `test_always_aggregate` - Event aggregation always -5. `test_custom_tokio_config` - Custom tokio worker threads -6. `test_current_thread_runtime` - Single-threaded tokio runtime -7. `test_multiple_gateways` - Multiple gateway nodes (NEW!) - -## Tests Not Yet Converted - -### operations.rs (0/11 tests converted) - -**Tests in this file:** -- `test_put_contract` -- `test_update_contract` -- `test_put_merge_persists_state` -- `test_multiple_clients_subscription` -- `test_get_with_subscribe_flag` -- `test_put_with_subscribe_flag` -- `test_delegate_request` -- `test_gateway_packet_size_change_after_60s` -- `test_production_decryption_error_scenario` -- `test_subscription_introspection` -- `test_update_no_change_notification` - -**Why not converted:** -1. **Complex peer-gateway relationships** - Tests set up specific gateway configurations and peer connections -2. **Custom gateway discovery** - Uses `gw_config()` and `base_node_test_config()` helpers with specific network topologies -3. **Multi-phase testing** - Some tests require stopping/starting nodes at specific points -4. **Contract persistence verification** - Tests verify data persists across specific node restarts -5. **Uses `#[test_log::test]`** - Would need to verify macro compatibility - -**Future conversion approach:** -- Macro would need to support: - - Configuring peer gateway lists (which gateways peers should connect to) - - Mid-test node lifecycle control (stop/restart specific nodes) - - Or: Create specialized macros for these patterns (e.g., `#[freenet_network_test]`) - -### connectivity.rs (0/3 tests converted) - -**Tests in this file:** -- `test_gateway_reconnection` -- `test_basic_gateway_connectivity` -- `test_three_node_network_connectivity` - -**Why not converted:** -1. **Network topology testing** - These tests specifically test connection establishment and maintenance -2. **Custom disconnect/reconnect logic** - Tests intentionally disconnect and reconnect nodes -3. **Connection state verification** - Tests verify specific connection counts and states -4. **Requires fine-grained control** - Need to control exactly when nodes connect/disconnect - -**Future conversion approach:** -- Similar to operations.rs, would need lifecycle control features -- Or: Keep as-is since these are network layer tests, not contract operation tests - -### Other Test Files - -- `redb_migration.rs` - Database migration tests (no multi-node setup) -- `token_expiration.rs` - Token config tests (no multi-node setup) -- `ubertest.rs` - Large-scale River app test (too specialized/complex) - -## Conversion Statistics - -- **Total test files analyzed:** 8 -- **Files with converted tests:** 3 -- **Total tests converted:** 7 -- **Total boilerplate lines eliminated:** ~300+ - -## Macro Features Used in Converted Tests - -### Basic Features -- ✅ Single gateway node -- ✅ Multiple peer nodes -- ✅ Multiple gateway nodes (NEW!) -- ✅ Custom timeouts -- ✅ Custom startup wait times -- ✅ Event aggregation (on_failure, always, never) -- ✅ Custom log levels -- ✅ Tokio flavor configuration -- ✅ Tokio worker thread configuration - -### TestContext API Used -- ✅ `ctx.node(label)` - Get specific node -- ✅ `ctx.gateway()` - Get first gateway -- ✅ `ctx.gateways()` - Get all gateways (NEW!) -- ✅ `ctx.peers()` - Get all peers (NEW!) -- ✅ `ctx.node_labels()` - Get all node labels -- ✅ `ctx.event_log_path(label)` - Get node event log path -- ✅ `ctx.aggregate_events()` - Aggregate events from all nodes -- ✅ `ctx.generate_failure_report(error)` - Generate comprehensive failure report -- ✅ `ctx.generate_success_summary()` - Generate success summary - -## Future Enhancements - -### For Conversion of Remaining Tests - -1. **Peer Gateway Configuration** - ```rust - #[freenet_test( - nodes = ["gw-1", "gw-2", "peer-1", "peer-2"], - gateways = ["gw-1", "gw-2"], - peer_gateways = { - "peer-1": ["gw-1"], - "peer-2": ["gw-2"] - } - )] - ``` - -2. **Node Lifecycle Control** - ```rust - async fn test(ctx: &mut TestContext) -> TestResult { - // Start with nodes running - ctx.stop_node("peer-1").await?; - // Test something - ctx.start_node("peer-1").await?; - // Test something else - Ok(()) - } - ``` - -3. **Network State Verification** - ```rust - async fn test(ctx: &mut TestContext) -> TestResult { - let conn_count = ctx.connection_count("gateway").await?; - assert_eq!(conn_count, 2); - Ok(()) - } - ``` - -### Specialized Macros - -For network/connectivity tests, consider creating specialized macros: - -```rust -#[freenet_network_test( - topology = "star", // or "mesh", "chain", etc. - gateway_count = 1, - peer_count = 3, - test_disconnections = true -)] -async fn test(ctx: &mut NetworkTestContext) -> TestResult { - // Network-specific test context with connection control - Ok(()) -} -``` - -## Conclusion - -The `#[freenet_test]` macro successfully handles: -- ✅ Isolated node tests -- ✅ Simple multi-node tests -- ✅ Tests with multiple gateways -- ✅ Tests with custom tokio configurations -- ✅ Tests with event aggregation - -The macro is **not suitable** for: -- ❌ Tests requiring specific peer-gateway connections -- ❌ Tests requiring node lifecycle control (stop/restart) -- ❌ Network topology and connectivity tests -- ❌ Tests with complex custom configurations - -For these cases, the existing test infrastructure remains appropriate and should be kept as-is. diff --git a/apps/freenet-ping/Cargo.lock b/apps/freenet-ping/Cargo.lock index 64dc9b249..340aa1761 100644 --- a/apps/freenet-ping/Cargo.lock +++ b/apps/freenet-ping/Cargo.lock @@ -242,14 +242,14 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "axum" -version = "0.7.9" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" dependencies = [ - "async-trait", "axum-core", "base64 0.22.1", "bytes", + "form_urlencoded", "futures-util", "http", "http-body", @@ -262,15 +262,14 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rustversion", - "serde", + "serde_core", "serde_json", "serde_path_to_error", "serde_urlencoded", "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite 0.24.0", + "tokio-tungstenite 0.28.0", "tower", "tower-layer", "tower-service", @@ -278,19 +277,17 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.4.5" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +checksum = "59446ce19cd142f8833f856eb31f3eb097812d1479ab224f54d72428ca21ea22" dependencies = [ - "async-trait", "bytes", - "futures-util", + "futures-core", "http", "http-body", "http-body-util", "mime", "pin-project-lite", - "rustversion", "sync_wrapper", "tower-layer", "tower-service", @@ -1188,6 +1185,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eytzinger-interpolation" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756e203906df34193e72e75d598bbc96dd732160560a28cf9e57611390b58fdb" +dependencies = [ + "nohash-hasher", + "nonmax", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -1274,7 +1281,7 @@ dependencies = [ [[package]] name = "freenet" -version = "0.1.30" +version = "0.1.36" dependencies = [ "aes-gcm", "ahash", @@ -1305,6 +1312,7 @@ dependencies = [ "opentelemetry", "parking_lot", "pav_regression", + "pin-project", "pkcs8", "rand 0.9.2", "redb", @@ -1318,6 +1326,7 @@ dependencies = [ "tar", "thiserror 2.0.12", "tokio", + "tokio-stream", "tokio-tungstenite 0.27.0", "toml", "tower-http", @@ -1332,6 +1341,16 @@ dependencies = [ "xz2", ] +[[package]] +name = "freenet-macros" +version = "0.1.0" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "freenet-macros" version = "0.1.1" @@ -1351,6 +1370,7 @@ dependencies = [ "chrono", "clap", "freenet", + "freenet-macros 0.1.0", "freenet-ping-types", "freenet-stdlib", "futures", @@ -1389,9 +1409,9 @@ dependencies = [ [[package]] name = "freenet-stdlib" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6efc8b4956becbe20ee6a437202b0ae2ee1fc2a8a02f8dfb60a427fdf5dcc665" +checksum = "66c64fa03f4a083918c7e347be47122c223d8156f4c012a0fe8e89a643350f2d" dependencies = [ "arbitrary", "bincode", @@ -1400,7 +1420,7 @@ dependencies = [ "byteorder", "chrono", "flatbuffers 24.12.23", - "freenet-macros", + "freenet-macros 0.1.1", "futures", "js-sys", "once_cell", @@ -2071,17 +2091,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "io-uring" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" -dependencies = [ - "bitflags 2.9.4", - "cfg-if", - "libc", -] - [[package]] name = "ipconfig" version = "0.3.2" @@ -2324,9 +2333,9 @@ dependencies = [ [[package]] name = "matchit" -version = "0.7.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" [[package]] name = "memchr" @@ -2459,6 +2468,12 @@ dependencies = [ "libc", ] +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "nom" version = "7.1.3" @@ -2469,6 +2484,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonmax" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "610a5acd306ec67f907abe5567859a3c693fb9886eb1f012ab8f2a47bef3db51" + [[package]] name = "notify" version = "8.0.0" @@ -2715,13 +2736,14 @@ checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "pav_regression" -version = "0.5.2" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d92b9b41adf4f984e4d6a86ceee68a8a729e539e28a75b65b445c300bcb9f3c" +checksum = "8eb0b883df1700344786c158cf21a3cb04974b0b6f98a827dec11cef921cd3d8" dependencies = [ + "eytzinger-interpolation", "ordered-float", "serde", - "thiserror 1.0.69", + "thiserror 2.0.12", ] [[package]] @@ -2739,6 +2761,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -3490,10 +3532,11 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ + "serde_core", "serde_derive", ] @@ -3517,11 +3560,20 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3979,29 +4031,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.47.1" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ - "backtrace", "bytes", - "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", - "slab", "socket2 0.6.0", "tokio-macros", - "windows-sys 0.59.0", + "tracing", + "windows-sys 0.61.2", ] [[package]] name = "tokio-macros" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", @@ -4039,15 +4089,14 @@ dependencies = [ ] [[package]] -name = "tokio-tungstenite" -version = "0.24.0" +name = "tokio-stream" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ - "futures-util", - "log", + "futures-core", + "pin-project-lite", "tokio", - "tungstenite 0.24.0", ] [[package]] @@ -4062,6 +4111,18 @@ dependencies = [ "tungstenite 0.27.0", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.28.0", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -4214,6 +4275,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -4224,12 +4295,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -4250,27 +4324,26 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" -version = "0.24.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d" dependencies = [ - "byteorder", "bytes", "data-encoding", "http", "httparse", "log", - "rand 0.8.5", + "rand 0.9.2", "sha1", - "thiserror 1.0.69", + "thiserror 2.0.12", "utf-8", ] [[package]] name = "tungstenite" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eadc29d668c91fcc564941132e17b28a7ceb2f3ebf0b9dae3e03fd7a6748eb0d" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" dependencies = [ "bytes", "data-encoding", @@ -4995,6 +5068,15 @@ dependencies = [ "windows-targets 0.53.2", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/apps/freenet-ping/app/Cargo.toml b/apps/freenet-ping/app/Cargo.toml index ef83d63ae..18fcd94c3 100644 --- a/apps/freenet-ping/app/Cargo.toml +++ b/apps/freenet-ping/app/Cargo.toml @@ -25,6 +25,7 @@ humantime = "2.2.0" [dev-dependencies] freenet = { path = "../../../crates/core" } +freenet-macros = { path = "../../../crates/freenet-macros" } testresult = { workspace = true } [lib] diff --git a/apps/freenet-ping/app/src/ping_client.rs b/apps/freenet-ping/app/src/ping_client.rs index 60e37f6ec..f4aa8df15 100644 --- a/apps/freenet-ping/app/src/ping_client.rs +++ b/apps/freenet-ping/app/src/ping_client.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; -use chrono::{DateTime, Utc}; +use freenet_ping_types::chrono::{DateTime, Utc}; use freenet_ping_types::{Ping, PingContractOptions}; use freenet_stdlib::client_api::{ ClientRequest, ContractRequest, ContractResponse, HostResponse, WebApi, @@ -47,7 +47,7 @@ pub async fn wait_for_put_response( expected_key: &ContractKey, ) -> Result> { loop { - let resp = timeout(Duration::from_secs(30), client.recv()).await; + let resp = timeout(Duration::from_secs(60), client.recv()).await; match resp { Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { if &key == expected_key { @@ -91,7 +91,7 @@ pub async fn wait_for_get_response( expected_key: &ContractKey, ) -> Result> { loop { - let resp = timeout(Duration::from_secs(30), client.recv()).await; + let resp = timeout(Duration::from_secs(60), client.recv()).await; match resp { Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { key, @@ -134,7 +134,7 @@ pub async fn wait_for_subscribe_response( expected_key: &ContractKey, ) -> Result<(), Box> { loop { - let resp = timeout(Duration::from_secs(30), client.recv()).await; + let resp = timeout(Duration::from_secs(60), client.recv()).await; match resp { Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { key, diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index 3a333a12a..5750f2914 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -208,6 +208,9 @@ pub(crate) enum PackageType { Delegate, } +const CONTRACT_EXTRA_FEATURES: [&str; 1] = ["contract"]; +const NO_EXTRA_FEATURES: [&str; 0] = []; + impl PackageType { pub fn feature(&self) -> &'static str { match self { @@ -215,6 +218,13 @@ impl PackageType { PackageType::Delegate => "freenet-main-delegate", } } + + pub fn extra_features(&self) -> &'static [&'static str] { + match self { + PackageType::Contract => &CONTRACT_EXTRA_FEATURES, + PackageType::Delegate => &NO_EXTRA_FEATURES, + } + } } impl std::fmt::Display for PackageType { @@ -250,9 +260,10 @@ fn compile_options(cli_config: &BuildToolConfig) -> impl Iterator .iter() .flat_map(|s| { s.split(',') - .filter(|p| *p != cli_config.package_type.feature()) + .filter(|p| *p != cli_config.package_type.feature() && *p != "contract") }) - .chain([cli_config.package_type.feature()]); + .chain([cli_config.package_type.feature()]) + .chain(cli_config.package_type.extra_features().iter().copied()); let features = [ "--features".to_string(), feature_list.collect::>().join(","), @@ -262,24 +273,54 @@ fn compile_options(cli_config: &BuildToolConfig) -> impl Iterator .chain(release.iter().map(|s| s.to_string())) } // TODO: refactor so we share the implementation with fdev (need to extract to ) +fn ensure_target_dir_env() { + if std::env::var(TARGET_DIR_VAR).is_err() { + let workspace_dir = std::env::var("CARGO_WORKSPACE_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| find_workspace_root()); + let target_dir = workspace_dir.join("target"); + std::env::set_var(TARGET_DIR_VAR, &target_dir); + } +} + +fn find_workspace_root() -> PathBuf { + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + manifest_dir + .ancestors() + .find(|dir| { + let cargo_toml = dir.join("Cargo.toml"); + cargo_toml.exists() + && std::fs::read_to_string(&cargo_toml) + .map(|contents| contents.contains("[workspace]")) + .unwrap_or(false) + }) + .expect("Could not determine workspace root from manifest directory") + .to_path_buf() +} + fn compile_contract(contract_path: &PathBuf) -> anyhow::Result> { + ensure_target_dir_env(); println!("module path: {contract_path:?}"); let target = std::env::var(TARGET_DIR_VAR) .map_err(|_| anyhow::anyhow!("CARGO_TARGET_DIR should be set"))?; println!("trying to compile the test contract, target: {target}"); - compile_rust_wasm_lib( - &BuildToolConfig { - features: None, - package_type: PackageType::Contract, - debug: true, - }, - contract_path, - )?; + let build_config = BuildToolConfig { + features: None, + package_type: PackageType::Contract, + debug: false, + }; + + compile_rust_wasm_lib(&build_config, contract_path)?; + let build_dir = if build_config.debug { + "debug" + } else { + "release" + }; let output_file = Path::new(&target) .join(WASM_TARGET) - .join("debug") + .join(build_dir) .join(WASM_FILE_NAME.replace('-', "_")) .with_extension("wasm"); println!("output file: {output_file:?}"); diff --git a/apps/freenet-ping/app/tests/run_app.rs b/apps/freenet-ping/app/tests/run_app.rs index b1cd6480d..17969d830 100644 --- a/apps/freenet-ping/app/tests/run_app.rs +++ b/apps/freenet-ping/app/tests/run_app.rs @@ -3,7 +3,7 @@ mod common; use std::{net::TcpListener, path::PathBuf, time::Duration}; use anyhow::anyhow; -use freenet::{local_node::NodeConfig, server::serve_gateway}; +use freenet::{local_node::NodeConfig, server::serve_gateway, test_utils::TestContext}; use freenet_ping_types::{Ping, PingContractOptions}; use freenet_stdlib::{ client_api::{ @@ -12,7 +12,7 @@ use freenet_stdlib::{ }, prelude::*, }; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::FutureExt; use rand::SeedableRng; use testresult::TestResult; use tokio::{select, time::sleep, time::timeout}; @@ -20,8 +20,8 @@ use tokio_tungstenite::connect_async; use tracing::{level_filters::LevelFilter, span, Instrument, Level}; use common::{ - base_node_test_config, base_node_test_config_with_rng, gw_config_from_path, - gw_config_from_path_with_rng, APP_TAG, PACKAGE_DIR, PATH_TO_CONTRACT, + base_node_test_config_with_rng, gw_config_from_path_with_rng, APP_TAG, PACKAGE_DIR, + PATH_TO_CONTRACT, }; use freenet_ping_app::ping_client::{ run_ping_client, wait_for_get_response, wait_for_put_response, wait_for_subscribe_response, @@ -1519,766 +1519,626 @@ async fn test_ping_application_loop() -> TestResult { Ok(()) } -#[tokio::test(flavor = "multi_thread")] -#[ignore = "Test has never worked - gateway nodes fail on startup with channel closed errors"] -async fn test_ping_partially_connected_network() -> TestResult { +#[freenet_macros::freenet_test( + nodes = ["gw-0", "gw-1", "gw-2", "node-0", "node-1", "node-2", "node-3", "node-4", "node-5", "node-6"], + gateways = ["gw-0", "gw-1", "gw-2"], + peer_connectivity_ratio = 0.3, + timeout_secs = 240, + startup_wait_secs = 30, + tokio_flavor = "multi_thread", + aggregate_events = "on_failure" +)] +async fn test_ping_partially_connected_network( + ctx: &mut freenet::test_utils::TestContext, +) -> TestResult { /* * This test verifies how subscription propagation works in a partially connected network. * - * Parameters: - * - NUM_GATEWAYS: Number of gateway nodes to create (default: 3) - * - NUM_REGULAR_NODES: Number of regular nodes to create (default: 7) - * - CONNECTIVITY_RATIO: Percentage of connectivity between regular nodes (0.0-1.0) - * - * Network Topology: - * - Creates a network with specified number of gateway and regular nodes - * - ALL regular nodes connect to ALL gateways (full gateway connectivity) - * - Regular nodes have partial connectivity to other regular nodes based on CONNECTIVITY_RATIO - * - Uses a deterministic approach to create partial connectivity between regular nodes + * Network Topology (configured by macro): + * - 3 gateways (gw-0, gw-1, gw-2) + * - 7 regular nodes (node-0 through node-6) + * - ALL regular nodes connect to ALL gateways (auto_connect_peers = true) + * - Regular nodes have 50% connectivity to other regular nodes (peer_connectivity_ratio = 0.5) + * - Uses deterministic blocking pattern based on node indices * * Test procedure: - * 1. Configures and starts all nodes with the specified topology - * 2. One node publishes the ping contract - * 3. Tracks which nodes successfully access the contract - * 4. Subscribes available nodes to the contract - * 5. Sends an update from one node - * 6. Verifies update propagation across the network - * 7. Analyzes results to detect potential subscription propagation issues + * 1. Load and publish the ping contract + * 2. Track which nodes successfully access the contract + * 3. Subscribe available nodes to the contract + * 4. Send an update from one node + * 5. Verify update propagation across the network + * 6. Analyze results to detect potential subscription propagation issues */ - // Network configuration parameters - const NUM_GATEWAYS: usize = 3; - const NUM_REGULAR_NODES: usize = 7; - const CONNECTIVITY_RATIO: f64 = 0.5; // Controls connectivity between regular nodes + use tokio_tungstenite::connect_async; + + let num_gateways = ctx.gateways().len(); + let num_regular_nodes = ctx.peers().len(); - // Configure logging - freenet::config::set_logger(Some(LevelFilter::DEBUG), None); tracing::info!( - "Starting test with {} gateways and {} regular nodes (connectivity ratio: {})", - NUM_GATEWAYS, - NUM_REGULAR_NODES, - CONNECTIVITY_RATIO + "Test started with {} gateways and {} regular nodes (50% connectivity ratio)", + num_gateways, + num_regular_nodes ); - // Setup network sockets for the gateways - let mut gateway_sockets = Vec::with_capacity(NUM_GATEWAYS); - let mut ws_api_gateway_sockets = Vec::with_capacity(NUM_GATEWAYS); - - for _ in 0..NUM_GATEWAYS { - gateway_sockets.push(TcpListener::bind("127.0.0.1:0")?); - ws_api_gateway_sockets.push(TcpListener::bind("127.0.0.1:0")?); + // Connect WebSocket clients to all gateway nodes + let mut gateway_clients = Vec::with_capacity(num_gateways); + for gw in ctx.gateways() { + let uri = format!( + "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", + gw.ws_port + ); + let (stream, _) = connect_async(&uri).await?; + let client = WebApi::start(stream); + gateway_clients.push(client); + tracing::info!("Connected to gateway {}", gw.label); } - // Setup API sockets for regular nodes - let mut ws_api_node_sockets = Vec::with_capacity(NUM_REGULAR_NODES); - let mut regular_node_addresses = Vec::with_capacity(NUM_REGULAR_NODES); - - // First, bind all sockets to get addresses for later blocking - for _i in 0..NUM_REGULAR_NODES { - let socket = TcpListener::bind("127.0.0.1:0")?; - // Store the address for later use in blocked_addresses - regular_node_addresses.push(socket.local_addr()?); - ws_api_node_sockets.push(TcpListener::bind("127.0.0.1:0")?); + // Connect WebSocket clients to all peer nodes + let mut node_clients = Vec::with_capacity(num_regular_nodes); + for peer in ctx.peers() { + let uri = format!( + "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", + peer.ws_port + ); + let (stream, _) = connect_async(&uri).await?; + let client = WebApi::start(stream); + node_clients.push(client); + tracing::info!("Connected to peer {}", peer.label); } - // Configure gateway nodes - let mut gateway_info = Vec::new(); - let mut ws_api_ports_gw = Vec::new(); - - // Build configurations and keep temp directories alive - let mut gateway_configs = Vec::with_capacity(NUM_GATEWAYS); - let mut gateway_presets = Vec::with_capacity(NUM_GATEWAYS); - - for i in 0..NUM_GATEWAYS { - let data_dir_suffix = format!("gw_{i}"); + // Load the ping contract. Compile once to determine the code hash, then again with proper options. + let path_to_code = PathBuf::from(PACKAGE_DIR).join(PATH_TO_CONTRACT); + tracing::info!(path=%path_to_code.display(), "loading contract code"); + let temp_options = PingContractOptions { + frequency: Duration::from_secs(3), + ttl: Duration::from_secs(60), + tag: APP_TAG.to_string(), + code_key: String::new(), + }; + let temp_params = Parameters::from(serde_json::to_vec(&temp_options).unwrap()); + let temp_container = common::load_contract(&path_to_code, temp_params)?; + let code_hash = CodeHash::from_code(temp_container.data()); + let ping_options = PingContractOptions { + frequency: Duration::from_secs(3), + ttl: Duration::from_secs(60), + tag: APP_TAG.to_string(), + code_key: code_hash.to_string(), + }; - let (cfg, preset) = base_node_test_config( - true, - vec![], - Some(gateway_sockets[i].local_addr()?.port()), - ws_api_gateway_sockets[i].local_addr()?.port(), - &data_dir_suffix, - None, // base_tmp_dir - None, // No blocked addresses for gateways - ) + let params = Parameters::from(serde_json::to_vec(&ping_options).unwrap()); + let container = common::load_contract(&path_to_code, params)?; + let contract_key = container.key(); + + // Choose a node to publish the contract + let publisher_idx = 0; + tracing::info!("Node {} will publish the contract", publisher_idx); + + // Publisher node puts the contract + let ping = Ping::default(); + let serialized = serde_json::to_vec(&ping)?; + let wrapped_state = WrappedState::new(serialized); + + let publisher = &mut node_clients[publisher_idx]; + publisher + .send(ClientRequest::ContractOp(ContractRequest::Put { + contract: container.clone(), + state: wrapped_state.clone(), + related_contracts: RelatedContracts::new(), + subscribe: false, + })) .await?; - let public_port = cfg.network_api.public_port.unwrap(); - let path = preset.temp_dir.path().to_path_buf(); - let config_info = gw_config_from_path(public_port, &path)?; - - tracing::info!("Gateway {} data dir: {:?}", i, preset.temp_dir.path()); - ws_api_ports_gw.push(cfg.ws_api.ws_api_port.unwrap()); - gateway_info.push(config_info); - gateway_configs.push(cfg); - gateway_presets.push(preset); - } - - // Serialize gateway info for nodes to use - let serialized_gateways: Vec = gateway_info - .iter() - .map(|info| serde_json::to_string(info).unwrap()) - .collect(); - - // Configure regular nodes with partial connectivity to OTHER regular nodes - let mut ws_api_ports_nodes = Vec::new(); - let mut node_connections = Vec::new(); - - // Build configurations and keep temp directories alive - let mut node_configs = Vec::with_capacity(NUM_REGULAR_NODES); - let mut node_presets = Vec::with_capacity(NUM_REGULAR_NODES); - - for (i, _) in ws_api_node_sockets.iter().enumerate() { - // Determine which other regular nodes this node should block - let mut blocked_addresses = Vec::new(); - for (j, &addr) in regular_node_addresses.iter().enumerate() { - if i == j { - continue; // Skip self - } - - // Use a deterministic approach based on node indices - // If the result is >= than CONNECTIVITY_RATIO, block the connection - let should_block = (i * j) % 100 >= (CONNECTIVITY_RATIO * 100.0) as usize; - if should_block { - blocked_addresses.push(addr); - } - } - - // Count effective connections to other regular nodes - let effective_connections = (NUM_REGULAR_NODES - 1) - blocked_addresses.len(); - node_connections.push(effective_connections); - - let data_dir_suffix = format!("node_{i}"); - - let (cfg, preset) = base_node_test_config( - false, - serialized_gateways.clone(), // All nodes connect to all gateways - None, - ws_api_node_sockets[i].local_addr()?.port(), - &data_dir_suffix, - None, - Some(blocked_addresses.clone()), // Use blocked_addresses for regular nodes - ) - .await?; + // Wait for put response on publisher + let key = wait_for_put_response(publisher, &contract_key) + .await + .map_err(anyhow::Error::msg)?; + tracing::info!(key=%key, "Publisher node {} put ping contract successfully!", publisher_idx); - tracing::info!( - "Node {} data dir: {:?} - Connected to {} other regular nodes (blocked: {})", - i, - preset.temp_dir.path(), - effective_connections, - blocked_addresses.len() - ); + // All nodes try to get the contract to see which have access + let mut nodes_with_contract = vec![false; num_regular_nodes]; + let mut get_requests = Vec::with_capacity(num_regular_nodes); - ws_api_ports_nodes.push(cfg.ws_api.ws_api_port.unwrap()); - node_configs.push(cfg); - node_presets.push(preset); + for (i, client) in node_clients.iter_mut().enumerate() { + client + .send(ClientRequest::ContractOp(ContractRequest::Get { + key: contract_key, + return_contract_code: true, + subscribe: false, + })) + .await?; + get_requests.push(i); } - // Free ports to avoid binding errors - std::mem::drop(gateway_sockets); - std::mem::drop(ws_api_gateway_sockets); - std::mem::drop(ws_api_node_sockets); - - // Start all gateway nodes - let gateway_futures = FuturesUnordered::new(); - for config in gateway_configs.into_iter() { - let gateway_future = async move { - let config = config.build().await?; - let node = NodeConfig::new(config.clone()) - .await? - .build(serve_gateway(config.ws_api).await) - .await?; - node.run().await - } - .boxed_local(); - gateway_futures.push(gateway_future); - } + // Track gateways with the contract + let mut gateways_with_contract = vec![false; num_gateways]; + let mut gw_get_requests = Vec::with_capacity(num_gateways); - // Start all regular nodes - let regular_node_futures = FuturesUnordered::new(); - for config in node_configs.into_iter() { - let regular_node_future = async move { - let config = config.build().await?; - let node = NodeConfig::new(config.clone()) - .await? - .build(serve_gateway(config.ws_api).await) - .await?; - node.run().await - } - .boxed_local(); - tokio::time::sleep(Duration::from_secs(2)).await; - regular_node_futures.push(regular_node_future); + for (i, client) in gateway_clients.iter_mut().enumerate() { + client + .send(ClientRequest::ContractOp(ContractRequest::Get { + key: contract_key, + return_contract_code: true, + subscribe: false, + })) + .await?; + gw_get_requests.push(i); } - // Create a future that will complete if any gateway fails - let mut gateway_monitor = gateway_futures.into_future(); - let mut regular_node_monitor = regular_node_futures.into_future(); + // Process all get responses with a timeout + let total_timeout = Duration::from_secs(30); + let start = std::time::Instant::now(); - let test = tokio::time::timeout(Duration::from_secs(240), async { - // Wait for nodes to start up - tracing::info!("Waiting for nodes to start up..."); - tokio::time::sleep(Duration::from_secs(30)).await; - tracing::info!("Proceeding to connect to nodes..."); - - // Connect to all nodes with retry logic - let mut gateway_clients = Vec::with_capacity(NUM_GATEWAYS); - for (i, port) in ws_api_ports_gw.iter().enumerate() { - let uri = format!( - "ws://127.0.0.1:{port}/v1/contract/command?encodingProtocol=native" - ); - let (stream, _) = connect_async(&uri).await?; - let client = WebApi::start(stream); - gateway_clients.push(client); - tracing::info!("Connected to gateway {}", i); + while !get_requests.is_empty() || !gw_get_requests.is_empty() { + if start.elapsed() > total_timeout { + tracing::warn!("Timeout waiting for get responses, continuing with test"); + break; } - let mut node_clients = Vec::with_capacity(NUM_REGULAR_NODES); - for (i, port) in ws_api_ports_nodes.iter().enumerate() { - let uri = format!( - "ws://127.0.0.1:{port}/v1/contract/command?encodingProtocol=native" - ); - let (stream, _) = connect_async(&uri).await?; - let client = WebApi::start(stream); - node_clients.push(client); - tracing::info!("Connected to regular node {}", i); + // Check regular nodes + let mut i = 0; + while i < get_requests.len() { + let node_idx = get_requests[i]; + let client = &mut node_clients[node_idx]; + + match timeout(Duration::from_millis(500), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + .. + }))) => { + if key == contract_key { + tracing::info!("Node {} successfully got the contract", node_idx); + nodes_with_contract[node_idx] = true; + get_requests.remove(i); + continue; + } + } + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::warn!("Error receiving from node {}: {}", node_idx, e); + get_requests.remove(i); + continue; + } + Err(_) => {} + } + i += 1; } - // Log the node connectivity - tracing::info!("Node connectivity setup:"); - for (i, num_connections) in node_connections.iter().enumerate() { - tracing::info!("Node {} is connected to all {} gateways and {} other regular nodes", - i, NUM_GATEWAYS, num_connections); + // Check gateways + let mut i = 0; + while i < gw_get_requests.len() { + let gw_idx = gw_get_requests[i]; + let client = &mut gateway_clients[gw_idx]; + + match timeout(Duration::from_millis(500), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + .. + }))) => { + if key == contract_key { + tracing::info!("Gateway {} successfully got the contract", gw_idx); + gateways_with_contract[gw_idx] = true; + gw_get_requests.remove(i); + continue; + } + } + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::warn!("Error receiving from gateway {}: {}", gw_idx, e); + gw_get_requests.remove(i); + continue; + } + Err(_) => {} + } + i += 1; } - // Load the ping contract - let path_to_code = PathBuf::from(PACKAGE_DIR).join(PATH_TO_CONTRACT); - tracing::info!(path=%path_to_code.display(), "loading contract code"); - let code = std::fs::read(path_to_code) - .ok() - .ok_or_else(|| anyhow!("Failed to read contract code"))?; - let code_hash = CodeHash::from_code(&code); - - // Create ping contract options - let ping_options = PingContractOptions { - frequency: Duration::from_secs(3), - ttl: Duration::from_secs(60), - tag: APP_TAG.to_string(), - code_key: code_hash.to_string(), - }; - - let params = Parameters::from(serde_json::to_vec(&ping_options).unwrap()); - let container = ContractContainer::try_from((code, ¶ms))?; - let contract_key = container.key(); - - // Choose a node to publish the contract - let publisher_idx = 0; - tracing::info!("Node {} will publish the contract", publisher_idx); - - // Publisher node puts the contract - let ping = Ping::default(); - let serialized = serde_json::to_vec(&ping)?; - let wrapped_state = WrappedState::new(serialized); - - let publisher = &mut node_clients[publisher_idx]; - publisher - .send(ClientRequest::ContractOp(ContractRequest::Put { - contract: container.clone(), - state: wrapped_state.clone(), - related_contracts: RelatedContracts::new(), - subscribe: false, - })) - .await?; + tokio::time::sleep(Duration::from_millis(100)).await; + } - // Wait for put response on publisher - let key = wait_for_put_response(publisher, &contract_key) - .await - .map_err(anyhow::Error::msg)?; - tracing::info!(key=%key, "Publisher node {} put ping contract successfully!", publisher_idx); + // Log initial contract distribution + tracing::info!("Initial contract distribution:"); + tracing::info!( + "Gateways with contract: {}/{}", + gateways_with_contract.iter().filter(|&&x| x).count(), + num_gateways + ); + tracing::info!( + "Regular nodes with contract: {}/{}", + nodes_with_contract.iter().filter(|&&x| x).count(), + num_regular_nodes + ); - // All nodes try to get the contract to see which have access - let mut nodes_with_contract = [false; NUM_REGULAR_NODES]; - let mut get_requests = Vec::with_capacity(NUM_REGULAR_NODES); + // All nodes with the contract subscribe to it + let mut subscribed_nodes = vec![false; num_regular_nodes]; + let mut subscription_requests = Vec::new(); - for (i, client) in node_clients.iter_mut().enumerate() { - client - .send(ClientRequest::ContractOp(ContractRequest::Get { + for (i, has_contract) in nodes_with_contract.iter().enumerate() { + if *has_contract { + node_clients[i] + .send(ClientRequest::ContractOp(ContractRequest::Subscribe { key: contract_key, - return_contract_code: true, - subscribe: false, + summary: None, })) .await?; - get_requests.push(i); + subscription_requests.push(i); } + } - // Track gateways with the contract - let mut gateways_with_contract = [false; NUM_GATEWAYS]; - let mut gw_get_requests = Vec::with_capacity(NUM_GATEWAYS); + // Also subscribe gateways + let mut subscribed_gateways = vec![false; num_gateways]; + let mut gw_subscription_requests = Vec::new(); - for (i, client) in gateway_clients.iter_mut().enumerate() { - client - .send(ClientRequest::ContractOp(ContractRequest::Get { + for (i, has_contract) in gateways_with_contract.iter().enumerate() { + if *has_contract { + gateway_clients[i] + .send(ClientRequest::ContractOp(ContractRequest::Subscribe { key: contract_key, - return_contract_code: true, - subscribe: false, + summary: None, })) .await?; - gw_get_requests.push(i); + gw_subscription_requests.push(i); } + } - // Process all get responses with a timeout - let total_timeout = Duration::from_secs(30); - let start = std::time::Instant::now(); + // Process subscription responses with a timeout + let start = std::time::Instant::now(); + let total_timeout = Duration::from_secs(30); - while !get_requests.is_empty() || !gw_get_requests.is_empty() { - if start.elapsed() > total_timeout { - tracing::warn!("Timeout waiting for get responses, continuing with test"); - break; - } + while !subscription_requests.is_empty() || !gw_subscription_requests.is_empty() { + if start.elapsed() > total_timeout { + tracing::warn!("Timeout waiting for subscription responses, continuing with test"); + break; + } - // Check regular nodes - let mut i = 0; - while i < get_requests.len() { - let node_idx = get_requests[i]; - let client = &mut node_clients[node_idx]; - - match timeout(Duration::from_millis(500), client.recv()).await { - Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { key, .. }))) => { - if key == contract_key { - tracing::info!("Node {} successfully got the contract", node_idx); - nodes_with_contract[node_idx] = true; - get_requests.remove(i); - continue; - } - } - Ok(Ok(_)) => {}, - Ok(Err(e)) => { - tracing::warn!("Error receiving from node {}: {}", node_idx, e); - get_requests.remove(i); + // Check regular nodes + let mut i = 0; + while i < subscription_requests.len() { + let node_idx = subscription_requests[i]; + let client = &mut node_clients[node_idx]; + + match timeout(Duration::from_millis(500), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { + key, + subscribed, + .. + }))) => { + if key == contract_key { + tracing::info!("Node {} subscription result: {}", node_idx, subscribed); + subscribed_nodes[node_idx] = subscribed; + subscription_requests.remove(i); continue; } - Err(_) => {} } - i += 1; + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::warn!("Error receiving from node {}: {}", node_idx, e); + subscription_requests.remove(i); + continue; + } + Err(_) => {} } + i += 1; + } - // Check gateways - let mut i = 0; - while i < gw_get_requests.len() { - let gw_idx = gw_get_requests[i]; - let client = &mut gateway_clients[gw_idx]; - - match timeout(Duration::from_millis(500), client.recv()).await { - Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { key, .. }))) => { - if key == contract_key { - tracing::info!("Gateway {} successfully got the contract", gw_idx); - gateways_with_contract[gw_idx] = true; - gw_get_requests.remove(i); - continue; - } - } - Ok(Ok(_)) => {}, - Ok(Err(e)) => { - tracing::warn!("Error receiving from gateway {}: {}", gw_idx, e); - gw_get_requests.remove(i); + // Check gateways + let mut i = 0; + while i < gw_subscription_requests.len() { + let gw_idx = gw_subscription_requests[i]; + let client = &mut gateway_clients[gw_idx]; + + match timeout(Duration::from_millis(500), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { + key, + subscribed, + .. + }))) => { + if key == contract_key { + tracing::info!("Gateway {} subscription result: {}", gw_idx, subscribed); + subscribed_gateways[gw_idx] = subscribed; + gw_subscription_requests.remove(i); continue; } - Err(_) => {} } - i += 1; - } - - tokio::time::sleep(Duration::from_millis(100)).await; - } - - // Log initial contract distribution - tracing::info!("Initial contract distribution:"); - tracing::info!("Gateways with contract: {}/{}", - gateways_with_contract.iter().filter(|&&x| x).count(), - NUM_GATEWAYS); - tracing::info!("Regular nodes with contract: {}/{}", - nodes_with_contract.iter().filter(|&&x| x).count(), - NUM_REGULAR_NODES); - - // All nodes with the contract subscribe to it - let mut subscribed_nodes = [false; NUM_REGULAR_NODES]; - let mut subscription_requests = Vec::new(); - - for (i, has_contract) in nodes_with_contract.iter().enumerate() { - if *has_contract { - node_clients[i] - .send(ClientRequest::ContractOp(ContractRequest::Subscribe { - key: contract_key, - summary: None, - })) - .await?; - subscription_requests.push(i); - } - } - - // Also subscribe gateways - let mut subscribed_gateways = [false; NUM_GATEWAYS]; - let mut gw_subscription_requests = Vec::new(); - - for (i, has_contract) in gateways_with_contract.iter().enumerate() { - if *has_contract { - gateway_clients[i] - .send(ClientRequest::ContractOp(ContractRequest::Subscribe { - key: contract_key, - summary: None, - })) - .await?; - gw_subscription_requests.push(i); + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::warn!("Error receiving from gateway {}: {}", gw_idx, e); + gw_subscription_requests.remove(i); + continue; + } + Err(_) => {} } + i += 1; } - // Process subscription responses with a timeout - let start = std::time::Instant::now(); - let total_timeout = Duration::from_secs(30); + tokio::time::sleep(Duration::from_millis(100)).await; + } - while !subscription_requests.is_empty() || !gw_subscription_requests.is_empty() { - if start.elapsed() > total_timeout { - tracing::warn!("Timeout waiting for subscription responses, continuing with test"); - break; - } + // Log subscription results + tracing::info!("Initial subscription results:"); + tracing::info!( + "Subscribed gateways: {}/{} (with contract: {})", + subscribed_gateways.iter().filter(|&&x| x).count(), + num_gateways, + gateways_with_contract.iter().filter(|&&x| x).count() + ); + tracing::info!( + "Subscribed regular nodes: {}/{} (with contract: {})", + subscribed_nodes.iter().filter(|&&x| x).count(), + num_regular_nodes, + nodes_with_contract.iter().filter(|&&x| x).count() + ); - // Check regular nodes - let mut i = 0; - while i < subscription_requests.len() { - let node_idx = subscription_requests[i]; - let client = &mut node_clients[node_idx]; - - match timeout(Duration::from_millis(500), client.recv()).await { - Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { key, subscribed, .. }))) => { - if key == contract_key { - tracing::info!("Node {} subscription result: {}", node_idx, subscribed); - subscribed_nodes[node_idx] = subscribed; - subscription_requests.remove(i); - continue; - } - } - Ok(Ok(_)) => {}, - Ok(Err(e)) => { - tracing::warn!("Error receiving from node {}: {}", node_idx, e); - subscription_requests.remove(i); - continue; - } - Err(_) => {} - } - i += 1; - } + // Choose one subscribed node to send an update + let updater_indices = subscribed_nodes + .iter() + .enumerate() + .filter_map(|(i, &subscribed)| if subscribed { Some(i) } else { None }) + .collect::>(); - // Check gateways - let mut i = 0; - while i < gw_subscription_requests.len() { - let gw_idx = gw_subscription_requests[i]; - let client = &mut gateway_clients[gw_idx]; - - match timeout(Duration::from_millis(500), client.recv()).await { - Ok(Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { key, subscribed, .. }))) => { - if key == contract_key { - tracing::info!("Gateway {} subscription result: {}", gw_idx, subscribed); - subscribed_gateways[gw_idx] = subscribed; - gw_subscription_requests.remove(i); - continue; - } - } - Ok(Ok(_)) => {}, - Ok(Err(e)) => { - tracing::warn!("Error receiving from gateway {}: {}", gw_idx, e); - gw_subscription_requests.remove(i); - continue; - } - Err(_) => {} - } - i += 1; - } + if updater_indices.is_empty() { + return Err(anyhow!("No subscribed nodes to send updates!").into()); + } - tokio::time::sleep(Duration::from_millis(100)).await; - } + let updater_idx = updater_indices[0]; + tracing::info!("Node {} will send an update", updater_idx); - // Log subscription results - tracing::info!("Initial subscription results:"); - tracing::info!("Subscribed gateways: {}/{} (with contract: {})", - subscribed_gateways.iter().filter(|&&x| x).count(), - NUM_GATEWAYS, - gateways_with_contract.iter().filter(|&&x| x).count()); - tracing::info!("Subscribed regular nodes: {}/{} (with contract: {})", - subscribed_nodes.iter().filter(|&&x| x).count(), - NUM_REGULAR_NODES, - nodes_with_contract.iter().filter(|&&x| x).count()); - - // Choose one subscribed node to send an update - let updater_indices = subscribed_nodes.iter() - .enumerate() - .filter_map(|(i, &subscribed)| if subscribed { Some(i) } else { None }) - .collect::>(); - - if updater_indices.is_empty() { - return Err(anyhow!("No subscribed nodes to send updates!")); - } + // Create a unique tag for the updater + let update_tag = format!("ping-from-node-{updater_idx}"); - let updater_idx = updater_indices[0]; - tracing::info!("Node {} will send an update", updater_idx); + // Send the update + let mut update_ping = Ping::default(); + update_ping.insert(update_tag.clone()); - // Create a unique tag for the updater - let update_tag = format!("ping-from-node-{updater_idx}"); + tracing::info!( + "Node {} sending update with tag: {}", + updater_idx, + update_tag + ); + node_clients[updater_idx] + .send(ClientRequest::ContractOp(ContractRequest::Update { + key: contract_key, + data: UpdateData::Delta(StateDelta::from(serde_json::to_vec(&update_ping).unwrap())), + })) + .await?; - // Send the update - let mut update_ping = Ping::default(); - update_ping.insert(update_tag.clone()); + // Wait for the update to propagate through the network + tracing::info!("Waiting for update to propagate..."); + tokio::time::sleep(Duration::from_secs(20)).await; - tracing::info!("Node {} sending update with tag: {}", updater_idx, update_tag); - node_clients[updater_idx] - .send(ClientRequest::ContractOp(ContractRequest::Update { - key: contract_key, - data: UpdateData::Delta(StateDelta::from(serde_json::to_vec(&update_ping).unwrap())), - })) - .await?; + // Check which nodes received the update + let mut nodes_received_update = vec![false; num_regular_nodes]; + let mut get_state_requests = Vec::new(); - // Wait for the update to propagate through the network - tracing::info!("Waiting for update to propagate..."); - tokio::time::sleep(Duration::from_secs(20)).await; - - // Check which nodes received the update - let mut nodes_received_update = [false; NUM_REGULAR_NODES]; - let mut get_state_requests = Vec::new(); - - for (i, subscribed) in subscribed_nodes.iter().enumerate() { - if *subscribed { - node_clients[i] - .send(ClientRequest::ContractOp(ContractRequest::Get { - key: contract_key, - return_contract_code: false, - subscribe: false, - })) - .await?; - get_state_requests.push(i); - } + for (i, subscribed) in subscribed_nodes.iter().enumerate() { + if *subscribed { + node_clients[i] + .send(ClientRequest::ContractOp(ContractRequest::Get { + key: contract_key, + return_contract_code: false, + subscribe: false, + })) + .await?; + get_state_requests.push(i); } + } - // Also check gateways - let mut gateways_received_update = [false; NUM_GATEWAYS]; - let mut gw_get_state_requests = Vec::new(); - - for (i, subscribed) in subscribed_gateways.iter().enumerate() { - if *subscribed { - gateway_clients[i] - .send(ClientRequest::ContractOp(ContractRequest::Get { - key: contract_key, - return_contract_code: false, - subscribe: false, - })) - .await?; - gw_get_state_requests.push(i); - } - } + // Also check gateways + let mut gateways_received_update = vec![false; num_gateways]; + let mut gw_get_state_requests = Vec::new(); - // Process get state responses with a timeout - let start = std::time::Instant::now(); - let total_timeout = Duration::from_secs(30); + for (i, subscribed) in subscribed_gateways.iter().enumerate() { + if *subscribed { + gateway_clients[i] + .send(ClientRequest::ContractOp(ContractRequest::Get { + key: contract_key, + return_contract_code: false, + subscribe: false, + })) + .await?; + gw_get_state_requests.push(i); + } + } - while !get_state_requests.is_empty() || !gw_get_state_requests.is_empty() { - if start.elapsed() > total_timeout { - tracing::warn!("Timeout waiting for get state responses, finalizing test"); - break; - } + // Process get state responses with a timeout + let start = std::time::Instant::now(); + let total_timeout = Duration::from_secs(30); - // Check regular nodes - let mut i = 0; - while i < get_state_requests.len() { - let node_idx = get_state_requests[i]; - let client = &mut node_clients[node_idx]; - - match timeout(Duration::from_millis(500), client.recv()).await { - Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { key, state, .. }))) => { - if key == contract_key { - // Deserialize state and check for the update tag - match serde_json::from_slice::(&state) { - Ok(ping_state) => { - let has_update = ping_state.get(&update_tag).is_some(); - tracing::info!("Node {} has update: {}", node_idx, has_update); - - if has_update { - let timestamps = ping_state.get(&update_tag).unwrap(); - tracing::info!("Node {} has {} timestamps for tag {}", - node_idx, - timestamps.len(), - update_tag); - } + while !get_state_requests.is_empty() || !gw_get_state_requests.is_empty() { + if start.elapsed() > total_timeout { + tracing::warn!("Timeout waiting for get state responses, finalizing test"); + break; + } - nodes_received_update[node_idx] = has_update; - } - Err(e) => { - tracing::warn!("Failed to deserialize state from node {}: {}", node_idx, e); + // Check regular nodes + let mut i = 0; + while i < get_state_requests.len() { + let node_idx = get_state_requests[i]; + let client = &mut node_clients[node_idx]; + + match timeout(Duration::from_millis(500), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + state, + .. + }))) => { + if key == contract_key { + // Deserialize state and check for the update tag + match serde_json::from_slice::(&state) { + Ok(ping_state) => { + let has_update = ping_state.get(&update_tag).is_some(); + tracing::info!("Node {} has update: {}", node_idx, has_update); + + if has_update { + let timestamps = ping_state.get(&update_tag).unwrap(); + tracing::info!( + "Node {} has {} timestamps for tag {}", + node_idx, + timestamps.len(), + update_tag + ); } + + nodes_received_update[node_idx] = has_update; + } + Err(e) => { + tracing::warn!( + "Failed to deserialize state from node {}: {}", + node_idx, + e + ); } - get_state_requests.remove(i); - continue; } - } - Ok(Ok(_)) => {}, - Ok(Err(e)) => { - tracing::warn!("Error receiving from node {}: {}", node_idx, e); get_state_requests.remove(i); continue; } - Err(_) => {} } - i += 1; + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::warn!("Error receiving from node {}: {}", node_idx, e); + get_state_requests.remove(i); + continue; + } + Err(_) => {} } + i += 1; + } - // Check gateways - let mut i = 0; - while i < gw_get_state_requests.len() { - let gw_idx = gw_get_state_requests[i]; - let client = &mut gateway_clients[gw_idx]; - - match timeout(Duration::from_millis(500), client.recv()).await { - Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { key, state, .. }))) => { - if key == contract_key { - // Deserialize state and check for the update tag - match serde_json::from_slice::(&state) { - Ok(ping_state) => { - let has_update = ping_state.get(&update_tag).is_some(); - tracing::info!("Gateway {} has update: {}", gw_idx, has_update); - - if has_update { - let timestamps = ping_state.get(&update_tag).unwrap(); - tracing::info!("Gateway {} has {} timestamps for tag {}", - gw_idx, - timestamps.len(), - update_tag); - } - - gateways_received_update[gw_idx] = has_update; - } - Err(e) => { - tracing::warn!("Failed to deserialize state from gateway {}: {}", gw_idx, e); + // Check gateways + let mut i = 0; + while i < gw_get_state_requests.len() { + let gw_idx = gw_get_state_requests[i]; + let client = &mut gateway_clients[gw_idx]; + + match timeout(Duration::from_millis(500), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + state, + .. + }))) => { + if key == contract_key { + // Deserialize state and check for the update tag + match serde_json::from_slice::(&state) { + Ok(ping_state) => { + let has_update = ping_state.get(&update_tag).is_some(); + tracing::info!("Gateway {} has update: {}", gw_idx, has_update); + + if has_update { + let timestamps = ping_state.get(&update_tag).unwrap(); + tracing::info!( + "Gateway {} has {} timestamps for tag {}", + gw_idx, + timestamps.len(), + update_tag + ); } + + gateways_received_update[gw_idx] = has_update; + } + Err(e) => { + tracing::warn!( + "Failed to deserialize state from gateway {}: {}", + gw_idx, + e + ); } - gw_get_state_requests.remove(i); - continue; } - } - Ok(Ok(_)) => {}, - Ok(Err(e)) => { - tracing::warn!("Error receiving from gateway {}: {}", gw_idx, e); gw_get_state_requests.remove(i); continue; } - Err(_) => {} } - i += 1; + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::warn!("Error receiving from gateway {}: {}", gw_idx, e); + gw_get_state_requests.remove(i); + continue; + } + Err(_) => {} } - - tokio::time::sleep(Duration::from_millis(100)).await; + i += 1; } - // Analyze update propagation results - tracing::info!("Final update propagation results:"); - - // Summary for gateways - let subscribed_gw_count = subscribed_gateways.iter().filter(|&&x| x).count(); - let updated_gw_count = gateways_received_update.iter().filter(|&&x| x).count(); - - tracing::info!("Gateways: {}/{} subscribed received the update ({:.1}%)", - updated_gw_count, - subscribed_gw_count, - if subscribed_gw_count > 0 { - (updated_gw_count as f64 / subscribed_gw_count as f64) * 100.0 - } else { - 0.0 - }); - - // Summary for regular nodes - let subscribed_node_count = subscribed_nodes.iter().filter(|&&x| x).count(); - let updated_node_count = nodes_received_update.iter().filter(|&&x| x).count(); - - tracing::info!("Regular nodes: {}/{} subscribed received the update ({:.1}%)", - updated_node_count, - subscribed_node_count, - if subscribed_node_count > 0 { - (updated_node_count as f64 / subscribed_node_count as f64) * 100.0 - } else { - 0.0 - }); - - // Check nodes that didn't receive updates - for (node_idx, (subscribed, updated)) in subscribed_nodes.iter().zip(nodes_received_update.iter()).enumerate() { - if *subscribed && !updated { - tracing::warn!("Node {} was subscribed but did not receive the update!", node_idx); - - // Get the node connectivity info - let connections = node_connections[node_idx]; - tracing::warn!("Node {} is connected to {} other regular nodes", node_idx, connections); - } - } + tokio::time::sleep(Duration::from_millis(100)).await; + } - // Verify that updates have propagated to at least some nodes - assert!( - updated_node_count > 0, - "No nodes received the update, subscription propagation failed" - ); + // Analyze update propagation results + tracing::info!("Final update propagation results:"); - // Verify that if we have multiple gateways, at least some received the update - if NUM_GATEWAYS > 1 && subscribed_gw_count > 1 { - assert!( - updated_gw_count > 0, - "No gateways received the update, gateway subscription propagation failed" - ); + // Summary for gateways + let subscribed_gw_count = subscribed_gateways.iter().filter(|&&x| x).count(); + let updated_gw_count = gateways_received_update.iter().filter(|&&x| x).count(); + + tracing::info!( + "Gateways: {}/{} subscribed received the update ({:.1}%)", + updated_gw_count, + subscribed_gw_count, + if subscribed_gw_count > 0 { + (updated_gw_count as f64 / subscribed_gw_count as f64) * 100.0 + } else { + 0.0 } + ); - // Calculate and assert a minimum expected update propagation rate - let min_expected_rate = 0.5; // At least 50% of subscribed nodes should get updates + // Summary for regular nodes + let subscribed_node_count = subscribed_nodes.iter().filter(|&&x| x).count(); + let updated_node_count = nodes_received_update.iter().filter(|&&x| x).count(); - let actual_rate = if subscribed_node_count > 0 { - updated_node_count as f64 / subscribed_node_count as f64 + tracing::info!( + "Regular nodes: {}/{} subscribed received the update ({:.1}%)", + updated_node_count, + subscribed_node_count, + if subscribed_node_count > 0 { + (updated_node_count as f64 / subscribed_node_count as f64) * 100.0 } else { 0.0 - }; + } + ); + // Check nodes that didn't receive updates + for (node_idx, (subscribed, updated)) in subscribed_nodes + .iter() + .zip(nodes_received_update.iter()) + .enumerate() + { + if *subscribed && !updated { + tracing::warn!( + "Node {} was subscribed but did not receive the update!", + node_idx + ); + } + } + + // Verify that updates have propagated to at least some nodes + assert!( + updated_node_count > 0, + "No nodes received the update, subscription propagation failed" + ); + + // Verify that if we have multiple gateways, at least some received the update + if num_gateways > 1 && subscribed_gw_count > 1 { assert!( - actual_rate >= min_expected_rate, - "Update propagation rate too low: {:.1}% (expected at least {:.1}%)", - actual_rate * 100.0, - min_expected_rate * 100.0 + updated_gw_count > 0, + "No gateways received the update, gateway subscription propagation failed" ); + } - tracing::info!("Subscription propagation test completed successfully!"); - - Ok::<_, anyhow::Error>(()) - }) - .instrument(span!(Level::INFO, "test_ping_partially_connected_network")); + // Calculate and assert a minimum expected update propagation rate + let min_expected_rate = 0.5; // At least 50% of subscribed nodes should get updates - // Wait for test completion or node failures - select! { - (r, _remaining) = &mut gateway_monitor => { - if let Some(r) = r { - match r { - Err(err) => return Err(anyhow!("Gateway node failed: {}", err).into()), - Ok(_) => panic!("Gateway node unexpectedly terminated successfully"), - } - } - } - (r, _remaining) = &mut regular_node_monitor => { - if let Some(r) = r { - match r { - Err(err) => return Err(anyhow!("Regular node failed: {}", err).into()), - Ok(_) => panic!("Regular node unexpectedly terminated successfully"), - } - } - } - r = test => { - r??; - } - } + let actual_rate = if subscribed_node_count > 0 { + updated_node_count as f64 / subscribed_node_count as f64 + } else { + 0.0 + }; - // Keep presets alive until here - tracing::debug!( - "Test complete, dropping {} gateway presets and {} node presets", - gateway_presets.len(), - node_presets.len() + assert!( + actual_rate >= min_expected_rate, + "Update propagation rate too low: {:.1}% (expected at least {:.1}%)", + actual_rate * 100.0, + min_expected_rate * 100.0 ); + tracing::info!("Subscription propagation test completed successfully!"); + Ok(()) } diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 4a39bfc43..f05fb9e1d 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -888,12 +888,27 @@ impl P2pConnManager { .await?; Ok(EventResult::Continue) } - Err(handshake_error) => { - tracing::error!(?handshake_error, "Handshake handler error"); - Ok(EventResult::Event( - ConnEvent::ClosedChannel(ChannelCloseReason::Handshake).into(), - )) - } + Err(handshake_error) => match handshake_error { + HandshakeError::ConnectionClosed(addr) => { + tracing::warn!( + %addr, + "Handshake handler reported connection closed - continuing without shutdown" + ); + Ok(EventResult::Continue) + } + HandshakeError::ChannelClosed => { + tracing::warn!( + "Handshake channel reported as closed - treating as transient and continuing" + ); + Ok(EventResult::Continue) + } + _ => { + tracing::error!(?handshake_error, "Handshake handler error"); + Ok(EventResult::Event( + ConnEvent::ClosedChannel(ChannelCloseReason::Handshake).into(), + )) + } + }, } } SelectResult::NodeController(msg) => { @@ -1020,23 +1035,64 @@ impl P2pConnManager { tracing::debug!(tx = %tx, "Blocked addresses: {:?}, peer addr: {}", blocked_addrs, peer.addr); } state.awaiting_connection.insert(peer.addr, callback); - let res = timeout( + match timeout( Duration::from_secs(10), handshake_handler_msg.establish_conn(peer.clone(), tx, is_gw), ) .await - .inspect_err(|error| { - tracing::error!(tx = %tx, "Failed to establish connection: {:?}", error); - })?; - match res { - Ok(()) => { - tracing::debug!(tx = %tx, + { + Ok(Ok(())) => { + tracing::debug!( + tx = %tx, "Successfully initiated connection process for peer: {:?}", peer ); Ok(()) } - Err(e) => Err(anyhow::Error::msg(e)), + Ok(Err(e)) => { + tracing::error!( + tx = %tx, + remote = %peer, + error = ?e, + "Handshake handler failed while queuing connection request" + ); + if let Some(mut cb) = state.awaiting_connection.remove(&peer.addr) { + cb.send_result(Err(e)) + .await + .inspect_err(|err| { + tracing::debug!( + remote = %peer, + "Failed to notify caller about handshake failure: {:?}", + err + ); + }) + .ok(); + } + Ok(()) + } + Err(elapsed) => { + tracing::warn!( + tx = %tx, + remote = %peer, + elapsed = ?elapsed, + "Timed out while queuing handshake request; treating as connection failure" + ); + if let Some(mut cb) = state.awaiting_connection.remove(&peer.addr) { + cb.send_result(Err(HandshakeError::ConnectionError( + ConnectionError::Timeout, + ))) + .await + .inspect_err(|err| { + tracing::debug!( + remote = %peer, + "Failed to notify caller about handshake timeout: {:?}", + err + ); + }) + .ok(); + } + Ok(()) + } } } @@ -1240,14 +1296,37 @@ impl P2pConnManager { let key = (*self.bridge.op_manager.ring.connection_manager.pub_key).clone(); PeerId::new(self_addr, key) }; - timeout( + let connection_peer_id = peer_id.clone(); + match timeout( Duration::from_secs(60), - cb.send_result(Ok((peer_id, remaining_checks))), + cb.send_result(Ok((connection_peer_id, remaining_checks))), ) .await - .inspect_err(|error| { - tracing::error!("Failed to send connection result: {:?}", error); - })??; + { + Ok(Ok(())) => {} + Ok(Err(HandshakeError::ChannelClosed)) => { + tracing::debug!( + %peer_id, + "Connection result receiver dropped before completion; treating as benign" + ); + } + Ok(Err(e)) => { + tracing::error!( + %peer_id, + error = ?e, + "Failed to deliver connection result to caller" + ); + return Err(anyhow::Error::new(e)); + } + Err(elapsed) => { + tracing::error!( + %peer_id, + elapsed = ?elapsed, + "Timed out delivering connection result to caller" + ); + return Err(anyhow::Error::from(elapsed)); + } + } } else { tracing::warn!(%peer_id, "No callback for connection established"); } diff --git a/crates/core/tests/connectivity.rs b/crates/core/tests/connectivity.rs index 48f1a4974..a2bb2c3b7 100644 --- a/crates/core/tests/connectivity.rs +++ b/crates/core/tests/connectivity.rs @@ -18,7 +18,6 @@ use tokio_tungstenite::connect_async; /// connecting to the gateway. #[freenet_test( nodes = ["gateway", "peer"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 15, aggregate_events = "always", @@ -251,7 +250,6 @@ async fn test_basic_gateway_connectivity(ctx: &mut TestContext) -> TestResult { /// #[freenet_test( nodes = ["gateway", "peer1", "peer2"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 30, aggregate_events = "always", diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index c2287426d..7e5e35ba8 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -131,7 +131,6 @@ async fn get_contract( /// Test PUT operation across two peers (gateway and peer) #[freenet_test( nodes = ["gateway", "peer-a"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 15, tokio_flavor = "multi_thread", @@ -239,7 +238,6 @@ async fn test_put_contract(ctx: &mut TestContext) -> TestResult { #[freenet_test( nodes = ["gateway", "peer-a"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 20, tokio_flavor = "multi_thread", @@ -410,7 +408,6 @@ async fn test_update_contract(ctx: &mut TestContext) -> TestResult { /// This is a regression test for issue #1995. #[freenet_test( nodes = ["gateway", "peer-a"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 15, tokio_flavor = "multi_thread", @@ -597,7 +594,6 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult { // If this test becomes flaky again, see issue #1798 for historical context. #[freenet_test( nodes = ["gateway", "node-a", "node-b"], - auto_connect_peers = true, timeout_secs = 600, startup_wait_secs = 40, tokio_flavor = "multi_thread", @@ -1171,7 +1167,6 @@ async fn test_multiple_clients_subscription(ctx: &mut TestContext) -> TestResult #[freenet_test( nodes = ["gateway", "node-a"], - auto_connect_peers = true, timeout_secs = 120, startup_wait_secs = 20, tokio_flavor = "multi_thread", @@ -1399,7 +1394,6 @@ async fn test_get_with_subscribe_flag(ctx: &mut TestContext) -> TestResult { // FIXME Update notification is not received #[freenet_test( nodes = ["gateway", "node-a"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 20, tokio_flavor = "multi_thread", @@ -1680,7 +1674,6 @@ async fn test_put_with_subscribe_flag(ctx: &mut TestContext) -> TestResult { #[freenet_test( nodes = ["gateway", "client-node"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 20, tokio_flavor = "multi_thread", @@ -1838,7 +1831,6 @@ async fn test_delegate_request(ctx: &mut TestContext) -> TestResult { #[freenet_test( nodes = ["gateway", "peer-a", "peer-c"], gateways = ["gateway"], - auto_connect_peers = true, timeout_secs = 240, startup_wait_secs = 15, aggregate_events = "on_failure", @@ -2412,7 +2404,6 @@ async fn wait_for_subscribe_response( #[freenet_test( nodes = ["gateway", "peer-node"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 10, tokio_flavor = "multi_thread", @@ -2486,7 +2477,6 @@ async fn test_subscription_introspection(ctx: &mut TestContext) -> TestResult { #[freenet_test( nodes = ["gateway", "peer-a"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 20, tokio_flavor = "multi_thread", diff --git a/crates/core/tests/test_macro_example.rs b/crates/core/tests/test_macro_example.rs index 8d81c4336..707014061 100644 --- a/crates/core/tests/test_macro_example.rs +++ b/crates/core/tests/test_macro_example.rs @@ -212,7 +212,6 @@ async fn test_multiple_gateways(ctx: &mut TestContext) -> TestResult { /// Test with auto_connect_peers enabled #[freenet_test( nodes = ["gateway", "peer-1", "peer-2"], - auto_connect_peers = true, timeout_secs = 120, startup_wait_secs = 15 )] @@ -242,7 +241,6 @@ async fn test_auto_connect_peers(ctx: &mut TestContext) -> TestResult { #[freenet_test( nodes = ["gw-1", "gw-2", "peer-1", "peer-2"], gateways = ["gw-1", "gw-2"], - auto_connect_peers = true, timeout_secs = 120, startup_wait_secs = 15 )] diff --git a/crates/freenet-macros/README.md b/crates/freenet-macros/README.md index 2cd601227..294521922 100644 --- a/crates/freenet-macros/README.md +++ b/crates/freenet-macros/README.md @@ -29,19 +29,18 @@ async fn test_basic_gateway(ctx: &mut TestContext) -> TestResult { } ``` -### Multi-Node Test with Auto-Connect +### Multi-Node Test ```rust #[freenet_test( nodes = ["gateway", "peer-1", "peer-2"], - auto_connect_peers = true, aggregate_events = "on_failure" )] async fn test_network_operations(ctx: &mut TestContext) -> TestResult { let gateway = ctx.gateway()?; let peers = ctx.peers(); - // All peers are automatically configured to connect to the gateway + // Peers are automatically configured to connect to the gateway (default behavior) assert_eq!(peers.len(), 2); // Your test logic here... @@ -85,16 +84,22 @@ async fn test_multi_gateway(ctx: &mut TestContext) -> TestResult { #### `auto_connect_peers` Automatically configure all peer nodes to connect to all gateway nodes. +**Default:** `true` + ```rust +// Default behavior - peers auto-connect +#[freenet_test(nodes = ["gateway", "peer-1", "peer-2"])] + +// Explicitly disable auto-connection if needed #[freenet_test( nodes = ["gateway", "peer-1", "peer-2"], - auto_connect_peers = true // Peers auto-connect to gateway + auto_connect_peers = false )] ``` **Behavior:** -- When `true`: Peers are pre-configured with gateway connection info -- When `false` (default): You must manually configure peer connections +- When `true` (default): Peers are pre-configured with gateway connection info +- When `false`: You must manually configure peer connections - Works with multiple gateways (peers connect to all gateways) #### `aggregate_events` @@ -205,6 +210,32 @@ Logging level for the test. **Values:** `"trace"`, `"debug"`, `"info"`, `"warn"`, `"error"` +#### `peer_connectivity_ratio` +Controls the connectivity ratio between peer nodes (0.0-1.0) for testing partially connected networks. + +```rust +#[freenet_test( + nodes = ["gw-0", "gw-1", "node-0", "node-1", "node-2", "node-3"], + gateways = ["gw-0", "gw-1"], + auto_connect_peers = true, // Peers connect to all gateways + peer_connectivity_ratio = 0.5 // 50% connectivity between peers +)] +``` + +**How it works:** +- A ratio of `1.0` means full connectivity between all peers +- A ratio of `0.5` means approximately 50% of peer-to-peer connections are blocked +- A ratio of `0.0` means no direct peer-to-peer connections (only via gateways) +- The blocking pattern is deterministic based on node indices +- Gateway connectivity is not affected - this only controls peer-to-peer connections + +**Use cases:** +- Testing subscription propagation in partially connected networks +- Simulating network partitions or unreliable peer connections +- Verifying that updates propagate through gateways when direct peer routes are unavailable + +**Note:** When this is set, `auto_connect_peers` should typically be `true` to ensure peers can reach gateways. + ## TestContext API The macro provides a `TestContext` parameter to your test function with these methods: @@ -262,7 +293,7 @@ use freenet_stdlib::prelude::*; #[freenet_test( nodes = ["gateway", "peer-1", "peer-2"], - auto_connect_peers = true, + timeout_secs = 180, startup_wait_secs = 15, aggregate_events = "on_failure", @@ -412,7 +443,7 @@ EVENT LOG SUMMARY #[freenet_test( nodes = ["gw-1", "gw-2", "peer-1", "peer-2", "peer-3", "peer-4"], gateways = ["gw-1", "gw-2"], - auto_connect_peers = true, + startup_wait_secs = 20 // More time for connections to establish )] ``` diff --git a/crates/freenet-macros/src/codegen.rs b/crates/freenet-macros/src/codegen.rs index 0344e1d75..2593ff475 100644 --- a/crates/freenet-macros/src/codegen.rs +++ b/crates/freenet-macros/src/codegen.rs @@ -58,7 +58,6 @@ pub fn generate_test_code(args: FreenetTestArgs, input_fn: ItemFn) -> Result freenet::test_utils::TestResult { use freenet::test_utils::{TestContext, TestLogger, NodeInfo}; use std::time::{Duration, Instant}; - use tokio::select; use anyhow::anyhow; // 1. Setup TestLogger @@ -81,21 +80,22 @@ pub fn generate_test_code(args: FreenetTestArgs, input_fn: ItemFn) -> Result TokenStream { } } + // Pre-compute peer network ports if we need partial connectivity + let peer_network_ports_setup = if args.peer_connectivity_ratio.is_some() { + let peer_indices: Vec<_> = args + .nodes + .iter() + .enumerate() + .filter(|(idx, node_label)| !is_gateway(args, node_label, *idx)) + .map(|(idx, _)| idx) + .collect(); + + let peer_port_vars: Vec<_> = peer_indices + .iter() + .map(|idx| format_ident!("peer_network_port_{}", idx)) + .collect(); + + quote! { + // Pre-bind sockets for all peers to get their network ports + #( + let peer_socket = std::net::TcpListener::bind("127.0.0.1:0")?; + let #peer_port_vars = peer_socket.local_addr()?.port(); + std::mem::drop(peer_socket); + )* + } + } else { + quote! {} + }; + + setup_code.push(peer_network_ports_setup); + // Third pass: Generate peer configurations (non-gateway nodes) for (idx, node_label) in args.nodes.iter().enumerate() { let config_var = format_ident!("config_{}", idx); @@ -230,6 +259,61 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { } }; + // Compute blocked addresses for this peer if partial connectivity is enabled + let blocked_addresses_code = if let Some(ratio) = args.peer_connectivity_ratio { + // Build mapping from node index to peer index (for deterministic blocking) + let peer_indices: Vec<(usize, usize)> = args + .nodes + .iter() + .enumerate() + .filter(|(node_idx, label)| !is_gateway(args, label, *node_idx)) + .enumerate() + .map(|(peer_idx, (node_idx, _))| (node_idx, peer_idx)) + .collect(); + + // Find this peer's relative index + let this_peer_idx = peer_indices + .iter() + .find(|(node_idx, _)| *node_idx == idx) + .map(|(_, peer_idx)| *peer_idx) + .expect("Current node must be in peer list"); + + let peer_checks: Vec<_> = args + .nodes + .iter() + .enumerate() + .filter(|(other_idx, other_label)| !is_gateway(args, other_label, *other_idx) && *other_idx != idx) + .map(|(other_idx, _)| { + // Find the other peer's relative index + let other_peer_idx = peer_indices.iter() + .find(|(node_idx, _)| *node_idx == other_idx) + .map(|(_, peer_idx)| *peer_idx) + .expect("Other node must be in peer list"); + + let port_var = format_ident!("peer_network_port_{}", other_idx); + quote! { + // Use ordered pair and better hash distribution for deterministic connectivity + // Use relative peer indices (not absolute node indices) for consistent blocking + let (a, b) = if #this_peer_idx < #other_peer_idx { (#this_peer_idx, #other_peer_idx) } else { (#other_peer_idx, #this_peer_idx) }; + let hash_value = (a * 17 + b * 31 + a * b * 7) % 100; + if hash_value >= (#ratio * 100.0) as usize { + blocked_addresses.push(std::net::SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, #port_var))); + } + } + }) + .collect(); + + quote! { + let mut blocked_addresses = Vec::new(); + #(#peer_checks)* + let blocked_addresses = Some(blocked_addresses); + } + } else { + quote! { + let blocked_addresses = None; + } + }; + // Peer node configuration setup_code.push(quote! { let (#config_var, #temp_var) = { @@ -249,6 +333,8 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { let location: f64 = rand::Rng::random(&mut rand::rng()); + #blocked_addresses_code + let config = freenet::config::ConfigArgs { ws_api: freenet::config::WebsocketApiArgs { address: Some(std::net::Ipv4Addr::LOCALHOST.into()), @@ -267,7 +353,7 @@ fn generate_node_setup(args: &FreenetTestArgs) -> TokenStream { address: Some(std::net::Ipv4Addr::LOCALHOST.into()), network_port: Some(network_port), bandwidth_limit: None, - blocked_addresses: None, + blocked_addresses, }, config_paths: freenet::config::ConfigPathsArgs { config_dir: Some(temp_dir.path().to_path_buf()), @@ -339,27 +425,18 @@ fn generate_node_builds(args: &FreenetTestArgs) -> TokenStream { fn generate_node_tasks(args: &FreenetTestArgs) -> TokenStream { let mut tasks = Vec::new(); - for (idx, node_label) in args.nodes.iter().enumerate() { + for (idx, _node_label) in args.nodes.iter().enumerate() { let task_var = format_ident!("node_task_{}", idx); let node_var = format_ident!("node_{}", idx); tasks.push(quote! { - let #task_var = { - let node = #node_var; - async move { - tracing::info!("Node running: {}", #node_label); - node.run().await - } - .instrument(tracing::info_span!("test_peer", test_node = #node_label)) - .boxed_local() - }; + let #task_var = tokio::task::spawn_local(#node_var.run()); + // Small delay to ensure proper task startup ordering + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; }); } quote! { - use futures::FutureExt; - use tracing::Instrument; - #(#tasks)* } } @@ -423,45 +500,24 @@ fn generate_context_creation_with_handles(args: &FreenetTestArgs) -> TokenStream } } -/// Generate test coordination code with select! +/// Generate test coordination code with spawned tasks fn generate_test_coordination(args: &FreenetTestArgs, inner_fn_name: &syn::Ident) -> TokenStream { let timeout_secs = args.timeout_secs; let startup_wait_secs = args.startup_wait_secs; - // Generate select! arms for each node - let mut select_arms = Vec::new(); - for (idx, node_label) in args.nodes.iter().enumerate() { - let task_var = format_ident!("node_task_{}", idx); - select_arms.push(quote! { - result = #task_var => { - Err(anyhow!("Node '{}' exited unexpectedly: {:?}", #node_label, result)) - } - }); - } - - // Add test arm - select_arms.push(quote! { - result = test_future => { - match result { - Ok(Ok(val)) => { - // Give event loggers time to flush their batches - // The EventRegister is cloned multiple times, so all senders need to be dropped - // before the record_logs task will exit and flush remaining events - tokio::time::sleep(Duration::from_secs(5)).await; - Ok(val) - }, - Ok(Err(e)) => { - // Also flush on error - tokio::time::sleep(Duration::from_secs(5)).await; - Err(e) - }, - Err(_) => Err(anyhow!("Test timed out after {} seconds", #timeout_secs)), - } - } - }); + // Collect all node task variables + let task_vars: Vec<_> = (0..args.nodes.len()) + .map(|idx| format_ident!("node_task_{}", idx)) + .collect(); + let node_labels = &args.nodes; quote! { - let test_future = tokio::time::timeout( + // Store node handles for monitoring (optional) + let _node_handles = vec![#(#task_vars),*]; + let _node_labels = vec![#(#node_labels),*]; + + // Run test with timeout + let test_result = tokio::time::timeout( Duration::from_secs(#timeout_secs), async { // Wait for nodes to start @@ -472,10 +528,21 @@ fn generate_test_coordination(args: &FreenetTestArgs, inner_fn_name: &syn::Ident // Run user's test #inner_fn_name(&mut ctx).await } - ); - - select! { - #(#select_arms),* + ).await; + + // Check test result + match test_result { + Ok(Ok(val)) => { + // Give event loggers time to flush their batches + tokio::time::sleep(Duration::from_secs(5)).await; + Ok(val) + }, + Ok(Err(e)) => { + // Also flush on error + tokio::time::sleep(Duration::from_secs(5)).await; + Err(e) + }, + Err(_) => Err(anyhow!("Test timed out after {} seconds", #timeout_secs)), } } } diff --git a/crates/freenet-macros/src/lib.rs b/crates/freenet-macros/src/lib.rs index 988b3c38e..94a68ed9e 100644 --- a/crates/freenet-macros/src/lib.rs +++ b/crates/freenet-macros/src/lib.rs @@ -86,15 +86,14 @@ use parser::FreenetTestArgs; /// } /// ``` /// -/// ## Auto-Connect Peers to Gateways +/// ## Auto-Connect Peers to Gateways (Default Behavior) /// ```ignore /// #[freenet_test( /// nodes = ["gateway", "peer-1", "peer-2"], -/// auto_connect_peers = true, // Peers will connect to gateway /// timeout_secs = 120 /// )] /// async fn test_with_connections(ctx: &mut TestContext) -> TestResult { -/// // Peers are configured to discover and connect to the gateway +/// // Peers are automatically configured to connect to the gateway (default: auto_connect_peers = true) /// let gateway = ctx.gateway()?; /// let peers = ctx.peers(); /// // Test peer-gateway interactions... diff --git a/crates/freenet-macros/src/parser.rs b/crates/freenet-macros/src/parser.rs index 76864685e..6a6eb4bc8 100644 --- a/crates/freenet-macros/src/parser.rs +++ b/crates/freenet-macros/src/parser.rs @@ -21,6 +21,8 @@ pub struct FreenetTestArgs { pub tokio_flavor: TokioFlavor, /// Tokio worker threads pub tokio_worker_threads: Option, + /// Connectivity ratio between peers (0.0-1.0), controlling partial connectivity + pub peer_connectivity_ratio: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -40,13 +42,18 @@ impl syn::parse::Parse for FreenetTestArgs { fn parse(input: syn::parse::ParseStream) -> syn::Result { let mut nodes = None; let mut gateways = None; - let mut auto_connect_peers = false; + let mut auto_connect_peers = true; let mut timeout_secs = 180; let mut startup_wait_secs = 15; let mut aggregate_events = AggregateEventsMode::OnFailure; - let mut log_level = "freenet=debug,info".to_string(); + // Default log level: freenet=debug for general debugging, + // transport=warn to reduce keep-alive/connection noise, + // p2p_protoc=info to reduce connection timeout spam, + // info as global default + let mut log_level = "freenet=debug,freenet_core::transport=warn,freenet::node::network_bridge::p2p_protoc=info,info".to_string(); let mut tokio_flavor = TokioFlavor::CurrentThread; let mut tokio_worker_threads = None; + let mut peer_connectivity_ratio = None; // Parse key-value pairs while !input.is_empty() { @@ -159,6 +166,17 @@ impl syn::parse::Parse for FreenetTestArgs { let lit: syn::LitBool = input.parse()?; auto_connect_peers = lit.value; } + "peer_connectivity_ratio" => { + let lit: syn::LitFloat = input.parse()?; + let ratio: f64 = lit.base10_parse()?; + if !(0.0..=1.0).contains(&ratio) { + return Err(syn::Error::new( + lit.span(), + "peer_connectivity_ratio must be between 0.0 and 1.0", + )); + } + peer_connectivity_ratio = Some(ratio); + } _ => { return Err(syn::Error::new( key.span(), @@ -201,6 +219,7 @@ impl syn::parse::Parse for FreenetTestArgs { log_level, tokio_flavor, tokio_worker_threads, + peer_connectivity_ratio, }) } } @@ -218,9 +237,11 @@ mod tests { let args: FreenetTestArgs = syn::parse2(tokens).unwrap(); assert_eq!(args.nodes, vec!["gateway", "peer-1"]); + assert_eq!(args.auto_connect_peers, true); // Verify default is true assert_eq!(args.timeout_secs, 180); assert_eq!(args.startup_wait_secs, 15); assert_eq!(args.aggregate_events, AggregateEventsMode::OnFailure); + assert_eq!(args.log_level, "freenet=debug,freenet_core::transport=warn,freenet::node::network_bridge::p2p_protoc=info,info"); } #[test] @@ -261,4 +282,28 @@ mod tests { let result: Result = syn::parse2(tokens); assert!(result.is_err()); } + + #[test] + fn test_auto_connect_peers_explicit_false() { + let tokens = quote! { + nodes = ["gateway", "peer-1"], + auto_connect_peers = false + }; + + let args: FreenetTestArgs = syn::parse2(tokens).unwrap(); + assert_eq!(args.nodes, vec!["gateway", "peer-1"]); + assert_eq!(args.auto_connect_peers, false); // Verify explicit false works + } + + #[test] + fn test_auto_connect_peers_explicit_true() { + let tokens = quote! { + nodes = ["gateway", "peer-1"], + auto_connect_peers = true + }; + + let args: FreenetTestArgs = syn::parse2(tokens).unwrap(); + assert_eq!(args.nodes, vec!["gateway", "peer-1"]); + assert_eq!(args.auto_connect_peers, true); + } } diff --git a/docs/debugging/testing-logging-guide.md b/docs/debugging/testing-logging-guide.md index 8367c4dde..5bbc0757e 100644 --- a/docs/debugging/testing-logging-guide.md +++ b/docs/debugging/testing-logging-guide.md @@ -72,7 +72,6 @@ use freenet_macros::freenet_test; #[freenet_test( nodes = ["gateway", "peer-1", "peer-2"], - auto_connect_peers = true, aggregate_events = "on_failure" // Automatic event aggregation! )] async fn test_network_operation(ctx: &mut TestContext) -> TestResult { @@ -156,7 +155,6 @@ async fn test_gateway_starts(ctx: &mut TestContext) -> TestResult { ```rust #[freenet_test( nodes = ["gateway", "peer-1", "peer-2"], - auto_connect_peers = true, timeout_secs = 180, startup_wait_secs = 15 )]