5 changes: 4 additions & 1 deletion Cargo.toml
@@ -95,7 +95,7 @@ futures-util = "0.3.30"
testdir = "0.9.1"

[features]
default = ["fs-store", "net_protocol"]
default = ["fs-store", "net_protocol", "formats-collection"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
net_protocol = ["downloader", "dep:futures-util"]
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
@@ -112,6 +112,8 @@ rpc = [
"dep:walkdir",
"downloader",
]
formats = []
formats-collection = ["formats"]

example-iroh = [
"dep:clap",
@@ -127,6 +129,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]

[[example]]
name = "provide-bytes"
required-features = ["formats-collection"]

[[example]]
name = "fetch-fsm"
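The new `formats` and `formats-collection` features are additive: `formats` is the umbrella flag, `formats-collection` pulls in collection support, both stay in the default feature set, and the `provide-bytes` example now declares `formats-collection` in `required-features`. As a hedged sketch (item names are illustrative, not taken from this diff), code that follows the same split can gate collection-specific items like this:

// Only compile collection-specific code when the feature is enabled.
#[cfg(feature = "formats-collection")]
pub mod collection_support {
    // collection format helpers would live here
}

// An example that cannot work without the feature can fail early with a
// clear message; Cargo's `required-features` on `provide-bytes` already
// handles this, so this guard is purely illustrative.
#[cfg(not(feature = "formats-collection"))]
compile_error!("this example requires the `formats-collection` feature");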
4 changes: 2 additions & 2 deletions examples/hello-world-fetch.rs
@@ -66,13 +66,13 @@ async fn main() -> Result<()> {
"'Hello World' example expects to fetch a single blob, but the ticket indicates a collection.",
);

// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress
// `download` returns a stream of `DownloadProgressEvent`. You can iterate through these updates to get progress
// on the state of your download.
let download_stream = blobs_client
.download(ticket.hash(), ticket.node_addr().clone())
.await?;

// You can also just `await` the stream, which will poll the `DownloadProgress` stream for you.
// You can also just `await` the stream, which will poll the `DownloadProgressEvent` stream for you.
let outcome = download_stream.await.context("unable to download hash")?;

println!(
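Both updated comments describe the same handle: `download` resolves to a value that is both a `Stream` of `DownloadProgressEvent` and a `Future` for the final outcome. A hedged sketch of the two usage styles, assuming an async context returning `anyhow::Result<()>`, a `StreamExt` import (e.g. from `futures-lite`), and the client/ticket setup from the example:

// Style 1: just await the final outcome, as the example above does.
let outcome = blobs_client
    .download(ticket.hash(), ticket.node_addr().clone())
    .await?
    .await?;
let _ = outcome; // carries sizes/stats; the example prints these

// Style 2: poll the stream and react to individual events.
let mut events = blobs_client
    .download(ticket.hash(), ticket.node_addr().clone())
    .await?;
while let Some(event) = events.next().await {
    match event? {
        DownloadProgressEvent::AllDone(stats) => {
            println!("done, read {} bytes", stats.bytes_read);
            break;
        }
        DownloadProgressEvent::Abort(err) => anyhow::bail!("download aborted: {err}"),
        _ => {} // Connected, Found, Progress, ... drive a progress bar here
    }
}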
29 changes: 15 additions & 14 deletions examples/local-swarm-discovery.rs
@@ -140,13 +140,14 @@ mod progress {
ProgressStyle,
};
use iroh_blobs::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
get::Stats,
rpc::client::blobs::{BlobProgressEvent, DownloadProgressEvent},
Hash,
};

pub async fn show_download_progress(
hash: Hash,
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
) -> Result<()> {
eprintln!("Fetching: {}", hash);
let mp = MultiProgress::new();
@@ -157,7 +158,7 @@ mod progress {
let mut seq = false;
while let Some(x) = stream.next().await {
match x? {
DownloadProgress::InitialState(state) => {
DownloadProgressEvent::InitialState(state) => {
if state.connected {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
@@ -177,21 +178,21 @@ ip.set_length(size.value());
ip.set_length(size.value());
ip.reset();
match blob.progress {
BlobProgress::Pending => {}
BlobProgress::Progressing(offset) => ip.set_position(offset),
BlobProgress::Done => ip.finish_and_clear(),
BlobProgressEvent::Pending => {}
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
BlobProgressEvent::Done => ip.finish_and_clear(),
}
if !seq {
op.finish_and_clear();
}
}
}
}
DownloadProgress::FoundLocal { .. } => {}
DownloadProgress::Connected => {
DownloadProgressEvent::FoundLocal { .. } => {}
DownloadProgressEvent::Connected => {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
DownloadProgress::FoundHashSeq { children, .. } => {
DownloadProgressEvent::FoundHashSeq { children, .. } => {
op.set_message(format!(
"{} Downloading {} blob(s)\n",
style("[3/3]").bold().dim(),
Expand All @@ -201,7 +202,7 @@ mod progress {
op.reset();
seq = true;
}
DownloadProgress::Found { size, child, .. } => {
DownloadProgressEvent::Found { size, child, .. } => {
if seq {
op.set_position(child.into());
} else {
Expand All @@ -210,13 +211,13 @@ mod progress {
ip.set_length(size);
ip.reset();
}
DownloadProgress::Progress { offset, .. } => {
DownloadProgressEvent::Progress { offset, .. } => {
ip.set_position(offset);
}
DownloadProgress::Done { .. } => {
DownloadProgressEvent::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::AllDone(Stats {
DownloadProgressEvent::AllDone(Stats {
bytes_read,
elapsed,
..
Expand All @@ -230,7 +231,7 @@ mod progress {
);
break;
}
DownloadProgress::Abort(e) => {
DownloadProgressEvent::Abort(e) => {
bail!("download aborted: {}", e);
}
}
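For context, the `op` and `ip` handles driven by the match arms are two indicatif bars on one `MultiProgress`; the rename only touches the event types, not this setup. A hedged sketch of that setup (styles omitted, names as used above):

use indicatif::{MultiProgress, ProgressBar};

let mp = MultiProgress::new();
let op = mp.add(ProgressBar::hidden()); // overall: connect / request / blob count
let ip = mp.add(ProgressBar::hidden()); // per-blob byte progress
op.set_message("[1/3] Connecting ...".to_string());
// The DownloadProgressEvent arms above then call set_length, set_position,
// reset and finish_and_clear on these two bars as the transfer proceeds.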
47 changes: 23 additions & 24 deletions src/cli.rs
@@ -19,11 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
use tokio::io::AsyncWriteExt;

use crate::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
net_protocol::DownloadMode,
provider::AddProgress,
get::Stats,
rpc::client::blobs::{
self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,
self, AddProgressEvent, BlobInfo, BlobProgressEvent, BlobStatus, CollectionInfo,
DownloadMode, DownloadOptions, DownloadProgressEvent, IncompleteBlobInfo, WrapOption,
},
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
ticket::BlobTicket,
@@ -895,29 +894,29 @@ pub struct ProvideResponseEntry {
pub hash: Hash,
}

/// Combines the [`AddProgress`] outputs from a [`Stream`] into a single tuple.
/// Combines the [`AddProgressEvent`] outputs from a [`Stream`] into a single tuple.
pub async fn aggregate_add_response(
mut stream: impl Stream<Item = Result<AddProgress>> + Unpin,
mut stream: impl Stream<Item = Result<AddProgressEvent>> + Unpin,
) -> Result<(Hash, BlobFormat, Vec<ProvideResponseEntry>)> {
let mut hash_and_format = None;
let mut collections = BTreeMap::<u64, (String, u64, Option<Hash>)>::new();
let mut mp = Some(ProvideProgressState::new());
while let Some(item) = stream.next().await {
match item? {
AddProgress::Found { name, id, size } => {
AddProgressEvent::Found { name, id, size } => {
tracing::trace!("Found({id},{name},{size})");
if let Some(mp) = mp.as_mut() {
mp.found(name.clone(), id, size);
}
collections.insert(id, (name, size, None));
}
AddProgress::Progress { id, offset } => {
AddProgressEvent::Progress { id, offset } => {
tracing::trace!("Progress({id}, {offset})");
if let Some(mp) = mp.as_mut() {
mp.progress(id, offset);
}
}
AddProgress::Done { hash, id } => {
AddProgressEvent::Done { hash, id } => {
tracing::trace!("Done({id},{hash:?})");
if let Some(mp) = mp.as_mut() {
mp.done(id, hash);
@@ -931,15 +930,15 @@ pub async fn aggregate_add_response(
}
}
}
AddProgress::AllDone { hash, format, .. } => {
AddProgressEvent::AllDone { hash, format, .. } => {
tracing::trace!("AllDone({hash:?})");
if let Some(mp) = mp.take() {
mp.all_done();
}
hash_and_format = Some(HashAndFormat { hash, format });
break;
}
AddProgress::Abort(e) => {
AddProgressEvent::Abort(e) => {
if let Some(mp) = mp.take() {
mp.error();
}
@@ -1032,7 +1031,7 @@ impl ProvideProgressState {
/// Displays the download progress for a given stream.
pub async fn show_download_progress(
hash: Hash,
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
) -> Result<()> {
eprintln!("Fetching: {}", hash);
let mp = MultiProgress::new();
@@ -1043,7 +1042,7 @@ pub async fn show_download_progress(
let mut seq = false;
while let Some(x) = stream.next().await {
match x? {
DownloadProgress::InitialState(state) => {
DownloadProgressEvent::InitialState(state) => {
if state.connected {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
@@ -1063,21 +1062,21 @@ ip.set_length(size.value());
ip.set_length(size.value());
ip.reset();
match blob.progress {
BlobProgress::Pending => {}
BlobProgress::Progressing(offset) => ip.set_position(offset),
BlobProgress::Done => ip.finish_and_clear(),
BlobProgressEvent::Pending => {}
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
BlobProgressEvent::Done => ip.finish_and_clear(),
}
if !seq {
op.finish_and_clear();
}
}
}
}
DownloadProgress::FoundLocal { .. } => {}
DownloadProgress::Connected => {
DownloadProgressEvent::FoundLocal { .. } => {}
DownloadProgressEvent::Connected => {
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
}
DownloadProgress::FoundHashSeq { children, .. } => {
DownloadProgressEvent::FoundHashSeq { children, .. } => {
op.set_message(format!(
"{} Downloading {} blob(s)\n",
style("[3/3]").bold().dim(),
@@ -1087,7 +1086,7 @@
op.reset();
seq = true;
}
DownloadProgress::Found { size, child, .. } => {
DownloadProgressEvent::Found { size, child, .. } => {
if seq {
op.set_position(child.into());
} else {
@@ -1096,13 +1095,13 @@
ip.set_length(size);
ip.reset();
}
DownloadProgress::Progress { offset, .. } => {
DownloadProgressEvent::Progress { offset, .. } => {
ip.set_position(offset);
}
DownloadProgress::Done { .. } => {
DownloadProgressEvent::Done { .. } => {
ip.finish_and_clear();
}
DownloadProgress::AllDone(Stats {
DownloadProgressEvent::AllDone(Stats {
bytes_read,
elapsed,
..
@@ -1116,7 +1115,7 @@
);
break;
}
DownloadProgress::Abort(e) => {
DownloadProgressEvent::Abort(e) => {
bail!("download aborted: {}", e);
}
}
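`aggregate_add_response` consumes the same renamed event stream that the rpc client produces when adding data. A hedged sketch of wiring them together (the `add_from_path` call and the entry fields are assumptions based on the surrounding code, not guaranteed by this diff):

let stream = blobs_client
    .add_from_path(path, /* in_place: */ false, tag, WrapOption::NoWrap)
    .await?;
let (hash, format, entries) = aggregate_add_response(stream).await?;
for entry in entries {
    // ProvideResponseEntry fields assumed: name, size, hash
    println!("added {} ({} bytes): {}", entry.name, entry.size, entry.hash);
}
println!("root hash {hash} with format {format:?}");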
4 changes: 2 additions & 2 deletions src/downloader.rs
@@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
get::{progress::DownloadProgressEvent, Stats},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -797,7 +797,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
if let Some(sender) = handlers.on_progress {
self.progress_tracker.unsubscribe(&kind, &sender);
sender
.send(DownloadProgress::Abort(serde_error::Error::new(
.send(DownloadProgressEvent::Abort(serde_error::Error::new(
&*anyhow::Error::from(DownloadError::Cancelled),
)))
.await
34 changes: 16 additions & 18 deletions src/downloader/get.rs
@@ -7,20 +7,20 @@ use iroh::endpoint;

use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
use crate::{
get::{db::get_to_db_in_steps, error::GetError},
store::Store,
get::Error,
store::{get_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
[Review comment] Collaborator: so there is get and fetch now? this is confusing 😅

[Review comment] Author: everything that needs a store is in the store module. I might even make it a fn on the store traits (see the OO-or-not-OO question).

One reason for this is that I think it would be neat if you could hide the entire store behind a feature flag, but to do that you must make sure that store stuff is not all over the place, otherwise you will go crazy.
};

impl From<GetError> for FailureAction {
fn from(e: GetError) -> Self {
impl From<Error> for FailureAction {
fn from(e: Error) -> Self {
match e {
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
// TODO: what do we want to do on local failures?
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
}
}
}
@@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {

impl<S: Store> Getter for IoGetter<S> {
type Connection = endpoint::Connection;
type NeedsConn = crate::get::db::GetStateNeedsConn;
type NeedsConn = FetchStateNeedsConn;

fn get(
&mut self,
@@ -45,10 +45,8 @@ impl<S: Store> Getter for IoGetter<S> {
async move {
match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
Err(err) => Err(err.into()),
Ok(crate::get::db::GetState::Complete(stats)) => {
Ok(super::GetOutput::Complete(stats))
}
Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
Ok(FetchState::NeedsConn(needs_conn)) => {
Ok(super::GetOutput::NeedsConn(needs_conn))
}
}
@@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
}
}

impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
async move {
let res = self.proceed(conn).await;
@@ -73,7 +71,7 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
}

#[cfg(feature = "metrics")]
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
fn track_metrics(res: &Result<crate::get::Stats, Error>) {
use iroh_metrics::{inc, inc_by};

use crate::metrics::Metrics;
@@ -90,7 +88,7 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
}
Err(e) => match &e {
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
Error::NotFound(_) => inc!(Metrics, downloads_notfound),
_ => inc!(Metrics, downloads_error),
},
}
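The `IoGetter` impl above is the main consumer of the renamed two-step API: `get_to_db_in_steps` first checks the local store and only asks for a connection when data is actually missing. A hedged sketch of that flow outside the downloader (the connection helper is hypothetical, and the signatures are inferred from the code above):

match get_to_db_in_steps(store, hash_and_format, progress_sender).await? {
    FetchState::Complete(stats) => {
        // everything was already present in the local store
        tracing::debug!("complete without dialing, read {} bytes", stats.bytes_read);
    }
    FetchState::NeedsConn(needs_conn) => {
        // dial (or reuse) a connection, then resume the same request
        let conn = obtain_connection().await?; // hypothetical helper
        let stats = needs_conn.proceed(conn).await?;
        tracing::debug!("fetched over the network, read {} bytes", stats.bytes_read);
    }
}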