|
1 | 1 | use std::{future::Future, io}; |
2 | 2 |
|
3 | | -use genawaiter::sync::Gen; |
4 | | -use irpc::{channel::mpsc, RpcMessage}; |
5 | | -use n0_future::{Stream, StreamExt}; |
| 3 | +use irpc::{ |
| 4 | + channel::{mpsc, RecvError}, |
| 5 | + RpcMessage, |
| 6 | +}; |
| 7 | +use n0_future::{stream, Stream, StreamExt}; |
6 | 8 |
|
7 | 9 | /// Trait for an enum that has three variants, item, error, and done. |
8 | 10 | /// |
@@ -135,38 +137,41 @@ where |
135 | 137 | E: From<irpc::Error>, |
136 | 138 | E: From<irpc::channel::RecvError>, |
137 | 139 | { |
138 | | - Gen::new(move |co| async move { |
139 | | - let mut rx = match self.await { |
140 | | - Ok(rx) => rx, |
141 | | - Err(e) => { |
142 | | - co.yield_(Err(E::from(e))).await; |
143 | | - return; |
144 | | - } |
145 | | - }; |
146 | | - loop { |
147 | | - match rx.recv().await { |
148 | | - Ok(Some(item)) => match item.into_result_opt() { |
149 | | - Some(Ok(i)) => co.yield_(Ok(i)).await, |
150 | | - Some(Err(e)) => { |
151 | | - co.yield_(Err(E::from(e))).await; |
152 | | - break; |
153 | | - } |
154 | | - None => break, |
155 | | - }, |
156 | | - Ok(None) => { |
157 | | - co.yield_(Err(E::from(irpc::channel::RecvError::Io(io::Error::new( |
158 | | - io::ErrorKind::UnexpectedEof, |
159 | | - "unexpected end of stream", |
160 | | - ))))) |
161 | | - .await; |
162 | | - break; |
163 | | - } |
164 | | - Err(e) => { |
165 | | - co.yield_(Err(E::from(e))).await; |
166 | | - break; |
167 | | - } |
168 | | - } |
| 140 | + enum State<S, T> { |
| 141 | + Init(S), |
| 142 | + Receiving(mpsc::Receiver<T>), |
| 143 | + Done, |
| 144 | + } |
| 145 | + fn eof() -> RecvError { |
| 146 | + io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected end of stream").into() |
| 147 | + } |
| 148 | + async fn process_recv<S, T, E>( |
| 149 | + mut rx: mpsc::Receiver<T>, |
| 150 | + ) -> Option<(std::result::Result<T::Item, E>, State<S, T>)> |
| 151 | + where |
| 152 | + T: IrpcStreamItem, |
| 153 | + E: From<T::Error>, |
| 154 | + E: From<irpc::Error>, |
| 155 | + E: From<RecvError>, |
| 156 | + { |
| 157 | + match rx.recv().await { |
| 158 | + Ok(Some(item)) => match item.into_result_opt()? { |
| 159 | + Ok(i) => Some((Ok(i), State::Receiving(rx))), |
| 160 | + Err(e) => Some((Err(E::from(e)), State::Done)), |
| 161 | + }, |
| 162 | + Ok(None) => Some((Err(E::from(eof())), State::Done)), |
| 163 | + Err(e) => Some((Err(E::from(e)), State::Done)), |
| 164 | + } |
| 165 | + } |
| 166 | + Box::pin(stream::unfold(State::Init(self), |state| async move { |
| 167 | + match state { |
| 168 | + State::Init(fut) => match fut.await { |
| 169 | + Ok(rx) => process_recv(rx).await, |
| 170 | + Err(e) => Some((Err(E::from(e)), State::Done)), |
| 171 | + }, |
| 172 | + State::Receiving(rx) => process_recv(rx).await, |
| 173 | + State::Done => None, |
169 | 174 | } |
170 | | - }) |
| 175 | + })) |
171 | 176 | } |
172 | 177 | } |
0 commit comments