Skip to content

Commit b8cf41c

Browse files
committed
Drop Condvar and use block_on for wait_next_event
Given we regularly run into issues arising from mixing sync and async contexts, we here simplify our `EventQueue` implementation by avoiding the use of `Condvar::wait_while` (which parks the current thread) and instead simply using `block_on` on our `next_event_async` method.
1 parent ae98bef commit b8cf41c

File tree

2 files changed

+7
-16
lines changed

2 files changed

+7
-16
lines changed

src/event.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use core::future::Future;
99
use core::task::{Poll, Waker};
1010
use std::collections::VecDeque;
1111
use std::ops::Deref;
12-
use std::sync::{Arc, Condvar, Mutex};
12+
use std::sync::{Arc, Mutex};
1313

1414
use bitcoin::blockdata::locktime::absolute::LockTime;
1515
use bitcoin::secp256k1::PublicKey;
@@ -287,7 +287,6 @@ where
287287
{
288288
queue: Arc<Mutex<VecDeque<Event>>>,
289289
waker: Arc<Mutex<Option<Waker>>>,
290-
notifier: Condvar,
291290
kv_store: Arc<DynStore>,
292291
logger: L,
293292
}
@@ -299,8 +298,7 @@ where
299298
pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
300299
let queue = Arc::new(Mutex::new(VecDeque::new()));
301300
let waker = Arc::new(Mutex::new(None));
302-
let notifier = Condvar::new();
303-
Self { queue, waker, notifier, kv_store, logger }
301+
Self { queue, waker, kv_store, logger }
304302
}
305303

306304
pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
@@ -310,8 +308,6 @@ where
310308
self.persist_queue(&locked_queue)?;
311309
}
312310

313-
self.notifier.notify_one();
314-
315311
if let Some(waker) = self.waker.lock().unwrap().take() {
316312
waker.wake();
317313
}
@@ -327,19 +323,12 @@ where
327323
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
328324
}
329325

330-
pub(crate) fn wait_next_event(&self) -> Event {
331-
let locked_queue =
332-
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
333-
locked_queue.front().unwrap().clone()
334-
}
335-
336326
pub(crate) fn event_handled(&self) -> Result<(), Error> {
337327
{
338328
let mut locked_queue = self.queue.lock().unwrap();
339329
locked_queue.pop_front();
340330
self.persist_queue(&locked_queue)?;
341331
}
342-
self.notifier.notify_one();
343332

344333
if let Some(waker) = self.waker.lock().unwrap().take() {
345334
waker.wake();
@@ -383,8 +372,7 @@ where
383372
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
384373
let queue = Arc::new(Mutex::new(read_queue.0));
385374
let waker = Arc::new(Mutex::new(None));
386-
let notifier = Condvar::new();
387-
Ok(Self { queue, waker, notifier, kv_store, logger })
375+
Ok(Self { queue, waker, kv_store, logger })
388376
}
389377
}
390378

src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,10 @@ impl Node {
749749
/// **Caution:** Users must handle events as quickly as possible to prevent a large event backlog,
750750
/// which can increase the memory footprint of [`Node`].
751751
pub fn wait_next_event(&self) -> Event {
752-
self.event_queue.wait_next_event()
752+
let fut = self.event_queue.next_event_async();
753+
// We use our runtime for the sync variant to ensure `tokio::task::block_in_place` is
754+
// always called if we'd ever hit this in an outer runtime context.
755+
self.runtime.block_on(fut)
753756
}
754757

755758
/// Confirm the last retrieved event handled.

0 commit comments

Comments
 (0)