Skip to content

Commit 492a61d

Browse files
committed
fix: add update fallback propagation
1 parent 9003287 commit 492a61d

File tree

5 files changed

+221
-30
lines changed

5 files changed

+221
-30
lines changed

crates/core/src/contract/executor/runtime.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,11 @@ impl Executor<Runtime> {
871871
.await
872872
.map_err(ExecutorError::other)?;
873873

874+
tracing::info!(
875+
"Contract state updated for {key}, new_size_bytes={}",
876+
new_state.as_ref().len()
877+
);
878+
874879
if let Err(err) = self
875880
.send_update_notification(key, parameters, &new_state)
876881
.await

crates/core/src/operations/update.rs

Lines changed: 198 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse};
44
use freenet_stdlib::prelude::*;
55

66
pub(crate) use self::messages::UpdateMsg;
7-
use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
7+
use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
88
use crate::contract::{ContractHandlerEvent, StoreResponse};
99
use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction};
1010
use crate::node::IsOperationCompleted;
@@ -13,6 +13,7 @@ use crate::{
1313
client_events::HostResult,
1414
node::{NetworkBridge, OpManager, PeerId},
1515
};
16+
use std::collections::HashSet;
1617

1718
pub(crate) struct UpdateOp {
1819
pub id: Transaction,
@@ -248,9 +249,14 @@ impl Operation for UpdateOp {
248249
return_msg = None;
249250
} else {
250251
// Get broadcast targets for propagating UPDATE to subscribers
251-
let broadcast_to = op_manager
252+
let mut broadcast_to = op_manager
252253
.get_broadcast_targets_update(key, &request_sender.peer);
253254

255+
if broadcast_to.is_empty() {
256+
broadcast_to = op_manager
257+
.compute_update_fallback_targets(key, &request_sender.peer);
258+
}
259+
254260
if broadcast_to.is_empty() {
255261
tracing::debug!(
256262
tx = %id,
@@ -292,10 +298,21 @@ impl Operation for UpdateOp {
292298
}
293299
} else {
294300
// Contract not found locally - forward to another peer
295-
let next_target = op_manager.ring.closest_potentially_caching(
296-
key,
297-
[&self_location.peer, &request_sender.peer].as_slice(),
298-
);
301+
let skip_peers = [&self_location.peer, &request_sender.peer];
302+
let next_target = op_manager
303+
.ring
304+
.closest_potentially_caching(key, skip_peers.as_slice())
305+
.or_else(|| {
306+
op_manager
307+
.ring
308+
.k_closest_potentially_caching(
309+
key,
310+
skip_peers.as_slice(),
311+
5,
312+
)
313+
.into_iter()
314+
.next()
315+
});
299316

300317
if let Some(forward_target) = next_target {
301318
tracing::debug!(
@@ -316,10 +333,33 @@ impl Operation for UpdateOp {
316333
});
317334
new_state = None;
318335
} else {
336+
let skip_list = [&self_location.peer, &request_sender.peer];
337+
let subscribers = op_manager
338+
.ring
339+
.subscribers_of(key)
340+
.map(|subs| {
341+
subs.value()
342+
.iter()
343+
.map(|loc| format!("{:.8}", loc.peer))
344+
.collect::<Vec<_>>()
345+
})
346+
.unwrap_or_default();
347+
let candidates = op_manager
348+
.ring
349+
.k_closest_potentially_caching(key, skip_list.as_slice(), 5)
350+
.into_iter()
351+
.map(|loc| format!("{:.8}", loc.peer))
352+
.collect::<Vec<_>>();
353+
let connection_count =
354+
op_manager.ring.connection_manager.num_connections();
319355
// No peers available and we don't have the contract - error
320356
tracing::error!(
321357
tx = %id,
322358
%key,
359+
subscribers = ?subscribers,
360+
candidates = ?candidates,
361+
connection_count,
362+
request_sender = %request_sender.peer,
323363
"Cannot handle UPDATE: contract not found locally and no peers to forward to"
324364
);
325365
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
@@ -386,10 +426,14 @@ impl Operation for UpdateOp {
386426
return_msg = None;
387427
} else {
388428
// Get broadcast targets
389-
let broadcast_to =
429+
let mut broadcast_to =
390430
op_manager.get_broadcast_targets_update(key, &sender.peer);
391431

392-
// If no peers to broadcast to, nothing else to do
432+
if broadcast_to.is_empty() {
433+
broadcast_to =
434+
op_manager.compute_update_fallback_targets(key, &sender.peer);
435+
}
436+
393437
if broadcast_to.is_empty() {
394438
tracing::debug!(
395439
tx = %id,
@@ -447,13 +491,100 @@ impl Operation for UpdateOp {
447491
});
448492
new_state = None;
449493
} else {
450-
// No more peers to try - error
451-
tracing::error!(
494+
tracing::warn!(
452495
tx = %id,
453496
%key,
454-
"Cannot handle UPDATE SeekNode: contract not found and no peers to forward to"
497+
"No forwarding targets for UPDATE SeekNode - attempting local fetch"
455498
);
456-
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
499+
500+
let mut fetch_skip = HashSet::new();
501+
fetch_skip.insert(sender.peer.clone());
502+
503+
let get_op = get::start_op(*key, true, false);
504+
if let Err(fetch_err) =
505+
get::request_get(op_manager, get_op, fetch_skip).await
506+
{
507+
tracing::warn!(
508+
tx = %id,
509+
%key,
510+
error = %fetch_err,
511+
"Failed to fetch contract while handling UPDATE SeekNode"
512+
);
513+
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
514+
}
515+
516+
if super::has_contract(op_manager, *key).await? {
517+
tracing::info!(
518+
tx = %id,
519+
%key,
520+
"Successfully fetched contract locally, applying UPDATE"
521+
);
522+
let UpdateExecution {
523+
value: updated_value,
524+
summary: _summary,
525+
changed,
526+
} = update_contract(
527+
op_manager,
528+
*key,
529+
value.clone(),
530+
related_contracts.clone(),
531+
)
532+
.await?;
533+
534+
if !changed {
535+
tracing::debug!(
536+
tx = %id,
537+
%key,
538+
"Fetched contract apply produced no change during SeekNode fallback"
539+
);
540+
new_state = None;
541+
return_msg = None;
542+
} else {
543+
let mut broadcast_to =
544+
op_manager.get_broadcast_targets_update(key, &sender.peer);
545+
546+
if broadcast_to.is_empty() {
547+
broadcast_to = op_manager
548+
.compute_update_fallback_targets(key, &sender.peer);
549+
}
550+
551+
if broadcast_to.is_empty() {
552+
tracing::debug!(
553+
tx = %id,
554+
%key,
555+
"No broadcast targets after SeekNode fallback apply; finishing locally"
556+
);
557+
new_state = None;
558+
return_msg = None;
559+
} else {
560+
match try_to_broadcast(
561+
*id,
562+
true,
563+
op_manager,
564+
self.state,
565+
(broadcast_to, sender.clone()),
566+
*key,
567+
updated_value.clone(),
568+
false,
569+
)
570+
.await
571+
{
572+
Ok((state, msg)) => {
573+
new_state = state;
574+
return_msg = msg;
575+
}
576+
Err(err) => return Err(err),
577+
}
578+
}
579+
}
580+
} else {
581+
tracing::error!(
582+
tx = %id,
583+
%key,
584+
"Contract still unavailable after fetch attempt during UPDATE SeekNode"
585+
);
586+
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
587+
}
457588
}
458589
}
459590
}
@@ -487,9 +618,14 @@ impl Operation for UpdateOp {
487618
new_state = None;
488619
return_msg = None;
489620
} else {
490-
let broadcast_to =
621+
let mut broadcast_to =
491622
op_manager.get_broadcast_targets_update(key, &sender.peer);
492623

624+
if broadcast_to.is_empty() {
625+
broadcast_to =
626+
op_manager.compute_update_fallback_targets(key, &sender.peer);
627+
}
628+
493629
tracing::debug!(
494630
"Successfully updated a value for contract {} @ {:?} - BroadcastTo - update",
495631
key,
@@ -649,9 +785,11 @@ impl OpManager {
649785
.ring
650786
.subscribers_of(key)
651787
.map(|subs| {
788+
let self_peer = self.ring.connection_manager.get_peer_key();
652789
subs.value()
653790
.iter()
654791
.filter(|pk| &pk.peer != sender)
792+
.filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true))
655793
.cloned()
656794
.collect::<Vec<_>>()
657795
})
@@ -671,15 +809,60 @@ impl OpManager {
671809
subscribers.len()
672810
);
673811
} else {
812+
let own_peer = self.ring.connection_manager.get_peer_key();
813+
let skip_slice = std::slice::from_ref(sender);
814+
let fallback_candidates = self
815+
.ring
816+
.k_closest_potentially_caching(key, skip_slice, 5)
817+
.into_iter()
818+
.map(|candidate| format!("{:.8}", candidate.peer))
819+
.collect::<Vec<_>>();
820+
674821
tracing::warn!(
675-
"UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate",
822+
"UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate (self={:?}, fallback_candidates={:?})",
676823
key,
677-
sender
824+
sender,
825+
own_peer.map(|p| format!("{:.8}", p)),
826+
fallback_candidates
678827
);
679828
}
680829

681830
subscribers
682831
}
832+
833+
fn compute_update_fallback_targets(
834+
&self,
835+
key: &ContractKey,
836+
sender: &PeerId,
837+
) -> Vec<PeerKeyLocation> {
838+
let mut skip: HashSet<PeerId> = HashSet::new();
839+
skip.insert(sender.clone());
840+
if let Some(self_peer) = self.ring.connection_manager.get_peer_key() {
841+
skip.insert(self_peer);
842+
}
843+
844+
let candidates = self
845+
.ring
846+
.k_closest_potentially_caching(key, &skip, 3)
847+
.into_iter()
848+
.filter(|candidate| &candidate.peer != sender)
849+
.collect::<Vec<_>>();
850+
851+
if !candidates.is_empty() {
852+
tracing::info!(
853+
"UPDATE_PROPAGATION: contract={:.8} from={} using fallback targets={}",
854+
key,
855+
sender,
856+
candidates
857+
.iter()
858+
.map(|c| format!("{:.8}", c.peer))
859+
.collect::<Vec<_>>()
860+
.join(",")
861+
);
862+
}
863+
864+
candidates
865+
}
683866
}
684867

685868
fn build_op_result(

0 commit comments

Comments
 (0)