Skip to content

Commit 4cdf8f9

Browse files
authored
feat(epoxy): add ability to get & set coordinator state (#3294)
* chore(guard): add metadata to rate limit error * chore(tracing-utils): add tracing utils for custom instrument * feat(epoxy): add ability to get & set coordinator state
1 parent 5513171 commit 4cdf8f9

File tree

3 files changed

+108
-2
lines changed

3 files changed

+108
-2
lines changed

engine/packages/api-peer/src/internal.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::Result;
1+
use anyhow::*;
22
use gas::prelude::*;
33
use rivet_api_builder::ApiCtx;
44
use serde::{Deserialize, Serialize};
@@ -91,6 +91,12 @@ pub struct ReplicaReconfigureRequest {}
9191
#[serde(deny_unknown_fields)]
9292
pub struct ReplicaReconfigureResponse {}
9393

94+
/// Triggers the epoxy coordinator to reconfigure all replicas.
95+
///
96+
/// Useful when a replica's configuration is outdated for any reason and needs to be re-notified of
97+
/// changes.
98+
///
99+
/// This should never need to be called manually if everything is operating correctly.
94100
pub async fn epoxy_replica_reconfigure(
95101
ctx: ApiCtx,
96102
_path: (),
@@ -107,3 +113,74 @@ pub async fn epoxy_replica_reconfigure(
107113

108114
Ok(ReplicaReconfigureResponse {})
109115
}
116+
117+
#[derive(Serialize)]
118+
#[serde(deny_unknown_fields)]
119+
pub struct GetEpoxyStateResponse {
120+
pub config: epoxy::types::ClusterConfig,
121+
}
122+
123+
/// Returns the current epoxy coordinator cluster configuration.
124+
///
125+
/// Useful for inspecting the current state of the epoxy cluster, including all replicas and their statuses.
126+
pub async fn get_epoxy_state(ctx: ApiCtx, _path: (), _query: ()) -> Result<GetEpoxyStateResponse> {
127+
let workflow_id = ctx
128+
.find_workflow::<epoxy::workflows::coordinator::Workflow>((
129+
"replica",
130+
ctx.config().epoxy_replica_id(),
131+
))
132+
.await?
133+
.ok_or_else(|| anyhow!("epoxy coordinator workflow not found"))?;
134+
135+
let wfs = ctx.get_workflows(vec![workflow_id]).await?;
136+
let wf = wfs.first().ok_or_else(|| anyhow!("workflow not found"))?;
137+
138+
let state: epoxy::workflows::coordinator::State =
139+
wf.parse_state().context("failed to parse workflow state")?;
140+
141+
Ok(GetEpoxyStateResponse {
142+
config: state.config,
143+
})
144+
}
145+
146+
#[derive(Deserialize)]
147+
#[serde(deny_unknown_fields)]
148+
pub struct SetEpoxyStateRequest {
149+
pub config: epoxy::types::ClusterConfig,
150+
}
151+
152+
#[derive(Serialize)]
153+
#[serde(deny_unknown_fields)]
154+
pub struct SetEpoxyStateResponse {}
155+
156+
/// Overrides the epoxy coordinator cluster configuration and triggers reconfiguration.
157+
///
158+
/// Useful for manually adjusting the cluster state in case the replica status drifts from the
159+
/// state in the coordinator. This will automatically trigger a reconfigure when called.
160+
///
161+
/// This should never need to be called manually if everything is operating correctly.
162+
pub async fn set_epoxy_state(
163+
ctx: ApiCtx,
164+
_path: (),
165+
_query: (),
166+
body: SetEpoxyStateRequest,
167+
) -> Result<SetEpoxyStateResponse> {
168+
ensure!(
169+
body.config.coordinator_replica_id == ctx.config().epoxy_replica_id(),
170+
"config coordinator_replica_id ({}) does not match current replica id ({})",
171+
body.config.coordinator_replica_id,
172+
ctx.config().epoxy_replica_id()
173+
);
174+
175+
if ctx.config().is_leader() {
176+
ctx.signal(epoxy::workflows::coordinator::OverrideState {
177+
config: body.config,
178+
})
179+
.to_workflow::<epoxy::workflows::coordinator::Workflow>()
180+
.tag("replica", ctx.config().epoxy_replica_id())
181+
.send()
182+
.await?;
183+
}
184+
185+
Ok(SetEpoxyStateResponse {})
186+
}

engine/packages/api-peer/src/router.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ pub async fn router(
3636
post(internal::bump_serverless_autoscaler),
3737
)
3838
.route(
39-
"/epoxy/replica-reconfigure",
39+
"/epoxy/coordinator/replica-reconfigure",
4040
post(internal::epoxy_replica_reconfigure),
4141
)
42+
.route("/epoxy/coordinator/state", get(internal::get_epoxy_state))
43+
.route("/epoxy/coordinator/state", post(internal::set_epoxy_state))
4244
.route("/debug/tracing/config", put(internal::set_tracing_config))
4345
})
4446
.await

engine/packages/epoxy/src/workflows/coordinator/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ pub async fn epoxy_coordinator(ctx: &mut WorkflowCtx, _input: &Input) -> Result<
4040
Main::ReplicaReconfigure(_) => {
4141
replica_status_change::replica_reconfigure(ctx).await?;
4242
}
43+
Main::OverrideState(sig) => {
44+
ctx.activity(OverrideStateActivityInput { config: sig.config })
45+
.await?;
46+
47+
reconfigure::reconfigure(ctx).await?;
48+
}
4349
}
4450

4551
Ok(Loop::<()>::Continue)
@@ -67,6 +73,21 @@ pub async fn check_config_changes(ctx: &ActivityCtx, _input: &InitInput) -> Resu
6773
Ok(())
6874
}
6975

76+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
77+
pub struct OverrideStateActivityInput {
78+
pub config: types::ClusterConfig,
79+
}
80+
81+
#[activity(OverrideStateActivity)]
82+
pub async fn override_state_activity(
83+
ctx: &ActivityCtx,
84+
input: &OverrideStateActivityInput,
85+
) -> Result<()> {
86+
let mut state = ctx.state::<State>()?;
87+
state.config = input.config.clone();
88+
Ok(())
89+
}
90+
7091
#[message("epoxy_coordinator_config_update")]
7192
pub struct ConfigChangeMessage {
7293
pub config: types::ClusterConfig,
@@ -87,8 +108,14 @@ pub struct ReplicaStatusChange {
87108
#[signal("epoxy_coordinator_replica_reconfigure")]
88109
pub struct ReplicaReconfigure {}
89110

111+
#[signal("epoxy_coordinator_override_state")]
112+
pub struct OverrideState {
113+
pub config: types::ClusterConfig,
114+
}
115+
90116
join_signal!(Main {
91117
Reconfigure,
92118
ReplicaStatusChange,
93119
ReplicaReconfigure,
120+
OverrideState,
94121
});

0 commit comments

Comments
 (0)