Skip to content

Commit 572007a

Browse files
committed
Allow FutureSpawner to return the result of the spawned future
`tokio::spawn` can be use both to spawn a forever-running background task or to spawn a task which gets `poll`ed independently and eventually returns a result which the callsite wants. In LDK, we have only ever needed the first, and thus didn't bother defining a return type for `FutureSpawner::spawn`. However, in the next commit we'll start using `FutureSpawner` in a context where we actually do want the spawned future's result. Thus, here, we add a result output to `FutureSpawner::spawn`, mirroring the `tokio::spawn` API.
1 parent 5ef35f2 commit 572007a

File tree

3 files changed

+125
-13
lines changed

3 files changed

+125
-13
lines changed

lightning-block-sync/src/gossip.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ pub trait UtxoSource: BlockSource + 'static {
4949
pub struct TokioSpawner;
5050
#[cfg(feature = "tokio")]
5151
impl FutureSpawner for TokioSpawner {
52-
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
53-
tokio::spawn(future);
52+
type E = tokio::task::JoinError;
53+
type SpawnedFutureResult<O> = tokio::task::JoinHandle<O>;
54+
fn spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
55+
&self, future: F,
56+
) -> Self::SpawnedFutureResult<O> {
57+
tokio::spawn(future)
5458
}
5559
}
5660

@@ -280,7 +284,7 @@ where
280284
let gossiper = Arc::clone(&self.gossiper);
281285
let block_cache = Arc::clone(&self.block_cache);
282286
let pmw = Arc::clone(&self.peer_manager_wake);
283-
self.spawn.spawn(async move {
287+
let _ = self.spawn.spawn(async move {
284288
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
285289
fut.resolve(gossiper.network_graph(), &*gossiper, res);
286290
(pmw)();

lightning/src/util/native_async.rs

Lines changed: 109 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,44 @@
88
//! environment.
99
1010
#[cfg(all(test, feature = "std"))]
11-
use crate::sync::Mutex;
11+
use crate::sync::{Arc, Mutex};
1212
use crate::util::async_poll::{MaybeSend, MaybeSync};
1313

14+
#[cfg(all(test, not(feature = "std")))]
15+
use alloc::rc::Rc;
16+
1417
#[cfg(all(test, not(feature = "std")))]
1518
use core::cell::RefCell;
19+
#[cfg(test)]
20+
use core::convert::Infallible;
1621
use core::future::Future;
1722
#[cfg(test)]
1823
use core::pin::Pin;
24+
#[cfg(test)]
25+
use core::task::{Context, Poll};
1926

20-
/// A generic trait which is able to spawn futures in the background.
27+
/// A generic trait which is able to spawn futures to be polled in the background.
28+
///
29+
/// When the spawned future completes, the returned [`Self::SpawnedFutureResult`] should resolve
30+
/// with the output of the spawned future.
31+
///
32+
/// Spawned futures must be polled independently in the background even if the returned
33+
/// [`Self::SpawnedFutureResult`] is dropped without being polled. This matches the semantics of
34+
/// `tokio::spawn`.
2135
///
2236
/// This is not exported to bindings users as async is only supported in Rust.
2337
pub trait FutureSpawner: MaybeSend + MaybeSync + 'static {
38+
/// The error type of [`Self::SpawnedFutureResult`]. This can be used to indicate that the
39+
/// spawned future was cancelled or panicked.
40+
type E;
41+
/// The result of [`Self::spawn`], a future which completes when the spawned future completes.
42+
type SpawnedFutureResult<O>: Future<Output = Result<O, Self::E>> + Unpin;
2443
/// Spawns the given future as a background task.
2544
///
2645
/// This method MUST NOT block on the given future immediately.
27-
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T);
46+
fn spawn<O: MaybeSend + 'static, T: Future<Output = O> + MaybeSend + 'static>(
47+
&self, future: T,
48+
) -> Self::SpawnedFutureResult<O>;
2849
}
2950

3051
#[cfg(test)]
@@ -39,6 +60,69 @@ pub(crate) struct FutureQueue(Mutex<Vec<Pin<Box<dyn MaybeSendableFuture>>>>);
3960
#[cfg(all(test, not(feature = "std")))]
4061
pub(crate) struct FutureQueue(RefCell<Vec<Pin<Box<dyn MaybeSendableFuture>>>>);
4162

63+
#[cfg(all(test, feature = "std"))]
64+
pub struct FutureQueueCompletion<O>(Arc<Mutex<Option<O>>>);
65+
#[cfg(all(test, not(feature = "std")))]
66+
pub struct FutureQueueCompletion<O>(Rc<RefCell<Option<O>>>);
67+
68+
#[cfg(all(test, feature = "std"))]
69+
impl<O> FutureQueueCompletion<O> {
70+
fn new() -> Self {
71+
Self(Arc::new(Mutex::new(None)))
72+
}
73+
74+
fn complete(&self, o: O) {
75+
*self.0.lock().unwrap() = Some(o);
76+
}
77+
}
78+
79+
#[cfg(all(test, feature = "std"))]
80+
impl<O> Clone for FutureQueueCompletion<O> {
81+
fn clone(&self) -> Self {
82+
Self(self.0.clone())
83+
}
84+
}
85+
86+
#[cfg(all(test, not(feature = "std")))]
87+
impl<O> FutureQueueCompletion<O> {
88+
fn new() -> Self {
89+
Self(Rc::new(RefCell::new(None)))
90+
}
91+
92+
fn complete(&self, o: O) {
93+
*self.0.borrow_mut() = Some(o);
94+
}
95+
}
96+
97+
#[cfg(all(test, not(feature = "std")))]
98+
impl<O> Clone for FutureQueueCompletion<O> {
99+
fn clone(&self) -> Self {
100+
Self(self.0.clone())
101+
}
102+
}
103+
104+
#[cfg(all(test, feature = "std"))]
105+
impl<O> Future for FutureQueueCompletion<O> {
106+
type Output = Result<O, Infallible>;
107+
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<O, Infallible>> {
108+
match Pin::into_inner(self).0.lock().unwrap().take() {
109+
None => Poll::Pending,
110+
Some(o) => Poll::Ready(Ok(o)),
111+
}
112+
}
113+
}
114+
115+
#[cfg(all(test, not(feature = "std")))]
116+
impl<O> Future for FutureQueueCompletion<O> {
117+
type Output = Result<O, Infallible>;
118+
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<O, Infallible>> {
119+
match Pin::into_inner(self).0.borrow_mut().take() {
120+
None => Poll::Pending,
121+
Some(o) => Poll::Ready(Ok(o)),
122+
}
123+
}
124+
}
125+
42126
#[cfg(test)]
43127
impl FutureQueue {
44128
pub(crate) fn new() -> Self {
@@ -74,7 +158,6 @@ impl FutureQueue {
74158
futures = self.0.borrow_mut();
75159
}
76160
futures.retain_mut(|fut| {
77-
use core::task::{Context, Poll};
78161
let waker = crate::util::async_poll::dummy_waker();
79162
match fut.as_mut().poll(&mut Context::from_waker(&waker)) {
80163
Poll::Ready(()) => false,
@@ -86,7 +169,16 @@ impl FutureQueue {
86169

87170
#[cfg(test)]
88171
impl FutureSpawner for FutureQueue {
89-
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T) {
172+
type E = Infallible;
173+
type SpawnedFutureResult<O> = FutureQueueCompletion<O>;
174+
fn spawn<O: MaybeSend + 'static, F: Future<Output = O> + MaybeSend + 'static>(
175+
&self, f: F,
176+
) -> FutureQueueCompletion<O> {
177+
let completion = FutureQueueCompletion::new();
178+
let compl_ref = completion.clone();
179+
let future = async move {
180+
compl_ref.complete(f.await);
181+
};
90182
#[cfg(feature = "std")]
91183
{
92184
self.0.lock().unwrap().push(Box::pin(future));
@@ -95,14 +187,24 @@ impl FutureSpawner for FutureQueue {
95187
{
96188
self.0.borrow_mut().push(Box::pin(future));
97189
}
190+
completion
98191
}
99192
}
100193

101194
#[cfg(test)]
102195
impl<D: core::ops::Deref<Target = FutureQueue> + MaybeSend + MaybeSync + 'static> FutureSpawner
103196
for D
104197
{
105-
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T) {
198+
type E = Infallible;
199+
type SpawnedFutureResult<O> = FutureQueueCompletion<O>;
200+
fn spawn<O: MaybeSend + 'static, F: Future<Output = O> + MaybeSend + 'static>(
201+
&self, f: F,
202+
) -> FutureQueueCompletion<O> {
203+
let completion = FutureQueueCompletion::new();
204+
let compl_ref = completion.clone();
205+
let future = async move {
206+
compl_ref.complete(f.await);
207+
};
106208
#[cfg(feature = "std")]
107209
{
108210
self.0.lock().unwrap().push(Box::pin(future));
@@ -111,5 +213,6 @@ impl<D: core::ops::Deref<Target = FutureQueue> + MaybeSend + MaybeSync + 'static
111213
{
112214
self.0.borrow_mut().push(Box::pin(future));
113215
}
216+
completion
114217
}
115218
}

lightning/src/util/persist.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use alloc::sync::Arc;
1616
use bitcoin::hashes::hex::FromHex;
1717
use bitcoin::{BlockHash, Txid};
1818

19+
use core::convert::Infallible;
1920
use core::future::Future;
2021
use core::mem;
2122
use core::ops::Deref;
@@ -491,7 +492,11 @@ where
491492

492493
struct PanicingSpawner;
493494
impl FutureSpawner for PanicingSpawner {
494-
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, _: T) {
495+
type E = Infallible;
496+
type SpawnedFutureResult<O> = Box<dyn Future<Output = Result<O, Infallible>> + Unpin>;
497+
fn spawn<O, T: Future<Output = O> + MaybeSend + 'static>(
498+
&self, _: T,
499+
) -> Self::SpawnedFutureResult<O> {
495500
unreachable!();
496501
}
497502
}
@@ -959,7 +964,7 @@ where
959964
let future = inner.persist_new_channel(monitor_name, monitor);
960965
let channel_id = monitor.channel_id();
961966
let completion = (monitor.channel_id(), monitor.get_latest_update_id());
962-
self.0.future_spawner.spawn(async move {
967+
let _runs_free = self.0.future_spawner.spawn(async move {
963968
match future.await {
964969
Ok(()) => {
965970
inner.async_completed_updates.lock().unwrap().push(completion);
@@ -991,7 +996,7 @@ where
991996
None
992997
};
993998
let inner = Arc::clone(&self.0);
994-
self.0.future_spawner.spawn(async move {
999+
let _runs_free = self.0.future_spawner.spawn(async move {
9951000
match future.await {
9961001
Ok(()) => if let Some(completion) = completion {
9971002
inner.async_completed_updates.lock().unwrap().push(completion);
@@ -1009,7 +1014,7 @@ where
10091014

10101015
pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) {
10111016
let inner = Arc::clone(&self.0);
1012-
self.0.future_spawner.spawn(async move {
1017+
let _runs_free = self.0.future_spawner.spawn(async move {
10131018
inner.archive_persisted_channel(monitor_name).await;
10141019
});
10151020
}

0 commit comments

Comments
 (0)