@@ -164,6 +164,7 @@ impl Operation for PutOp {
164164 PutMsg :: RequestPut {
165165 id,
166166 sender,
167+ origin,
167168 contract,
168169 related_contracts,
169170 value,
@@ -276,6 +277,7 @@ impl Operation for PutOp {
276277 return_msg = Some ( PutMsg :: SeekNode {
277278 id : * id,
278279 sender : own_location. clone ( ) ,
280+ origin : origin. clone ( ) ,
279281 target : forward_target,
280282 value : modified_value. clone ( ) ,
281283 contract : contract. clone ( ) ,
@@ -290,6 +292,7 @@ impl Operation for PutOp {
290292 contract : contract. clone ( ) ,
291293 state : modified_value,
292294 subscribe,
295+ origin : origin. clone ( ) ,
293296 } ) ;
294297 } else {
295298 // No other peers to forward to - we're the final destination
@@ -305,6 +308,7 @@ impl Operation for PutOp {
305308 target : sender. clone ( ) ,
306309 key,
307310 sender : own_location. clone ( ) ,
311+ origin : origin. clone ( ) ,
308312 } ) ;
309313
310314 // Mark operation as finished
@@ -319,6 +323,7 @@ impl Operation for PutOp {
319323 htl,
320324 target,
321325 sender,
326+ origin,
322327 } => {
323328 // Get the contract key and check if we should handle it
324329 let key = contract. key ( ) ;
@@ -345,6 +350,7 @@ impl Operation for PutOp {
345350 * id,
346351 new_htl,
347352 HashSet :: from ( [ sender. peer . clone ( ) ] ) ,
353+ origin. clone ( ) ,
348354 )
349355 . await
350356 } else {
@@ -406,6 +412,7 @@ impl Operation for PutOp {
406412 last_hop,
407413 op_manager,
408414 self . state ,
415+ origin. clone ( ) ,
409416 ( broadcast_to, sender. clone ( ) ) ,
410417 key,
411418 ( contract. clone ( ) , value. clone ( ) ) ,
@@ -425,6 +432,7 @@ impl Operation for PutOp {
425432 new_value,
426433 contract,
427434 sender,
435+ origin,
428436 ..
429437 } => {
430438 // Get own location
@@ -457,6 +465,7 @@ impl Operation for PutOp {
457465 false ,
458466 op_manager,
459467 self . state ,
468+ origin. clone ( ) ,
460469 ( broadcast_to, sender. clone ( ) ) ,
461470 * key,
462471 ( contract. clone ( ) , updated_value) ,
@@ -478,6 +487,7 @@ impl Operation for PutOp {
478487 new_value,
479488 contract,
480489 upstream,
490+ origin,
481491 ..
482492 } => {
483493 // Get own location and initialize counter
@@ -502,6 +512,7 @@ impl Operation for PutOp {
502512 target : upstream. clone ( ) ,
503513 key : * key,
504514 sender : sender. clone ( ) ,
515+ origin : origin. clone ( ) ,
505516 } ;
506517
507518 tracing:: trace!(
@@ -526,6 +537,7 @@ impl Operation for PutOp {
526537 key : * key,
527538 new_value : new_value. clone ( ) ,
528539 sender : sender. clone ( ) ,
540+ origin : origin. clone ( ) ,
529541 contract : contract. clone ( ) ,
530542 target : peer. clone ( ) ,
531543 } ;
@@ -582,6 +594,7 @@ impl Operation for PutOp {
582594 contract,
583595 state,
584596 subscribe,
597+ origin : state_origin,
585598 } ) => {
586599 tracing:: debug!(
587600 tx = %id,
@@ -657,19 +670,22 @@ impl Operation for PutOp {
657670 }
658671 }
659672
673+ let local_peer = op_manager. ring . connection_manager . own_location ( ) ;
674+
660675 // Forward success message upstream if needed
661- if let Some ( upstream ) = upstream {
676+ if let Some ( upstream_peer ) = upstream. clone ( ) {
662677 tracing:: trace!(
663678 tx = %id,
664679 %key,
665- upstream = %upstream . peer,
680+ upstream = %upstream_peer . peer,
666681 "PutOp::process_message: Forwarding SuccessfulPut upstream"
667682 ) ;
668683 return_msg = Some ( PutMsg :: SuccessfulPut {
669684 id : * id,
670- target : upstream ,
685+ target : upstream_peer ,
671686 key,
672- sender : op_manager. ring . connection_manager . own_location ( ) ,
687+ sender : local_peer. clone ( ) ,
688+ origin : state_origin. clone ( ) ,
673689 } ) ;
674690 } else {
675691 tracing:: trace!(
@@ -679,6 +695,34 @@ impl Operation for PutOp {
679695 ) ;
680696 return_msg = None ;
681697 }
698+
699+ // Send a direct acknowledgement to the original requester if we are not it
700+ if state_origin. peer != local_peer. peer
701+ && !upstream
702+ . as_ref ( )
703+ . map ( |u| u. peer == state_origin. peer )
704+ . unwrap_or ( false )
705+ {
706+ let direct_ack = PutMsg :: SuccessfulPut {
707+ id : * id,
708+ target : state_origin. clone ( ) ,
709+ key,
710+ sender : local_peer,
711+ origin : state_origin. clone ( ) ,
712+ } ;
713+
714+ if let Err ( err) = conn_manager
715+ . send ( & state_origin. peer , NetMessage :: from ( direct_ack) )
716+ . await
717+ {
718+ tracing:: warn!(
719+ tx = %id,
720+ %key,
721+ origin_peer = %state_origin. peer,
722+ "Failed to send direct SuccessfulPut to origin: {err}"
723+ ) ;
724+ }
725+ }
682726 }
683727 Some ( PutState :: Finished { .. } ) => {
684728 // Operation already completed - this is a duplicate SuccessfulPut message
@@ -700,6 +744,7 @@ impl Operation for PutOp {
700744 htl,
701745 sender,
702746 skip_list,
747+ origin,
703748 ..
704749 } => {
705750 // Get contract key and own location
@@ -747,6 +792,7 @@ impl Operation for PutOp {
747792 * id,
748793 new_htl,
749794 new_skip_list. clone ( ) ,
795+ origin. clone ( ) ,
750796 )
751797 . await ;
752798
@@ -815,6 +861,7 @@ impl Operation for PutOp {
815861 last_hop,
816862 op_manager,
817863 self . state ,
864+ origin. clone ( ) ,
818865 ( broadcast_to, sender. clone ( ) ) ,
819866 key,
820867 ( contract. clone ( ) , new_value. clone ( ) ) ,
@@ -868,11 +915,13 @@ fn build_op_result(
868915 } )
869916}
870917
918+ #[ allow( clippy:: too_many_arguments) ]
871919async fn try_to_broadcast (
872920 id : Transaction ,
873921 last_hop : bool ,
874922 op_manager : & OpManager ,
875923 state : Option < PutState > ,
924+ origin : PeerKeyLocation ,
876925 ( broadcast_to, upstream) : ( Vec < PeerKeyLocation > , PeerKeyLocation ) ,
877926 key : ContractKey ,
878927 ( contract, new_value) : ( ContractContainer , WrappedState ) ,
@@ -940,6 +989,7 @@ async fn try_to_broadcast(
940989 contract : contract. clone ( ) , // No longer optional
941990 state : new_value. clone ( ) ,
942991 subscribe,
992+ origin : origin. clone ( ) ,
943993 } ) ;
944994 return_msg = None ;
945995 } else if !broadcast_to. is_empty ( ) {
@@ -954,6 +1004,7 @@ async fn try_to_broadcast(
9541004 contract,
9551005 upstream,
9561006 sender : op_manager. ring . connection_manager . own_location ( ) ,
1007+ origin : origin. clone ( ) ,
9571008 } ) ;
9581009
9591010 let op = PutOp {
@@ -971,6 +1022,7 @@ async fn try_to_broadcast(
9711022 target : upstream,
9721023 key,
9731024 sender : op_manager. ring . connection_manager . own_location ( ) ,
1025+ origin,
9741026 } ) ;
9751027 }
9761028 }
@@ -1030,6 +1082,7 @@ pub(crate) fn start_op_with_id(
10301082}
10311083
10321084#[ derive( Debug ) ]
1085+ #[ allow( clippy:: large_enum_variant) ]
10331086pub enum PutState {
10341087 ReceivedRequest ,
10351088 /// Preparing request for put op.
@@ -1047,6 +1100,7 @@ pub enum PutState {
10471100 contract : ContractContainer ,
10481101 state : WrappedState ,
10491102 subscribe : bool ,
1103+ origin : PeerKeyLocation ,
10501104 } ,
10511105 /// Broadcasting changes to subscribers.
10521106 BroadcastOngoing ,
@@ -1127,6 +1181,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11271181 contract : contract. clone ( ) ,
11281182 state : updated_value. clone ( ) ,
11291183 subscribe,
1184+ origin : own_location. clone ( ) ,
11301185 } ) ;
11311186
11321187 // Create a SuccessfulPut message to trigger the completion handling
@@ -1135,6 +1190,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11351190 target : own_location. clone ( ) ,
11361191 key,
11371192 sender : own_location. clone ( ) ,
1193+ origin : own_location. clone ( ) ,
11381194 } ;
11391195
11401196 // Use notify_op_change to trigger the completion handling
@@ -1153,6 +1209,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11531209 false ,
11541210 op_manager,
11551211 broadcast_state,
1212+ own_location. clone ( ) ,
11561213 ( broadcast_to, sender) ,
11571214 key,
11581215 ( contract. clone ( ) , updated_value) ,
@@ -1217,12 +1274,14 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
12171274 contract : contract. clone ( ) ,
12181275 state : updated_value. clone ( ) ,
12191276 subscribe,
1277+ origin : own_location. clone ( ) ,
12201278 } ) ;
12211279
12221280 // Create RequestPut message and forward to target peer
12231281 let msg = PutMsg :: RequestPut {
12241282 id,
1225- sender : own_location,
1283+ sender : own_location. clone ( ) ,
1284+ origin : own_location,
12261285 contract,
12271286 related_contracts,
12281287 value : updated_value,
@@ -1282,6 +1341,7 @@ async fn put_contract(
12821341/// It returns whether this peer should be storing the contract or not.
12831342///
12841343/// This operation is "fire and forget" and the node does not keep track if is successful or not.
1344+ #[ allow( clippy:: too_many_arguments) ]
12851345async fn forward_put < CB > (
12861346 op_manager : & OpManager ,
12871347 conn_manager : & CB ,
@@ -1290,6 +1350,7 @@ async fn forward_put<CB>(
12901350 id : Transaction ,
12911351 htl : usize ,
12921352 skip_list : HashSet < PeerId > ,
1353+ origin : PeerKeyLocation ,
12931354) -> bool
12941355where
12951356 CB : NetworkBridge ,
@@ -1347,6 +1408,7 @@ where
13471408 id,
13481409 sender : own_pkloc,
13491410 target : peer. clone ( ) ,
1411+ origin,
13501412 contract : contract. clone ( ) ,
13511413 new_value : new_value. clone ( ) ,
13521414 htl,
@@ -1386,6 +1448,7 @@ mod messages {
13861448 RequestPut {
13871449 id : Transaction ,
13881450 sender : PeerKeyLocation ,
1451+ origin : PeerKeyLocation ,
13891452 contract : ContractContainer ,
13901453 #[ serde( deserialize_with = "RelatedContracts::deser_related_contracts" ) ]
13911454 related_contracts : RelatedContracts < ' static > ,
@@ -1401,6 +1464,7 @@ mod messages {
14011464 id : Transaction ,
14021465 sender : PeerKeyLocation ,
14031466 target : PeerKeyLocation ,
1467+ origin : PeerKeyLocation ,
14041468 contract : ContractContainer ,
14051469 new_value : WrappedState ,
14061470 /// current htl, reduced by one at each hop
@@ -1413,12 +1477,14 @@ mod messages {
14131477 target : PeerKeyLocation ,
14141478 key : ContractKey ,
14151479 sender : PeerKeyLocation ,
1480+ origin : PeerKeyLocation ,
14161481 } ,
14171482 /// Target the node which is closest to the key
14181483 SeekNode {
14191484 id : Transaction ,
14201485 sender : PeerKeyLocation ,
14211486 target : PeerKeyLocation ,
1487+ origin : PeerKeyLocation ,
14221488 value : WrappedState ,
14231489 contract : ContractContainer ,
14241490 #[ serde( deserialize_with = "RelatedContracts::deser_related_contracts" ) ]
@@ -1436,11 +1502,13 @@ mod messages {
14361502 contract : ContractContainer ,
14371503 upstream : PeerKeyLocation ,
14381504 sender : PeerKeyLocation ,
1505+ origin : PeerKeyLocation ,
14391506 } ,
14401507 /// Broadcasting a change to a peer, which then will relay the changes to other peers.
14411508 BroadcastTo {
14421509 id : Transaction ,
14431510 sender : PeerKeyLocation ,
1511+ origin : PeerKeyLocation ,
14441512 key : ContractKey ,
14451513 new_value : WrappedState ,
14461514 contract : ContractContainer ,
0 commit comments