]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Block monitor updates to ensure preimages are in each MPP part
authorMatt Corallo <git@bluematt.me>
Thu, 13 Jun 2024 00:29:01 +0000 (00:29 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 8 Jul 2024 19:07:11 +0000 (19:07 +0000)
If we claim an MPP payment and only persist some of the
`ChannelMonitorUpdate`s which include the preimage prior to
shutting down, we may be in a state where some of our
`ChannelMonitor`s have the preimage for a payment while others do
not.

This, it turns out, is actually mostly safe - on startup
`ChanelManager` will detect there's a payment it has as unclaimed
but there's a corresponding payment preimage in a `ChannelMonitor`
and go claim the other MPP parts. This works so long as the
`ChannelManager` has been persisted after the payment has been
received but prior to the `PaymentClaimable` event being processed
(and the claim itself occurring). This is not always true and
certainly not required on our API, but our
`lightning-background-processor` today does persist prior to event
handling so is generally true subject to some race conditions.

In order to address this we need to use copy payment preimages
across channels irrespective of the `ChannelManager`'s payment
state, but this introduces another wrinkle - if one channel makes
substantial progress while other channel(s) are still waiting to
get the payment preimage in `ChannelMonitor`(s) while the
`ChannelManager` hasn't been persisted after the payment was
received, we may end up without the preimage on disk.

Here, we address this issue by using the new
`RAAMonitorUpdateBlockingAction` variant for this specific case. We
block persistence of an RAA `ChannelMonitorUpdate` which may remove
the preimage from disk until all channels have had the preimage
added to their `ChannelMonitor`.

We do this only in-memory (and not on disk) as we can recreate this
blocker during the startup re-claim logic.

This will enable us to claim MPP parts without using the
`ChannelManager`'s payment state in later work.

lightning/src/ln/channelmanager.rs

index 11f686bf3addca8a15adba49ec6bf66c91a696e5..67159ef52f39a75291a9b9f0c70e37b6a3bd151d 100644 (file)
@@ -799,7 +799,13 @@ pub(crate) enum MonitorUpdateCompletionAction {
        /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
        /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
        /// event can be generated.
-       PaymentClaimed { payment_hash: PaymentHash },
+       PaymentClaimed {
+               payment_hash: PaymentHash,
+               /// A pending MPP claim which hasn't yet completed.
+               ///
+               /// Not written to disk.
+               pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+       },
        /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
        /// operation of another channel.
        ///
@@ -833,7 +839,10 @@ pub(crate) enum MonitorUpdateCompletionAction {
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
-       (0, PaymentClaimed) => { (0, payment_hash, required) },
+       (0, PaymentClaimed) => {
+               (0, payment_hash, required),
+               (9999999999, pending_mpp_claim, (static_value, None)),
+       },
        // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
        // *immediately*. However, for simplicity we implement read/write here.
        (1, FreeOtherChannelImmediately) => {
@@ -6200,7 +6209,7 @@ where
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
-               let mut sources = {
+               let sources = {
                        let mut claimable_payments = self.claimable_payments.lock().unwrap();
                        if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) {
                                let mut receiver_node_id = self.our_network_pubkey;
@@ -6295,18 +6304,46 @@ where
                        return;
                }
                if valid_mpp {
-                       for htlc in sources.drain(..) {
+                       let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
+                               let channels_without_preimage = sources.iter().filter_map(|htlc| {
+                                       if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                                               let prev_hop = &htlc.prev_hop;
+                                               Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id))
+                                       } else {
+                                               None
+                                       }
+                               }).collect();
+                               Some(Arc::new(Mutex::new(PendingMPPClaim {
+                                       channels_without_preimage,
+                                       channels_with_preimage: Vec::new(),
+                               })))
+                       } else {
+                               None
+                       };
+                       for htlc in sources {
+                               let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
+                                       if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                                               let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
+                                               Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
+                                       } else {
+                                               None
+                                       }
+                               );
+                               let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| {
+                                       RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+                                               pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+                                       }
+                               });
                                self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
                                        |_, definitely_duplicate| {
                                                debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
-                                               Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+                                               (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker)
                                        }
                                );
                        }
-               }
-               if !valid_mpp {
-                       for htlc in sources.drain(..) {
+               } else {
+                       for htlc in sources {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
                                htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
                                let source = HTLCSource::PreviousHopData(htlc.prev_hop);
@@ -6324,7 +6361,9 @@ where
                }
        }
 
-       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(
+       fn claim_funds_from_hop<
+               ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+       >(
                &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
                completion_action: ComplFunc,
        ) {
@@ -6364,11 +6403,15 @@ where
 
                                                match fulfill_res {
                                                        UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
-                                                               if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+                                                               let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+                                                               if let Some(action) = action_opt {
                                                                        log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
                                                                                chan_id, action);
                                                                        peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                                }
+                                                               if let Some(raa_blocker) = raa_blocker_opt {
+                                                                       peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+                                                               }
                                                                if !during_init {
                                                                        handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
                                                                                peer_state, per_peer_state, chan);
@@ -6386,11 +6429,16 @@ where
                                                                }
                                                        }
                                                        UpdateFulfillCommitFetch::DuplicateClaim {} => {
-                                                               let action = if let Some(action) = completion_action(None, true) {
+                                                               let (action_opt, raa_blocker_opt) = completion_action(None, true);
+                                                               if let Some(raa_blocker) = raa_blocker_opt {
+                                                                       debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+                                                               }
+                                                               let action = if let Some(action) = action_opt {
                                                                        action
                                                                } else {
                                                                        return;
                                                                };
+
                                                                mem::drop(peer_state_lock);
 
                                                                log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6477,7 +6525,46 @@ where
                // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
                // generally always allowed to be duplicative (and it's specifically noted in
                // `PaymentForwarded`).
-               self.handle_monitor_update_completion_actions(completion_action(None, false));
+               let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+               if let Some(raa_blocker) = raa_blocker_opt {
+                       let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+                               // prev_hop.counterparty_node_id is always available for payments received after
+                               // LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+                               // look up the counterparty in the `action_opt`, if possible.
+                               action_opt.as_ref().and_then(|action|
+                                       if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+                                               pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
+                                       } else { None }
+                               )
+                       );
+                       if let Some(counterparty_node_id) = counterparty_node_id {
+                               // TODO: Avoid always blocking the world for the write lock here.
+                               let mut per_peer_state = self.per_peer_state.write().unwrap();
+                               let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+                                       Mutex::new(PeerState {
+                                               channel_by_id: new_hash_map(),
+                                               inbound_channel_request_by_id: new_hash_map(),
+                                               latest_features: InitFeatures::empty(),
+                                               pending_msg_events: Vec::new(),
+                                               in_flight_monitor_updates: BTreeMap::new(),
+                                               monitor_update_blocked_actions: BTreeMap::new(),
+                                               actions_blocking_raa_monitor_updates: BTreeMap::new(),
+                                               is_connected: false,
+                                       }));
+                               let mut peer_state = peer_state_mutex.lock().unwrap();
+
+                               peer_state.actions_blocking_raa_monitor_updates
+                                       .entry(prev_hop.channel_id)
+                                       .or_insert_with(Vec::new)
+                                       .push(raa_blocker);
+                       } else {
+                               debug_assert!(false,
+                                       "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+                       }
+               }
+
+               self.handle_monitor_update_completion_actions(action_opt);
        }
 
        fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -6576,16 +6663,16 @@ where
                                                                        }
                                                                }), "{:?}", *background_events);
                                                        }
-                                                       None
+                                                       (None, None)
                                                } else if definitely_duplicate {
                                                        if let Some(other_chan) = chan_to_release {
-                                                               Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                                               (Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
                                                                        downstream_counterparty_node_id: other_chan.counterparty_node_id,
                                                                        downstream_funding_outpoint: other_chan.funding_txo,
                                                                        downstream_channel_id: other_chan.channel_id,
                                                                        blocking_action: other_chan.blocking_action,
-                                                               })
-                                                       } else { None }
+                                                               }), None)
+                                                       } else { (None, None) }
                                                } else {
                                                        let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                                                                if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6594,7 +6681,7 @@ where
                                                        } else { None };
                                                        debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
                                                                "skimmed_fee_msat must always be included in total_fee_earned_msat");
-                                                       Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+                                                       (Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                                event: events::Event::PaymentForwarded {
                                                                        prev_channel_id: Some(prev_channel_id),
                                                                        next_channel_id: Some(next_channel_id),
@@ -6606,7 +6693,7 @@ where
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
                                                                downstream_counterparty_and_funding_outpoint: chan_to_release,
-                                                       })
+                                                       }), None)
                                                }
                                        });
                        },
@@ -6623,9 +6710,44 @@ where
                debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
                debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 
+               let mut freed_channels = Vec::new();
+
                for action in actions.into_iter() {
                        match action {
-                               MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+                               MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+                                       if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+                                               let per_peer_state = self.per_peer_state.read().unwrap();
+                                               per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+                                                       let mut peer_state = peer_state_mutex.lock().unwrap();
+                                                       let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+                                                       if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+                                                               blockers.get_mut().retain(|blocker|
+                                                                       if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+                                                                               if *pending_claim == claim_ptr {
+                                                                                       let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+                                                                                       let pending_claim_state = &mut *pending_claim_state_lock;
+                                                                                       pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+                                                                                               if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+                                                                                                       pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+                                                                                                       false
+                                                                                               } else { true }
+                                                                                       });
+                                                                                       if pending_claim_state.channels_without_preimage.is_empty() {
+                                                                                               for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+                                                                                                       freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+                                                                                               }
+                                                                                       }
+                                                                                       !pending_claim_state.channels_without_preimage.is_empty()
+                                                                               } else { true }
+                                                                       } else { true }
+                                                               );
+                                                               if blockers.get().is_empty() {
+                                                                       blockers.remove();
+                                                               }
+                                                       }
+                                               });
+                                       }
+
                                        let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                                        if let Some(ClaimingPayment {
                                                amount_msat,
@@ -6669,6 +6791,10 @@ where
                                },
                        }
                }
+
+               for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+                       self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+               }
        }
 
        /// Handles a channel reentering a functional state, either due to reconnect or a monitor