From d310f36f9334e8800c49afad479ff00215c91043 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 1 Oct 2025 11:10:10 +0200 Subject: [PATCH 1/2] feat: initial implementation of `filter` --- src/lib.rs | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 736d9bf..b2729a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,7 +81,7 @@ use std::{ future::Future, pin::Pin, sync::{Arc, Weak}, - task::{self, ready, Poll, Waker}, + task::{self, Poll, Waker, ready}, }; #[cfg(watcher_loom)] @@ -110,18 +110,36 @@ impl Clone for Watchable { pub trait Nullable { /// Converts this value into an `Option`. fn into_option(self) -> Option; + + fn filter(self, f: F) -> Self + where + F: FnOnce(&T) -> bool; } impl Nullable for Option { fn into_option(self) -> Option { self } + + fn filter(self, f: F) -> Self + where + F: FnOnce(&T) -> bool, + { + self.filter(f) + } } impl Nullable for Vec { fn into_option(mut self) -> Option { self.pop() } + + fn filter(self, f: F) -> Self + where + F: FnOnce(&T) -> bool, + { + todo!() + } } impl Watchable { @@ -350,6 +368,24 @@ pub trait Watcher: Clone { } } + fn filter(mut self, filter: impl Fn(&T) -> bool + Send + Sync + 'static) -> Filter + where + T: Clone + Eq, + Self: Watcher, + { + let current = self.get(); + let current = if filter(¤t) { + Some(current) + } else { + None + }; + Filter { + current, + filter: Arc::new(filter), + watcher: self, + } + } + /// Returns a watcher that updates every time this or the other watcher /// updates, and yields both watcher's items together when that happens. fn or(self, other: W) -> (Self, W) { @@ -519,6 +555,57 @@ impl Watcher for Map { } } +/// Wraps a [`Watcher`] to allow observing a derived value. +/// +/// See [`Watcher::map`]. +#[derive(derive_more::Debug, Clone)] +pub struct Filter +where + T: Clone + Eq, + W: Watcher, +{ + #[debug("Arc bool + 'static>")] + filter: Arc bool + Send + Sync + 'static>, + watcher: W, + current: Option, +} + +impl Watcher for Filter +where + T: Clone + Eq, + W: Watcher, +{ + type Value = Option; + + fn get(&mut self) -> Self::Value { + self.current.clone() + } + + fn is_connected(&self) -> bool { + self.watcher.is_connected() + } + + fn poll_updated( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + loop { + let value = ready!(self.watcher.poll_updated(cx)?); + let filtered = if (self.filter)(&value) { + Some(value) + } else { + None + }; + if filtered != self.current { + self.current = filtered.clone(); + return Poll::Ready(Ok(filtered)); + } else { + self.current = filtered; + } + } + } +} + /// Future returning the next item after the current one in a [`Watcher`]. /// /// See [`Watcher::updated`]. @@ -683,8 +770,8 @@ impl Shared { #[cfg(test)] mod tests { - use n0_future::{future::poll_once, StreamExt}; - use rand::{thread_rng, Rng}; + use n0_future::{StreamExt, future::poll_once}; + use rand::{Rng, thread_rng}; use tokio::{ task::JoinSet, time::{Duration, Instant}, @@ -1005,4 +1092,27 @@ mod tests { assert!(!a.has_watchers()); assert!(!b.has_watchers()); } + + #[tokio::test] + async fn test_filter() { + let a = Watchable::new(1u8); + let mut filtered = a.watch().filter(|x| *x > 2 && *x < 6); + + assert_eq!(filtered.get(), None); + + let handle = tokio::task::spawn(async move { filtered.stream().collect::>().await }); + + for i in 2u8..10 { + a.set(i).unwrap(); + tokio::task::yield_now().await; + } + drop(a); + + let values = tokio::time::timeout(Duration::from_secs(5), handle) + .await + .unwrap() + .unwrap(); + + assert_eq!(values, vec![None, Some(3u8), Some(4), Some(5), None]); + } } From aecedb7f0bc4cbe06af8e1c1afe458bd962b7e20 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 1 Oct 2025 11:49:17 +0200 Subject: [PATCH 2/2] improve test and cleanup --- src/lib.rs | 49 +++++++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b2729a3..ffff76b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,7 +81,7 @@ use std::{ future::Future, pin::Pin, sync::{Arc, Weak}, - task::{self, Poll, Waker, ready}, + task::{self, ready, Poll, Waker}, }; #[cfg(watcher_loom)] @@ -110,36 +110,18 @@ impl Clone for Watchable { pub trait Nullable { /// Converts this value into an `Option`. fn into_option(self) -> Option; - - fn filter(self, f: F) -> Self - where - F: FnOnce(&T) -> bool; } impl Nullable for Option { fn into_option(self) -> Option { self } - - fn filter(self, f: F) -> Self - where - F: FnOnce(&T) -> bool, - { - self.filter(f) - } } impl Nullable for Vec { fn into_option(mut self) -> Option { self.pop() } - - fn filter(self, f: F) -> Self - where - F: FnOnce(&T) -> bool, - { - todo!() - } } impl Watchable { @@ -770,8 +752,8 @@ impl Shared { #[cfg(test)] mod tests { - use n0_future::{StreamExt, future::poll_once}; - use rand::{Rng, thread_rng}; + use n0_future::{future::poll_once, StreamExt}; + use rand::{thread_rng, Rng}; use tokio::{ task::JoinSet, time::{Duration, Instant}, @@ -1094,7 +1076,7 @@ mod tests { } #[tokio::test] - async fn test_filter() { + async fn test_filter_basic() { let a = Watchable::new(1u8); let mut filtered = a.watch().filter(|x| *x > 2 && *x < 6); @@ -1115,4 +1097,27 @@ mod tests { assert_eq!(values, vec![None, Some(3u8), Some(4), Some(5), None]); } + + #[tokio::test] + async fn test_filter_init() { + let a = Watchable::new(1u8); + let mut filtered = a.watch().filter(|x| *x > 2 && *x < 6); + + assert_eq!(filtered.get(), None); + + let handle = tokio::task::spawn(async move { filtered.initialized().await }); + + for i in 2u8..10 { + a.set(i).unwrap(); + tokio::task::yield_now().await; + } + drop(a); + + let value = tokio::time::timeout(Duration::from_secs(5), handle) + .await + .unwrap() + .unwrap(); + + assert_eq!(value, 3); + } }