170170< a href ="#169 " id ="169 "> 169</ a >
171171< a href ="#170 " id ="170 "> 170</ a >
172172< a href ="#171 " id ="171 "> 171</ a >
173- < a href ="#172 " id ="172 "> 172</ a > </ pre > </ div > < pre class ="rust "> < code > < span class ="kw "> use </ span > std::{future::Future, io};
173+ < a href ="#172 " id ="172 "> 172</ a >
174+ < a href ="#173 " id ="173 "> 173</ a >
175+ < a href ="#174 " id ="174 "> 174</ a >
176+ < a href ="#175 " id ="175 "> 175</ a >
177+ < a href ="#176 " id ="176 "> 176</ a >
178+ < a href ="#177 " id ="177 "> 177</ a > </ pre > </ div > < pre class ="rust "> < code > < span class ="kw "> use </ span > std::{future::Future, io};
174179
175- < span class ="kw "> use </ span > genawaiter::sync::Gen;
176- < span class ="kw "> use </ span > irpc::{channel::mpsc, RpcMessage};
177- < span class ="kw "> use </ span > n0_future::{Stream, StreamExt};
180+ < span class ="kw "> use </ span > irpc::{
181+ channel::{mpsc, RecvError},
182+ RpcMessage,
183+ };
184+ < span class ="kw "> use </ span > n0_future::{stream, Stream, StreamExt};
178185
179186< span class ="doccomment "> /// Trait for an enum that has three variants, item, error, and done.
180187///
307314 E: From<irpc::Error>,
308315 E: From<irpc::channel::RecvError>,
309316 {
310- Gen::new(< span class ="kw "> move </ span > |co| < span class ="kw "> async move </ span > {
311- < span class ="kw "> let </ span > < span class ="kw-2 "> mut </ span > rx = < span class ="kw "> match </ span > < span class ="self "> self</ span > .< span class ="kw "> await </ span > {
312- < span class ="prelude-val "> Ok</ span > (rx) => rx,
313- < span class ="prelude-val "> Err</ span > (e) => {
314- co.yield_(< span class ="prelude-val "> Err</ span > (E::from(e))).< span class ="kw "> await</ span > ;
315- < span class ="kw "> return</ span > ;
316- }
317- };
318- < span class ="kw "> loop </ span > {
319- < span class ="kw "> match </ span > rx.recv().< span class ="kw "> await </ span > {
320- < span class ="prelude-val "> Ok</ span > (< span class ="prelude-val "> Some</ span > (item)) => < span class ="kw "> match </ span > item.into_result_opt() {
321- < span class ="prelude-val "> Some</ span > (< span class ="prelude-val "> Ok</ span > (i)) => co.yield_(< span class ="prelude-val "> Ok</ span > (i)).< span class ="kw "> await</ span > ,
322- < span class ="prelude-val "> Some</ span > (< span class ="prelude-val "> Err</ span > (e)) => {
323- co.yield_(< span class ="prelude-val "> Err</ span > (E::from(e))).< span class ="kw "> await</ span > ;
324- < span class ="kw "> break</ span > ;
325- }
326- < span class ="prelude-val "> None </ span > => < span class ="kw "> break</ span > ,
327- },
328- < span class ="prelude-val "> Ok</ span > (< span class ="prelude-val "> None</ span > ) => {
329- co.yield_(< span class ="prelude-val "> Err</ span > (E::from(irpc::channel::RecvError::Io(io::Error::new(
330- io::ErrorKind::UnexpectedEof,
331- < span class ="string "> "unexpected end of stream"</ span > ,
332- )))))
333- .< span class ="kw "> await</ span > ;
334- < span class ="kw "> break</ span > ;
335- }
336- < span class ="prelude-val "> Err</ span > (e) => {
337- co.yield_(< span class ="prelude-val "> Err</ span > (E::from(e))).< span class ="kw "> await</ span > ;
338- < span class ="kw "> break</ span > ;
339- }
340- }
317+ < span class ="kw "> enum </ span > State<S, T> {
318+ Init(S),
319+ Receiving(mpsc::Receiver<T>),
320+ Done,
321+ }
322+ < span class ="kw "> fn </ span > eof() -> RecvError {
323+ io::Error::new(io::ErrorKind::UnexpectedEof, < span class ="string "> "unexpected end of stream"</ span > ).into()
324+ }
325+ < span class ="kw "> async fn </ span > process_recv<S, T, E>(
326+ < span class ="kw-2 "> mut </ span > rx: mpsc::Receiver<T>,
327+ ) -> < span class ="prelude-ty "> Option</ span > <(std::result::Result<T::Item, E>, State<S, T>)>
328+ < span class ="kw "> where
329+ </ span > T: IrpcStreamItem,
330+ E: From<T::Error>,
331+ E: From<irpc::Error>,
332+ E: From<RecvError>,
333+ {
334+ < span class ="kw "> match </ span > rx.recv().< span class ="kw "> await </ span > {
335+ < span class ="prelude-val "> Ok</ span > (< span class ="prelude-val "> Some</ span > (item)) => < span class ="kw "> match </ span > item.into_result_opt()< span class ="question-mark "> ? </ span > {
336+ < span class ="prelude-val "> Ok</ span > (i) => < span class ="prelude-val "> Some</ span > ((< span class ="prelude-val "> Ok</ span > (i), State::Receiving(rx))),
337+ < span class ="prelude-val "> Err</ span > (e) => < span class ="prelude-val "> Some</ span > ((< span class ="prelude-val "> Err</ span > (E::from(e)), State::Done)),
338+ },
339+ < span class ="prelude-val "> Ok</ span > (< span class ="prelude-val "> None</ span > ) => < span class ="prelude-val "> Some</ span > ((< span class ="prelude-val "> Err</ span > (E::from(eof())), State::Done)),
340+ < span class ="prelude-val "> Err</ span > (e) => < span class ="prelude-val "> Some</ span > ((< span class ="prelude-val "> Err</ span > (E::from(e)), State::Done)),
341+ }
342+ }
343+ Box::pin(stream::unfold(State::Init(< span class ="self "> self</ span > ), |state| < span class ="kw "> async move </ span > {
344+ < span class ="kw "> match </ span > state {
345+ State::Init(fut) => < span class ="kw "> match </ span > fut.< span class ="kw "> await </ span > {
346+ < span class ="prelude-val "> Ok</ span > (rx) => process_recv(rx).< span class ="kw "> await</ span > ,
347+ < span class ="prelude-val "> Err</ span > (e) => < span class ="prelude-val "> Some</ span > ((< span class ="prelude-val "> Err</ span > (E::from(e)), State::Done)),
348+ },
349+ State::Receiving(rx) => process_recv(rx).< span class ="kw "> await</ span > ,
350+ State::Done => < span class ="prelude-val "> None</ span > ,
341351 }
342- })
352+ }))
343353 }
344354}
345355</ code > </ pre > </ div > </ section > </ main > </ body > </ html >
0 commit comments