From fd12d9cc9880af5176514144c1c41f1ec324dd89 Mon Sep 17 00:00:00 2001 From: Ariel Ben-Yehuda Date: Sun, 19 Oct 2025 14:15:48 +0000 Subject: [PATCH] fix: support fake time This avoids bad behavior like being stuck on a busy loop when tokio paused clock is used. Requires hyperium/hyper#3965 --- src/client/legacy/pool.rs | 39 ++++++++++++++++++++++++++++----------- src/common/timer.rs | 4 ++++ src/rt/tokio.rs | 4 ++++ 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index e0d6f2ff..a369bcb9 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -298,7 +298,7 @@ struct IdlePopper<'a, T, K> { } impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> { - fn pop(self, expiration: &Expiration) -> Option> { + fn pop(self, expiration: &Expiration, now: Instant) -> Option> { while let Some(entry) = self.list.pop() { // If the connection has been closed, or is older than our idle // timeout, simply drop it and keep looking... @@ -312,7 +312,7 @@ impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> { // // In that case, we could just break out of the loop and drop the // whole list... - if expiration.expires(entry.idle_at) { + if expiration.expires(entry.idle_at, now) { trace!("removing expired connection for {:?}", self.key); continue; } @@ -321,7 +321,7 @@ impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> { #[cfg(feature = "http2")] Reservation::Shared(to_reinsert, to_checkout) => { self.list.push(Idle { - idle_at: Instant::now(), + idle_at: now, value: to_reinsert, }); to_checkout @@ -340,6 +340,12 @@ impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> { } impl PoolInner { + fn now(&self) -> Instant { + self.timer + .as_ref() + .map_or_else(|| Instant::now(), |t| t.now()) + } + fn put(&mut self, key: K, value: T, __pool_ref: &Arc>>) { if value.can_share() && self.idle.contains_key(&key) { trace!("put; existing idle HTTP/2 connection for {:?}", key); @@ -386,6 +392,7 @@ impl PoolInner { Some(value) => { // borrow-check scope... { + let now = self.now(); let idle_list = self.idle.entry(key.clone()).or_default(); if self.max_idle_per_host <= idle_list.len() { trace!("max idle per host for {:?}, dropping connection", key); @@ -395,7 +402,7 @@ impl PoolInner { debug!("pooling idle connection for {:?}", key); idle_list.push(Idle { value, - idle_at: Instant::now(), + idle_at: now, }); } @@ -477,7 +484,7 @@ impl PoolInner { fn clear_expired(&mut self) { let dur = self.timeout.expect("interval assumes timeout"); - let now = Instant::now(); + let now = self.now(); //self.last_idle_check_at = now; self.idle.retain(|key, values| { @@ -649,6 +656,7 @@ impl Checkout { let entry = { let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); let expiration = Expiration::new(inner.timeout); + let now = inner.now(); let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| { trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); // A block to end the mutable borrow on list, @@ -658,7 +666,7 @@ impl Checkout { key: &self.key, list, }; - popper.pop(&expiration) + popper.pop(&expiration, now) } .map(|e| (e, list.is_empty())) }); @@ -762,10 +770,10 @@ impl Expiration { Expiration(dur) } - fn expires(&self, instant: Instant) -> bool { + fn expires(&self, instant: Instant, now: Instant) -> bool { match self.0 { // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470. - Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout, + Some(timeout) => now.saturating_duration_since(instant) > timeout, None => false, } } @@ -785,7 +793,7 @@ impl IdleTask { async fn run(self) { use futures_util::future; - let mut sleep = self.timer.sleep_until(Instant::now() + self.duration); + let mut sleep = self.timer.sleep_until(self.timer.now() + self.duration); let mut on_pool_drop = self.pool_drop_notifier; loop { match future::select(&mut on_pool_drop, &mut sleep).await { @@ -801,7 +809,7 @@ impl IdleTask { } } - let deadline = Instant::now() + self.duration; + let deadline = self.timer.now() + self.duration; self.timer.reset(&mut sleep, deadline); } } @@ -976,7 +984,16 @@ mod tests { } #[tokio::test] - async fn test_pool_timer_removes_expired() { + async fn test_pool_timer_removes_expired_realtime() { + test_pool_timer_removes_expired_inner().await + } + + #[tokio::test(start_paused = true)] + async fn test_pool_timer_removes_expired_faketime() { + test_pool_timer_removes_expired_inner().await + } + + async fn test_pool_timer_removes_expired_inner() { let pool = Pool::new( super::Config { idle_timeout: Some(Duration::from_millis(10)), diff --git a/src/common/timer.rs b/src/common/timer.rs index 390be3b0..95959fe8 100644 --- a/src/common/timer.rs +++ b/src/common/timer.rs @@ -35,4 +35,8 @@ impl hyper::rt::Timer for Timer { fn sleep_until(&self, deadline: Instant) -> Pin> { self.0.sleep_until(deadline) } + + fn now(&self) -> Instant { + self.0.now() + } } diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index a9469f0a..bd5fd50f 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -295,6 +295,10 @@ impl Timer for TokioTimer { sleep.reset(new_deadline) } } + + fn now(&self) -> Instant { + tokio::time::Instant::now().into() + } } impl TokioTimer {