From 66110e0daf6bac29cf72f2b8f55ae8e20792a811 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 7 Nov 2025 16:36:04 +0100 Subject: [PATCH 1/4] fix: add update fallback propagation --- crates/core/src/contract/executor/runtime.rs | 5 + crates/core/src/operations/update.rs | 213 ++++++++++++++++-- crates/core/src/wasm_runtime/store.rs | 19 +- .../wasm_runtime/tests/contract_metering.rs | 9 +- crates/core/src/wasm_runtime/tests/mod.rs | 5 +- 5 files changed, 221 insertions(+), 30 deletions(-) diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index dc9cdabc1..ff152c69e 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -871,6 +871,11 @@ impl Executor { .await .map_err(ExecutorError::other)?; + tracing::info!( + "Contract state updated for {key}, new_size_bytes={}", + new_state.as_ref().len() + ); + if let Err(err) = self .send_update_notification(key, parameters, &new_state) .await diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 4b21ccc72..b6ba487a4 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -4,7 +4,7 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use freenet_stdlib::prelude::*; pub(crate) use self::messages::UpdateMsg; -use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; +use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::contract::{ContractHandlerEvent, StoreResponse}; use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction}; use crate::node::IsOperationCompleted; @@ -13,6 +13,7 @@ use crate::{ client_events::HostResult, node::{NetworkBridge, OpManager, PeerId}, }; +use std::collections::HashSet; pub(crate) struct UpdateOp { pub id: Transaction, @@ -248,9 +249,14 @@ impl Operation for UpdateOp { return_msg = None; } else { // Get broadcast targets for propagating UPDATE to subscribers - let broadcast_to = op_manager + let mut broadcast_to = op_manager .get_broadcast_targets_update(key, &request_sender.peer); + if broadcast_to.is_empty() { + broadcast_to = op_manager + .compute_update_fallback_targets(key, &request_sender.peer); + } + if broadcast_to.is_empty() { tracing::debug!( tx = %id, @@ -292,10 +298,21 @@ impl Operation for UpdateOp { } } else { // Contract not found locally - forward to another peer - let next_target = op_manager.ring.closest_potentially_caching( - key, - [&self_location.peer, &request_sender.peer].as_slice(), - ); + let skip_peers = [&self_location.peer, &request_sender.peer]; + let next_target = op_manager + .ring + .closest_potentially_caching(key, skip_peers.as_slice()) + .or_else(|| { + op_manager + .ring + .k_closest_potentially_caching( + key, + skip_peers.as_slice(), + 5, + ) + .into_iter() + .next() + }); if let Some(forward_target) = next_target { tracing::debug!( @@ -316,10 +333,33 @@ impl Operation for UpdateOp { }); new_state = None; } else { + let skip_list = [&self_location.peer, &request_sender.peer]; + let subscribers = op_manager + .ring + .subscribers_of(key) + .map(|subs| { + subs.value() + .iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>() + }) + .unwrap_or_default(); + let candidates = op_manager + .ring + .k_closest_potentially_caching(key, skip_list.as_slice(), 5) + .into_iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>(); + let connection_count = + 
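+                                // ring-wide live-connection count, captured for the error log below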
op_manager.ring.connection_manager.num_connections(); // No peers available and we don't have the contract - error tracing::error!( tx = %id, %key, + subscribers = ?subscribers, + candidates = ?candidates, + connection_count, + request_sender = %request_sender.peer, "Cannot handle UPDATE: contract not found locally and no peers to forward to" ); return Err(OpError::RingError(RingError::NoCachingPeers(*key))); @@ -386,10 +426,14 @@ impl Operation for UpdateOp { return_msg = None; } else { // Get broadcast targets - let broadcast_to = + let mut broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); - // If no peers to broadcast to, nothing else to do + if broadcast_to.is_empty() { + broadcast_to = + op_manager.compute_update_fallback_targets(key, &sender.peer); + } + if broadcast_to.is_empty() { tracing::debug!( tx = %id, @@ -447,13 +491,100 @@ impl Operation for UpdateOp { }); new_state = None; } else { - // No more peers to try - error - tracing::error!( + tracing::warn!( tx = %id, %key, - "Cannot handle UPDATE SeekNode: contract not found and no peers to forward to" + "No forwarding targets for UPDATE SeekNode - attempting local fetch" ); - return Err(OpError::RingError(RingError::NoCachingPeers(*key))); + + let mut fetch_skip = HashSet::new(); + fetch_skip.insert(sender.peer.clone()); + + let get_op = get::start_op(*key, true, false); + if let Err(fetch_err) = + get::request_get(op_manager, get_op, fetch_skip).await + { + tracing::warn!( + tx = %id, + %key, + error = %fetch_err, + "Failed to fetch contract while handling UPDATE SeekNode" + ); + return Err(OpError::RingError(RingError::NoCachingPeers(*key))); + } + + if super::has_contract(op_manager, *key).await? { + tracing::info!( + tx = %id, + %key, + "Successfully fetched contract locally, applying UPDATE" + ); + let UpdateExecution { + value: updated_value, + summary: _summary, + changed, + } = update_contract( + op_manager, + *key, + value.clone(), + related_contracts.clone(), + ) + .await?; + + if !changed { + tracing::debug!( + tx = %id, + %key, + "Fetched contract apply produced no change during SeekNode fallback" + ); + new_state = None; + return_msg = None; + } else { + let mut broadcast_to = + op_manager.get_broadcast_targets_update(key, &sender.peer); + + if broadcast_to.is_empty() { + broadcast_to = op_manager + .compute_update_fallback_targets(key, &sender.peer); + } + + if broadcast_to.is_empty() { + tracing::debug!( + tx = %id, + %key, + "No broadcast targets after SeekNode fallback apply; finishing locally" + ); + new_state = None; + return_msg = None; + } else { + match try_to_broadcast( + *id, + true, + op_manager, + self.state, + (broadcast_to, sender.clone()), + *key, + updated_value.clone(), + false, + ) + .await + { + Ok((state, msg)) => { + new_state = state; + return_msg = msg; + } + Err(err) => return Err(err), + } + } + } + } else { + tracing::error!( + tx = %id, + %key, + "Contract still unavailable after fetch attempt during UPDATE SeekNode" + ); + return Err(OpError::RingError(RingError::NoCachingPeers(*key))); + } } } } @@ -487,9 +618,14 @@ impl Operation for UpdateOp { new_state = None; return_msg = None; } else { - let broadcast_to = + let mut broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); + if broadcast_to.is_empty() { + broadcast_to = + op_manager.compute_update_fallback_targets(key, &sender.peer); + } + tracing::debug!( "Successfully updated a value for contract {} @ {:?} - BroadcastTo - update", key, @@ -649,9 +785,11 @@ impl OpManager { .ring 
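             // subscribers_of(key) yields every peer subscribed to this contract;
             // the filters below drop the sender (and our own peer) to avoid echo.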
.subscribers_of(key) .map(|subs| { + let self_peer = self.ring.connection_manager.get_peer_key(); subs.value() .iter() .filter(|pk| &pk.peer != sender) + .filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true)) .cloned() .collect::>() }) @@ -671,15 +809,60 @@ impl OpManager { subscribers.len() ); } else { + let own_peer = self.ring.connection_manager.get_peer_key(); + let skip_slice = std::slice::from_ref(sender); + let fallback_candidates = self + .ring + .k_closest_potentially_caching(key, skip_slice, 5) + .into_iter() + .map(|candidate| format!("{:.8}", candidate.peer)) + .collect::>(); + tracing::warn!( - "UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate", + "UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate (self={:?}, fallback_candidates={:?})", key, - sender + sender, + own_peer.map(|p| format!("{:.8}", p)), + fallback_candidates ); } subscribers } + + fn compute_update_fallback_targets( + &self, + key: &ContractKey, + sender: &PeerId, + ) -> Vec { + let mut skip: HashSet = HashSet::new(); + skip.insert(sender.clone()); + if let Some(self_peer) = self.ring.connection_manager.get_peer_key() { + skip.insert(self_peer); + } + + let candidates = self + .ring + .k_closest_potentially_caching(key, &skip, 3) + .into_iter() + .filter(|candidate| &candidate.peer != sender) + .collect::>(); + + if !candidates.is_empty() { + tracing::info!( + "UPDATE_PROPAGATION: contract={:.8} from={} using fallback targets={}", + key, + sender, + candidates + .iter() + .map(|c| format!("{:.8}", c.peer)) + .collect::>() + .join(",") + ); + } + + candidates + } } fn build_op_result( diff --git a/crates/core/src/wasm_runtime/store.rs b/crates/core/src/wasm_runtime/store.rs index 15c701cbe..07044f377 100644 --- a/crates/core/src/wasm_runtime/store.rs +++ b/crates/core/src/wasm_runtime/store.rs @@ -7,6 +7,7 @@ use std::io::{self, BufReader, BufWriter, Seek, Write}; use std::path::{Path, PathBuf}; use std::time::Duration; use std::{fs::File, io::Read}; +use tracing::error; const INTERNAL_KEY: usize = 32; const TOMBSTONE_MARKER: usize = 1; @@ -325,7 +326,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re let mut original_reader = BufReader::new(original_file); let mut temp_writer = SafeWriter::::new(&temp_file_path, true).inspect_err(|_| { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } })?; @@ -340,7 +341,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re }; if let Err(err) = temp_writer.insert_record(store_key, value) { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(err); } @@ -356,7 +357,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re Err(other) => { // Handle other errors gracefully if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(other); } @@ -366,7 +367,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re // Check if any deleted records were found; if not, skip compaction if !any_deleted { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + 
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Ok(()); } @@ -374,7 +375,7 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re // Clean up and finalize the compaction process if let Err(e) = temp_writer.flush() { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(e); } @@ -382,14 +383,14 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re // Replace the original file with the temporary file if let Err(e) = fs::rename(&temp_file_path, key_file_path) { if let Err(e) = fs::remove_file(&lock_file_path) { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } return Err(e); } // Remove the lock file fs::remove_file(&lock_file_path).map_err(|e| { - eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); + error!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); e })?; @@ -589,13 +590,13 @@ mod tests { create_test_data(&mut file, &key_file_path, shared_data, i); } else if let Err(err) = super::compact_index_file::(&key_file_path) { - eprintln!("Thread encountered an error during compaction: {err}"); + error!("Thread encountered an error during compaction: {err}"); return Err(err); } barrier.wait(); // compact a last time so we know what data to compare against super::compact_index_file::(&key_file_path).map_err(|err| { - eprintln!("Thread encountered an error during compaction: {err}"); + error!("Thread encountered an error during compaction: {err}"); err }) }) diff --git a/crates/core/src/wasm_runtime/tests/contract_metering.rs b/crates/core/src/wasm_runtime/tests/contract_metering.rs index 88c63a857..c4d849ebe 100644 --- a/crates/core/src/wasm_runtime/tests/contract_metering.rs +++ b/crates/core/src/wasm_runtime/tests/contract_metering.rs @@ -5,6 +5,7 @@ use crate::wasm_runtime::tests::TestSetup; use crate::wasm_runtime::{ContractExecError, RuntimeInnerError}; use freenet_stdlib::prelude::*; use std::time::Instant; +use tracing::info; const TEST_CONTRACT_METERING: &str = "test_contract_metering"; @@ -52,7 +53,7 @@ fn validate_state_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -103,7 +104,7 @@ fn test_update_state_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -150,7 +151,7 @@ fn test_summarize_state_metering() -> Result<(), Box> { let result = runtime.summarize_state(&contract_key, &Parameters::from([].as_ref()), &state); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -202,7 +203,7 @@ fn test_get_state_delta_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {duration:.2}s"); + info!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( diff --git a/crates/core/src/wasm_runtime/tests/mod.rs b/crates/core/src/wasm_runtime/tests/mod.rs index 955c4062e..110c49a0c 100644 --- a/crates/core/src/wasm_runtime/tests/mod.rs +++ 
b/crates/core/src/wasm_runtime/tests/mod.rs @@ -6,6 +6,7 @@ use freenet_stdlib::prelude::{ use crate::util::tests::get_temp_dir; use crate::util::workspace::get_workspace_target_dir; +use tracing::info; use super::{ContractStore, DelegateStore, SecretsStore}; @@ -22,7 +23,7 @@ pub(crate) fn get_test_module(name: &str) -> Result, Box Result, Box Date: Fri, 7 Nov 2025 22:11:03 +0100 Subject: [PATCH 2/4] fix(update): keep local subscribers and add PUT ack tracing --- crates/core/src/operations/put.rs | 16 ++++++++++++++++ crates/core/src/operations/update.rs | 23 +++++++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 2996bab9a..07dc0accc 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -570,6 +570,11 @@ impl Operation for PutOp { return_msg = None; } PutMsg::SuccessfulPut { id, .. } => { + tracing::debug!( + tx = %id, + current_state = ?self.state, + "PutOp::process_message: handling SuccessfulPut" + ); match self.state { Some(PutState::AwaitingResponse { key, @@ -654,6 +659,12 @@ impl Operation for PutOp { // Forward success message upstream if needed if let Some(upstream) = upstream { + tracing::trace!( + tx = %id, + %key, + upstream = %upstream.peer, + "PutOp::process_message: Forwarding SuccessfulPut upstream" + ); return_msg = Some(PutMsg::SuccessfulPut { id: *id, target: upstream, @@ -661,6 +672,11 @@ impl Operation for PutOp { sender: op_manager.ring.connection_manager.own_location(), }); } else { + tracing::trace!( + tx = %id, + %key, + "PutOp::process_message: SuccessfulPut originated locally; no upstream" + ); return_msg = None; } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index b6ba487a4..7e1f31921 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -786,10 +786,29 @@ impl OpManager { .subscribers_of(key) .map(|subs| { let self_peer = self.ring.connection_manager.get_peer_key(); + let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false); subs.value() .iter() - .filter(|pk| &pk.peer != sender) - .filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true)) + .filter(|pk| { + // Allow the sender to remain in the broadcast list when we're the sender, + // so local auto-subscribe via GET/PUT still receives notifications. 
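+                            // `allow_self` is true only when this node originated the UPDATE
+                            // (i.e. self_peer == sender); only then do the sender/self entries
+                            // below survive, so locally connected clients still get notified.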
+ if &pk.peer == sender { + allow_self + } else { + true + } + }) + .filter(|pk| { + if let Some(self_peer) = &self_peer { + if &pk.peer == self_peer { + allow_self + } else { + true + } + } else { + true + } + }) .cloned() .collect::>() }) From 4a6938745728e1c3e8d297648f14dd62c7cfd0b0 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 9 Nov 2025 10:54:48 -0600 Subject: [PATCH 3/4] fix(update): remove GET fallback propagation --- crates/core/src/operations/update.rs | 217 ++++++--------------------- 1 file changed, 42 insertions(+), 175 deletions(-) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 7e1f31921..d5e45319d 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -4,7 +4,7 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use freenet_stdlib::prelude::*; pub(crate) use self::messages::UpdateMsg; -use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; +use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::contract::{ContractHandlerEvent, StoreResponse}; use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction}; use crate::node::IsOperationCompleted; @@ -13,7 +13,6 @@ use crate::{ client_events::HostResult, node::{NetworkBridge, OpManager, PeerId}, }; -use std::collections::HashSet; pub(crate) struct UpdateOp { pub id: Transaction, @@ -249,14 +248,9 @@ impl Operation for UpdateOp { return_msg = None; } else { // Get broadcast targets for propagating UPDATE to subscribers - let mut broadcast_to = op_manager + let broadcast_to = op_manager .get_broadcast_targets_update(key, &request_sender.peer); - if broadcast_to.is_empty() { - broadcast_to = op_manager - .compute_update_fallback_targets(key, &request_sender.peer); - } - if broadcast_to.is_empty() { tracing::debug!( tx = %id, @@ -298,21 +292,10 @@ impl Operation for UpdateOp { } } else { // Contract not found locally - forward to another peer - let skip_peers = [&self_location.peer, &request_sender.peer]; - let next_target = op_manager - .ring - .closest_potentially_caching(key, skip_peers.as_slice()) - .or_else(|| { - op_manager - .ring - .k_closest_potentially_caching( - key, - skip_peers.as_slice(), - 5, - ) - .into_iter() - .next() - }); + let next_target = op_manager.ring.closest_potentially_caching( + key, + [&self_location.peer, &request_sender.peer].as_slice(), + ); if let Some(forward_target) = next_target { tracing::debug!( @@ -333,6 +316,7 @@ impl Operation for UpdateOp { }); new_state = None; } else { + // No peers available and we don't have the contract - capture context let skip_list = [&self_location.peer, &request_sender.peer]; let subscribers = op_manager .ring @@ -352,7 +336,6 @@ impl Operation for UpdateOp { .collect::>(); let connection_count = op_manager.ring.connection_manager.num_connections(); - // No peers available and we don't have the contract - error tracing::error!( tx = %id, %key, @@ -426,14 +409,10 @@ impl Operation for UpdateOp { return_msg = None; } else { // Get broadcast targets - let mut broadcast_to = + let broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); - if broadcast_to.is_empty() { - broadcast_to = - op_manager.compute_update_fallback_targets(key, &sender.peer); - } - + // If no peers to broadcast to, nothing else to do if broadcast_to.is_empty() { tracing::debug!( tx = %id, @@ -491,100 +470,36 @@ impl Operation for UpdateOp { }); new_state = None; } else { - tracing::warn!( 
+ // No more peers to try - capture context for diagnostics + let skip_list = [&sender.peer, &self_location.peer]; + let subscribers = op_manager + .ring + .subscribers_of(key) + .map(|subs| { + subs.value() + .iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>() + }) + .unwrap_or_default(); + let candidates = op_manager + .ring + .k_closest_potentially_caching(key, skip_list.as_slice(), 5) + .into_iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::>(); + let connection_count = + op_manager.ring.connection_manager.num_connections(); + tracing::error!( tx = %id, %key, - "No forwarding targets for UPDATE SeekNode - attempting local fetch" + subscribers = ?subscribers, + candidates = ?candidates, + connection_count, + sender = %sender.peer, + "Cannot handle UPDATE SeekNode: contract not found and no peers to forward to" ); - - let mut fetch_skip = HashSet::new(); - fetch_skip.insert(sender.peer.clone()); - - let get_op = get::start_op(*key, true, false); - if let Err(fetch_err) = - get::request_get(op_manager, get_op, fetch_skip).await - { - tracing::warn!( - tx = %id, - %key, - error = %fetch_err, - "Failed to fetch contract while handling UPDATE SeekNode" - ); - return Err(OpError::RingError(RingError::NoCachingPeers(*key))); - } - - if super::has_contract(op_manager, *key).await? { - tracing::info!( - tx = %id, - %key, - "Successfully fetched contract locally, applying UPDATE" - ); - let UpdateExecution { - value: updated_value, - summary: _summary, - changed, - } = update_contract( - op_manager, - *key, - value.clone(), - related_contracts.clone(), - ) - .await?; - - if !changed { - tracing::debug!( - tx = %id, - %key, - "Fetched contract apply produced no change during SeekNode fallback" - ); - new_state = None; - return_msg = None; - } else { - let mut broadcast_to = - op_manager.get_broadcast_targets_update(key, &sender.peer); - - if broadcast_to.is_empty() { - broadcast_to = op_manager - .compute_update_fallback_targets(key, &sender.peer); - } - - if broadcast_to.is_empty() { - tracing::debug!( - tx = %id, - %key, - "No broadcast targets after SeekNode fallback apply; finishing locally" - ); - new_state = None; - return_msg = None; - } else { - match try_to_broadcast( - *id, - true, - op_manager, - self.state, - (broadcast_to, sender.clone()), - *key, - updated_value.clone(), - false, - ) - .await - { - Ok((state, msg)) => { - new_state = state; - return_msg = msg; - } - Err(err) => return Err(err), - } - } - } - } else { - tracing::error!( - tx = %id, - %key, - "Contract still unavailable after fetch attempt during UPDATE SeekNode" - ); - return Err(OpError::RingError(RingError::NoCachingPeers(*key))); - } + return Err(OpError::RingError(RingError::NoCachingPeers(*key))); } } } @@ -618,14 +533,9 @@ impl Operation for UpdateOp { new_state = None; return_msg = None; } else { - let mut broadcast_to = + let broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); - if broadcast_to.is_empty() { - broadcast_to = - op_manager.compute_update_fallback_targets(key, &sender.peer); - } - tracing::debug!( "Successfully updated a value for contract {} @ {:?} - BroadcastTo - update", key, @@ -790,25 +700,16 @@ impl OpManager { subs.value() .iter() .filter(|pk| { - // Allow the sender to remain in the broadcast list when we're the sender, - // so local auto-subscribe via GET/PUT still receives notifications. 
- if &pk.peer == sender { + // Allow the sender (or ourselves) to stay in the broadcast list when we're + // originating the UPDATE so local auto-subscribes still receive events. + let is_sender = &pk.peer == sender; + let is_self = self_peer.as_ref().map_or(false, |me| &pk.peer == me); + if is_sender || is_self { allow_self } else { true } }) - .filter(|pk| { - if let Some(self_peer) = &self_peer { - if &pk.peer == self_peer { - allow_self - } else { - true - } - } else { - true - } - }) .cloned() .collect::>() }) @@ -848,40 +749,6 @@ impl OpManager { subscribers } - - fn compute_update_fallback_targets( - &self, - key: &ContractKey, - sender: &PeerId, - ) -> Vec { - let mut skip: HashSet = HashSet::new(); - skip.insert(sender.clone()); - if let Some(self_peer) = self.ring.connection_manager.get_peer_key() { - skip.insert(self_peer); - } - - let candidates = self - .ring - .k_closest_potentially_caching(key, &skip, 3) - .into_iter() - .filter(|candidate| &candidate.peer != sender) - .collect::>(); - - if !candidates.is_empty() { - tracing::info!( - "UPDATE_PROPAGATION: contract={:.8} from={} using fallback targets={}", - key, - sender, - candidates - .iter() - .map(|c| format!("{:.8}", c.peer)) - .collect::>() - .join(",") - ); - } - - candidates - } } fn build_op_result( From e16169078f4969334db716f966f5686c80ec4385 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 9 Nov 2025 10:55:49 -0600 Subject: [PATCH 4/4] style(update): use direct equality in allow-self check --- crates/core/src/operations/update.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index d5e45319d..7743ce95b 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -703,7 +703,7 @@ impl OpManager { // Allow the sender (or ourselves) to stay in the broadcast list when we're // originating the UPDATE so local auto-subscribes still receive events. let is_sender = &pk.peer == sender; - let is_self = self_peer.as_ref().map_or(false, |me| &pk.peer == me); + let is_self = self_peer.as_ref() == Some(&pk.peer); if is_sender || is_self { allow_self } else {
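
Taken together, patches 2-4 leave get_broadcast_targets_update with a single
allow-self rule: a subscriber equal to the sender or to the local peer stays in
the broadcast list only when the local peer is itself the sender. Below is a
minimal, self-contained sketch of that rule under stated assumptions --
`Peer` and `broadcast_targets` are illustrative stand-ins, not the crate's
real PeerId type or method:

// Hypothetical model of the allow-self filter; only the filter rule mirrors
// the patched code, everything else is simplified for illustration.
#[derive(Clone, Debug, PartialEq)]
struct Peer(&'static str);

fn broadcast_targets(subs: &[Peer], sender: &Peer, self_peer: Option<&Peer>) -> Vec<Peer> {
    // allow_self: this node originated the UPDATE (patch 2's allow_self flag).
    let allow_self = self_peer == Some(sender);
    subs.iter()
        .filter(|pk| {
            // Patch 4's direct-equality form of the combined check.
            let is_sender = *pk == sender;
            let is_self = self_peer == Some(*pk);
            if is_sender || is_self { allow_self } else { true }
        })
        .cloned()
        .collect()
}

fn main() {
    let (a, b, me) = (Peer("a"), Peer("b"), Peer("me"));
    // Remote sender: the sender's echo and our own entry are both dropped.
    assert!(broadcast_targets(&[a.clone(), me.clone()], &a, Some(&me)).is_empty());
    // Locally originated update: our own subscription survives alongside peers,
    // which is exactly the local auto-subscribe case the series preserves.
    assert_eq!(
        broadcast_targets(&[b.clone(), me.clone()], &me, Some(&me)),
        vec![b, me]
    );
}

Run as a plain binary, both asserts pass: a remote sender's entries are
filtered out, while an update we originate still reaches our own subscription.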