Skip to content

Commit 4dd2936

Browse files
committed
async write wip
1 parent d72d193 commit 4dd2936

File tree

3 files changed

+93
-59
lines changed

3 files changed

+93
-59
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::chain::transaction::{OutPoint, TransactionData};
3636
use crate::ln::types::ChannelId;
3737
use crate::sign::ecdsa::EcdsaChannelSigner;
3838
use crate::events::{self, Event, EventHandler, ReplayEvent};
39-
use crate::util::async_poll::{poll_or_spawn, AsyncResult, FutureSpawner};
39+
use crate::util::async_poll::{poll_or_spawn, AsyncResult, AsyncVoid, FutureSpawner};
4040
use crate::util::logger::{Logger, WithContext};
4141
use crate::util::errors::APIError;
4242
use crate::util::persist::MonitorName;
@@ -175,7 +175,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
175175
/// the archive process. Additionally, because the archive operation could be retried on
176176
/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
177177
/// the monitor already exists in the archive.
178-
fn archive_persisted_channel(&self, monitor_name: MonitorName);
178+
fn archive_persisted_channel(&self, monitor_name: MonitorName) -> AsyncVoid;
179179
}
180180

181181
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {

lightning/src/util/async_poll.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ pub(crate) fn dummy_waker() -> Waker {
9696
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
9797
}
9898

99+
/// A type alias for a future that returns nothing.
100+
pub type AsyncVoid = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
101+
99102
/// A type alias for a future that returns a result of type T.
100103
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;
101104

lightning/src/util/persist.rs

Lines changed: 88 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::sync::Arc;
3232
use crate::util::logger::Logger;
3333
use crate::util::ser::{Readable, ReadableArgs, Writeable};
3434

35-
use super::async_poll::{AsyncResult, AsyncResultType};
35+
use super::async_poll::{AsyncResult, AsyncResultType, AsyncVoid};
3636

3737
/// The alphabet of characters allowed for namespaces and keys.
3838
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
@@ -136,11 +136,6 @@ pub trait KVStore {
136136
///
137137
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present
138138
/// in the store.
139-
fn write(
140-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
141-
) -> Result<(), io::Error>;
142-
143-
/// Asynchronously persists the given data under the given `key`.
144139
fn write_async(
145140
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
146141
) -> AsyncResultType<'static, (), io::Error>;
@@ -193,14 +188,22 @@ pub trait MigratableKVStore: KVStore {
193188
///
194189
/// Will abort and return an error if any IO operation fails. Note that in this case the
195190
/// `target_store` might get left in an intermediate state.
196-
pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
191+
pub async fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
197192
source_store: &mut S, target_store: &mut T,
198193
) -> Result<(), io::Error> {
199194
let keys_to_migrate = source_store.list_all_keys()?;
200195

201196
for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
202197
let data = source_store.read(primary_namespace, secondary_namespace, key)?;
203-
target_store.write(primary_namespace, secondary_namespace, key, &data)?;
198+
target_store
199+
.write_async(primary_namespace, secondary_namespace, key, &data)
200+
.await
201+
.map_err(|_| {
202+
io::Error::new(
203+
io::ErrorKind::Other,
204+
"Failed to write data to target store during migration",
205+
)
206+
})?;
204207
}
205208

206209
Ok(())
@@ -218,41 +221,53 @@ where
218221
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
219222
///
220223
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
221-
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>;
224+
fn persist_manager(&self, channel_manager: &CM) -> AsyncResultType<'static, (), io::Error>;
222225

223226
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
224-
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error>;
227+
fn persist_graph(
228+
&self, network_graph: &NetworkGraph<L>,
229+
) -> AsyncResultType<'static, (), io::Error>;
225230

226231
/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
227-
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
232+
fn persist_scorer(&self, scorer: &S) -> AsyncResultType<'static, (), io::Error>;
228233
}
229234

230-
impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A
235+
impl<'a, A: KVStore + ?Sized + Send + Sync + 'static, CM: Deref, L: Deref, S: Deref>
236+
Persister<'a, CM, L, S> for Arc<A>
231237
where
232238
CM::Target: 'static + AChannelManager,
233239
L::Target: 'static + Logger,
234240
S::Target: WriteableScore<'a>,
235241
{
236-
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
237-
self.write(
238-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
239-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
240-
CHANNEL_MANAGER_PERSISTENCE_KEY,
241-
&channel_manager.get_cm().encode(),
242-
)
242+
fn persist_manager(&self, channel_manager: &CM) -> AsyncResultType<'static, (), io::Error> {
243+
let encoded = channel_manager.get_cm().encode();
244+
let kv_store = self.clone();
245+
246+
Box::pin(async move {
247+
kv_store
248+
.write_async(
249+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
250+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
251+
CHANNEL_MANAGER_PERSISTENCE_KEY,
252+
&encoded,
253+
)
254+
.await
255+
})
243256
}
244257

245-
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
246-
self.write(
258+
fn persist_graph(
259+
&self, network_graph: &NetworkGraph<L>,
260+
) -> AsyncResultType<'static, (), io::Error> {
261+
self.write_async(
247262
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
248263
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
249264
NETWORK_GRAPH_PERSISTENCE_KEY,
250265
&network_graph.encode(),
251266
)
252267
}
253268

254-
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
255-
self.write(
269+
fn persist_scorer(&self, scorer: &S) -> AsyncResultType<'static, (), io::Error> {
270+
self.write_async(
256271
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
257272
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
258273
SCORER_PERSISTENCE_KEY,
@@ -308,31 +323,38 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized + Sync + Send + 'sta
308323
})
309324
}
310325

311-
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
312-
let monitor_key = monitor_name.to_string();
313-
let monitor = match self.read(
314-
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
315-
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
316-
monitor_key.as_str(),
317-
) {
318-
Ok(monitor) => monitor,
319-
Err(_) => return,
320-
};
321-
match self.write(
322-
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
323-
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
324-
monitor_key.as_str(),
325-
&monitor,
326-
) {
327-
Ok(()) => {},
328-
Err(_e) => return,
329-
};
330-
let _ = self.remove(
331-
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
332-
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
333-
monitor_key.as_str(),
334-
true,
335-
);
326+
fn archive_persisted_channel(&self, monitor_name: MonitorName) -> AsyncVoid {
327+
let kv_store = self.clone();
328+
329+
Box::pin(async move {
330+
let monitor_key = monitor_name.to_string();
331+
let monitor = match kv_store.read(
332+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
333+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
334+
monitor_key.as_str(),
335+
) {
336+
Ok(monitor) => monitor,
337+
Err(_) => return,
338+
};
339+
match kv_store
340+
.write_async(
341+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
342+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
343+
monitor_key.as_str(),
344+
&monitor,
345+
)
346+
.await
347+
{
348+
Ok(()) => {},
349+
Err(_e) => return,
350+
};
351+
let _ = kv_store.remove(
352+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
353+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
354+
monitor_key.as_str(),
355+
true,
356+
);
357+
})
336358
}
337359
}
338360

@@ -777,8 +799,13 @@ where
777799
})
778800
}
779801

780-
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
781-
self.state.archive_persisted_channel(monitor_name);
802+
fn archive_persisted_channel(&self, monitor_name: MonitorName) -> AsyncVoid {
803+
let monitor_name = monitor_name;
804+
let state = self.state.clone();
805+
806+
Box::pin(async move {
807+
state.archive_persisted_channel(monitor_name).await;
808+
})
782809
}
783810
}
784811

@@ -925,18 +952,22 @@ where
925952
}
926953
}
927954

928-
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
955+
async fn archive_persisted_channel(&self, monitor_name: MonitorName) {
929956
let monitor_key = monitor_name.to_string();
930957
let monitor = match self.read_channel_monitor_with_updates(&monitor_key) {
931958
Ok((_block_hash, monitor)) => monitor,
932959
Err(_) => return,
933960
};
934-
match self.kv_store.write(
935-
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
936-
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
937-
monitor_key.as_str(),
938-
&monitor.encode(),
939-
) {
961+
match self
962+
.kv_store
963+
.write_async(
964+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
965+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
966+
monitor_key.as_str(),
967+
&monitor.encode(),
968+
)
969+
.await
970+
{
940971
Ok(()) => {},
941972
Err(_e) => return,
942973
};

0 commit comments

Comments
 (0)