Skip to content

Commit 85a9f7b

Browse files
committed
Make this also a lib
1 parent 658358b commit 85a9f7b

File tree

9 files changed

+484
-957
lines changed

9 files changed

+484
-957
lines changed

iroh-mainline-content-discovery/Cargo.lock

Lines changed: 122 additions & 787 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-mainline-content-discovery/Cargo.toml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ license = "MIT OR Apache-2.0"
1111
anyhow = { version = "1", features = ["backtrace"] }
1212
bao-tree = { version = "0.9.1", features = ["tokio_fsm"], default-features = false }
1313
bytes = "1"
14-
clap = { version = "4", features = ["derive"] }
14+
clap = { version = "4", features = ["derive"], optional = true }
1515
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] }
1616
dirs-next = "2"
1717
ed25519-dalek = "2.1.0"
1818
futures = "0.3.25"
1919
hex = "0.4.3"
2020
humantime = "2.1.0"
21-
iroh = "0.12.0"
21+
iroh-net = "0.12.0"
22+
iroh-bytes = "0.12.0"
2223
iroh-pkarr-node-discovery = { path = "../iroh-pkarr-node-discovery" }
2324
mainline = "1.0.0"
2425
pkarr = { version = "1.0.1", features = ["async"] }
@@ -39,3 +40,11 @@ tracing = "0.1"
3940
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
4041
ttl_cache = "0.5.1"
4142
url = "2.5.0"
43+
44+
[features]
45+
cli = ["clap"]
46+
default = ["cli"]
47+
48+
[[bin]]
49+
name = "iroh-mainline-content-discovery"
50+
required-features = ["cli"]

iroh-mainline-content-discovery/src/args.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
//! Command line arguments.
22
use clap::{Parser, Subcommand};
3-
use iroh::bytes::{Hash, HashAndFormat};
4-
use iroh::ticket::BlobTicket;
3+
use iroh_bytes::{Hash, HashAndFormat};
4+
use iroh_mainline_content_discovery::TrackerId;
5+
use iroh_net::{ticket::BlobTicket, NodeId};
56
use std::{fmt::Display, str::FromStr};
67

7-
use crate::{NodeId, TrackerId};
8-
98
#[derive(Parser, Debug)]
109
pub struct Args {
1110
#[clap(subcommand)]

iroh-mainline-content-discovery/src/io.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ use std::{
88
};
99

1010
use anyhow::Context;
11-
use iroh::bytes::{get::Stats, HashAndFormat};
11+
use iroh_bytes::{get::Stats, HashAndFormat};
12+
use iroh_net::NodeId;
1213
use serde::{de::DeserializeOwned, Deserialize, Serialize};
1314
use tracing_subscriber::{prelude::*, EnvFilter};
1415

15-
use crate::{protocol::AnnounceKind, tracker::ProbeKind, NodeId};
16+
use crate::{protocol::AnnounceKind, tracker::ProbeKind};
1617

1718
pub const CONFIG_DEFAULTS_FILE: &str = "config.defaults.toml";
1819
pub const CONFIG_FILE: &str = "config.toml";

iroh-mainline-content-discovery/src/iroh_bytes_util.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33

44
use bao_tree::{ByteNum, ChunkNum, ChunkRanges};
55
use bytes::Bytes;
6-
use iroh::bytes::{
6+
use iroh_bytes::{
77
get::{
88
fsm::{BlobContentNext, EndBlobNext},
99
Stats,
@@ -14,8 +14,6 @@ use iroh::bytes::{
1414
};
1515
use rand::Rng;
1616

17-
use crate::log;
18-
1917
/// Get the claimed size of a blob from a peer.
2018
///
2119
/// This is just reading the size header and then immediately closing the connection.
@@ -24,13 +22,13 @@ pub async fn unverified_size(
2422
connection: &quinn::Connection,
2523
hash: &Hash,
2624
) -> anyhow::Result<(u64, Stats)> {
27-
let request = iroh::bytes::protocol::GetRequest::new(
25+
let request = iroh_bytes::protocol::GetRequest::new(
2826
*hash,
2927
RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
3028
);
31-
let request = iroh::bytes::get::fsm::start(connection.clone(), request);
29+
let request = iroh_bytes::get::fsm::start(connection.clone(), request);
3230
let connected = request.next().await?;
33-
let iroh::bytes::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
31+
let iroh_bytes::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
3432
unreachable!("expected start root");
3533
};
3634
let at_blob_header = start.next();
@@ -47,14 +45,14 @@ pub async fn verified_size(
4745
connection: &quinn::Connection,
4846
hash: &Hash,
4947
) -> anyhow::Result<(u64, Stats)> {
50-
log!("Getting verified size of {}", hash.to_hex());
51-
let request = iroh::bytes::protocol::GetRequest::new(
48+
tracing::debug!("Getting verified size of {}", hash.to_hex());
49+
let request = iroh_bytes::protocol::GetRequest::new(
5250
*hash,
5351
RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
5452
);
55-
let request = iroh::bytes::get::fsm::start(connection.clone(), request);
53+
let request = iroh_bytes::get::fsm::start(connection.clone(), request);
5654
let connected = request.next().await?;
57-
let iroh::bytes::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
55+
let iroh_bytes::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
5856
unreachable!("expected start root");
5957
};
6058
let header = start.next();
@@ -74,7 +72,7 @@ pub async fn verified_size(
7472
unreachable!("expected closing");
7573
};
7674
let stats = closing.next().await?;
77-
log!(
75+
tracing::debug!(
7876
"Got verified size of {}, {:.6}s",
7977
hash.to_hex(),
8078
stats.elapsed.as_secs_f64()
@@ -88,17 +86,17 @@ pub async fn get_hash_seq_and_sizes(
8886
max_size: u64,
8987
) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
9088
let content = HashAndFormat::hash_seq(*hash);
91-
log!("Getting hash seq and children sizes of {}", content);
92-
let request = iroh::bytes::protocol::GetRequest::new(
89+
tracing::debug!("Getting hash seq and children sizes of {}", content);
90+
let request = iroh_bytes::protocol::GetRequest::new(
9391
*hash,
9492
RangeSpecSeq::from_ranges_infinite([
9593
ChunkRanges::all(),
9694
ChunkRanges::from(ChunkNum(u64::MAX)..),
9795
]),
9896
);
99-
let at_start = iroh::bytes::get::fsm::start(connection.clone(), request);
97+
let at_start = iroh_bytes::get::fsm::start(connection.clone(), request);
10098
let at_connected = at_start.next().await?;
101-
let iroh::bytes::get::fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
99+
let iroh_bytes::get::fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
102100
unreachable!("query includes root");
103101
};
104102
let at_start_root = start.next();
@@ -127,7 +125,7 @@ pub async fn get_hash_seq_and_sizes(
127125
}
128126
};
129127
let _stats = closing.next().await?;
130-
log!(
128+
tracing::debug!(
131129
"Got hash seq and children sizes of {}: {:?}",
132130
content,
133131
sizes
@@ -144,9 +142,9 @@ pub async fn chunk_probe(
144142
let ranges = ChunkRanges::from(chunk..chunk + 1);
145143
let ranges = RangeSpecSeq::from_ranges([ranges]);
146144
let request = GetRequest::new(*hash, ranges);
147-
let request = iroh::bytes::get::fsm::start(connection.clone(), request);
145+
let request = iroh_bytes::get::fsm::start(connection.clone(), request);
148146
let connected = request.next().await?;
149-
let iroh::bytes::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
147+
let iroh_bytes::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
150148
unreachable!("query includes root");
151149
};
152150
let header = start.next();
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
use std::{net::SocketAddr, sync::Arc, time::Duration};
2+
3+
use anyhow::Context;
4+
use iroh_net::{
5+
magic_endpoint::{get_alpn, get_remote_node_id},
6+
MagicEndpoint, NodeId,
7+
};
8+
use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
9+
use pkarr::PkarrClient;
10+
use protocol::QueryResponse;
11+
use tokio::io::AsyncWriteExt;
12+
13+
use crate::protocol::{Announce, Query, Request, Response, REQUEST_SIZE_LIMIT, TRACKER_ALPN};
14+
15+
pub mod io;
16+
pub mod iroh_bytes_util;
17+
pub mod options;
18+
pub mod protocol;
19+
pub mod tracker;
20+
21+
/// A tracker id for queries - either a node id or an address.
22+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
23+
pub enum TrackerId {
24+
NodeId(NodeId),
25+
Addr(std::net::SocketAddr),
26+
}
27+
28+
impl std::fmt::Display for TrackerId {
29+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30+
match self {
31+
TrackerId::NodeId(node_id) => write!(f, "{}", node_id),
32+
TrackerId::Addr(addr) => write!(f, "{}", addr),
33+
}
34+
}
35+
}
36+
37+
impl std::str::FromStr for TrackerId {
38+
type Err = anyhow::Error;
39+
40+
fn from_str(s: &str) -> Result<Self, Self::Err> {
41+
if let Ok(node_id) = s.parse() {
42+
return Ok(TrackerId::NodeId(node_id));
43+
}
44+
if let Ok(addr) = s.parse() {
45+
return Ok(TrackerId::Addr(addr));
46+
}
47+
anyhow::bail!("invalid tracker id")
48+
}
49+
}
50+
51+
/// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol.
52+
pub(crate) async fn accept_conn(
53+
mut conn: quinn::Connecting,
54+
) -> anyhow::Result<(NodeId, String, quinn::Connection)> {
55+
let alpn = get_alpn(&mut conn).await?;
56+
let conn = conn.await?;
57+
let peer_id = get_remote_node_id(&conn)?;
58+
Ok((peer_id, alpn, conn))
59+
}
60+
61+
/// Announce to a tracker.
62+
///
63+
/// You can only announce content you yourself claim to have, to avoid spamming other nodes.
64+
///
65+
/// `endpoint` is the magic endpoint to use for announcing.
66+
/// `tracker` is the node id of the tracker to announce to. It must understand the [TRACKER_ALPN] protocol.
67+
/// `content` is the content to announce.
68+
/// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it.
69+
pub async fn announce(connection: quinn::Connection, args: Announce) -> anyhow::Result<()> {
70+
let (mut send, mut recv) = connection.open_bi().await?;
71+
tracing::debug!("opened bi stream");
72+
let request = Request::Announce(args);
73+
let request = postcard::to_stdvec(&request)?;
74+
tracing::debug!("sending announce");
75+
send.write_all(&request).await?;
76+
send.finish().await?;
77+
let _response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
78+
Ok(())
79+
}
80+
81+
/// Assume an existing connection to a tracker and query it for peers for some content.
82+
pub async fn query(connection: quinn::Connection, args: Query) -> anyhow::Result<QueryResponse> {
83+
tracing::info!("connected to {:?}", connection.remote_address());
84+
let (mut send, mut recv) = connection.open_bi().await?;
85+
tracing::info!("opened bi stream");
86+
let request = Request::Query(args);
87+
let request = postcard::to_stdvec(&request)?;
88+
tracing::info!("sending query");
89+
send.write_all(&request).await?;
90+
send.finish().await?;
91+
let response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
92+
let response = postcard::from_bytes::<Response>(&response)?;
93+
Ok(match response {
94+
Response::QueryResponse(response) => response,
95+
})
96+
}
97+
98+
/// Returns default server configuration along with its certificate.
99+
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
100+
fn configure_server(secret_key: &iroh_net::key::SecretKey) -> anyhow::Result<quinn::ServerConfig> {
101+
make_server_config(secret_key, 8, 1024, vec![TRACKER_ALPN.to_vec()])
102+
}
103+
104+
fn create_quinn_client(
105+
bind_addr: SocketAddr,
106+
alpn_protocols: Vec<Vec<u8>>,
107+
keylog: bool,
108+
) -> anyhow::Result<quinn::Endpoint> {
109+
let secret_key = iroh_net::key::SecretKey::generate();
110+
let tls_client_config =
111+
iroh_net::tls::make_client_config(&secret_key, None, alpn_protocols, keylog)?;
112+
let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config));
113+
let mut endpoint = quinn::Endpoint::client(bind_addr)?;
114+
let mut transport_config = quinn::TransportConfig::default();
115+
transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
116+
client_config.transport_config(Arc::new(transport_config));
117+
endpoint.set_default_client_config(client_config);
118+
Ok(endpoint)
119+
}
120+
121+
/// Create a [`quinn::ServerConfig`] with the given secret key and limits.
122+
pub fn make_server_config(
123+
secret_key: &iroh_net::key::SecretKey,
124+
max_streams: u64,
125+
max_connections: u32,
126+
alpn_protocols: Vec<Vec<u8>>,
127+
) -> anyhow::Result<quinn::ServerConfig> {
128+
let tls_server_config = iroh_net::tls::make_server_config(secret_key, alpn_protocols, false)?;
129+
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config));
130+
let mut transport_config = quinn::TransportConfig::default();
131+
transport_config
132+
.max_concurrent_bidi_streams(max_streams.try_into()?)
133+
.max_concurrent_uni_streams(0u32.into());
134+
135+
server_config
136+
.transport_config(Arc::new(transport_config))
137+
.concurrent_connections(max_connections);
138+
Ok(server_config)
139+
}
140+
141+
/// Loads a [`SecretKey`] from the provided file.
142+
pub async fn load_secret_key(
143+
key_path: std::path::PathBuf,
144+
) -> anyhow::Result<iroh_net::key::SecretKey> {
145+
if key_path.exists() {
146+
let keystr = tokio::fs::read(key_path).await?;
147+
let secret_key =
148+
iroh_net::key::SecretKey::try_from_openssh(keystr).context("invalid keyfile")?;
149+
Ok(secret_key)
150+
} else {
151+
let secret_key = iroh_net::key::SecretKey::generate();
152+
let ser_key = secret_key.to_openssh()?;
153+
154+
// Try to canoncialize if possible
155+
let key_path = key_path.canonicalize().unwrap_or(key_path);
156+
let key_path_parent = key_path.parent().ok_or_else(|| {
157+
anyhow::anyhow!("no parent directory found for '{}'", key_path.display())
158+
})?;
159+
tokio::fs::create_dir_all(&key_path_parent).await?;
160+
161+
// write to tempfile
162+
let (file, temp_file_path) = tempfile::NamedTempFile::new_in(key_path_parent)
163+
.context("unable to create tempfile")?
164+
.into_parts();
165+
let mut file = tokio::fs::File::from_std(file);
166+
file.write_all(ser_key.as_bytes())
167+
.await
168+
.context("unable to write keyfile")?;
169+
file.flush().await?;
170+
drop(file);
171+
172+
// move file
173+
tokio::fs::rename(temp_file_path, key_path)
174+
.await
175+
.context("failed to rename keyfile")?;
176+
177+
Ok(secret_key)
178+
}
179+
}
180+
181+
async fn create_endpoint(
182+
key: iroh_net::key::SecretKey,
183+
port: u16,
184+
publish: bool,
185+
) -> anyhow::Result<MagicEndpoint> {
186+
let pkarr = PkarrClient::new();
187+
let discovery_key = if publish { Some(&key) } else { None };
188+
let mainline_discovery = PkarrNodeDiscovery::new(pkarr, discovery_key);
189+
iroh_net::MagicEndpoint::builder()
190+
.secret_key(key)
191+
.discovery(Box::new(mainline_discovery))
192+
.alpns(vec![TRACKER_ALPN.to_vec()])
193+
.bind(port)
194+
.await
195+
}

0 commit comments

Comments
 (0)