Skip to content

Commit b992545

Browse files
georgeh0Copilot
andauthored
fix: use None for unlimited retry deadline instead of a large duration (#1155)
* fix: use `None` for unlimited retry deadline instead of a large duration * Update src/utils/retryable.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 18f5cc0 commit b992545

File tree

2 files changed

+20
-12
lines changed

2 files changed

+20
-12
lines changed

src/execution/live_updater.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ impl SourceUpdateTask {
144144
async move {
145145
let mut change_stream = change_stream;
146146
let retry_options = retryable::RetryOptions {
147-
retry_timeout: std::time::Duration::from_secs(365 * 24 * 60 * 60),
147+
retry_timeout: None,
148148
initial_backoff: std::time::Duration::from_secs(5),
149149
max_backoff: std::time::Duration::from_secs(60),
150150
};

src/utils/retryable.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,23 @@ pub fn Ok<T>(value: T) -> Result<T> {
8080
}
8181

8282
pub struct RetryOptions {
83-
pub retry_timeout: Duration,
83+
pub retry_timeout: Option<Duration>,
8484
pub initial_backoff: Duration,
8585
pub max_backoff: Duration,
8686
}
8787

8888
impl Default for RetryOptions {
8989
fn default() -> Self {
9090
Self {
91-
retry_timeout: DEFAULT_RETRY_TIMEOUT,
91+
retry_timeout: Some(DEFAULT_RETRY_TIMEOUT),
9292
initial_backoff: Duration::from_millis(100),
9393
max_backoff: Duration::from_secs(10),
9494
}
9595
}
9696
}
9797

9898
pub static HEAVY_LOADED_OPTIONS: RetryOptions = RetryOptions {
99-
retry_timeout: DEFAULT_RETRY_TIMEOUT,
99+
retry_timeout: Some(DEFAULT_RETRY_TIMEOUT),
100100
initial_backoff: Duration::from_secs(1),
101101
max_backoff: Duration::from_secs(60),
102102
};
@@ -110,8 +110,9 @@ pub async fn run<
110110
f: F,
111111
options: &RetryOptions,
112112
) -> Result<Ok, Err> {
113-
let start_time = Instant::now();
114-
let deadline = start_time + options.retry_timeout;
113+
let deadline = options
114+
.retry_timeout
115+
.map(|timeout| Instant::now() + timeout);
115116
let mut backoff = options.initial_backoff;
116117

117118
loop {
@@ -121,13 +122,20 @@ pub async fn run<
121122
if !err.is_retryable() {
122123
return Result::Err(err);
123124
}
124-
let now = Instant::now();
125-
if now >= deadline {
126-
return Result::Err(err);
125+
let mut sleep_duration = backoff;
126+
if let Some(deadline) = deadline {
127+
let now = Instant::now();
128+
if now >= deadline {
129+
return Result::Err(err);
130+
}
131+
let remaining_time = deadline.saturating_duration_since(now);
132+
sleep_duration = std::cmp::min(sleep_duration, remaining_time);
127133
}
128-
trace!("Will retry in {}ms for error: {}", backoff.as_millis(), err);
129-
let remaining_time = deadline.saturating_duration_since(now);
130-
let sleep_duration = std::cmp::min(backoff, remaining_time);
134+
trace!(
135+
"Will retry in {}ms for error: {}",
136+
sleep_duration.as_millis(),
137+
err
138+
);
131139
tokio::time::sleep(sleep_duration).await;
132140
if backoff < options.max_backoff {
133141
backoff = std::cmp::min(

0 commit comments

Comments
 (0)