File tree Expand file tree Collapse file tree 2 files changed +37
-2
lines changed
crates/core/src/operations Expand file tree Collapse file tree 2 files changed +37
-2
lines changed Original file line number Diff line number Diff line change @@ -570,6 +570,11 @@ impl Operation for PutOp {
570570 return_msg = None ;
571571 }
572572 PutMsg :: SuccessfulPut { id, .. } => {
573+ tracing:: debug!(
574+ tx = %id,
575+ current_state = ?self . state,
576+ "PutOp::process_message: handling SuccessfulPut"
577+ ) ;
573578 match self . state {
574579 Some ( PutState :: AwaitingResponse {
575580 key,
@@ -654,13 +659,24 @@ impl Operation for PutOp {
654659
655660 // Forward success message upstream if needed
656661 if let Some ( upstream) = upstream {
662+ tracing:: trace!(
663+ tx = %id,
664+ %key,
665+ upstream = %upstream. peer,
666+ "PutOp::process_message: Forwarding SuccessfulPut upstream"
667+ ) ;
657668 return_msg = Some ( PutMsg :: SuccessfulPut {
658669 id : * id,
659670 target : upstream,
660671 key,
661672 sender : op_manager. ring . connection_manager . own_location ( ) ,
662673 } ) ;
663674 } else {
675+ tracing:: trace!(
676+ tx = %id,
677+ %key,
678+ "PutOp::process_message: SuccessfulPut originated locally; no upstream"
679+ ) ;
664680 return_msg = None ;
665681 }
666682 }
Original file line number Diff line number Diff line change @@ -786,10 +786,29 @@ impl OpManager {
786786 . subscribers_of ( key)
787787 . map ( |subs| {
788788 let self_peer = self . ring . connection_manager . get_peer_key ( ) ;
789+ let allow_self = self_peer. as_ref ( ) . map ( |me| me == sender) . unwrap_or ( false ) ;
789790 subs. value ( )
790791 . iter ( )
791- . filter ( |pk| & pk. peer != sender)
792- . filter ( |pk| self_peer. as_ref ( ) . map ( |me| & pk. peer != me) . unwrap_or ( true ) )
792+ . filter ( |pk| {
793+ // Allow the sender to remain in the broadcast list when we're the sender,
794+ // so local auto-subscribe via GET/PUT still receives notifications.
795+ if & pk. peer == sender {
796+ allow_self
797+ } else {
798+ true
799+ }
800+ } )
801+ . filter ( |pk| {
802+ if let Some ( self_peer) = & self_peer {
803+ if & pk. peer == self_peer {
804+ allow_self
805+ } else {
806+ true
807+ }
808+ } else {
809+ true
810+ }
811+ } )
793812 . cloned ( )
794813 . collect :: < Vec < _ > > ( )
795814 } )
You can’t perform that action at this time.
0 commit comments