From: Matt Corallo Date: Sun, 3 Oct 2021 22:33:12 +0000 (+0000) Subject: Track payments after they resolve until all HTLCs are finalized X-Git-Tag: v0.0.103~11^2~6 X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=5fedae85d11289d5d39ead9d7dd07079c9a4b214;p=rust-lightning Track payments after they resolve until all HTLCs are finalized In the next commit, we will reload lost pending payments from ChannelMonitors during restart. However, in order to avoid re-adding pending payments which have already been fulfilled, we must ensure that we do not fully remove pending payments until all HTLCs for the payment have been fully removed from their ChannelMonitors. We do so here, introducing a new PendingOutboundPayment variant called `Completed` which only tracks the set of pending HTLCs. --- diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 019d61c57..187a95b53 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -416,19 +416,51 @@ pub(crate) enum PendingOutboundPayment { /// Our best known block height at the time this payment was initiated. starting_block_height: u32, }, + /// When a pending payment is fulfilled, we continue tracking it until all pending HTLCs have + /// been resolved. This ensures we don't look up pending payments in ChannelMonitors on restart + /// and add a pending payment that was already fulfilled. + Fulfilled { + session_privs: HashSet<[u8; 32]>, + }, } impl PendingOutboundPayment { - fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool { + fn is_retryable(&self) -> bool { + match self { + PendingOutboundPayment::Retryable { .. } => true, + _ => false, + } + } + fn is_fulfilled(&self) -> bool { + match self { + PendingOutboundPayment::Fulfilled { .. } => true, + _ => false, + } + } + + fn mark_fulfilled(&mut self) { + let mut session_privs = HashSet::new(); + core::mem::swap(&mut session_privs, match self { + PendingOutboundPayment::Legacy { session_privs } | + PendingOutboundPayment::Retryable { session_privs, .. } | + PendingOutboundPayment::Fulfilled { session_privs } + => session_privs + }); + *self = PendingOutboundPayment::Fulfilled { session_privs }; + } + + /// panics if part_amt_msat is None and !self.is_fulfilled + fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: Option) -> bool { let remove_res = match self { PendingOutboundPayment::Legacy { session_privs } | - PendingOutboundPayment::Retryable { session_privs, .. } => { + PendingOutboundPayment::Retryable { session_privs, .. } | + PendingOutboundPayment::Fulfilled { session_privs } => { session_privs.remove(session_priv) } }; if remove_res { if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self { - *pending_amt_msat -= part_amt_msat; + *pending_amt_msat -= part_amt_msat.expect("We must only not provide an amount if the payment was already fulfilled"); } } remove_res @@ -440,6 +472,7 @@ impl PendingOutboundPayment { PendingOutboundPayment::Retryable { session_privs, .. } => { session_privs.insert(session_priv) } + PendingOutboundPayment::Fulfilled { .. } => false }; if insert_res { if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self { @@ -452,7 +485,8 @@ impl PendingOutboundPayment { fn remaining_parts(&self) -> usize { match self { PendingOutboundPayment::Legacy { session_privs } | - PendingOutboundPayment::Retryable { session_privs, .. } => { + PendingOutboundPayment::Retryable { session_privs, .. } | + PendingOutboundPayment::Fulfilled { session_privs } => { session_privs.len() } } @@ -1983,6 +2017,17 @@ impl ChannelMana let err: Result<(), _> = loop { let mut channel_lock = self.channel_state.lock().unwrap(); + + let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap(); + let payment_entry = pending_outbounds.entry(payment_id); + if let hash_map::Entry::Occupied(payment) = &payment_entry { + if !payment.get().is_retryable() { + return Err(APIError::RouteError { + err: "Payment already completed" + }); + } + } + let id = match channel_lock.short_to_id.get(&path.first().unwrap().short_channel_id) { None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}), Some(id) => id.clone(), @@ -2006,8 +2051,7 @@ impl ChannelMana }, onion_packet, &self.logger), channel_state, chan); - let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap(); - let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable { + let payment = payment_entry.or_insert_with(|| PendingOutboundPayment::Retryable { session_privs: HashSet::new(), pending_amt_msat: 0, payment_hash: *payment_hash, @@ -2203,7 +2247,12 @@ impl ChannelMana return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string() })) - } + }, + PendingOutboundPayment::Fulfilled { .. } => { + return Err(PaymentSendFailure::ParameterError(APIError::RouteError { + err: "Payment already completed" + })); + }, } } else { return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError { @@ -3031,7 +3080,9 @@ impl ChannelMana session_priv_bytes.copy_from_slice(&session_priv[..]); let mut outbounds = self.pending_outbound_payments.lock().unwrap(); if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) { - if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) { + if payment.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat)) && + !payment.get().is_fulfilled() + { self.pending_events.lock().unwrap().push( events::Event::PaymentPathFailed { payment_hash, @@ -3077,10 +3128,14 @@ impl ChannelMana let mut outbounds = self.pending_outbound_payments.lock().unwrap(); let mut all_paths_failed = false; if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) { - if !sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) { + if !sessions.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat)) { log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0)); return; } + if sessions.get().is_fulfilled() { + log_trace!(self.logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0)); + return; + } if sessions.get().remaining_parts() == 0 { all_paths_failed = true; } @@ -3329,6 +3384,23 @@ impl ChannelMana } else { unreachable!(); } } + fn finalize_claims(&self, mut sources: Vec) { + for source in sources.drain(..) { + if let HTLCSource::OutboundRoute { session_priv, payment_id, .. } = source { + let mut session_priv_bytes = [0; 32]; + session_priv_bytes.copy_from_slice(&session_priv[..]); + let mut outbounds = self.pending_outbound_payments.lock().unwrap(); + if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) { + assert!(sessions.get().is_fulfilled()); + sessions.get_mut().remove(&session_priv_bytes, None); + if sessions.get().remaining_parts() == 0 { + sessions.remove(); + } + } + } + } + } + fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { @@ -3336,8 +3408,22 @@ impl ChannelMana let mut session_priv_bytes = [0; 32]; session_priv_bytes.copy_from_slice(&session_priv[..]); let mut outbounds = self.pending_outbound_payments.lock().unwrap(); - let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) { - sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat) + let found_payment = if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) { + let found_payment = !sessions.get().is_fulfilled(); + sessions.get_mut().mark_fulfilled(); + if from_onchain { + // We currently immediately remove HTLCs which were fulfilled on-chain. + // This could potentially lead to removing a pending payment too early, + // with a reorg of one block causing us to re-add the fulfilled payment on + // restart. + // TODO: We should have a second monitor event that informs us of payments + // irrevocably fulfilled. + sessions.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat)); + if sessions.get().remaining_parts() == 0 { + sessions.remove(); + } + } + found_payment } else { false }; if found_payment { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); @@ -3412,7 +3498,7 @@ impl ChannelMana let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let chan_restoration_res; - let mut pending_failures = { + let (mut pending_failures, finalized_claims) = { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) { @@ -3434,14 +3520,14 @@ impl ChannelMana msg: self.get_channel_update_for_unicast(channel.get()).unwrap(), }) } else { None }; - // TODO: Handle updates.finalized_claimed_htlcs! chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, updates.raa, updates.commitment_update, updates.order, None, updates.accepted_htlcs, updates.funding_broadcastable, updates.funding_locked); if let Some(upd) = channel_update { channel_state.pending_msg_events.push(upd); } - updates.failed_htlcs + (updates.failed_htlcs, updates.finalized_claimed_htlcs) }; post_handle_chan_restoration!(self, chan_restoration_res); + self.finalize_claims(finalized_claims); for failure in pending_failures.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); } @@ -3962,6 +4048,7 @@ impl ChannelMana }); } break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs, + raa_updates.finalized_claimed_htlcs, chan.get().get_short_channel_id() .expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap())) @@ -3971,11 +4058,14 @@ impl ChannelMana }; self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id); match res { - Ok((pending_forwards, mut pending_failures, short_channel_id, channel_outpoint)) => { + Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs, + short_channel_id, channel_outpoint)) => + { for failure in pending_failures.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); } self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]); + self.finalize_claims(finalized_claim_htlcs); Ok(()) }, Err(e) => Err(e) @@ -5338,10 +5428,13 @@ impl_writeable_tlv_based!(PendingInboundPayment, { (8, min_value_msat, required), }); -impl_writeable_tlv_based_enum!(PendingOutboundPayment, +impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment, (0, Legacy) => { (0, session_privs, required), }, + (1, Fulfilled) => { + (0, session_privs, required), + }, (2, Retryable) => { (0, session_privs, required), (2, payment_hash, required), @@ -5350,7 +5443,7 @@ impl_writeable_tlv_based_enum!(PendingOutboundPayment, (8, pending_amt_msat, required), (10, starting_block_height, required), }, -;); +); impl Writeable for ChannelManager where M::Target: chain::Watch, @@ -5443,7 +5536,9 @@ impl Writeable f // For backwards compat, write the session privs and their total length. let mut num_pending_outbounds_compat: u64 = 0; for (_, outbound) in pending_outbound_payments.iter() { - num_pending_outbounds_compat += outbound.remaining_parts() as u64; + if !outbound.is_fulfilled() { + num_pending_outbounds_compat += outbound.remaining_parts() as u64; + } } num_pending_outbounds_compat.write(writer)?; for (_, outbound) in pending_outbound_payments.iter() { @@ -5454,6 +5549,7 @@ impl Writeable f session_priv.write(writer)?; } } + PendingOutboundPayment::Fulfilled { .. } => {}, } } @@ -5464,7 +5560,8 @@ impl Writeable f PendingOutboundPayment::Legacy { session_privs } | PendingOutboundPayment::Retryable { session_privs, .. } => { pending_outbound_payments_no_retry.insert(*id, session_privs.clone()); - } + }, + _ => {}, } } write_tlv_fields!(writer, {