Skip to content

Commit fa08a89

Browse files
iduartgomezclaude
andauthored
fix: critical operation state management issues (#1977)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 822caf4 commit fa08a89

File tree

5 files changed

+198
-98
lines changed

5 files changed

+198
-98
lines changed

crates/core/src/node/op_state_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ impl OpManager {
305305

306306
pub fn completed(&self, id: Transaction) {
307307
self.ring.live_tx_tracker.remove_finished_transaction(id);
308+
self.ops.under_progress.remove(&id);
308309
self.ops.completed.insert(id);
309310
}
310311

crates/core/src/operations/get.rs

Lines changed: 114 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ pub(crate) async fn request_get(
186186
let msg = GetMsg::RequestGet {
187187
id,
188188
key: key_val,
189+
sender: op_manager.ring.connection_manager.own_location(),
189190
target: target.clone(),
190191
fetch_contract,
191192
skip_list,
@@ -212,8 +213,9 @@ pub(crate) async fn request_get(
212213

213214
#[derive(Debug)]
214215
enum GetState {
215-
/// A new petition for a get op.
216-
ReceivedRequest,
216+
/// A new petition for a get op received from another peer.
217+
/// The requester field stores who sent us this request, so we can send the result back.
218+
ReceivedRequest { requester: Option<PeerKeyLocation> },
217219
/// Preparing request for get op.
218220
PrepareRequest {
219221
key: ContractKey,
@@ -243,7 +245,7 @@ enum GetState {
243245
impl Display for GetState {
244246
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245247
match self {
246-
GetState::ReceivedRequest => write!(f, "ReceivedRequest"),
248+
GetState::ReceivedRequest { .. } => write!(f, "ReceivedRequest"),
247249
GetState::PrepareRequest {
248250
key,
249251
id,
@@ -388,9 +390,10 @@ impl Operation for GetOp {
388390
}
389391
Ok(None) => {
390392
// new request to get a value for a contract, initialize the machine
393+
let requester = msg.sender().cloned();
391394
Ok(OpInitialization {
392395
op: Self {
393-
state: Some(GetState::ReceivedRequest),
396+
state: Some(GetState::ReceivedRequest { requester }),
394397
id: tx,
395398
result: None,
396399
stats: None, // don't care about stats in target peers
@@ -424,76 +427,114 @@ impl Operation for GetOp {
424427
GetMsg::RequestGet {
425428
key,
426429
id,
430+
sender: _,
427431
target,
428432
fetch_contract,
429433
skip_list,
430434
} => {
431-
// fast tracked from the request_get func
432-
debug_assert!(matches!(
433-
self.state,
434-
Some(GetState::AwaitingResponse { .. })
435-
));
436-
tracing::info!(tx = %id, %key, target = %target.peer, "Seek contract");
437-
438-
// Initialize stats for tracking the operation
439-
stats = Some(Box::new(GetStats {
440-
contract_location: Location::from(key),
441-
next_peer: None,
442-
transfer_time: None,
443-
first_response_time: None,
444-
}));
445-
446-
// First check if we have the contract locally before forwarding
447-
let get_result = op_manager
448-
.notify_contract_handler(ContractHandlerEvent::GetQuery {
449-
key: *key,
450-
return_contract_code: *fetch_contract,
451-
})
452-
.await;
453-
454-
match get_result {
455-
Ok(ContractHandlerEvent::GetResponse {
456-
response:
457-
Ok(StoreResponse {
458-
state: Some(state),
459-
contract,
460-
}),
461-
..
462-
}) => {
463-
// Contract found locally!
464-
tracing::debug!(tx = %id, %key, "Contract found locally in RequestGet handler");
465-
466-
// Since this is RequestGet, we are the original requester
467-
new_state = Some(GetState::Finished { key: *key });
468-
return_msg = None;
469-
result = Some(GetResult {
435+
// Check if operation is already completed
436+
if matches!(self.state, Some(GetState::Finished { .. })) {
437+
tracing::debug!(
438+
tx = %id,
439+
"GET: RequestGet received for already completed operation, ignoring duplicate request"
440+
);
441+
// Return the operation in its current state
442+
new_state = self.state;
443+
return_msg = None;
444+
result = self.result;
445+
} else {
446+
// Normal case: operation should be in ReceivedRequest or AwaitingResponse state
447+
debug_assert!(matches!(
448+
self.state,
449+
Some(GetState::ReceivedRequest { .. })
450+
| Some(GetState::AwaitingResponse { .. })
451+
));
452+
tracing::info!(tx = %id, %key, target = %target.peer, "Seek contract");
453+
454+
// Initialize stats for tracking the operation
455+
stats = Some(Box::new(GetStats {
456+
contract_location: Location::from(key),
457+
next_peer: None,
458+
transfer_time: None,
459+
first_response_time: None,
460+
}));
461+
462+
// First check if we have the contract locally before forwarding
463+
let get_result = op_manager
464+
.notify_contract_handler(ContractHandlerEvent::GetQuery {
470465
key: *key,
471-
state,
472-
contract,
473-
});
474-
}
475-
_ => {
476-
// Contract not found locally, proceed with forwarding
477-
tracing::debug!(tx = %id, %key, "Contract not found locally, forwarding to {}", target.peer);
466+
return_contract_code: *fetch_contract,
467+
})
468+
.await;
478469

479-
// Keep current state
480-
new_state = self.state;
470+
match get_result {
471+
Ok(ContractHandlerEvent::GetResponse {
472+
response:
473+
Ok(StoreResponse {
474+
state: Some(state),
475+
contract,
476+
}),
477+
..
478+
}) => {
479+
// Contract found locally!
480+
tracing::debug!(tx = %id, %key, "Contract found locally in RequestGet handler");
481+
482+
// Check if this is a forwarded request or a local request
483+
match &self.state {
484+
Some(GetState::ReceivedRequest { requester })
485+
if requester.is_some() =>
486+
{
487+
// This is a forwarded request - send result back to requester
488+
let requester = requester.clone().unwrap();
489+
tracing::debug!(tx = %id, "Returning contract {} to requester {}", key, requester.peer);
490+
new_state = None;
491+
return_msg = Some(GetMsg::ReturnGet {
492+
id: *id,
493+
key: *key,
494+
value: StoreResponse {
495+
state: Some(state),
496+
contract,
497+
},
498+
sender: target.clone(),
499+
target: requester,
500+
skip_list: skip_list.clone(),
501+
});
502+
}
503+
_ => {
504+
// This is the original requester (locally initiated request)
505+
new_state = Some(GetState::Finished { key: *key });
506+
return_msg = None;
507+
result = Some(GetResult {
508+
key: *key,
509+
state,
510+
contract,
511+
});
512+
}
513+
}
514+
}
515+
_ => {
516+
// Contract not found locally, proceed with forwarding
517+
tracing::debug!(tx = %id, %key, "Contract not found locally, forwarding to {}", target.peer);
481518

482-
// Prepare skip list with own peer ID
483-
let own_loc = op_manager.ring.connection_manager.own_location();
484-
let mut new_skip_list = skip_list.clone();
485-
new_skip_list.insert(own_loc.peer.clone());
519+
// Keep current state
520+
new_state = self.state;
486521

487-
// Create seek node message
488-
return_msg = Some(GetMsg::SeekNode {
489-
key: *key,
490-
id: *id,
491-
target: target.clone(),
492-
sender: own_loc.clone(),
493-
fetch_contract: *fetch_contract,
494-
htl: op_manager.ring.max_hops_to_live,
495-
skip_list: new_skip_list,
496-
});
522+
// Prepare skip list with own peer ID
523+
let own_loc = op_manager.ring.connection_manager.own_location();
524+
let mut new_skip_list = skip_list.clone();
525+
new_skip_list.insert(own_loc.peer.clone());
526+
527+
// Create seek node message
528+
return_msg = Some(GetMsg::SeekNode {
529+
key: *key,
530+
id: *id,
531+
target: target.clone(),
532+
sender: own_loc.clone(),
533+
fetch_contract: *fetch_contract,
534+
htl: op_manager.ring.max_hops_to_live,
535+
skip_list: new_skip_list,
536+
});
537+
}
497538
}
498539
}
499540
}
@@ -568,7 +609,7 @@ impl Operation for GetOp {
568609
return_msg = None;
569610
}
570611
}
571-
Some(GetState::ReceivedRequest) => {
612+
Some(GetState::ReceivedRequest { .. }) => {
572613
// Return contract to sender
573614
new_state = None;
574615
tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer);
@@ -800,7 +841,7 @@ impl Operation for GetOp {
800841
}
801842
}
802843
}
803-
Some(GetState::ReceivedRequest) => {
844+
Some(GetState::ReceivedRequest { .. }) => {
804845
// Return failure to sender
805846
tracing::debug!(tx = %id, "Returning contract {} to {}", key, sender.peer);
806847
new_state = None;
@@ -1045,7 +1086,7 @@ impl Operation for GetOp {
10451086
contract: contract.clone(),
10461087
});
10471088
}
1048-
Some(GetState::ReceivedRequest) => {
1089+
Some(GetState::ReceivedRequest { .. }) => {
10491090
// Return response to sender
10501091
tracing::info!(tx = %id, "Returning contract {} to {}", key, sender.peer);
10511092
new_state = None;
@@ -1223,6 +1264,7 @@ mod messages {
12231264
RequestGet {
12241265
id: Transaction,
12251266
target: PeerKeyLocation,
1267+
sender: PeerKeyLocation,
12261268
key: ContractKey,
12271269
fetch_contract: bool,
12281270
skip_list: HashSet<PeerId>,
@@ -1275,8 +1317,9 @@ mod messages {
12751317
impl GetMsg {
12761318
pub fn sender(&self) -> Option<&PeerKeyLocation> {
12771319
match self {
1278-
Self::SeekNode { target, .. } => Some(target),
1279-
_ => None,
1320+
Self::RequestGet { sender, .. } => Some(sender),
1321+
Self::SeekNode { sender, .. } => Some(sender),
1322+
Self::ReturnGet { sender, .. } => Some(sender),
12801323
}
12811324
}
12821325
}

crates/core/src/operations/mod.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,27 @@ where
115115
return_msg: Some(msg),
116116
state: Some(updated_state),
117117
}) => {
118-
// updated op
119-
let id = *msg.id();
120-
tracing::debug!(%id, "updated op state");
121-
if let Some(target) = msg.target() {
122-
tracing::debug!(%id, %target, "sending updated op state");
123-
network_bridge.send(&target.peer, msg).await?;
118+
// Check if operation is finalized while sending a message (e.g., forwarding upstream)
119+
if updated_state.finalized() {
120+
// Operation is complete but needs to send a message
121+
let id = *msg.id();
122+
tracing::debug!(%id, "operation finalized with message to send");
123+
op_manager.completed(id);
124+
if let Some(target) = msg.target() {
125+
tracing::debug!(%id, %target, "sending final message to target");
126+
network_bridge.send(&target.peer, msg).await?;
127+
}
128+
return Ok(Some(updated_state));
129+
} else {
130+
// Normal case: operation in progress, send message and push back
131+
let id = *msg.id();
132+
tracing::debug!(%id, "updated op state");
133+
if let Some(target) = msg.target() {
134+
tracing::debug!(%id, %target, "sending updated op state");
135+
network_bridge.send(&target.peer, msg).await?;
136+
}
137+
op_manager.push(id, updated_state).await?;
124138
}
125-
op_manager.push(id, updated_state).await?;
126139
}
127140

128141
Ok(OperationResult {

crates/core/src/operations/put.rs

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -258,20 +258,44 @@ impl Operation for PutOp {
258258
value.clone()
259259
};
260260

261-
// Create a SeekNode message to find the target node
262-
// fixme: this node should filter out incoming redundant puts since is the one initiating the request
263-
return_msg = Some(PutMsg::SeekNode {
264-
id: *id,
265-
sender,
266-
target: target.clone(),
267-
value: modified_value, // Use the modified value from put_contract
261+
// Determine next forwarding target - find peers closer to the contract location
262+
// Don't reuse the target from RequestPut as that's US (the current processing peer)
263+
let next_target = op_manager
264+
.ring
265+
.closest_potentially_caching(&key, [&sender.peer].as_slice());
266+
267+
if let Some(forward_target) = next_target {
268+
// Create a SeekNode message to forward to the next hop
269+
return_msg = Some(PutMsg::SeekNode {
270+
id: *id,
271+
sender,
272+
target: forward_target,
273+
value: modified_value.clone(),
274+
contract: contract.clone(),
275+
related_contracts: related_contracts.clone(),
276+
htl: *htl,
277+
});
278+
} else {
279+
// No other peers to forward to - we're the final destination
280+
tracing::debug!(
281+
tx = %id,
282+
%key,
283+
"No peers to forward to - handling PUT completion locally"
284+
);
285+
return_msg = None;
286+
}
287+
288+
// Transition to AwaitingResponse state to handle future SuccessfulPut messages
289+
new_state = Some(PutState::AwaitingResponse {
290+
key,
291+
upstream: match &self.state {
292+
Some(PutState::ReceivedRequest) => None,
293+
_ => None,
294+
},
268295
contract: contract.clone(),
269-
related_contracts: related_contracts.clone(),
270-
htl: *htl,
296+
state: modified_value,
297+
subscribe: false,
271298
});
272-
273-
// No changes to state yet, still in AwaitResponse state
274-
new_state = self.state;
275299
}
276300
PutMsg::SeekNode {
277301
id,
@@ -587,6 +611,16 @@ impl Operation for PutOp {
587611
return_msg = None;
588612
}
589613
}
614+
Some(PutState::Finished { .. }) => {
615+
// Operation already completed - this is a duplicate SuccessfulPut message
616+
// This can happen when multiple peers send success confirmations
617+
tracing::debug!(
618+
tx = %id,
619+
"Received duplicate SuccessfulPut for already completed operation, ignoring"
620+
);
621+
new_state = None; // Mark for completion
622+
return_msg = None;
623+
}
590624
_ => return Err(OpError::invalid_transition(self.id)),
591625
};
592626
}
@@ -802,7 +836,11 @@ async fn try_to_broadcast(
802836
new_state = Some(PutState::Finished { key });
803837
return_msg = None;
804838
}
805-
Some(PutState::ReceivedRequest | PutState::BroadcastOngoing) => {
839+
Some(
840+
PutState::ReceivedRequest
841+
| PutState::BroadcastOngoing
842+
| PutState::AwaitingResponse { .. },
843+
) => {
806844
if broadcast_to.is_empty() && !last_hop {
807845
// broadcast complete
808846
tracing::debug!(

0 commit comments

Comments
 (0)