Add infra to block `ChannelMonitorUpdate`s on forwarded claims 2023-04-monitor-e-monitor-prep
authorMatt Corallo <git@bluematt.me>
Fri, 7 Apr 2023 00:31:39 +0000 (00:31 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 30 May 2023 23:05:03 +0000 (23:05 +0000)
When we forward a payment and receive an `update_fulfill_htlc`
message from the downstream channel, we immediately claim the HTLC
on the upstream channel, before even doing a `commitment_signed`
dance on the downstream channel. This implies that our
`ChannelMonitorUpdate`s "go out" in the right order - first we
ensure we'll get our money by writing the preimage down, then we
write the update that resolves giving money on the downstream node.

This is safe as long as `ChannelMonitorUpdate`s complete in the
order in which they are generated, but of course looking forward we
want to support asynchronous updates, which may complete in any
order.

Here we add infrastructure to handle downstream
`ChannelMonitorUpdate`s which are blocked on an upstream
preimage-containing one. We don't yet actually do the blocking which
will come in a future commit.

lightning/src/ln/channelmanager.rs

index 2735aac5c51ac1df9dd6e1954aca95d2e739d8f4..0ec8bddf656a791cfe07cf279b8ecc76994a89ed 100644 (file)
@@ -532,13 +532,31 @@ pub(crate) enum MonitorUpdateCompletionAction {
        /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
        /// event can be generated.
        PaymentClaimed { payment_hash: PaymentHash },
-       /// Indicates an [`events::Event`] should be surfaced to the user.
-       EmitEvent { event: events::Event },
+       /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
+       /// operation of another channel.
+       ///
+       /// This is usually generated when we've forwarded an HTLC and want to block the outbound edge
+       /// from completing a monitor update which removes the payment preimage until the inbound edge
+       /// completes a monitor update containing the payment preimage. In that case, after the inbound
+       /// edge completes, we will surface an [`Event::PaymentForwarded`] as well as unblock the
+       /// outbound edge.
+       EmitEventAndFreeOtherChannel {
+               event: events::Event,
+               downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
+       },
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
        (0, PaymentClaimed) => { (0, payment_hash, required) },
-       (2, EmitEvent) => { (0, event, upgradable_required) },
+       (2, EmitEventAndFreeOtherChannel) => {
+               (0, event, upgradable_required),
+               // LDK prior to 0.0.116 did not have this field as the monitor update application order was
+               // required by clients. If we downgrade to something prior to 0.0.116 this may result in
+               // monitor updates which aren't properly blocked or resumed, however that's fine - we don't
+               // support async monitor updates even in LDK 0.0.116 and once we do we'll require no
+               // downgrades to prior versions.
+               (1, downstream_counterparty_and_funding_outpoint, option),
+       },
 );
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -555,6 +573,36 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
        };
 );
 
+#[derive(Clone, PartialEq, Eq, Debug)]
+/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
+/// the blocked action here. See enum variants for more info.
+pub(crate) enum RAAMonitorUpdateBlockingAction {
+       /// A forwarded payment was claimed. We block the downstream channel completing its monitor
+       /// update which removes the HTLC preimage until the upstream channel has gotten the preimage
+       /// durably to disk.
+       ForwardedPaymentInboundClaim {
+               /// The upstream channel ID (i.e. the inbound edge).
+               channel_id: [u8; 32],
+               /// The HTLC ID on the inbound edge.
+               htlc_id: u64,
+       },
+}
+
+impl RAAMonitorUpdateBlockingAction {
+       #[allow(unused)]
+       fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
+               Self::ForwardedPaymentInboundClaim {
+                       channel_id: prev_hop.outpoint.to_channel_id(),
+                       htlc_id: prev_hop.htlc_id,
+               }
+       }
+}
+
+impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
+       (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
+;);
+
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
        /// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -583,6 +631,11 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
        /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
        /// duplicates do not occur, so such channels should fail without a monitor update completing.
        monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
+       /// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
+       /// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
+       /// will remove a preimage that needs to be durably in an upstream channel first), we put an
+       /// entry here to note that the channel with the key's ID is blocked on a set of actions.
+       actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
        /// The peer is currently connected (i.e. we've seen a
        /// [`ChannelMessageHandler::peer_connected`] and no corresponding
        /// [`ChannelMessageHandler::peer_disconnected`].
@@ -4490,16 +4543,16 @@ where
                                                                Some(claimed_htlc_value - forwarded_htlc_value)
                                                        } else { None };
 
-                                                       let prev_channel_id = Some(prev_outpoint.to_channel_id());
-                                                       let next_channel_id = Some(next_channel_id);
-
-                                                       Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
-                                                               fee_earned_msat,
-                                                               claim_from_onchain_tx: from_onchain,
-                                                               prev_channel_id,
-                                                               next_channel_id,
-                                                               outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
-                                                       }})
+                                                       Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+                                                               event: events::Event::PaymentForwarded {
+                                                                       fee_earned_msat,
+                                                                       claim_from_onchain_tx: from_onchain,
+                                                                       prev_channel_id: Some(prev_outpoint.to_channel_id()),
+                                                                       next_channel_id: Some(next_channel_id),
+                                                                       outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
+                                                               },
+                                                               downstream_counterparty_and_funding_outpoint: None,
+                                                       })
                                                } else { None }
                                        });
                                if let Err((pk, err)) = res {
@@ -4526,8 +4579,13 @@ where
                                                }, None));
                                        }
                                },
-                               MonitorUpdateCompletionAction::EmitEvent { event } => {
+                               MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+                                       event, downstream_counterparty_and_funding_outpoint
+                               } => {
                                        self.pending_events.lock().unwrap().push_back((event, None));
+                                       if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
+                                               self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
+                                       }
                                },
                        }
                }
@@ -5374,6 +5432,24 @@ where
                }
        }
 
+       /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
+       /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
+       /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
+       /// the [`ChannelMonitorUpdate`] in question.
+       fn raa_monitor_updates_held(&self,
+               actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
+               channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
+       ) -> bool {
+               actions_blocking_raa_monitor_updates
+                       .get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
+               || self.pending_events.lock().unwrap().iter().any(|(_, action)| {
+                       action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+                               channel_funding_outpoint,
+                               counterparty_node_id,
+                       })
+               })
+       }
+
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
                let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6038,25 +6114,37 @@ where
                self.pending_outbound_payments.clear_pending_payments()
        }
 
-       fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+       /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
+       /// [`Event`] being handled) completes, this should be called to restore the channel to normal
+       /// operation. It will double-check that nothing *else* is also blocking the same channel from
+       /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
+       fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
                let mut errors = Vec::new();
                loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
                                let mut peer_state_lck = peer_state_mtx.lock().unwrap();
                                let peer_state = &mut *peer_state_lck;
-                               if self.pending_events.lock().unwrap().iter()
-                                       .any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
-                                               channel_funding_outpoint, counterparty_node_id
-                                       }))
-                               {
-                                       // Check that, while holding the peer lock, we don't have another event
-                                       // blocking any monitor updates for this channel. If we do, let those
-                                       // events be the ones that ultimately release the monitor update(s).
-                                       log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+
+                               if let Some(blocker) = completed_blocker.take() {
+                                       // Only do this on the first iteration of the loop.
+                                       if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
+                                               .get_mut(&channel_funding_outpoint.to_channel_id())
+                                       {
+                                               blockers.retain(|iter| iter != &blocker);
+                                       }
+                               }
+
+                               if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+                                       channel_funding_outpoint, counterparty_node_id) {
+                                       // Check that, while holding the peer lock, we don't have anything else
+                                       // blocking monitor updates for this channel. If we do, release the monitor
+                                       // update(s) when those blockers complete.
+                                       log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
                                                log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
                                        break;
                                }
+
                                if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
                                        debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
                                        if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
@@ -6098,7 +6186,7 @@ where
                                EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
                                        channel_funding_outpoint, counterparty_node_id
                                } => {
-                                       self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+                                       self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
                                }
                        }
                }
@@ -6774,6 +6862,7 @@ where
                                                latest_features: init_msg.features.clone(),
                                                pending_msg_events: Vec::new(),
                                                monitor_update_blocked_actions: BTreeMap::new(),
+                                               actions_blocking_raa_monitor_updates: BTreeMap::new(),
                                                is_connected: true,
                                        }));
                                },
@@ -7970,6 +8059,7 @@ where
                                latest_features: Readable::read(reader)?,
                                pending_msg_events: Vec::new(),
                                monitor_update_blocked_actions: BTreeMap::new(),
+                               actions_blocking_raa_monitor_updates: BTreeMap::new(),
                                is_connected: false,
                        };
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -8051,7 +8141,7 @@ where
                let mut claimable_htlc_purposes = None;
                let mut claimable_htlc_onion_fields = None;
                let mut pending_claiming_payments = Some(HashMap::new());
-               let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+               let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
                let mut events_override = None;
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
@@ -8376,7 +8466,21 @@ where
                }
 
                for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
-                       if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+                       if let Some(peer_state) = per_peer_state.get(&node_id) {
+                               for (_, actions) in monitor_update_blocked_actions.iter() {
+                                       for action in actions.iter() {
+                                               if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+                                                       downstream_counterparty_and_funding_outpoint:
+                                                               Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
+                                               } = action {
+                                                       if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+                                                               blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
+                                                                       .entry(blocked_channel_outpoint.to_channel_id())
+                                                                       .or_insert_with(Vec::new).push(blocking_action.clone());
+                                                       }
+                                               }
+                                       }
+                               }
                                peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
                        } else {
                                log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);