Skip to content

Commit 5e924a6

Browse files
Add new retry policy options (#73)
1 parent 22aa0d9 commit 5e924a6

File tree

5 files changed

+225
-1
lines changed

5 files changed

+225
-1
lines changed

endpoint_manifest_schema.json

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
"maximum": 2147483647,
2525
"description": "Maximum supported protocol version"
2626
},
27+
"lambdaCompression": {
28+
"type": "string",
29+
"enum": ["zstd"],
30+
"description": "Compression used when the endpoint is a Lambda. This is unsupported if the endpoint is a regular HTTP endpoint."
31+
},
2732
"services": {
2833
"type": "array",
2934
"items": {
@@ -175,6 +180,33 @@
175180
"type": "boolean",
176181
"description": "If true, the service cannot be invoked from the HTTP nor Kafka ingress."
177182
},
183+
"retryPolicyInitialInterval": {
184+
"type": "integer",
185+
"minimum": 0,
186+
"description": "Retry policy initial interval, expressed in milliseconds."
187+
},
188+
"retryPolicyMaxInterval": {
189+
"type": "integer",
190+
"minimum": 0,
191+
"description": "Retry policy max interval, expressed in milliseconds."
192+
},
193+
"retryPolicyMaxAttempts": {
194+
"type": "integer",
195+
"minimum": 0,
196+
"description": "Retry policy max attempts."
197+
},
198+
"retryPolicyExponentiationFactor": {
199+
"type": "number",
200+
"description": "Retry policy exponentiation factor."
201+
},
202+
"retryPolicyOnMaxAttempts": {
203+
"title": "RetryPolicyOnMaxAttempts",
204+
"enum": [
205+
"PAUSE",
206+
"KILL"
207+
],
208+
"description": "Retry policy behavior on max attempts."
209+
},
178210
"metadata": {
179211
"type": "object",
180212
"description": "Custom metadata of this handler definition. This metadata is shown on the Admin API when querying the service/handler definition.",
@@ -217,6 +249,33 @@
217249
"type": "boolean",
218250
"description": "If true, the service cannot be invoked from the HTTP nor Kafka ingress."
219251
},
252+
"retryPolicyInitialInterval": {
253+
"type": "integer",
254+
"minimum": 0,
255+
"description": "Retry policy initial interval, expressed in milliseconds."
256+
},
257+
"retryPolicyMaxInterval": {
258+
"type": "integer",
259+
"minimum": 0,
260+
"description": "Retry policy max interval, expressed in milliseconds."
261+
},
262+
"retryPolicyMaxAttempts": {
263+
"type": "integer",
264+
"minimum": 0,
265+
"description": "Retry policy max attempts."
266+
},
267+
"retryPolicyExponentiationFactor": {
268+
"type": "number",
269+
"description": "Retry policy exponentiation factor."
270+
},
271+
"retryPolicyOnMaxAttempts": {
272+
"title": "RetryPolicyOnMaxAttempts",
273+
"enum": [
274+
"PAUSE",
275+
"KILL"
276+
],
277+
"description": "Retry policy behavior on max attempts."
278+
},
220279
"metadata": {
221280
"type": "object",
222281
"description": "Custom metadata of this service definition. This metadata is shown on the Admin API when querying the service definition.",

macros/src/gen.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ impl<'a> ServiceGenerator<'a> {
230230
workflow_completion_retention: None,
231231
enable_lazy_state: None,
232232
ingress_private: None,
233+
retry_policy_initial_interval: None,
234+
retry_policy_max_interval: None,
235+
retry_policy_max_attempts: None,
236+
retry_policy_exponentiation_factor: None,
237+
retry_policy_on_max_attempts: None,
233238
}
234239
}
235240
});
@@ -252,6 +257,11 @@ impl<'a> ServiceGenerator<'a> {
252257
idempotency_retention: None,
253258
enable_lazy_state: None,
254259
ingress_private: None,
260+
retry_policy_initial_interval: None,
261+
retry_policy_max_interval: None,
262+
retry_policy_max_attempts: None,
263+
retry_policy_exponentiation_factor: None,
264+
retry_policy_on_max_attempts: None,
255265
}
256266
}
257267
}

src/discovery.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ impl Service {
6060
self.journal_retention = options.journal_retention.map(|d| d.as_millis() as u64);
6161
self.enable_lazy_state = options.enable_lazy_state;
6262
self.ingress_private = options.ingress_private;
63+
// Retry policy
64+
self.retry_policy_initial_interval = options
65+
.retry_policy_initial_interval
66+
.map(|d| d.as_millis() as u64);
67+
self.retry_policy_exponentiation_factor = options.retry_policy_exponentiation_factor;
68+
self.retry_policy_max_interval = options
69+
.retry_policy_max_interval
70+
.map(|d| d.as_millis() as u64);
71+
self.retry_policy_max_attempts = options.retry_policy_max_attempts;
72+
self.retry_policy_on_max_attempts = options.retry_policy_on_max_attempts;
6373

6474
// Apply handler specific options
6575
for (handler_name, handler_options) in options.handler_options {
@@ -86,5 +96,15 @@ impl Handler {
8696
options.workflow_retention.map(|d| d.as_millis() as u64);
8797
self.enable_lazy_state = options.enable_lazy_state;
8898
self.ingress_private = options.ingress_private;
99+
// Retry policy
100+
self.retry_policy_initial_interval = options
101+
.retry_policy_initial_interval
102+
.map(|d| d.as_millis() as u64);
103+
self.retry_policy_exponentiation_factor = options.retry_policy_exponentiation_factor;
104+
self.retry_policy_max_interval = options
105+
.retry_policy_max_interval
106+
.map(|d| d.as_millis() as u64);
107+
self.retry_policy_max_attempts = options.retry_policy_max_attempts;
108+
self.retry_policy_on_max_attempts = options.retry_policy_on_max_attempts;
89109
}
90110
}

src/endpoint/builder.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ pub struct ServiceOptions {
1616
pub(crate) journal_retention: Option<Duration>,
1717
pub(crate) enable_lazy_state: Option<bool>,
1818
pub(crate) ingress_private: Option<bool>,
19+
// Retry policy options
20+
pub(crate) retry_policy_initial_interval: Option<Duration>,
21+
pub(crate) retry_policy_exponentiation_factor: Option<f64>,
22+
pub(crate) retry_policy_max_interval: Option<Duration>,
23+
pub(crate) retry_policy_max_attempts: Option<u64>,
24+
pub(crate) retry_policy_on_max_attempts: Option<crate::discovery::RetryPolicyOnMaxAttempts>,
1925
pub(crate) handler_options: HashMap<String, HandlerOptions>,
2026

2127
_priv: (),
@@ -89,6 +95,53 @@ impl ServiceOptions {
8995
self
9096
}
9197

98+
/// Initial delay before the first retry attempt.
99+
///
100+
/// If unset, the server default is used.
101+
pub fn retry_policy_initial_interval(mut self, interval: Duration) -> Self {
102+
self.retry_policy_initial_interval = Some(interval);
103+
self
104+
}
105+
106+
/// Exponential backoff multiplier used to compute the next retry delay.
107+
///
108+
/// For attempt n, the next delay is roughly previousDelay * exponentiationFactor,
109+
/// capped by retry_policy_max_interval if set.
110+
pub fn retry_policy_exponentiation_factor(mut self, factor: f64) -> Self {
111+
self.retry_policy_exponentiation_factor = Some(factor);
112+
self
113+
}
114+
115+
/// Upper bound for the computed retry delay.
116+
pub fn retry_policy_max_interval(mut self, interval: Duration) -> Self {
117+
self.retry_policy_max_interval = Some(interval);
118+
self
119+
}
120+
121+
/// Maximum number of attempts before giving up retrying.
122+
///
123+
/// The initial call counts as the first attempt; retries increment the count by 1.
124+
pub fn retry_policy_max_attempts(mut self, attempts: u64) -> Self {
125+
self.retry_policy_max_attempts = Some(attempts);
126+
self
127+
}
128+
129+
/// Behavior when the configured retry_policy_max_attempts is reached: pause the invocation.
130+
///
131+
/// The invocation enters the paused state and can be manually resumed from the CLI or UI.
132+
pub fn retry_policy_pause_on_max_attempts(mut self) -> Self {
133+
self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Pause);
134+
self
135+
}
136+
137+
/// Behavior when the configured retry_policy_max_attempts is reached: kill the invocation.
138+
///
139+
/// The invocation will be marked as failed and will not be retried unless explicitly re-triggered.
140+
pub fn retry_policy_kill_on_max_attempts(mut self) -> Self {
141+
self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Kill);
142+
self
143+
}
144+
92145
/// Handler-specific options.
93146
///
94147
/// *Note*: If you provide a handler name for a non-existing handler, binding the service will *panic!*.
@@ -109,6 +162,12 @@ pub struct HandlerOptions {
109162
pub(crate) journal_retention: Option<Duration>,
110163
pub(crate) ingress_private: Option<bool>,
111164
pub(crate) enable_lazy_state: Option<bool>,
165+
// Retry policy options
166+
pub(crate) retry_policy_initial_interval: Option<Duration>,
167+
pub(crate) retry_policy_exponentiation_factor: Option<f64>,
168+
pub(crate) retry_policy_max_interval: Option<Duration>,
169+
pub(crate) retry_policy_max_attempts: Option<u64>,
170+
pub(crate) retry_policy_on_max_attempts: Option<crate::discovery::RetryPolicyOnMaxAttempts>,
112171

113172
_priv: (),
114173
}
@@ -183,6 +242,53 @@ impl HandlerOptions {
183242
self.enable_lazy_state = Some(enable);
184243
self
185244
}
245+
246+
/// Initial delay before the first retry attempt.
247+
///
248+
/// If unset, the server default is used.
249+
pub fn retry_policy_initial_interval(mut self, interval: Duration) -> Self {
250+
self.retry_policy_initial_interval = Some(interval);
251+
self
252+
}
253+
254+
/// Exponential backoff multiplier used to compute the next retry delay.
255+
///
256+
/// For attempt n, the next delay is roughly previousDelay * exponentiationFactor,
257+
/// capped by retry_policy_max_interval if set.
258+
pub fn retry_policy_exponentiation_factor(mut self, factor: f64) -> Self {
259+
self.retry_policy_exponentiation_factor = Some(factor);
260+
self
261+
}
262+
263+
/// Upper bound for the computed retry delay.
264+
pub fn retry_policy_max_interval(mut self, interval: Duration) -> Self {
265+
self.retry_policy_max_interval = Some(interval);
266+
self
267+
}
268+
269+
/// Maximum number of attempts before giving up retrying.
270+
///
271+
/// The initial call counts as the first attempt; retries increment the count by 1.
272+
pub fn retry_policy_max_attempts(mut self, attempts: u64) -> Self {
273+
self.retry_policy_max_attempts = Some(attempts);
274+
self
275+
}
276+
277+
/// Behavior when the configured retry_policy_max_attempts is reached: pause the invocation.
278+
///
279+
/// The invocation enters the paused state and can be manually resumed from the CLI or UI.
280+
pub fn retry_policy_pause_on_max_attempts(mut self) -> Self {
281+
self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Pause);
282+
self
283+
}
284+
285+
/// Behavior when the configured retry_policy_max_attempts is reached: kill the invocation.
286+
///
287+
/// The invocation will be marked as failed and will not be retried unless explicitly re-triggered.
288+
pub fn retry_policy_kill_on_max_attempts(mut self) -> Self {
289+
self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Kill);
290+
self
291+
}
186292
}
187293

188294
/// Builder for [`Endpoint`]

src/endpoint/mod.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const X_RESTATE_SERVER_VALUE: HeaderValue =
3737
HeaderValue::from_static(concat!("restate-sdk-rust/", env!("CARGO_PKG_VERSION")));
3838
const DISCOVERY_CONTENT_TYPE_V2: &str = "application/vnd.restate.endpointmanifest.v2+json";
3939
const DISCOVERY_CONTENT_TYPE_V3: &str = "application/vnd.restate.endpointmanifest.v3+json";
40+
const DISCOVERY_CONTENT_TYPE_V4: &str = "application/vnd.restate.endpointmanifest.v4+json";
4041

4142
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
4243

@@ -301,7 +302,10 @@ impl Endpoint {
301302
let mut version = 2;
302303
let mut content_type = DISCOVERY_CONTENT_TYPE_V2;
303304
if let Some(accept) = accept_header {
304-
if accept.contains(DISCOVERY_CONTENT_TYPE_V3) {
305+
if accept.contains(DISCOVERY_CONTENT_TYPE_V4) {
306+
version = 4;
307+
content_type = DISCOVERY_CONTENT_TYPE_V4;
308+
} else if accept.contains(DISCOVERY_CONTENT_TYPE_V3) {
305309
version = 3;
306310
content_type = DISCOVERY_CONTENT_TYPE_V3;
307311
} else if accept.contains(DISCOVERY_CONTENT_TYPE_V2) {
@@ -324,6 +328,7 @@ impl Endpoint {
324328
}],
325329
Bytes::from(
326330
serde_json::to_string(&crate::discovery::Endpoint {
331+
lambda_compression: None,
327332
max_protocol_version: 5,
328333
min_protocol_version: 5,
329334
protocol_mode: Some(match protocol_mode {
@@ -341,6 +346,30 @@ impl Endpoint {
341346

342347
fn validate_discovery_request(&self, version: usize) -> Result<(), ErrorInner> {
343348
// Validate that new discovery fields aren't used with older protocol versions
349+
if version <= 3 {
350+
// Check for new discovery fields in version 3 that shouldn't be used in version 2 or lower
351+
for service in &self.0.discovery_services {
352+
if service.retry_policy_initial_interval.is_some()
353+
|| service.retry_policy_exponentiation_factor.is_some()
354+
|| service.retry_policy_max_interval.is_some()
355+
|| service.retry_policy_max_attempts.is_some()
356+
|| service.retry_policy_on_max_attempts.is_some()
357+
{
358+
Err(ErrorInner::FieldRequiresMinimumVersion("retry_policy", 4))?;
359+
}
360+
361+
for handler in &service.handlers {
362+
if handler.retry_policy_initial_interval.is_some()
363+
|| handler.retry_policy_exponentiation_factor.is_some()
364+
|| handler.retry_policy_max_interval.is_some()
365+
|| handler.retry_policy_max_attempts.is_some()
366+
|| handler.retry_policy_on_max_attempts.is_some()
367+
{
368+
Err(ErrorInner::FieldRequiresMinimumVersion("retry_policy", 4))?;
369+
}
370+
}
371+
}
372+
}
344373
if version <= 2 {
345374
// Check for new discovery fields in version 3 that shouldn't be used in version 2 or lower
346375
for service in &self.0.discovery_services {

0 commit comments

Comments
 (0)