diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 65ce84b..8333472 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -16,5 +16,5 @@ pub use gen_server::{ InitResult::NoSuccess, InitResult::Success, }; pub use process::{send, Process, ProcessInfo}; -pub use stream::spawn_listener; +pub use stream::{spawn_listener, spawn_spawner}; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 492c4f9..ba549db 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use crate::tasks::{GenServer, GenServerHandle}; use futures::{future::select, Stream, StreamExt}; use spawned_rt::tasks::JoinHandle; @@ -41,3 +43,25 @@ where }); join_handle } + +/// Spawns a task that listens to a stream of streams and spawns a GenServer that +/// handles messages from that stream. +/// Each GenServer is created using the provided factory function, and spawned +/// using [`GenServer::start`]. +pub fn spawn_spawner(listener: L, mut factory: F) -> JoinHandle<()> +where + L: Send + Stream + 'static, + S: Send + Stream + 'static, + T: GenServer + 'static, + F: FnMut(SocketAddr) -> T + Send + 'static, +{ + spawned_rt::tasks::spawn(async move { + let mut listener = core::pin::pin!(listener); + // TODO: listener can't be a stream since tokio::net::TcpListener does not implement Stream + while let Some((stream, addr)) = listener.next().await { + let handle = factory(addr).start(); + // TODO: handle error + spawn_listener(handle, stream).await.unwrap(); + } + }) +}