Skip to content

Commit 3dfc9c6

Browse files
committed
Adapt ci and make some more things optional
1 parent bbcecfc commit 3dfc9c6

File tree

3 files changed

+57
-15
lines changed

3 files changed

+57
-15
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ env:
1313
RUSTFLAGS: -Dwarnings
1414
RUSTDOCFLAGS: -Dwarnings
1515
MSRV: "1.72"
16-
RS_EXAMPLES_LIST: "content-tracker,iroh-ipfs,dumbpipe-web,iroh-pkarr-node-discovery"
16+
RS_EXAMPLES_LIST: "iroh-mainline-content-discovery,iroh-ipfs,dumbpipe-web,iroh-pkarr-node-discovery"
1717
GO_EXAMPLES_LIST: "dall_e_worker"
1818

1919
jobs:

iroh-mainline-content-discovery/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0"
99

1010
[dependencies]
1111
anyhow = { version = "1", features = ["backtrace"] }
12-
bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false }
12+
bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false, optional = true }
1313
bytes = "1"
1414
clap = { version = "4", features = ["derive"], optional = true }
1515
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] }
@@ -44,7 +44,8 @@ flume = "0.11.0"
4444
genawaiter = { version = "0.99.1", features = ["futures03"] }
4545

4646
[features]
47-
cli = ["clap"]
47+
cli = ["clap", "tracker"]
48+
tracker = ["bao-tree"]
4849
default = ["cli"]
4950

5051
[[bin]]

iroh-mainline-content-discovery/src/lib.rs

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use std::{
77
};
88

99
use anyhow::Context;
10-
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
10+
use futures::{
11+
future::{self, BoxFuture},
12+
FutureExt, Stream, StreamExt,
13+
};
1114
use iroh_bytes::HashAndFormat;
1215
use iroh_net::{
1316
magic_endpoint::{get_alpn, get_remote_node_id},
@@ -21,10 +24,14 @@ use tokio::io::AsyncWriteExt;
2124

2225
use crate::protocol::{Announce, Query, Request, Response, REQUEST_SIZE_LIMIT, TRACKER_ALPN};
2326

27+
#[cfg(feature = "tracker")]
2428
pub mod io;
29+
#[cfg(feature = "tracker")]
2530
pub mod iroh_bytes_util;
31+
#[cfg(feature = "tracker")]
2632
pub mod options;
2733
pub mod protocol;
34+
#[cfg(feature = "tracker")]
2835
pub mod tracker;
2936

3037
/// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol.
@@ -57,7 +64,8 @@ pub async fn announce(connection: quinn::Connection, args: Announce) -> anyhow::
5764
Ok(())
5865
}
5966

60-
fn to_infohash(haf: HashAndFormat) -> mainline::Id {
67+
/// The mapping from a [`HashAndFormat`] to a [`mainline::Id`].
68+
pub fn to_infohash(haf: HashAndFormat) -> mainline::Id {
6169
let mut data = [0u8; 20];
6270
data.copy_from_slice(&haf.hash.as_bytes()[..20]);
6371
mainline::Id::from_bytes(data).unwrap()
@@ -69,6 +77,7 @@ fn unique_tracker_addrs(
6977
Gen::new(|co| async move {
7078
let mut found = HashSet::new();
7179
while let Some(response) = response.next_async().await {
80+
println!("got get_peers response: {:?}", response);
7281
let tracker = response.peer;
7382
if !found.insert(tracker) {
7483
continue;
@@ -78,7 +87,7 @@ fn unique_tracker_addrs(
7887
})
7988
}
8089

81-
async fn query_one(
90+
async fn query_socket_one(
8291
endpoint: impl ConnectionProvider,
8392
addr: SocketAddr,
8493
args: Query,
@@ -88,6 +97,18 @@ async fn query_one(
8897
Ok(result.hosts)
8998
}
9099

100+
async fn query_magic_one(
101+
endpoint: MagicEndpoint,
102+
node_id: &NodeId,
103+
args: Query,
104+
) -> anyhow::Result<Vec<NodeId>> {
105+
let connection = endpoint
106+
.connect_by_node_id(node_id, protocol::TRACKER_ALPN)
107+
.await?;
108+
let result = query(connection, args).await?;
109+
Ok(result.hosts)
110+
}
111+
91112
/// A connection provider that can be used to connect to a tracker.
92113
///
93114
/// This can either be a [`quinn::Endpoint`] where connections are created on demand,
@@ -102,8 +123,29 @@ impl ConnectionProvider for quinn::Endpoint {
102123
}
103124
}
104125

126+
pub fn query_trackers(
127+
endpoint: MagicEndpoint,
128+
trackers: impl IntoIterator<Item = NodeId>,
129+
args: Query,
130+
query_parallelism: usize,
131+
) -> impl Stream<Item = anyhow::Result<NodeId>> {
132+
futures::stream::iter(trackers)
133+
.map(move |tracker| {
134+
let endpoint = endpoint.clone();
135+
async move {
136+
let hosts = match query_magic_one(endpoint, &tracker, args).await {
137+
Ok(hosts) => hosts.into_iter().map(anyhow::Ok).collect(),
138+
Err(cause) => vec![Err(cause)],
139+
};
140+
futures::stream::iter(hosts)
141+
}
142+
})
143+
.buffer_unordered(query_parallelism)
144+
.flatten()
145+
}
146+
105147
/// Query the mainline DHT for trackers for the given content, then query each tracker for peers.
106-
pub async fn query_dht(
148+
pub fn query_dht(
107149
endpoint: impl ConnectionProvider,
108150
dht: mainline::dht::Dht,
109151
args: Query,
@@ -117,7 +159,7 @@ pub async fn query_dht(
117159
.map(move |addr| {
118160
let endpoint = endpoint.clone();
119161
async move {
120-
let hosts = match query_one(endpoint, addr, args).await {
162+
let hosts = match query_socket_one(endpoint, addr, args).await {
121163
Ok(hosts) => hosts.into_iter().map(anyhow::Ok).collect(),
122164
Err(cause) => vec![Err(cause)],
123165
};
@@ -132,6 +174,7 @@ pub async fn query_dht(
132174
///
133175
/// Note that this should only be called from a publicly reachable node, where port is the port
134176
/// on which the tracker protocol is reachable.
177+
#[cfg(feature = "tracker")]
135178
pub fn announce_dht(
136179
dht: mainline::dht::Dht,
137180
content: BTreeSet<HashAndFormat>,
@@ -174,7 +217,7 @@ fn configure_server(secret_key: &iroh_net::key::SecretKey) -> anyhow::Result<qui
174217
make_server_config(secret_key, 8, 1024, vec![TRACKER_ALPN.to_vec()])
175218
}
176219

177-
fn create_quinn_client(
220+
pub fn create_quinn_client(
178221
bind_addr: SocketAddr,
179222
alpn_protocols: Vec<Vec<u8>>,
180223
keylog: bool,
@@ -192,7 +235,7 @@ fn create_quinn_client(
192235
}
193236

194237
/// Create a [`quinn::ServerConfig`] with the given secret key and limits.
195-
pub fn make_server_config(
238+
fn make_server_config(
196239
secret_key: &iroh_net::key::SecretKey,
197240
max_streams: u64,
198241
max_connections: u32,
@@ -212,6 +255,7 @@ pub fn make_server_config(
212255
}
213256

214257
/// Loads a [`SecretKey`] from the provided file.
258+
#[cfg(feature = "tracker")]
215259
pub async fn load_secret_key(
216260
key_path: std::path::PathBuf,
217261
) -> anyhow::Result<iroh_net::key::SecretKey> {
@@ -310,7 +354,7 @@ pub async fn connect(tracker: &TrackerId, local_port: u16) -> anyhow::Result<qui
310354
}
311355

312356
/// Create a magic endpoint and connect to a tracker using the [protocol::TRACKER_ALPN] protocol.
313-
pub async fn connect_magic(tracker: &NodeId, local_port: u16) -> anyhow::Result<quinn::Connection> {
357+
async fn connect_magic(tracker: &NodeId, local_port: u16) -> anyhow::Result<quinn::Connection> {
314358
// todo: uncomment once the connection problems are fixed
315359
// for now, a random node id is more reliable.
316360
// let key = load_secret_key(tracker_path(CLIENT_KEY)?).await?;
@@ -322,10 +366,7 @@ pub async fn connect_magic(tracker: &NodeId, local_port: u16) -> anyhow::Result<
322366
}
323367

324368
/// Create a quinn endpoint and connect to a tracker using the [protocol::TRACKER_ALPN] protocol.
325-
pub async fn connect_socket(
326-
tracker: SocketAddr,
327-
local_port: u16,
328-
) -> anyhow::Result<quinn::Connection> {
369+
async fn connect_socket(tracker: SocketAddr, local_port: u16) -> anyhow::Result<quinn::Connection> {
329370
let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, local_port));
330371
let endpoint = create_quinn_client(bind_addr, vec![TRACKER_ALPN.to_vec()], false)?;
331372
tracing::info!("trying to connect to tracker at {:?}", tracker);

0 commit comments

Comments
 (0)