Skip to content

Commit 94751c7

Browse files
committed
Async NIFs
1 parent 85d0a90 commit 94751c7

File tree

13 files changed

+585
-8
lines changed

13 files changed

+585
-8
lines changed

rustler/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@ nif_version_2_15 = ["nif_version_2_14"]
1919
nif_version_2_16 = ["nif_version_2_15"]
2020
nif_version_2_17 = ["nif_version_2_16"]
2121
serde = ["dep:serde"]
22+
tokio_rt = ["dep:tokio"]
2223

2324
[dependencies]
2425
inventory = "0.3"
2526
rustler_codegen = { path = "../rustler_codegen", version = "0.37.1"}
2627
num-bigint = { version = "0.4", optional = true }
2728
serde = { version = "1", optional = true }
29+
tokio = { version = "1", optional = true, features = ["rt", "rt-multi-thread", "sync"] }
30+
once_cell = "1"
2831

2932
[target.'cfg(not(windows))'.dependencies]
3033
libloading = "0.9"

rustler/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,7 @@ pub mod serde;
8383
#[cfg(feature = "serde")]
8484
pub use crate::serde::SerdeTerm;
8585

86+
#[cfg(feature = "tokio_rt")]
87+
pub mod tokio;
88+
8689
pub mod sys;

rustler/src/tokio/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod runtime;
2+
3+
pub use runtime::{
4+
configure, configure_runtime, runtime_handle, ConfigError, RuntimeConfig,
5+
};

rustler/src/tokio/runtime.rs

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
use crate::{Decoder, NifResult, Term};
2+
use once_cell::sync::OnceCell;
3+
use std::sync::Arc;
4+
use tokio::runtime::Runtime;
5+
6+
/// Global tokio runtime for async NIFs.
7+
///
8+
/// This runtime can be configured via `configure_runtime()` in your NIF's `load` callback,
9+
/// or will be lazily initialized with default settings on first use.
10+
static TOKIO_RUNTIME: OnceCell<Arc<Runtime>> = OnceCell::new();
11+
12+
/// Error type for runtime configuration failures.
13+
#[derive(Debug)]
14+
pub enum ConfigError {
15+
/// The runtime has already been initialized (either by configuration or first use).
16+
AlreadyInitialized,
17+
/// Failed to build the Tokio runtime.
18+
BuildFailed(std::io::Error),
19+
}
20+
21+
impl std::fmt::Display for ConfigError {
22+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23+
match self {
24+
ConfigError::AlreadyInitialized => {
25+
write!(f, "Tokio runtime already initialized")
26+
}
27+
ConfigError::BuildFailed(e) => {
28+
write!(f, "Failed to build Tokio runtime: {}", e)
29+
}
30+
}
31+
}
32+
}
33+
34+
impl std::error::Error for ConfigError {}
35+
36+
/// Configuration options for the Tokio runtime.
37+
///
38+
/// These can be passed from Elixir via the `load_data` option:
39+
///
40+
/// ```elixir
41+
/// use Rustler,
42+
/// otp_app: :my_app,
43+
/// crate: :my_nif,
44+
/// load_data: [
45+
/// worker_threads: 4,
46+
/// thread_name: "my-runtime"
47+
/// ]
48+
/// ```
49+
#[derive(Debug, Clone)]
50+
pub struct RuntimeConfig {
51+
/// Number of worker threads for the runtime.
52+
/// If not specified, uses Tokio's default (number of CPU cores).
53+
pub worker_threads: Option<usize>,
54+
55+
/// Thread name prefix for worker threads.
56+
/// If not specified, uses "rustler-tokio".
57+
pub thread_name: Option<String>,
58+
59+
/// Stack size for worker threads in bytes.
60+
/// If not specified, uses Tokio's default.
61+
pub thread_stack_size: Option<usize>,
62+
}
63+
64+
impl Default for RuntimeConfig {
65+
fn default() -> Self {
66+
RuntimeConfig {
67+
worker_threads: None,
68+
thread_name: Some("rustler-tokio".to_string()),
69+
thread_stack_size: None,
70+
}
71+
}
72+
}
73+
74+
impl<'a> Decoder<'a> for RuntimeConfig {
75+
fn decode(term: Term<'a>) -> NifResult<Self> {
76+
use crate::types::map::MapIterator;
77+
use crate::Error;
78+
79+
let mut config = RuntimeConfig::default();
80+
81+
// Try to decode as a map/keyword list
82+
let map_iter = MapIterator::new(term).ok_or(Error::BadArg)?;
83+
84+
for (key, value) in map_iter {
85+
let key_str: String = key.decode()?;
86+
87+
match key_str.as_str() {
88+
"worker_threads" => {
89+
config.worker_threads = Some(value.decode()?);
90+
}
91+
"thread_name" => {
92+
config.thread_name = Some(value.decode()?);
93+
}
94+
"thread_stack_size" => {
95+
config.thread_stack_size = Some(value.decode()?);
96+
}
97+
_ => {
98+
// Ignore unknown options for forward compatibility
99+
}
100+
}
101+
}
102+
103+
Ok(config)
104+
}
105+
}
106+
107+
/// Configure the global Tokio runtime from Elixir load_data.
108+
///
109+
/// This is the recommended way to configure the runtime, allowing Elixir application
110+
/// developers to tune the runtime without recompiling the NIF.
111+
///
112+
/// # Example
113+
///
114+
/// ```ignore
115+
/// use rustler::{Env, Term};
116+
///
117+
/// fn load(_env: Env, load_info: Term) -> bool {
118+
/// // Try to decode runtime config from load_info
119+
/// if let Ok(config) = load_info.decode::<rustler::tokio::RuntimeConfig>() {
120+
/// rustler::tokio::configure(config)
121+
/// .expect("Failed to configure Tokio runtime");
122+
/// }
123+
/// true
124+
/// }
125+
/// ```
126+
///
127+
/// In your Elixir config:
128+
///
129+
/// ```elixir
130+
/// # config/config.exs
131+
/// config :my_app, MyNif,
132+
/// load_data: [
133+
/// worker_threads: 4,
134+
/// thread_name: "my-runtime"
135+
/// ]
136+
/// ```
137+
pub fn configure(config: RuntimeConfig) -> Result<(), ConfigError> {
138+
let mut builder = tokio::runtime::Builder::new_multi_thread();
139+
builder.enable_all();
140+
141+
// Apply configuration
142+
if let Some(threads) = config.worker_threads {
143+
builder.worker_threads(threads);
144+
}
145+
146+
if let Some(name) = config.thread_name {
147+
builder.thread_name(name);
148+
}
149+
150+
if let Some(stack_size) = config.thread_stack_size {
151+
builder.thread_stack_size(stack_size);
152+
}
153+
154+
let runtime = builder.build().map_err(ConfigError::BuildFailed)?;
155+
156+
TOKIO_RUNTIME
157+
.set(Arc::new(runtime))
158+
.map_err(|_| ConfigError::AlreadyInitialized)
159+
}
160+
161+
/// Configure the global Tokio runtime programmatically.
162+
///
163+
/// This provides direct access to the Tokio Builder API for advanced use cases.
164+
/// For most applications, prefer `configure_runtime_from_term` which allows
165+
/// configuration from Elixir.
166+
///
167+
/// # Example
168+
///
169+
/// ```ignore
170+
/// use rustler::{Env, Term};
171+
///
172+
/// fn load(_env: Env, _: Term) -> bool {
173+
/// rustler::tokio::configure_runtime(|builder| {
174+
/// builder
175+
/// .worker_threads(4)
176+
/// .thread_name("myapp-tokio")
177+
/// .thread_stack_size(3 * 1024 * 1024);
178+
/// }).expect("Failed to configure Tokio runtime");
179+
///
180+
/// true
181+
/// }
182+
/// ```
183+
pub fn configure_runtime<F>(config_fn: F) -> Result<(), ConfigError>
184+
where
185+
F: FnOnce(&mut tokio::runtime::Builder),
186+
{
187+
let mut builder = tokio::runtime::Builder::new_multi_thread();
188+
builder.enable_all();
189+
190+
// Allow user to customize
191+
config_fn(&mut builder);
192+
193+
let runtime = builder.build().map_err(ConfigError::BuildFailed)?;
194+
195+
TOKIO_RUNTIME
196+
.set(Arc::new(runtime))
197+
.map_err(|_| ConfigError::AlreadyInitialized)
198+
}
199+
200+
/// Get a handle to the global tokio runtime, or the current runtime if already inside one.
201+
pub fn runtime_handle() -> tokio::runtime::Handle {
202+
// Try to get the current runtime handle first (if already in a tokio context)
203+
tokio::runtime::Handle::try_current().unwrap_or_else(|_| {
204+
// Get or initialize with default configuration
205+
TOKIO_RUNTIME
206+
.get_or_init(|| {
207+
Arc::new(
208+
tokio::runtime::Builder::new_multi_thread()
209+
.enable_all()
210+
.thread_name("rustler-tokio")
211+
.build()
212+
.expect("Failed to create default tokio runtime for async NIFs"),
213+
)
214+
})
215+
.handle()
216+
.clone()
217+
})
218+
}

rustler/src/types/local_pid.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,58 @@ impl Ord for LocalPid {
6363
}
6464
}
6565

66+
/// A wrapper for `LocalPid` that represents the calling process in async NIFs.
67+
///
68+
/// When used as the first parameter of an async NIF, `CallerPid` is automatically
69+
/// populated with the calling process's PID, and is not decoded from the arguments.
70+
/// This allows async NIFs to send intermediate messages back to the caller.
71+
///
72+
/// # Example
73+
///
74+
/// ```ignore
75+
/// #[rustler::nif]
76+
/// async fn with_progress(caller: CallerPid, work: Vec<i64>) -> i64 {
77+
/// // Send progress updates
78+
/// let mut env = OwnedEnv::new();
79+
/// env.send(caller.as_pid(), |e| "started".encode(e));
80+
///
81+
/// let result = do_work(work).await;
82+
///
83+
/// // Final result sent automatically
84+
/// result
85+
/// }
86+
/// ```
87+
#[derive(Copy, Clone)]
88+
pub struct CallerPid(LocalPid);
89+
90+
impl CallerPid {
91+
/// Create a new CallerPid from a LocalPid.
92+
///
93+
/// This is only used internally by the NIF macro.
94+
#[doc(hidden)]
95+
pub fn new(pid: LocalPid) -> Self {
96+
CallerPid(pid)
97+
}
98+
99+
/// Get the underlying LocalPid.
100+
pub fn as_pid(&self) -> &LocalPid {
101+
&self.0
102+
}
103+
104+
/// Check whether the calling process is alive.
105+
pub fn is_alive(self, env: Env) -> bool {
106+
self.0.is_alive(env)
107+
}
108+
}
109+
110+
impl std::ops::Deref for CallerPid {
111+
type Target = LocalPid;
112+
113+
fn deref(&self) -> &Self::Target {
114+
&self.0
115+
}
116+
}
117+
66118
impl Env<'_> {
67119
/// Return the calling process's pid.
68120
///

rustler/src/types/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub mod tuple;
2828

2929
#[doc(hidden)]
3030
pub mod local_pid;
31-
pub use self::local_pid::LocalPid;
31+
pub use self::local_pid::{CallerPid, LocalPid};
3232

3333
#[doc(hidden)]
3434
pub mod reference;

0 commit comments

Comments
 (0)