Conversation

@scrogson scrogson commented Nov 12, 2025

Async NIF Support with Tokio Runtime

Adds support for async Rust functions in NIFs, allowing long-running operations without blocking the BEAM scheduler. Async NIFs spawn tasks onto a Tokio runtime and send results via message passing.

Key Features

1. Async Functions

The #[rustler::nif] macro detects async fn and generates wrapper code that:

  • Returns :ok immediately (non-blocking)
  • Spawns task onto global Tokio runtime
  • Sends result via enif_send when complete
  • Requires owned types only (no Env, Term, or references)

use std::time::Duration;

#[rustler::nif]
async fn async_operation(input: String) -> String {
    tokio::time::sleep(Duration::from_secs(1)).await;
    input.to_uppercase()
}

2. CallerPid for Intermediate Messages

Optional CallerPid first parameter for sending progress updates. Doesn't count toward NIF arity.

#[rustler::nif]
async fn with_progress(caller: CallerPid, items: i64) -> i64 {
    for i in 0..items {
        let mut env = OwnedEnv::new();
        env.send_and_clear(caller.as_pid(), |e| ("progress", i).encode(e));
    }
    items
}

3. Configurable Runtime

Application developers configure via standard Elixir config:

# config/config.exs
config :my_nif_library, MyNifLibrary,
  load_data: [worker_threads: 4, thread_name: "myapp-async"]

NIF authors decode in load callback:

fn load(_env: Env, load_info: Term) -> bool {
    #[cfg(feature = "tokio_rt")]
    {
        if let Ok(config) = load_info.decode::<rustler::tokio::RuntimeConfig>() {
            rustler::tokio::configure(config).ok();
        }
    }
    true
}

Implementation Details

Code Generation

  • Sync NIFs: Standard wrapper (existing behavior)
  • Async NIFs: Detected via sig.asyncness.is_some()
    • Decodes arguments before spawning (owned types requirement)
    • Injects CallerPid if first parameter (doesn't count toward arity)
    • Spawns on runtime_handle(), sends result via OwnedEnv (a simplified sketch of this flow follows below)
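
For illustration, here is a simplified, hypothetical sketch of the flow such a generated wrapper follows (the wrapper name and the runtime_handle path are assumptions; the macro's real output differs):

use rustler::{Encoder, Env, NifResult, OwnedEnv, Term};

// Hand-written approximation of a wrapper for
// `async fn async_operation(input: String) -> String`
fn async_operation_wrapper<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
    // Decode arguments on the scheduler thread (Env/Term are not Send)
    let input: String = args[0].decode()?;
    // Remember the caller so the result can be sent back
    let caller = env.pid();

    // Spawn the user's future onto the global Tokio runtime
    rustler::tokio::runtime_handle().spawn(async move {
        let result = async_operation(input).await;
        // Deliver the result as a message (enif_send via OwnedEnv)
        let mut msg_env = OwnedEnv::new();
        let _ = msg_env.send_and_clear(&caller, |e| result.encode(e));
    });

    // Return :ok immediately; the scheduler is never blocked
    Ok(rustler::types::atom::ok().encode(env))
}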

Runtime Management

  • Lazy-initialized global runtime (OnceCell<Arc<Runtime>>)
  • Falls back to current runtime if already in Tokio context (see the sketch after this list)
  • RuntimeConfig: Decodable struct with worker_threads, thread_name, thread_stack_size
  • configure(RuntimeConfig): Configure from Elixir term
  • configure_runtime(|builder|): Programmatic configuration
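
As a rough illustration of the lazy initialization and current-runtime fallback described above (a minimal sketch, not the PR's actual code):

use once_cell::sync::OnceCell;
use std::sync::Arc;
use tokio::runtime::{Builder, Handle, Runtime};

static RUNTIME: OnceCell<Arc<Runtime>> = OnceCell::new();

fn runtime_handle() -> Handle {
    // Reuse the ambient runtime if we're already inside a Tokio context
    if let Ok(handle) = Handle::try_current() {
        return handle;
    }
    // Otherwise initialize the global multi-threaded runtime exactly once
    RUNTIME
        .get_or_init(|| {
            Arc::new(
                Builder::new_multi_thread()
                    .build()
                    .expect("failed to build Tokio runtime"),
            )
        })
        .handle()
        .clone()
}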

CallerPid Type

New wrapper type around LocalPid that the macro detects for special handling.
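
A rough sketch of what such a wrapper could look like (the actual definition in the PR may differ; the constructor here is an assumption):

use rustler::LocalPid;

// Newtype the macro can recognize in the first parameter position
pub struct CallerPid(LocalPid);

impl CallerPid {
    pub fn new(pid: LocalPid) -> Self {
        CallerPid(pid)
    }

    // Borrow the underlying pid, e.g. for OwnedEnv::send_and_clear
    pub fn as_pid(&self) -> &LocalPid {
        &self.0
    }
}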

Design Decisions

  1. Global runtime: Single lazy-initialized runtime for simpler resource management, falls back to current runtime if in Tokio context
  2. Message passing: Return :ok immediately, send result via message (non-blocking, BEAM-idiomatic)
  3. Owned types: Arguments decoded before spawning (Env/Term not Send)
  4. CallerPid parameter: Optional first parameter, doesn't affect arity
  5. Application config: Leverage existing use Rustler config merging (standard Elixir pattern)

Testing

  • Basic async operations (computation, owned types, tuples)
  • Concurrent execution validation
  • CallerPid intermediate messages
  • Runtime configuration via Application config
  • All 175 tests pass

Dependencies

Under tokio_rt feature: tokio = "1" (rt, rt-multi-thread, sync), once_cell = "1"

Backward Compatibility

Fully backward-compatible, gated behind tokio_rt feature flag.

Usage Example

# config/config.exs
config :my_app, MyApp.NIF,
  load_data: [worker_threads: 4, thread_name: "my-app"]

# lib/my_app/nif.ex
defmodule MyApp.NIF do
  use Rustler, otp_app: :my_app, crate: :my_nif
  def heavy_computation(_), do: :erlang.nif_error(:nif_not_loaded)
end

# Usage
:ok = MyApp.NIF.heavy_computation("data")
receive do
  result -> IO.puts(result)
after
  5000 -> :timeout
end

The Rust side:

use std::time::Duration;

#[rustler::nif]
async fn heavy_computation(input: String) -> String {
    tokio::time::sleep(Duration::from_secs(2)).await;
    input.to_uppercase()
}

fn load(_env: Env, load_info: Term) -> bool {
    #[cfg(feature = "tokio_rt")]
    if let Ok(config) = load_info.decode::<rustler::tokio::RuntimeConfig>() {
        rustler::tokio::configure(config).ok();
    }
    true
}

rustler::init!("Elixir.MyApp.NIF", load = load);

@scrogson scrogson requested a review from filmor November 12, 2025 05:11

filmor commented Nov 12, 2025

This behaviour of the async NIFs is not quite what I was after. Users could implement something like this right now with just a few more lines of code. My intention was to make the async NIFs "block" from the BEAM side, in that they only return once the NIF has run fully through, but yield at all .awaits. That would allow users to implement IO-constrained NIFs without having to rely on dirty scheduling.

I still think that what you have built here has merit:

  • It should use a new macro, something like #[rustler::nif_task] to clearly separate it from NIFs that return what the function returns
  • If we can do without the user implementing load, we should. Maybe lift this RustlerConfig to a "primary" feature that is always tried on the load_info term?
  • Not necessarily for the first attempt, but you are using an extremely small part of Tokio directly, to the point that users could simply bring their own spawn function in the initial configuration. We could have our own AsyncRuntime trait with feature-activated implementations.
  • IMO, everything that can be called fully asynchronously has to return a ref that is included in the result message


evnu commented Nov 12, 2025

My intention was to make the async NIFs "block" from the BEAM side, in that they only return once the NIF has run fully through, but yield at all .awaits.

Without knowing all the previous discussion, this is what I wondered about as well when reading the description. From an ergonomics perspective, I think NIFs should behave the same (if possible!) from within Elixir, as a NIF might be exposed to users through libraries. As a user of such a NIF, I'd probably not want to know that I need to expect a message some time in the future; I'd rather just block for the long-running work.


scrogson commented Nov 12, 2025

@evnu

As a user of such a NIF, I'd probably not want to know that I need to expect a message some time in the future, but just block for the long running work.

The intention is that the author of the NIF library provides a public API that exposes it as a "sync" function.

So users of the NIF library wouldn't know any difference.

@scrogson
Member Author

@filmor

Users could implement something like this right now with just a few more lines of code.

Yes, that's a fair point. However, I'm tired of implementing this every time I want this functionality.

My intention was to make the async NIFs "block" from the BEAM side, in that they only return once the NIF has run fully through, but yield at all .awaits.

How would you handle a NIF which streams results back to the caller? If the call blocks, the caller can't handle intermediate messages as they arrive.

That would allow users to implement IO-constrained NIFs without having to rely on dirty scheduling.

I need to hear more about what you have in mind here. As I see it, with an async NIF the way it's implemented right now, you don't need dirty scheduling because the work is immediately spawned onto the async runtime.

  • If we can do without the user implementing load, we should. Maybe lift this RustlerConfig to a "primary" feature that is always tried on the load_info term?

I think this is doable. I was thinking about doing this but I held off to reduce scope.

  • It should use a new macro, something like #[rustler::nif_task] to clearly separate it from NIFs that return what the function returns

Sure, maybe #[rustler::task]. I need to understand what you mean about the return value; in this case, the return value is whatever the NIF sends to the caller.

  • Not necessarily for the first attempt, but you are using an extremely small part of Tokio directly, to the point that users could simply bring their own spawn function in the initial configuration. We could have our own AsyncRuntime trait with feature-activated implementations.

Say more? Can you give an example of how you'd use what you have in mind here? Are you talking about file system and networking APIs?

IMO, everything that can be called fully asynchronously has to return a ref that is included in the result message

agreed 💯

@scrogson scrogson marked this pull request as draft November 13, 2025 19:32
@scrogson scrogson changed the title from "Async NIFs" to "Unstable: Async Task NIFs" Nov 13, 2025
@scrogson
Member Author

@filmor @evnu I've updated the functionality based on feedback.

Channel API for Async Tasks

Status: Experimental (requires rustler_unstable cfg flag)

Overview

The Channel API provides type-safe, bidirectional communication between Elixir and Rust async tasks. It replaces the need for manual message handling with a clean, ergonomic interface.

Enabling

Create .cargo/config.toml in your NIF crate directory:

[build]
rustflags = ["--cfg", "rustler_unstable"]

Basic Examples

Example 1: One-Way Communication with Progress Updates

Send progress updates back to Elixir while processing work:

use rustler::runtime::Channel;
use std::time::Duration;

#[rustler::task]
async fn process_items(channel: Channel<(), String>, items: Vec<String>) {
    for (i, item) in items.iter().enumerate() {
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Send progress update
        channel.send(format!("Processing {}/{}: {}", i + 1, items.len(), item));
    }

    // Send final result
    channel.finish(format!("Completed {} items", items.len()));
}

Elixir usage:

ref = MyNif.process_items(["task1", "task2", "task3"])

# Receive messages as they arrive (loop until the final message)
receive do
  {^ref, "Completed " <> _ = final} ->
    IO.puts("Done: #{final}")
  {^ref, progress} ->
    IO.puts(progress)
    # Continue receiving...
end

Example 2: Bidirectional Communication with Commands

Build interactive workers that receive commands and send responses:

use rustler::runtime::Channel;

#[derive(rustler::NifTaggedEnum, Clone)]
enum Command {
    Add { value: i64 },
    Multiply { value: i64 },
    GetCurrent,
    Shutdown,
}

#[derive(rustler::NifTaggedEnum, Clone)]
enum Response {
    Updated { old_value: i64, new_value: i64 },
    Current { value: i64 },
    ShuttingDown { final_value: i64 },
}

#[rustler::task]
async fn stateful_worker(channel: Channel<Command, Response>) {
    let mut current_value = 0i64;

    while let Some(cmd) = channel.next().await {
        let response = match cmd {
            Command::Add { value } => {
                let old = current_value;
                current_value += value;
                Response::Updated { old_value: old, new_value: current_value }
            }
            Command::Multiply { value } => {
                let old = current_value;
                current_value *= value;
                Response::Updated { old_value: old, new_value: current_value }
            }
            Command::GetCurrent => {
                Response::Current { value: current_value }
            }
            Command::Shutdown => {
                channel.send(Response::ShuttingDown { final_value: current_value });
                break;
            }
        };

        channel.send(response);
    }

    channel.finish(Response::ShuttingDown { final_value: current_value });
}

// Helper NIF for sending commands
#[rustler::nif]
fn worker_send_command(
    env: rustler::Env,
    sender: rustler::runtime::ChannelSender<Command>,
    command: rustler::Term,
) -> rustler::NifResult<rustler::types::Atom> {
    rustler::runtime::channel::send(env, sender, command)
}

Elixir usage:

# Start worker
worker = MyNif.stateful_worker()

# Send commands
MyNif.worker_send_command(worker, {:add, %{value: 10}})
receive do
  {^worker, {:updated, %{new_value: value}}} ->
    IO.puts("New value: #{value}")
end

MyNif.worker_send_command(worker, {:multiply, %{value: 2}})
receive do
  {^worker, {:updated, %{new_value: value}}} ->
    IO.puts("New value: #{value}")
end

MyNif.worker_send_command(worker, :get_current)
receive do
  {^worker, {:current, %{value: value}}} ->
    IO.puts("Current: #{value}")
end

MyNif.worker_send_command(worker, :shutdown)
receive do
  {^worker, {:shutting_down, %{final_value: value}}} ->
    IO.puts("Final value: #{value}")
end

Key Concepts

Channel Types

  • Channel<(), Response> - One-way: task sends responses to Elixir
  • Channel<Request, Response> - Bidirectional: task receives requests and sends responses

Message Format

All messages are tuples: {channel_sender, payload}

  • channel_sender - The reference returned by the task
  • payload - Your data (type-checked)

Channel Methods

// Receive next request (bidirectional only)
channel.next().await -> Option<Request>

// Send response
channel.send(response)

// Send final response and close
channel.finish(response)

// Get cloneable sender for spawned tasks
channel.responder() -> ResponseSender

Helper for Sending from Elixir

#[rustler::nif]
fn send_to_channel(
    env: rustler::Env,
    sender: rustler::runtime::ChannelSender<YourRequestType>,
    message: rustler::Term,
) -> rustler::NifResult<rustler::types::Atom> {
    rustler::runtime::channel::send(env, sender, message)
}

Runtime Configuration

Configure the Tokio runtime in your load function:

fn load(_env: rustler::Env, _load_info: rustler::Term) -> bool {
    rustler::runtime::builder(|builder| {
        builder
            .worker_threads(4)
            .thread_name("my-nif-worker")
            .thread_stack_size(2 * 1024 * 1024);
    }).is_ok()
}

rustler::init!("Elixir.MyNif", load = load);

Examples

See working examples in rustler_tests/native/rustler_test/src/test_async.rs

Limitations

  • Channel parameter must be first in function signature
  • Tasks with Channel don't need explicit return types
  • All parameters must be owned types (no Env or Term)
  • Requires rustler_unstable cfg flag

Feedback

This is experimental. Let me know what you think.

@scrogson scrogson requested review from evnu and filmor November 13, 2025 20:24
@scrogson scrogson changed the title from "Unstable: Async Task NIFs" to "Unstable: Async NIFs and Tasks" Nov 13, 2025
@scrogson
Member Author

@filmor @evnu OK... I got yielding async NIF support working with enif_schedule_nif.

Cooperative Yielding NIFs

What is it?

A new way to write long-running NIFs that cooperate with the BEAM scheduler using enif_schedule_nif. They appear synchronous to Elixir while yielding internally.

Usage

use rustler::runtime::yield_now;

#[rustler::nif]
async fn process_large_dataset(items: i64) -> i64 {
    let mut sum = 0;
    for i in 0..items {
        sum += i;
        if i % 100 == 0 {
            yield_now().await;  // Yield to scheduler
        }
    }
    sum
}

Elixir usage:

# Appears synchronous - blocks until complete
result = MyNif.process_large_dataset(10_000)

Key Differences

             #[rustler::nif] async fn              #[rustler::task]
Syntax       result = nif()                        ref = nif(); receive {^ref, result}
Appearance   Synchronous                           Asynchronous
Return       Direct value                          Reference + message
Mechanism    enif_schedule_nif                     Tokio spawn + enif_send
Use case     CPU-bound work that needs to yield    I/O-bound or background tasks

How it works

  1. NIF polls async function
  2. If pending, reschedules itself via enif_schedule_nif (a sketch of a yield-once future follows this list)
  3. BEAM scheduler calls continuation when ready
  4. Repeats until complete
  5. Returns result directly (no messages)
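
For illustration, here is a minimal sketch of a yield-once future in the spirit of yield_now (assuming it behaves like Tokio's task::yield_now; the PR's actual implementation may differ):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            // Second poll: resume the NIF body
            Poll::Ready(())
        } else {
            // First poll: mark as yielded, request a wake-up, and report Pending,
            // giving the wrapper a chance to reschedule via enif_schedule_nif
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn yield_now() -> impl Future<Output = ()> {
    YieldNow { yielded: false }
}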
