Skip to content

Commit 84fbe80

Browse files
committed
clippy
1 parent 0706098 commit 84fbe80

File tree

5 files changed

+93
-79
lines changed

5 files changed

+93
-79
lines changed

examples/multiprovider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ async fn download(args: DownloadArgs) -> anyhow::Result<()> {
236236
let bitmap = bitmap(current, requested, rows as usize);
237237
print!("\r{bitmap}");
238238
if progress.is_done() {
239-
println!("");
239+
println!();
240240
break;
241241
}
242242
}

src/downloader2.rs

Lines changed: 12 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ use state::*;
5151
mod actor;
5252
use actor::*;
5353

54+
mod content_discovery;
55+
pub use content_discovery::*;
56+
5457
#[derive(
5558
Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, derive_more::From,
5659
)]
@@ -76,35 +79,6 @@ struct PeerDownloadId(u64);
7679
)]
7780
struct BitfieldSubscriptionId(u64);
7881

79-
/// Announce kind
80-
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Default)]
81-
pub enum AnnounceKind {
82-
/// The peer supposedly has some of the data.
83-
Partial = 0,
84-
/// The peer supposedly has the complete data.
85-
#[default]
86-
Complete,
87-
}
88-
89-
/// Options for finding peers
90-
#[derive(Debug, Default)]
91-
pub struct FindPeersOpts {
92-
/// Kind of announce
93-
pub kind: AnnounceKind,
94-
}
95-
96-
/// A pluggable content discovery mechanism
97-
pub trait ContentDiscovery: std::fmt::Debug + Send + 'static {
98-
/// Find peers that have the given blob.
99-
///
100-
/// The returned stream is a handle for the discovery task. It should be an
101-
/// infinite stream that only stops when it is dropped.
102-
fn find_peers(&mut self, hash: Hash, opts: FindPeersOpts) -> BoxStream<'static, NodeId>;
103-
}
104-
105-
/// A boxed content discovery
106-
pub type BoxedContentDiscovery = Box<dyn ContentDiscovery>;
107-
10882
/// A pluggable bitfield subscription mechanism
10983
pub trait BitfieldSubscription: std::fmt::Debug + Send + 'static {
11084
/// Subscribe to a bitfield
@@ -265,7 +239,7 @@ impl<S> DownloaderBuilder<S> {
265239
{
266240
let store = self.store;
267241
let discovery = self.discovery.expect("discovery not set");
268-
let local_pool = self.local_pool.unwrap_or_else(|| LocalPool::single());
242+
let local_pool = self.local_pool.unwrap_or_else(LocalPool::single);
269243
let planner = self
270244
.planner
271245
.unwrap_or_else(|| Box::new(StripePlanner2::new(0, 10)));
@@ -350,33 +324,6 @@ impl Downloader {
350324
}
351325
}
352326

353-
/// A simple static content discovery mechanism
354-
#[derive(Debug)]
355-
pub struct StaticContentDiscovery {
356-
info: BTreeMap<Hash, Vec<NodeId>>,
357-
default: Vec<NodeId>,
358-
}
359-
360-
impl StaticContentDiscovery {
361-
/// Create a new static content discovery mechanism
362-
pub fn new(mut info: BTreeMap<Hash, Vec<NodeId>>, mut default: Vec<NodeId>) -> Self {
363-
default.sort();
364-
default.dedup();
365-
for (_, peers) in info.iter_mut() {
366-
peers.sort();
367-
peers.dedup();
368-
}
369-
Self { info, default }
370-
}
371-
}
372-
373-
impl ContentDiscovery for StaticContentDiscovery {
374-
fn find_peers(&mut self, hash: Hash, _opts: FindPeersOpts) -> BoxStream<'static, NodeId> {
375-
let peers = self.info.get(&hash).unwrap_or(&self.default).clone();
376-
Box::pin(futures_lite::stream::iter(peers).chain(futures_lite::stream::pending()))
377-
}
378-
}
379-
380327
/// A bitfield subscription that just returns nothing for local and everything(*) for remote
381328
///
382329
/// * Still need to figure out how to deal with open ended chunk ranges.
@@ -423,7 +370,7 @@ impl<S> SimpleBitfieldSubscription<S> {
423370
}
424371

425372
async fn get_valid_ranges_local<S: Store>(hash: &Hash, store: S) -> anyhow::Result<ChunkRanges> {
426-
if let Some(entry) = store.get_mut(&hash).await? {
373+
if let Some(entry) = store.get_mut(hash).await? {
427374
crate::get::db::valid_ranges::<S>(&entry).await
428375
} else {
429376
Ok(ChunkRanges::empty())
@@ -436,7 +383,7 @@ async fn get_valid_ranges_remote(
436383
hash: &Hash,
437384
) -> anyhow::Result<ChunkRanges> {
438385
let conn = endpoint.connect(id, crate::ALPN).await?;
439-
let (size, _) = crate::get::request::get_verified_size(&conn, &hash).await?;
386+
let (size, _) = crate::get::request::get_verified_size(&conn, hash).await?;
440387
let chunks = (size + 1023) / 1024;
441388
Ok(ChunkRanges::from(ChunkNum(0)..ChunkNum(chunks)))
442389
}
@@ -491,6 +438,7 @@ impl<S: Store> BitfieldSubscription for SimpleBitfieldSubscription<S> {
491438

492439
#[cfg(test)]
493440
mod tests {
441+
#![allow(clippy::single_range_in_vec_init)]
494442
use std::ops::Range;
495443

496444
use crate::net_protocol::Blobs;
@@ -534,7 +482,7 @@ mod tests {
534482
let mut planner = StripePlanner2::new(0, 4);
535483
let hash = Hash::new(b"test");
536484
let mut ranges = make_range_map(&[chunk_ranges([0..50]), chunk_ranges([50..100])]);
537-
println!("");
485+
println!();
538486
print_range_map(&ranges);
539487
println!("planning");
540488
planner.plan(hash, &mut ranges);
@@ -550,7 +498,7 @@ mod tests {
550498
chunk_ranges([0..100]),
551499
chunk_ranges([0..100]),
552500
]);
553-
println!("");
501+
println!();
554502
print_range_map(&ranges);
555503
println!("planning");
556504
planner.plan(hash, &mut ranges);
@@ -567,7 +515,7 @@ mod tests {
567515
chunk_ranges([0..120]),
568516
chunk_ranges([0..50]),
569517
]);
570-
println!("");
518+
println!();
571519
print_range_map(&ranges);
572520
println!("planning");
573521
planner.plan(hash, &mut ranges);
@@ -656,10 +604,7 @@ mod tests {
656604
.discovery_n0()
657605
.bind()
658606
.await?;
659-
let discovery = StaticContentDiscovery {
660-
info: BTreeMap::new(),
661-
default: vec![peer],
662-
};
607+
let discovery = StaticContentDiscovery::new(BTreeMap::new(), vec![peer]);
663608
let bitfield_subscription = TestBitfieldSubscription;
664609
let downloader = Downloader::builder(endpoint, store)
665610
.discovery(discovery)
@@ -697,10 +642,7 @@ mod tests {
697642
.discovery_n0()
698643
.bind()
699644
.await?;
700-
let discovery = StaticContentDiscovery {
701-
info: BTreeMap::new(),
702-
default: peers,
703-
};
645+
let discovery = StaticContentDiscovery::new(BTreeMap::new(), peers);
704646
let downloader = Downloader::builder(endpoint, store)
705647
.discovery(discovery)
706648
.planner(StripePlanner2::new(0, 8))

src/downloader2/actor.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,23 @@ pub(super) enum UserCommand {
1515
}
1616

1717
pub(super) struct DownloaderActor<S> {
18+
state: DownloaderState,
1819
local_pool: LocalPool,
1920
endpoint: Endpoint,
2021
command_rx: mpsc::Receiver<Command>,
2122
command_tx: mpsc::Sender<Command>,
22-
state: DownloaderState,
2323
store: S,
24+
/// Content discovery
2425
discovery: BoxedContentDiscovery,
26+
/// Bitfield subscription
2527
subscribe_bitfield: BoxedBitfieldSubscription,
28+
/// Futures for downloads
2629
download_futs: BTreeMap<DownloadId, tokio::sync::oneshot::Sender<()>>,
30+
/// Tasks for peer downloads
2731
peer_download_tasks: BTreeMap<PeerDownloadId, local_pool::Run<()>>,
32+
/// Tasks for discovery
2833
discovery_tasks: BTreeMap<DiscoveryId, AbortOnDropHandle<()>>,
34+
/// Tasks for bitfield subscriptions
2935
bitfield_subscription_tasks: BTreeMap<BitfieldSubscriptionId, AbortOnDropHandle<()>>,
3036
/// Id generator for download ids
3137
download_id_gen: IdGenerator<DownloadId>,
@@ -67,13 +73,13 @@ impl<S: Store> DownloaderActor<S> {
6773
}
6874
}
6975

70-
pub(super) async fn run(mut self, mut channel: mpsc::Receiver<UserCommand>) {
76+
pub(super) async fn run(mut self, mut user_commands: mpsc::Receiver<UserCommand>) {
7177
let mut ticks = tokio::time::interval(Duration::from_millis(100));
7278
loop {
7379
trace!("downloader actor tick");
7480
tokio::select! {
7581
biased;
76-
Some(cmd) = channel.recv() => {
82+
Some(cmd) = user_commands.recv() => {
7783
debug!("user command {cmd:?}");
7884
match cmd {
7985
UserCommand::Download {
@@ -228,6 +234,7 @@ impl<S: Store> DownloaderActor<S> {
228234
}
229235
}
230236

237+
#[allow(clippy::too_many_arguments)]
231238
async fn peer_download_task<S: Store>(
232239
id: PeerDownloadId,
233240
endpoint: Endpoint,
@@ -327,6 +334,7 @@ async fn peer_download<S: Store>(
327334
Ok(stats)
328335
}
329336

337+
/// Spawn a future and wrap it in a [`AbortOnDropHandle`]
330338
pub(super) fn spawn<F, T>(f: F) -> AbortOnDropHandle<T>
331339
where
332340
F: Future<Output = T> + Send + 'static,
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use std::collections::BTreeMap;
2+
3+
use crate::Hash;
4+
use futures_lite::stream::StreamExt;
5+
use futures_util::stream::BoxStream;
6+
use iroh::NodeId;
7+
use serde::{Deserialize, Serialize};
8+
9+
/// Announce kind
10+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Default)]
11+
pub enum AnnounceKind {
12+
/// The peer supposedly has some of the data.
13+
Partial = 0,
14+
/// The peer supposedly has the complete data.
15+
#[default]
16+
Complete,
17+
}
18+
19+
/// Options for finding peers
20+
#[derive(Debug, Default)]
21+
pub struct FindPeersOpts {
22+
/// Kind of announce
23+
pub kind: AnnounceKind,
24+
}
25+
26+
/// A pluggable content discovery mechanism
27+
pub trait ContentDiscovery: std::fmt::Debug + Send + 'static {
28+
/// Find peers that have the given blob.
29+
///
30+
/// The returned stream is a handle for the discovery task. It should be an
31+
/// infinite stream that only stops when it is dropped.
32+
fn find_peers(&mut self, hash: Hash, opts: FindPeersOpts) -> BoxStream<'static, NodeId>;
33+
}
34+
35+
/// A boxed content discovery
36+
pub type BoxedContentDiscovery = Box<dyn ContentDiscovery>;
37+
38+
/// A simple static content discovery mechanism
39+
#[derive(Debug)]
40+
pub struct StaticContentDiscovery {
41+
info: BTreeMap<Hash, Vec<NodeId>>,
42+
default: Vec<NodeId>,
43+
}
44+
45+
impl StaticContentDiscovery {
46+
/// Create a new static content discovery mechanism
47+
pub fn new(mut info: BTreeMap<Hash, Vec<NodeId>>, mut default: Vec<NodeId>) -> Self {
48+
default.sort();
49+
default.dedup();
50+
for (_, peers) in info.iter_mut() {
51+
peers.sort();
52+
peers.dedup();
53+
}
54+
Self { info, default }
55+
}
56+
}
57+
58+
impl ContentDiscovery for StaticContentDiscovery {
59+
fn find_peers(&mut self, hash: Hash, _opts: FindPeersOpts) -> BoxStream<'static, NodeId> {
60+
let peers = self.info.get(&hash).unwrap_or(&self.default).clone();
61+
Box::pin(futures_lite::stream::iter(peers).chain(futures_lite::stream::pending()))
62+
}
63+
}

src/downloader2/state.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ pub(super) enum Command {
6464
DropPeer { peer: NodeId },
6565
/// A peer has been discovered
6666
PeerDiscovered { peer: NodeId, hash: Hash },
67-
///
67+
/// Start observing a local bitfield
6868
ObserveLocal {
6969
id: ObserveId,
7070
hash: Hash,
7171
ranges: ChunkRanges,
7272
},
73-
///
73+
/// Stop observing a local bitfield
7474
StopObserveLocal { id: ObserveId },
7575
/// A tick from the driver, for rebalancing
7676
Tick { time: Duration },
@@ -319,9 +319,9 @@ impl DownloaderState {
319319
evs.push(Event::UnsubscribeBitfield {
320320
id: state.subscription_id,
321321
});
322-
return false;
322+
false
323323
} else {
324-
return true;
324+
true
325325
}
326326
});
327327
self.peers.remove(&peer);
@@ -666,7 +666,7 @@ impl DownloaderState {
666666
"Stopping {} old peer downloads",
667667
download.peer_downloads.len()
668668
);
669-
for (_, state) in &download.peer_downloads {
669+
for state in download.peer_downloads.values() {
670670
// stop all downloads
671671
evs.push(Event::StopPeerDownload { id: state.id });
672672
}
@@ -906,6 +906,7 @@ fn total_chunks(chunks: &ChunkRanges) -> Option<u64> {
906906

907907
#[cfg(test)]
908908
mod tests {
909+
#![allow(clippy::single_range_in_vec_init)]
909910

910911
use super::super::tests::{
911912
chunk_ranges, has_all_events, has_one_event, has_one_event_matching, noop_planner,

0 commit comments

Comments
 (0)