Skip to content

Commit a0b067d

Browse files
iduartgomezclaude
andauthored
fix: waker registration and cross-node PUT response routing (#1985)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 7f89b55 commit a0b067d

File tree

9 files changed

+3612
-1253
lines changed

9 files changed

+3612
-1253
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ opentelemetry_sdk = { optional = true, version = "0.31", features = ["rt-tokio"]
7777
# internal deps
7878
freenet-stdlib = { features = ["net"], workspace = true }
7979
console-subscriber = { version = "0.4.1", optional = true }
80+
tokio-stream = "0.1.17"
8081

8182
[target.'cfg(windows)'.dependencies]
8283
winapi = { version = "0.3", features = ["sysinfoapi"] }

crates/core/src/node/network_bridge/handshake.rs

Lines changed: 863 additions & 871 deletions
Large diffs are not rendered by default.

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 312 additions & 131 deletions
Large diffs are not rendered by default.

crates/core/src/node/network_bridge/priority_select.rs

Lines changed: 2116 additions & 148 deletions
Large diffs are not rendered by default.

crates/core/src/node/op_state_manager.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,15 @@ impl OpManager {
229229
}
230230

231231
pub async fn push(&self, id: Transaction, op: OpEnum) -> Result<(), OpError> {
232+
// Check if operation is already completed - don't push back to HashMap
233+
if self.ops.completed.contains(&id) {
234+
tracing::debug!(
235+
tx = %id,
236+
"OpManager: Ignoring push for already completed operation"
237+
);
238+
return Ok(());
239+
}
240+
232241
if let Some(tx) = self.ops.under_progress.remove(&id) {
233242
if tx.timed_out() {
234243
self.ops.completed.insert(tx);

crates/core/src/operations/put.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ impl Operation for PutOp {
163163
match input {
164164
PutMsg::RequestPut {
165165
id,
166+
sender,
166167
contract,
167168
related_contracts,
168169
value,
@@ -171,7 +172,7 @@ impl Operation for PutOp {
171172
} => {
172173
// Get the contract key and own location
173174
let key = contract.key();
174-
let sender = op_manager.ring.connection_manager.own_location();
175+
let own_location = op_manager.ring.connection_manager.own_location();
175176

176177
tracing::info!(
177178
"Requesting put for contract {} from {} to {}",
@@ -268,34 +269,41 @@ impl Operation for PutOp {
268269
// Create a SeekNode message to forward to the next hop
269270
return_msg = Some(PutMsg::SeekNode {
270271
id: *id,
271-
sender,
272+
sender: sender.clone(),
272273
target: forward_target,
273274
value: modified_value.clone(),
274275
contract: contract.clone(),
275276
related_contracts: related_contracts.clone(),
276277
htl: *htl,
277278
});
279+
280+
// Transition to AwaitingResponse state to handle future SuccessfulPut messages
281+
new_state = Some(PutState::AwaitingResponse {
282+
key,
283+
upstream: Some(sender.clone()),
284+
contract: contract.clone(),
285+
state: modified_value,
286+
subscribe: false,
287+
});
278288
} else {
279289
// No other peers to forward to - we're the final destination
280290
tracing::debug!(
281291
tx = %id,
282292
%key,
283-
"No peers to forward to - handling PUT completion locally"
293+
"No peers to forward to - handling PUT completion locally, sending SuccessfulPut back to sender"
284294
);
285-
return_msg = None;
286-
}
287295

288-
// Transition to AwaitingResponse state to handle future SuccessfulPut messages
289-
new_state = Some(PutState::AwaitingResponse {
290-
key,
291-
upstream: match &self.state {
292-
Some(PutState::ReceivedRequest) => None,
293-
_ => None,
294-
},
295-
contract: contract.clone(),
296-
state: modified_value,
297-
subscribe: false,
298-
});
296+
// Send SuccessfulPut back to the sender (upstream node)
297+
return_msg = Some(PutMsg::SuccessfulPut {
298+
id: *id,
299+
target: sender.clone(),
300+
key,
301+
sender: own_location,
302+
});
303+
304+
// Mark operation as finished
305+
new_state = Some(PutState::Finished { key });
306+
}
299307
}
300308
PutMsg::SeekNode {
301309
id,
@@ -1110,6 +1118,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11101118
// Create RequestPut message and forward to target peer
11111119
let msg = PutMsg::RequestPut {
11121120
id,
1121+
sender: own_location,
11131122
contract,
11141123
related_contracts,
11151124
value,
@@ -1272,6 +1281,7 @@ mod messages {
12721281
/// Internal node instruction to find a route to the target node.
12731282
RequestPut {
12741283
id: Transaction,
1284+
sender: PeerKeyLocation,
12751285
contract: ContractContainer,
12761286
#[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
12771287
related_contracts: RelatedContracts<'static>,

0 commit comments

Comments
 (0)