Skip to content

Commit a2f6097

Browse files
committed
Implementing DHT publishing
1 parent 85a9f7b commit a2f6097

File tree

7 files changed

+227
-85
lines changed

7 files changed

+227
-85
lines changed

iroh-mainline-content-discovery/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-mainline-content-discovery/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,8 @@ tracing = "0.1"
4040
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
4141
ttl_cache = "0.5.1"
4242
url = "2.5.0"
43+
flume = "0.11.0"
44+
genawaiter = { version = "0.99.1", features = ["futures03"] }
4345

4446
[features]
4547
cli = ["clap"]

iroh-mainline-content-discovery/src/lib.rs

Lines changed: 170 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,20 @@
1-
use std::{net::SocketAddr, sync::Arc, time::Duration};
1+
use genawaiter::sync::Gen;
2+
use std::{
3+
collections::{BTreeSet, HashSet},
4+
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
5+
sync::Arc,
6+
time::Duration,
7+
};
28

39
use anyhow::Context;
10+
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
11+
use iroh_bytes::HashAndFormat;
412
use iroh_net::{
513
magic_endpoint::{get_alpn, get_remote_node_id},
614
MagicEndpoint, NodeId,
715
};
816
use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
17+
use mainline::common::{GetPeerResponse, StoreQueryMetdata};
918
use pkarr::PkarrClient;
1019
use protocol::QueryResponse;
1120
use tokio::io::AsyncWriteExt;
@@ -18,36 +27,6 @@ pub mod options;
1827
pub mod protocol;
1928
pub mod tracker;
2029

21-
/// A tracker id for queries - either a node id or an address.
22-
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
23-
pub enum TrackerId {
24-
NodeId(NodeId),
25-
Addr(std::net::SocketAddr),
26-
}
27-
28-
impl std::fmt::Display for TrackerId {
29-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30-
match self {
31-
TrackerId::NodeId(node_id) => write!(f, "{}", node_id),
32-
TrackerId::Addr(addr) => write!(f, "{}", addr),
33-
}
34-
}
35-
}
36-
37-
impl std::str::FromStr for TrackerId {
38-
type Err = anyhow::Error;
39-
40-
fn from_str(s: &str) -> Result<Self, Self::Err> {
41-
if let Ok(node_id) = s.parse() {
42-
return Ok(TrackerId::NodeId(node_id));
43-
}
44-
if let Ok(addr) = s.parse() {
45-
return Ok(TrackerId::Addr(addr));
46-
}
47-
anyhow::bail!("invalid tracker id")
48-
}
49-
}
50-
5130
/// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol.
5231
pub(crate) async fn accept_conn(
5332
mut conn: quinn::Connecting,
@@ -78,6 +57,100 @@ pub async fn announce(connection: quinn::Connection, args: Announce) -> anyhow::
7857
Ok(())
7958
}
8059

60+
fn to_infohash(haf: HashAndFormat) -> mainline::Id {
61+
let mut data = [0u8; 20];
62+
data.copy_from_slice(&haf.hash.as_bytes()[..20]);
63+
mainline::Id::from_bytes(data).unwrap()
64+
}
65+
66+
fn unique_tracker_addrs(
67+
mut response: mainline::common::Response<GetPeerResponse>,
68+
) -> impl Stream<Item = SocketAddr> {
69+
Gen::new(|co| async move {
70+
let mut found = HashSet::new();
71+
while let Some(response) = response.next_async().await {
72+
let tracker = response.peer;
73+
if !found.insert(tracker) {
74+
continue;
75+
}
76+
co.yield_(tracker).await;
77+
}
78+
})
79+
}
80+
81+
async fn query_one(
82+
endpoint: impl ConnectionProvider,
83+
addr: SocketAddr,
84+
args: Query,
85+
) -> anyhow::Result<Vec<NodeId>> {
86+
let connection = endpoint.connect(addr).await?;
87+
let result = query(connection, args).await?;
88+
Ok(result.hosts)
89+
}
90+
91+
/// A connection provider that can be used to connect to a tracker.
92+
///
93+
/// This can either be a [`quinn::Endpoint`] where connections are created on demand,
94+
/// or some sort of connection pool.
95+
pub trait ConnectionProvider: Clone {
96+
fn connect(&self, addr: SocketAddr) -> BoxFuture<anyhow::Result<quinn::Connection>>;
97+
}
98+
99+
impl ConnectionProvider for quinn::Endpoint {
100+
fn connect(&self, addr: SocketAddr) -> BoxFuture<anyhow::Result<quinn::Connection>> {
101+
async move { Ok(self.connect(addr, "localhost")?.await?) }.boxed()
102+
}
103+
}
104+
105+
/// Query the mainline DHT for trackers for the given content, then query each tracker for peers.
106+
pub async fn query_dht(
107+
endpoint: impl ConnectionProvider,
108+
dht: mainline::dht::Dht,
109+
args: Query,
110+
query_parallelism: usize,
111+
) -> impl Stream<Item = anyhow::Result<NodeId>> {
112+
let dht = dht.as_async();
113+
let info_hash = to_infohash(args.content);
114+
let response: mainline::common::Response<GetPeerResponse> = dht.get_peers(info_hash);
115+
let unique_tracker_addrs = unique_tracker_addrs(response);
116+
unique_tracker_addrs
117+
.map(move |addr| {
118+
let endpoint = endpoint.clone();
119+
async move {
120+
let hosts = match query_one(endpoint, addr, args).await {
121+
Ok(hosts) => hosts.into_iter().map(anyhow::Ok).collect(),
122+
Err(cause) => vec![Err(cause)],
123+
};
124+
futures::stream::iter(hosts)
125+
}
126+
})
127+
.buffer_unordered(query_parallelism)
128+
.flatten()
129+
}
130+
131+
/// Announce to the mainline DHT in parallel.
132+
///
133+
/// Note that this should only be called from a publicly reachable node, where port is the port
134+
/// on which the tracker protocol is reachable.
135+
pub fn announce_dht(
136+
dht: mainline::dht::Dht,
137+
content: BTreeSet<HashAndFormat>,
138+
port: u16,
139+
announce_parallelism: usize,
140+
) -> impl Stream<Item = (HashAndFormat, mainline::Result<StoreQueryMetdata>)> {
141+
let dht = dht.as_async();
142+
futures::stream::iter(content)
143+
.map(move |content| {
144+
let dht = dht.clone();
145+
async move {
146+
let info_hash = to_infohash(content);
147+
let res = dht.announce_peer(info_hash, Some(port)).await;
148+
(content, res)
149+
}
150+
})
151+
.buffer_unordered(announce_parallelism)
152+
}
153+
81154
/// Assume an existing connection to a tracker and query it for peers for some content.
82155
pub async fn query(connection: quinn::Connection, args: Query) -> anyhow::Result<QueryResponse> {
83156
tracing::info!("connected to {:?}", connection.remote_address());
@@ -193,3 +266,69 @@ async fn create_endpoint(
193266
.bind(port)
194267
.await
195268
}
269+
270+
/// A tracker id for queries - either a node id or an address.
271+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
272+
pub enum TrackerId {
273+
NodeId(NodeId),
274+
Addr(std::net::SocketAddr),
275+
}
276+
277+
impl std::fmt::Display for TrackerId {
278+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279+
match self {
280+
TrackerId::NodeId(node_id) => write!(f, "{}", node_id),
281+
TrackerId::Addr(addr) => write!(f, "{}", addr),
282+
}
283+
}
284+
}
285+
286+
impl std::str::FromStr for TrackerId {
287+
type Err = anyhow::Error;
288+
289+
fn from_str(s: &str) -> Result<Self, Self::Err> {
290+
if let Ok(node_id) = s.parse() {
291+
return Ok(TrackerId::NodeId(node_id));
292+
}
293+
if let Ok(addr) = s.parse() {
294+
return Ok(TrackerId::Addr(addr));
295+
}
296+
anyhow::bail!("invalid tracker id")
297+
}
298+
}
299+
300+
/// Connect to a tracker using the [protocol::TRACKER_ALPN] protocol, using either
301+
/// a node id or an address.
302+
///
303+
/// Note that this is less efficient than using an existing endpoint when doing multiple requests.
304+
/// It is provided as a convenience function for short lived utilities.
305+
pub async fn connect(tracker: &TrackerId, local_port: u16) -> anyhow::Result<quinn::Connection> {
306+
match tracker {
307+
TrackerId::Addr(tracker) => connect_socket(*tracker, local_port).await,
308+
TrackerId::NodeId(tracker) => connect_magic(&tracker, local_port).await,
309+
}
310+
}
311+
312+
/// Create a magic endpoint and connect to a tracker using the [protocol::TRACKER_ALPN] protocol.
313+
pub async fn connect_magic(tracker: &NodeId, local_port: u16) -> anyhow::Result<quinn::Connection> {
314+
// todo: uncomment once the connection problems are fixed
315+
// for now, a random node id is more reliable.
316+
// let key = load_secret_key(tracker_path(CLIENT_KEY)?).await?;
317+
let key = iroh_net::key::SecretKey::generate();
318+
let endpoint = create_endpoint(key, local_port, false).await?;
319+
tracing::info!("trying to connect to tracker at {:?}", tracker);
320+
let connection = endpoint.connect_by_node_id(tracker, TRACKER_ALPN).await?;
321+
Ok(connection)
322+
}
323+
324+
/// Create a quinn endpoint and connect to a tracker using the [protocol::TRACKER_ALPN] protocol.
325+
pub async fn connect_socket(
326+
tracker: SocketAddr,
327+
local_port: u16,
328+
) -> anyhow::Result<quinn::Connection> {
329+
let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, local_port));
330+
let endpoint = create_quinn_client(bind_addr, vec![TRACKER_ALPN.to_vec()], false)?;
331+
tracing::info!("trying to connect to tracker at {:?}", tracker);
332+
let connection = endpoint.connect(tracker, "localhost")?.await?;
333+
Ok(connection)
334+
}

iroh-mainline-content-discovery/src/main.rs

Lines changed: 7 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -12,17 +12,13 @@ use std::{
1212

1313
use anyhow::Context;
1414
use clap::Parser;
15-
use iroh_mainline_content_discovery::TrackerId;
1615
use iroh_mainline_content_discovery::{
1716
io::{
1817
self, load_from_file, setup_logging, tracker_home, tracker_path, CONFIG_DEFAULTS_FILE,
1918
CONFIG_FILE, SERVER_KEY_FILE,
2019
},
2120
options::Options,
22-
protocol::{
23-
Announce, AnnounceKind, Query, QueryFlags, Request, Response, REQUEST_SIZE_LIMIT,
24-
TRACKER_ALPN,
25-
},
21+
protocol::{Announce, AnnounceKind, Query, QueryFlags, TRACKER_ALPN},
2622
tracker::Tracker,
2723
};
2824
use iroh_net::{
@@ -127,16 +123,17 @@ async fn server(args: ServerArgs) -> anyhow::Result<()> {
127123
"tracker query --tracker {} <hash> or <ticket>",
128124
addr.node_id
129125
);
130-
log!();
131126
let db2 = db.clone();
132127
let db3 = db.clone();
128+
let db4 = db.clone();
133129
let endpoint2 = endpoint.clone();
134130
let _probe_task = tpc.spawn_pinned(move || db2.probe_loop(endpoint2));
131+
let _announce_task = tpc.spawn_pinned(move || db3.dht_announce_loop(args.quinn_port));
135132
let magic_accept_task = tokio::spawn(db.magic_accept_loop(endpoint));
136133
let server_config = configure_server(&key)?;
137134
let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, args.quinn_port));
138135
let quinn_endpoint = quinn::Endpoint::server(server_config, bind_addr)?;
139-
let quinn_accept_task = tokio::spawn(db3.quinn_accept_loop(quinn_endpoint));
136+
let quinn_accept_task = tokio::spawn(db4.quinn_accept_loop(quinn_endpoint));
140137
magic_accept_task.await??;
141138
quinn_accept_task.await??;
142139
Ok(())
@@ -189,7 +186,9 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
189186
}
190187

191188
async fn query(args: QueryArgs) -> anyhow::Result<()> {
192-
let connection = connect(&args.tracker).await?;
189+
let connection =
190+
iroh_mainline_content_discovery::connect(&args.tracker, args.port.unwrap_or_default())
191+
.await?;
193192
let q = Query {
194193
content: args.content.hash_and_format(),
195194
flags: QueryFlags {
@@ -208,32 +207,6 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> {
208207
Ok(())
209208
}
210209

211-
async fn connect(tracker: &TrackerId) -> anyhow::Result<quinn::Connection> {
212-
match tracker {
213-
TrackerId::Addr(tracker) => connect_socket(*tracker).await,
214-
TrackerId::NodeId(tracker) => connect_magic(tracker.clone()).await,
215-
}
216-
}
217-
218-
async fn connect_magic(tracker: NodeId) -> anyhow::Result<quinn::Connection> {
219-
// todo: uncomment once the connection problems are fixed
220-
// for now, a random node id is more reliable.
221-
// let key = load_secret_key(tracker_path(CLIENT_KEY)?).await?;
222-
let key = iroh_net::key::SecretKey::generate();
223-
let endpoint = create_endpoint(key, 0, false).await?;
224-
tracing::info!("trying to connect to tracker at {:?}", tracker);
225-
let connection = endpoint.connect_by_node_id(&tracker, TRACKER_ALPN).await?;
226-
Ok(connection)
227-
}
228-
229-
async fn connect_socket(tracker: SocketAddr) -> anyhow::Result<quinn::Connection> {
230-
let bind_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
231-
let endpoint = create_quinn_client(bind_addr, vec![TRACKER_ALPN.to_vec()], false)?;
232-
tracing::info!("trying to connect to tracker at {:?}", tracker);
233-
let connection = endpoint.connect(tracker, "localhost")?.await?;
234-
Ok(connection)
235-
}
236-
237210
#[tokio::main(flavor = "multi_thread")]
238211
async fn main() -> anyhow::Result<()> {
239212
setup_logging();
@@ -251,23 +224,6 @@ fn configure_server(secret_key: &iroh_net::key::SecretKey) -> anyhow::Result<qui
251224
make_server_config(secret_key, 8, 1024, vec![TRACKER_ALPN.to_vec()])
252225
}
253226

254-
fn create_quinn_client(
255-
bind_addr: SocketAddr,
256-
alpn_protocols: Vec<Vec<u8>>,
257-
keylog: bool,
258-
) -> anyhow::Result<quinn::Endpoint> {
259-
let secret_key = iroh_net::key::SecretKey::generate();
260-
let tls_client_config =
261-
iroh_net::tls::make_client_config(&secret_key, None, alpn_protocols, keylog)?;
262-
let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config));
263-
let mut endpoint = quinn::Endpoint::client(bind_addr)?;
264-
let mut transport_config = quinn::TransportConfig::default();
265-
transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
266-
client_config.transport_config(Arc::new(transport_config));
267-
endpoint.set_default_client_config(client_config);
268-
Ok(endpoint)
269-
}
270-
271227
/// Create a [`quinn::ServerConfig`] with the given secret key and limits.
272228
pub fn make_server_config(
273229
secret_key: &iroh_net::key::SecretKey,

iroh-mainline-content-discovery/src/options.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,10 @@ pub struct Options {
2929
pub announce_data_path: Option<PathBuf>,
3030
// number of peers to probe in parallel
3131
pub probe_parallelism: usize,
32+
33+
pub dht_announce_parallelism: usize,
34+
35+
pub dht_announce_interval: Duration,
3236
}
3337

3438
impl Default for Options {
@@ -43,6 +47,8 @@ impl Default for Options {
4347
probe_log: Some("probe.log".into()),
4448
announce_data_path: Some("announce.data.toml".into()),
4549
probe_parallelism: 4,
50+
dht_announce_parallelism: 4,
51+
dht_announce_interval: Duration::from_secs(10),
4652
}
4753
}
4854
}

iroh-mainline-content-discovery/src/protocol.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@ pub struct Announce {
4545
}
4646

4747
///
48-
#[derive(Debug, Clone, Serialize, Deserialize)]
48+
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
4949
pub struct QueryFlags {
5050
/// Only return peers that supposedly have the complete data.
5151
///
@@ -63,7 +63,7 @@ pub struct QueryFlags {
6363
}
6464

6565
/// Query a peer for a blob or set of blobs.
66-
#[derive(Debug, Clone, Serialize, Deserialize)]
66+
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
6767
pub struct Query {
6868
/// The content we want to find.
6969
///

0 commit comments

Comments
 (0)