Skip to content
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ futures-util = "0.3.30"
testdir = "0.9.1"

[features]
default = ["fs-store", "net_protocol"]
default = ["fs-store", "net_protocol", "formats-collection"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
net_protocol = ["downloader", "dep:futures-util"]
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
Expand All @@ -112,6 +112,8 @@ rpc = [
"dep:walkdir",
"downloader",
]
formats = []
formats-collection = ["formats"]

example-iroh = [
"dep:clap",
Expand All @@ -127,6 +129,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]

[[example]]
name = "provide-bytes"
required-features = ["formats-collection"]

[[example]]
name = "fetch-fsm"
Expand Down
5 changes: 4 additions & 1 deletion examples/local-swarm-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ mod progress {
ProgressStyle,
};
use iroh_blobs::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
get::{
progress::{BlobProgress, DownloadProgress},
Stats,
},
Hash,
};

Expand Down
9 changes: 6 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
use tokio::io::AsyncWriteExt;

use crate::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
net_protocol::DownloadMode,
get::{
progress::{BlobProgress, DownloadProgress},
Stats,
},
provider::AddProgress,
rpc::client::blobs::{
self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,
self, BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions,
IncompleteBlobInfo, WrapOption,
},
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
ticket::BlobTicket,
Expand Down
2 changes: 1 addition & 1 deletion src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
get::{progress::DownloadProgress, Stats},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
Expand Down
34 changes: 16 additions & 18 deletions src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use iroh::endpoint;

use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
use crate::{
get::{db::get_to_db_in_steps, error::GetError},
store::Store,
get::Error,
store::{get_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so there is get and fetch now? this is confusing 😅

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everything that needs a store is in the store module. I might even make it a fn on the store traits (see the OO or not OO question)

One reason for this is that I think it would be neat if you could hide the entire store behind a ff, but to do that you must make sure that store stuff is not all over the place, otherwise you will go crazy.

};

impl From<GetError> for FailureAction {
fn from(e: GetError) -> Self {
impl From<Error> for FailureAction {
fn from(e: Error) -> Self {
match e {
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
// TODO: what do we want to do on local failures?
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
}
}
}
Expand All @@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {

impl<S: Store> Getter for IoGetter<S> {
type Connection = endpoint::Connection;
type NeedsConn = crate::get::db::GetStateNeedsConn;
type NeedsConn = FetchStateNeedsConn;

fn get(
&mut self,
Expand All @@ -45,10 +45,8 @@ impl<S: Store> Getter for IoGetter<S> {
async move {
match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
Err(err) => Err(err.into()),
Ok(crate::get::db::GetState::Complete(stats)) => {
Ok(super::GetOutput::Complete(stats))
}
Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
Ok(FetchState::NeedsConn(needs_conn)) => {
Ok(super::GetOutput::NeedsConn(needs_conn))
}
}
Expand All @@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
}
}

impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
async move {
let res = self.proceed(conn).await;
Expand All @@ -73,7 +71,7 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
}

#[cfg(feature = "metrics")]
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
fn track_metrics(res: &Result<crate::get::Stats, Error>) {
use iroh_metrics::{inc, inc_by};

use crate::metrics::Metrics;
Expand All @@ -90,7 +88,7 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
}
Err(e) => match &e {
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
Error::NotFound(_) => inc!(Metrics, downloads_notfound),
_ => inc!(Metrics, downloads_error),
},
}
Expand Down
4 changes: 2 additions & 2 deletions src/downloader/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use parking_lot::Mutex;

use super::DownloadKind;
use crate::{
get::{db::DownloadProgress, progress::TransferState},
get::progress::{DownloadProgress, TransferState},
util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender},
};

Expand All @@ -21,7 +21,7 @@ pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgress>;
/// Track the progress of downloads.
///
/// This struct allows to create [`ProgressSender`] structs to be passed to
/// [`crate::get::db::get_to_db`]. Each progress sender can be subscribed to by any number of
/// [`crate::store::get_to_db`]. Each progress sender can be subscribed to by any number of
/// [`ProgressSubscriber`] channel senders, which will receive each progress update (if they have
/// capacity). Additionally, the [`ProgressTracker`] maintains a [`TransferState`] for each
/// transfer, applying each progress update to update this state. When subscribing to an already
Expand Down
5 changes: 1 addition & 4 deletions src/downloader/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use iroh::SecretKey;

use super::*;
use crate::{
get::{
db::BlobId,
progress::{BlobProgress, TransferState},
},
get::progress::{BlobId, BlobProgress, TransferState},
util::{
local_pool::LocalPool,
progress::{AsyncChannelProgressSender, IdGenerator},
Expand Down
10 changes: 5 additions & 5 deletions src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ impl Getter for TestingGetter {
// since for testing we don't need a real connection, just keep track of what peer is the
// request being sent to
type Connection = NodeId;
type NeedsConn = GetStateNeedsConn;
type NeedsConn = FetchStateNeedsConn;

fn get(
&mut self,
kind: DownloadKind,
progress_sender: BroadcastProgressSender,
) -> GetStartFut<Self::NeedsConn> {
std::future::ready(Ok(downloader::GetOutput::NeedsConn(GetStateNeedsConn(
std::future::ready(Ok(downloader::GetOutput::NeedsConn(FetchStateNeedsConn(
self.clone(),
kind,
progress_sender,
Expand All @@ -53,11 +53,11 @@ impl Getter for TestingGetter {
}

#[derive(Debug)]
pub(super) struct GetStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);
pub(super) struct FetchStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);

impl downloader::NeedsConn<NodeId> for GetStateNeedsConn {
impl downloader::NeedsConn<NodeId> for FetchStateNeedsConn {
fn proceed(self, peer: NodeId) -> super::GetProceedFut {
let GetStateNeedsConn(getter, kind, progress_sender) = self;
let FetchStateNeedsConn(getter, kind, progress_sender) = self;
let mut inner = getter.0.write();
inner.request_history.push((kind, peer));
let request_duration = inner.request_duration;
Expand Down
1 change: 1 addition & 0 deletions src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
//! n-1 items, where n is the number of blobs in the HashSeq.
//!
//! [postcard]: https://docs.rs/postcard/latest/postcard/
#[cfg(feature = "formats-collection")]
pub mod collection;
71 changes: 5 additions & 66 deletions src/get.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The client side API
//!
//! To get data, create a connection using [iroh-net] or use any quinn
//! To get data, create a connection using [iroh] or use any [iroh-quinn]
//! connection that was obtained in another way.
//!
//! Create a request describing the data you want to get.
Expand All @@ -11,9 +11,9 @@
//! For some states you have to provide additional arguments when calling next,
//! or you can choose to finish early.
//!
//! [iroh-net]: https://docs.rs/iroh-net
//! [iroh-net]: https://docs.rs/iroh
//! [iroh-quinn]: https://docs.rs/iroh-quinn
use std::{
error::Error,
fmt::{self, Debug},
time::{Duration, Instant},
};
Expand All @@ -30,8 +30,8 @@ use crate::{
Hash, IROH_BLOCK_SIZE,
};

pub mod db;
pub mod error;
mod error;
pub use error::Error;
pub mod progress;
pub mod request;

Expand Down Expand Up @@ -882,64 +882,3 @@ pub mod fsm {
ranges_iter: RangesIter,
}
}

/// Error when processing a response
#[derive(thiserror::Error, Debug)]
pub enum GetResponseError {
/// Error when opening a stream
#[error("connection: {0}")]
Connection(#[from] endpoint::ConnectionError),
/// Error when writing the handshake or request to the stream
#[error("write: {0}")]
Write(#[from] endpoint::WriteError),
/// Error when reading from the stream
#[error("read: {0}")]
Read(#[from] endpoint::ReadError),
/// Error when decoding, e.g. hash mismatch
#[error("decode: {0}")]
Decode(bao_tree::io::DecodeError),
/// A generic error
#[error("generic: {0}")]
Generic(anyhow::Error),
}

impl From<postcard::Error> for GetResponseError {
fn from(cause: postcard::Error) -> Self {
Self::Generic(cause.into())
}
}

impl From<bao_tree::io::DecodeError> for GetResponseError {
fn from(cause: bao_tree::io::DecodeError) -> Self {
match cause {
bao_tree::io::DecodeError::Io(cause) => {
// try to downcast to specific quinn errors
if let Some(source) = cause.source() {
if let Some(error) = source.downcast_ref::<endpoint::ConnectionError>() {
return Self::Connection(error.clone());
}
if let Some(error) = source.downcast_ref::<endpoint::ReadError>() {
return Self::Read(error.clone());
}
if let Some(error) = source.downcast_ref::<endpoint::WriteError>() {
return Self::Write(error.clone());
}
}
Self::Generic(cause.into())
}
_ => Self::Decode(cause),
}
}
}

impl From<anyhow::Error> for GetResponseError {
fn from(cause: anyhow::Error) -> Self {
Self::Generic(cause)
}
}

impl From<GetResponseError> for std::io::Error {
fn from(cause: GetResponseError) -> Self {
Self::new(std::io::ErrorKind::Other, cause)
}
}
Loading
Loading