Skip to content

Commit 4ddc137

Browse files
committed
Merge branch 'main' into newtype-it
2 parents 1390954 + 49ab2b7 commit 4ddc137

File tree

26 files changed

+349
-301
lines changed

26 files changed

+349
-301
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ quinn = { package = "iroh-quinn", version = "0.14.0" }
2121
n0-future = "0.2.0"
2222
n0-snafu = "0.2.0"
2323
range-collections = { version = "0.4.6", features = ["serde"] }
24-
redb = { version = "=2.4" }
2524
smallvec = { version = "1", features = ["serde", "const_new"] }
2625
snafu = "0.8.5"
2726
tokio = { version = "1.43.0", features = ["full"] }
@@ -41,10 +40,11 @@ iroh = "0.91.1"
4140
self_cell = "1.1.0"
4241
genawaiter = { version = "0.99.1", features = ["futures03"] }
4342
iroh-base = "0.91.1"
44-
reflink-copy = "0.1.24"
4543
irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
4644
iroh-metrics = { version = "0.35" }
4745
async-compression = { version = "0.4.30", features = ["lz4", "tokio"] }
46+
redb = { version = "=2.4", optional = true }
47+
reflink-copy = { version = "0.1.24", optional = true }
4848

4949
[dev-dependencies]
5050
clap = { version = "4.5.31", features = ["derive"] }
@@ -67,7 +67,8 @@ concat_const = "0.2.0"
6767
[features]
6868
hide-proto-docs = []
6969
metrics = []
70-
default = ["hide-proto-docs"]
70+
default = ["hide-proto-docs", "fs-store"]
71+
fs-store = ["dep:redb", "dep:reflink-copy"]
7172

7273
[patch.crates-io]
7374
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }

examples/compression.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,11 @@ impl<C: Compression> ProtocolHandler for CompressedBlobsProtocol<C> {
151151
connection: iroh::endpoint::Connection,
152152
) -> std::result::Result<(), iroh::protocol::AcceptError> {
153153
let connection_id = connection.stable_id() as u64;
154-
let node_id = connection.remote_node_id()?;
155154
if let Err(cause) = self
156155
.events
157156
.client_connected(|| ClientConnected {
158157
connection_id,
159-
node_id,
158+
node_id: connection.remote_node_id().ok(),
160159
})
161160
.await
162161
{

examples/limit.rs

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,26 @@ fn limit_by_node_id(allowed_nodes: HashSet<NodeId>) -> EventSender {
8181
let mask = EventMask {
8282
// We want a request for each incoming connection so we can accept
8383
// or reject them. We don't need any other events.
84-
connected: ConnectMode::Request,
84+
connected: ConnectMode::Intercept,
8585
..EventMask::DEFAULT
8686
};
8787
let (tx, mut rx) = EventSender::channel(32, mask);
8888
n0_future::task::spawn(async move {
8989
while let Some(msg) = rx.recv().await {
9090
if let ProviderMessage::ClientConnected(msg) = msg {
91-
let node_id = msg.node_id;
92-
let res = if allowed_nodes.contains(&node_id) {
93-
println!("Client connected: {node_id}");
94-
Ok(())
95-
} else {
96-
println!("Client rejected: {node_id}");
97-
Err(AbortReason::Permission)
91+
let res = match msg.node_id {
92+
Some(node_id) if allowed_nodes.contains(&node_id) => {
93+
println!("Client connected: {node_id}");
94+
Ok(())
95+
}
96+
Some(node_id) => {
97+
println!("Client rejected: {node_id}");
98+
Err(AbortReason::Permission)
99+
}
100+
None => {
101+
println!("Client rejected: no node id");
102+
Err(AbortReason::Permission)
103+
}
98104
};
99105
msg.tx.send(res).await.ok();
100106
}
@@ -108,7 +114,7 @@ fn limit_by_hash(allowed_hashes: HashSet<Hash>) -> EventSender {
108114
// We want to get a request for each get request that we can answer
109115
// with OK or not OK depending on the hash. We do not want detailed
110116
// events once it has been decided to handle a request.
111-
get: RequestMode::Request,
117+
get: RequestMode::Intercept,
112118
..EventMask::DEFAULT
113119
};
114120
let (tx, mut rx) = EventSender::channel(32, mask);
@@ -136,7 +142,7 @@ fn throttle(delay_ms: u64) -> EventSender {
136142
let mask = EventMask {
137143
// We want to get requests for each sent user data blob, so we can add a delay.
138144
// Other than that, we don't need any events.
139-
throttle: ThrottleMode::Throttle,
145+
throttle: ThrottleMode::Intercept,
140146
..EventMask::DEFAULT
141147
};
142148
let (tx, mut rx) = EventSender::channel(32, mask);
@@ -190,40 +196,30 @@ fn limit_max_connections(max_connections: usize) -> EventSender {
190196
// based on the current connection count if we want to accept or reject.
191197
// We also want detailed logging of events for the get request, so we can
192198
// detect when the request is finished one way or another.
193-
get: RequestMode::RequestLog,
199+
connected: ConnectMode::Intercept,
194200
..EventMask::DEFAULT
195201
};
196202
let (tx, mut rx) = EventSender::channel(32, mask);
197203
n0_future::task::spawn(async move {
198204
let requests = ConnectionCounter::new(max_connections);
199205
while let Some(msg) = rx.recv().await {
200-
if let ProviderMessage::GetRequestReceived(mut msg) = msg {
201-
let connection_id = msg.connection_id;
202-
let request_id = msg.request_id;
203-
let res = requests.inc();
204-
match res {
205-
Ok(n) => {
206-
println!("Accepting request {n}, id ({connection_id},{request_id})");
207-
msg.tx.send(Ok(())).await.ok();
208-
}
209-
Err(_) => {
210-
println!(
211-
"Connection limit of {max_connections} exceeded, rejecting request"
212-
);
213-
msg.tx.send(Err(AbortReason::RateLimited)).await.ok();
214-
continue;
215-
}
206+
match msg {
207+
ProviderMessage::ClientConnected(msg) => {
208+
let connection_id = msg.connection_id;
209+
let node_id = msg.node_id;
210+
let res = if let Ok(n) = requests.inc() {
211+
println!("Accepting connection {n}, node_id {node_id:?}, connection_id {connection_id}");
212+
Ok(())
213+
} else {
214+
Err(AbortReason::RateLimited)
215+
};
216+
msg.tx.send(res).await.ok();
216217
}
217-
let requests = requests.clone();
218-
n0_future::task::spawn(async move {
219-
// just drain the per request events
220-
//
221-
// Note that we have requested updates for the request, now we also need to process them
222-
// otherwise the request will be aborted!
223-
while let Ok(Some(_)) = msg.rx.recv().await {}
224-
println!("Stopping request, id ({connection_id},{request_id})");
218+
ProviderMessage::ConnectionClosed(msg) => {
225219
requests.dec();
226-
});
220+
println!("Connection closed, connection_id {}", msg.connection_id,);
221+
}
222+
_ => {}
227223
}
228224
}
229225
});

src/api.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ pub enum ExportBaoError {
9898
#[snafu(display("encode error: {source}"))]
9999
ExportBaoInner { source: bao_tree::io::EncodeError },
100100
#[snafu(display("client error: {source}"))]
101-
Progress { source: ProgressError },
101+
ClientError { source: ProgressError },
102102
}
103103

104104
impl From<ExportBaoError> for Error {
@@ -109,7 +109,7 @@ impl From<ExportBaoError> for Error {
109109
ExportBaoError::Request { source, .. } => Self::Io(source.into()),
110110
ExportBaoError::ExportBaoIo { source, .. } => Self::Io(source),
111111
ExportBaoError::ExportBaoInner { source, .. } => Self::Io(source.into()),
112-
ExportBaoError::Progress { source, .. } => Self::Io(source.into()),
112+
ExportBaoError::ClientError { source, .. } => Self::Io(source.into()),
113113
}
114114
}
115115
}
@@ -157,7 +157,7 @@ impl From<bao_tree::io::EncodeError> for ExportBaoError {
157157

158158
impl From<ProgressError> for ExportBaoError {
159159
fn from(value: ProgressError) -> Self {
160-
ProgressSnafu.into_error(value)
160+
ClientSnafu.into_error(value)
161161
}
162162
}
163163

src/api/blobs.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,15 @@ impl Blobs {
142142
/// clears the protections before.
143143
///
144144
/// Users should rely only on garbage collection for blob deletion.
145+
#[cfg(feature = "fs-store")]
145146
pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
146147
trace!("{options:?}");
147148
self.client.rpc(options).await??;
148149
Ok(())
149150
}
150151

151152
/// See [`Self::delete_with_opts`].
153+
#[cfg(feature = "fs-store")]
152154
pub(crate) async fn delete(
153155
&self,
154156
hashes: impl IntoIterator<Item = impl Into<Hash>>,
@@ -501,6 +503,7 @@ impl Blobs {
501503
}
502504
}
503505

506+
#[allow(dead_code)]
504507
pub(crate) async fn clear_protected(&self) -> RequestResult<()> {
505508
let msg = ClearProtectedRequest;
506509
self.client.rpc(msg).await??;

src/api/blobs/reader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ impl tokio::io::AsyncSeek for BlobReader {
214214
}
215215

216216
#[cfg(test)]
217+
#[cfg(feature = "fs-store")]
217218
mod tests {
218219
use bao_tree::ChunkRanges;
219220
use testresult::TestResult;

src/api/downloader.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct Downloader {
3434
#[rpc_requests(message = SwarmMsg, alias = "Msg")]
3535
#[derive(Debug, Serialize, Deserialize)]
3636
enum SwarmProtocol {
37-
#[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
37+
#[rpc(tx = mpsc::Sender<DownloadProgressItem>)]
3838
Download(DownloadRequest),
3939
}
4040

@@ -46,7 +46,7 @@ struct DownloaderActor {
4646
}
4747

4848
#[derive(Debug, Serialize, Deserialize)]
49-
pub enum DownloadProgessItem {
49+
pub enum DownloadProgressItem {
5050
#[serde(skip)]
5151
Error(anyhow::Error),
5252
TryProvider {
@@ -98,15 +98,15 @@ impl DownloaderActor {
9898
async fn handle_download(store: Store, pool: ConnectionPool, msg: DownloadMsg) {
9999
let DownloadMsg { inner, mut tx, .. } = msg;
100100
if let Err(cause) = handle_download_impl(store, pool, inner, &mut tx).await {
101-
tx.send(DownloadProgessItem::Error(cause)).await.ok();
101+
tx.send(DownloadProgressItem::Error(cause)).await.ok();
102102
}
103103
}
104104

105105
async fn handle_download_impl(
106106
store: Store,
107107
pool: ConnectionPool,
108108
request: DownloadRequest,
109-
tx: &mut mpsc::Sender<DownloadProgessItem>,
109+
tx: &mut mpsc::Sender<DownloadProgressItem>,
110110
) -> anyhow::Result<()> {
111111
match request.strategy {
112112
SplitStrategy::Split => handle_download_split_impl(store, pool, request, tx).await?,
@@ -127,7 +127,7 @@ async fn handle_download_split_impl(
127127
store: Store,
128128
pool: ConnectionPool,
129129
request: DownloadRequest,
130-
tx: &mut mpsc::Sender<DownloadProgessItem>,
130+
tx: &mut mpsc::Sender<DownloadProgressItem>,
131131
) -> anyhow::Result<()> {
132132
let providers = request.providers;
133133
let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?;
@@ -140,7 +140,7 @@ async fn handle_download_split_impl(
140140
let progress_tx = progress_tx.clone();
141141
async move {
142142
let hash = request.hash;
143-
let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgessItem)>(16);
143+
let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgressItem)>(16);
144144
progress_tx.send(rx).await.ok();
145145
let sink = TokioMpscSenderSink(tx).with_map(move |x| (id, x));
146146
let res = execute_get(&pool, Arc::new(request), &providers, &store, sink).await;
@@ -154,12 +154,12 @@ async fn handle_download_split_impl(
154154
into_stream(progress_rx)
155155
.flat_map(into_stream)
156156
.map(move |(id, item)| match item {
157-
DownloadProgessItem::Progress(offset) => {
157+
DownloadProgressItem::Progress(offset) => {
158158
total += offset;
159159
if let Some(prev) = offsets.insert(id, offset) {
160160
total -= prev;
161161
}
162-
DownloadProgessItem::Progress(total)
162+
DownloadProgressItem::Progress(total)
163163
}
164164
x => x,
165165
})
@@ -174,7 +174,7 @@ async fn handle_download_split_impl(
174174
Some((_hash, Ok(()))) => {
175175
}
176176
Some((_hash, Err(_e))) => {
177-
tx.send(DownloadProgessItem::DownloadError).await?;
177+
tx.send(DownloadProgressItem::DownloadError).await?;
178178
}
179179
None => break,
180180
}
@@ -298,19 +298,19 @@ impl<'de> Deserialize<'de> for DownloadRequest {
298298
pub type DownloadOptions = DownloadRequest;
299299

300300
pub struct DownloadProgress {
301-
fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>,
301+
fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgressItem>>>,
302302
}
303303

304304
impl DownloadProgress {
305-
fn new(fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>) -> Self {
305+
fn new(fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgressItem>>>) -> Self {
306306
Self { fut }
307307
}
308308

309-
pub async fn stream(self) -> irpc::Result<impl Stream<Item = DownloadProgessItem> + Unpin> {
309+
pub async fn stream(self) -> irpc::Result<impl Stream<Item = DownloadProgressItem> + Unpin> {
310310
let rx = self.fut.await?;
311311
Ok(Box::pin(rx.into_stream().map(|item| match item {
312312
Ok(item) => item,
313-
Err(e) => DownloadProgessItem::Error(e.into()),
313+
Err(e) => DownloadProgressItem::Error(e.into()),
314314
})))
315315
}
316316

@@ -320,8 +320,8 @@ impl DownloadProgress {
320320
tokio::pin!(stream);
321321
while let Some(item) = stream.next().await {
322322
match item? {
323-
DownloadProgessItem::Error(e) => Err(e)?,
324-
DownloadProgessItem::DownloadError => anyhow::bail!("Download error"),
323+
DownloadProgressItem::Error(e) => Err(e)?,
324+
DownloadProgressItem::DownloadError => anyhow::bail!("Download error"),
325325
_ => {}
326326
}
327327
}
@@ -372,7 +372,7 @@ async fn split_request<'a>(
372372
providers: &Arc<dyn ContentDiscovery>,
373373
pool: &ConnectionPool,
374374
store: &Store,
375-
progress: impl Sink<DownloadProgessItem, Error = irpc::channel::SendError>,
375+
progress: impl Sink<DownloadProgressItem, Error = irpc::channel::SendError>,
376376
) -> anyhow::Result<Box<dyn Iterator<Item = GetRequest> + Send + 'a>> {
377377
Ok(match request {
378378
FiniteRequest::Get(req) => {
@@ -428,13 +428,13 @@ async fn execute_get(
428428
request: Arc<GetRequest>,
429429
providers: &Arc<dyn ContentDiscovery>,
430430
store: &Store,
431-
mut progress: impl Sink<DownloadProgessItem, Error = irpc::channel::SendError>,
431+
mut progress: impl Sink<DownloadProgressItem, Error = irpc::channel::SendError>,
432432
) -> anyhow::Result<()> {
433433
let remote = store.remote();
434434
let mut providers = providers.find_providers(request.content());
435435
while let Some(provider) = providers.next().await {
436436
progress
437-
.send(DownloadProgessItem::TryProvider {
437+
.send(DownloadProgressItem::TryProvider {
438438
id: provider,
439439
request: request.clone(),
440440
})
@@ -447,7 +447,7 @@ async fn execute_get(
447447
let local_bytes = local.local_bytes();
448448
let Ok(conn) = conn.await else {
449449
progress
450-
.send(DownloadProgessItem::ProviderFailed {
450+
.send(DownloadProgressItem::ProviderFailed {
451451
id: provider,
452452
request: request.clone(),
453453
})
@@ -458,21 +458,21 @@ async fn execute_get(
458458
.execute_get_sink(
459459
conn.clone(),
460460
local.missing(),
461-
(&mut progress).with_map(move |x| DownloadProgessItem::Progress(x + local_bytes)),
461+
(&mut progress).with_map(move |x| DownloadProgressItem::Progress(x + local_bytes)),
462462
)
463463
.await
464464
{
465465
Ok(_stats) => {
466466
progress
467-
.send(DownloadProgessItem::PartComplete {
467+
.send(DownloadProgressItem::PartComplete {
468468
request: request.clone(),
469469
})
470470
.await?;
471471
return Ok(());
472472
}
473473
Err(_cause) => {
474474
progress
475-
.send(DownloadProgessItem::ProviderFailed {
475+
.send(DownloadProgressItem::ProviderFailed {
476476
id: provider,
477477
request: request.clone(),
478478
})
@@ -521,6 +521,7 @@ impl ContentDiscovery for Shuffled {
521521
}
522522

523523
#[cfg(test)]
524+
#[cfg(feature = "fs-store")]
524525
mod tests {
525526
use std::ops::Deref;
526527

0 commit comments

Comments
 (0)