@@ -12,6 +12,7 @@ use std::{
1212use anyhow:: { Context , Result } ;
1313use bao_tree:: ChunkRanges ;
1414use iroh:: endpoint:: { self , RecvStream , SendStream } ;
15+ use iroh_io:: { AsyncStreamReader , AsyncStreamWriter } ;
1516use n0_future:: StreamExt ;
1617use quinn:: { ClosedStream , ConnectionError , ReadToEndError } ;
1718use serde:: { de:: DeserializeOwned , Deserialize , Serialize } ;
@@ -23,6 +24,7 @@ use crate::{
2324 blobs:: { Bitfield , WriteProgress } ,
2425 ExportBaoResult , Store ,
2526 } ,
27+ get:: { IrohStreamReader , IrohStreamWriter } ,
2628 hashseq:: HashSeq ,
2729 protocol:: { GetManyRequest , GetRequest , ObserveItem , ObserveRequest , PushRequest , Request } ,
2830 provider:: events:: { ClientConnected , ClientResult , ConnectionClosed , RequestTracker } ,
@@ -31,6 +33,9 @@ use crate::{
3133pub mod events;
3234use events:: EventSender ;
3335
36+ type DefaultWriter = IrohStreamWriter ;
37+ type DefaultReader = IrohStreamReader ;
38+
3439/// Statistics about a successful or failed transfer.
3540#[ derive( Debug , Serialize , Deserialize ) ]
3641pub struct TransferStats {
@@ -106,7 +111,7 @@ impl StreamPair {
106111 return Err ( e) ;
107112 } ;
108113 Ok ( ProgressWriter :: new (
109- self . writer ,
114+ IrohStreamWriter ( self . writer ) ,
110115 WriterContext {
111116 t0 : self . t0 ,
112117 other_bytes_read : self . other_bytes_read ,
@@ -130,7 +135,7 @@ impl StreamPair {
130135 return Err ( e) ;
131136 } ;
132137 Ok ( ProgressReader {
133- inner : self . reader ,
138+ inner : IrohStreamReader ( self . reader ) ,
134139 context : ReaderContext {
135140 t0 : self . t0 ,
136141 other_bytes_read : self . other_bytes_read ,
@@ -282,14 +287,14 @@ impl WriteProgress for WriterContext {
282287
283288/// Wrapper for a [`quinn::SendStream`] with additional per request information.
284289#[ derive( Debug ) ]
285- pub struct ProgressWriter {
290+ pub struct ProgressWriter < W : AsyncStreamWriter = DefaultWriter > {
286291 /// The quinn::SendStream to write to
287- pub inner : SendStream ,
292+ pub inner : W ,
288293 pub ( crate ) context : WriterContext ,
289294}
290295
291- impl ProgressWriter {
292- fn new ( inner : SendStream , context : WriterContext ) -> Self {
296+ impl < W : AsyncStreamWriter > ProgressWriter < W > {
297+ fn new ( inner : W , context : WriterContext ) -> Self {
293298 Self { inner, context }
294299 }
295300
@@ -465,7 +470,7 @@ pub async fn handle_push(
465470 if !root_ranges. is_empty ( ) {
466471 // todo: send progress from import_bao_quinn or rename to import_bao_quinn_with_progress
467472 store
468- . import_bao_quinn ( hash, root_ranges. clone ( ) , & mut reader. inner )
473+ . import_bao_reader ( hash, root_ranges. clone ( ) , & mut reader. inner )
469474 . await ?;
470475 }
471476 if request. ranges . is_blob ( ) {
@@ -480,7 +485,7 @@ pub async fn handle_push(
480485 continue ;
481486 }
482487 store
483- . import_bao_quinn ( child_hash, child_ranges. clone ( ) , & mut reader. inner )
488+ . import_bao_reader ( child_hash, child_ranges. clone ( ) , & mut reader. inner )
484489 . await ?;
485490 }
486491 Ok ( ( ) )
@@ -496,7 +501,7 @@ pub(crate) async fn send_blob(
496501) -> ExportBaoResult < ( ) > {
497502 store
498503 . export_bao ( hash, ranges)
499- . write_quinn_with_progress ( & mut writer. inner , & mut writer. context , & hash, index)
504+ . write_with_progress ( & mut writer. inner , & mut writer. context , & hash, index)
500505 . await
501506}
502507
@@ -527,7 +532,7 @@ pub async fn handle_observe(
527532 send_observe_item( writer, & diff) . await ?;
528533 old = new;
529534 }
530- _ = writer. inner. stopped( ) => {
535+ _ = writer. inner. 0 . stopped( ) => {
531536 debug!( "observer closed" ) ;
532537 break ;
533538 }
@@ -539,13 +544,13 @@ pub async fn handle_observe(
539544async fn send_observe_item ( writer : & mut ProgressWriter , item : & Bitfield ) -> Result < ( ) > {
540545 use irpc:: util:: AsyncWriteVarintExt ;
541546 let item = ObserveItem :: from ( item) ;
542- let len = writer. inner . write_length_prefixed ( item) . await ?;
547+ let len = writer. inner . 0 . write_length_prefixed ( item) . await ?;
543548 writer. context . log_other_write ( len) ;
544549 Ok ( ( ) )
545550}
546551
547- pub struct ProgressReader {
548- inner : RecvStream ,
552+ pub struct ProgressReader < R : AsyncStreamReader = DefaultReader > {
553+ inner : R ,
549554 context : ReaderContext ,
550555}
551556
0 commit comments