git.bitcoin.ninja Git - rust-lightning/commitdiff
Add a `RAAMonitorUpdateBlockingAction::ClaimedMPPPayment`
author: Matt Corallo <git@bluematt.me>
Tue, 14 May 2024 00:03:39 +0000 (00:03 +0000)
committer: Matt Corallo <git@bluematt.me>
Mon, 8 Jul 2024 19:06:59 +0000 (19:06 +0000)
If we claim an MPP payment and only persist some of the
`ChannelMonitorUpdate`s which include the preimage prior to
shutting down, we may be in a state where some of our
`ChannelMonitor`s have the preimage for a payment while others do
not.

This, it turns out, is actually mostly safe - on startup
`ChannelManager` will detect there's a payment it has as unclaimed
but there's a corresponding payment preimage in a `ChannelMonitor`
and go claim the other MPP parts. This works so long as the
`ChannelManager` has been persisted after the payment has been
received but prior to the `PaymentClaimable` event being processed
(and the claim itself occurring). This is not always true and
certainly not required by our API, but our
`lightning-background-processor` today does persist prior to event
handling, so it is generally true subject to some race conditions.

In order to address this race we need to copy payment preimages
across channels irrespective of the `ChannelManager`'s payment
state, but this introduces another wrinkle - if one channel makes
substantial progress while other channel(s) are still waiting to
get the payment preimage in `ChannelMonitor`(s) while the
`ChannelManager` hasn't been persisted after the payment was
received, we may end up without the preimage on disk.

Here, we address this issue with a new
`RAAMonitorUpdateBlockingAction` variant for this specific case. We
block persistence of an RAA `ChannelMonitorUpdate` which may remove
the preimage from disk until all channels have had the preimage
added to their `ChannelMonitor`.

We do this only in-memory (and not on disk) as we can recreate this
blocker during the startup re-claim logic.

This will enable us to claim MPP parts without using the
`ChannelManager`'s payment state in later work.

lightning/src/ln/channelmanager.rs

index a248be1f0891ebb835ddc056805076a6b4b8a7e6..11f686bf3addca8a15adba49ec6bf66c91a696e5 100644 (file)
@@ -757,6 +757,42 @@ enum BackgroundEvent {
        },
 }
 
+/// A pointer to a channel that is unblocked when an event is surfaced
+#[derive(Debug)]
+pub(crate) struct EventUnblockedChannel {
+       counterparty_node_id: PublicKey,
+       funding_txo: OutPoint,
+       channel_id: ChannelId,
+       blocking_action: RAAMonitorUpdateBlockingAction,
+}
+
+impl Writeable for EventUnblockedChannel {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
+               self.counterparty_node_id.write(writer)?;
+               self.funding_txo.write(writer)?;
+               self.channel_id.write(writer)?;
+               self.blocking_action.write(writer)
+       }
+}
+
+impl MaybeReadable for EventUnblockedChannel {
+       fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
+               let counterparty_node_id = Readable::read(reader)?;
+               let funding_txo = Readable::read(reader)?;
+               let channel_id = Readable::read(reader)?;
+               let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? {
+                       Some(blocking_action) => blocking_action,
+                       None => return Ok(None),
+               };
+               Ok(Some(EventUnblockedChannel {
+                       counterparty_node_id,
+                       funding_txo,
+                       channel_id,
+                       blocking_action,
+               }))
+       }
+}
+
 #[derive(Debug)]
 pub(crate) enum MonitorUpdateCompletionAction {
        /// Indicates that a payment ultimately destined for us was claimed and we should emit an
@@ -774,7 +810,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
        /// outbound edge.
        EmitEventAndFreeOtherChannel {
                event: events::Event,
-               downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>,
+               downstream_counterparty_and_funding_outpoint: Option<EventUnblockedChannel>,
        },
        /// Indicates we should immediately resume the operation of another channel, unless there is
        /// some other reason why the channel is blocked. In practice this simply means immediately
@@ -803,7 +839,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
        (1, FreeOtherChannelImmediately) => {
                (0, downstream_counterparty_node_id, required),
                (2, downstream_funding_outpoint, required),
-               (4, blocking_action, required),
+               (4, blocking_action, upgradable_required),
                // Note that by the time we get past the required read above, downstream_funding_outpoint will be
                // filled in, so we can safely unwrap it here.
                (5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
@@ -815,7 +851,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
                // monitor updates which aren't properly blocked or resumed, however that's fine - we don't
                // support async monitor updates even in LDK 0.0.116 and once we do we'll require no
                // downgrades to prior versions.
-               (1, downstream_counterparty_and_funding_outpoint, option),
+               (1, downstream_counterparty_and_funding_outpoint, upgradable_option),
        },
 );
 
@@ -837,6 +873,26 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
        };
 );
 
+#[derive(Debug)]
+pub(crate) struct PendingMPPClaim {
+       channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
+       channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
+}
+
+#[derive(Clone)]
+pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
+
+impl PartialEq for PendingMPPClaimPointer {
+       fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
+}
+impl Eq for PendingMPPClaimPointer {}
+
+impl core::fmt::Debug for PendingMPPClaimPointer {
+       fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
+               self.0.lock().unwrap().fmt(f)
+       }
+}
+
 #[derive(Clone, PartialEq, Eq, Debug)]
 /// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
 /// the blocked action here. See enum variants for more info.
@@ -850,6 +906,16 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
                /// The HTLC ID on the inbound edge.
                htlc_id: u64,
        },
+       /// We claimed an MPP payment across multiple channels. We have to block removing the payment
+       /// preimage from any monitor until the last monitor is updated to contain the payment
+       /// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) that
+       /// weren't updated on startup.
+       ///
+       /// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
+       /// state.
+       ClaimedMPPPayment {
+               pending_claim: PendingMPPClaimPointer,
+       }
 }
 
 impl RAAMonitorUpdateBlockingAction {
@@ -861,10 +927,16 @@ impl RAAMonitorUpdateBlockingAction {
        }
 }
 
-impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
-       (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
-;);
+impl_writeable_tlv_based_enum_upgradable!(RAAMonitorUpdateBlockingAction,
+       (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) },
+       unread_variants: ClaimedMPPPayment
+);
 
+impl Readable for Option<RAAMonitorUpdateBlockingAction> {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
+       }
+}
 
 /// State we hold per-peer.
 pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
@@ -6442,7 +6514,12 @@ where
                                        |htlc_claim_value_msat, definitely_duplicate| {
                                                let chan_to_release =
                                                        if let Some(node_id) = next_channel_counterparty_node_id {
-                                                               Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
+                                                               Some(EventUnblockedChannel {
+                                                                       counterparty_node_id: node_id,
+                                                                       funding_txo: next_channel_outpoint,
+                                                                       channel_id: next_channel_id,
+                                                                       blocking_action: completed_blocker
+                                                               })
                                                        } else {
                                                                // We can only get `None` here if we are processing a
                                                                // `ChannelMonitor`-originated event, in which case we
@@ -6503,10 +6580,10 @@ where
                                                } else if definitely_duplicate {
                                                        if let Some(other_chan) = chan_to_release {
                                                                Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
-                                                                       downstream_counterparty_node_id: other_chan.0,
-                                                                       downstream_funding_outpoint: other_chan.1,
-                                                                       downstream_channel_id: other_chan.2,
-                                                                       blocking_action: other_chan.3,
+                                                                       downstream_counterparty_node_id: other_chan.counterparty_node_id,
+                                                                       downstream_funding_outpoint: other_chan.funding_txo,
+                                                                       downstream_channel_id: other_chan.channel_id,
+                                                                       blocking_action: other_chan.blocking_action,
                                                                })
                                                        } else { None }
                                                } else {
@@ -6573,8 +6650,11 @@ where
                                        event, downstream_counterparty_and_funding_outpoint
                                } => {
                                        self.pending_events.lock().unwrap().push_back((event, None));
-                                       if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
-                                               self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+                                       if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
+                                               self.handle_monitor_update_release(
+                                                       unblocked.counterparty_node_id, unblocked.funding_txo,
+                                                       unblocked.channel_id, Some(unblocked.blocking_action),
+                                               );
                                        }
                                },
                                MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
@@ -12075,7 +12155,12 @@ where
                                        for action in actions.iter() {
                                                if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                        downstream_counterparty_and_funding_outpoint:
-                                                               Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
+                                                               Some(EventUnblockedChannel {
+                                                                       counterparty_node_id: blocked_node_id,
+                                                                       funding_txo: _,
+                                                                       channel_id: blocked_channel_id,
+                                                                       blocking_action,
+                                                               }), ..
                                                } = action {
                                                        if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
                                                                log_trace!(logger,