From 78fa5123e6ed1c221e5265ebc4f66c2154ad4591 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 5 Nov 2025 17:19:19 +0000 Subject: [PATCH 1/3] avoid dyn in gossip validation --- src/chain/bitcoind.rs | 74 +++++++++++++++++++++++++++++++++++++++---- src/chain/mod.rs | 7 ++-- src/gossip.rs | 2 +- src/types.rs | 5 +-- 4 files changed, 74 insertions(+), 14 deletions(-) diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 4b7cd588f..e232495ab 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -12,7 +12,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use base64::prelude::BASE64_STANDARD; use base64::Engine; -use bitcoin::{BlockHash, FeeRate, Network, Transaction, Txid}; +use bitcoin::{BlockHash, FeeRate, Network, OutPoint, Transaction, Txid}; use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; use lightning::chain::Listen; use lightning::util::ser::Writeable; @@ -120,7 +120,7 @@ impl BitcoindChainSource { } } - pub(super) fn as_utxo_source(&self) -> Arc { + pub(super) fn as_utxo_source(&self) -> UtxoSourceClient { self.api_client.utxo_source() } @@ -625,6 +625,68 @@ impl BitcoindChainSource { } } +#[derive(Clone)] +pub(crate) enum UtxoSourceClient { + Rpc(Arc), + Rest(Arc), +} + +impl std::ops::Deref for UtxoSourceClient { + type Target = Self; + fn deref(&self) -> &Self { + self + } +} + +impl BlockSource for UtxoSourceClient { + fn get_header<'a>( + &'a self, + header_hash: &'a BlockHash, + height_hint: Option, + ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + match self { + Self::Rpc(client) => client.get_header(header_hash, height_hint), + Self::Rest(client) => client.get_header(header_hash, height_hint), + } + } + + fn get_block<'a>( + &'a self, + header_hash: &'a BlockHash, + ) -> AsyncBlockSourceResult<'a, BlockData> { + match self { + Self::Rpc(client) => client.get_block(header_hash), + Self::Rest(client) => client.get_block(header_hash), + } + } + + fn get_best_block( + &self, + ) -> AsyncBlockSourceResult<'_, (BlockHash, Option)> { + match self { + Self::Rpc(client) => client.get_best_block(), + Self::Rest(client) => client.get_best_block(), + } + } +} + + +impl UtxoSource for UtxoSourceClient { + fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> { + match self { + Self::Rpc(client) => client.get_block_hash_by_height(block_height), + Self::Rest(client) => client.get_block_hash_by_height(block_height), + } + } + + fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { + match self { + Self::Rpc(client) => client.is_output_unspent(outpoint), + Self::Rest(client) => client.is_output_unspent(outpoint), + } + } +} + pub enum BitcoindClient { Rpc { rpc_client: Arc, @@ -686,12 +748,10 @@ impl BitcoindClient { } } - pub(crate) fn utxo_source(&self) -> Arc { + fn utxo_source(&self) -> UtxoSourceClient { match self { - BitcoindClient::Rpc { rpc_client, .. } => Arc::clone(rpc_client) as Arc, - BitcoindClient::Rest { rest_client, .. } => { - Arc::clone(rest_client) as Arc - }, + Self::Rpc { rpc_client, .. } => UtxoSourceClient::Rpc(Arc::clone(&rpc_client)), + Self::Rest { rest_client, .. } => UtxoSourceClient::Rest(Arc::clone(&rest_client)), } } diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 309d60eab..ed626b654 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -5,7 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -mod bitcoind; +pub(crate) mod bitcoind; mod electrum; mod esplora; @@ -15,9 +15,8 @@ use std::time::Duration; use bitcoin::{Script, Txid}; use lightning::chain::Filter; -use lightning_block_sync::gossip::UtxoSource; -use crate::chain::bitcoind::BitcoindChainSource; +use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient}; use crate::chain::electrum::ElectrumChainSource; use crate::chain::esplora::EsploraChainSource; use crate::config::{ @@ -205,7 +204,7 @@ impl ChainSource { } } - pub(crate) fn as_utxo_source(&self) -> Option> { + pub(crate) fn as_utxo_source(&self) -> Option { match &self.kind { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { Some(bitcoind_chain_source.as_utxo_source()) diff --git a/src/gossip.rs b/src/gossip.rs index 01aff4742..563d9e1ea 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -71,7 +71,7 @@ impl GossipSource { if let Some(utxo_source) = chain_source.as_utxo_source() { let spawner = RuntimeSpawner::new(Arc::clone(&runtime)); let gossip_verifier = Arc::new(GossipVerifier::new( - utxo_source, + Arc::new(utxo_source), spawner, Arc::clone(gossip_sync), peer_manager, diff --git a/src/types.rs b/src/types.rs index 71512b2cd..4d537508b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -24,11 +24,12 @@ use lightning::sign::InMemorySigner; use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; -use lightning_block_sync::gossip::{GossipVerifier, UtxoSource}; +use lightning_block_sync::gossip::GossipVerifier; use lightning_liquidity::utils::time::DefaultTimeProvider; use lightning_net_tokio::SocketDescriptor; use crate::chain::ChainSource; +use crate::chain::bitcoind::UtxoSourceClient; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; @@ -120,7 +121,7 @@ pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>; -pub(crate) type UtxoLookup = GossipVerifier, Arc>; +pub(crate) type UtxoLookup = GossipVerifier, Arc>; pub(crate) type P2PGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; From 9ed1e828c79d8c60f2188b398c8ef1856dcc5506 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 5 Nov 2025 20:30:24 +0000 Subject: [PATCH 2/3] wrap --- Cargo.toml | 52 ++++---- src/builder.rs | 26 ++-- src/chain/bitcoind.rs | 111 ++++++++++-------- src/data_store.rs | 3 +- src/event.rs | 5 +- src/io/sqlite_store/mod.rs | 26 ++-- src/io/vss_store.rs | 9 +- src/lib.rs | 6 +- .../asynchronous/static_invoice_store.rs | 4 +- src/peer_store.rs | 4 +- src/scoring.rs | 3 +- src/types.rs | 90 +++++++++++++- src/wallet/mod.rs | 26 ++-- tests/common/mod.rs | 29 +++-- tests/integration_tests_rust.rs | 10 +- 15 files changed, 252 insertions(+), 152 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 701d9ddb3..3301c1ba2 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,29 +29,29 @@ panic = 'abort' # Abort on panic default = [] [dependencies] -lightning = { version = "0.2.0-rc1", features = ["std"] } -lightning-types = { version = "0.3.0-rc1" } -lightning-invoice = { version = "0.34.0-rc1", features = ["std"] } -lightning-net-tokio = { version = "0.2.0-rc1" } -lightning-persister = { version = "0.2.0-rc1", features = ["tokio"] } -lightning-background-processor = { version = "0.2.0-rc1" } -lightning-rapid-gossip-sync = { version = "0.2.0-rc1" } -lightning-block-sync = { version = "0.2.0-rc1", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { version = "0.2.0-rc1", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { version = "0.2.0-rc1", features = ["std"] } -lightning-macros = { version = "0.2.0-rc1" } - -#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] } -#lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } -#lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] } -#lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } -#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["tokio"] } -#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } -#lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } -#lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["rest-client", "rpc-client", "tokio"] } -#lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["esplora-async-https", "electrum-rustls-ring", "time"] } -#lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } -#lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } +#lightning = { version = "0.2.0-rc1", features = ["std"] } +#lightning-types = { version = "0.3.0-rc1" } +#lightning-invoice = { version = "0.34.0-rc1", features = ["std"] } +#lightning-net-tokio = { version = "0.2.0-rc1" } +#lightning-persister = { version = "0.2.0-rc1", features = ["tokio"] } +#lightning-background-processor = { version = "0.2.0-rc1" } +#lightning-rapid-gossip-sync = { version = "0.2.0-rc1" } +#lightning-block-sync = { version = "0.2.0-rc1", features = ["rest-client", "rpc-client", "tokio"] } +#lightning-transaction-sync = { version = "0.2.0-rc1", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +#lightning-liquidity = { version = "0.2.0-rc1", features = ["std"] } +#lightning-macros = { version = "0.2.0-rc1" } + +lightning = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box", features = ["std"] } +lightning-types = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box" } +lightning-invoice = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box", features = ["std"] } +lightning-net-tokio = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box" } +lightning-persister = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box", features = ["tokio"] } +lightning-background-processor = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box" } +lightning-rapid-gossip-sync = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box" } +lightning-block-sync = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box", features = ["esplora-async-https", "electrum-rustls-ring", "time"] } +lightning-liquidity = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box" } +lightning-macros = { git = "https://git.bitcoin.ninja/rust-lightning", branch = "2025-10-msrv-less-box" } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03", features = ["std"] } #lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03" } @@ -108,9 +108,9 @@ prost = { version = "0.11.6", default-features = false} winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { version = "0.2.0-rc1", features = ["std", "_test_utils"] } -#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] } -#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03", features = ["std", "_test_utils"] } +#lightning = { version = "0.2.0-rc1", features = ["std", "_test_utils"] } +lightning = { git = "https://git.bitcoin.ninja/rust-lightning", branch="2025-10-msrv-less-box", features = ["std", "_test_utils"] } +#lightning = { git = "https://git.bitcoin.ninja/rust-lightning", rev = "21e9a9c0ef80021d0669f2a366f55d08ba8d9b03", features = ["std", "_test_utils"] } #lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" diff --git a/src/builder.rs b/src/builder.rs index c0e39af7a..bcf48daaa 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -69,8 +69,8 @@ use crate::peer_store::PeerStore; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter, - OnionMessenger, PaymentStore, PeerManager, Persister, + ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, + MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -588,14 +588,12 @@ impl NodeBuilder { let storage_dir_path = self.config.storage_dir_path.clone(); fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new( - SqliteStore::new( - storage_dir_path.into(), - Some(io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), - Some(io::sqlite_store::KV_TABLE_NAME.to_string()), - ) - .map_err(|_| BuildError::KVStoreSetupFailed)?, - ); + let kv_store = SqliteStore::new( + storage_dir_path.into(), + Some(io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|_| BuildError::KVStoreSetupFailed)?; self.build_with_store(kv_store) } @@ -607,7 +605,7 @@ impl NodeBuilder { fs::create_dir_all(storage_dir_path.clone()) .map_err(|_| BuildError::StoragePathAccessFailed)?; - let kv_store = Arc::new(FilesystemStore::new(storage_dir_path)); + let kv_store = FilesystemStore::new(storage_dir_path); self.build_with_store(kv_store) } @@ -743,12 +741,12 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(vss_store), + Arc::new(DynStoreWrapper(vss_store)), ) } /// Builds a [`Node`] instance according to the options previously configured. - pub fn build_with_store(&self, kv_store: Arc) -> Result { + pub fn build_with_store(&self, kv_store: S) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; let runtime = if let Some(handle) = self.runtime_handle.as_ref() { @@ -777,7 +775,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - kv_store, + Arc::new(DynStoreWrapper(kv_store)), ) } } diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index e232495ab..9d7ca2e89 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. use std::collections::{HashMap, VecDeque}; +use std::future::Future; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -23,7 +24,7 @@ use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader}; use lightning_block_sync::rest::RestClient; use lightning_block_sync::rpc::{RpcClient, RpcError}; use lightning_block_sync::{ - AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceErrorKind, Cache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceErrorKind, Cache, SpvClient, }; use serde::Serialize; @@ -643,46 +644,56 @@ impl BlockSource for UtxoSourceClient { &'a self, header_hash: &'a BlockHash, height_hint: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - match self { - Self::Rpc(client) => client.get_header(header_hash, height_hint), - Self::Rest(client) => client.get_header(header_hash, height_hint), + ) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_header(header_hash, height_hint).await, + Self::Rest(client) => client.get_header(header_hash, height_hint).await, + } } } fn get_block<'a>( &'a self, header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - match self { - Self::Rpc(client) => client.get_block(header_hash), - Self::Rest(client) => client.get_block(header_hash), + ) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_block(header_hash).await, + Self::Rest(client) => client.get_block(header_hash).await, + } } } - fn get_best_block( - &self, - ) -> AsyncBlockSourceResult<'_, (BlockHash, Option)> { - match self { - Self::Rpc(client) => client.get_best_block(), - Self::Rest(client) => client.get_best_block(), + fn get_best_block<'a>( + &'a self, + ) -> impl Future), BlockSourceError>> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_best_block().await, + Self::Rest(client) => client.get_best_block().await, + } } } } impl UtxoSource for UtxoSourceClient { - fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> { - match self { - Self::Rpc(client) => client.get_block_hash_by_height(block_height), - Self::Rest(client) => client.get_block_hash_by_height(block_height), + fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.get_block_hash_by_height(block_height).await, + Self::Rest(client) => client.get_block_hash_by_height(block_height).await, + } } } - fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { - match self { - Self::Rpc(client) => client.is_output_unspent(outpoint), - Self::Rest(client) => client.is_output_unspent(outpoint), + fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> impl Future> + 'a { + async move { + match self { + Self::Rpc(client) => client.is_output_unspent(outpoint).await, + Self::Rest(client) => client.is_output_unspent(outpoint).await, + } } } } @@ -1235,38 +1246,44 @@ impl BitcoindClient { impl BlockSource for BitcoindClient { fn get_header<'a>( &'a self, header_hash: &'a bitcoin::BlockHash, height_hint: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - match self { - BitcoindClient::Rpc { rpc_client, .. } => { - Box::pin(async move { rpc_client.get_header(header_hash, height_hint).await }) - }, - BitcoindClient::Rest { rest_client, .. } => { - Box::pin(async move { rest_client.get_header(header_hash, height_hint).await }) - }, + ) -> impl Future> + 'a { + async move { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + rpc_client.get_header(header_hash, height_hint).await + }, + BitcoindClient::Rest { rest_client, .. } => { + rest_client.get_header(header_hash, height_hint).await + }, + } } } fn get_block<'a>( &'a self, header_hash: &'a bitcoin::BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - match self { - BitcoindClient::Rpc { rpc_client, .. } => { - Box::pin(async move { rpc_client.get_block(header_hash).await }) - }, - BitcoindClient::Rest { rest_client, .. } => { - Box::pin(async move { rest_client.get_block(header_hash).await }) - }, + ) -> impl Future> + 'a { + async move { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + rpc_client.get_block(header_hash).await + }, + BitcoindClient::Rest { rest_client, .. } => { + rest_client.get_block(header_hash).await + }, + } } } - fn get_best_block(&self) -> AsyncBlockSourceResult<'_, (bitcoin::BlockHash, Option)> { - match self { - BitcoindClient::Rpc { rpc_client, .. } => { - Box::pin(async move { rpc_client.get_best_block().await }) - }, - BitcoindClient::Rest { rest_client, .. } => { - Box::pin(async move { rest_client.get_best_block().await }) - }, + fn get_best_block<'a>(&'a self) -> impl Future), BlockSourceError>> + 'a { + async move { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + rpc_client.get_best_block().await + }, + BitcoindClient::Rest { rest_client, .. } => { + rest_client.get_best_block().await + }, + } } } } diff --git a/src/data_store.rs b/src/data_store.rs index 83cbf4476..584ec2d8d 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -176,6 +176,7 @@ mod tests { use super::*; use crate::hex_utils; + use crate::types::DynStoreWrapper; #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] struct TestObjectId { @@ -234,7 +235,7 @@ mod tests { #[test] fn data_is_persisted() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = Arc::new(DynStoreWrapper(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let primary_namespace = "datastore_test_primary".to_string(); let secondary_namespace = "datastore_test_secondary".to_string(); diff --git a/src/event.rs b/src/event.rs index eedfb1c14..0f7a7a3df 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1608,10 +1608,11 @@ mod tests { use lightning::util::test_utils::{TestLogger, TestStore}; use super::*; + use crate::types::DynStoreWrapper; #[tokio::test] async fn event_queue_persistence() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = Arc::new(DynStoreWrapper(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); @@ -1647,7 +1648,7 @@ mod tests { #[tokio::test] async fn event_queue_concurrency() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = Arc::new(DynStoreWrapper(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger))); assert_eq!(event_queue.next_event(), None); diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 789330cef..e4091b24e 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -6,12 +6,10 @@ // accordance with one or both of these licenses. //! Objects related to [`SqliteStore`] live here. -use std::boxed::Box; use std::collections::HashMap; use std::fs; use std::future::Future; use std::path::PathBuf; -use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; @@ -92,7 +90,7 @@ impl SqliteStore { impl KVStore for SqliteStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -100,17 +98,17 @@ impl KVStore for SqliteStore { let fut = tokio::task::spawn_blocking(move || { inner.read_internal(&primary_namespace, &secondary_namespace, &key) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); @@ -128,17 +126,17 @@ impl KVStore for SqliteStore { buf, ) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); @@ -155,29 +153,29 @@ impl KVStore for SqliteStore { &key, ) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); let fut = tokio::task::spawn_blocking(move || { inner.list_internal(&primary_namespace, &secondary_namespace) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } } diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 028eb87e4..d2f10398c 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -10,7 +10,6 @@ use std::collections::HashMap; use std::future::Future; #[cfg(test)] use std::panic::RefUnwindSafe; -use std::pin::Pin; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -242,7 +241,7 @@ impl KVStoreSync for VssStore { impl KVStore for VssStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -253,7 +252,7 @@ impl KVStore for VssStore { } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); @@ -276,7 +275,7 @@ impl KVStore for VssStore { } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); let primary_namespace = primary_namespace.to_string(); @@ -298,7 +297,7 @@ impl KVStore for VssStore { } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); diff --git a/src/lib.rs b/src/lib.rs index 6a26c6c5b..898917aaf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -152,11 +152,11 @@ use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, KeysManager, - OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ - ChannelDetails, CustomTlvRecord, DynStore, PeerDetails, SyncAndAsyncKVStore, UserChannelId, + ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId, }; pub use { diff --git a/src/payment/asynchronous/static_invoice_store.rs b/src/payment/asynchronous/static_invoice_store.rs index a7e2d2f9e..529b21a3f 100644 --- a/src/payment/asynchronous/static_invoice_store.rs +++ b/src/payment/asynchronous/static_invoice_store.rs @@ -161,11 +161,11 @@ mod tests { use lightning_types::features::BlindedHopFeatures; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; - use crate::types::DynStore; + use crate::types::{DynStore, DynStoreWrapper}; #[tokio::test] async fn static_invoice_store_test() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = Arc::new(DynStoreWrapper(TestStore::new(false))); let static_invoice_store = StaticInvoiceStore::new(Arc::clone(&store)); let static_invoice = invoice(); diff --git a/src/peer_store.rs b/src/peer_store.rs index 82c80c396..24dfbeacb 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -156,9 +156,11 @@ mod tests { use super::*; + use crate::types::DynStoreWrapper; + #[test] fn peer_info_persistence() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = Arc::new(DynStoreWrapper(TestStore::new(false))); let logger = Arc::new(TestLogger::new()); let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); diff --git a/src/scoring.rs b/src/scoring.rs index 107f63f65..6d9c381d4 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -11,9 +11,10 @@ use crate::{ io::utils::write_external_pathfinding_scores_to_cache, logger::LdkLogger, runtime::Runtime, + types::DynStore, NodeMetrics, Scorer, }; -use crate::{write_node_metrics, DynStore, Logger}; +use crate::{write_node_metrics, Logger}; use lightning::{ log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable, }; diff --git a/src/types.rs b/src/types.rs index 4d537508b..0c5116722 100644 --- a/src/types.rs +++ b/src/types.rs @@ -6,6 +6,8 @@ // accordance with one or both of these licenses. use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use bitcoin::secp256k1::PublicKey; @@ -49,8 +51,92 @@ where { } -/// A type alias for [`SyncAndAsyncKVStore`] with `Sync`/`Send` markers; -pub type DynStore = dyn SyncAndAsyncKVStore + Sync + Send; +pub(crate) trait DynStoreTrait : Send + Sync { + fn read_async(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin, bitcoin::io::Error>> + Send + 'static>>; + fn write_async(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> Pin> + Send + 'static>>; + fn remove_async(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin> + Send + 'static>>; + fn list_async(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin, bitcoin::io::Error>> + Send + 'static>>; + + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result, bitcoin::io::Error>; + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> Result<(), bitcoin::io::Error>; + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), bitcoin::io::Error>; + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result, bitcoin::io::Error>; +} + +impl<'a> KVStore for dyn DynStoreTrait + 'a { + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> impl Future, bitcoin::io::Error>> + Send + 'static { + DynStoreTrait::read_async(self, primary_namespace, secondary_namespace, key) + } + + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> impl Future> + Send + 'static { + DynStoreTrait::write_async(self, primary_namespace, secondary_namespace, key, buf) + } + + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> impl Future> + Send + 'static { + DynStoreTrait::remove_async(self, primary_namespace, secondary_namespace, key, lazy) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> impl Future, bitcoin::io::Error>> + Send + 'static { + DynStoreTrait::list_async(self, primary_namespace, secondary_namespace) + } +} + +impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result, bitcoin::io::Error> { + DynStoreTrait::read(self, primary_namespace, secondary_namespace, key) + } + + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> Result<(), bitcoin::io::Error> { + DynStoreTrait::write(self, primary_namespace, secondary_namespace, key, buf) + } + + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), bitcoin::io::Error> { + DynStoreTrait::remove(self, primary_namespace, secondary_namespace, key, lazy) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result, bitcoin::io::Error> { + DynStoreTrait::list(self, primary_namespace, secondary_namespace) + } +} + +pub(crate) type DynStore = dyn DynStoreTrait; + +pub(crate) struct DynStoreWrapper(pub(crate) T); + +impl DynStoreTrait for DynStoreWrapper { + fn read_async(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + Box::pin(KVStore::read(&self.0, primary_namespace, secondary_namespace, key)) + } + + fn write_async(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> Pin> + Send + 'static>> { + Box::pin(KVStore::write(&self.0, primary_namespace, secondary_namespace, key, buf)) + } + + fn remove_async(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin> + Send + 'static>> { + Box::pin(KVStore::remove(&self.0, primary_namespace, secondary_namespace, key, lazy)) + } + + fn list_async(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + Box::pin(KVStore::list(&self.0, primary_namespace, secondary_namespace)) + } + + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result, bitcoin::io::Error> { + KVStoreSync::read(&self.0, primary_namespace, secondary_namespace, key) + } + + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> Result<(), bitcoin::io::Error> { + KVStoreSync::write(&self.0, primary_namespace, secondary_namespace, key, buf) + } + + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), bitcoin::io::Error> { + KVStoreSync::remove(&self.0, primary_namespace, secondary_namespace, key, lazy) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result, bitcoin::io::Error> { + KVStoreSync::list(&self.0, primary_namespace, secondary_namespace) + } + +} pub type Persister = MonitorUpdatingPersister< Arc, diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 0f3797431..e18693b1b 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -6,7 +6,6 @@ // accordance with one or both of these licenses. use std::future::Future; -use std::pin::Pin; use std::str::FromStr; use std::sync::{Arc, Mutex}; @@ -758,20 +757,20 @@ impl Listen for Wallet { impl WalletSource for Wallet { fn list_confirmed_utxos<'a>( &'a self, - ) -> Pin, ()>> + Send + 'a>> { - Box::pin(async move { self.list_confirmed_utxos_inner() }) + ) -> impl Future, ()>> + Send + 'a { + async move { self.list_confirmed_utxos_inner() } } fn get_change_script<'a>( &'a self, - ) -> Pin> + Send + 'a>> { - Box::pin(async move { self.get_change_script_inner() }) + ) -> impl Future> + Send + 'a { + async move { self.get_change_script_inner() } } fn sign_psbt<'a>( &'a self, psbt: Psbt, - ) -> Pin> + Send + 'a>> { - Box::pin(async move { self.sign_psbt_inner(psbt) }) + ) -> impl Future> + Send + 'a { + async move { self.sign_psbt_inner(psbt) } } } @@ -917,17 +916,16 @@ impl SignerProvider for WalletKeysManager { impl ChangeDestinationSource for WalletKeysManager { fn get_change_destination_script<'a>( &'a self, - ) -> Pin> + Send + 'a>> { - let wallet = Arc::clone(&self.wallet); - let logger = Arc::clone(&self.logger); - Box::pin(async move { - wallet + ) -> impl Future> + Send + 'a { + async move { + self + .wallet .get_new_internal_address() .map_err(|e| { - log_error!(logger, "Failed to retrieve new address from wallet: {}", e); + log_error!(self.logger, "Failed to retrieve new address from wallet: {}", e); }) .map(|addr| addr.script_pubkey()) .map_err(|_| ()) - }) + } } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index dd680488c..bc7db5014 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -10,12 +10,10 @@ pub(crate) mod logging; -use std::boxed::Box; use std::collections::{HashMap, HashSet}; use std::env; use std::future::Future; use std::path::PathBuf; -use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -411,7 +409,7 @@ pub(crate) fn setup_node_for_async_payments( let node = match config.store_type { TestStoreType::TestSyncStore => { - let kv_store = Arc::new(TestSyncStore::new(config.node_config.storage_dir_path.into())); + let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); builder.build_with_store(kv_store).unwrap() }, TestStoreType::Sqlite => builder.build().unwrap(), @@ -1231,6 +1229,7 @@ pub(crate) async fn do_channel_full_cycle( } // A `KVStore` impl for testing purposes that wraps all our `KVStore`s and asserts their synchronicity. +#[derive(Clone)] pub(crate) struct TestSyncStore { inner: Arc, } @@ -1245,7 +1244,7 @@ impl TestSyncStore { impl KVStore for TestSyncStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -1253,16 +1252,16 @@ impl KVStore for TestSyncStore { let fut = tokio::task::spawn_blocking(move || { inner.read_internal(&primary_namespace, &secondary_namespace, &key) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -1270,16 +1269,16 @@ impl KVStore for TestSyncStore { let fut = tokio::task::spawn_blocking(move || { inner.write_internal(&primary_namespace, &secondary_namespace, &key, buf) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Pin> + Send>> { + ) -> impl Future> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); @@ -1287,28 +1286,28 @@ impl KVStore for TestSyncStore { let fut = tokio::task::spawn_blocking(move || { inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Pin, io::Error>> + Send>> { + ) -> impl Future, io::Error>> + 'static + Send { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); let fut = tokio::task::spawn_blocking(move || { inner.list_internal(&primary_namespace, &secondary_namespace) }); - Box::pin(async move { + async move { fut.await.unwrap_or_else(|e| { let msg = format!("Failed to IO operation due join error: {}", e); Err(io::Error::new(io::ErrorKind::Other, msg)) }) - }) + } } } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index e2d4207cd..bd76c1cb3 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -31,7 +31,7 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, QrPaymentResult, }; -use ldk_node::{Builder, DynStore, Event, NodeError}; +use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; @@ -252,14 +252,14 @@ async fn start_stop_reinit() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let test_sync_store: Arc = - Arc::new(TestSyncStore::new(config.node_config.storage_dir_path.clone().into())); + let test_sync_store = + TestSyncStore::new(config.node_config.storage_dir_path.clone().into()); let sync_config = EsploraSyncConfig { background_sync_config: None }; setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); + let node = builder.build_with_store(test_sync_store.clone()).unwrap(); node.start().unwrap(); let expected_node_id = node.node_id(); @@ -297,7 +297,7 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let reinitialized_node = builder.build_with_store(Arc::clone(&test_sync_store)).unwrap(); + let reinitialized_node = builder.build_with_store(test_sync_store).unwrap(); reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id); From d65035055f2876bb2c49237c17f008fc6316b3ef Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 6 Nov 2025 12:03:46 +0000 Subject: [PATCH 3/3] f drop more boxes --- src/io/vss_store.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index d2f10398c..ad6805117 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -246,9 +246,7 @@ impl KVStore for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin( - async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }, - ) + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, @@ -259,7 +257,7 @@ impl KVStore for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { + async move { inner .write_internal( inner_lock_ref, @@ -271,7 +269,7 @@ impl KVStore for VssStore { buf, ) .await - }) + } } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, @@ -282,7 +280,7 @@ impl KVStore for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { + async move { inner .remove_internal( inner_lock_ref, @@ -293,7 +291,7 @@ impl KVStore for VssStore { key, ) .await - }) + } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, @@ -301,7 +299,7 @@ impl KVStore for VssStore { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await }) + async move { inner.list_internal(primary_namespace, secondary_namespace).await } } }