Skip to content

Commit a413514

Browse files
authored
fix: add update fallback propagation (#2063)
1 parent f34b396 commit a413514

File tree

6 files changed

+113
-20
lines changed

6 files changed

+113
-20
lines changed

crates/core/src/contract/executor/runtime.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,11 @@ impl Executor<Runtime> {
871871
.await
872872
.map_err(ExecutorError::other)?;
873873

874+
tracing::info!(
875+
"Contract state updated for {key}, new_size_bytes={}",
876+
new_state.as_ref().len()
877+
);
878+
874879
if let Err(err) = self
875880
.send_update_notification(key, parameters, &new_state)
876881
.await

crates/core/src/operations/put.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,11 @@ impl Operation for PutOp {
570570
return_msg = None;
571571
}
572572
PutMsg::SuccessfulPut { id, .. } => {
573+
tracing::debug!(
574+
tx = %id,
575+
current_state = ?self.state,
576+
"PutOp::process_message: handling SuccessfulPut"
577+
);
573578
match self.state {
574579
Some(PutState::AwaitingResponse {
575580
key,
@@ -654,13 +659,24 @@ impl Operation for PutOp {
654659

655660
// Forward success message upstream if needed
656661
if let Some(upstream) = upstream {
662+
tracing::trace!(
663+
tx = %id,
664+
%key,
665+
upstream = %upstream.peer,
666+
"PutOp::process_message: Forwarding SuccessfulPut upstream"
667+
);
657668
return_msg = Some(PutMsg::SuccessfulPut {
658669
id: *id,
659670
target: upstream,
660671
key,
661672
sender: op_manager.ring.connection_manager.own_location(),
662673
});
663674
} else {
675+
tracing::trace!(
676+
tx = %id,
677+
%key,
678+
"PutOp::process_message: SuccessfulPut originated locally; no upstream"
679+
);
664680
return_msg = None;
665681
}
666682
}

crates/core/src/operations/update.rs

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,33 @@ impl Operation for UpdateOp {
316316
});
317317
new_state = None;
318318
} else {
319-
// No peers available and we don't have the contract - error
319+
// No peers available and we don't have the contract - capture context
320+
let skip_list = [&self_location.peer, &request_sender.peer];
321+
let subscribers = op_manager
322+
.ring
323+
.subscribers_of(key)
324+
.map(|subs| {
325+
subs.value()
326+
.iter()
327+
.map(|loc| format!("{:.8}", loc.peer))
328+
.collect::<Vec<_>>()
329+
})
330+
.unwrap_or_default();
331+
let candidates = op_manager
332+
.ring
333+
.k_closest_potentially_caching(key, skip_list.as_slice(), 5)
334+
.into_iter()
335+
.map(|loc| format!("{:.8}", loc.peer))
336+
.collect::<Vec<_>>();
337+
let connection_count =
338+
op_manager.ring.connection_manager.num_connections();
320339
tracing::error!(
321340
tx = %id,
322341
%key,
342+
subscribers = ?subscribers,
343+
candidates = ?candidates,
344+
connection_count,
345+
request_sender = %request_sender.peer,
323346
"Cannot handle UPDATE: contract not found locally and no peers to forward to"
324347
);
325348
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
@@ -447,10 +470,33 @@ impl Operation for UpdateOp {
447470
});
448471
new_state = None;
449472
} else {
450-
// No more peers to try - error
473+
// No more peers to try - capture context for diagnostics
474+
let skip_list = [&sender.peer, &self_location.peer];
475+
let subscribers = op_manager
476+
.ring
477+
.subscribers_of(key)
478+
.map(|subs| {
479+
subs.value()
480+
.iter()
481+
.map(|loc| format!("{:.8}", loc.peer))
482+
.collect::<Vec<_>>()
483+
})
484+
.unwrap_or_default();
485+
let candidates = op_manager
486+
.ring
487+
.k_closest_potentially_caching(key, skip_list.as_slice(), 5)
488+
.into_iter()
489+
.map(|loc| format!("{:.8}", loc.peer))
490+
.collect::<Vec<_>>();
491+
let connection_count =
492+
op_manager.ring.connection_manager.num_connections();
451493
tracing::error!(
452494
tx = %id,
453495
%key,
496+
subscribers = ?subscribers,
497+
candidates = ?candidates,
498+
connection_count,
499+
sender = %sender.peer,
454500
"Cannot handle UPDATE SeekNode: contract not found and no peers to forward to"
455501
);
456502
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
@@ -649,9 +695,21 @@ impl OpManager {
649695
.ring
650696
.subscribers_of(key)
651697
.map(|subs| {
698+
let self_peer = self.ring.connection_manager.get_peer_key();
699+
let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false);
652700
subs.value()
653701
.iter()
654-
.filter(|pk| &pk.peer != sender)
702+
.filter(|pk| {
703+
// Allow the sender (or ourselves) to stay in the broadcast list when we're
704+
// originating the UPDATE so local auto-subscribes still receive events.
705+
let is_sender = &pk.peer == sender;
706+
let is_self = self_peer.as_ref() == Some(&pk.peer);
707+
if is_sender || is_self {
708+
allow_self
709+
} else {
710+
true
711+
}
712+
})
655713
.cloned()
656714
.collect::<Vec<_>>()
657715
})
@@ -671,10 +729,21 @@ impl OpManager {
671729
subscribers.len()
672730
);
673731
} else {
732+
let own_peer = self.ring.connection_manager.get_peer_key();
733+
let skip_slice = std::slice::from_ref(sender);
734+
let fallback_candidates = self
735+
.ring
736+
.k_closest_potentially_caching(key, skip_slice, 5)
737+
.into_iter()
738+
.map(|candidate| format!("{:.8}", candidate.peer))
739+
.collect::<Vec<_>>();
740+
674741
tracing::warn!(
675-
"UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate",
742+
"UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate (self={:?}, fallback_candidates={:?})",
676743
key,
677-
sender
744+
sender,
745+
own_peer.map(|p| format!("{:.8}", p)),
746+
fallback_candidates
678747
);
679748
}
680749

crates/core/src/wasm_runtime/store.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::io::{self, BufReader, BufWriter, Seek, Write};
77
use std::path::{Path, PathBuf};
88
use std::time::Duration;
99
use std::{fs::File, io::Read};
10+
use tracing::error;
1011

1112
const INTERNAL_KEY: usize = 32;
1213
const TOMBSTONE_MARKER: usize = 1;
@@ -325,7 +326,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
325326
let mut original_reader = BufReader::new(original_file);
326327
let mut temp_writer = SafeWriter::<S>::new(&temp_file_path, true).inspect_err(|_| {
327328
if let Err(e) = fs::remove_file(&lock_file_path) {
328-
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
329+
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
329330
}
330331
})?;
331332

@@ -340,7 +341,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
340341
};
341342
if let Err(err) = temp_writer.insert_record(store_key, value) {
342343
if let Err(e) = fs::remove_file(&lock_file_path) {
343-
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
344+
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
344345
}
345346
return Err(err);
346347
}
@@ -356,7 +357,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
356357
Err(other) => {
357358
// Handle other errors gracefully
358359
if let Err(e) = fs::remove_file(&lock_file_path) {
359-
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
360+
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
360361
}
361362
return Err(other);
362363
}
@@ -366,30 +367,30 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
366367
// Check if any deleted records were found; if not, skip compaction
367368
if !any_deleted {
368369
if let Err(e) = fs::remove_file(&lock_file_path) {
369-
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
370+
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
370371
}
371372
return Ok(());
372373
}
373374

374375
// Clean up and finalize the compaction process
375376
if let Err(e) = temp_writer.flush() {
376377
if let Err(e) = fs::remove_file(&lock_file_path) {
377-
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
378+
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
378379
}
379380
return Err(e);
380381
}
381382

382383
// Replace the original file with the temporary file
383384
if let Err(e) = fs::rename(&temp_file_path, key_file_path) {
384385
if let Err(e) = fs::remove_file(&lock_file_path) {
385-
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
386+
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
386387
}
387388
return Err(e);
388389
}
389390

390391
// Remove the lock file
391392
fs::remove_file(&lock_file_path).map_err(|e| {
392-
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
393+
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
393394
e
394395
})?;
395396

@@ -589,13 +590,13 @@ mod tests {
589590
create_test_data(&mut file, &key_file_path, shared_data, i);
590591
} else if let Err(err) = super::compact_index_file::<TestStore1>(&key_file_path)
591592
{
592-
eprintln!("Thread encountered an error during compaction: {err}");
593+
error!("Thread encountered an error during compaction: {err}");
593594
return Err(err);
594595
}
595596
barrier.wait();
596597
// compact a last time so we know what data to compare against
597598
super::compact_index_file::<TestStore1>(&key_file_path).map_err(|err| {
598-
eprintln!("Thread encountered an error during compaction: {err}");
599+
error!("Thread encountered an error during compaction: {err}");
599600
err
600601
})
601602
})

crates/core/src/wasm_runtime/tests/contract_metering.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::wasm_runtime::tests::TestSetup;
55
use crate::wasm_runtime::{ContractExecError, RuntimeInnerError};
66
use freenet_stdlib::prelude::*;
77
use std::time::Instant;
8+
use tracing::info;
89

910
const TEST_CONTRACT_METERING: &str = "test_contract_metering";
1011

@@ -52,7 +53,7 @@ fn validate_state_metering() -> Result<(), Box<dyn std::error::Error>> {
5253
);
5354

5455
let duration = time.elapsed().as_secs_f64();
55-
println!("Duration: {duration:.2}s");
56+
info!("Duration: {duration:.2}s");
5657

5758
assert!(duration < 5.0, "Should not timeout");
5859
assert!(
@@ -103,7 +104,7 @@ fn test_update_state_metering() -> Result<(), Box<dyn std::error::Error>> {
103104
);
104105

105106
let duration = time.elapsed().as_secs_f64();
106-
println!("Duration: {duration:.2}s");
107+
info!("Duration: {duration:.2}s");
107108

108109
assert!(duration < 5.0, "Should not timeout");
109110
assert!(
@@ -150,7 +151,7 @@ fn test_summarize_state_metering() -> Result<(), Box<dyn std::error::Error>> {
150151
let result = runtime.summarize_state(&contract_key, &Parameters::from([].as_ref()), &state);
151152

152153
let duration = time.elapsed().as_secs_f64();
153-
println!("Duration: {duration:.2}s");
154+
info!("Duration: {duration:.2}s");
154155

155156
assert!(duration < 5.0, "Should not timeout");
156157
assert!(
@@ -202,7 +203,7 @@ fn test_get_state_delta_metering() -> Result<(), Box<dyn std::error::Error>> {
202203
);
203204

204205
let duration = time.elapsed().as_secs_f64();
205-
println!("Duration: {duration:.2}s");
206+
info!("Duration: {duration:.2}s");
206207

207208
assert!(duration < 5.0, "Should not timeout");
208209
assert!(

crates/core/src/wasm_runtime/tests/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use freenet_stdlib::prelude::{
66

77
use crate::util::tests::get_temp_dir;
88
use crate::util::workspace::get_workspace_target_dir;
9+
use tracing::info;
910

1011
use super::{ContractStore, DelegateStore, SecretsStore};
1112

@@ -22,7 +23,7 @@ pub(crate) fn get_test_module(name: &str) -> Result<Vec<u8>, Box<dyn std::error:
2223
path.join("tests").join(name.replace('_', "-"))
2324
};
2425
let target = get_workspace_target_dir();
25-
println!(
26+
info!(
2627
"trying to compile the test contract, target: {}",
2728
target.display()
2829
);
@@ -45,7 +46,7 @@ pub(crate) fn get_test_module(name: &str) -> Result<Vec<u8>, Box<dyn std::error:
4546
.join("debug")
4647
.join(name)
4748
.with_extension("wasm");
48-
println!("output file: {output_file:?}");
49+
info!("output file: {output_file:?}");
4950
Ok(std::fs::read(output_file)?)
5051
}
5152

0 commit comments

Comments
 (0)