Skip to content

Commit b9d43f2

Browse files
committed
fix: add update fallback propagation
1 parent a413514 commit b9d43f2

File tree

1 file changed

+163
-49
lines changed

1 file changed

+163
-49
lines changed

crates/core/src/operations/update.rs

Lines changed: 163 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse};
44
use freenet_stdlib::prelude::*;
55

66
pub(crate) use self::messages::UpdateMsg;
7-
use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
7+
use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
88
use crate::contract::{ContractHandlerEvent, StoreResponse};
99
use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction};
1010
use crate::node::IsOperationCompleted;
@@ -13,6 +13,7 @@ use crate::{
1313
client_events::HostResult,
1414
node::{NetworkBridge, OpManager, PeerId},
1515
};
16+
use std::collections::HashSet;
1617

1718
pub(crate) struct UpdateOp {
1819
pub id: Transaction,
@@ -248,9 +249,14 @@ impl Operation for UpdateOp {
248249
return_msg = None;
249250
} else {
250251
// Get broadcast targets for propagating UPDATE to subscribers
251-
let broadcast_to = op_manager
252+
let mut broadcast_to = op_manager
252253
.get_broadcast_targets_update(key, &request_sender.peer);
253254

255+
if broadcast_to.is_empty() {
256+
broadcast_to = op_manager
257+
.compute_update_fallback_targets(key, &request_sender.peer);
258+
}
259+
254260
if broadcast_to.is_empty() {
255261
tracing::debug!(
256262
tx = %id,
@@ -292,10 +298,21 @@ impl Operation for UpdateOp {
292298
}
293299
} else {
294300
// Contract not found locally - forward to another peer
295-
let next_target = op_manager.ring.closest_potentially_caching(
296-
key,
297-
[&self_location.peer, &request_sender.peer].as_slice(),
298-
);
301+
let skip_peers = [&self_location.peer, &request_sender.peer];
302+
let next_target = op_manager
303+
.ring
304+
.closest_potentially_caching(key, skip_peers.as_slice())
305+
.or_else(|| {
306+
op_manager
307+
.ring
308+
.k_closest_potentially_caching(
309+
key,
310+
skip_peers.as_slice(),
311+
5,
312+
)
313+
.into_iter()
314+
.next()
315+
});
299316

300317
if let Some(forward_target) = next_target {
301318
tracing::debug!(
@@ -316,7 +333,6 @@ impl Operation for UpdateOp {
316333
});
317334
new_state = None;
318335
} else {
319-
// No peers available and we don't have the contract - capture context
320336
let skip_list = [&self_location.peer, &request_sender.peer];
321337
let subscribers = op_manager
322338
.ring
@@ -336,6 +352,7 @@ impl Operation for UpdateOp {
336352
.collect::<Vec<_>>();
337353
let connection_count =
338354
op_manager.ring.connection_manager.num_connections();
355+
// No peers available and we don't have the contract - error
339356
tracing::error!(
340357
tx = %id,
341358
%key,
@@ -409,10 +426,14 @@ impl Operation for UpdateOp {
409426
return_msg = None;
410427
} else {
411428
// Get broadcast targets
412-
let broadcast_to =
429+
let mut broadcast_to =
413430
op_manager.get_broadcast_targets_update(key, &sender.peer);
414431

415-
// If no peers to broadcast to, nothing else to do
432+
if broadcast_to.is_empty() {
433+
broadcast_to =
434+
op_manager.compute_update_fallback_targets(key, &sender.peer);
435+
}
436+
416437
if broadcast_to.is_empty() {
417438
tracing::debug!(
418439
tx = %id,
@@ -470,36 +491,100 @@ impl Operation for UpdateOp {
470491
});
471492
new_state = None;
472493
} else {
473-
// No more peers to try - capture context for diagnostics
474-
let skip_list = [&sender.peer, &self_location.peer];
475-
let subscribers = op_manager
476-
.ring
477-
.subscribers_of(key)
478-
.map(|subs| {
479-
subs.value()
480-
.iter()
481-
.map(|loc| format!("{:.8}", loc.peer))
482-
.collect::<Vec<_>>()
483-
})
484-
.unwrap_or_default();
485-
let candidates = op_manager
486-
.ring
487-
.k_closest_potentially_caching(key, skip_list.as_slice(), 5)
488-
.into_iter()
489-
.map(|loc| format!("{:.8}", loc.peer))
490-
.collect::<Vec<_>>();
491-
let connection_count =
492-
op_manager.ring.connection_manager.num_connections();
493-
tracing::error!(
494+
tracing::warn!(
494495
tx = %id,
495496
%key,
496-
subscribers = ?subscribers,
497-
candidates = ?candidates,
498-
connection_count,
499-
sender = %sender.peer,
500-
"Cannot handle UPDATE SeekNode: contract not found and no peers to forward to"
497+
"No forwarding targets for UPDATE SeekNode - attempting local fetch"
501498
);
502-
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
499+
500+
let mut fetch_skip = HashSet::new();
501+
fetch_skip.insert(sender.peer.clone());
502+
503+
let get_op = get::start_op(*key, true, false);
504+
if let Err(fetch_err) =
505+
get::request_get(op_manager, get_op, fetch_skip).await
506+
{
507+
tracing::warn!(
508+
tx = %id,
509+
%key,
510+
error = %fetch_err,
511+
"Failed to fetch contract while handling UPDATE SeekNode"
512+
);
513+
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
514+
}
515+
516+
if super::has_contract(op_manager, *key).await? {
517+
tracing::info!(
518+
tx = %id,
519+
%key,
520+
"Successfully fetched contract locally, applying UPDATE"
521+
);
522+
let UpdateExecution {
523+
value: updated_value,
524+
summary: _summary,
525+
changed,
526+
} = update_contract(
527+
op_manager,
528+
*key,
529+
value.clone(),
530+
related_contracts.clone(),
531+
)
532+
.await?;
533+
534+
if !changed {
535+
tracing::debug!(
536+
tx = %id,
537+
%key,
538+
"Fetched contract apply produced no change during SeekNode fallback"
539+
);
540+
new_state = None;
541+
return_msg = None;
542+
} else {
543+
let mut broadcast_to =
544+
op_manager.get_broadcast_targets_update(key, &sender.peer);
545+
546+
if broadcast_to.is_empty() {
547+
broadcast_to = op_manager
548+
.compute_update_fallback_targets(key, &sender.peer);
549+
}
550+
551+
if broadcast_to.is_empty() {
552+
tracing::debug!(
553+
tx = %id,
554+
%key,
555+
"No broadcast targets after SeekNode fallback apply; finishing locally"
556+
);
557+
new_state = None;
558+
return_msg = None;
559+
} else {
560+
match try_to_broadcast(
561+
*id,
562+
true,
563+
op_manager,
564+
self.state,
565+
(broadcast_to, sender.clone()),
566+
*key,
567+
updated_value.clone(),
568+
false,
569+
)
570+
.await
571+
{
572+
Ok((state, msg)) => {
573+
new_state = state;
574+
return_msg = msg;
575+
}
576+
Err(err) => return Err(err),
577+
}
578+
}
579+
}
580+
} else {
581+
tracing::error!(
582+
tx = %id,
583+
%key,
584+
"Contract still unavailable after fetch attempt during UPDATE SeekNode"
585+
);
586+
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
587+
}
503588
}
504589
}
505590
}
@@ -533,9 +618,14 @@ impl Operation for UpdateOp {
533618
new_state = None;
534619
return_msg = None;
535620
} else {
536-
let broadcast_to =
621+
let mut broadcast_to =
537622
op_manager.get_broadcast_targets_update(key, &sender.peer);
538623

624+
if broadcast_to.is_empty() {
625+
broadcast_to =
626+
op_manager.compute_update_fallback_targets(key, &sender.peer);
627+
}
628+
539629
tracing::debug!(
540630
"Successfully updated a value for contract {} @ {:?} - BroadcastTo - update",
541631
key,
@@ -696,20 +786,10 @@ impl OpManager {
696786
.subscribers_of(key)
697787
.map(|subs| {
698788
let self_peer = self.ring.connection_manager.get_peer_key();
699-
let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false);
700789
subs.value()
701790
.iter()
702-
.filter(|pk| {
703-
// Allow the sender (or ourselves) to stay in the broadcast list when we're
704-
// originating the UPDATE so local auto-subscribes still receive events.
705-
let is_sender = &pk.peer == sender;
706-
let is_self = self_peer.as_ref() == Some(&pk.peer);
707-
if is_sender || is_self {
708-
allow_self
709-
} else {
710-
true
711-
}
712-
})
791+
.filter(|pk| &pk.peer != sender)
792+
.filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true))
713793
.cloned()
714794
.collect::<Vec<_>>()
715795
})
@@ -749,6 +829,40 @@ impl OpManager {
749829

750830
subscribers
751831
}
832+
833+
fn compute_update_fallback_targets(
834+
&self,
835+
key: &ContractKey,
836+
sender: &PeerId,
837+
) -> Vec<PeerKeyLocation> {
838+
let mut skip: HashSet<PeerId> = HashSet::new();
839+
skip.insert(sender.clone());
840+
if let Some(self_peer) = self.ring.connection_manager.get_peer_key() {
841+
skip.insert(self_peer);
842+
}
843+
844+
let candidates = self
845+
.ring
846+
.k_closest_potentially_caching(key, &skip, 3)
847+
.into_iter()
848+
.filter(|candidate| &candidate.peer != sender)
849+
.collect::<Vec<_>>();
850+
851+
if !candidates.is_empty() {
852+
tracing::info!(
853+
"UPDATE_PROPAGATION: contract={:.8} from={} using fallback targets={}",
854+
key,
855+
sender,
856+
candidates
857+
.iter()
858+
.map(|c| format!("{:.8}", c.peer))
859+
.collect::<Vec<_>>()
860+
.join(",")
861+
);
862+
}
863+
864+
candidates
865+
}
752866
}
753867

754868
fn build_op_result(

0 commit comments

Comments
 (0)