1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
 /target
 iroh.config.toml
+.vscode/*
103 changes: 101 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -52,7 +52,7 @@ postcard = { version = "1", default-features = false, features = [
     "use-std",
     "experimental-derive",
 ] }
-quic-rpc = { version = "0.17", optional = true }
+quic-rpc = { version = "0.17.1", optional = true }
 quic-rpc-derive = { version = "0.17", optional = true }
 quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] }
 rand = "0.8"
@@ -117,6 +117,7 @@ example-iroh = [
     "dep:console",
     "iroh/discovery-local-network"
 ]
+test = ["quic-rpc/quinn-transport"]
 
 [package.metadata.docs.rs]
 all-features = true
16 changes: 11 additions & 5 deletions src/rpc/client/blobs.rs
@@ -268,10 +268,10 @@ where
             }
         });
         tokio::spawn(async move {
-            // TODO: Is it important to catch this error? It should also result in an error on the
-            // response stream. If we deem it important, we could one-shot send it into the
-            // BlobAddProgress and return from there. Not sure.
             if let Err(err) = sink.send_all(&mut input).await {
+                // if we get an error in send_all due to the connection being closed, this will just fail again.
+                // if we get an error due to something else (serialization or size limit), tell the remote to abort.
+                sink.send(AddStreamUpdate::Abort).await.ok();
                 warn!("Failed to send input stream to remote: {err:?}");
             }
         });
@@ -281,7 +281,7 @@ where
 
     /// Write a blob by passing bytes.
     pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
-        let input = futures_lite::stream::once(Ok(bytes.into()));
+        let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
         self.add_stream(input, SetTagOption::Auto).await?.await
     }

@@ -291,7 +291,7 @@ where
         bytes: impl Into<Bytes>,
         name: impl Into<Tag>,
     ) -> anyhow::Result<AddOutcome> {
-        let input = futures_lite::stream::once(Ok(bytes.into()));
+        let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
         self.add_stream(input, SetTagOption::Named(name.into()))
             .await?
             .await
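
An aside on the change to add_bytes and add_bytes_named above: splitting the payload up front is cheap because Bytes::split_to hands out views into the same allocation rather than copying. A minimal standalone sketch of that property (assumes only the bytes crate; the buffer size and variable names are illustrative, not taken from this PR):

use bytes::Bytes;

fn main() {
    // 128 KiB payload, analogous to a blob passed to add_bytes.
    let payload = Bytes::from(vec![1u8; 128 * 1024]);
    let base = payload.as_ptr();

    // Chunking via split_to, as chunked_bytes_stream does below.
    let mut rest = payload.clone();
    let first = rest.split_to(64 * 1024);

    // Both chunks still point into the original allocation; nothing was copied.
    assert_eq!(first.len(), 64 * 1024);
    assert_eq!(first.as_ptr(), base);
    assert_eq!(rest.as_ptr(), base.wrapping_add(64 * 1024));
}

This is presumably why the fix splits the already-built Bytes into pieces instead of buffering the input differently.
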
@@ -987,6 +987,12 @@ pub struct DownloadOptions {
     pub mode: DownloadMode,
 }
 
+fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
+    futures_lite::stream::iter(std::iter::from_fn(move || {
+        Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
+    }))
+}
+
 #[cfg(test)]
 mod tests {
     use std::{path::Path, time::Duration};
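
Since chunked_bytes_stream is the heart of the fix, here is a self-contained sketch of its behavior (assumes the bytes and futures-lite crates; the 150 KiB input and the assertions are illustrative, not taken from the PR). A buffer is emitted as chunks of at most the given size, the last chunk carries the remainder, and an empty input yields no items because the first empty split is filtered out and ends the iterator.

use bytes::Bytes;
use futures_lite::{Stream, StreamExt};

// Same helper as in the diff above, reproduced so the sketch compiles on its own.
fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
    futures_lite::stream::iter(std::iter::from_fn(move || {
        // Split off up to `c` bytes from the front; an empty split means we are done.
        Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
    }))
}

fn main() {
    let data = Bytes::from(vec![7u8; 150 * 1024]);
    let chunks: Vec<Bytes> =
        futures_lite::future::block_on(chunked_bytes_stream(data.clone(), 64 * 1024).collect());

    // 150 KiB split at 64 KiB: two full chunks plus a 22 KiB remainder.
    assert_eq!(
        chunks.iter().map(|c| c.len()).collect::<Vec<_>>(),
        vec![64 * 1024, 64 * 1024, 22 * 1024]
    );

    // The chunks concatenate back to the original payload.
    let joined: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
    assert_eq!(joined, data.to_vec());

    // An empty payload produces no chunks (no empty trailing item).
    let empty: Vec<Bytes> =
        futures_lite::future::block_on(chunked_bytes_stream(Bytes::new(), 64 * 1024).collect());
    assert!(empty.is_empty());
}

With a 64 KiB chunk size each stream item stays well below typical RPC message size limits, which appears to be the point of the change to add_bytes and add_bytes_named.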