Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 34 additions & 29 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use lightning::util::config::{
ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate,
};
use lightning::util::errors::APIError;
use lightning::util::persist::KVStoreSync;
use lightning::util::persist::KVStore;
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use lightning_liquidity::lsps2::utils::compute_opening_fee;
use lightning_types::payment::{PaymentHash, PaymentPreimage};
Expand Down Expand Up @@ -301,12 +301,14 @@ where
Self { queue, waker, kv_store, logger }
}

pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
{
pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at what's left in EventQueue that is sync. Just next_event. Unrelated to this PR, but in every test where that method is used, the test is already async. So it could use the async version?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at what's left in EventQueue that is sync. Just next_event. Unrelated to this PR, but in every test where that method is used, the test is already async. So it could use the async version?

But next_event doesn't do anything that would require async operation? It doesn't repersist or anything?

let data = {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.push_back(event);
self.persist_queue(&locked_queue)?;
}
EventQueueSerWrapper(&locked_queue).encode()
};

self.persist_queue(data).await?;

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
Expand All @@ -323,28 +325,30 @@ where
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
}

pub(crate) fn event_handled(&self) -> Result<(), Error> {
{
pub(crate) async fn event_handled(&self) -> Result<(), Error> {
let data = {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.pop_front();
self.persist_queue(&locked_queue)?;
}
EventQueueSerWrapper(&locked_queue).encode()
};

self.persist_queue(data).await?;

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
Ok(())
}

fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
let data = EventQueueSerWrapper(locked_queue).encode();
KVStoreSync::write(
async fn persist_queue(&self, encoded_queue: Vec<u8>) -> Result<(), Error> {
KVStore::write(
&*self.kv_store,
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_KEY,
data,
encoded_queue,
)
.await
.map_err(|e| {
log_error!(
self.logger,
Expand Down Expand Up @@ -694,7 +698,7 @@ where
claim_deadline,
custom_records,
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(
Expand Down Expand Up @@ -928,7 +932,7 @@ where
.map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
.unwrap_or_default(),
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -988,7 +992,7 @@ where
fee_paid_msat,
};

match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1019,7 +1023,7 @@ where

let event =
Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1295,7 +1299,7 @@ where
claim_from_onchain_tx,
outbound_amount_forwarded_msat,
};
self.event_queue.add_event(event).map_err(|e| {
self.event_queue.add_event(event).await.map_err(|e| {
log_error!(self.logger, "Failed to push to event queue: {}", e);
ReplayEvent()
})?;
Expand All @@ -1322,7 +1326,7 @@ where
counterparty_node_id,
funding_txo,
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1383,7 +1387,7 @@ where
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id: Some(counterparty_node_id),
};
match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand All @@ -1407,7 +1411,7 @@ where
reason: Some(reason),
};

match self.event_queue.add_event(event) {
match self.event_queue.add_event(event).await {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
Expand Down Expand Up @@ -1621,7 +1625,7 @@ mod tests {
user_channel_id: UserChannelId(2323),
counterparty_node_id: None,
};
event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();

// Check we get the expected event and that it is returned until we mark it handled.
for _ in 0..5 {
Expand All @@ -1630,18 +1634,19 @@ mod tests {
}

// Check we can read back what we persisted.
let persisted_bytes = KVStoreSync::read(
let persisted_bytes = KVStore::read(
&*store,
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_KEY,
)
.await
.unwrap();
let deser_event_queue =
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
assert_eq!(deser_event_queue.next_event_async().await, expected_event);

event_queue.event_handled().unwrap();
event_queue.event_handled().await.unwrap();
assert_eq!(event_queue.next_event(), None);
}

Expand Down Expand Up @@ -1675,28 +1680,28 @@ mod tests {
let mut delayed_enqueue = false;

for _ in 0..25 {
event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
}

loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
delayed_enqueue = true;
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
event_queue.event_handled().unwrap();
event_queue.event_handled().await.unwrap();
received_events.fetch_add(1, Ordering::SeqCst);

event_queue.add_event(expected_event.clone()).unwrap();
event_queue.add_event(expected_event.clone()).await.unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
event_queue.event_handled().unwrap();
event_queue.event_handled().await.unwrap();
received_events.fetch_add(1, Ordering::SeqCst);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,10 @@ impl Node {
///
/// **Note:** This **MUST** be called after each event has been handled.
pub fn event_handled(&self) -> Result<(), Error> {
self.event_queue.event_handled().map_err(|e| {
// We use our runtime for the sync variant to ensure `tokio::task::block_in_place` is
// always called if we'd ever hit this in an outer runtime context.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every time I read this, the same question marks pop up. It feels so strange to have regular sync code and worry about an outer runtime. And then we also make it more complicated because our own block_on variant isn't using the internal handle if there is already a context.

let fut = self.event_queue.event_handled();
self.runtime.block_on(fut).map_err(|e| {
log_error!(
self.logger,
"Couldn't mark event handled due to persistence failure: {}",
Expand Down
Loading