1818//! [iroh]: https://docs.rs/iroh
1919use std:: {
2020 fmt:: { self , Debug } ,
21+ io,
2122 time:: { Duration , Instant } ,
2223} ;
2324
2425use anyhow:: Result ;
2526use bao_tree:: { io:: fsm:: BaoContentItem , ChunkNum } ;
2627use fsm:: RequestCounters ;
27- use iroh:: endpoint:: { RecvStream , SendStream } ;
28- use iroh_io:: { TokioStreamReader , TokioStreamWriter } ;
28+ use iroh_io:: { AsyncStreamReader , AsyncStreamWriter } ;
2929use n0_snafu:: SpanTrace ;
3030use nested_enum_utils:: common_fields;
31+ use quinn:: ReadExactError ;
3132use serde:: { Deserialize , Serialize } ;
3233use snafu:: { Backtrace , IntoError , ResultExt , Snafu } ;
34+ use tokio:: io:: AsyncWriteExt ;
3335use tracing:: { debug, error} ;
3436
3537use crate :: { protocol:: ChunkRangesSeq , store:: IROH_BLOCK_SIZE , Hash } ;
@@ -39,8 +41,49 @@ pub mod request;
3941pub ( crate ) use error:: get_error;
4042pub use error:: { GetError , GetResult } ;
4143
42- type DefaultReader = TokioStreamReader < RecvStream > ;
43- type DefaultWriter = TokioStreamWriter < SendStream > ;
44+ pub struct IrohStreamWriter ( iroh:: endpoint:: SendStream ) ;
45+
46+ impl AsyncStreamWriter for IrohStreamWriter {
47+ async fn write ( & mut self , data : & [ u8 ] ) -> io:: Result < ( ) > {
48+ Ok ( self . 0 . write_all ( data) . await ?)
49+ }
50+
51+ async fn write_bytes ( & mut self , data : bytes:: Bytes ) -> io:: Result < ( ) > {
52+ Ok ( self . 0 . write_chunk ( data) . await ?)
53+ }
54+
55+ async fn sync ( & mut self ) -> io:: Result < ( ) > {
56+ Ok ( self . 0 . flush ( ) . await ?)
57+ }
58+ }
59+
60+ pub struct IrohStreamReader ( iroh:: endpoint:: RecvStream ) ;
61+
62+ impl AsyncStreamReader for IrohStreamReader {
63+ async fn read < const N : usize > ( & mut self ) -> io:: Result < [ u8 ; N ] > {
64+ let mut buf = [ 0u8 ; N ] ;
65+ match self . 0 . read_exact ( & mut buf) . await {
66+ Ok ( ( ) ) => Ok ( buf) ,
67+ Err ( ReadExactError :: ReadError ( e) ) => Err ( e. into ( ) ) ,
68+ Err ( ReadExactError :: FinishedEarly ( _) ) => Err ( io:: ErrorKind :: UnexpectedEof . into ( ) ) ,
69+ }
70+ }
71+
72+ async fn read_bytes ( & mut self , len : usize ) -> io:: Result < bytes:: Bytes > {
73+ let mut buf = vec ! [ 0u8 ; len] ;
74+ match self . 0 . read_exact ( & mut buf) . await {
75+ Ok ( ( ) ) => Ok ( buf. into ( ) ) ,
76+ Err ( ReadExactError :: ReadError ( e) ) => Err ( e. into ( ) ) ,
77+ Err ( ReadExactError :: FinishedEarly ( n) ) => {
78+ buf. truncate ( n) ;
79+ Ok ( buf. into ( ) )
80+ }
81+ }
82+ }
83+ }
84+
85+ type DefaultReader = IrohStreamReader ;
86+ type DefaultWriter = IrohStreamWriter ;
4487
4588/// Stats about the transfer.
4689#[ derive(
@@ -96,7 +139,7 @@ pub mod fsm {
96139 } ;
97140 use derive_more:: From ;
98141 use iroh:: endpoint:: Connection ;
99- use iroh_io:: { AsyncSliceWriter , AsyncStreamReader , AsyncStreamWriter , TokioStreamReader } ;
142+ use iroh_io:: { AsyncSliceWriter , AsyncStreamReader , AsyncStreamWriter } ;
100143
101144 use super :: * ;
102145 use crate :: {
@@ -134,8 +177,8 @@ pub mod fsm {
134177 . open_bi ( )
135178 . await
136179 . map_err ( |e| OpenSnafu . into_error ( e. into ( ) ) ) ?;
137- let reader = TokioStreamReader :: new ( reader) ;
138- let mut writer = TokioStreamWriter ( writer) ;
180+ let reader = IrohStreamReader ( reader) ;
181+ let mut writer = IrohStreamWriter ( writer) ;
139182 let request = Request :: GetMany ( request) ;
140183 let request_bytes = postcard:: to_stdvec ( & request)
141184 . map_err ( |source| BadRequestSnafu . into_error ( source. into ( ) ) ) ?;
@@ -227,8 +270,8 @@ pub mod fsm {
227270 . open_bi ( )
228271 . await
229272 . map_err ( |e| OpenSnafu . into_error ( e. into ( ) ) ) ?;
230- let reader = TokioStreamReader :: new ( reader) ;
231- let writer = TokioStreamWriter ( writer) ;
273+ let reader = IrohStreamReader ( reader) ;
274+ let writer = IrohStreamWriter ( writer) ;
232275 Ok ( AtConnected {
233276 start,
234277 reader,
@@ -375,7 +418,7 @@ pub mod fsm {
375418
376419 /// State of the get response when we start reading a collection
377420 #[ derive( Debug ) ]
378- pub struct AtStartRoot < R : AsyncStreamReader = TokioStreamReader < RecvStream > > {
421+ pub struct AtStartRoot < R : AsyncStreamReader = DefaultReader > {
379422 ranges : ChunkRanges ,
380423 reader : R ,
381424 misc : Box < Misc > ,
@@ -384,7 +427,7 @@ pub mod fsm {
384427
385428 /// State of the get response when we start reading a child
386429 #[ derive( Debug ) ]
387- pub struct AtStartChild < R : AsyncStreamReader = TokioStreamReader < RecvStream > > {
430+ pub struct AtStartChild < R : AsyncStreamReader = DefaultReader > {
388431 ranges : ChunkRanges ,
389432 reader : R ,
390433 misc : Box < Misc > ,
@@ -459,7 +502,7 @@ pub mod fsm {
459502
460503 /// State before reading a size header
461504 #[ derive( Debug ) ]
462- pub struct AtBlobHeader < R : AsyncStreamReader = TokioStreamReader < RecvStream > > {
505+ pub struct AtBlobHeader < R : AsyncStreamReader = DefaultReader > {
463506 ranges : ChunkRanges ,
464507 reader : R ,
465508 misc : Box < Misc > ,
@@ -587,7 +630,7 @@ pub mod fsm {
587630
588631 /// State while we are reading content
589632 #[ derive( Debug ) ]
590- pub struct AtBlobContent < R : AsyncStreamReader = TokioStreamReader < RecvStream > > {
633+ pub struct AtBlobContent < R : AsyncStreamReader = DefaultReader > {
591634 stream : ResponseDecoder < R > ,
592635 misc : Box < Misc > ,
593636 }
@@ -683,17 +726,17 @@ pub mod fsm {
683726 impl From < bao_tree:: io:: DecodeError > for DecodeError {
684727 fn from ( value : bao_tree:: io:: DecodeError ) -> Self {
685728 match value {
686- bao_tree:: io:: DecodeError :: ParentNotFound ( x ) => {
687- decode_error:: ParentNotFoundSnafu { node : x } . build ( )
729+ bao_tree:: io:: DecodeError :: ParentNotFound ( node ) => {
730+ decode_error:: ParentNotFoundSnafu { node } . build ( )
688731 }
689- bao_tree:: io:: DecodeError :: LeafNotFound ( x ) => {
690- decode_error:: LeafNotFoundSnafu { num : x } . build ( )
732+ bao_tree:: io:: DecodeError :: LeafNotFound ( num ) => {
733+ decode_error:: LeafNotFoundSnafu { num } . build ( )
691734 }
692735 bao_tree:: io:: DecodeError :: ParentHashMismatch ( node) => {
693736 decode_error:: ParentHashMismatchSnafu { node } . build ( )
694737 }
695- bao_tree:: io:: DecodeError :: LeafHashMismatch ( chunk ) => {
696- decode_error:: LeafHashMismatchSnafu { num : chunk } . build ( )
738+ bao_tree:: io:: DecodeError :: LeafHashMismatch ( num ) => {
739+ decode_error:: LeafHashMismatchSnafu { num } . build ( )
697740 }
698741 bao_tree:: io:: DecodeError :: Io ( cause) => decode_error:: ReadSnafu . into_error ( cause) ,
699742 }
@@ -876,14 +919,14 @@ pub mod fsm {
876919
877920 /// State after we have read all the content for a blob
878921 #[ derive( Debug ) ]
879- pub struct AtEndBlob < R : AsyncStreamReader = TokioStreamReader < RecvStream > > {
922+ pub struct AtEndBlob < R : AsyncStreamReader = DefaultReader > {
880923 stream : R ,
881924 misc : Box < Misc > ,
882925 }
883926
884927 /// The next state after the end of a blob
885928 #[ derive( Debug , From ) ]
886- pub enum EndBlobNext < R : AsyncStreamReader = TokioStreamReader < RecvStream > > {
929+ pub enum EndBlobNext < R : AsyncStreamReader = DefaultReader > {
887930 /// Response is expected to have more children
888931 MoreChildren ( AtStartChild < R > ) ,
889932 /// No more children expected
@@ -909,7 +952,7 @@ pub mod fsm {
909952
910953 /// State when finishing the get response
911954 #[ derive( Debug ) ]
912- pub struct AtClosing < R : AsyncStreamReader = TokioStreamReader < RecvStream > > {
955+ pub struct AtClosing < R : AsyncStreamReader = DefaultReader > {
913956 misc : Box < Misc > ,
914957 reader : R ,
915958 check_extra_data : bool ,
0 commit comments