Skip to content

Commit 01168ff

Browse files
Refactor Endpoint internals (#71)
* Refactor Endpoint internals. It's ok to depend on `http` crate, and this reduces the number of additional abstractions needed. * Split handle in handle_with_options. For now keep handle_with_options private.
1 parent 24c829d commit 01168ff

File tree

4 files changed

+669
-628
lines changed

4 files changed

+669
-628
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ tracing-span-filter = ["dep:tracing-subscriber"]
2727
bytes = "1.10"
2828
futures = "0.3"
2929
http = "1.3"
30+
http-body = "1.0.1"
3031
http-body-util = { version = "0.1", optional = true }
3132
hyper = { version = "1.6", optional = true}
3233
hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful", "http2"], optional = true }
3334
pin-project-lite = "0.2"
3435
rand = { version = "0.9", optional = true }
35-
regress = "0.10"
36+
regress = "=0.10.3"
3637
restate-sdk-macros = { version = "0.6", path = "macros" }
3738
restate-sdk-shared-core = { version = "=0.4.0", features = ["request_identity", "sha2_random_seed", "http"] }
38-
schemars = { version = "1.0.0-alpha.17", optional = true }
39+
schemars = { version = "1.0.0", optional = true }
3940
serde = "1.0"
4041
serde_json = "1.0"
4142
thiserror = "2.0"

src/endpoint/builder.rs

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
use crate::endpoint::{BoxedService, Endpoint, EndpointInner, Error};
2+
use crate::service::{Discoverable, Service};
3+
use futures::future::BoxFuture;
4+
use restate_sdk_shared_core::{IdentityVerifier, KeyError};
5+
use std::collections::HashMap;
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
9+
/// Various configuration options that can be provided when binding a service
10+
#[derive(Default, Debug, Clone)]
11+
pub struct ServiceOptions {
12+
pub(crate) metadata: HashMap<String, String>,
13+
pub(crate) inactivity_timeout: Option<Duration>,
14+
pub(crate) abort_timeout: Option<Duration>,
15+
pub(crate) idempotency_retention: Option<Duration>,
16+
pub(crate) journal_retention: Option<Duration>,
17+
pub(crate) enable_lazy_state: Option<bool>,
18+
pub(crate) ingress_private: Option<bool>,
19+
pub(crate) handler_options: HashMap<String, HandlerOptions>,
20+
21+
_priv: (),
22+
}
23+
24+
impl ServiceOptions {
25+
pub fn new() -> Self {
26+
Self::default()
27+
}
28+
29+
/// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
30+
/// termination by asking the invocation to suspend (which preserves intermediate progress).
31+
///
32+
/// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
33+
/// suspend.
34+
///
35+
/// This overrides the default inactivity timeout configured in the restate-server for all
36+
/// invocations to this service.
37+
pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
38+
self.inactivity_timeout = Some(timeout);
39+
self
40+
}
41+
42+
/// This timer guards against stalled service/handler invocations that are supposed to terminate. The
43+
/// abort timeout is started after the inactivity_timeout has expired and the service/handler
44+
/// invocation has been asked to gracefully terminate. Once the timer expires, it will abort the
45+
/// service/handler invocation.
46+
///
47+
/// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
48+
/// terminate, then this value needs to be set accordingly.
49+
///
50+
/// This overrides the default abort timeout configured in the restate-server for all invocations to
51+
/// this service.
52+
pub fn abort_timeout(mut self, timeout: Duration) -> Self {
53+
self.abort_timeout = Some(timeout);
54+
self
55+
}
56+
57+
/// The retention duration of idempotent requests to this service.
58+
pub fn idempotency_retention(mut self, retention: Duration) -> Self {
59+
self.idempotency_retention = Some(retention);
60+
self
61+
}
62+
63+
/// The journal retention. When set, this applies to all requests to all handlers of this service.
64+
///
65+
/// In case the request has an idempotency key, the idempotency_retention caps the journal retention
66+
/// time.
67+
pub fn journal_retention(mut self, retention: Duration) -> Self {
68+
self.journal_retention = Some(retention);
69+
self
70+
}
71+
72+
/// When set to `true`, lazy state will be enabled for all invocations to this service. This is
73+
/// relevant only for workflows and virtual objects.
74+
pub fn enable_lazy_state(mut self, enable: bool) -> Self {
75+
self.enable_lazy_state = Some(enable);
76+
self
77+
}
78+
79+
/// When set to `true` this service, with all its handlers, cannot be invoked from the restate-server
80+
/// HTTP and Kafka ingress, but only from other services.
81+
pub fn ingress_private(mut self, private: bool) -> Self {
82+
self.ingress_private = Some(private);
83+
self
84+
}
85+
86+
/// Custom metadata of this service definition. This metadata is shown on the Admin API when querying the service definition.
87+
pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
88+
self.metadata.insert(key.into(), value.into());
89+
self
90+
}
91+
92+
/// Handler-specific options.
93+
///
94+
/// *Note*: If you provide a handler name for a non-existing handler, binding the service will *panic!*.
95+
pub fn handler(mut self, handler_name: impl Into<String>, options: HandlerOptions) -> Self {
96+
self.handler_options.insert(handler_name.into(), options);
97+
self
98+
}
99+
}
100+
101+
/// Various configuration options that can be provided when binding a service handler
102+
#[derive(Default, Debug, Clone)]
103+
pub struct HandlerOptions {
104+
pub(crate) metadata: HashMap<String, String>,
105+
pub(crate) inactivity_timeout: Option<Duration>,
106+
pub(crate) abort_timeout: Option<Duration>,
107+
pub(crate) idempotency_retention: Option<Duration>,
108+
pub(crate) workflow_retention: Option<Duration>,
109+
pub(crate) journal_retention: Option<Duration>,
110+
pub(crate) ingress_private: Option<bool>,
111+
pub(crate) enable_lazy_state: Option<bool>,
112+
113+
_priv: (),
114+
}
115+
116+
impl HandlerOptions {
117+
pub fn new() -> Self {
118+
Self::default()
119+
}
120+
121+
/// Custom metadata of this handler definition. This metadata is shown on the Admin API when querying the service/handler definition.
122+
pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
123+
self.metadata.insert(key.into(), value.into());
124+
self
125+
}
126+
127+
/// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
128+
/// termination by asking the invocation to suspend (which preserves intermediate progress).
129+
///
130+
/// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
131+
/// suspend.
132+
///
133+
/// This overrides the inactivity timeout set for the service and the default set in restate-server.
134+
pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
135+
self.inactivity_timeout = Some(timeout);
136+
self
137+
}
138+
139+
/// This timer guards against stalled invocations that are supposed to terminate. The abort timeout
140+
/// is started after the inactivity_timeout has expired and the invocation has been asked to
141+
/// gracefully terminate. Once the timer expires, it will abort the invocation.
142+
///
143+
/// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
144+
/// terminate, then this value needs to be set accordingly.
145+
///
146+
/// This overrides the abort timeout set for the service and the default set in restate-server.
147+
pub fn abort_timeout(mut self, timeout: Duration) -> Self {
148+
self.abort_timeout = Some(timeout);
149+
self
150+
}
151+
152+
/// The retention duration of idempotent requests to this service.
153+
pub fn idempotency_retention(mut self, retention: Duration) -> Self {
154+
self.idempotency_retention = Some(retention);
155+
self
156+
}
157+
158+
/// The retention duration for this workflow handler.
159+
pub fn workflow_retention(mut self, retention: Duration) -> Self {
160+
self.workflow_retention = Some(retention);
161+
self
162+
}
163+
164+
/// The journal retention for invocations to this handler.
165+
///
166+
/// In case the request has an idempotency key, the idempotency_retention caps the journal retention
167+
/// time.
168+
pub fn journal_retention(mut self, retention: Duration) -> Self {
169+
self.journal_retention = Some(retention);
170+
self
171+
}
172+
173+
/// When set to `true` this handler cannot be invoked from the restate-server HTTP and Kafka ingress,
174+
/// but only from other services.
175+
pub fn ingress_private(mut self, private: bool) -> Self {
176+
self.ingress_private = Some(private);
177+
self
178+
}
179+
180+
/// When set to `true`, lazy state will be enabled for all invocations to this handler. This is
181+
/// relevant only for workflows and virtual objects.
182+
pub fn enable_lazy_state(mut self, enable: bool) -> Self {
183+
self.enable_lazy_state = Some(enable);
184+
self
185+
}
186+
}
187+
188+
/// Builder for [`Endpoint`]
189+
#[derive(Default)]
190+
pub struct Builder {
191+
svcs: HashMap<String, BoxedService>,
192+
discovery_services: Vec<crate::discovery::Service>,
193+
identity_verifier: IdentityVerifier,
194+
}
195+
196+
impl Builder {
197+
/// Create a new builder for [`Endpoint`].
198+
pub fn new() -> Self {
199+
Self::default()
200+
}
201+
202+
/// Add a [`Service`] to this endpoint.
203+
///
204+
/// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros,
205+
/// you need to pass the result of the `serve` method.
206+
pub fn bind<
207+
S: Service<Future = BoxFuture<'static, Result<(), Error>>>
208+
+ Discoverable
209+
+ Send
210+
+ Sync
211+
+ 'static,
212+
>(
213+
self,
214+
s: S,
215+
) -> Self {
216+
self.bind_with_options(s, ServiceOptions::default())
217+
}
218+
219+
/// Like [`bind`], but providing options
220+
pub fn bind_with_options<
221+
S: Service<Future = BoxFuture<'static, Result<(), Error>>>
222+
+ Discoverable
223+
+ Send
224+
+ Sync
225+
+ 'static,
226+
>(
227+
mut self,
228+
s: S,
229+
service_options: ServiceOptions,
230+
) -> Self {
231+
// Discover and apply options
232+
let mut service_metadata = S::discover();
233+
service_metadata.apply_options(service_options);
234+
235+
let boxed_service = BoxedService::new(s);
236+
self.svcs
237+
.insert(service_metadata.name.to_string(), boxed_service);
238+
self.discovery_services.push(service_metadata);
239+
self
240+
}
241+
242+
/// Add identity key, e.g. `publickeyv1_ChjENKeMvCtRnqG2mrBK1HmPKufgFUc98K8B3ononQvp`.
243+
pub fn identity_key(mut self, key: &str) -> Result<Self, KeyError> {
244+
self.identity_verifier = self.identity_verifier.with_key(key)?;
245+
Ok(self)
246+
}
247+
248+
/// Build the [`Endpoint`].
249+
pub fn build(self) -> Endpoint {
250+
Endpoint(Arc::new(EndpointInner {
251+
svcs: self.svcs,
252+
discovery_services: self.discovery_services,
253+
identity_verifier: self.identity_verifier,
254+
}))
255+
}
256+
}

0 commit comments

Comments
 (0)