From: Matt Corallo
Date: Wed, 22 Feb 2023 02:40:59 +0000 (+0000)
Subject: Track claimed outbound HTLCs in ChannelMonitors
X-Git-Tag: v0.0.114~8^2~1
X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=0ad1f4c943bdc9037d0c43d1b74c745befa065f0;p=rust-lightning

Track claimed outbound HTLCs in ChannelMonitors

When we receive an update_fulfill_htlc message, we immediately try to "claim" the HTLC against the HTLCSource. If there is one, this works great: we immediately generate a `ChannelMonitorUpdate` for the corresponding inbound HTLC and persist that before we ever get to processing our counterparty's `commitment_signed` and persisting the corresponding `ChannelMonitorUpdate`.

However, if there isn't one (and this is the first successful HTLC for a payment we sent), we immediately generate a `PaymentSent` event and queue it up for the user. Then, a millisecond later, we receive the `commitment_signed` from our peer, removing the HTLC from the latest local commitment transaction as a side-effect of the `ChannelMonitorUpdate` applied. If the user has processed the `PaymentSent` event by that point, great, we're done. However, if they have not, and we crash prior to persisting the `ChannelManager`, on startup we get confused about the state of the payment. We'll force-close the channel for being stale, and see an HTLC which was removed and is no longer present in the latest commitment transaction (which we're broadcasting).

Because we claim corresponding inbound HTLCs before updating a `ChannelMonitor`, we assume such HTLCs have failed - attempting to fail after having claimed should be a noop. However, in the sent-payment case we now generate a `PaymentFailed` event for the user, allowing an HTLC to complete without giving the user a preimage.

Here we address this issue by storing the payment preimages for claimed outbound HTLCs in the `ChannelMonitor`, in addition to the inbound HTLC preimages already stored there. This allows us to fix the specific issue described by checking for a preimage and switching the type of event generated in response, as sketched below. In addition, it reduces the risk of future confusion by ensuring we don't fail HTLCs which were claimed but not fully committed to before a crash.

It does not, however, fully fix the issue - because the preimages are removed once the HTLC has been fully removed from available commitment transactions, if we are substantially delayed in persisting the `ChannelManager` (from the time we receive the `update_fulfill_htlc` until after a full commitment-signed dance completes) we may still hit this issue.

The full fix for this issue is to delay the persistence of the `ChannelMonitorUpdate` until after the `PaymentSent` event has been processed. This avoids the issue entirely, ensuring we process the event before updating the `ChannelMonitor`, the same way we ensure the upstream HTLC has been claimed before updating the `ChannelMonitor` for forwarded payments.

The full solution will be implemented in later work; however, this change still makes sense at that point as well - if we were to delay the initial `commitment_signed` `ChannelMonitorUpdate` until after the `PaymentSent` event has been processed (which likely requires a database update on the user's end), we'd hold our `commitment_signed` + `revoke_and_ack` response for two DB writes (i.e. `fsync()` calls), making our commitment transaction processing a full `fsync` slower.
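As an aside, not part of the patch below: a minimal, self-contained Rust sketch of the restart-time check described above, i.e. deciding between a success and a failure event for a removed outbound HTLC based on whether the monitor recorded a counterparty-provided preimage. All names here (`MonitorClaims`, `HtlcId`, `event_for_removed_htlc`) are hypothetical stand-ins rather than LDK's actual types; the real change keys the map on `SentHTLCId` inside the `ChannelMonitor`.

    use std::collections::HashMap;

    /// Hypothetical stand-in for an HTLC identifier (LDK derives `SentHTLCId` from `HTLCSource`).
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct HtlcId(u64);

    /// Hypothetical stand-in for a 32-byte payment preimage.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    struct Preimage([u8; 32]);

    /// Events a node operator would see for a sent payment.
    #[derive(Debug, PartialEq, Eq)]
    enum PaymentEvent {
        Sent(Preimage),
        Failed,
    }

    /// Sketch of the monitor-side state: outbound HTLCs the counterparty has fulfilled,
    /// persisted alongside the rest of the monitor so it survives a crash.
    #[derive(Default)]
    struct MonitorClaims {
        counterparty_fulfilled: HashMap<HtlcId, Preimage>,
    }

    impl MonitorClaims {
        /// Record the preimage when handling the counterparty's `update_fulfill_htlc`.
        fn record_fulfill(&mut self, id: HtlcId, preimage: Preimage) {
            self.counterparty_fulfilled.insert(id, preimage);
        }
    }

    /// On restart, pick the event to (re)generate for an outbound HTLC that is no longer
    /// in the latest commitment transaction: success if a preimage was recorded, else failure.
    fn event_for_removed_htlc(monitor: &MonitorClaims, id: HtlcId) -> PaymentEvent {
        match monitor.counterparty_fulfilled.get(&id) {
            Some(preimage) => PaymentEvent::Sent(*preimage),
            None => PaymentEvent::Failed,
        }
    }

    fn main() {
        let mut monitor = MonitorClaims::default();
        let id = HtlcId(1);
        // Without the recorded preimage, a crash between the `PaymentSent` event being queued
        // and the `ChannelManager` being persisted leads to a spurious failure on restart.
        monitor.record_fulfill(id, Preimage([0x42; 32]));
        assert_eq!(event_for_removed_htlc(&monitor, id), PaymentEvent::Sent(Preimage([0x42; 32])));
        assert_eq!(event_for_removed_htlc(&monitor, HtlcId(2)), PaymentEvent::Failed);
        println!("restart-time event selection behaves as expected");
    }

The sketch deliberately omits pruning: the actual patch also removes entries for HTLCs which are no longer present in any counterparty commitment transaction, so the map does not grow without bound.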
By making this change first, we can instead delay the `ChannelMonitorUpdate` from the counterparty's final `revoke_and_ack` message until the event has been processed, giving us a full network roundtrip to do so and avoiding delaying our response as long as an `fsync` is faster than a network roundtrip. --- diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 9e3afec6f..b830a7417 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -37,7 +37,7 @@ use crate::ln::{PaymentHash, PaymentPreimage}; use crate::ln::msgs::DecodeError; use crate::ln::chan_utils; use crate::ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCClaim, ChannelTransactionParameters, HolderCommitmentTransaction}; -use crate::ln::channelmanager::HTLCSource; +use crate::ln::channelmanager::{HTLCSource, SentHTLCId}; use crate::chain; use crate::chain::{BestBlock, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator, LowerBoundedFeeEstimator}; @@ -494,6 +494,7 @@ pub(crate) enum ChannelMonitorUpdateStep { LatestHolderCommitmentTXInfo { commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>, + claimed_htlcs: Vec<(SentHTLCId, PaymentPreimage)>, }, LatestCounterpartyCommitmentTXInfo { commitment_txid: Txid, @@ -536,6 +537,7 @@ impl ChannelMonitorUpdateStep { impl_writeable_tlv_based_enum_upgradable!(ChannelMonitorUpdateStep, (0, LatestHolderCommitmentTXInfo) => { (0, commitment_tx, required), + (1, claimed_htlcs, vec_type), (2, htlc_outputs, vec_type), }, (1, LatestCounterpartyCommitmentTXInfo) => { @@ -750,6 +752,8 @@ pub(crate) struct ChannelMonitorImpl { /// Serialized to disk but should generally not be sent to Watchtowers. counterparty_hash_commitment_number: HashMap, + counterparty_fulfilled_htlcs: HashMap, + // We store two holder commitment transactions to avoid any race conditions where we may update // some monitors (potentially on watchtowers) but then fail to update others, resulting in the // various monitors for one channel being out of sync, and us broadcasting a holder @@ -1033,6 +1037,7 @@ impl Writeable for ChannelMonitorImpl ChannelMonitor { counterparty_claimable_outpoints: HashMap::new(), counterparty_commitment_txn_on_chain: HashMap::new(), counterparty_hash_commitment_number: HashMap::new(), + counterparty_fulfilled_htlcs: HashMap::new(), prev_holder_signed_commitment_tx: None, current_holder_commitment_tx: holder_commitment_tx, @@ -1174,7 +1180,7 @@ impl ChannelMonitor { &self, holder_commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>, ) -> Result<(), ()> { - self.inner.lock().unwrap().provide_latest_holder_commitment_tx(holder_commitment_tx, htlc_outputs).map_err(|_| ()) + self.inner.lock().unwrap().provide_latest_holder_commitment_tx(holder_commitment_tx, htlc_outputs, &Vec::new()).map_err(|_| ()) } /// This is used to provide payment preimage(s) out-of-band during startup without updating the @@ -1810,9 +1816,10 @@ impl ChannelMonitor { /// `ChannelMonitor`. This is used to determine if an HTLC was removed from the channel prior /// to the `ChannelManager` having been persisted. /// - /// This is similar to [`Self::get_pending_outbound_htlcs`] except it includes HTLCs which were - /// resolved by this `ChannelMonitor`. 
- pub(crate) fn get_all_current_outbound_htlcs(&self) -> HashMap { + /// This is similar to [`Self::get_pending_or_resolved_outbound_htlcs`] except it includes + /// HTLCs which were resolved on-chain (i.e. where the final HTLC resolution was done by an + /// event from this `ChannelMonitor`). + pub(crate) fn get_all_current_outbound_htlcs(&self) -> HashMap)> { let mut res = HashMap::new(); // Just examine the available counterparty commitment transactions. See docs on // `fail_unbroadcast_htlcs`, below, for justification. @@ -1822,7 +1829,8 @@ impl ChannelMonitor { if let Some(ref latest_outpoints) = us.counterparty_claimable_outpoints.get($txid) { for &(ref htlc, ref source_option) in latest_outpoints.iter() { if let &Some(ref source) = source_option { - res.insert((**source).clone(), htlc.clone()); + res.insert((**source).clone(), (htlc.clone(), + us.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).cloned())); } } } @@ -1837,9 +1845,14 @@ impl ChannelMonitor { res } - /// Gets the set of outbound HTLCs which are pending resolution in this channel. + /// Gets the set of outbound HTLCs which are pending resolution in this channel or which were + /// resolved with a preimage from our counterparty. + /// /// This is used to reconstruct pending outbound payments on restart in the ChannelManager. - pub(crate) fn get_pending_outbound_htlcs(&self) -> HashMap { + /// + /// Currently, the preimage is unused, however if it is present in the relevant internal state + /// an HTLC is always included even if it has been resolved. + pub(crate) fn get_pending_or_resolved_outbound_htlcs(&self) -> HashMap)> { let us = self.inner.lock().unwrap(); // We're only concerned with the confirmation count of HTLC transactions, and don't // actually care how many confirmations a commitment transaction may or may not have. Thus, @@ -1887,8 +1900,10 @@ impl ChannelMonitor { Some(commitment_tx_output_idx) == htlc.transaction_output_index } else { false } }); - if !htlc_update_confd { - res.insert(source.clone(), htlc.clone()); + let counterparty_resolved_preimage_opt = + us.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).cloned(); + if !htlc_update_confd || counterparty_resolved_preimage_opt.is_some() { + res.insert(source.clone(), (htlc.clone(), counterparty_resolved_preimage_opt)); } } } @@ -1970,6 +1985,9 @@ macro_rules! fail_unbroadcast_htlcs { } } if matched_htlc { continue; } + if $self.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).is_some() { + continue; + } $self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { if entry.height != $commitment_tx_conf_height { return true; } match entry.event { @@ -2042,6 +2060,17 @@ impl ChannelMonitorImpl { // events for now-revoked/fulfilled HTLCs. 
if let Some(txid) = self.prev_counterparty_commitment_txid.take() { if self.current_counterparty_commitment_txid.unwrap() != txid { + let cur_claimables = self.counterparty_claimable_outpoints.get( + &self.current_counterparty_commitment_txid.unwrap()).unwrap(); + for (_, ref source_opt) in self.counterparty_claimable_outpoints.get(&txid).unwrap() { + if let Some(source) = source_opt { + if !cur_claimables.iter() + .any(|(_, cur_source_opt)| cur_source_opt == source_opt) + { + self.counterparty_fulfilled_htlcs.remove(&SentHTLCId::from_source(source)); + } + } + } for &mut (_, ref mut source_opt) in self.counterparty_claimable_outpoints.get_mut(&txid).unwrap() { *source_opt = None; } @@ -2131,7 +2160,7 @@ impl ChannelMonitorImpl { /// is important that any clones of this channel monitor (including remote clones) by kept /// up-to-date as our holder commitment transaction is updated. /// Panics if set_on_holder_tx_csv has never been called. - fn provide_latest_holder_commitment_tx(&mut self, holder_commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>) -> Result<(), &'static str> { + fn provide_latest_holder_commitment_tx(&mut self, holder_commitment_tx: HolderCommitmentTransaction, htlc_outputs: Vec<(HTLCOutputInCommitment, Option, Option)>, claimed_htlcs: &[(SentHTLCId, PaymentPreimage)]) -> Result<(), &'static str> { // block for Rust 1.34 compat let mut new_holder_commitment_tx = { let trusted_tx = holder_commitment_tx.trust(); @@ -2153,6 +2182,18 @@ impl ChannelMonitorImpl { self.onchain_tx_handler.provide_latest_holder_tx(holder_commitment_tx); mem::swap(&mut new_holder_commitment_tx, &mut self.current_holder_commitment_tx); self.prev_holder_signed_commitment_tx = Some(new_holder_commitment_tx); + for (claimed_htlc_id, claimed_preimage) in claimed_htlcs { + #[cfg(debug_assertions)] { + let cur_counterparty_htlcs = self.counterparty_claimable_outpoints.get( + &self.current_counterparty_commitment_txid.unwrap()).unwrap(); + assert!(cur_counterparty_htlcs.iter().any(|(_, source_opt)| { + if let Some(source) = source_opt { + SentHTLCId::from_source(source) == *claimed_htlc_id + } else { false } + })); + } + self.counterparty_fulfilled_htlcs.insert(*claimed_htlc_id, *claimed_preimage); + } if self.holder_tx_signed { return Err("Latest holder commitment signed has already been signed, update is rejected"); } @@ -2247,10 +2288,10 @@ impl ChannelMonitorImpl { let bounded_fee_estimator = LowerBoundedFeeEstimator::new(&*fee_estimator); for update in updates.updates.iter() { match update { - ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { commitment_tx, htlc_outputs } => { + ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { commitment_tx, htlc_outputs, claimed_htlcs } => { log_trace!(logger, "Updating ChannelMonitor with latest holder commitment transaction info"); if self.lockdown_from_offchain { panic!(); } - if let Err(e) = self.provide_latest_holder_commitment_tx(commitment_tx.clone(), htlc_outputs.clone()) { + if let Err(e) = self.provide_latest_holder_commitment_tx(commitment_tx.clone(), htlc_outputs.clone(), &claimed_htlcs) { log_error!(logger, "Providing latest holder commitment transaction failed/was refused:"); log_error!(logger, " {}", e); ret = Err(()); @@ -3872,6 +3913,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut counterparty_node_id = None; let mut confirmed_commitment_tx_counterparty_output = None; let mut spendable_txids_confirmed = Some(Vec::new()); + let mut 
counterparty_fulfilled_htlcs = Some(HashMap::new()); read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), (3, htlcs_resolved_on_chain, vec_type), @@ -3880,6 +3922,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (9, counterparty_node_id, option), (11, confirmed_commitment_tx_counterparty_output, option), (13, spendable_txids_confirmed, vec_type), + (15, counterparty_fulfilled_htlcs, option), }); Ok((best_block.block_hash(), ChannelMonitor::from_impl(ChannelMonitorImpl { @@ -3908,6 +3951,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP counterparty_claimable_outpoints, counterparty_commitment_txn_on_chain, counterparty_hash_commitment_number, + counterparty_fulfilled_htlcs: counterparty_fulfilled_htlcs.unwrap(), prev_holder_signed_commitment_tx, current_holder_commitment_tx, @@ -4081,7 +4125,6 @@ mod tests { let fee_estimator = TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let dummy_key = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let dummy_tx = Transaction { version: 0, lock_time: PackedLockTime::ZERO, input: Vec::new(), output: Vec::new() }; let mut preimages = Vec::new(); { @@ -4171,7 +4214,6 @@ mod tests { HolderCommitmentTransaction::dummy(), best_block, dummy_key); monitor.provide_latest_holder_commitment_tx(HolderCommitmentTransaction::dummy(), preimages_to_holder_htlcs!(preimages[0..10])).unwrap(); - let dummy_txid = dummy_tx.txid(); monitor.provide_latest_counterparty_commitment_tx(Txid::from_inner(Sha256::hash(b"1").into_inner()), preimages_slice_to_htlc_outputs!(preimages[5..15]), 281474976710655, dummy_key, &logger); monitor.provide_latest_counterparty_commitment_tx(Txid::from_inner(Sha256::hash(b"2").into_inner()), diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 2b9920aa2..842c91180 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -27,7 +27,7 @@ use crate::ln::features::{ChannelTypeFeatures, InitFeatures}; use crate::ln::msgs; use crate::ln::msgs::{DecodeError, OptionalField, DataLossProtect}; use crate::ln::script::{self, ShutdownScript}; -use crate::ln::channelmanager::{self, CounterpartyForwardingInfo, PendingHTLCStatus, HTLCSource, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT}; +use crate::ln::channelmanager::{self, CounterpartyForwardingInfo, PendingHTLCStatus, HTLCSource, SentHTLCId, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT}; use crate::ln::chan_utils::{CounterpartyCommitmentSecrets, TxCreationKeys, HTLCOutputInCommitment, htlc_success_tx_weight, htlc_timeout_tx_weight, make_funding_redeemscript, ChannelPublicKeys, CommitmentTransaction, HolderCommitmentTransaction, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, MAX_HTLCS, get_commitment_transaction_number_obscure_factor, ClosingTransaction}; use crate::ln::chan_utils; use crate::ln::onion_utils::HTLCFailReason; @@ -192,6 +192,7 @@ enum OutboundHTLCState { #[derive(Clone)] enum OutboundHTLCOutcome { + /// LDK version 0.0.105+ will always fill in the preimage here. 
Success(Option), Failure(HTLCFailReason), } @@ -3159,15 +3160,6 @@ impl Channel { } } - self.latest_monitor_update_id += 1; - let mut monitor_update = ChannelMonitorUpdate { - update_id: self.latest_monitor_update_id, - updates: vec![ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { - commitment_tx: holder_commitment_tx, - htlc_outputs: htlcs_and_sigs - }] - }; - for htlc in self.pending_inbound_htlcs.iter_mut() { let new_forward = if let &InboundHTLCState::RemoteAnnounced(ref forward_info) = &htlc.state { Some(forward_info.clone()) @@ -3179,6 +3171,7 @@ impl Channel { need_commitment = true; } } + let mut claimed_htlcs = Vec::new(); for htlc in self.pending_outbound_htlcs.iter_mut() { if let &mut OutboundHTLCState::RemoteRemoved(ref mut outcome) = &mut htlc.state { log_trace!(logger, "Updating HTLC {} to AwaitingRemoteRevokeToRemove due to commitment_signed in channel {}.", @@ -3186,11 +3179,30 @@ impl Channel { // Grab the preimage, if it exists, instead of cloning let mut reason = OutboundHTLCOutcome::Success(None); mem::swap(outcome, &mut reason); + if let OutboundHTLCOutcome::Success(Some(preimage)) = reason { + // If a user (a) receives an HTLC claim using LDK 0.0.104 or before, then (b) + // upgrades to LDK 0.0.114 or later before the HTLC is fully resolved, we could + // have a `Success(None)` reason. In this case we could forget some HTLC + // claims, but such an upgrade is unlikely and including claimed HTLCs here + // fixes a bug which the user was exposed to on 0.0.104 when they started the + // claim anyway. + claimed_htlcs.push((SentHTLCId::from_source(&htlc.source), preimage)); + } htlc.state = OutboundHTLCState::AwaitingRemoteRevokeToRemove(reason); need_commitment = true; } } + self.latest_monitor_update_id += 1; + let mut monitor_update = ChannelMonitorUpdate { + update_id: self.latest_monitor_update_id, + updates: vec![ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { + commitment_tx: holder_commitment_tx, + htlc_outputs: htlcs_and_sigs, + claimed_htlcs, + }] + }; + self.cur_holder_commitment_transaction_number -= 1; // Note that if we need_commitment & !AwaitingRemoteRevoke we'll call // build_commitment_no_status_check() next which will reset this to RAAFirst. diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 0757e117c..06d679ab8 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -234,6 +234,36 @@ impl Readable for InterceptId { Ok(InterceptId(buf)) } } + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +/// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`]. +pub(crate) enum SentHTLCId { + PreviousHopData { short_channel_id: u64, htlc_id: u64 }, + OutboundRoute { session_priv: SecretKey }, +} +impl SentHTLCId { + pub(crate) fn from_source(source: &HTLCSource) -> Self { + match source { + HTLCSource::PreviousHopData(hop_data) => Self::PreviousHopData { + short_channel_id: hop_data.short_channel_id, + htlc_id: hop_data.htlc_id, + }, + HTLCSource::OutboundRoute { session_priv, .. 
} => + Self::OutboundRoute { session_priv: *session_priv }, + } + } +} +impl_writeable_tlv_based_enum!(SentHTLCId, + (0, PreviousHopData) => { + (0, short_channel_id, required), + (2, htlc_id, required), + }, + (2, OutboundRoute) => { + (0, session_priv, required), + }; +); + + /// Tracks the inbound corresponding to an outbound HTLC #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash #[derive(Clone, PartialEq, Eq)] @@ -7445,6 +7475,10 @@ where probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes()); } + if !channel_closures.is_empty() { + pending_events_read.append(&mut channel_closures); + } + if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() { pending_outbound_payments = Some(pending_outbound_payments_compat); } else if pending_outbound_payments.is_none() { @@ -7453,7 +7487,13 @@ where outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs }); } pending_outbound_payments = Some(outbounds); - } else { + } + let pending_outbounds = OutboundPayments { + pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), + retry_lock: Mutex::new(()) + }; + + { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state // (i.e. those for which we just force-closed above or we otherwise don't have a @@ -7464,16 +7504,17 @@ where // 0.0.102+ for (_, monitor) in args.channel_monitors.iter() { if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { - for (htlc_source, htlc) in monitor.get_pending_outbound_htlcs() { + for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, payment_secret, .. } = htlc_source { if path.is_empty() { log_error!(args.logger, "Got an empty path for a pending payment"); return Err(DecodeError::InvalidValue); } + let path_amt = path.last().unwrap().fee_msat; let mut session_priv_bytes = [0; 32]; session_priv_bytes[..].copy_from_slice(&session_priv[..]); - match pending_outbound_payments.as_mut().unwrap().entry(payment_id) { + match pending_outbounds.pending_outbound_payments.lock().unwrap().entry(payment_id) { hash_map::Entry::Occupied(mut entry) => { let newly_added = entry.get_mut().insert(session_priv_bytes, &path); log_info!(args.logger, "{} a pending payment path for {} msat for session priv {} on an existing pending payment with payment hash {}", @@ -7500,51 +7541,64 @@ where } } } - for (htlc_source, htlc) in monitor.get_all_current_outbound_htlcs() { - if let HTLCSource::PreviousHopData(prev_hop_data) = htlc_source { - let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| { - info.prev_funding_outpoint == prev_hop_data.outpoint && - info.prev_htlc_id == prev_hop_data.htlc_id - }; - // The ChannelMonitor is now responsible for this HTLC's - // failure/success and will let us know what its outcome is. If we - // still have an entry for this HTLC in `forward_htlcs` or - // `pending_intercepted_htlcs`, we were apparently not persisted after - // the monitor was when forwarding the payment. 
- forward_htlcs.retain(|_, forwards| { - forwards.retain(|forward| { - if let HTLCForwardInfo::AddHTLC(htlc_info) = forward { - if pending_forward_matches_htlc(&htlc_info) { - log_info!(args.logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}", - log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); - false + for (htlc_source, (htlc, preimage_opt)) in monitor.get_all_current_outbound_htlcs() { + match htlc_source { + HTLCSource::PreviousHopData(prev_hop_data) => { + let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| { + info.prev_funding_outpoint == prev_hop_data.outpoint && + info.prev_htlc_id == prev_hop_data.htlc_id + }; + // The ChannelMonitor is now responsible for this HTLC's + // failure/success and will let us know what its outcome is. If we + // still have an entry for this HTLC in `forward_htlcs` or + // `pending_intercepted_htlcs`, we were apparently not persisted after + // the monitor was when forwarding the payment. + forward_htlcs.retain(|_, forwards| { + forwards.retain(|forward| { + if let HTLCForwardInfo::AddHTLC(htlc_info) = forward { + if pending_forward_matches_htlc(&htlc_info) { + log_info!(args.logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}", + log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); + false + } else { true } } else { true } + }); + !forwards.is_empty() + }); + pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| { + if pending_forward_matches_htlc(&htlc_info) { + log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}", + log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); + pending_events_read.retain(|event| { + if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event { + intercepted_id != ev_id + } else { true } + }); + false } else { true } }); - !forwards.is_empty() - }); - pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| { - if pending_forward_matches_htlc(&htlc_info) { - log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}", - log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); - pending_events_read.retain(|event| { - if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event { - intercepted_id != ev_id - } else { true } - }); - false - } else { true } - }); + }, + HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } => { + if let Some(preimage) = preimage_opt { + let pending_events = Mutex::new(pending_events_read); + // Note that we set `from_onchain` to "false" here, + // deliberately keeping the pending payment around forever. + // Given it should only occur when we have a channel we're + // force-closing for being stale that's okay. + // The alternative would be to wipe the state when claiming, + // generating a `PaymentPathSuccessful` event but regenerating + // it and the `PaymentSent` on every restart until the + // `ChannelMonitor` is removed. 
+ pending_outbounds.claim_htlc(payment_id, preimage, session_priv, path, false, &pending_events, &args.logger); + pending_events_read = pending_events.into_inner().unwrap(); + } + }, } } } } } - let pending_outbounds = OutboundPayments { - pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), - retry_lock: Mutex::new(()) - }; if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() { // If we have pending HTLCs to forward, assume we either dropped a // `PendingHTLCsForwardable` or the user received it but never processed it as they @@ -7602,10 +7656,6 @@ where let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&args.entropy_source.get_secure_random_bytes()); - if !channel_closures.is_empty() { - pending_events_read.append(&mut channel_closures); - } - let our_network_pubkey = match args.node_signer.get_node_id(Recipient::Node) { Ok(key) => key, Err(()) => return Err(DecodeError::InvalidValue) diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index c4cd0fc1b..15361b98a 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -2748,3 +2748,83 @@ fn test_threaded_payment_retries() { } } } + +fn do_no_missing_sent_on_midpoint_reload(persist_manager_with_payment: bool) { + // Test that if we reload in the middle of an HTLC claim commitment signed dance we'll still + // receive the PaymentSent event even if the ChannelManager had no idea about the payment when + // it was last persisted. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let (persister_a, persister_b, persister_c); + let (chain_monitor_a, chain_monitor_b, chain_monitor_c); + let (nodes_0_deserialized, nodes_0_deserialized_b, nodes_0_deserialized_c); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2; + + let mut nodes_0_serialized = Vec::new(); + if !persist_manager_with_payment { + nodes_0_serialized = nodes[0].node.encode(); + } + + let (our_payment_preimage, our_payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); + + if persist_manager_with_payment { + nodes_0_serialized = nodes[0].node.encode(); + } + + nodes[1].node.claim_funds(our_payment_preimage); + check_added_monitors!(nodes[1], 1); + expect_payment_claimed!(nodes[1], our_payment_hash, 1_000_000); + + let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); + nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &updates.commitment_signed); + check_added_monitors!(nodes[0], 1); + + // The ChannelMonitor should always be the latest version, as we're required to persist it + // during the commitment signed handling. + let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode(); + reload_node!(nodes[0], test_default_channel_config(), &nodes_0_serialized, &[&chan_0_monitor_serialized], persister_a, chain_monitor_a, nodes_0_deserialized); + + let events = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 2); + if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[0] {} else { panic!(); } + if let Event::PaymentSent { payment_preimage, .. 
} = events[1] { assert_eq!(payment_preimage, our_payment_preimage); } else { panic!(); } + // Note that we don't get a PaymentPathSuccessful here as we leave the HTLC pending to avoid + // the double-claim that would otherwise appear at the end of this test. + let as_broadcasted_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + assert_eq!(as_broadcasted_txn.len(), 1); + + // Ensure that, even after some time, if we restart we still include *something* in the current + // `ChannelManager` which prevents a `PaymentFailed` when we restart even if pending resolved + // payments have since been timed out thanks to `IDEMPOTENCY_TIMEOUT_TICKS`. + // A naive implementation of the fix here would wipe the pending payments set, causing a + // failure event when we restart. + for _ in 0..(IDEMPOTENCY_TIMEOUT_TICKS * 2) { nodes[0].node.timer_tick_occurred(); } + + let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode(); + reload_node!(nodes[0], test_default_channel_config(), &nodes[0].node.encode(), &[&chan_0_monitor_serialized], persister_b, chain_monitor_b, nodes_0_deserialized_b); + let events = nodes[0].node.get_and_clear_pending_events(); + assert!(events.is_empty()); + + // Ensure that we don't generate any further events even after the channel-closing commitment + // transaction is confirmed on-chain. + confirm_transaction(&nodes[0], &as_broadcasted_txn[0]); + for _ in 0..(IDEMPOTENCY_TIMEOUT_TICKS * 2) { nodes[0].node.timer_tick_occurred(); } + + let events = nodes[0].node.get_and_clear_pending_events(); + assert!(events.is_empty()); + + let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode(); + reload_node!(nodes[0], test_default_channel_config(), &nodes[0].node.encode(), &[&chan_0_monitor_serialized], persister_c, chain_monitor_c, nodes_0_deserialized_c); + let events = nodes[0].node.get_and_clear_pending_events(); + assert!(events.is_empty()); +} + +#[test] +fn no_missing_sent_on_midpoint_reload() { + do_no_missing_sent_on_midpoint_reload(false); + do_no_missing_sent_on_midpoint_reload(true); +} diff --git a/lightning/src/sync/debug_sync.rs b/lightning/src/sync/debug_sync.rs index 5b6acbcad..dd106a9e8 100644 --- a/lightning/src/sync/debug_sync.rs +++ b/lightning/src/sync/debug_sync.rs @@ -201,6 +201,11 @@ pub struct Mutex { inner: StdMutex, deps: Arc, } +impl Mutex { + pub(crate) fn into_inner(self) -> LockResult { + self.inner.into_inner().map_err(|_| ()) + } +} #[must_use = "if unused the Mutex will immediately unlock"] pub struct MutexGuard<'a, T: Sized + 'a> { diff --git a/lightning/src/sync/nostd_sync.rs b/lightning/src/sync/nostd_sync.rs index 858f60db5..17307997d 100644 --- a/lightning/src/sync/nostd_sync.rs +++ b/lightning/src/sync/nostd_sync.rs @@ -60,6 +60,10 @@ impl Mutex { pub fn try_lock<'a>(&'a self) -> LockResult> { Ok(MutexGuard { lock: self.inner.borrow_mut() }) } + + pub fn into_inner(self) -> LockResult { + Ok(self.inner.into_inner()) + } } impl<'a, T: 'a> LockTestExt<'a> for Mutex {