Skip to content

Commit 4222522

Browse files
committed
Add WaitingOn* spans to inbound HTLCs and forwards
1 parent b9afbfc commit 4222522

File tree

4 files changed

+187
-21
lines changed

4 files changed

+187
-21
lines changed

lightning/src/ln/channel.rs

Lines changed: 129 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -336,14 +336,70 @@ impl InboundHTLCOutput {
336336
where
337337
L::Target: Logger,
338338
{
339+
mem::drop(self.state_wrapper.waiting_on_peer_span.take());
340+
mem::drop(self.state_wrapper.waiting_on_monitor_persist_span.take());
339341
mem::drop(self.state_wrapper.span.take());
340342
self.state_wrapper =
341343
InboundHTLCStateWrapper::new(state, self.span.as_user_span_ref::<L>(), logger);
342344
}
345+
346+
fn waiting_on_peer<L: Deref>(&mut self, logger: &L)
347+
where
348+
L::Target: Logger,
349+
{
350+
self.state_wrapper.waiting_on_peer_span = Some(BoxedSpan::new(logger.start(
351+
Span::WaitingOnPeer,
352+
self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::<L>()).flatten(),
353+
)));
354+
}
355+
356+
fn waiting_on_monitor_persist<L: Deref>(&mut self, logger: &L)
357+
where
358+
L::Target: Logger,
359+
{
360+
self.state_wrapper.waiting_on_monitor_persist_span = Some(BoxedSpan::new(logger.start(
361+
Span::WaitingOnMonitorPersist,
362+
self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::<L>()).flatten(),
363+
)));
364+
}
365+
366+
fn monitor_persisted(&mut self) {
367+
if self.state_wrapper.waiting_on_monitor_persist_span.is_some() {
368+
mem::drop(self.state_wrapper.waiting_on_monitor_persist_span.take());
369+
}
370+
}
371+
372+
fn is_waiting_on_monitor_persist(&self) -> bool {
373+
self.state_wrapper.waiting_on_monitor_persist_span.is_some()
374+
}
375+
376+
fn waiting_on_async_signing<L: Deref>(&mut self, logger: &L)
377+
where
378+
L::Target: Logger,
379+
{
380+
self.state_wrapper.waiting_on_async_signing_span = Some(BoxedSpan::new(logger.start(
381+
Span::WaitingOnAsyncSigning,
382+
self.state_wrapper.span.as_ref().map(|s| s.as_user_span_ref::<L>()).flatten(),
383+
)));
384+
}
385+
386+
fn async_signing_completed(&mut self) {
387+
if self.state_wrapper.waiting_on_async_signing_span.is_some() {
388+
mem::drop(self.state_wrapper.waiting_on_async_signing_span.take());
389+
}
390+
}
391+
392+
fn is_waiting_on_async_signing(&self) -> bool {
393+
self.state_wrapper.waiting_on_async_signing_span.is_some()
394+
}
343395
}
344396

345397
struct InboundHTLCStateWrapper {
346398
state: InboundHTLCState,
399+
waiting_on_peer_span: Option<BoxedSpan>,
400+
waiting_on_monitor_persist_span: Option<BoxedSpan>,
401+
waiting_on_async_signing_span: Option<BoxedSpan>,
402+
// Drop full span last.
347403
span: Option<BoxedSpan>,
348404
}
349405

@@ -357,7 +413,13 @@ impl InboundHTLCStateWrapper {
357413
{
358414
let state_span =
359415
logger.start(Span::InboundHTLCState { state: (&state).into() }, parent_span);
360-
InboundHTLCStateWrapper { state, span: Some(BoxedSpan::new(state_span)) }
416+
InboundHTLCStateWrapper {
417+
state,
418+
span: Some(BoxedSpan::new(state_span)),
419+
waiting_on_peer_span: None,
420+
waiting_on_monitor_persist_span: None,
421+
waiting_on_async_signing_span: None,
422+
}
361423
}
362424
}
363425

@@ -1318,7 +1380,7 @@ pub(super) struct MonitorRestoreUpdates {
13181380
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
13191381
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
13201382
pub finalized_claimed_htlcs: Vec<(HTLCSource, Option<AttributionData>)>,
1321-
pub pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan)>,
1383+
pub pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>,
13221384
pub funding_broadcastable: Option<Transaction>,
13231385
pub channel_ready: Option<msgs::ChannelReady>,
13241386
pub announcement_sigs: Option<msgs::AnnouncementSignatures>,
@@ -2448,7 +2510,7 @@ where
24482510
monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
24492511
monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
24502512
monitor_pending_finalized_fulfills: Vec<(HTLCSource, Option<AttributionData>)>,
2451-
monitor_pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan)>,
2513+
monitor_pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>,
24522514
monitor_pending_tx_signatures: Option<msgs::TxSignatures>,
24532515

24542516
/// If we went to send a revoke_and_ack but our signer was unable to give us a signature,
@@ -6474,6 +6536,7 @@ where
64746536
)),
64756537
logger,
64766538
);
6539+
htlc.waiting_on_monitor_persist(logger);
64776540
}
64786541

64796542
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, update_blocked: false }
@@ -6785,7 +6848,7 @@ where
67856848

67866849
// Now update local state:
67876850
self.context.next_counterparty_htlc_id += 1;
6788-
self.context.pending_inbound_htlcs.push(InboundHTLCOutput::new(
6851+
let mut output = InboundHTLCOutput::new(
67896852
self.context.channel_id(),
67906853
InboundHTLCOutputParams {
67916854
htlc_id: msg.htlc_id,
@@ -6797,7 +6860,9 @@ where
67976860
}),
67986861
},
67996862
logger,
6800-
));
6863+
);
6864+
output.waiting_on_peer(logger);
6865+
self.context.pending_inbound_htlcs.push(output);
68016866

68026867
Ok(())
68036868
}
@@ -7213,6 +7278,9 @@ where
72137278
InboundHTLCState::AwaitingRemoteRevokeToAnnounce(htlc_resolution.clone()),
72147279
logger,
72157280
);
7281+
if self.context.channel_state.is_awaiting_remote_revoke() {
7282+
htlc.waiting_on_peer(logger);
7283+
}
72167284
need_commitment = true;
72177285
}
72187286
}
@@ -7650,7 +7718,7 @@ where
76507718
&self.context.channel_id()
76517719
);
76527720
let mut to_forward_infos = Vec::new();
7653-
let mut pending_update_adds = Vec::<(msgs::UpdateAddHTLC, BoxedSpan)>::new();
7721+
let mut pending_update_adds = Vec::<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>::new();
76547722
let mut revoked_htlcs = Vec::new();
76557723
let mut finalized_claimed_htlcs = Vec::new();
76567724
let mut update_fail_htlcs = Vec::new();
@@ -7727,6 +7795,7 @@ where
77277795
InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution),
77287796
logger,
77297797
);
7798+
htlc.waiting_on_monitor_persist(logger);
77307799
require_commitment = true;
77317800
} else if let InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution) =
77327801
state
@@ -7762,6 +7831,7 @@ where
77627831
update_fail_malformed_htlcs.push(msg)
77637832
},
77647833
}
7834+
htlc.waiting_on_monitor_persist(logger);
77657835
},
77667836
PendingHTLCStatus::Forward(forward_info) => {
77677837
log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed, attempting to forward", &htlc.payment_hash);
@@ -7773,10 +7843,18 @@ where
77737843
InboundHTLCResolution::Pending { update_add_htlc } => {
77747844
log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed", &htlc.payment_hash);
77757845
htlc.set_state(InboundHTLCState::Committed, logger);
7776-
let forward_span =
7777-
logger.start(Span::Forward, htlc.span.as_user_span_ref::<L>());
7778-
pending_update_adds
7779-
.push((update_add_htlc, BoxedSpan::new(forward_span)));
7846+
let forward_span = BoxedSpan::new(
7847+
logger.start(Span::Forward, htlc.span.as_user_span_ref::<L>()),
7848+
);
7849+
let waiting_on_persist_span = BoxedSpan::new(logger.start(
7850+
Span::WaitingOnMonitorPersist,
7851+
forward_span.as_user_span_ref::<L>(),
7852+
));
7853+
pending_update_adds.push((
7854+
update_add_htlc,
7855+
waiting_on_persist_span,
7856+
forward_span,
7857+
));
77807858
},
77817859
}
77827860
}
@@ -8267,7 +8345,11 @@ where
82678345
let mut finalized_claimed_htlcs = Vec::new();
82688346
mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills);
82698347
let mut pending_update_adds = Vec::new();
8270-
mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds);
8348+
for (msg, waiting_on_persist_span, forward_span) in self.context.monitor_pending_update_adds.drain(..) {
8349+
mem::drop(waiting_on_persist_span);
8350+
let waiting_on_forward_span = BoxedSpan::new(logger.start(Span::WaitingOnForward, forward_span.as_user_span_ref::<L>()));
8351+
pending_update_adds.push((msg, waiting_on_forward_span, forward_span));
8352+
}
82718353
// For channels established with V2 establishment we won't send a `tx_signatures` when we're in
82728354
// MonitorUpdateInProgress (and we assume the user will never directly broadcast the funding
82738355
// transaction and waits for us to do it).
@@ -8307,6 +8389,23 @@ where
83078389
commitment_update = None;
83088390
}
83098391

8392+
for htlc in self.context.pending_inbound_htlcs.iter_mut() {
8393+
match htlc.state() {
8394+
InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) |
8395+
InboundHTLCState::LocalRemoved(_) => {
8396+
if htlc.is_waiting_on_monitor_persist() {
8397+
htlc.monitor_persisted();
8398+
if commitment_update.is_some() {
8399+
htlc.waiting_on_peer(logger);
8400+
} else if self.context.signer_pending_commitment_update {
8401+
htlc.waiting_on_async_signing(logger);
8402+
}
8403+
}
8404+
},
8405+
_ => {},
8406+
}
8407+
}
8408+
83108409
self.context.monitor_pending_revoke_and_ack = false;
83118410
self.context.monitor_pending_commitment_signed = false;
83128411
let order = self.context.resend_order.clone();
@@ -8450,6 +8549,19 @@ where
84508549
} else { (None, None, None) }
84518550
} else { (None, None, None) };
84528551

8552+
for htlc in self.context.pending_inbound_htlcs.iter_mut() {
8553+
match htlc.state() {
8554+
InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) |
8555+
InboundHTLCState::LocalRemoved(_) => {
8556+
if htlc.is_waiting_on_async_signing() && commitment_update.is_some() {
8557+
htlc.async_signing_completed();
8558+
htlc.waiting_on_peer(logger);
8559+
}
8560+
}
8561+
_ => {},
8562+
}
8563+
}
8564+
84538565
log_trace!(logger, "Signer unblocked with {} commitment_update, {} revoke_and_ack, with resend order {:?}, {} funding_signed, {} channel_ready,
84548566
{} closing_signed, {} signed_closing_tx, and {} shutdown result",
84558567
if commitment_update.is_some() { "a" } else { "no" },
@@ -10959,6 +11071,7 @@ where
1095911071
if let Some(state) = new_state {
1096011072
log_trace!(logger, " ...promoting inbound AwaitingRemoteRevokeToAnnounce {} to AwaitingAnnouncedRemoteRevoke", &htlc.payment_hash);
1096111073
htlc.set_state(state, logger);
11074+
htlc.waiting_on_monitor_persist(logger);
1096211075
}
1096311076
}
1096411077
for htlc in self.context.pending_outbound_htlcs.iter_mut() {
@@ -13704,8 +13817,11 @@ where
1370413817
.unwrap_or_default()
1370513818
.into_iter()
1370613819
.map(|msg| {
13707-
let span = BoxedSpan::new(logger.start(Span::Forward, None));
13708-
(msg, span)
13820+
let forward_span = BoxedSpan::new(logger.start(Span::Forward, None));
13821+
let waiting_span = BoxedSpan::new(
13822+
logger.start(Span::WaitingOnForward, forward_span.as_user_span_ref::<L>()),
13823+
);
13824+
(msg, waiting_span, forward_span)
1370913825
})
1371013826
.collect::<Vec<_>>();
1371113827

lightning/src/ln/channelmanager.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2583,7 +2583,7 @@ pub struct ChannelManager<
25832583
/// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`!
25842584
///
25852585
/// See `ChannelManager` struct-level documentation for lock order requirements.
2586-
decode_update_add_htlcs: Mutex<HashMap<u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan)>>>,
2586+
decode_update_add_htlcs: Mutex<HashMap<u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>>>,
25872587

25882588
/// The sets of payments which are claimable or currently being claimed. See
25892589
/// [`ClaimablePayments`]' individual field docs for more info.
@@ -6245,7 +6245,7 @@ where
62456245

62466246
let mut htlc_forwards = Vec::new();
62476247
let mut htlc_fails = Vec::new();
6248-
for (update_add_htlc, forward_span) in update_add_htlcs {
6248+
for (update_add_htlc, _waiting_on_forward_span, forward_span) in update_add_htlcs {
62496249
let (next_hop, next_packet_details_opt) =
62506250
match decode_incoming_update_add_htlc_onion(
62516251
&update_add_htlc,
@@ -8787,11 +8787,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
87878787
fn handle_channel_resumption(&self, pending_msg_events: &mut Vec<MessageSendEvent>,
87888788
channel: &mut FundedChannel<SP>, raa: Option<msgs::RevokeAndACK>,
87898789
commitment_update: Option<msgs::CommitmentUpdate>, order: RAACommitmentOrder,
8790-
pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan)>,
8790+
pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>,
87918791
funding_broadcastable: Option<Transaction>,
87928792
channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>,
87938793
tx_signatures: Option<msgs::TxSignatures>, tx_abort: Option<msgs::TxAbort>,
8794-
) -> (Option<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan)>)>) {
8794+
) -> (Option<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>)>) {
87958795
let logger = WithChannelContext::from(&self.logger, &channel.context, None);
87968796
log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement, {} tx_signatures, {} tx_abort",
87978797
&channel.context.channel_id(),
@@ -10360,7 +10360,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1036010360
}
1036110361

1036210362
fn push_decode_update_add_htlcs(
10363-
&self, mut update_add_htlcs: (u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan)>),
10363+
&self, mut update_add_htlcs: (u64, Vec<(msgs::UpdateAddHTLC, BoxedSpan, BoxedSpan)>),
1036410364
) {
1036510365
let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
1036610366
let scid = update_add_htlcs.0;
@@ -15143,7 +15143,7 @@ where
1514315143
if !decode_update_add_htlcs.is_empty() {
1514415144
let mut without_spans = new_hash_map();
1514515145
for (scid, htlcs) in decode_update_add_htlcs.iter() {
15146-
without_spans.insert(scid, htlcs.iter().map(|(msg, _span)| msg).collect::<Vec<_>>());
15146+
without_spans.insert(scid, htlcs.iter().map(|(msg, _span1, _span2)| msg).collect::<Vec<_>>());
1514715147
}
1514815148
decode_update_add_htlcs_opt = Some(without_spans);
1514915149
}
@@ -16835,8 +16835,13 @@ where
1683516835
htlcs
1683616836
.into_iter()
1683716837
.map(|htlc| {
16838-
let span = BoxedSpan::new(args.logger.start(Span::Forward, None));
16839-
(htlc, span)
16838+
let forward_span =
16839+
BoxedSpan::new(args.logger.start(Span::Forward, None));
16840+
let waiting_on_forward_span = BoxedSpan::new(args.logger.start(
16841+
Span::WaitingOnForward,
16842+
forward_span.as_user_span_ref::<L>(),
16843+
));
16844+
(htlc, waiting_on_forward_span, forward_span)
1684016845
})
1684116846
.collect::<Vec<_>>(),
1684216847
)

0 commit comments

Comments
 (0)