Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/api/blobs/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ mod tests {

use super::*;
use crate::{
protocol::ChunkRangesExt,
store::{
fs::{
tests::{create_n0_bao, test_data, INTERESTING_SIZES},
FsStore,
},
mem::MemStore,
},
util::ChunkRangesExt,
};

async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/api/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ async fn execute_get(
request: request.clone(),
})
.await?;
let conn = pool.connect(provider);
let conn = pool.get_or_connect(provider);
let local = remote.local_for_request(request.clone()).await?;
if local.is_complete() {
return Ok(());
Expand Down
3 changes: 1 addition & 2 deletions src/api/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ mod tests {

use crate::{
api::blobs::Blobs,
protocol::{ChunkRangesSeq, GetRequest},
protocol::{ChunkRangesExt, ChunkRangesSeq, GetRequest},
store::{
fs::{
tests::{create_n0_bao, test_data, INTERESTING_SIZES},
Expand All @@ -1076,7 +1076,6 @@ mod tests {
mem::MemStore,
},
tests::{add_test_hash_seq, add_test_hash_seq_incomplete},
util::ChunkRangesExt,
};

#[tokio::test]
Expand Down
3 changes: 1 addition & 2 deletions src/get/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use super::{fsm, GetError, GetResult, Stats};
use crate::{
get::error::{BadRequestSnafu, LocalFailureSnafu},
hashseq::HashSeq,
protocol::{ChunkRangesSeq, GetRequest},
util::ChunkRangesExt,
protocol::{ChunkRangesExt, ChunkRangesSeq, GetRequest},
Hash, HashAndFormat,
};

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub mod ticket;

#[doc(hidden)]
pub mod test;
mod util;
pub mod util;

#[cfg(test)]
mod tests;
Expand Down
77 changes: 74 additions & 3 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,21 +373,25 @@
//! a large existing system that has demonstrated performance issues.
//!
//! If in doubt, just use multiple requests and multiple connections.
use std::io;
use std::{
io,
ops::{Bound, RangeBounds},
};

use bao_tree::{io::round_up_to_chunks, ChunkNum};
use builder::GetRequestBuilder;
use derive_more::From;
use iroh::endpoint::VarInt;
use irpc::util::AsyncReadVarintExt;
use postcard::experimental::max_size::MaxSize;
use range_collections::{range_set::RangeSetEntry, RangeSet2};
use serde::{Deserialize, Serialize};
mod range_spec;
pub use bao_tree::ChunkRanges;
pub use range_spec::{ChunkRangesSeq, NonEmptyRequestRangeSpecIter, RangeSpec};
use snafu::{GenerateImplicitData, Snafu};
use tokio::io::AsyncReadExt;

pub use crate::util::ChunkRangesExt;
use crate::{api::blobs::Bitfield, provider::CountingReader, BlobFormat, Hash, HashAndFormat};

/// Maximum message size is limited to 100MiB for now.
Expand Down Expand Up @@ -714,6 +718,73 @@ impl TryFrom<VarInt> for Closed {
}
}

pub trait ChunkRangesExt {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this thing should move to bao-tree at some point!

fn last_chunk() -> Self;
fn chunk(offset: u64) -> Self;
fn bytes(ranges: impl RangeBounds<u64>) -> Self;
fn chunks(ranges: impl RangeBounds<u64>) -> Self;
fn offset(offset: u64) -> Self;
}

impl ChunkRangesExt for ChunkRanges {
fn last_chunk() -> Self {
ChunkRanges::from(ChunkNum(u64::MAX)..)
}

/// Create a chunk range that contains a single chunk.
fn chunk(offset: u64) -> Self {
ChunkRanges::from(ChunkNum(offset)..ChunkNum(offset + 1))
}

/// Create a range of chunks that contains the given byte ranges.
/// The byte ranges are rounded up to the nearest chunk size.
fn bytes(ranges: impl RangeBounds<u64>) -> Self {
round_up_to_chunks(&bounds_from_range(ranges, |v| v))
}

/// Create a range of chunks from u64 chunk bounds.
///
/// This is equivalent but more convenient than using the ChunkNum newtype.
fn chunks(ranges: impl RangeBounds<u64>) -> Self {
bounds_from_range(ranges, ChunkNum)
}

/// Create a chunk range that contains a single byte offset.
fn offset(offset: u64) -> Self {
Self::bytes(offset..offset + 1)
}
}

// todo: move to range_collections
pub(crate) fn bounds_from_range<R, T, F>(range: R, f: F) -> RangeSet2<T>
where
R: RangeBounds<u64>,
T: RangeSetEntry,
F: Fn(u64) -> T,
{
let from = match range.start_bound() {
Bound::Included(start) => Some(*start),
Bound::Excluded(start) => {
let Some(start) = start.checked_add(1) else {
return RangeSet2::empty();
};
Some(start)
}
Bound::Unbounded => None,
};
let to = match range.end_bound() {
Bound::Included(end) => end.checked_add(1),
Bound::Excluded(end) => Some(*end),
Bound::Unbounded => None,
};
match (from, to) {
(Some(from), Some(to)) => RangeSet2::from(f(from)..f(to)),
(Some(from), None) => RangeSet2::from(f(from)..),
(None, Some(to)) => RangeSet2::from(..f(to)),
(None, None) => RangeSet2::all(),
}
}

pub mod builder {
use std::collections::BTreeMap;

Expand Down Expand Up @@ -863,7 +934,7 @@ pub mod builder {
use bao_tree::ChunkNum;

use super::*;
use crate::{protocol::GetManyRequest, util::ChunkRangesExt};
use crate::protocol::{ChunkRangesExt, GetManyRequest};

#[test]
fn chunk_ranges_ext() {
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/range_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bao_tree::{ChunkNum, ChunkRangesRef};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};

pub use crate::util::ChunkRangesExt;
use crate::protocol::ChunkRangesExt;

static CHUNK_RANGES_EMPTY: OnceLock<ChunkRanges> = OnceLock::new();

Expand Down Expand Up @@ -511,7 +511,7 @@ mod tests {
use proptest::prelude::*;

use super::*;
use crate::util::ChunkRangesExt;
use crate::protocol::ChunkRangesExt;

fn ranges(value_range: Range<u64>) -> impl Strategy<Value = ChunkRanges> {
prop::collection::vec((value_range.clone(), value_range), 0..16).prop_map(|v| {
Expand Down
2 changes: 1 addition & 1 deletion src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ use crate::{
},
ApiClient,
},
protocol::ChunkRangesExt,
store::{
fs::{
bao_file::{
Expand All @@ -125,7 +126,6 @@ use crate::{
util::{
channel::oneshot,
temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
ChunkRangesExt,
},
};
mod bao_file;
Expand Down
6 changes: 2 additions & 4 deletions src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ use crate::{
tags::TagInfo,
ApiClient,
},
protocol::ChunkRangesExt,
store::{
util::{SizeInfo, SparseMemFile, Tag},
HashAndFormat, IROH_BLOCK_SIZE,
},
util::{
temp_tag::{TagDrop, TempTagScope, TempTags},
ChunkRangesExt,
},
util::temp_tag::{TagDrop, TempTagScope, TempTags},
BlobFormat, Hash,
};

Expand Down
2 changes: 1 addition & 1 deletion src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use crate::{
},
ApiClient, TempTag,
},
protocol::ChunkRangesExt,
store::{mem::CompleteStorage, IROH_BLOCK_SIZE},
util::ChunkRangesExt,
Hash,
};

Expand Down
83 changes: 6 additions & 77 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
use std::ops::{Bound, RangeBounds};

use bao_tree::{io::round_up_to_chunks, ChunkNum, ChunkRanges};
use range_collections::{range_set::RangeSetEntry, RangeSet2};

pub mod channel;
pub(crate) mod connection_pool;
//! Utilities
pub(crate) mod channel;
pub mod connection_pool;
pub(crate) mod temp_tag;

pub mod serde {
pub(crate) mod serde {
// Module that handles io::Error serialization/deserialization
pub mod io_error_serde {
use std::{fmt, io};
Expand Down Expand Up @@ -218,74 +214,7 @@ pub mod serde {
}
}

pub trait ChunkRangesExt {
fn last_chunk() -> Self;
fn chunk(offset: u64) -> Self;
fn bytes(ranges: impl RangeBounds<u64>) -> Self;
fn chunks(ranges: impl RangeBounds<u64>) -> Self;
fn offset(offset: u64) -> Self;
}

impl ChunkRangesExt for ChunkRanges {
fn last_chunk() -> Self {
ChunkRanges::from(ChunkNum(u64::MAX)..)
}

/// Create a chunk range that contains a single chunk.
fn chunk(offset: u64) -> Self {
ChunkRanges::from(ChunkNum(offset)..ChunkNum(offset + 1))
}

/// Create a range of chunks that contains the given byte ranges.
/// The byte ranges are rounded up to the nearest chunk size.
fn bytes(ranges: impl RangeBounds<u64>) -> Self {
round_up_to_chunks(&bounds_from_range(ranges, |v| v))
}

/// Create a range of chunks from u64 chunk bounds.
///
/// This is equivalent but more convenient than using the ChunkNum newtype.
fn chunks(ranges: impl RangeBounds<u64>) -> Self {
bounds_from_range(ranges, ChunkNum)
}

/// Create a chunk range that contains a single byte offset.
fn offset(offset: u64) -> Self {
Self::bytes(offset..offset + 1)
}
}

// todo: move to range_collections
pub(crate) fn bounds_from_range<R, T, F>(range: R, f: F) -> RangeSet2<T>
where
R: RangeBounds<u64>,
T: RangeSetEntry,
F: Fn(u64) -> T,
{
let from = match range.start_bound() {
Bound::Included(start) => Some(*start),
Bound::Excluded(start) => {
let Some(start) = start.checked_add(1) else {
return RangeSet2::empty();
};
Some(start)
}
Bound::Unbounded => None,
};
let to = match range.end_bound() {
Bound::Included(end) => end.checked_add(1),
Bound::Excluded(end) => Some(*end),
Bound::Unbounded => None,
};
match (from, to) {
(Some(from), Some(to)) => RangeSet2::from(f(from)..f(to)),
(Some(from), None) => RangeSet2::from(f(from)..),
(None, Some(to)) => RangeSet2::from(..f(to)),
(None, None) => RangeSet2::all(),
}
}

pub mod outboard_with_progress {
pub(crate) mod outboard_with_progress {
use std::io::{self, BufReader, Read};

use bao_tree::{
Expand Down Expand Up @@ -433,7 +362,7 @@ pub mod outboard_with_progress {
}
}

pub mod sink {
pub(crate) mod sink {
use std::{future::Future, io};

use irpc::RpcMessage;
Expand Down
4 changes: 2 additions & 2 deletions src/util/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Entry point is [`ConnectionPool`]. You create a connection pool for a specific
//! ALPN and [`Options`]. Then the pool will manage connections for you.
//!
//! Access to connections is via the [`ConnectionPool::connect`] method, which
//! Access to connections is via the [`ConnectionPool::get_or_connect`] method, which
//! gives you access to a connection via a [`ConnectionRef`] if possible.
//!
//! It is important that you keep the [`ConnectionRef`] alive while you are using
Expand Down Expand Up @@ -360,7 +360,7 @@ impl ConnectionPool {
///
/// This is guaranteed to return after approximately [Options::connect_timeout]
/// with either an error or a connection.
pub async fn connect(
pub async fn get_or_connect(
&self,
id: NodeId,
) -> std::result::Result<ConnectionRef, PoolConnectError> {
Expand Down
Loading