Skip to content

Commit f3d02e7

Browse files
committed
Working adapters
1 parent 4c4a5e7 commit f3d02e7

File tree

3 files changed

+249
-48
lines changed

3 files changed

+249
-48
lines changed

examples/compression.rs renamed to examples/compression.rs_

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@ use iroh_blobs::{
2222
get::fsm::{AtConnected, ConnectedNext, EndBlobNext},
2323
protocol::{ChunkRangesSeq, GetRequest, Request},
2424
provider::{
25-
events::{ClientConnected, EventSender, HasErrorCode},
26-
handle_get, ErrorHandler, StreamPair,
25+
events::{ClientConnected, EventSender, HasErrorCode}, handle_get, AsyncReadRecvStream, AsyncWriteSendStream, ErrorHandler, StreamPair
2726
},
2827
store::mem::MemStore,
2928
ticket::BlobTicket,
3029
};
31-
use iroh_io::{TokioStreamReader, TokioStreamWriter};
3230
use tokio::io::BufReader;
3331
use tracing::debug;
3432

@@ -53,8 +51,8 @@ pub enum Args {
5351
}
5452

5553
type CompressedWriter =
56-
TokioStreamWriter<async_compression::tokio::write::Lz4Encoder<iroh::endpoint::SendStream>>;
57-
type CompressedReader = TokioStreamReader<
54+
AsyncWriteSendStream<async_compression::tokio::write::Lz4Encoder<iroh::endpoint::SendStream>>;
55+
type CompressedReader = AsyncReadRecvStream<
5856
async_compression::tokio::bufread::Lz4Decoder<BufReader<iroh::endpoint::RecvStream>>,
5957
>;
6058

@@ -158,8 +156,8 @@ async fn main() -> Result<()> {
158156
Args::Get { ticket, target } => {
159157
let conn = endpoint.connect(ticket.node_addr().clone(), ALPN).await?;
160158
let (send, recv) = conn.open_bi().await?;
161-
let send = TokioStreamWriter(Lz4Encoder::new(send));
162-
let recv = TokioStreamReader(Lz4Decoder::new(BufReader::new(recv)));
159+
let send = AsyncWriteSendStream(Lz4Encoder::new(send));
160+
let recv = AsyncReadRecvStream::new(Lz4Decoder::new(BufReader::new(recv)));
163161
let request = GetRequest {
164162
hash: ticket.hash(),
165163
ranges: ChunkRangesSeq::root(),

src/get.rs

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ impl AsyncStreamReader for IrohStreamReader {
8282
}
8383
}
8484

85-
type DefaultReader = IrohStreamReader;
86-
type DefaultWriter = IrohStreamWriter;
85+
type DefaultReader = iroh::endpoint::RecvStream;
86+
type DefaultWriter = iroh::endpoint::SendStream;
8787

8888
/// Stats about the transfer.
8989
#[derive(
@@ -139,14 +139,14 @@ pub mod fsm {
139139
};
140140
use derive_more::From;
141141
use iroh::endpoint::Connection;
142-
use iroh_io::{AsyncSliceWriter, AsyncStreamReader, AsyncStreamWriter};
142+
use iroh_io::{AsyncSliceWriter};
143143

144144
use super::*;
145145
use crate::{
146146
get::get_error::BadRequestSnafu,
147147
protocol::{
148148
GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
149-
},
149+
}, provider::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
150150
};
151151

152152
self_cell::self_cell! {
@@ -173,17 +173,15 @@ pub mod fsm {
173173
counters: RequestCounters,
174174
) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
175175
let start = Instant::now();
176-
let (writer, reader) = connection
176+
let (mut writer, reader) = connection
177177
.open_bi()
178178
.await
179179
.map_err(|e| OpenSnafu.into_error(e.into()))?;
180-
let reader = IrohStreamReader(reader);
181-
let mut writer = IrohStreamWriter(writer);
182180
let request = Request::GetMany(request);
183181
let request_bytes = postcard::to_stdvec(&request)
184182
.map_err(|source| BadRequestSnafu.into_error(source.into()))?;
185183
writer
186-
.write_bytes(request_bytes.into())
184+
.send_bytes(request_bytes.into())
187185
.await
188186
.context(connected_next_error::WriteSnafu)?;
189187
let Request::GetMany(request) = request else {
@@ -270,8 +268,6 @@ pub mod fsm {
270268
.open_bi()
271269
.await
272270
.map_err(|e| OpenSnafu.into_error(e.into()))?;
273-
let reader = IrohStreamReader(reader);
274-
let writer = IrohStreamWriter(writer);
275271
Ok(AtConnected {
276272
start,
277273
reader,
@@ -298,8 +294,8 @@ pub mod fsm {
298294
/// State of the get response machine after the handshake has been sent
299295
#[derive(Debug)]
300296
pub struct AtConnected<
301-
R: AsyncStreamReader = DefaultReader,
302-
W: AsyncStreamWriter = DefaultWriter,
297+
R: RecvStream = DefaultReader,
298+
W: SendStream = DefaultWriter,
303299
> {
304300
start: Instant,
305301
reader: R,
@@ -310,7 +306,7 @@ pub mod fsm {
310306

311307
/// Possible next states after the handshake has been sent
312308
#[derive(Debug, From)]
313-
pub enum ConnectedNext<R: AsyncStreamReader> {
309+
pub enum ConnectedNext<R: RecvStream = DefaultReader> {
314310
/// First response is either a collection or a single blob
315311
StartRoot(AtStartRoot<R>),
316312
/// First response is a child
@@ -341,7 +337,7 @@ pub mod fsm {
341337
Write { source: io::Error },
342338
}
343339

344-
impl<R: AsyncStreamReader, W: AsyncStreamWriter> AtConnected<R, W> {
340+
impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
345341
pub fn new(
346342
start: Instant,
347343
reader: R,
@@ -390,7 +386,7 @@ pub mod fsm {
390386
// write the request itself
391387
let len = request_bytes.len() as u64;
392388
writer
393-
.write_bytes(request_bytes.into())
389+
.send_bytes(request_bytes.into())
394390
.await
395391
.context(connected_next_error::WriteSnafu)?;
396392
writer
@@ -438,7 +434,7 @@ pub mod fsm {
438434

439435
/// State of the get response when we start reading a collection
440436
#[derive(Debug)]
441-
pub struct AtStartRoot<R: AsyncStreamReader = DefaultReader> {
437+
pub struct AtStartRoot<R: RecvStream = DefaultReader> {
442438
ranges: ChunkRanges,
443439
reader: R,
444440
misc: Box<Misc>,
@@ -447,14 +443,14 @@ pub mod fsm {
447443

448444
/// State of the get response when we start reading a child
449445
#[derive(Debug)]
450-
pub struct AtStartChild<R: AsyncStreamReader = DefaultReader> {
446+
pub struct AtStartChild<R: RecvStream = DefaultReader> {
451447
ranges: ChunkRanges,
452448
reader: R,
453449
misc: Box<Misc>,
454450
offset: u64,
455451
}
456452

457-
impl<R: AsyncStreamReader> AtStartChild<R> {
453+
impl<R: RecvStream> AtStartChild<R> {
458454
/// The offset of the child we are currently reading
459455
///
460456
/// This must be used to determine the hash needed to call next.
@@ -491,7 +487,7 @@ pub mod fsm {
491487
}
492488
}
493489

494-
impl<R: AsyncStreamReader> AtStartRoot<R> {
490+
impl<R: RecvStream> AtStartRoot<R> {
495491
/// The ranges we have requested for the child
496492
pub fn ranges(&self) -> &ChunkRanges {
497493
&self.ranges
@@ -522,7 +518,8 @@ pub mod fsm {
522518

523519
/// State before reading a size header
524520
#[derive(Debug)]
525-
pub struct AtBlobHeader<R: AsyncStreamReader = DefaultReader> {
521+
pub struct AtBlobHeader<R: RecvStream = DefaultReader>
522+
{
526523
ranges: ChunkRanges,
527524
reader: R,
528525
misc: Box<Misc>,
@@ -560,10 +557,10 @@ pub mod fsm {
560557
}
561558
}
562559

563-
impl<R: AsyncStreamReader> AtBlobHeader<R> {
560+
impl<R: RecvStream> AtBlobHeader<R> {
564561
/// Read the size header, returning it and going into the `Content` state.
565562
pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
566-
let size = self.reader.read::<8>().await.map_err(|cause| {
563+
let size = self.reader.recv::<8>().await.map_err(|cause| {
567564
if cause.kind() == io::ErrorKind::UnexpectedEof {
568565
at_blob_header_next_error::NotFoundSnafu.build()
569566
} else {
@@ -576,7 +573,7 @@ pub mod fsm {
576573
self.hash.into(),
577574
self.ranges,
578575
BaoTree::new(size, IROH_BLOCK_SIZE),
579-
self.reader,
576+
RecvStreamAsyncStreamReader::new(self.reader),
580577
);
581578
Ok((
582579
AtBlobContent {
@@ -650,8 +647,8 @@ pub mod fsm {
650647

651648
/// State while we are reading content
652649
#[derive(Debug)]
653-
pub struct AtBlobContent<R: AsyncStreamReader = DefaultReader> {
654-
stream: ResponseDecoder<R>,
650+
pub struct AtBlobContent<R: RecvStream = DefaultReader> {
651+
stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
655652
misc: Box<Misc>,
656653
}
657654

@@ -765,7 +762,7 @@ pub mod fsm {
765762

766763
/// The next state after reading a content item
767764
#[derive(Debug, From)]
768-
pub enum BlobContentNext<R: AsyncStreamReader> {
765+
pub enum BlobContentNext<R: RecvStream> {
769766
/// We expect more content
770767
More(
771768
(
@@ -777,7 +774,7 @@ pub mod fsm {
777774
Done(AtEndBlob<R>),
778775
}
779776

780-
impl<R: AsyncStreamReader> AtBlobContent<R> {
777+
impl<R: RecvStream> AtBlobContent<R> {
781778
/// Read the next item, either content, an error, or the end of the blob
782779
pub async fn next(self) -> BlobContentNext<R> {
783780
match self.stream.next().await {
@@ -796,7 +793,7 @@ pub mod fsm {
796793
BlobContentNext::More((next, res))
797794
}
798795
ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
799-
stream,
796+
stream: stream.into_inner(),
800797
misc: self.misc,
801798
}),
802799
}
@@ -933,27 +930,27 @@ pub mod fsm {
933930

934931
/// Immediately finish the get response without reading further
935932
pub fn finish(self) -> AtClosing<R> {
936-
AtClosing::new(self.misc, self.stream.finish(), false)
933+
AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
937934
}
938935
}
939936

940937
/// State after we have read all the content for a blob
941938
#[derive(Debug)]
942-
pub struct AtEndBlob<R: AsyncStreamReader = DefaultReader> {
939+
pub struct AtEndBlob<R: RecvStream = DefaultReader> {
943940
stream: R,
944941
misc: Box<Misc>,
945942
}
946943

947944
/// The next state after the end of a blob
948945
#[derive(Debug, From)]
949-
pub enum EndBlobNext<R: AsyncStreamReader = DefaultReader> {
946+
pub enum EndBlobNext<R: RecvStream = DefaultReader> {
950947
/// Response is expected to have more children
951948
MoreChildren(AtStartChild<R>),
952949
/// No more children expected
953950
Closing(AtClosing<R>),
954951
}
955952

956-
impl<R: AsyncStreamReader> AtEndBlob<R> {
953+
impl<R: RecvStream> AtEndBlob<R> {
957954
/// Read the next child, or finish
958955
pub fn next(mut self) -> EndBlobNext<R> {
959956
if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
@@ -972,13 +969,13 @@ pub mod fsm {
972969

973970
/// State when finishing the get response
974971
#[derive(Debug)]
975-
pub struct AtClosing<R: AsyncStreamReader = DefaultReader> {
972+
pub struct AtClosing<R: RecvStream = DefaultReader> {
976973
misc: Box<Misc>,
977974
reader: R,
978975
check_extra_data: bool,
979976
}
980977

981-
impl<R: AsyncStreamReader> AtClosing<R> {
978+
impl<R: RecvStream> AtClosing<R> {
982979
fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
983980
Self {
984981
misc,
@@ -992,7 +989,7 @@ pub mod fsm {
992989
// Shut down the stream
993990
let mut reader = self.reader;
994991
if self.check_extra_data {
995-
let rest = reader.read_bytes(1).await?;
992+
let rest = reader.recv_bytes(1).await?;
996993
if !rest.is_empty() {
997994
error!("Unexpected extra data at the end of the stream");
998995
}

0 commit comments

Comments
 (0)