Skip to content
Merged
164 changes: 52 additions & 112 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,13 @@ enum UpdateFulfillFetch {
}

/// The return type of get_update_fulfill_htlc_and_commit.
pub enum UpdateFulfillCommitFetch<'a> {
pub enum UpdateFulfillCommitFetch {
/// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
/// it in the holding cell, or re-generated the update_fulfill message after the same claim was
/// previously placed in the holding cell (and has since been removed).
NewClaim {
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
monitor_update: &'a ChannelMonitorUpdate,
monitor_update: ChannelMonitorUpdate,
/// The value of the HTLC which was claimed, in msat.
htlc_value_msat: u64,
},
Expand Down Expand Up @@ -588,17 +588,10 @@ pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;

struct PendingChannelMonitorUpdate {
update: ChannelMonitorUpdate,
/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
/// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
/// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
///
/// [`ChannelManager`]: super::channelmanager::ChannelManager
blocked: bool,
}

impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
(0, update, required),
(2, blocked, required),
});

/// Contains everything about the channel including state, and various flags.
Expand Down Expand Up @@ -869,11 +862,9 @@ pub(super) struct ChannelContext<Signer: ChannelSigner> {
/// [`SignerProvider::derive_channel_signer`].
channel_keys_id: [u8; 32],

/// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately.
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
/// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we
/// store it here and only release it to the `ChannelManager` once it asks for it.
blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,
}

impl<Signer: ChannelSigner> ChannelContext<Signer> {
Expand Down Expand Up @@ -2264,51 +2255,38 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}

pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
let release_cs_monitor = self.context.blocked_monitor_updates.is_empty();
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
// Even if we aren't supposed to let new monitor updates with commitment state
// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
// matter what. Sadly, to push a new monitor update which flies before others
// already queued, we have to insert it into the pending queue and update the
// update_ids of all the following monitors.
let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
if release_cs_monitor && msg.is_some() {
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them
// to be strictly increasing by one, so decrement it here.
self.context.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
update: monitor_update, blocked: false,
});
self.context.pending_monitor_updates.len() - 1
} else {
let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
.unwrap_or(self.context.pending_monitor_updates.len());
let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
let new_mon_id = self.context.blocked_monitor_updates.get(0)
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
monitor_update.update_id = new_mon_id;
self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
update: monitor_update, blocked: false,
});
for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
for held_update in self.context.blocked_monitor_updates.iter_mut() {
held_update.update.update_id += 1;
}
if msg.is_some() {
debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
let update = self.build_commitment_no_status_check(logger);
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
update, blocked: true,
self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
update,
});
}
insert_pos
};
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
UpdateFulfillCommitFetch::NewClaim {
monitor_update: &self.context.pending_monitor_updates.get(unblocked_update_pos)
.expect("We just pushed the monitor update").update,
htlc_value_msat,
}

self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
},
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
}
Expand Down Expand Up @@ -2798,7 +2776,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Ok(())
}

pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
where L::Target: Logger
{
if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
Expand Down Expand Up @@ -3022,7 +3000,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// Public version of the below, checking relevant preconditions first.
/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
/// returns `(None, Vec::new())`.
pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
(self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
self.free_holding_cell_htlcs(logger)
Expand All @@ -3031,7 +3009,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {

/// Frees any pending commitment updates in the holding cell, generating the relevant messages
/// for our counterparty.
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() {
log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(),
Expand Down Expand Up @@ -3147,7 +3125,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<ChannelMonitorUpdate>), ChannelError>
where L::Target: Logger,
{
if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
Expand Down Expand Up @@ -3349,8 +3327,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}

match self.free_holding_cell_htlcs(logger) {
(Some(_), htlcs_to_fail) => {
let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
(Some(mut additional_update), htlcs_to_fail) => {
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
self.context.latest_monitor_update_id = monitor_update.update_id;
Expand Down Expand Up @@ -3566,12 +3543,6 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
{
assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
let mut found_blocked = false;
self.context.pending_monitor_updates.retain(|upd| {
if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
if upd.blocked { found_blocked = true; }
upd.blocked
});

// If we're past (or at) the FundingSent stage on an outbound channel, try to
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
Expand Down Expand Up @@ -4075,7 +4046,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {

pub fn shutdown<SP: Deref>(
&mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
where SP::Target: SignerProvider
{
if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
Expand Down Expand Up @@ -4141,9 +4112,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
if self.push_blockable_mon_update(monitor_update) {
self.context.pending_monitor_updates.last().map(|upd| &upd.update)
} else { None }
self.push_ret_blockable_mon_update(monitor_update)
} else { None };
let shutdown = if send_shutdown {
Some(msgs::Shutdown {
Expand Down Expand Up @@ -4433,64 +4402,37 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
}

pub fn get_latest_complete_monitor_update_id(&self) -> u64 {
if self.context.pending_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
self.context.pending_monitor_updates[0].update.update_id - 1
/// Gets the latest [`ChannelMonitorUpdate`] ID which has been released and is in-flight.
pub fn get_latest_unblocked_monitor_update_id(&self) -> u64 {
if self.context.blocked_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
self.context.blocked_monitor_updates[0].update.update_id - 1
}

/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
/// further blocked monitor update exists after the next.
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
for i in 0..self.context.pending_monitor_updates.len() {
if self.context.pending_monitor_updates[i].blocked {
self.context.pending_monitor_updates[i].blocked = false;
return Some((&self.context.pending_monitor_updates[i].update,
self.context.pending_monitor_updates.len() > i + 1));
}
}
None
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
if self.context.blocked_monitor_updates.is_empty() { return None; }
Some((self.context.blocked_monitor_updates.remove(0).update,
!self.context.blocked_monitor_updates.is_empty()))
}

/// Pushes a new monitor update into our monitor update queue, returning whether it should be
/// immediately given to the user for persisting or if it should be held as blocked.
fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
update, blocked: !release_monitor
});
release_monitor
}

/// Pushes a new monitor update into our monitor update queue, returning a reference to it if
/// it should be immediately given to the user for persisting or `None` if it should be held as
/// blocked.
/// Pushes a new monitor update into our monitor update queue, returning it if it should be
/// immediately given to the user for persisting or `None` if it should be held as blocked.
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
-> Option<&ChannelMonitorUpdate> {
let release_monitor = self.push_blockable_mon_update(update);
if release_monitor { self.context.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
}

pub fn no_monitor_updates_pending(&self) -> bool {
self.context.pending_monitor_updates.is_empty()
}

pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
self.context.pending_monitor_updates.retain(|upd| {
if upd.update.update_id <= update_id {
assert!(!upd.blocked, "Completed update must have flown");
false
} else { true }
});
}

pub fn complete_one_mon_update(&mut self, update_id: u64) {
self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
-> Option<ChannelMonitorUpdate> {
let release_monitor = self.context.blocked_monitor_updates.is_empty();
if !release_monitor {
self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
update,
});
None
} else {
Some(update)
}
}

/// Returns an iterator over all unblocked monitor updates which have not yet completed.
pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
self.context.pending_monitor_updates.iter()
.filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
pub fn blocked_monitor_updates_pending(&self) -> usize {
self.context.blocked_monitor_updates.len()
}

/// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.
Expand Down Expand Up @@ -5302,7 +5244,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
pub fn send_htlc_and_commit<L: Deref>(
&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>, logger: &L
) -> Result<Option<&ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
) -> Result<Option<ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source,
onion_routing_packet, false, skimmed_fee_msat, logger);
if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
Expand Down Expand Up @@ -5336,7 +5278,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// [`ChannelMonitorUpdate`] will be returned).
pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>)
-> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
where SP::Target: SignerProvider {
for htlc in self.context.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
Expand Down Expand Up @@ -5407,9 +5349,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
if self.push_blockable_mon_update(monitor_update) {
self.context.pending_monitor_updates.last().map(|upd| &upd.update)
} else { None }
self.push_ret_blockable_mon_update(monitor_update)
} else { None };
let shutdown = msgs::Shutdown {
channel_id: self.context.channel_id,
Expand Down Expand Up @@ -5648,7 +5588,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
channel_type,
channel_keys_id,

pending_monitor_updates: Vec::new(),
blocked_monitor_updates: Vec::new(),
}
})
}
Expand Down Expand Up @@ -6278,7 +6218,7 @@ impl<Signer: WriteableEcdsaChannelSigner> InboundV1Channel<Signer> {
channel_type,
channel_keys_id,

pending_monitor_updates: Vec::new(),
blocked_monitor_updates: Vec::new(),
}
};

Expand Down Expand Up @@ -6864,7 +6804,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
(28, holder_max_accepted_htlcs, option),
(29, self.context.temporary_channel_id, option),
(31, channel_pending_event_emitted, option),
(33, self.context.pending_monitor_updates, vec_type),
(33, self.context.blocked_monitor_updates, vec_type),
(35, pending_outbound_skimmed_fees, optional_vec),
(37, holding_cell_skimmed_fees, optional_vec),
});
Expand Down Expand Up @@ -7145,7 +7085,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
let mut temporary_channel_id: Option<[u8; 32]> = None;
let mut holder_max_accepted_htlcs: Option<u16> = None;

let mut pending_monitor_updates = Some(Vec::new());
let mut blocked_monitor_updates = Some(Vec::new());

let mut pending_outbound_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
let mut holding_cell_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
Expand All @@ -7172,7 +7112,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
(28, holder_max_accepted_htlcs, option),
(29, temporary_channel_id, option),
(31, channel_pending_event_emitted, option),
(33, pending_monitor_updates, vec_type),
(33, blocked_monitor_updates, vec_type),
(35, pending_outbound_skimmed_fees_opt, optional_vec),
(37, holding_cell_skimmed_fees_opt, optional_vec),
});
Expand Down Expand Up @@ -7365,7 +7305,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
channel_type: channel_type.unwrap(),
channel_keys_id,

pending_monitor_updates: pending_monitor_updates.unwrap(),
blocked_monitor_updates: blocked_monitor_updates.unwrap(),
}
})
}
Expand Down
Loading