From 4896e20086ef555e70ec171278e66bd14788a265 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 30 Sep 2024 20:09:01 +0000
Subject: [PATCH] Replay MPP claims via background events using new CM
 metadata

When we claim an MPP payment, then crash before persisting all the
relevant `ChannelMonitor`s, we rely on the payment data being available
in the `ChannelManager` on restart to re-claim any parts that haven't
yet been claimed. This is fine as long as the `ChannelManager` was
persisted before the `PaymentClaimable` event was processed, which is
generally the case in our `lightning-background-processor`, but may not
be in other cases, or in a somewhat rare race.

In order to fix this, we need to track where all the MPP parts of a
payment are in the `ChannelMonitor`, allowing us to re-claim any
missing pieces without reference to any `ChannelManager` data.

Further, in order to properly generate a `PaymentClaimed` event against
the re-started claim, we have to store various payment metadata with
the HTLC list as well.

Here we finally implement claiming using the new MPP part list and
metadata stored in `ChannelMonitor`s. In doing so, we use much more of
the existing HTLC-claiming pipeline in `ChannelManager`, utilizing the
on-startup background events flow as well as properly re-applying the
RAA-blockers to ensure preimages cannot be lost.
---
 lightning/src/ln/channel.rs        |   2 +-
 lightning/src/ln/channelmanager.rs | 250 +++++++++++++++++++++--------
 lightning/src/ln/reload_tests.rs   |  27 +++-
 3 files changed, 203 insertions(+), 76 deletions(-)

diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs
index 42e4b2f3e..701889626 100644
--- a/lightning/src/ln/channel.rs
+++ b/lightning/src/ln/channel.rs
@@ -1290,7 +1290,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
 	// further `send_update_fee` calls, dropping the previous holding cell update entirely.
 	holding_cell_update_fee: Option<u32>,
 	next_holder_htlc_id: u64,
-	next_counterparty_htlc_id: u64,
+	pub(super) next_counterparty_htlc_id: u64,
 	feerate_per_kw: u32,

 	/// The timestamp set on our latest `channel_update` message for this channel. It is updated
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 779ac25f8..6f88985d9 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -1130,9 +1130,34 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	}
 );

+/// The source argument which is passed to [`ChannelManager::claim_mpp_part`].
+///
+/// This is identical to [`MPPClaimHTLCSource`] except that [`Self::counterparty_node_id`] is an
+/// `Option`, whereas it is required in [`MPPClaimHTLCSource`]. In the future, we should ideally
+/// drop this and merge the two; however, doing so may break upgrades for nodes which have pending
+/// forwarded payments.
+struct HTLCClaimSource {
+	counterparty_node_id: Option<PublicKey>,
+	funding_txo: OutPoint,
+	channel_id: ChannelId,
+	htlc_id: u64,
+}
+
+impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
+	fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
+		HTLCClaimSource {
+			counterparty_node_id: Some(o.counterparty_node_id),
+			funding_txo: o.funding_txo,
+			channel_id: o.channel_id,
+			htlc_id: o.htlc_id,
+		}
+	}
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 /// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is
-/// tracked in [`PendingMPPClaim`].
+/// tracked in [`PendingMPPClaim`] as well as in [`ChannelMonitor`]s, so that it can be converted
+/// to an [`HTLCClaimSource`] for claim replays on startup.
 struct MPPClaimHTLCSource {
 	counterparty_node_id: PublicKey,
 	funding_txo: OutPoint,
@@ -6896,6 +6921,27 @@ where
 	>(
 		&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
 		payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
+	) {
+		let counterparty_node_id =
+			match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
+				Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
+				None => None
+			};
+
+		let htlc_source = HTLCClaimSource {
+			counterparty_node_id,
+			funding_txo: prev_hop.outpoint,
+			channel_id: prev_hop.channel_id,
+			htlc_id: prev_hop.htlc_id,
+		};
+		self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
+	}
+
+	fn claim_mpp_part<
+		ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+	>(
+		&self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
+		payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
 	) {
 		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -6912,12 +6958,8 @@ where
 		{
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			let chan_id = prev_hop.channel_id;
-			let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
-				Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
-				None => None
-			};

-			let peer_state_opt = counterparty_node_id_opt.as_ref().map(
+			let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
 				|counterparty_node_id| per_peer_state.get(counterparty_node_id)
 					.map(|peer_mutex| peer_mutex.lock().unwrap())
 				).unwrap_or(None);
@@ -6944,7 +6986,7 @@ where
 						peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
 					}
 					if !during_init {
-						handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+						handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock,
 							peer_state, per_peer_state, chan);
 					} else {
 						// If we're running during init we cannot update a monitor directly -
@@ -6953,7 +6995,7 @@ where
 						self.pending_background_events.lock().unwrap().push(
 							BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 								counterparty_node_id,
-								funding_txo: prev_hop.outpoint,
+								funding_txo: prev_hop.funding_txo,
 								channel_id: prev_hop.channel_id,
 								update: monitor_update.clone(),
 							});
@@ -7027,7 +7069,7 @@ where
 		}
 		let preimage_update = ChannelMonitorUpdate {
 			update_id: CLOSED_CHANNEL_UPDATE_ID,
-			counterparty_node_id: None,
+			counterparty_node_id: prev_hop.counterparty_node_id,
 			updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 				payment_preimage,
 				payment_info,
@@ -7038,7 +7080,7 @@ where
 		if !during_init {
 			// We update the ChannelMonitor on the backward link, after
 			// receiving an `update_fulfill_htlc` from the forward link.
-			let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+			let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo, &preimage_update);
 			if update_res != ChannelMonitorUpdateStatus::Completed {
 				// TODO: This needs to be handled somehow - if we receive a monitor update
 				// with a preimage we *must* somehow manage to propagate it to the upstream
@@ -7061,7 +7103,7 @@ where
 			// complete the monitor update completion action from `completion_action`.
 			self.pending_background_events.lock().unwrap().push(
 				BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
-					prev_hop.outpoint, prev_hop.channel_id, preimage_update,
+					prev_hop.funding_txo, prev_hop.channel_id, preimage_update,
 				)));
 		}
 		// Note that we do process the completion action here. This totally could be a
@@ -7312,7 +7354,7 @@ where
 						onion_fields,
 						payment_id,
 					}) = payment {
-						self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
+						let event = events::Event::PaymentClaimed {
 							payment_hash,
 							purpose,
 							amount_msat,
@@ -7321,7 +7363,16 @@ where
 							sender_intended_total_msat,
 							onion_fields,
 							payment_id,
-						}, None));
+						};
+						let event_action = (event, None);
+						let mut pending_events = self.pending_events.lock().unwrap();
+						// If we're replaying a claim on startup we may end up duplicating an event
+						// that's already in our queue, so check before we push another one. The
+						// `payment_id` should suffice to ensure we never spuriously drop a second
+						// event for a duplicate payment.
+						if !pending_events.contains(&event_action) {
+							pending_events.push_back(event_action);
+						}
 					}
 				},
 				MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
@@ -13130,67 +13181,126 @@ where
 		};

 		for (_, monitor) in args.channel_monitors.iter() {
-			for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
-				let per_peer_state = channel_manager.per_peer_state.read().unwrap();
-				let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
-				let payment = claimable_payments.claimable_payments.remove(&payment_hash);
-				mem::drop(claimable_payments);
-				if let Some(payment) = payment {
-					log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
-					let mut claimable_amt_msat = 0;
-					let mut receiver_node_id = Some(our_network_pubkey);
-					let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
-					if phantom_shared_secret.is_some() {
-						let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
-							.expect("Failed to get node_id for phantom node recipient");
-						receiver_node_id = Some(phantom_pubkey)
-					}
-					for claimable_htlc in &payment.htlcs {
-						claimable_amt_msat += claimable_htlc.value;
-
-						// Add a holding-cell claim of the payment to the Channel, which should be
-						// applied ~immediately on peer reconnection. Because it won't generate a
-						// new commitment transaction we can just provide the payment preimage to
-						// the corresponding ChannelMonitor and nothing else.
-						//
-						// We do so directly instead of via the normal ChannelMonitor update
-						// procedure as the ChainMonitor hasn't yet been initialized, implying
-						// we're not allowed to call it directly yet. Further, we do the update
-						// without incrementing the ChannelMonitor update ID as there isn't any
-						// reason to.
-						// If we were to generate a new ChannelMonitor update ID here and then
-						// crash before the user finishes block connect we'd end up force-closing
-						// this channel as well. On the flip side, there's no harm in restarting
-						// without the new monitor persisted - we'll end up right back here on
-						// restart.
-						let previous_channel_id = claimable_htlc.prev_hop.channel_id;
-						let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
-							.get(&claimable_htlc.prev_hop.outpoint).cloned();
-						if let Some(peer_node_id) = peer_node_id_opt {
-							let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
-							let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-							let peer_state = &mut *peer_state_lock;
-							if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
-								let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
-								channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+			for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
+				if !payment_claims.is_empty() {
+					for payment_claim in payment_claims {
+						if payment_claim.mpp_parts.is_empty() {
+							return Err(DecodeError::InvalidValue);
+						}
+						let pending_claims = PendingMPPClaim {
+							channels_without_preimage: payment_claim.mpp_parts.clone(),
+							channels_with_preimage: Vec::new(),
+						};
+						let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+
+						// While it may be duplicative to generate a PaymentClaimed here, trying to
+						// figure out if the user definitely saw it before shutdown would require some
+						// nontrivial logic and may break as we move away from regularly persisting
+						// ChannelManager. Instead, we rely on the users' event handler being
+						// idempotent and just blindly generate one no matter what, letting the
+						// preimages eventually time out from ChannelMonitors to prevent us from
+						// doing so forever.
+
+						let claim_found =
+							channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
+								payment_hash, &channel_manager.node_signer, &channel_manager.logger,
+								&channel_manager.inbound_payment_id_secret, true,
+							);
+						if claim_found.is_err() {
+							let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+							match claimable_payments.pending_claiming_payments.entry(payment_hash) {
+								hash_map::Entry::Occupied(_) => {
+									debug_assert!(false, "Entry was added in begin_claiming_payment");
+									return Err(DecodeError::InvalidValue);
+								},
+								hash_map::Entry::Vacant(entry) => {
+									entry.insert(payment_claim.claiming_payment);
+								},
 							}
 						}
-						if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
-							previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+
+						for part in payment_claim.mpp_parts.iter() {
+							let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
+								part.counterparty_node_id, part.channel_id, part.htlc_id,
+								PendingMPPClaimPointer(Arc::clone(&ptr))
+							));
+							let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
+								RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+									pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
+								}
+							);
+							// Note that we don't need to pass the `payment_info` here - it's
+							// already (clearly) durably on disk in the `ChannelMonitor` so there's
+							// no need to worry about getting it into others.
+							channel_manager.claim_mpp_part(
+								part.into(), payment_preimage, None,
+								|_, _|
+									(Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
+							);
 						}
 					}
-					let mut pending_events = channel_manager.pending_events.lock().unwrap();
-					let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
-					pending_events.push_back((events::Event::PaymentClaimed {
-						receiver_node_id,
-						payment_hash,
-						purpose: payment.purpose,
-						amount_msat: claimable_amt_msat,
-						htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
-						sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
-						onion_fields: payment.onion_fields,
-						payment_id: Some(payment_id),
-					}, None));
+				} else {
+					let per_peer_state = channel_manager.per_peer_state.read().unwrap();
+					let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+					let payment = claimable_payments.claimable_payments.remove(&payment_hash);
+					mem::drop(claimable_payments);
+					if let Some(payment) = payment {
+						log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
+						let mut claimable_amt_msat = 0;
+						let mut receiver_node_id = Some(our_network_pubkey);
+						let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
+						if phantom_shared_secret.is_some() {
+							let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
+								.expect("Failed to get node_id for phantom node recipient");
+							receiver_node_id = Some(phantom_pubkey)
+						}
+						for claimable_htlc in &payment.htlcs {
+							claimable_amt_msat += claimable_htlc.value;
+
+							// Add a holding-cell claim of the payment to the Channel, which should be
+							// applied ~immediately on peer reconnection. Because it won't generate a
+							// new commitment transaction we can just provide the payment preimage to
+							// the corresponding ChannelMonitor and nothing else.
+							//
+							// We do so directly instead of via the normal ChannelMonitor update
+							// procedure as the ChainMonitor hasn't yet been initialized, implying
+							// we're not allowed to call it directly yet. Further, we do the update
+							// without incrementing the ChannelMonitor update ID as there isn't any
+							// reason to.
+							// If we were to generate a new ChannelMonitor update ID here and then
+							// crash before the user finishes block connect we'd end up force-closing
+							// this channel as well. On the flip side, there's no harm in restarting
+							// without the new monitor persisted - we'll end up right back here on
+							// restart.
+							let previous_channel_id = claimable_htlc.prev_hop.channel_id;
+							let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
+								.get(&claimable_htlc.prev_hop.outpoint).cloned();
+							if let Some(peer_node_id) = peer_node_id_opt {
+								let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
+								let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+								let peer_state = &mut *peer_state_lock;
+								if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
+									let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
+									channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+								}
+							}
+							if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+								previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+							}
+						}
+						let mut pending_events = channel_manager.pending_events.lock().unwrap();
+						let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
+						pending_events.push_back((events::Event::PaymentClaimed {
+							receiver_node_id,
+							payment_hash,
+							purpose: payment.purpose,
+							amount_msat: claimable_amt_msat,
+							htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
+							sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
+							onion_fields: payment.onion_fields,
+							payment_id: Some(payment_id),
+						}, None));
+					}
+				}
 			}
 		}
diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs
index 6ae465b32..a043d5c0c 100644
--- a/lightning/src/ln/reload_tests.rs
+++ b/lightning/src/ln/reload_tests.rs
@@ -878,27 +878,39 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) {
 	// Now restart nodes[3].
 	reload_node!(nodes[3], original_manager, &[&updated_monitor.0, &original_monitor.0], persister, new_chain_monitor, nodes_3_deserialized);

-	// On startup the preimage should have been copied into the non-persisted monitor:
+	// Until the startup background events are processed (in `get_and_clear_pending_events`,
+	// below), the preimage is not copied to the non-persisted monitor...
 	assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
-	assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+	assert_eq!(
+		get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash),
+		persist_both_monitors,
+	);

 	nodes[1].node.peer_disconnected(nodes[3].node.get_our_node_id());
 	nodes[2].node.peer_disconnected(nodes[3].node.get_our_node_id());

 	// During deserialization, we should have closed one channel and broadcast its latest
 	// commitment transaction. We should also still have the original PaymentClaimable event we
-	// never finished processing.
+	// never finished processing, as well as a PaymentClaimed event regenerated when we replayed
+	// the preimage onto the non-persisted monitor.
 	let events = nodes[3].node.get_and_clear_pending_events();
 	assert_eq!(events.len(), if persist_both_monitors { 4 } else { 3 });
 	if let Event::PaymentClaimable { amount_msat: 15_000_000, .. } = events[0] { } else { panic!(); }
 	if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[1] { } else { panic!(); }
 	if persist_both_monitors {
 		if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); }
-		check_added_monitors(&nodes[3], 2);
+		if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[3] { } else { panic!(); }
+		check_added_monitors(&nodes[3], 6);
 	} else {
-		check_added_monitors(&nodes[3], 1);
+		if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[2] { } else { panic!(); }
+		check_added_monitors(&nodes[3], 3);
 	}

+	// Now that we've processed background events, the preimage should have been copied into the
+	// non-persisted monitor:
+	assert!(get_monitor!(nodes[3], chan_id_persisted).get_stored_preimages().contains_key(&payment_hash));
+	assert!(get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
+
 	// On restart, we should also get a duplicate PaymentClaimed event as we persisted the
 	// ChannelManager prior to handling the original one.
 	if let Event::PaymentClaimed { payment_hash: our_payment_hash, amount_msat: 15_000_000, .. } =
@@ -948,6 +960,11 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) {
 	nodes[0].node.handle_update_fulfill_htlc(nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
 	commitment_signed_dance!(nodes[0], nodes[2], cs_updates.commitment_signed, false, true);
 	expect_payment_sent!(nodes[0], payment_preimage);
+
+	// Ensure that the remaining channel is fully operational and not blocked (and that after a
+	// cycle of commitment updates the payment preimage is ultimately pruned).
+	send_payment(&nodes[0], &[&nodes[2], &nodes[3]], 100_000);
+	assert!(!get_monitor!(nodes[3], chan_id_not_persisted).get_stored_preimages().contains_key(&payment_hash));
 }
 }
-- 
2.39.5
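
Editorial note (not part of the patch): the duplicate-suppression added in the
`PaymentClaimed` completion-action path works by comparing the fully-built
event tuple against the pending-event queue before pushing. Below is a
minimal, self-contained sketch of that idea using hypothetical stand-in types;
the names `Event` and `push_claimed_event` are illustrative only and are not
LDK's actual `Event` enum or queue.

    use std::collections::VecDeque;

    #[derive(Clone, Debug, PartialEq, Eq)]
    enum Event {
        // Stand-in for LDK's much richer `Event::PaymentClaimed`.
        PaymentClaimed { payment_id: u64, amount_msat: u64 },
    }

    // Pushes a claim event unless an identical one is already queued, as can
    // happen when a claim is replayed from a `ChannelMonitor` on startup.
    fn push_claimed_event(pending: &mut VecDeque<(Event, Option<()>)>, event: Event) {
        let event_action = (event, None);
        if !pending.contains(&event_action) {
            pending.push_back(event_action);
        }
    }

    fn main() {
        let mut pending = VecDeque::new();
        let claim = Event::PaymentClaimed { payment_id: 1, amount_msat: 15_000_000 };
        push_claimed_event(&mut pending, claim.clone()); // queued
        push_claimed_event(&mut pending, claim); // duplicate replay is dropped
        assert_eq!(pending.len(), 1);
    }

Because the `PartialEq`-based check keys on the whole event, including the
payment id, a second, distinct payment with the same hash still produces its
own event; only an exact replay of an already-queued claim is suppressed.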