Skip to content

Commit 86f5119

Browse files
authored
refactor: refine fire-and-forget UPDATE semantics (#2038)
1 parent 615f02d commit 86f5119

File tree

7 files changed

+401
-322
lines changed

7 files changed

+401
-322
lines changed

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1662,7 +1662,6 @@ fn extract_sender_from_message(msg: &NetMessage) -> Option<PeerKeyLocation> {
16621662
// Update messages have sender in some variants
16631663
NetMessageV1::Update(update_msg) => match update_msg {
16641664
UpdateMsg::SeekNode { sender, .. } => Some(sender.clone()),
1665-
UpdateMsg::SuccessfulUpdate { sender, .. } => Some(sender.clone()),
16661665
UpdateMsg::Broadcasting { sender, .. } => Some(sender.clone()),
16671666
UpdateMsg::BroadcastTo { sender, .. } => Some(sender.clone()),
16681667
_ => None,

crates/core/src/operations/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,21 @@ where
133133
if let Some(target) = msg.target() {
134134
tracing::debug!(%id, %target, "sending updated op state");
135135
network_bridge.send(&target.peer, msg).await?;
136+
op_manager.push(id, updated_state).await?;
137+
} else {
138+
tracing::debug!(%id, "queueing op state for local processing");
139+
debug_assert!(
140+
matches!(
141+
msg,
142+
NetMessage::V1(NetMessageV1::Update(
143+
crate::operations::update::UpdateMsg::Broadcasting { .. }
144+
))
145+
),
146+
"Only Update::Broadcasting messages should be re-queued locally"
147+
);
148+
op_manager.notify_op_change(msg, updated_state).await?;
149+
return Err(OpError::StatePushed);
136150
}
137-
op_manager.push(id, updated_state).await?;
138151
}
139152
}
140153

crates/core/src/operations/put.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,40 @@ impl Operation for PutOp {
477477
let sender = op_manager.ring.connection_manager.own_location();
478478
let mut broadcasted_to = *broadcasted_to;
479479

480+
if upstream.peer == sender.peer {
481+
// Originator reached the subscription tree. This path should be filtered
482+
// out by the deduplication layer, so treat it as a warning if it happens
483+
// to help surface potential bugs.
484+
tracing::warn!(
485+
tx = %id,
486+
%key,
487+
"PUT originator re-entered broadcast loop; dedup should have completed"
488+
);
489+
new_state = Some(PutState::Finished { key: *key });
490+
} else {
491+
// Notify the upstream hop right away so the request
492+
// path does not wait for the broadcast to finish.
493+
let ack = PutMsg::SuccessfulPut {
494+
id: *id,
495+
target: upstream.clone(),
496+
key: *key,
497+
sender: sender.clone(),
498+
};
499+
500+
tracing::trace!(
501+
tx = %id,
502+
%key,
503+
upstream = %upstream.peer,
504+
"Forwarding SuccessfulPut upstream before broadcast"
505+
);
506+
507+
conn_manager
508+
.send(&upstream.peer, NetMessage::from(ack))
509+
.await?;
510+
511+
new_state = None;
512+
}
513+
480514
// Broadcast to all peers in parallel
481515
let mut broadcasting = Vec::with_capacity(broadcast_to.len());
482516
for peer in broadcast_to.iter() {
@@ -526,14 +560,7 @@ impl Operation for PutOp {
526560
"Successfully broadcasted put into contract {key} to {broadcasted_to} peers"
527561
);
528562

529-
// Subscriber nodes have been notified of the change, the operation is completed
530-
return_msg = Some(PutMsg::SuccessfulPut {
531-
id: *id,
532-
target: upstream.clone(),
533-
key: *key,
534-
sender,
535-
});
536-
new_state = None;
563+
return_msg = None;
537564
}
538565
PutMsg::SuccessfulPut { id, .. } => {
539566
match self.state {

0 commit comments

Comments
 (0)