Commit 4c4a5e7

Add an example of how to add compression to the entire blobs protocol.
1 parent 4e8387a commit 4c4a5e7

File tree

6 files changed: +328, -10 lines changed


Cargo.lock

Lines changed: 97 additions & 0 deletions

Cargo.toml

Lines changed: 3 additions & 1 deletion
@@ -44,6 +44,7 @@ iroh-base = "0.91.1"
 reflink-copy = "0.1.24"
 irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
 iroh-metrics = { version = "0.35" }
+async-compression = { version = "0.4.30", features = ["lz4", "tokio"] }

 [dev-dependencies]
 clap = { version = "4.5.31", features = ["derive"] }
@@ -60,6 +61,7 @@ tracing-test = "0.2.5"
 walkdir = "2.5.0"
 atomic_refcell = "0.1.13"
 iroh = { version = "0.91.1", features = ["discovery-local-network"]}
+async-compression = { version = "0.4.30", features = ["zstd", "tokio"] }

 [features]
 hide-proto-docs = []
@@ -68,4 +70,4 @@ default = ["hide-proto-docs"]

 [patch.crates-io]
 iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
-iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
+iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
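
The [dependencies] entry enables the "lz4" and "tokio" features used by the example below, while the dev-dependency additionally enables "zstd". Switching codecs is mostly a matter of swapping the wrapper types. A minimal sketch, assuming the "zstd" feature of async-compression; the ZstdWriter/ZstdReader aliases are illustrative and not part of the crate:

use async_compression::tokio::{bufread::ZstdDecoder, write::ZstdEncoder};
use iroh_io::{TokioStreamReader, TokioStreamWriter};
use tokio::io::BufReader;

// Zstd counterparts of the CompressedWriter/CompressedReader aliases in the
// example below: wrap the QUIC send stream in an encoder and the buffered
// recv stream in a decoder.
type ZstdWriter = TokioStreamWriter<ZstdEncoder<iroh::endpoint::SendStream>>;
type ZstdReader = TokioStreamReader<ZstdDecoder<BufReader<iroh::endpoint::RecvStream>>>;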

examples/compression.rs

Lines changed: 191 additions & 0 deletions
/// Example how to add compression to the entire blobs protocol by wrapping
/// the send and receive stream of each request in an LZ4 encoder and decoder.
///
/// The custom protocol handler internally makes liberal use of spawn to run
/// one background task per request stream.
///
/// This is fine, since the tasks will terminate as soon as the
/// [CompressedBlobsProtocol] instance holding the [EventSender] is dropped.
/// But for production grade code you might nevertheless put the tasks into a
/// [tokio::task::JoinSet] or [n0_future::FuturesUnordered].
mod common;
use std::{path::PathBuf, time::Instant};

use anyhow::Result;
use async_compression::tokio::{bufread::Lz4Decoder, write::Lz4Encoder};
use bao_tree::blake3;
use clap::Parser;
use common::setup_logging;
use iroh::protocol::ProtocolHandler;
use iroh_blobs::{
    api::Store,
    get::fsm::{AtConnected, ConnectedNext, EndBlobNext},
    protocol::{ChunkRangesSeq, GetRequest, Request},
    provider::{
        events::{ClientConnected, EventSender, HasErrorCode},
        handle_get, ErrorHandler, StreamPair,
    },
    store::mem::MemStore,
    ticket::BlobTicket,
};
use iroh_io::{TokioStreamReader, TokioStreamWriter};
use tokio::io::BufReader;
use tracing::debug;

use crate::common::get_or_generate_secret_key;

#[derive(Debug, Parser)]
#[command(version, about)]
pub enum Args {
    /// Provide a blob from a path
    Provide {
        /// Path for files to add.
        path: PathBuf,
    },
    /// Get a blob. Just for completeness' sake.
    Get {
        /// Ticket for the blob to download
        ticket: BlobTicket,
        /// Path to save the blob to
        #[clap(long)]
        target: Option<PathBuf>,
    },
}

type CompressedWriter =
    TokioStreamWriter<async_compression::tokio::write::Lz4Encoder<iroh::endpoint::SendStream>>;
type CompressedReader = TokioStreamReader<
    async_compression::tokio::bufread::Lz4Decoder<BufReader<iroh::endpoint::RecvStream>>,
>;

#[derive(Debug, Clone)]
struct CompressedBlobsProtocol {
    store: Store,
    events: EventSender,
}

impl CompressedBlobsProtocol {
    fn new(store: &Store, events: EventSender) -> Self {
        Self {
            store: store.clone(),
            events,
        }
    }
}

struct CompressedErrorHandler;

impl ErrorHandler for CompressedErrorHandler {
    type W = CompressedWriter;

    type R = CompressedReader;

    async fn stop(reader: &mut Self::R, code: quinn::VarInt) {
        reader.0.get_mut().get_mut().stop(code).ok();
    }

    async fn reset(writer: &mut Self::W, code: quinn::VarInt) {
        writer.0.get_mut().reset(code).ok();
    }
}

impl ProtocolHandler for CompressedBlobsProtocol {
    async fn accept(
        &self,
        connection: iroh::endpoint::Connection,
    ) -> std::result::Result<(), iroh::protocol::AcceptError> {
        let connection_id = connection.stable_id() as u64;
        let node_id = connection.remote_node_id()?;
        if let Err(cause) = self
            .events
            .client_connected(|| ClientConnected {
                connection_id,
                node_id,
            })
            .await
        {
            connection.close(cause.code(), cause.reason());
            debug!("closing connection: {cause}");
            return Ok(());
        }
        while let Ok((send, recv)) = connection.accept_bi().await {
            let stream_id = send.id().index();
            let send = TokioStreamWriter(Lz4Encoder::new(send));
            let recv = TokioStreamReader(Lz4Decoder::new(BufReader::new(recv)));
            let store = self.store.clone();
            let mut pair =
                StreamPair::new(connection_id, stream_id, recv, send, self.events.clone());
            tokio::spawn(async move {
                let request = pair.read_request().await?;
                if let Request::Get(request) = request {
                    handle_get::<CompressedErrorHandler>(pair, store, request).await?;
                }
                anyhow::Ok(())
            });
        }
        Ok(())
    }
}

const ALPN: &[u8] = b"iroh-blobs-compressed/0.1.0";

#[tokio::main]
async fn main() -> Result<()> {
    setup_logging();
    let args = Args::parse();
    let secret = get_or_generate_secret_key()?;
    let endpoint = iroh::Endpoint::builder()
        .secret_key(secret)
        .discovery_n0()
        .bind()
        .await?;
    match args {
        Args::Provide { path } => {
            let store = MemStore::new();
            let tag = store.add_path(path).await?;
            let blobs = CompressedBlobsProtocol::new(&store, EventSender::DEFAULT);
            let router = iroh::protocol::Router::builder(endpoint.clone())
                .accept(ALPN, blobs)
                .spawn();
            let ticket = BlobTicket::new(endpoint.node_id().into(), tag.hash, tag.format);
            println!("Serving blob with hash {}", tag.hash);
            println!("Ticket: {ticket}");
            println!("Node is running. Press Ctrl-C to exit.");
            tokio::signal::ctrl_c().await?;
            println!("Shutting down.");
            router.shutdown().await?;
        }
        Args::Get { ticket, target } => {
            let conn = endpoint.connect(ticket.node_addr().clone(), ALPN).await?;
            let (send, recv) = conn.open_bi().await?;
            let send = TokioStreamWriter(Lz4Encoder::new(send));
            let recv = TokioStreamReader(Lz4Decoder::new(BufReader::new(recv)));
            let request = GetRequest {
                hash: ticket.hash(),
                ranges: ChunkRangesSeq::root(),
            };
            let connected =
                AtConnected::new(Instant::now(), recv, send, request, Default::default());
            let ConnectedNext::StartRoot(start) = connected.next().await? else {
                unreachable!("expected start root");
            };
            let (end, data) = start.next().concatenate_into_vec().await?;
            let EndBlobNext::Closing(closing) = end.next() else {
                unreachable!("expected closing");
            };
            let stats = closing.next().await?;
            if let Some(target) = target {
                tokio::fs::write(&target, &data).await?;
                println!(
                    "Wrote {} bytes to {}",
                    stats.payload_bytes_read,
                    target.display()
                );
            } else {
                let hash = blake3::hash(&data);
                println!("Hash: {hash}");
            }
        }
    }
    Ok(())
}
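
As the module docs suggest, production code might track the per-request tasks instead of detaching them with tokio::spawn. A minimal sketch of that variant of the accept loop, assuming the same types and imports as the example above; handle_connection_tracked is an illustrative helper, not part of iroh-blobs:

use tokio::task::JoinSet;

// Sketch: same per-stream logic as CompressedBlobsProtocol::accept, but the
// request tasks live in a JoinSet, so they are aborted when this function
// returns and their errors can be observed instead of being silently dropped.
async fn handle_connection_tracked(
    blobs: CompressedBlobsProtocol,
    connection: iroh::endpoint::Connection,
) -> anyhow::Result<()> {
    let connection_id = connection.stable_id() as u64;
    let mut tasks = JoinSet::new();
    while let Ok((send, recv)) = connection.accept_bi().await {
        let stream_id = send.id().index();
        // Wrap both directions in LZ4, exactly as in the example above.
        let send = TokioStreamWriter(Lz4Encoder::new(send));
        let recv = TokioStreamReader(Lz4Decoder::new(BufReader::new(recv)));
        let store = blobs.store.clone();
        let mut pair =
            StreamPair::new(connection_id, stream_id, recv, send, blobs.events.clone());
        tasks.spawn(async move {
            let request = pair.read_request().await?;
            if let Request::Get(request) = request {
                handle_get::<CompressedErrorHandler>(pair, store, request).await?;
            }
            anyhow::Ok(())
        });
    }
    // Drain the set so per-request errors are at least logged.
    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Ok(())) => {}
            Ok(Err(err)) => debug!("request failed: {err}"),
            Err(err) => debug!("request task panicked or was cancelled: {err}"),
        }
    }
    Ok(())
}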
