Skip to content

Commit dfb2342

Browse files
author
Ariel Ben-Yehuda
committed
feat: allow overriding the instant returned from Timer
See #3950
1 parent f9f8f44 commit dfb2342

File tree

5 files changed

+37
-12
lines changed

5 files changed

+37
-12
lines changed

benches/support/tokiort.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ impl Timer for TokioTimer {
4242
})
4343
}
4444

45+
fn now(&self) -> Instant {
46+
tokio::time::Instant::now().into()
47+
}
48+
4549
fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
4650
if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
4751
sleep.reset(new_deadline)

src/common/time.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ impl Time {
5050
}
5151
}
5252

53+
pub(crate) fn now(&self) -> Instant {
54+
match *self {
55+
Time::Empty => Instant::now(),
56+
Time::Timer(ref t) => t.now(),
57+
}
58+
}
59+
5360
pub(crate) fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
5461
match *self {
5562
Time::Empty => {

src/proto/h1/conn.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::marker::{PhantomData, Unpin};
66
use std::pin::Pin;
77
use std::task::{Context, Poll};
88
#[cfg(feature = "server")]
9-
use std::time::{Duration, Instant};
9+
use std::time::Duration;
1010

1111
use crate::rt::{Read, Write};
1212
use bytes::{Buf, Bytes};
@@ -218,7 +218,7 @@ where
218218
#[cfg(feature = "server")]
219219
if !self.state.h1_header_read_timeout_running {
220220
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
221-
let deadline = Instant::now() + h1_header_read_timeout;
221+
let deadline = self.state.timer.now() + h1_header_read_timeout;
222222
self.state.h1_header_read_timeout_running = true;
223223
match self.state.h1_header_read_timeout_fut {
224224
Some(ref mut h1_header_read_timeout_fut) => {

src/proto/h2/ping.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub(super) fn disabled() -> Recorder {
3737
Recorder { shared: None }
3838
}
3939

40-
pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) {
40+
pub(super) fn channel(ping_pong: PingPong, config: Config, timer: Time) -> (Recorder, Ponger) {
4141
debug_assert!(
4242
config.is_enabled(),
4343
"ping channel requires bdp or keep-alive config",
@@ -51,8 +51,10 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
5151
stable_count: 0,
5252
});
5353

54+
let now = timer.now();
55+
5456
let (bytes, next_bdp_at) = if bdp.is_some() {
55-
(Some(0), Some(Instant::now()))
57+
(Some(0), Some(now))
5658
} else {
5759
(None, None)
5860
};
@@ -61,12 +63,12 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
6163
interval,
6264
timeout: config.keep_alive_timeout,
6365
while_idle: config.keep_alive_while_idle,
64-
sleep: __timer.sleep(interval),
66+
sleep: timer.sleep(interval),
6567
state: KeepAliveState::Init,
66-
timer: __timer,
68+
timer: timer.clone(),
6769
});
6870

69-
let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
71+
let last_read_at = keep_alive.as_ref().map(|_| now);
7072

7173
let shared = Arc::new(Mutex::new(Shared {
7274
bytes,
@@ -75,6 +77,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Re
7577
ping_pong,
7678
ping_sent_at: None,
7779
next_bdp_at,
80+
timer,
7881
}));
7982

8083
(
@@ -130,6 +133,7 @@ struct Shared {
130133
last_read_at: Option<Instant>,
131134

132135
is_keep_alive_timed_out: bool,
136+
timer: Time,
133137
}
134138

135139
struct Bdp {
@@ -200,7 +204,7 @@ impl Recorder {
200204
// if not, we don't need to record bytes either
201205

202206
if let Some(ref next_bdp_at) = locked.next_bdp_at {
203-
if Instant::now() < *next_bdp_at {
207+
if locked.timer.now() < *next_bdp_at {
204208
return;
205209
} else {
206210
locked.next_bdp_at = None;
@@ -259,8 +263,8 @@ impl Recorder {
259263

260264
impl Ponger {
261265
pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
262-
let now = Instant::now();
263266
let mut locked = self.shared.lock().unwrap();
267+
let now = locked.timer.now(); // hoping this is fine to move within the lock
264268
let is_idle = self.is_idle();
265269

266270
if let Some(ref mut ka) = self.keep_alive {
@@ -329,7 +333,7 @@ impl Shared {
329333
fn send_ping(&mut self) {
330334
match self.ping_pong.send_ping(Ping::opaque()) {
331335
Ok(()) => {
332-
self.ping_sent_at = Some(Instant::now());
336+
self.ping_sent_at = Some(self.timer.now());
333337
trace!("sent ping");
334338
}
335339
Err(_err) => {
@@ -344,7 +348,7 @@ impl Shared {
344348

345349
fn update_last_read_at(&mut self) {
346350
if self.last_read_at.is_some() {
347-
self.last_read_at = Some(Instant::now());
351+
self.last_read_at = Some(self.timer.now());
348352
}
349353
}
350354

@@ -468,7 +472,7 @@ impl KeepAlive {
468472
trace!("keep-alive interval ({:?}) reached", self.interval);
469473
shared.send_ping();
470474
self.state = KeepAliveState::PingSent;
471-
let timeout = Instant::now() + self.timeout;
475+
let timeout = self.timer.now() + self.timeout;
472476
self.timer.reset(&mut self.sleep, timeout);
473477
}
474478
KeepAliveState::Init | KeepAliveState::PingSent => (),

src/rt/timer.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,16 @@ pub trait Timer {
7474
/// Return a future that resolves at `deadline`.
7575
fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>>;
7676

77+
/// Return an `Instant` representing the current time
78+
///
79+
/// This function is currently unstable to implement - its
80+
/// definition might change in a future version of Hyper
81+
///
82+
/// FIXME: do we want to mention this function in the module doc comment?
83+
fn now(&self) -> Instant {
84+
Instant::now()
85+
}
86+
7787
/// Reset a future to resolve at `new_deadline` instead.
7888
fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
7989
*sleep = self.sleep_until(new_deadline);

0 commit comments

Comments
 (0)