Skip to content

Commit 615f02d

Browse files
authored
fix: route successful PUT responses back through forwarding peers (#2036)
1 parent a4a834f commit 615f02d

File tree

2 files changed

+222
-3
lines changed

2 files changed

+222
-3
lines changed

crates/core/src/operations/put.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl Operation for PutOp {
269269
// Create a SeekNode message to forward to the next hop
270270
return_msg = Some(PutMsg::SeekNode {
271271
id: *id,
272-
sender: sender.clone(),
272+
sender: own_location.clone(),
273273
target: forward_target,
274274
value: modified_value.clone(),
275275
contract: contract.clone(),
@@ -298,7 +298,7 @@ impl Operation for PutOp {
298298
id: *id,
299299
target: sender.clone(),
300300
key,
301-
sender: own_location,
301+
sender: own_location.clone(),
302302
});
303303

304304
// Mark operation as finished

crates/core/tests/operations.rs

Lines changed: 220 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::{anyhow, bail};
22
use freenet::{
33
config::{ConfigArgs, InlineGwConfig, NetworkArgs, SecretArgs, WebsocketApiArgs},
4-
dev_tool::TransportKeypair,
4+
dev_tool::{Location, TransportKeypair},
55
local_node::NodeConfig,
66
server::serve_gateway,
77
test_utils::{
@@ -98,6 +98,11 @@ fn gw_config(port: u16, path: &Path) -> anyhow::Result<InlineGwConfig> {
9898
})
9999
}
100100

101+
fn ring_distance(a: f64, b: f64) -> f64 {
102+
let diff = (a - b).abs();
103+
diff.min(1.0 - diff)
104+
}
105+
101106
async fn get_contract(
102107
client: &mut WebApi,
103108
key: ContractKey,
@@ -303,6 +308,220 @@ async fn test_put_contract() -> TestResult {
303308
Ok(())
304309
}
305310

311+
/// Ensure a client-only peer receives PutResponse when the contract is seeded on a third hop.
312+
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
313+
async fn test_put_contract_three_hop_returns_response() -> TestResult {
314+
const TEST_CONTRACT: &str = "test-contract-integration";
315+
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
316+
let contract_key = contract.key();
317+
let contract_location = Location::from(&contract_key);
318+
319+
let initial_state = test_utils::create_empty_todo_list();
320+
let wrapped_state = WrappedState::from(initial_state);
321+
322+
let network_socket_gw = TcpListener::bind("127.0.0.1:0")?;
323+
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
324+
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;
325+
let ws_api_port_socket_c = TcpListener::bind("127.0.0.1:0")?;
326+
327+
let (mut config_b, preset_cfg_b, mut config_b_gw) = {
328+
let (cfg, preset) = base_node_test_config(
329+
true,
330+
vec![],
331+
Some(network_socket_gw.local_addr()?.port()),
332+
ws_api_port_socket_b.local_addr()?.port(),
333+
)
334+
.await?;
335+
let public_port = cfg.network_api.public_port.unwrap();
336+
let path = preset.temp_dir.path().to_path_buf();
337+
(cfg, preset, gw_config(public_port, &path)?)
338+
};
339+
340+
let gateway_location = 0.25;
341+
config_b.network_api.location = Some(gateway_location);
342+
config_b_gw.location = Some(gateway_location);
343+
let gw_config_json = serde_json::to_string(&config_b_gw)?;
344+
345+
let (mut config_c, preset_cfg_c) = base_node_test_config(
346+
false,
347+
vec![gw_config_json.clone()],
348+
None,
349+
ws_api_port_socket_c.local_addr()?.port(),
350+
)
351+
.await?;
352+
let ws_api_port_peer_c = config_c.ws_api.ws_api_port.unwrap();
353+
let mut target_location = contract_location.as_f64();
354+
if ring_distance(target_location, gateway_location) < 0.05 {
355+
target_location = (target_location + 0.1).rem_euclid(1.0);
356+
}
357+
config_c.network_api.location = Some(target_location);
358+
assert!(
359+
ring_distance(target_location, gateway_location) > 0.02,
360+
"target location unexpectedly close to gateway"
361+
);
362+
363+
let (mut config_a, preset_cfg_a) = base_node_test_config(
364+
false,
365+
vec![gw_config_json.clone()],
366+
None,
367+
ws_api_port_socket_a.local_addr()?.port(),
368+
)
369+
.await?;
370+
let mut far_location = (target_location + 0.5).rem_euclid(1.0);
371+
if ring_distance(far_location, target_location) < 0.05 {
372+
far_location = (far_location + 0.25).rem_euclid(1.0);
373+
}
374+
config_a.network_api.location = Some(far_location);
375+
assert!(
376+
ring_distance(far_location, target_location) > 0.02,
377+
"client location unexpectedly close to target"
378+
);
379+
let ws_api_port_peer_a = config_a.ws_api.ws_api_port.unwrap();
380+
let ws_api_port_peer_b = config_b.ws_api.ws_api_port.unwrap();
381+
382+
tracing::info!("Node A data dir: {:?}", preset_cfg_a.temp_dir.path());
383+
tracing::info!("Gateway node data dir: {:?}", preset_cfg_b.temp_dir.path());
384+
tracing::info!("Node C data dir: {:?}", preset_cfg_c.temp_dir.path());
385+
386+
std::mem::drop(ws_api_port_socket_a);
387+
std::mem::drop(ws_api_port_socket_b);
388+
std::mem::drop(ws_api_port_socket_c);
389+
std::mem::drop(network_socket_gw);
390+
391+
let node_a = async move {
392+
tracing::info!("Starting peer A node (client)");
393+
let config = config_a.build().await?;
394+
let node = NodeConfig::new(config.clone())
395+
.await?
396+
.build(serve_gateway(config.ws_api).await)
397+
.await?;
398+
tracing::info!("Peer A node running");
399+
node.run().await
400+
}
401+
.instrument(tracing::info_span!("test_peer", test_node = "peer-a"))
402+
.boxed_local();
403+
404+
let node_c = async move {
405+
tracing::info!("Starting peer C node (target)");
406+
let config = config_c.build().await?;
407+
let node = NodeConfig::new(config.clone())
408+
.await?
409+
.build(serve_gateway(config.ws_api).await)
410+
.await?;
411+
tracing::info!("Peer C node running");
412+
node.run().await
413+
}
414+
.instrument(tracing::info_span!("test_peer", test_node = "peer-c"))
415+
.boxed_local();
416+
417+
let node_b = async {
418+
tracing::info!("Starting gateway node");
419+
let config = config_b.build().await?;
420+
let node = NodeConfig::new(config.clone())
421+
.await?
422+
.build(serve_gateway(config.ws_api).await)
423+
.await?;
424+
tracing::info!("Gateway node running");
425+
node.run().await
426+
}
427+
.instrument(tracing::info_span!("test_peer", test_node = "gateway"))
428+
.boxed_local();
429+
430+
let test = tokio::time::timeout(Duration::from_secs(240), async {
431+
tracing::info!("Waiting for nodes to establish connections...");
432+
tokio::time::sleep(Duration::from_secs(15)).await;
433+
434+
let uri_a = format!(
435+
"ws://127.0.0.1:{ws_api_port_peer_a}/v1/contract/command?encodingProtocol=native"
436+
);
437+
let (stream_a, _) = connect_async(&uri_a).await?;
438+
let mut client_api_a = WebApi::start(stream_a);
439+
440+
make_put(
441+
&mut client_api_a,
442+
wrapped_state.clone(),
443+
contract.clone(),
444+
false,
445+
)
446+
.await?;
447+
448+
tracing::info!("Waiting for PUT response from peer A...");
449+
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
450+
match resp {
451+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
452+
tracing::info!("PUT successful for contract: {}", key);
453+
assert_eq!(key, contract_key);
454+
}
455+
Ok(Ok(other)) => {
456+
bail!("Unexpected response while waiting for put: {:?}", other);
457+
}
458+
Ok(Err(e)) => {
459+
bail!("Error receiving put response: {}", e);
460+
}
461+
Err(_) => {
462+
bail!("Timeout waiting for put response after 120 seconds");
463+
}
464+
}
465+
466+
let uri_c = format!(
467+
"ws://127.0.0.1:{ws_api_port_peer_c}/v1/contract/command?encodingProtocol=native"
468+
);
469+
let (stream_c, _) = connect_async(&uri_c).await?;
470+
let mut client_api_c = WebApi::start(stream_c);
471+
let (response_contract, response_state) =
472+
get_contract(&mut client_api_c, contract_key, &preset_cfg_c.temp_dir).await?;
473+
assert_eq!(response_contract, contract);
474+
assert_eq!(response_state, wrapped_state);
475+
476+
client_api_c
477+
.send(ClientRequest::Disconnect { cause: None })
478+
.await?;
479+
tokio::time::sleep(Duration::from_millis(100)).await;
480+
481+
client_api_a
482+
.send(ClientRequest::Disconnect { cause: None })
483+
.await?;
484+
tokio::time::sleep(Duration::from_millis(100)).await;
485+
486+
let uri_b = format!(
487+
"ws://127.0.0.1:{ws_api_port_peer_b}/v1/contract/command?encodingProtocol=native"
488+
);
489+
let (stream_b, _) = connect_async(&uri_b).await?;
490+
let mut client_api_b = WebApi::start(stream_b);
491+
let (gw_contract, gw_state) =
492+
get_contract(&mut client_api_b, contract_key, &preset_cfg_b.temp_dir).await?;
493+
assert_eq!(gw_contract, contract);
494+
assert_eq!(gw_state, wrapped_state);
495+
client_api_b
496+
.send(ClientRequest::Disconnect { cause: None })
497+
.await?;
498+
tokio::time::sleep(Duration::from_millis(100)).await;
499+
500+
Ok::<_, anyhow::Error>(())
501+
});
502+
503+
select! {
504+
a = node_a => {
505+
let Err(a) = a;
506+
return Err(anyhow!(a).into());
507+
}
508+
c = node_c => {
509+
let Err(c) = c;
510+
return Err(anyhow!(c).into());
511+
}
512+
b = node_b => {
513+
let Err(b) = b;
514+
return Err(anyhow!(b).into());
515+
}
516+
r = test => {
517+
r??;
518+
tokio::time::sleep(Duration::from_secs(3)).await;
519+
}
520+
}
521+
522+
Ok(())
523+
}
524+
306525
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
307526
async fn test_update_contract() -> TestResult {
308527
// Load test contract

0 commit comments

Comments
 (0)