Merge pull request #2661 from TheBlueMatt/2023-10-dup-claim-chan-hang
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 113d690215599d87618b535ab6259bab89635a69..9af65d96c3b088e931ecd029fcff6961fa27b5b1 100644 (file)
@@ -568,6 +568,7 @@ struct ClaimablePayments {
 /// usually because we're running pre-full-init. They are handled immediately once we detect we are
 /// running normally, and specifically must be processed before any other non-background
 /// [`ChannelMonitorUpdate`]s are applied.
+#[derive(Debug)]
 enum BackgroundEvent {
        /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
        /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
@@ -620,10 +621,34 @@ pub(crate) enum MonitorUpdateCompletionAction {
                event: events::Event,
                downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
        },
+       /// Indicates we should immediately resume the operation of another channel, unless there is
+       /// some other reason why the channel is blocked. In practice this simply means immediately
+       /// removing the [`RAAMonitorUpdateBlockingAction`] provided from the blocking set.
+       ///
+       /// This is usually generated when we've forwarded an HTLC and want to block the outbound edge
+       /// from completing a monitor update which removes the payment preimage until the inbound edge
+       /// completes a monitor update containing the payment preimage. However, we use this variant
+       /// instead of [`Self::EmitEventAndFreeOtherChannel`] when we discover that the claim was in
+       /// fact duplicative and we simply want to resume the outbound edge channel immediately.
+       ///
+       /// This variant should thus never be written to disk, as it is processed inline rather than
+       /// stored for later processing.
+       FreeOtherChannelImmediately {
+               downstream_counterparty_node_id: PublicKey,
+               downstream_funding_outpoint: OutPoint,
+               blocking_action: RAAMonitorUpdateBlockingAction,
+       },
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
        (0, PaymentClaimed) => { (0, payment_hash, required) },
+       // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
+       // *immediately*. However, for simplicity we implement read/write here.
+       (1, FreeOtherChannelImmediately) => {
+               (0, downstream_counterparty_node_id, required),
+               (2, downstream_funding_outpoint, required),
+               (4, blocking_action, required),
+       },
        (2, EmitEventAndFreeOtherChannel) => {
                (0, event, upgradable_required),
                // LDK prior to 0.0.116 did not have this field as the monitor update application order was
@@ -5378,8 +5403,11 @@ where
                        for htlc in sources.drain(..) {
                                if let Err((pk, err)) = self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
-                                       |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
-                               {
+                                       |_, definitely_duplicate| {
+                                               debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
+                                               Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+                                       }
+                               ) {
                                        if let msgs::ErrorAction::IgnoreError = err.err.action {
                                                // We got a temporary failure updating monitor, but will claim the
                                                // HTLC when the monitor updating is restored (or on chain).
@@ -5407,7 +5435,7 @@ where
                }
        }
 
-       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
+       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
                prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -5417,6 +5445,11 @@ where
                // `BackgroundEvent`s.
                let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
 
+               // As we may call handle_monitor_update_completion_actions in rather rare cases, check that
+               // the required mutexes are not held before we start.
+               debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+               debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let chan_id = prev_hop.outpoint.to_channel_id();
@@ -5438,25 +5471,70 @@ where
                                                let counterparty_node_id = chan.context.get_counterparty_node_id();
                                                let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
 
-                                               if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
-                                                       if let Some(action) = completion_action(Some(htlc_value_msat)) {
-                                                               log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
-                                                                       chan_id, action);
-                                                               peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+                                               match fulfill_res {
+                                                       UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
+                                                               if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+                                                                       log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+                                                                               chan_id, action);
+                                                                       peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+                                                               }
+                                                               if !during_init {
+                                                                       handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+                                                                               peer_state, per_peer_state, chan);
+                                                               } else {
+                                                                       // If we're running during init we cannot update a monitor directly -
+                                                                       // they probably haven't actually been loaded yet. Instead, push the
+                                                                       // monitor update as a background event.
+                                                                       self.pending_background_events.lock().unwrap().push(
+                                                                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                                                                       counterparty_node_id,
+                                                                                       funding_txo: prev_hop.outpoint,
+                                                                                       update: monitor_update.clone(),
+                                                                               });
+                                                               }
                                                        }
-                                                       if !during_init {
-                                                               handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
-                                                                       peer_state, per_peer_state, chan);
-                                                       } else {
-                                                               // If we're running during init we cannot update a monitor directly -
-                                                               // they probably haven't actually been loaded yet. Instead, push the
-                                                               // monitor update as a background event.
-                                                               self.pending_background_events.lock().unwrap().push(
-                                                                       BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-                                                                               counterparty_node_id,
-                                                                               funding_txo: prev_hop.outpoint,
-                                                                               update: monitor_update.clone(),
-                                                                       });
+                                                       UpdateFulfillCommitFetch::DuplicateClaim {} => {
+                                                               let action = if let Some(action) = completion_action(None, true) {
+                                                                       action
+                                                               } else {
+                                                                       return Ok(());
+                                                               };
+                                                               mem::drop(peer_state_lock);
+
+                                                               log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
+                                                                       chan_id, action);
+                                                               let (node_id, funding_outpoint, blocker) =
+                                                               if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                                                       downstream_counterparty_node_id: node_id,
+                                                                       downstream_funding_outpoint: funding_outpoint,
+                                                                       blocking_action: blocker,
+                                                               } = action {
+                                                                       (node_id, funding_outpoint, blocker)
+                                                               } else {
+                                                                       debug_assert!(false,
+                                                                               "Duplicate claims should always free another channel immediately");
+                                                                       return Ok(());
+                                                               };
+                                                               if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
+                                                                       let mut peer_state = peer_state_mtx.lock().unwrap();
+                                                                       if let Some(blockers) = peer_state
+                                                                               .actions_blocking_raa_monitor_updates
+                                                                               .get_mut(&funding_outpoint.to_channel_id())
+                                                                       {
+                                                                               let mut found_blocker = false;
+                                                                               blockers.retain(|iter| {
+                                                                                       // Note that we could actually be blocked, in
+                                                                                       // which case we need to only remove the one
+                                                                                       // blocker which was added duplicatively.
+                                                                                       let first_blocker = !found_blocker;
+                                                                                       if *iter == blocker { found_blocker = true; }
+                                                                                       *iter != blocker || !first_blocker
+                                                                               });
+                                                                               debug_assert!(found_blocker);
+                                                                       }
+                                                               } else {
+                                                                       debug_assert!(false);
+                                                               }
                                                        }
                                                }
                                        }
@@ -5504,7 +5582,7 @@ where
                // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
                // generally always allowed to be duplicative (and it's specifically noted in
                // `PaymentForwarded`).
-               self.handle_monitor_update_completion_actions(completion_action(None));
+               self.handle_monitor_update_completion_actions(completion_action(None, false));
                Ok(())
        }
 
@@ -5513,7 +5591,7 @@ where
        }
 
        fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
-               forwarded_htlc_value_msat: Option<u64>, from_onchain: bool,
+               forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, startup_replay: bool,
                next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
        ) {
                match source {
@@ -5534,13 +5612,84 @@ where
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
                                let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
+                               #[cfg(debug_assertions)]
+                               let claiming_chan_funding_outpoint = hop_data.outpoint;
                                let res = self.claim_funds_from_hop(hop_data, payment_preimage,
-                                       |htlc_claim_value_msat| {
-                                               if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
-                                                       let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
-                                                               Some(claimed_htlc_value - forwarded_htlc_value)
-                                                       } else { None };
+                                       |htlc_claim_value_msat, definitely_duplicate| {
+                                               let chan_to_release =
+                                                       if let Some(node_id) = next_channel_counterparty_node_id {
+                                                               Some((node_id, next_channel_outpoint, completed_blocker))
+                                                       } else {
+                                                               // We can only get `None` here if we are processing a
+                                                               // `ChannelMonitor`-originated event, in which case we
+                                                               // don't care about ensuring we wake the downstream
+                                                               // channel's monitor updating - the channel is already
+                                                               // closed.
+                                                               None
+                                                       };
 
+                                               if definitely_duplicate && startup_replay {
+                                                       // On startup we may get redundant claims which are related to
+                                                       // monitor updates still in flight. In that case, we shouldn't
+                                                       // immediately free, but instead let that monitor update complete
+                                                       // in the background.
+                                                       #[cfg(debug_assertions)] {
+                                                               let background_events = self.pending_background_events.lock().unwrap();
+                                                               // There should be a `BackgroundEvent` pending...
+                                                               assert!(background_events.iter().any(|ev| {
+                                                                       match ev {
+                                                                               // to apply a monitor update that blocked the claiming channel,
+                                                                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                                                                       funding_txo, update, ..
+                                                                               } => {
+                                                                                       if *funding_txo == claiming_chan_funding_outpoint {
+                                                                                               assert!(update.updates.iter().any(|upd|
+                                                                                                       if let ChannelMonitorUpdateStep::PaymentPreimage {
+                                                                                                               payment_preimage: update_preimage
+                                                                                                       } = upd {
+                                                                                                               payment_preimage == *update_preimage
+                                                                                                       } else { false }
+                                                                                               ), "{:?}", update);
+                                                                                               true
+                                                                                       } else { false }
+                                                                               },
+                                                                               // or the channel we'd unblock is already closed,
+                                                                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(
+                                                                                       (funding_txo, monitor_update)
+                                                                               ) => {
+                                                                                       if *funding_txo == next_channel_outpoint {
+                                                                                               assert_eq!(monitor_update.updates.len(), 1);
+                                                                                               assert!(matches!(
+                                                                                                       monitor_update.updates[0],
+                                                                                                       ChannelMonitorUpdateStep::ChannelForceClosed { .. }
+                                                                                               ));
+                                                                                               true
+                                                                                       } else { false }
+                                                                               },
+                                                                               // or the monitor update has completed and will unblock
+                                                                               // immediately once we get going.
+                                                                               BackgroundEvent::MonitorUpdatesComplete {
+                                                                                       channel_id, ..
+                                                                               } =>
+                                                                                       *channel_id == claiming_chan_funding_outpoint.to_channel_id(),
+                                                                       }
+                                                               }), "{:?}", *background_events);
+                                                       }
+                                                       None
+                                               } else if definitely_duplicate {
+                                                       if let Some(other_chan) = chan_to_release {
+                                                               Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                                                       downstream_counterparty_node_id: other_chan.0,
+                                                                       downstream_funding_outpoint: other_chan.1,
+                                                                       blocking_action: other_chan.2,
+                                                               })
+                                                       } else { None }
+                                               } else {
+                                                       let fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+                                                               if let Some(claimed_htlc_value) = htlc_claim_value_msat {
+                                                                       Some(claimed_htlc_value - forwarded_htlc_value)
+                                                               } else { None }
+                                                       } else { None };
                                                        Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                                event: events::Event::PaymentForwarded {
                                                                        fee_earned_msat,
@@ -5549,19 +5698,9 @@ where
                                                                        next_channel_id: Some(next_channel_outpoint.to_channel_id()),
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
-                                                               downstream_counterparty_and_funding_outpoint:
-                                                                       if let Some(node_id) = next_channel_counterparty_node_id {
-                                                                               Some((node_id, next_channel_outpoint, completed_blocker))
-                                                                       } else {
-                                                                               // We can only get `None` here if we are processing a
-                                                                               // `ChannelMonitor`-originated event, in which case we
-                                                                               // don't care about ensuring we wake the downstream
-                                                                               // channel's monitor updating - the channel is already
-                                                                               // closed.
-                                                                               None
-                                                                       },
+                                                               downstream_counterparty_and_funding_outpoint: chan_to_release,
                                                        })
-                                               } else { None }
+                                               }
                                        });
                                if let Err((pk, err)) = res {
                                        let result: Result<(), _> = Err(err);
@@ -5577,6 +5716,10 @@ where
        }
 
        fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
+               debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+               debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+               debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+
                for action in actions.into_iter() {
                        match action {
                                MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
@@ -5606,6 +5749,15 @@ where
                                                self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
                                        }
                                },
+                               MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                       downstream_counterparty_node_id, downstream_funding_outpoint, blocking_action,
+                               } => {
+                                       self.handle_monitor_update_release(
+                                               downstream_counterparty_node_id,
+                                               downstream_funding_outpoint,
+                                               Some(blocking_action),
+                                       );
+                               },
                        }
                }
        }
@@ -6408,6 +6560,9 @@ where
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry);
                                                if let HTLCSource::PreviousHopData(prev_hop) = &res.0 {
+                                                       log_trace!(self.logger,
+                                                               "Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
+                                                               msg.channel_id);
                                                        peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id)
                                                                .or_insert_with(Vec::new)
                                                                .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop));
@@ -6428,7 +6583,7 @@ where
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
-               self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo);
+               self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, false, Some(*counterparty_node_id), funding_txo);
                Ok(())
        }
 
@@ -6916,7 +7071,7 @@ where
                                        MonitorEvent::HTLCEvent(htlc_update) => {
                                                if let Some(preimage) = htlc_update.payment_preimage {
                                                        log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
-                                                       self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
+                                                       self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, false, counterparty_node_id, funding_outpoint);
                                                } else {
                                                        log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
                                                        let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
@@ -10103,6 +10258,9 @@ where
                                                                Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
                                                } = action {
                                                        if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+                                                               log_trace!(args.logger,
+                                                                       "Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
+                                                                       blocked_channel_outpoint.to_channel_id());
                                                                blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
                                                                        .entry(blocked_channel_outpoint.to_channel_id())
                                                                        .or_insert_with(Vec::new).push(blocking_action.clone());
@@ -10114,6 +10272,9 @@ where
                                                                // anymore.
                                                        }
                                                }
+                                               if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately { .. } = action {
+                                                       debug_assert!(false, "Non-event-generating channel freeing should not appear in our queue");
+                                               }
                                        }
                                }
                                peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
@@ -10184,7 +10345,7 @@ where
                        // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
                        // channel is closed we just assume that it probably came from an on-chain claim.
                        channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
-                               downstream_closed, downstream_node_id, downstream_funding);
+                               downstream_closed, true, downstream_node_id, downstream_funding);
                }
 
                //TODO: Broadcast channel update for closed channels, but only after we've made a