Skip to content

Commit 0a1a2f5

Browse files
committed
fix(put): deliver SuccessfulPut directly to origin
1 parent 067b91f commit 0a1a2f5

File tree

2 files changed

+74
-5
lines changed

2 files changed

+74
-5
lines changed

crates/core/src/operations/put.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ impl Operation for PutOp {
164164
PutMsg::RequestPut {
165165
id,
166166
sender,
167+
origin,
167168
contract,
168169
related_contracts,
169170
value,
@@ -276,6 +277,7 @@ impl Operation for PutOp {
276277
return_msg = Some(PutMsg::SeekNode {
277278
id: *id,
278279
sender: own_location.clone(),
280+
origin: origin.clone(),
279281
target: forward_target,
280282
value: modified_value.clone(),
281283
contract: contract.clone(),
@@ -290,6 +292,7 @@ impl Operation for PutOp {
290292
contract: contract.clone(),
291293
state: modified_value,
292294
subscribe,
295+
origin: origin.clone(),
293296
});
294297
} else {
295298
// No other peers to forward to - we're the final destination
@@ -305,6 +308,7 @@ impl Operation for PutOp {
305308
target: sender.clone(),
306309
key,
307310
sender: own_location.clone(),
311+
origin: origin.clone(),
308312
});
309313

310314
// Mark operation as finished
@@ -319,6 +323,7 @@ impl Operation for PutOp {
319323
htl,
320324
target,
321325
sender,
326+
origin,
322327
} => {
323328
// Get the contract key and check if we should handle it
324329
let key = contract.key();
@@ -345,6 +350,7 @@ impl Operation for PutOp {
345350
*id,
346351
new_htl,
347352
HashSet::from([sender.peer.clone()]),
353+
origin.clone(),
348354
)
349355
.await
350356
} else {
@@ -406,6 +412,7 @@ impl Operation for PutOp {
406412
last_hop,
407413
op_manager,
408414
self.state,
415+
origin.clone(),
409416
(broadcast_to, sender.clone()),
410417
key,
411418
(contract.clone(), value.clone()),
@@ -425,6 +432,7 @@ impl Operation for PutOp {
425432
new_value,
426433
contract,
427434
sender,
435+
origin,
428436
..
429437
} => {
430438
// Get own location
@@ -457,6 +465,7 @@ impl Operation for PutOp {
457465
false,
458466
op_manager,
459467
self.state,
468+
origin.clone(),
460469
(broadcast_to, sender.clone()),
461470
*key,
462471
(contract.clone(), updated_value),
@@ -478,6 +487,7 @@ impl Operation for PutOp {
478487
new_value,
479488
contract,
480489
upstream,
490+
origin,
481491
..
482492
} => {
483493
// Get own location and initialize counter
@@ -502,6 +512,7 @@ impl Operation for PutOp {
502512
target: upstream.clone(),
503513
key: *key,
504514
sender: sender.clone(),
515+
origin: origin.clone(),
505516
};
506517

507518
tracing::trace!(
@@ -526,6 +537,7 @@ impl Operation for PutOp {
526537
key: *key,
527538
new_value: new_value.clone(),
528539
sender: sender.clone(),
540+
origin: origin.clone(),
529541
contract: contract.clone(),
530542
target: peer.clone(),
531543
};
@@ -582,6 +594,7 @@ impl Operation for PutOp {
582594
contract,
583595
state,
584596
subscribe,
597+
origin: state_origin,
585598
}) => {
586599
tracing::debug!(
587600
tx = %id,
@@ -657,19 +670,22 @@ impl Operation for PutOp {
657670
}
658671
}
659672

673+
let local_peer = op_manager.ring.connection_manager.own_location();
674+
660675
// Forward success message upstream if needed
661-
if let Some(upstream) = upstream {
676+
if let Some(upstream_peer) = upstream.clone() {
662677
tracing::trace!(
663678
tx = %id,
664679
%key,
665-
upstream = %upstream.peer,
680+
upstream = %upstream_peer.peer,
666681
"PutOp::process_message: Forwarding SuccessfulPut upstream"
667682
);
668683
return_msg = Some(PutMsg::SuccessfulPut {
669684
id: *id,
670-
target: upstream,
685+
target: upstream_peer,
671686
key,
672-
sender: op_manager.ring.connection_manager.own_location(),
687+
sender: local_peer.clone(),
688+
origin: state_origin.clone(),
673689
});
674690
} else {
675691
tracing::trace!(
@@ -679,6 +695,34 @@ impl Operation for PutOp {
679695
);
680696
return_msg = None;
681697
}
698+
699+
// Send a direct acknowledgement to the original requester if we are not it
700+
if state_origin.peer != local_peer.peer
701+
&& !upstream
702+
.as_ref()
703+
.map(|u| u.peer == state_origin.peer)
704+
.unwrap_or(false)
705+
{
706+
let direct_ack = PutMsg::SuccessfulPut {
707+
id: *id,
708+
target: state_origin.clone(),
709+
key,
710+
sender: local_peer,
711+
origin: state_origin.clone(),
712+
};
713+
714+
if let Err(err) = conn_manager
715+
.send(&state_origin.peer, NetMessage::from(direct_ack))
716+
.await
717+
{
718+
tracing::warn!(
719+
tx = %id,
720+
%key,
721+
origin_peer = %state_origin.peer,
722+
"Failed to send direct SuccessfulPut to origin: {err}"
723+
);
724+
}
725+
}
682726
}
683727
Some(PutState::Finished { .. }) => {
684728
// Operation already completed - this is a duplicate SuccessfulPut message
@@ -700,6 +744,7 @@ impl Operation for PutOp {
700744
htl,
701745
sender,
702746
skip_list,
747+
origin,
703748
..
704749
} => {
705750
// Get contract key and own location
@@ -747,6 +792,7 @@ impl Operation for PutOp {
747792
*id,
748793
new_htl,
749794
new_skip_list.clone(),
795+
origin.clone(),
750796
)
751797
.await;
752798

@@ -815,6 +861,7 @@ impl Operation for PutOp {
815861
last_hop,
816862
op_manager,
817863
self.state,
864+
origin.clone(),
818865
(broadcast_to, sender.clone()),
819866
key,
820867
(contract.clone(), new_value.clone()),
@@ -868,11 +915,13 @@ fn build_op_result(
868915
})
869916
}
870917

918+
#[allow(clippy::too_many_arguments)]
871919
async fn try_to_broadcast(
872920
id: Transaction,
873921
last_hop: bool,
874922
op_manager: &OpManager,
875923
state: Option<PutState>,
924+
origin: PeerKeyLocation,
876925
(broadcast_to, upstream): (Vec<PeerKeyLocation>, PeerKeyLocation),
877926
key: ContractKey,
878927
(contract, new_value): (ContractContainer, WrappedState),
@@ -940,6 +989,7 @@ async fn try_to_broadcast(
940989
contract: contract.clone(), // No longer optional
941990
state: new_value.clone(),
942991
subscribe,
992+
origin: origin.clone(),
943993
});
944994
return_msg = None;
945995
} else if !broadcast_to.is_empty() {
@@ -954,6 +1004,7 @@ async fn try_to_broadcast(
9541004
contract,
9551005
upstream,
9561006
sender: op_manager.ring.connection_manager.own_location(),
1007+
origin: origin.clone(),
9571008
});
9581009

9591010
let op = PutOp {
@@ -971,6 +1022,7 @@ async fn try_to_broadcast(
9711022
target: upstream,
9721023
key,
9731024
sender: op_manager.ring.connection_manager.own_location(),
1025+
origin,
9741026
});
9751027
}
9761028
}
@@ -1030,6 +1082,7 @@ pub(crate) fn start_op_with_id(
10301082
}
10311083

10321084
#[derive(Debug)]
1085+
#[allow(clippy::large_enum_variant)]
10331086
pub enum PutState {
10341087
ReceivedRequest,
10351088
/// Preparing request for put op.
@@ -1047,6 +1100,7 @@ pub enum PutState {
10471100
contract: ContractContainer,
10481101
state: WrappedState,
10491102
subscribe: bool,
1103+
origin: PeerKeyLocation,
10501104
},
10511105
/// Broadcasting changes to subscribers.
10521106
BroadcastOngoing,
@@ -1127,6 +1181,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11271181
contract: contract.clone(),
11281182
state: updated_value.clone(),
11291183
subscribe,
1184+
origin: own_location.clone(),
11301185
});
11311186

11321187
// Create a SuccessfulPut message to trigger the completion handling
@@ -1135,6 +1190,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11351190
target: own_location.clone(),
11361191
key,
11371192
sender: own_location.clone(),
1193+
origin: own_location.clone(),
11381194
};
11391195

11401196
// Use notify_op_change to trigger the completion handling
@@ -1153,6 +1209,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11531209
false,
11541210
op_manager,
11551211
broadcast_state,
1212+
own_location.clone(),
11561213
(broadcast_to, sender),
11571214
key,
11581215
(contract.clone(), updated_value),
@@ -1217,12 +1274,14 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
12171274
contract: contract.clone(),
12181275
state: updated_value.clone(),
12191276
subscribe,
1277+
origin: own_location.clone(),
12201278
});
12211279

12221280
// Create RequestPut message and forward to target peer
12231281
let msg = PutMsg::RequestPut {
12241282
id,
1225-
sender: own_location,
1283+
sender: own_location.clone(),
1284+
origin: own_location,
12261285
contract,
12271286
related_contracts,
12281287
value: updated_value,
@@ -1282,6 +1341,7 @@ async fn put_contract(
12821341
/// It returns whether this peer should be storing the contract or not.
12831342
///
12841343
/// This operation is "fire and forget" and the node does not keep track if is successful or not.
1344+
#[allow(clippy::too_many_arguments)]
12851345
async fn forward_put<CB>(
12861346
op_manager: &OpManager,
12871347
conn_manager: &CB,
@@ -1290,6 +1350,7 @@ async fn forward_put<CB>(
12901350
id: Transaction,
12911351
htl: usize,
12921352
skip_list: HashSet<PeerId>,
1353+
origin: PeerKeyLocation,
12931354
) -> bool
12941355
where
12951356
CB: NetworkBridge,
@@ -1347,6 +1408,7 @@ where
13471408
id,
13481409
sender: own_pkloc,
13491410
target: peer.clone(),
1411+
origin,
13501412
contract: contract.clone(),
13511413
new_value: new_value.clone(),
13521414
htl,
@@ -1386,6 +1448,7 @@ mod messages {
13861448
RequestPut {
13871449
id: Transaction,
13881450
sender: PeerKeyLocation,
1451+
origin: PeerKeyLocation,
13891452
contract: ContractContainer,
13901453
#[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
13911454
related_contracts: RelatedContracts<'static>,
@@ -1401,6 +1464,7 @@ mod messages {
14011464
id: Transaction,
14021465
sender: PeerKeyLocation,
14031466
target: PeerKeyLocation,
1467+
origin: PeerKeyLocation,
14041468
contract: ContractContainer,
14051469
new_value: WrappedState,
14061470
/// current htl, reduced by one at each hop
@@ -1413,12 +1477,14 @@ mod messages {
14131477
target: PeerKeyLocation,
14141478
key: ContractKey,
14151479
sender: PeerKeyLocation,
1480+
origin: PeerKeyLocation,
14161481
},
14171482
/// Target the node which is closest to the key
14181483
SeekNode {
14191484
id: Transaction,
14201485
sender: PeerKeyLocation,
14211486
target: PeerKeyLocation,
1487+
origin: PeerKeyLocation,
14221488
value: WrappedState,
14231489
contract: ContractContainer,
14241490
#[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
@@ -1436,11 +1502,13 @@ mod messages {
14361502
contract: ContractContainer,
14371503
upstream: PeerKeyLocation,
14381504
sender: PeerKeyLocation,
1505+
origin: PeerKeyLocation,
14391506
},
14401507
/// Broadcasting a change to a peer, which then will relay the changes to other peers.
14411508
BroadcastTo {
14421509
id: Transaction,
14431510
sender: PeerKeyLocation,
1511+
origin: PeerKeyLocation,
14441512
key: ContractKey,
14451513
new_value: WrappedState,
14461514
contract: ContractContainer,

crates/core/src/tracing/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ impl<'a> NetEventLog<'a> {
241241
target,
242242
key,
243243
sender,
244+
..
244245
}) => EventKind::Put(PutEvent::PutSuccess {
245246
id: *id,
246247
requester: sender.clone(),

0 commit comments

Comments
 (0)