Skip to content

Commit bc159ca

Browse files
committed
More moving stuff around
1 parent 349c36b commit bc159ca

File tree

10 files changed

+650
-579
lines changed

10 files changed

+650
-579
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ walkdir = "2.5.0"
6262
atomic_refcell = "0.1.13"
6363
iroh = { version = "0.91.1", features = ["discovery-local-network"]}
6464
async-compression = { version = "0.4.30", features = ["zstd", "tokio"] }
65+
concat_const = "0.2.0"
6566

6667
[features]
6768
hide-proto-docs = []

examples/compression.rs

Lines changed: 95 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,23 @@
99
/// grade code you might nevertheless put the tasks into a [tokio::task::JoinSet] or
1010
/// [n0_future::FuturesUnordered].
1111
mod common;
12-
use std::{io, path::PathBuf};
12+
use std::{fmt::Debug, path::PathBuf};
1313

1414
use anyhow::Result;
15-
use async_compression::tokio::{bufread::Lz4Decoder, write::Lz4Encoder};
1615
use clap::Parser;
1716
use common::setup_logging;
18-
use iroh::{endpoint::VarInt, protocol::ProtocolHandler};
17+
use iroh::protocol::ProtocolHandler;
1918
use iroh_blobs::{
2019
api::Store,
20+
get::StreamPair,
2121
provider::{
22+
self,
2223
events::{ClientConnected, EventSender, HasErrorCode},
23-
handle_stream, AsyncReadRecvStream, AsyncWriteSendStream, RecvStreamSpecific,
24-
SendStreamSpecific, StreamPair,
24+
handle_stream,
2525
},
2626
store::mem::MemStore,
2727
ticket::BlobTicket,
2828
};
29-
use tokio::io::{AsyncRead, AsyncWrite, BufReader};
3029
use tracing::debug;
3130

3231
use crate::common::get_or_generate_secret_key;
@@ -49,54 +48,110 @@ pub enum Args {
4948
},
5049
}
5150

52-
struct CompressedWriteStream(Lz4Encoder<iroh::endpoint::SendStream>);
51+
trait Compression: Clone + Send + Sync + Debug + 'static {
52+
const ALPN: &'static [u8];
53+
fn recv_stream(
54+
&self,
55+
stream: iroh::endpoint::RecvStream,
56+
) -> impl iroh_blobs::util::RecvStream + Sync + 'static;
57+
fn send_stream(
58+
&self,
59+
stream: iroh::endpoint::SendStream,
60+
) -> impl iroh_blobs::util::SendStream + Sync + 'static;
61+
}
5362

54-
impl SendStreamSpecific for CompressedWriteStream {
55-
fn inner(&mut self) -> &mut (impl AsyncWrite + Unpin + Send) {
56-
&mut self.0
57-
}
63+
mod lz4 {
64+
use std::io;
65+
66+
use async_compression::tokio::{bufread::Lz4Decoder, write::Lz4Encoder};
67+
use iroh::endpoint::VarInt;
68+
use iroh_blobs::util::{
69+
AsyncReadRecvStream, AsyncWriteSendStream, RecvStreamSpecific, SendStreamSpecific,
70+
};
71+
use tokio::io::{AsyncRead, AsyncWrite, BufReader};
72+
73+
struct SendStream(Lz4Encoder<iroh::endpoint::SendStream>);
5874

59-
fn reset(&mut self, code: VarInt) -> io::Result<()> {
60-
Ok(self.0.get_mut().reset(code)?)
75+
impl SendStream {
76+
pub fn new(inner: iroh::endpoint::SendStream) -> AsyncWriteSendStream<Self> {
77+
AsyncWriteSendStream::new(Self(Lz4Encoder::new(inner)))
78+
}
6179
}
6280

63-
async fn stopped(&mut self) -> io::Result<Option<VarInt>> {
64-
Ok(self.0.get_mut().stopped().await?)
81+
impl SendStreamSpecific for SendStream {
82+
fn inner(&mut self) -> &mut (impl AsyncWrite + Unpin + Send) {
83+
&mut self.0
84+
}
85+
86+
fn reset(&mut self, code: VarInt) -> io::Result<()> {
87+
Ok(self.0.get_mut().reset(code)?)
88+
}
89+
90+
async fn stopped(&mut self) -> io::Result<Option<VarInt>> {
91+
Ok(self.0.get_mut().stopped().await?)
92+
}
6593
}
66-
}
6794

68-
struct CompressedReadStream(Lz4Decoder<BufReader<iroh::endpoint::RecvStream>>);
95+
struct RecvStream(Lz4Decoder<BufReader<iroh::endpoint::RecvStream>>);
6996

70-
impl RecvStreamSpecific for CompressedReadStream {
71-
fn inner(&mut self) -> &mut (impl AsyncRead + Unpin + Send) {
72-
&mut self.0
97+
impl RecvStream {
98+
pub fn new(inner: iroh::endpoint::RecvStream) -> AsyncReadRecvStream<Self> {
99+
AsyncReadRecvStream::new(Self(Lz4Decoder::new(BufReader::new(inner))))
100+
}
73101
}
74102

75-
fn stop(&mut self, code: VarInt) -> io::Result<()> {
76-
Ok(self.0.get_mut().get_mut().stop(code)?)
103+
impl RecvStreamSpecific for RecvStream {
104+
fn inner(&mut self) -> &mut (impl AsyncRead + Unpin + Send) {
105+
&mut self.0
106+
}
107+
108+
fn stop(&mut self, code: VarInt) -> io::Result<()> {
109+
Ok(self.0.get_mut().get_mut().stop(code)?)
110+
}
111+
112+
fn id(&self) -> u64 {
113+
self.0.get_ref().get_ref().id().index()
114+
}
77115
}
78116

79-
fn id(&self) -> u64 {
80-
self.0.get_ref().get_ref().id().index()
117+
#[derive(Debug, Clone)]
118+
pub struct Compression;
119+
120+
impl super::Compression for Compression {
121+
const ALPN: &[u8] = concat_const::concat_bytes!(b"lz4/", iroh_blobs::ALPN);
122+
fn recv_stream(
123+
&self,
124+
stream: iroh::endpoint::RecvStream,
125+
) -> impl iroh_blobs::util::RecvStream + Sync + 'static {
126+
RecvStream::new(stream)
127+
}
128+
fn send_stream(
129+
&self,
130+
stream: iroh::endpoint::SendStream,
131+
) -> impl iroh_blobs::util::SendStream + Sync + 'static {
132+
SendStream::new(stream)
133+
}
81134
}
82135
}
83136

84137
#[derive(Debug, Clone)]
85-
struct CompressedBlobsProtocol {
138+
struct CompressedBlobsProtocol<C: Compression> {
86139
store: Store,
87140
events: EventSender,
141+
compression: C,
88142
}
89143

90-
impl CompressedBlobsProtocol {
91-
fn new(store: &Store, events: EventSender) -> Self {
144+
impl<C: Compression> CompressedBlobsProtocol<C> {
145+
fn new(store: &Store, events: EventSender, compression: C) -> Self {
92146
Self {
93147
store: store.clone(),
94148
events,
149+
compression,
95150
}
96151
}
97152
}
98153

99-
impl ProtocolHandler for CompressedBlobsProtocol {
154+
impl<C: Compression> ProtocolHandler for CompressedBlobsProtocol<C> {
100155
async fn accept(
101156
&self,
102157
connection: iroh::endpoint::Connection,
@@ -116,20 +171,16 @@ impl ProtocolHandler for CompressedBlobsProtocol {
116171
return Ok(());
117172
}
118173
while let Ok((send, recv)) = connection.accept_bi().await {
119-
let send = AsyncWriteSendStream::new(CompressedWriteStream(Lz4Encoder::new(send)));
120-
let recv = AsyncReadRecvStream::new(CompressedReadStream(Lz4Decoder::new(
121-
BufReader::new(recv),
122-
)));
174+
let send = self.compression.send_stream(send);
175+
let recv = self.compression.recv_stream(recv);
123176
let store = self.store.clone();
124-
let pair = StreamPair::new(connection_id, recv, send, self.events.clone());
177+
let pair = provider::StreamPair::new(connection_id, recv, send, self.events.clone());
125178
tokio::spawn(handle_stream(pair, store));
126179
}
127180
Ok(())
128181
}
129182
}
130183

131-
const ALPN: &[u8] = b"iroh-blobs-compressed/0.1.0";
132-
133184
#[tokio::main]
134185
async fn main() -> Result<()> {
135186
setup_logging();
@@ -140,13 +191,14 @@ async fn main() -> Result<()> {
140191
.discovery_n0()
141192
.bind()
142193
.await?;
194+
let compression = lz4::Compression;
143195
match args {
144196
Args::Provide { path } => {
145197
let store = MemStore::new();
146198
let tag = store.add_path(path).await?;
147-
let blobs = CompressedBlobsProtocol::new(&store, EventSender::DEFAULT);
199+
let blobs = CompressedBlobsProtocol::new(&store, EventSender::DEFAULT, compression);
148200
let router = iroh::protocol::Router::builder(endpoint.clone())
149-
.accept(ALPN, blobs)
201+
.accept(lz4::Compression::ALPN, blobs)
150202
.spawn();
151203
let ticket = BlobTicket::new(endpoint.node_id().into(), tag.hash, tag.format);
152204
println!("Serving blob with hash {}", tag.hash);
@@ -158,14 +210,14 @@ async fn main() -> Result<()> {
158210
}
159211
Args::Get { ticket, target } => {
160212
let store = MemStore::new();
161-
let conn = endpoint.connect(ticket.node_addr().clone(), ALPN).await?;
213+
let conn = endpoint
214+
.connect(ticket.node_addr().clone(), &lz4::Compression::ALPN)
215+
.await?;
162216
let connection_id = conn.stable_id() as u64;
163217
let (send, recv) = conn.open_bi().await?;
164-
let send = AsyncWriteSendStream::new(CompressedWriteStream(Lz4Encoder::new(send)));
165-
let recv = AsyncReadRecvStream::new(CompressedReadStream(Lz4Decoder::new(
166-
BufReader::new(recv),
167-
)));
168-
let sp = StreamPair::new(connection_id, recv, send, EventSender::DEFAULT);
218+
let send = compression.send_stream(send);
219+
let recv = compression.recv_stream(recv);
220+
let sp = StreamPair::new(connection_id, recv, send);
169221
let stats = store.remote().fetch(sp, ticket.hash_and_format()).await?;
170222
if let Some(target) = target {
171223
let size = store.export(ticket.hash(), &target).await?;

src/api/blobs.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ use super::{
5555
};
5656
use crate::{
5757
api::proto::{BatchRequest, ImportByteStreamUpdate},
58-
provider::{events::ClientResult, RecvStreamAsyncStreamReader},
58+
provider::events::ClientResult,
5959
store::IROH_BLOCK_SIZE,
60-
util::temp_tag::TempTag,
60+
util::{temp_tag::TempTag, RecvStreamAsyncStreamReader},
6161
BlobFormat, Hash, HashAndFormat,
6262
};
6363

@@ -429,7 +429,7 @@ impl Blobs {
429429
}
430430

431431
#[cfg_attr(feature = "hide-proto-docs", doc(hidden))]
432-
pub async fn import_bao_reader<R: crate::provider::RecvStream>(
432+
pub async fn import_bao_reader<R: crate::util::RecvStream>(
433433
&self,
434434
hash: Hash,
435435
ranges: ChunkRanges,
@@ -1073,7 +1073,7 @@ impl ExportBaoProgress {
10731073
}
10741074

10751075
/// Write quinn variant that also feeds a progress writer.
1076-
pub(crate) async fn write_with_progress<W: crate::provider::SendStream>(
1076+
pub(crate) async fn write_with_progress<W: crate::util::SendStream>(
10771077
self,
10781078
writer: &mut W,
10791079
progress: &mut impl WriteProgress,

0 commit comments

Comments (0)