Skip to content

Commit fcdf726

Browse files
shayne-fletchermeta-codesync[bot]
authored andcommitted
systemd prototyping (#1829)
Summary: Pull Request resolved: #1829 add a minimal systemd D-Bus interface for managing transient units (ephemeral services created programmatically) as an alternative to `tokio::process`. the module provides `SystemdManager` for creating and controlling units, and `SystemdUnit` for querying state. the implementation uses zbus for async D-Bus communication and supports monitoring unit state transitions via property change signals. Reviewed By: mariusae Differential Revision: D86797969 fbshipit-source-id: effd99f02ad9b59082c5211321f8323f75cb2e37
1 parent 52b90eb commit fcdf726

File tree

3 files changed

+383
-0
lines changed

3 files changed

+383
-0
lines changed

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal
8181
tokio-util = { version = "0.7.15", features = ["full"] }
8282
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
8383
tracing-subscriber = { version = "0.3.20", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
84+
zbus = { version = "5.11.0", features = ["async-executor", "async-fs", "async-io", "async-lock", "async-process", "async-task", "p2p", "tokio"], default-features = false }
8485

8586
[dev-dependencies]
8687
bytes = { version = "1.10", features = ["serde"] }

hyperactor_mesh/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ pub mod resource;
3232
mod router;
3333
pub mod shared_cell;
3434
pub mod shortuuid;
35+
#[cfg(target_os = "linux")]
36+
mod systemd;
3537
pub mod test_utils;
3638
mod testresource;
3739
pub mod v1;

hyperactor_mesh/src/systemd.rs

Lines changed: 380 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! D-Bus interface to systemd for managing transient units.
10+
//!
11+
//! This module provides minimal proxies to systemd's D-Bus API,
12+
//! allowing us to create, monitor, and tear down **transient units**
13+
//! (ephemeral services created programmatically) instead of forking
14+
//! processes directly with `tokio::process`.
15+
//!
16+
//! # Key components
17+
//!
18+
//! - [`SystemdManager`]: Create and manage units
19+
//! (`start_transient_unit`, `stop_unit`, `reset_failed_unit`)
20+
//! - [`SystemdUnit`]: Query unit state (`active_state`, `sub_state`,
21+
//! `load_state`)
22+
//!
23+
//! # Example
24+
//!
25+
//! ```ignore
26+
//! let conn = Connection::session().await?;
27+
//! let systemd = SystemdManagerProxy::new(&conn).await?;
28+
//!
29+
//! // Create a transient service
30+
//! let exec_start = vec![(
31+
//! "/bin/sleep".to_string(),
32+
//! vec!["/bin/sleep".to_string(), "10".to_string()],
33+
//! false,
34+
//! )];
35+
//! let props = vec![
36+
//! ("Description", Value::from("my service")),
37+
//! ("ExecStart", Value::from(exec_start)),
38+
//! ];
39+
//! systemd.start_transient_unit("my-service.service", "replace", props, vec![]).await?;
40+
//!
41+
//! // Query its state
42+
//! let unit_path = systemd.get_unit("my-service.service").await?;
43+
//! let unit = SystemdUnitProxy::builder(&conn).path(unit_path)?.build().await?;
44+
//! assert_eq!(unit.active_state().await?, "active");
45+
46+
use zbus::Result;
47+
use zbus::proxy;
48+
use zbus::zvariant::OwnedObjectPath;
49+
use zbus::zvariant::Value;
50+
51+
/// Minimal proxy to `org.freedesktop.systemd1.Manager`.
52+
///
53+
/// We use this to talk to systemd over D-Bus (either the user bus or
54+
/// the system bus) so we can create, query, and tear down **transient
55+
/// units** instead of forking processes ourselves.
56+
#[proxy(
57+
interface = "org.freedesktop.systemd1.Manager",
58+
default_service = "org.freedesktop.systemd1",
59+
default_path = "/org/freedesktop/systemd1"
60+
)]
61+
trait SystemdManager {
62+
/// Create and start a transient unit, e.g. `foo.service`.
63+
///
64+
/// `name` is the unit name (`"foo.service"`),
65+
/// `mode` is usually `"replace"`,
66+
/// `properties` is the systemd property list (Description=…,
67+
/// ExecStart=…, Slice=…, etc),
68+
/// `aux` is for auxiliary drop-ins (we usually pass `vec![]`).
69+
fn start_transient_unit(
70+
&self,
71+
name: &str,
72+
mode: &str,
73+
properties: Vec<(&str, Value<'_>)>,
74+
aux: Vec<(&str, Vec<(&str, Value<'_>)>)>,
75+
) -> Result<OwnedObjectPath>;
76+
77+
/// Stop an existing unit by name , e.g. `"foo.service"`
78+
///
79+
/// `mode` is typically `"replace"` or `"fail"`.
80+
fn stop_unit(&self, name: &str, mode: &str) -> Result<OwnedObjectPath>;
81+
82+
/// Clear the "failed" state for a single unit so it can be
83+
/// started again without systemd complaining.
84+
fn reset_failed_unit(&self, name: &str) -> Result<()>;
85+
86+
/// Clear the "failed" state for *all* units owned by this
87+
/// manager.
88+
fn reset_failed(&self) -> Result<()>;
89+
90+
/// Return the D-Bus object path for a unit so we can inspect it
91+
/// further (active state, result, etc.).
92+
fn get_unit(&self, name: &str) -> Result<OwnedObjectPath>;
93+
}
94+
95+
/// Minimal view of a single systemd unit, used to query its state
96+
/// over D-Bus.
97+
#[proxy(
98+
interface = "org.freedesktop.systemd1.Unit",
99+
default_service = "org.freedesktop.systemd1"
100+
)]
101+
trait SystemdUnit {
102+
/// High-level unit state, e.g. "active", "inactive", "failed",
103+
/// "activating".
104+
#[zbus(property)]
105+
fn active_state(&self) -> Result<String>;
106+
107+
/// More specific state for the unit type, e.g. "running",
108+
/// "exited".
109+
#[zbus(property)]
110+
fn sub_state(&self) -> Result<String>;
111+
112+
/// Whether systemd has the unit loaded, e.g. "loaded",
113+
/// "not-found", "error".
114+
#[zbus(property)]
115+
fn load_state(&self) -> Result<String>;
116+
}
117+
118+
#[cfg(test)]
119+
mod tests {
120+
use std::sync::Arc;
121+
use std::sync::Mutex;
122+
123+
use futures::StreamExt;
124+
use hyperactor::clock::Clock;
125+
use hyperactor::clock::RealClock;
126+
use zbus::Connection;
127+
128+
use super::*;
129+
130+
/// Test creating and stopping a transient systemd unit.
131+
///
132+
/// Creates a simple `sleep` service, verifies it's running, stops
133+
/// it, and confirms the transient unit is cleaned up afterward.
134+
#[tokio::test]
135+
async fn test_start_transient_unit() -> Result<()> {
136+
// Skip if no session bus available (GitHub CI runners).
137+
let conn = match Connection::session().await {
138+
Ok(conn) => conn,
139+
Err(_) => {
140+
eprintln!("Skipping test: D-Bus session bus not available");
141+
return Ok(());
142+
}
143+
};
144+
145+
let unit_name = "test-sleep.service";
146+
let exec_start = vec![(
147+
"/bin/sleep".to_string(),
148+
vec!["/bin/sleep".to_string(), "30".to_string()],
149+
false,
150+
)];
151+
let props = vec![
152+
("Description", Value::from("transient sleep 30")),
153+
("ExecStart", Value::from(exec_start)),
154+
("CollectMode", Value::from("inactive-or-failed")),
155+
];
156+
let aux = Vec::new();
157+
158+
let systemd = SystemdManagerProxy::new(&conn).await?;
159+
160+
// Start the unit.
161+
let start_path = systemd
162+
.start_transient_unit(unit_name, "replace", props, aux)
163+
.await?;
164+
assert!(
165+
start_path
166+
.to_string()
167+
.contains("/org/freedesktop/systemd1/job"),
168+
"unexpected object path: {start_path}"
169+
);
170+
171+
// Get unit proxy for monitoring.
172+
let unit = SystemdUnitProxy::builder(&conn)
173+
.path(systemd.get_unit(unit_name).await?)?
174+
.build()
175+
.await?;
176+
177+
// Verify initial state.
178+
let active_state = unit.active_state().await?;
179+
let sub_state = unit.sub_state().await?;
180+
assert_eq!(active_state, "active");
181+
assert_eq!(sub_state, "running");
182+
183+
// Stop the unit.
184+
let stop_path = systemd.stop_unit(unit_name, "replace").await?;
185+
assert!(
186+
stop_path
187+
.to_string()
188+
.contains("/org/freedesktop/systemd1/job"),
189+
"unexpected object path: {stop_path}"
190+
);
191+
192+
// Poll for unit cleanup.
193+
for attempt in 1..=5 {
194+
RealClock.sleep(tokio::time::Duration::from_secs(1)).await;
195+
if systemd.get_unit(unit_name).await.is_err() {
196+
break;
197+
}
198+
if attempt == 5 {
199+
panic!("transient unit not cleaned up after {} seconds", attempt);
200+
}
201+
}
202+
203+
Ok(())
204+
}
205+
206+
/// Test monitoring systemd unit state transitions via D-Bus
207+
/// signals.
208+
///
209+
/// Creates a transient `sleep` service, subscribes to property
210+
/// change signals, stops the unit, and verifies the expected state
211+
/// transitions (Active → Inactive → Gone) are observed.
212+
#[tokio::test]
213+
async fn test_monitor_unit_state_transitions() -> Result<()> {
214+
// State enum to track unit lifecycle.
215+
#[derive(Debug, Clone, PartialEq)]
216+
enum UnitState {
217+
Active { sub_state: String },
218+
Deactivating { sub_state: String },
219+
Inactive { sub_state: String },
220+
Gone,
221+
}
222+
223+
impl UnitState {
224+
fn from_states(active: String, sub: String) -> Self {
225+
match active.as_str() {
226+
"active" => UnitState::Active { sub_state: sub },
227+
"deactivating" => UnitState::Deactivating { sub_state: sub },
228+
"inactive" => UnitState::Inactive { sub_state: sub },
229+
_ => UnitState::Inactive { sub_state: sub },
230+
}
231+
}
232+
}
233+
234+
// Skip if no session bus available (GitHub CI runners).
235+
let conn = match Connection::session().await {
236+
Ok(conn) => conn,
237+
Err(_) => {
238+
eprintln!("Skipping test: D-Bus session bus not available");
239+
return Ok(());
240+
}
241+
};
242+
243+
let unit_name = "test-sleep-monitor.service";
244+
245+
let exec_start = vec![(
246+
"/bin/sleep".to_string(),
247+
vec!["/bin/sleep".to_string(), "30".to_string()],
248+
false,
249+
)];
250+
let props = vec![
251+
("Description", Value::from("monitor state transitions")),
252+
("ExecStart", Value::from(exec_start)),
253+
("CollectMode", Value::from("inactive-or-failed")),
254+
];
255+
let aux = Vec::new();
256+
257+
let systemd = SystemdManagerProxy::new(&conn).await?;
258+
259+
// Start the unit.
260+
let start_path = systemd
261+
.start_transient_unit(unit_name, "replace", props, aux)
262+
.await?;
263+
assert!(
264+
start_path
265+
.to_string()
266+
.contains("/org/freedesktop/systemd1/job")
267+
);
268+
269+
// Get unit proxy for monitoring.
270+
let unit_path = systemd.get_unit(unit_name).await?;
271+
let unit = SystemdUnitProxy::builder(&conn)
272+
.path(unit_path)?
273+
.build()
274+
.await?;
275+
276+
// Verify initial state.
277+
let initial_active = unit.active_state().await?;
278+
let initial_sub = unit.sub_state().await?;
279+
assert_eq!(initial_active, "active");
280+
assert_eq!(initial_sub, "running");
281+
282+
// Collect state transitions.
283+
let initial_state = UnitState::Active {
284+
sub_state: initial_sub.clone(),
285+
};
286+
let states = Arc::new(Mutex::new(vec![initial_state.clone()]));
287+
288+
// Spawn background task to monitor property changes.
289+
let unit_clone = unit.clone();
290+
let states_clone = states.clone();
291+
let initial_state_clone = initial_state.clone();
292+
let monitor_task = tokio::spawn(async move {
293+
let mut last_state = Some(initial_state_clone);
294+
let mut active_stream = unit_clone.receive_active_state_changed().await;
295+
let mut sub_stream = unit_clone.receive_sub_state_changed().await;
296+
297+
loop {
298+
tokio::select! {
299+
Some(active_change) = active_stream.next() => {
300+
if let Ok(active) = active_change.get().await {
301+
if let Ok(sub) = unit_clone.sub_state().await {
302+
let state = UnitState::from_states(active, sub);
303+
if last_state.as_ref() != Some(&state) {
304+
states_clone.lock().unwrap().push(state.clone());
305+
last_state = Some(state);
306+
}
307+
}
308+
}
309+
}
310+
Some(sub_change) = sub_stream.next() => {
311+
if let Ok(sub) = sub_change.get().await {
312+
if let Ok(active) = unit_clone.active_state().await {
313+
let state = UnitState::from_states(active, sub);
314+
if last_state.as_ref() != Some(&state) {
315+
states_clone.lock().unwrap().push(state.clone());
316+
last_state = Some(state);
317+
}
318+
}
319+
}
320+
}
321+
else => break,
322+
}
323+
}
324+
});
325+
326+
// Give monitor time to set up.
327+
RealClock
328+
.sleep(tokio::time::Duration::from_millis(100))
329+
.await;
330+
331+
// Stop the unit.
332+
let stop_path = systemd.stop_unit(unit_name, "replace").await?;
333+
assert!(
334+
stop_path
335+
.to_string()
336+
.contains("/org/freedesktop/systemd1/job")
337+
);
338+
339+
// Poll for unit cleanup.
340+
for attempt in 1..=5 {
341+
RealClock.sleep(tokio::time::Duration::from_secs(1)).await;
342+
if systemd.get_unit(unit_name).await.is_err() {
343+
states.lock().unwrap().push(UnitState::Gone);
344+
break;
345+
}
346+
if attempt == 10 {
347+
panic!("transient unit not cleaned up after {} seconds", attempt);
348+
}
349+
}
350+
351+
// Stop monitoring.
352+
monitor_task.abort();
353+
354+
// Verify state transitions.
355+
let collected_states = states.lock().unwrap();
356+
357+
// Check for observed states.
358+
let has_active = collected_states
359+
.iter()
360+
.any(|s| matches!(s, UnitState::Active { .. }));
361+
let has_deactivating = collected_states
362+
.iter()
363+
.any(|s| matches!(s, UnitState::Deactivating { .. }));
364+
let has_inactive = collected_states
365+
.iter()
366+
.any(|s| matches!(s, UnitState::Inactive { .. }));
367+
let has_gone = collected_states
368+
.iter()
369+
.any(|s| matches!(s, UnitState::Gone));
370+
371+
assert!(has_active, "Should observe active");
372+
assert!(
373+
has_deactivating || has_inactive,
374+
"Should observe deactivating or inactive state during shutdown"
375+
);
376+
assert!(has_gone, "Should observe unit cleanup");
377+
378+
Ok(())
379+
}
380+
}

0 commit comments

Comments
 (0)