Immediately unblock channels on duplicate claims 2023-10-dup-claim-chan-hang
authorMatt Corallo <git@bluematt.me>
Wed, 11 Oct 2023 14:01:28 +0000 (14:01 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 19 Oct 2023 15:27:57 +0000 (15:27 +0000)
When `MonitorUpdateCompletionAction`s were added, we didn't
consider the case of a duplicate claim during normal HTLC
processing (as the handling only had an `if let` rather than a
`match`, which made the branch easy to miss). This can lead to a
channel freezing indefinitely if an HTLC is claimed (without a
`commitment_signed`), the peer disconnects, and then the HTLC is
claimed again, leading to a never-completing
`MonitorUpdateCompletionAction`.

The fix is simple - if we get back an
`UpdateFulfillCommitFetch::DuplicateClaim` when claiming from the
inbound edge, immediately unlock the outbound edge channel with a
new `MonitorUpdateCompletionAction::FreeOtherChannelImmediately`.

Here we implement this fix by actually generating the new variant
when a claim is duplicative.

lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs

index dc9bbd3e4048c902116b875a1a995e5186302266..6fb21b3a15b1d0631e513068c66a8afc6769baa3 100644 (file)
@@ -3431,3 +3431,72 @@ fn test_reload_mon_update_completion_actions() {
        do_test_reload_mon_update_completion_actions(true);
        do_test_reload_mon_update_completion_actions(false);
 }
+
+fn do_test_glacial_peer_cant_hang(hold_chan_a: bool) {
+       // Test that if a peer manages to send an `update_fulfill_htlc` message without a
+       // `commitment_signed`, disconnects, then replays the `update_fulfill_htlc` message it doesn't
+       // result in a channel hang. This was previously broken as the `DuplicateClaim` case wasn't
+       // handled when claiming an HTLC and handling wasn't added when completion actions were added
+       // (which must always complete at some point).
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       create_announced_chan_between_nodes(&nodes, 0, 1);
+       create_announced_chan_between_nodes(&nodes, 1, 2);
+
+       // Route a payment from A, through B, to C, then claim it on C. Replay the
+       // `update_fulfill_htlc` twice on B to check that B doesn't hang.
+       let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000);
+
+       nodes[2].node.claim_funds(payment_preimage);
+       check_added_monitors(&nodes[2], 1);
+       expect_payment_claimed!(nodes[2], payment_hash, 1_000_000);
+
+       let cs_updates = get_htlc_update_msgs(&nodes[2], &nodes[1].node.get_our_node_id());
+       if hold_chan_a {
+               // The first update will be on the A <-> B channel, which we allow to complete.
+               chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+       }
+       nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
+       check_added_monitors(&nodes[1], 1);
+
+       if !hold_chan_a {
+               let bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id());
+               nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.update_fulfill_htlcs[0]);
+               commitment_signed_dance!(nodes[0], nodes[1], bs_updates.commitment_signed, false);
+               expect_payment_sent!(&nodes[0], payment_preimage);
+       }
+
+       nodes[1].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+       nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+
+       let mut reconnect = ReconnectArgs::new(&nodes[1], &nodes[2]);
+       reconnect.pending_htlc_claims = (1, 0);
+       reconnect_nodes(reconnect);
+
+       if !hold_chan_a {
+               expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
+               send_payment(&nodes[0], &[&nodes[1], &nodes[2]], 100_000);
+       } else {
+               assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+               let (route, payment_hash_2, _, payment_secret_2) = get_route_and_payment_hash!(&nodes[1], nodes[2], 1_000_000);
+
+               nodes[1].node.send_payment_with_route(&route, payment_hash_2,
+                       RecipientOnionFields::secret_only(payment_secret_2), PaymentId(payment_hash_2.0)).unwrap();
+               check_added_monitors(&nodes[1], 0);
+
+               assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
+               assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+       }
+}
+
+#[test]
+fn test_glacial_peer_cant_hang() {
+       do_test_glacial_peer_cant_hang(false);
+       do_test_glacial_peer_cant_hang(true);
+}
index aa5f27606f04188be7c92147392ec733521bdf21..1a4bdfbf6eee8bfc9c34c38df1baeb86cde80c1d 100644 (file)
@@ -563,6 +563,7 @@ struct ClaimablePayments {
 /// usually because we're running pre-full-init. They are handled immediately once we detect we are
 /// running normally, and specifically must be processed before any other non-background
 /// [`ChannelMonitorUpdate`]s are applied.
+#[derive(Debug)]
 enum BackgroundEvent {
        /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
        /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
@@ -5381,8 +5382,11 @@ where
                        for htlc in sources.drain(..) {
                                if let Err((pk, err)) = self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
-                                       |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
-                               {
+                                       |_, definitely_duplicate| {
+                                               debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
+                                               Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+                                       }
+                               ) {
                                        if let msgs::ErrorAction::IgnoreError = err.err.action {
                                                // We got a temporary failure updating monitor, but will claim the
                                                // HTLC when the monitor updating is restored (or on chain).
@@ -5410,7 +5414,7 @@ where
                }
        }
 
-       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
+       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
                prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -5420,6 +5424,11 @@ where
                // `BackgroundEvent`s.
                let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
 
+               // As we may call handle_monitor_update_completion_actions in rather rare cases, check that
+               // the required mutexes are not held before we start.
+               debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+               debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let chan_id = prev_hop.outpoint.to_channel_id();
@@ -5441,25 +5450,70 @@ where
                                                let counterparty_node_id = chan.context.get_counterparty_node_id();
                                                let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
 
-                                               if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
-                                                       if let Some(action) = completion_action(Some(htlc_value_msat)) {
-                                                               log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
-                                                                       chan_id, action);
-                                                               peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+                                               match fulfill_res {
+                                                       UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
+                                                               if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+                                                                       log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+                                                                               chan_id, action);
+                                                                       peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+                                                               }
+                                                               if !during_init {
+                                                                       handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+                                                                               peer_state, per_peer_state, chan);
+                                                               } else {
+                                                                       // If we're running during init we cannot update a monitor directly -
+                                                                       // they probably haven't actually been loaded yet. Instead, push the
+                                                                       // monitor update as a background event.
+                                                                       self.pending_background_events.lock().unwrap().push(
+                                                                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                                                                       counterparty_node_id,
+                                                                                       funding_txo: prev_hop.outpoint,
+                                                                                       update: monitor_update.clone(),
+                                                                               });
+                                                               }
                                                        }
-                                                       if !during_init {
-                                                               handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
-                                                                       peer_state, per_peer_state, chan);
-                                                       } else {
-                                                               // If we're running during init we cannot update a monitor directly -
-                                                               // they probably haven't actually been loaded yet. Instead, push the
-                                                               // monitor update as a background event.
-                                                               self.pending_background_events.lock().unwrap().push(
-                                                                       BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-                                                                               counterparty_node_id,
-                                                                               funding_txo: prev_hop.outpoint,
-                                                                               update: monitor_update.clone(),
-                                                                       });
+                                                       UpdateFulfillCommitFetch::DuplicateClaim {} => {
+                                                               let action = if let Some(action) = completion_action(None, true) {
+                                                                       action
+                                                               } else {
+                                                                       return Ok(());
+                                                               };
+                                                               mem::drop(peer_state_lock);
+
+                                                               log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
+                                                                       chan_id, action);
+                                                               let (node_id, funding_outpoint, blocker) =
+                                                               if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                                                       downstream_counterparty_node_id: node_id,
+                                                                       downstream_funding_outpoint: funding_outpoint,
+                                                                       blocking_action: blocker,
+                                                               } = action {
+                                                                       (node_id, funding_outpoint, blocker)
+                                                               } else {
+                                                                       debug_assert!(false,
+                                                                               "Duplicate claims should always free another channel immediately");
+                                                                       return Ok(());
+                                                               };
+                                                               if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
+                                                                       let mut peer_state = peer_state_mtx.lock().unwrap();
+                                                                       if let Some(blockers) = peer_state
+                                                                               .actions_blocking_raa_monitor_updates
+                                                                               .get_mut(&funding_outpoint.to_channel_id())
+                                                                       {
+                                                                               let mut found_blocker = false;
+                                                                               blockers.retain(|iter| {
+                                                                                       // Note that we could actually be blocked, in
+                                                                                       // which case we need to only remove the one
+                                                                                       // blocker which was added duplicatively.
+                                                                                       let first_blocker = !found_blocker;
+                                                                                       if *iter == blocker { found_blocker = true; }
+                                                                                       *iter != blocker || !first_blocker
+                                                                               });
+                                                                               debug_assert!(found_blocker);
+                                                                       }
+                                                               } else {
+                                                                       debug_assert!(false);
+                                                               }
                                                        }
                                                }
                                        }
@@ -5507,7 +5561,7 @@ where
                // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
                // generally always allowed to be duplicative (and it's specifically noted in
                // `PaymentForwarded`).
-               self.handle_monitor_update_completion_actions(completion_action(None));
+               self.handle_monitor_update_completion_actions(completion_action(None, false));
                Ok(())
        }
 
@@ -5537,13 +5591,84 @@ where
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
                                let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
+                               #[cfg(debug_assertions)]
+                               let claiming_chan_funding_outpoint = hop_data.outpoint;
                                let res = self.claim_funds_from_hop(hop_data, payment_preimage,
-                                       |htlc_claim_value_msat| {
-                                               if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
-                                                       let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
-                                                               Some(claimed_htlc_value - forwarded_htlc_value)
-                                                       } else { None };
+                                       |htlc_claim_value_msat, definitely_duplicate| {
+                                               let chan_to_release =
+                                                       if let Some(node_id) = next_channel_counterparty_node_id {
+                                                               Some((node_id, next_channel_outpoint, completed_blocker))
+                                                       } else {
+                                                               // We can only get `None` here if we are processing a
+                                                               // `ChannelMonitor`-originated event, in which case we
+                                                               // don't care about ensuring we wake the downstream
+                                                               // channel's monitor updating - the channel is already
+                                                               // closed.
+                                                               None
+                                                       };
 
+                                               if definitely_duplicate && startup_replay {
+                                                       // On startup we may get redundant claims which are related to
+                                                       // monitor updates still in flight. In that case, we shouldn't
+                                                       // immediately free, but instead let that monitor update complete
+                                                       // in the background.
+                                                       #[cfg(debug_assertions)] {
+                                                               let background_events = self.pending_background_events.lock().unwrap();
+                                                               // There should be a `BackgroundEvent` pending...
+                                                               assert!(background_events.iter().any(|ev| {
+                                                                       match ev {
+                                                                               // to apply a monitor update that blocked the claiming channel,
+                                                                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                                                                       funding_txo, update, ..
+                                                                               } => {
+                                                                                       if *funding_txo == claiming_chan_funding_outpoint {
+                                                                                               assert!(update.updates.iter().any(|upd|
+                                                                                                       if let ChannelMonitorUpdateStep::PaymentPreimage {
+                                                                                                               payment_preimage: update_preimage
+                                                                                                       } = upd {
+                                                                                                               payment_preimage == *update_preimage
+                                                                                                       } else { false }
+                                                                                               ), "{:?}", update);
+                                                                                               true
+                                                                                       } else { false }
+                                                                               },
+                                                                               // or the channel we'd unblock is already closed,
+                                                                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(
+                                                                                       (funding_txo, monitor_update)
+                                                                               ) => {
+                                                                                       if *funding_txo == next_channel_outpoint {
+                                                                                               assert_eq!(monitor_update.updates.len(), 1);
+                                                                                               assert!(matches!(
+                                                                                                       monitor_update.updates[0],
+                                                                                                       ChannelMonitorUpdateStep::ChannelForceClosed { .. }
+                                                                                               ));
+                                                                                               true
+                                                                                       } else { false }
+                                                                               },
+                                                                               // or the monitor update has completed and will unblock
+                                                                               // immediately once we get going.
+                                                                               BackgroundEvent::MonitorUpdatesComplete {
+                                                                                       channel_id, ..
+                                                                               } =>
+                                                                                       *channel_id == claiming_chan_funding_outpoint.to_channel_id(),
+                                                                       }
+                                                               }), "{:?}", *background_events);
+                                                       }
+                                                       None
+                                               } else if definitely_duplicate {
+                                                       if let Some(other_chan) = chan_to_release {
+                                                               Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                                                       downstream_counterparty_node_id: other_chan.0,
+                                                                       downstream_funding_outpoint: other_chan.1,
+                                                                       blocking_action: other_chan.2,
+                                                               })
+                                                       } else { None }
+                                               } else {
+                                                       let fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+                                                               if let Some(claimed_htlc_value) = htlc_claim_value_msat {
+                                                                       Some(claimed_htlc_value - forwarded_htlc_value)
+                                                               } else { None }
+                                                       } else { None };
                                                        Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                                event: events::Event::PaymentForwarded {
                                                                        fee_earned_msat,
@@ -5552,19 +5677,9 @@ where
                                                                        next_channel_id: Some(next_channel_outpoint.to_channel_id()),
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
-                                                               downstream_counterparty_and_funding_outpoint:
-                                                                       if let Some(node_id) = next_channel_counterparty_node_id {
-                                                                               Some((node_id, next_channel_outpoint, completed_blocker))
-                                                                       } else {
-                                                                               // We can only get `None` here if we are processing a
-                                                                               // `ChannelMonitor`-originated event, in which case we
-                                                                               // don't care about ensuring we wake the downstream
-                                                                               // channel's monitor updating - the channel is already
-                                                                               // closed.
-                                                                               None
-                                                                       },
+                                                               downstream_counterparty_and_funding_outpoint: chan_to_release,
                                                        })
-                                               } else { None }
+                                               }
                                        });
                                if let Err((pk, err)) = res {
                                        let result: Result<(), _> = Err(err);
@@ -5580,6 +5695,10 @@ where
        }
 
        fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
+               debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+               debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+               debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+
                for action in actions.into_iter() {
                        match action {
                                MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {