Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,11 @@ impl Executor<Runtime> {
.await
.map_err(ExecutorError::other)?;

tracing::info!(
"Contract state updated for {key}, new_size_bytes={}",
new_state.as_ref().len()
);

if let Err(err) = self
.send_update_notification(key, parameters, &new_state)
.await
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,11 @@ impl Operation for PutOp {
return_msg = None;
}
PutMsg::SuccessfulPut { id, .. } => {
tracing::debug!(
tx = %id,
current_state = ?self.state,
"PutOp::process_message: handling SuccessfulPut"
);
match self.state {
Some(PutState::AwaitingResponse {
key,
Expand Down Expand Up @@ -654,13 +659,24 @@ impl Operation for PutOp {

// Forward success message upstream if needed
if let Some(upstream) = upstream {
tracing::trace!(
tx = %id,
%key,
upstream = %upstream.peer,
"PutOp::process_message: Forwarding SuccessfulPut upstream"
);
return_msg = Some(PutMsg::SuccessfulPut {
id: *id,
target: upstream,
key,
sender: op_manager.ring.connection_manager.own_location(),
});
} else {
tracing::trace!(
tx = %id,
%key,
"PutOp::process_message: SuccessfulPut originated locally; no upstream"
);
return_msg = None;
}
}
Expand Down
79 changes: 74 additions & 5 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,33 @@ impl Operation for UpdateOp {
});
new_state = None;
} else {
// No peers available and we don't have the contract - error
// No peers available and we don't have the contract - capture context
let skip_list = [&self_location.peer, &request_sender.peer];
let subscribers = op_manager
.ring
.subscribers_of(key)
.map(|subs| {
subs.value()
.iter()
.map(|loc| format!("{:.8}", loc.peer))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let candidates = op_manager
.ring
.k_closest_potentially_caching(key, skip_list.as_slice(), 5)
.into_iter()
.map(|loc| format!("{:.8}", loc.peer))
.collect::<Vec<_>>();
let connection_count =
op_manager.ring.connection_manager.num_connections();
tracing::error!(
tx = %id,
%key,
subscribers = ?subscribers,
candidates = ?candidates,
connection_count,
request_sender = %request_sender.peer,
"Cannot handle UPDATE: contract not found locally and no peers to forward to"
);
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
Expand Down Expand Up @@ -447,10 +470,33 @@ impl Operation for UpdateOp {
});
new_state = None;
} else {
// No more peers to try - error
// No more peers to try - capture context for diagnostics
let skip_list = [&sender.peer, &self_location.peer];
let subscribers = op_manager
.ring
.subscribers_of(key)
.map(|subs| {
subs.value()
.iter()
.map(|loc| format!("{:.8}", loc.peer))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let candidates = op_manager
.ring
.k_closest_potentially_caching(key, skip_list.as_slice(), 5)
.into_iter()
.map(|loc| format!("{:.8}", loc.peer))
.collect::<Vec<_>>();
let connection_count =
op_manager.ring.connection_manager.num_connections();
tracing::error!(
tx = %id,
%key,
subscribers = ?subscribers,
candidates = ?candidates,
connection_count,
sender = %sender.peer,
"Cannot handle UPDATE SeekNode: contract not found and no peers to forward to"
);
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
Expand Down Expand Up @@ -649,9 +695,21 @@ impl OpManager {
.ring
.subscribers_of(key)
.map(|subs| {
let self_peer = self.ring.connection_manager.get_peer_key();
let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false);
subs.value()
.iter()
.filter(|pk| &pk.peer != sender)
.filter(|pk| {
// Allow the sender (or ourselves) to stay in the broadcast list when we're
// originating the UPDATE so local auto-subscribes still receive events.
let is_sender = &pk.peer == sender;
let is_self = self_peer.as_ref() == Some(&pk.peer);
if is_sender || is_self {
allow_self
} else {
true
}
})
.cloned()
.collect::<Vec<_>>()
})
Expand All @@ -671,10 +729,21 @@ impl OpManager {
subscribers.len()
);
} else {
let own_peer = self.ring.connection_manager.get_peer_key();
let skip_slice = std::slice::from_ref(sender);
let fallback_candidates = self
.ring
.k_closest_potentially_caching(key, skip_slice, 5)
.into_iter()
.map(|candidate| format!("{:.8}", candidate.peer))
.collect::<Vec<_>>();

tracing::warn!(
"UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate",
"UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate (self={:?}, fallback_candidates={:?})",
key,
sender
sender,
own_peer.map(|p| format!("{:.8}", p)),
fallback_candidates
);
}

Expand Down
19 changes: 10 additions & 9 deletions crates/core/src/wasm_runtime/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::io::{self, BufReader, BufWriter, Seek, Write};
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::{fs::File, io::Read};
use tracing::error;

const INTERNAL_KEY: usize = 32;
const TOMBSTONE_MARKER: usize = 1;
Expand Down Expand Up @@ -325,7 +326,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
let mut original_reader = BufReader::new(original_file);
let mut temp_writer = SafeWriter::<S>::new(&temp_file_path, true).inspect_err(|_| {
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
})?;

Expand All @@ -340,7 +341,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
};
if let Err(err) = temp_writer.insert_record(store_key, value) {
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
return Err(err);
}
Expand All @@ -356,7 +357,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
Err(other) => {
// Handle other errors gracefully
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
return Err(other);
}
Expand All @@ -366,30 +367,30 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
// Check if any deleted records were found; if not, skip compaction
if !any_deleted {
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
return Ok(());
}

// Clean up and finalize the compaction process
if let Err(e) = temp_writer.flush() {
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
return Err(e);
}

// Replace the original file with the temporary file
if let Err(e) = fs::rename(&temp_file_path, key_file_path) {
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
return Err(e);
}

// Remove the lock file
fs::remove_file(&lock_file_path).map_err(|e| {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
e
})?;

Expand Down Expand Up @@ -589,13 +590,13 @@ mod tests {
create_test_data(&mut file, &key_file_path, shared_data, i);
} else if let Err(err) = super::compact_index_file::<TestStore1>(&key_file_path)
{
eprintln!("Thread encountered an error during compaction: {err}");
error!("Thread encountered an error during compaction: {err}");
return Err(err);
}
barrier.wait();
// compact a last time so we know what data to compare against
super::compact_index_file::<TestStore1>(&key_file_path).map_err(|err| {
eprintln!("Thread encountered an error during compaction: {err}");
error!("Thread encountered an error during compaction: {err}");
err
})
})
Expand Down
9 changes: 5 additions & 4 deletions crates/core/src/wasm_runtime/tests/contract_metering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::wasm_runtime::tests::TestSetup;
use crate::wasm_runtime::{ContractExecError, RuntimeInnerError};
use freenet_stdlib::prelude::*;
use std::time::Instant;
use tracing::info;

const TEST_CONTRACT_METERING: &str = "test_contract_metering";

Expand Down Expand Up @@ -52,7 +53,7 @@ fn validate_state_metering() -> Result<(), Box<dyn std::error::Error>> {
);

let duration = time.elapsed().as_secs_f64();
println!("Duration: {duration:.2}s");
info!("Duration: {duration:.2}s");

assert!(duration < 5.0, "Should not timeout");
assert!(
Expand Down Expand Up @@ -103,7 +104,7 @@ fn test_update_state_metering() -> Result<(), Box<dyn std::error::Error>> {
);

let duration = time.elapsed().as_secs_f64();
println!("Duration: {duration:.2}s");
info!("Duration: {duration:.2}s");

assert!(duration < 5.0, "Should not timeout");
assert!(
Expand Down Expand Up @@ -150,7 +151,7 @@ fn test_summarize_state_metering() -> Result<(), Box<dyn std::error::Error>> {
let result = runtime.summarize_state(&contract_key, &Parameters::from([].as_ref()), &state);

let duration = time.elapsed().as_secs_f64();
println!("Duration: {duration:.2}s");
info!("Duration: {duration:.2}s");

assert!(duration < 5.0, "Should not timeout");
assert!(
Expand Down Expand Up @@ -202,7 +203,7 @@ fn test_get_state_delta_metering() -> Result<(), Box<dyn std::error::Error>> {
);

let duration = time.elapsed().as_secs_f64();
println!("Duration: {duration:.2}s");
info!("Duration: {duration:.2}s");

assert!(duration < 5.0, "Should not timeout");
assert!(
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/wasm_runtime/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use freenet_stdlib::prelude::{

use crate::util::tests::get_temp_dir;
use crate::util::workspace::get_workspace_target_dir;
use tracing::info;

use super::{ContractStore, DelegateStore, SecretsStore};

Expand All @@ -22,7 +23,7 @@ pub(crate) fn get_test_module(name: &str) -> Result<Vec<u8>, Box<dyn std::error:
path.join("tests").join(name.replace('_', "-"))
};
let target = get_workspace_target_dir();
println!(
info!(
"trying to compile the test contract, target: {}",
target.display()
);
Expand All @@ -45,7 +46,7 @@ pub(crate) fn get_test_module(name: &str) -> Result<Vec<u8>, Box<dyn std::error:
.join("debug")
.join(name)
.with_extension("wasm");
println!("output file: {output_file:?}");
info!("output file: {output_file:?}");
Ok(std::fs::read(output_file)?)
}

Expand Down
Loading