Skip to content

Commit c2a2422

Browse files
committed
fix(update): keep local subscribers and add PUT ack tracing
1 parent 492a61d commit c2a2422

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

crates/core/src/operations/put.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,11 @@ impl Operation for PutOp {
570570
return_msg = None;
571571
}
572572
PutMsg::SuccessfulPut { id, .. } => {
573+
tracing::debug!(
574+
tx = %id,
575+
current_state = ?self.state,
576+
"PutOp::process_message: handling SuccessfulPut"
577+
);
573578
match self.state {
574579
Some(PutState::AwaitingResponse {
575580
key,
@@ -654,13 +659,24 @@ impl Operation for PutOp {
654659

655660
// Forward success message upstream if needed
656661
if let Some(upstream) = upstream {
662+
tracing::trace!(
663+
tx = %id,
664+
%key,
665+
upstream = %upstream.peer,
666+
"PutOp::process_message: Forwarding SuccessfulPut upstream"
667+
);
657668
return_msg = Some(PutMsg::SuccessfulPut {
658669
id: *id,
659670
target: upstream,
660671
key,
661672
sender: op_manager.ring.connection_manager.own_location(),
662673
});
663674
} else {
675+
tracing::trace!(
676+
tx = %id,
677+
%key,
678+
"PutOp::process_message: SuccessfulPut originated locally; no upstream"
679+
);
664680
return_msg = None;
665681
}
666682
}

crates/core/src/operations/update.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,10 +786,29 @@ impl OpManager {
786786
.subscribers_of(key)
787787
.map(|subs| {
788788
let self_peer = self.ring.connection_manager.get_peer_key();
789+
let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false);
789790
subs.value()
790791
.iter()
791-
.filter(|pk| &pk.peer != sender)
792-
.filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true))
792+
.filter(|pk| {
793+
// Allow the sender to remain in the broadcast list when we're the sender,
794+
// so local auto-subscribe via GET/PUT still receives notifications.
795+
if &pk.peer == sender {
796+
allow_self
797+
} else {
798+
true
799+
}
800+
})
801+
.filter(|pk| {
802+
if let Some(self_peer) = &self_peer {
803+
if &pk.peer == self_peer {
804+
allow_self
805+
} else {
806+
true
807+
}
808+
} else {
809+
true
810+
}
811+
})
793812
.cloned()
794813
.collect::<Vec<_>>()
795814
})

0 commit comments

Comments
 (0)