Skip to content

Commit 28eedfa

Browse files
committed
sync wrapper wip
1 parent e551842 commit 28eedfa

File tree

2 files changed

+151
-7
lines changed

2 files changed

+151
-7
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::OutputSpender;
4141
use lightning::util::async_poll::FutureSpawner;
4242
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStore, Persister};
43+
use lightning::util::persist::{
44+
KVStore, KVStoreSync, KVStoreSyncWrapper, Persister, PersisterSync,
45+
};
4446
use lightning::util::sweep::OutputSweeper;
4547
#[cfg(feature = "std")]
4648
use lightning::util::sweep::OutputSweeperSync;
@@ -995,7 +997,9 @@ impl BackgroundProcessor {
995997
D: 'static + Deref,
996998
O: 'static + Deref,
997999
K: 'static + Deref,
998-
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
1000+
OS: 'static
1001+
+ Deref<Target = OutputSweeperSync<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>
1002+
+ Send,
9991003
FS: FutureSpawner,
10001004
>(
10011005
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
@@ -1009,14 +1013,14 @@ impl BackgroundProcessor {
10091013
F::Target: 'static + FeeEstimator,
10101014
L::Target: 'static + Logger,
10111015
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1012-
PS::Target: 'static + Persister<'a, CM, L, S>,
1016+
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
10131017
CM::Target: AChannelManager,
10141018
OM::Target: AOnionMessenger,
10151019
PM::Target: APeerManager,
10161020
LM::Target: ALiquidityManager,
10171021
D::Target: ChangeDestinationSourceSync,
10181022
O::Target: 'static + OutputSpender,
1019-
K::Target: 'static + KVStore,
1023+
K::Target: 'static + KVStoreSync,
10201024
{
10211025
let stop_thread = Arc::new(AtomicBool::new(false));
10221026
let stop_thread_clone = stop_thread.clone();
@@ -1033,9 +1037,9 @@ impl BackgroundProcessor {
10331037
.expect("Time should be sometime after 1970");
10341038
if update_scorer(scorer, &event, duration_since_epoch) {
10351039
log_trace!(logger, "Persisting scorer after update");
1036-
// if let Err(e) = persister.persist_scorer(&scorer).await {
1037-
// log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
1038-
// }
1040+
if let Err(e) = persister.persist_scorer(&scorer) {
1041+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
1042+
}
10391043
}
10401044
}
10411045
event_handler.handle_event(event)

lightning/src/util/persist.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,91 @@ pub trait KVStore {
167167
) -> Result<Vec<String>, io::Error>;
168168
}
169169

170+
/// Provides a synchronous interface to the [`KVStore`] trait.
171+
pub trait KVStoreSync {
172+
/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and
173+
/// `key`.
174+
///
175+
/// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given
176+
/// `primary_namespace` and `secondary_namespace`.
177+
///
178+
/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
179+
fn read(
180+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
181+
) -> Result<Vec<u8>, io::Error>;
182+
/// Persists the given data under the given `key`.
183+
///
184+
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present
185+
/// in the store.
186+
fn write(
187+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
188+
) -> Result<(), io::Error>;
189+
/// Removes any data that had previously been persisted under the given `key`.
190+
///
191+
/// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
192+
/// remove the given `key` at some point in time after the method returns, e.g., as part of an
193+
/// eventual batch deletion of multiple keys. As a consequence, subsequent calls to
194+
/// [`KVStore::list`] might include the removed key until the changes are actually persisted.
195+
///
196+
/// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent
197+
/// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could
198+
/// potentially get lost on crash after the method returns. Therefore, this flag should only be
199+
/// set for `remove` operations that can be safely replayed at a later time.
200+
///
201+
/// Returns successfully if no data will be stored for the given `primary_namespace`,
202+
/// `secondary_namespace`, and `key`, independently of whether it was present before its
203+
/// invokation or not.
204+
fn remove(
205+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
206+
) -> Result<(), io::Error>;
207+
/// Returns a list of keys that are stored under the given `secondary_namespace` in
208+
/// `primary_namespace`.
209+
///
210+
/// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
211+
/// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
212+
fn list(
213+
&self, primary_namespace: &str, secondary_namespace: &str,
214+
) -> Result<Vec<String>, io::Error>;
215+
}
216+
217+
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait.
218+
pub struct KVStoreSyncWrapper<K: KVStoreSync>(Arc<K>);
219+
220+
impl<K: KVStoreSync> KVStoreSyncWrapper<K> {
221+
/// Constructs a new [`KVStoreSyncWrapper`].
222+
pub fn new(kv_store: Arc<K>) -> Self {
223+
Self(kv_store)
224+
}
225+
}
226+
227+
impl<K: KVStoreSync> KVStore for KVStoreSyncWrapper<K> {
228+
fn read(
229+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
230+
) -> AsyncResultType<'static, Vec<u8>, io::Error> {
231+
let res = self.0.read(primary_namespace, secondary_namespace, key);
232+
233+
Box::pin(async move { res })
234+
}
235+
236+
fn write(
237+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
238+
) -> AsyncResultType<'static, (), io::Error> {
239+
todo!()
240+
}
241+
242+
fn remove(
243+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
244+
) -> Result<(), io::Error> {
245+
todo!()
246+
}
247+
248+
fn list(
249+
&self, primary_namespace: &str, secondary_namespace: &str,
250+
) -> Result<Vec<String>, io::Error> {
251+
todo!()
252+
}
253+
}
254+
170255
/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
171256
/// data migration.
172257
pub trait MigratableKVStore: KVStore {
@@ -275,6 +360,61 @@ where
275360
}
276361
}
277362

363+
/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
364+
///
365+
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
366+
pub trait PersisterSync<'a, CM: Deref, L: Deref, S: Deref>
367+
where
368+
CM::Target: 'static + AChannelManager,
369+
L::Target: 'static + Logger,
370+
S::Target: WriteableScore<'a>,
371+
{
372+
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
373+
///
374+
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
375+
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>;
376+
377+
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
378+
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error>;
379+
380+
/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
381+
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
382+
}
383+
384+
impl<'a, A: KVStoreSync + ?Sized, CM: Deref, L: Deref, S: Deref> PersisterSync<'a, CM, L, S> for A
385+
where
386+
CM::Target: 'static + AChannelManager,
387+
L::Target: 'static + Logger,
388+
S::Target: WriteableScore<'a>,
389+
{
390+
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
391+
self.write(
392+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
393+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
394+
CHANNEL_MANAGER_PERSISTENCE_KEY,
395+
&channel_manager.get_cm().encode(),
396+
)
397+
}
398+
399+
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
400+
self.write(
401+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
402+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
403+
NETWORK_GRAPH_PERSISTENCE_KEY,
404+
&network_graph.encode(),
405+
)
406+
}
407+
408+
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
409+
self.write(
410+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
411+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
412+
SCORER_PERSISTENCE_KEY,
413+
&scorer.encode(),
414+
)
415+
}
416+
}
417+
278418
impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized + Sync + Send + 'static>
279419
Persist<ChannelSigner> for Arc<K>
280420
{

0 commit comments

Comments
 (0)