Skip to content

Commit 2cdc1a0

Browse files
authored
refactor: Modify error types (#69)
mpsc::Receiver::recv returns a Result<Option<T>, RecvError>. RecvError has a case `SenderClosed`. So we have 2 different ways to report that the remote has closed the sender normally. Ok(None) and Err(RecvError::SenderClosed). This is confusing. SenderClosed isn't really an error, it is just what you get if the other side closes the stream. So this PR creates separate RecvError for oneshot and mpsc. In oneshot we still need the SenderClosed case since we expect exactly 1 item, and therefore the receiver being closed before that item is an error. In mpsc we remove the SenderClosed error and just report Ok(None) in that case.
1 parent 6c74d27 commit 2cdc1a0

File tree

4 files changed

+106
-64
lines changed

4 files changed

+106
-64
lines changed

irpc-iroh/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use iroh::{
88
protocol::{AcceptError, ProtocolHandler},
99
};
1010
use irpc::{
11-
channel::RecvError,
11+
channel::oneshot,
1212
rpc::{
1313
Handler, RemoteConnection, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED,
1414
MAX_MESSAGE_SIZE,
@@ -257,7 +257,7 @@ pub async fn read_request_raw<R: DeserializeOwned + 'static>(
257257
ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(),
258258
b"request exceeded max message size",
259259
);
260-
return Err(RecvError::MaxMessageSizeExceeded.into());
260+
return Err(oneshot::RecvError::MaxMessageSizeExceeded.into());
261261
}
262262
let mut buf = vec![0; size as usize];
263263
recv.read_exact(&mut buf)

src/lib.rs

Lines changed: 87 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -371,13 +371,47 @@ pub mod channel {
371371

372372
/// Oneshot channel, similar to tokio's oneshot channel
373373
pub mod oneshot {
374-
use std::{fmt::Debug, future::Future, pin::Pin, task};
374+
use std::{fmt::Debug, future::Future, io, pin::Pin, task};
375375

376376
use n0_future::future::Boxed as BoxFuture;
377377

378-
use super::{RecvError, SendError};
378+
use super::SendError;
379379
use crate::util::FusedOneshotReceiver;
380380

381+
/// Error when receiving a oneshot or mpsc message. For local communication,
382+
/// the only thing that can go wrong is that the sender has been closed.
383+
///
384+
/// For rpc communication, there can be any number of errors, so this is a
385+
/// generic io error.
386+
#[derive(Debug, thiserror::Error)]
387+
pub enum RecvError {
388+
/// The sender has been closed. This is the only error that can occur
389+
/// for local communication.
390+
#[error("sender closed")]
391+
SenderClosed,
392+
/// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
393+
///
394+
/// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
395+
#[error("maximum message size exceeded")]
396+
MaxMessageSizeExceeded,
397+
/// An io error occurred. This can occur for remote communication,
398+
/// due to a network error or deserialization error.
399+
#[error("io error: {0}")]
400+
Io(#[from] io::Error),
401+
}
402+
403+
impl From<RecvError> for io::Error {
404+
fn from(e: RecvError) -> Self {
405+
match e {
406+
RecvError::Io(e) => e,
407+
RecvError::SenderClosed => io::Error::new(io::ErrorKind::BrokenPipe, e),
408+
RecvError::MaxMessageSizeExceeded => {
409+
io::Error::new(io::ErrorKind::InvalidData, e)
410+
}
411+
}
412+
}
413+
}
414+
381415
/// Create a local oneshot sender and receiver pair.
382416
///
383417
/// This is currently using a tokio channel pair internally.
@@ -586,9 +620,38 @@ pub mod channel {
586620
///
587621
/// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc.
588622
pub mod mpsc {
589-
use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, sync::Arc};
623+
use std::{fmt::Debug, future::Future, io, marker::PhantomData, pin::Pin, sync::Arc};
590624

591-
use super::{RecvError, SendError};
625+
use super::SendError;
626+
627+
/// Error when receiving a oneshot or mpsc message. For local communication,
628+
/// the only thing that can go wrong is that the sender has been closed.
629+
///
630+
/// For rpc communication, there can be any number of errors, so this is a
631+
/// generic io error.
632+
#[derive(Debug, thiserror::Error)]
633+
pub enum RecvError {
634+
/// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
635+
///
636+
/// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
637+
#[error("maximum message size exceeded")]
638+
MaxMessageSizeExceeded,
639+
/// An io error occurred. This can occur for remote communication,
640+
/// due to a network error or deserialization error.
641+
#[error("io error: {0}")]
642+
Io(#[from] io::Error),
643+
}
644+
645+
impl From<RecvError> for io::Error {
646+
fn from(e: RecvError) -> Self {
647+
match e {
648+
RecvError::Io(e) => e,
649+
RecvError::MaxMessageSizeExceeded => {
650+
io::Error::new(io::ErrorKind::InvalidData, e)
651+
}
652+
}
653+
}
654+
}
592655

593656
/// Create a local mpsc sender and receiver pair, with the given buffer size.
594657
///
@@ -1067,38 +1130,6 @@ pub mod channel {
10671130
}
10681131
}
10691132
}
1070-
1071-
/// Error when receiving a oneshot or mpsc message. For local communication,
1072-
/// the only thing that can go wrong is that the sender has been closed.
1073-
///
1074-
/// For rpc communication, there can be any number of errors, so this is a
1075-
/// generic io error.
1076-
#[derive(Debug, thiserror::Error)]
1077-
pub enum RecvError {
1078-
/// The sender has been closed. This is the only error that can occur
1079-
/// for local communication.
1080-
#[error("sender closed")]
1081-
SenderClosed,
1082-
/// The message exceeded the maximum allowed message size (see [`MAX_MESSAGE_SIZE`]).
1083-
///
1084-
/// [`MAX_MESSAGE_SIZE`]: crate::rpc::MAX_MESSAGE_SIZE
1085-
#[error("maximum message size exceeded")]
1086-
MaxMessageSizeExceeded,
1087-
/// An io error occurred. This can occur for remote communication,
1088-
/// due to a network error or deserialization error.
1089-
#[error("io error: {0}")]
1090-
Io(#[from] io::Error),
1091-
}
1092-
1093-
impl From<RecvError> for io::Error {
1094-
fn from(e: RecvError) -> Self {
1095-
match e {
1096-
RecvError::Io(e) => e,
1097-
RecvError::SenderClosed => io::Error::new(io::ErrorKind::BrokenPipe, e),
1098-
RecvError::MaxMessageSizeExceeded => io::Error::new(io::ErrorKind::InvalidData, e),
1099-
}
1100-
}
1101-
}
11021133
}
11031134

11041135
/// A wrapper for a message with channels to send and receive it.
@@ -1682,8 +1713,10 @@ pub enum Error {
16821713
Request(#[from] RequestError),
16831714
#[error("send error: {0}")]
16841715
Send(#[from] channel::SendError),
1685-
#[error("recv error: {0}")]
1686-
Recv(#[from] channel::RecvError),
1716+
#[error("mpsc recv error: {0}")]
1717+
MpscRecv(#[from] channel::mpsc::RecvError),
1718+
#[error("oneshot recv error: {0}")]
1719+
OneshotRecv(#[from] channel::oneshot::RecvError),
16871720
#[cfg(feature = "rpc")]
16881721
#[error("recv error: {0}")]
16891722
Write(#[from] rpc::WriteError),
@@ -1697,7 +1730,8 @@ impl From<Error> for io::Error {
16971730
match e {
16981731
Error::Request(e) => e.into(),
16991732
Error::Send(e) => e.into(),
1700-
Error::Recv(e) => e.into(),
1733+
Error::MpscRecv(e) => e.into(),
1734+
Error::OneshotRecv(e) => e.into(),
17011735
#[cfg(feature = "rpc")]
17021736
Error::Write(e) => e.into(),
17031737
}
@@ -1772,7 +1806,7 @@ pub mod rpc {
17721806
channel::{
17731807
mpsc::{self, DynReceiver, DynSender},
17741808
none::NoSender,
1775-
oneshot, RecvError, SendError,
1809+
oneshot, SendError,
17761810
},
17771811
util::{now_or_never, AsyncReadVarintExt, WriteVarintExt},
17781812
LocalSender, RequestError, RpcMessage, Service,
@@ -1970,16 +2004,13 @@ pub mod rpc {
19702004
impl<T: DeserializeOwned> From<quinn::RecvStream> for oneshot::Receiver<T> {
19712005
fn from(mut read: quinn::RecvStream) -> Self {
19722006
let fut = async move {
1973-
let size = read
1974-
.read_varint_u64()
1975-
.await?
1976-
.ok_or(RecvError::Io(io::Error::new(
1977-
io::ErrorKind::UnexpectedEof,
1978-
"failed to read size",
1979-
)))?;
2007+
let size = read.read_varint_u64().await?.ok_or(io::Error::new(
2008+
io::ErrorKind::UnexpectedEof,
2009+
"failed to read size",
2010+
))?;
19802011
if size > MAX_MESSAGE_SIZE {
19812012
read.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into()).ok();
1982-
return Err(RecvError::MaxMessageSizeExceeded);
2013+
return Err(oneshot::RecvError::MaxMessageSizeExceeded);
19832014
}
19842015
let rest = read
19852016
.read_to_end(size as usize)
@@ -2076,7 +2107,12 @@ pub mod rpc {
20762107
fn recv(
20772108
&mut self,
20782109
) -> Pin<
2079-
Box<dyn Future<Output = std::result::Result<Option<T>, RecvError>> + Send + Sync + '_>,
2110+
Box<
2111+
dyn Future<Output = std::result::Result<Option<T>, mpsc::RecvError>>
2112+
+ Send
2113+
+ Sync
2114+
+ '_,
2115+
>,
20802116
> {
20812117
Box::pin(async {
20822118
let read = &mut self.recv;
@@ -2087,7 +2123,7 @@ pub mod rpc {
20872123
self.recv
20882124
.stop(ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into())
20892125
.ok();
2090-
return Err(RecvError::MaxMessageSizeExceeded);
2126+
return Err(mpsc::RecvError::MaxMessageSizeExceeded);
20912127
}
20922128
let mut buf = vec![0; size as usize];
20932129
read.read_exact(&mut buf)
@@ -2375,7 +2411,7 @@ pub mod rpc {
23752411
ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED.into(),
23762412
b"request exceeded max message size",
23772413
);
2378-
return Err(RecvError::MaxMessageSizeExceeded.into());
2414+
return Err(mpsc::RecvError::MaxMessageSizeExceeded.into());
23792415
}
23802416
let mut buf = vec![0; size as usize];
23812417
recv.read_exact(&mut buf)

tests/mpsc_channel.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use std::{
66
};
77

88
use irpc::{
9-
channel::{mpsc, RecvError, SendError},
9+
channel::{
10+
mpsc::{self, Receiver, RecvError},
11+
SendError,
12+
},
1013
util::AsyncWriteVarintExt,
1114
};
1215
use quinn::Endpoint;
@@ -122,7 +125,7 @@ async fn vec_receiver(server: Endpoint) -> Result<(), RecvError> {
122125
.accept_bi()
123126
.await
124127
.map_err(|e| RecvError::Io(e.into()))?;
125-
let mut recv = mpsc::Receiver::<Vec<u8>>::from(recv);
128+
let mut recv = Receiver::<Vec<u8>>::from(recv);
126129
while recv.recv().await?.is_some() {}
127130
Err(RecvError::Io(io::ErrorKind::UnexpectedEof.into()))
128131
}
@@ -145,7 +148,7 @@ async fn mpsc_max_message_size_send() -> TestResult<()> {
145148
let Err(cause) = server.await? else {
146149
panic!("server should have failed due to max message size");
147150
};
148-
assert!(matches!(cause, RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset));
151+
assert!(matches!(cause, mpsc::RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset));
149152
Ok(())
150153
}
151154

@@ -165,24 +168,24 @@ async fn mpsc_max_message_size_recv() -> TestResult<()> {
165168
let Err(cause) = server.await? else {
166169
panic!("server should have failed due to max message size");
167170
};
168-
assert!(matches!(cause, RecvError::MaxMessageSizeExceeded));
171+
assert!(matches!(cause, mpsc::RecvError::MaxMessageSizeExceeded));
169172
Ok(())
170173
}
171174

172-
async fn noser_receiver(server: Endpoint) -> Result<(), RecvError> {
175+
async fn noser_receiver(server: Endpoint) -> Result<(), mpsc::RecvError> {
173176
let conn = server
174177
.accept()
175178
.await
176179
.unwrap()
177180
.await
178-
.map_err(|e| RecvError::Io(e.into()))?;
181+
.map_err(|e| mpsc::RecvError::Io(e.into()))?;
179182
let (_, recv) = conn
180183
.accept_bi()
181184
.await
182-
.map_err(|e| RecvError::Io(e.into()))?;
185+
.map_err(|e| mpsc::RecvError::Io(e.into()))?;
183186
let mut recv = mpsc::Receiver::<NoSer>::from(recv);
184187
while recv.recv().await?.is_some() {}
185-
Err(RecvError::Io(io::ErrorKind::UnexpectedEof.into()))
188+
Err(mpsc::RecvError::Io(io::ErrorKind::UnexpectedEof.into()))
186189
}
187190

188191
/// Checks that a serialization error is caught and propagated to the receiver.
@@ -203,7 +206,7 @@ async fn mpsc_serialize_error_send() -> TestResult<()> {
203206
let Err(cause) = server.await? else {
204207
panic!("server should have failed due to serialization error");
205208
};
206-
assert!(matches!(cause, RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset));
209+
assert!(matches!(cause, mpsc::RecvError::Io(e) if e.kind() == ErrorKind::ConnectionReset));
207210
Ok(())
208211
}
209212

@@ -220,6 +223,6 @@ async fn mpsc_serialize_error_recv() -> TestResult<()> {
220223
let Err(cause) = server.await? else {
221224
panic!("server should have failed due to serialization error");
222225
};
223-
assert!(matches!(cause, RecvError::Io(e) if e.kind() == ErrorKind::InvalidData));
226+
assert!(matches!(cause, mpsc::RecvError::Io(e) if e.kind() == ErrorKind::InvalidData));
224227
Ok(())
225228
}

tests/oneshot_channel.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
use std::io::{self, ErrorKind};
44

55
use irpc::{
6-
channel::{oneshot, RecvError, SendError},
6+
channel::{
7+
oneshot::{self, RecvError},
8+
SendError,
9+
},
710
util::AsyncWriteVarintExt,
811
};
912
use quinn::Endpoint;

0 commit comments

Comments
 (0)