Handle `MonitorUpdateCompletionAction`s after monitor update sync
authorMatt Corallo <git@bluematt.me>
Fri, 27 Jan 2023 06:14:18 +0000 (06:14 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 22 Feb 2023 00:51:13 +0000 (00:51 +0000)
In a previous PR, we added a `MonitorUpdateCompletionAction` enum
which described actions to take after a `ChannelMonitorUpdate`
persistence completes. At the time, it was only used to execute
actions in-line, however in the next commit we'll start (correctly)
leaving the existing actions until after monitor updates complete.

lightning/src/ln/channelmanager.rs

index 6b9faa5644fa980783c592a66590c32b1ee56dba..7e9dc77ad182df1b6fde66b18fa52ebd95bebf96 100644 (file)
@@ -466,6 +466,7 @@ enum BackgroundEvent {
        ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
 }
 
+#[derive(Debug)]
 pub(crate) enum MonitorUpdateCompletionAction {
        /// Indicates that a payment ultimately destined for us was claimed and we should emit an
        /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
@@ -1476,6 +1477,54 @@ macro_rules! emit_channel_ready_event {
        }
 }
 
+macro_rules! handle_monitor_update_completion {
+       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { {
+               let mut updates = $chan.monitor_updating_restored(&$self.logger,
+                       &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
+                       $self.best_block.read().unwrap().height());
+               let counterparty_node_id = $chan.get_counterparty_node_id();
+               let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() {
+                       // We only send a channel_update in the case where we are just now sending a
+                       // channel_ready and the channel is in a usable state. We may re-send a
+                       // channel_update later through the announcement_signatures process for public
+                       // channels, but there's no reason not to just inform our counterparty of our fees
+                       // now.
+                       if let Ok(msg) = $self.get_channel_update_for_unicast($chan) {
+                               Some(events::MessageSendEvent::SendChannelUpdate {
+                                       node_id: counterparty_node_id,
+                                       msg,
+                               })
+                       } else { None }
+               } else { None };
+
+               let update_actions = $peer_state.monitor_update_blocked_actions
+                       .remove(&$chan.channel_id()).unwrap_or(Vec::new());
+
+               let htlc_forwards = $self.handle_channel_resumption(
+                       &mut $peer_state.pending_msg_events, $chan, updates.raa,
+                       updates.commitment_update, updates.order, updates.accepted_htlcs,
+                       updates.funding_broadcastable, updates.channel_ready,
+                       updates.announcement_sigs);
+               if let Some(upd) = channel_update {
+                       $peer_state.pending_msg_events.push(upd);
+               }
+
+               let channel_id = $chan.channel_id();
+               core::mem::drop($peer_state_lock);
+
+               $self.handle_monitor_update_completion_actions(update_actions);
+
+               if let Some(forwards) = htlc_forwards {
+                       $self.forward_htlcs(&mut [forwards][..]);
+               }
+               $self.finalize_claims(updates.finalized_claimed_htlcs);
+               for failure in updates.failed_htlcs.drain(..) {
+                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
+                       $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+               }
+       } }
+}
+
 macro_rules! handle_new_monitor_update {
        ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
                // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
@@ -1504,43 +1553,7 @@ macro_rules! handle_new_monitor_update {
                                        .update_id == $update_id) &&
                                        $chan.get_latest_monitor_update_id() == $update_id
                                {
-                                       let mut updates = $chan.monitor_updating_restored(&$self.logger,
-                                               &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
-                                               $self.best_block.read().unwrap().height());
-                                       let counterparty_node_id = $chan.get_counterparty_node_id();
-                                       let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() {
-                                               // We only send a channel_update in the case where we are just now sending a
-                                               // channel_ready and the channel is in a usable state. We may re-send a
-                                               // channel_update later through the announcement_signatures process for public
-                                               // channels, but there's no reason not to just inform our counterparty of our fees
-                                               // now.
-                                               if let Ok(msg) = $self.get_channel_update_for_unicast($chan) {
-                                                       Some(events::MessageSendEvent::SendChannelUpdate {
-                                                               node_id: counterparty_node_id,
-                                                               msg,
-                                                       })
-                                               } else { None }
-                                       } else { None };
-                                       let htlc_forwards = $self.handle_channel_resumption(
-                                               &mut $peer_state.pending_msg_events, $chan, updates.raa,
-                                               updates.commitment_update, updates.order, updates.accepted_htlcs,
-                                               updates.funding_broadcastable, updates.channel_ready,
-                                               updates.announcement_sigs);
-                                       if let Some(upd) = channel_update {
-                                               $peer_state.pending_msg_events.push(upd);
-                                       }
-
-                                       let channel_id = $chan.channel_id();
-                                       core::mem::drop($peer_state_lock);
-
-                                       if let Some(forwards) = htlc_forwards {
-                                               $self.forward_htlcs(&mut [forwards][..]);
-                                       }
-                                       $self.finalize_claims(updates.finalized_claimed_htlcs);
-                                       for failure in updates.failed_htlcs.drain(..) {
-                                               let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
-                                               $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
-                                       }
+                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan);
                                }
                                Ok(())
                        },
@@ -4208,6 +4221,14 @@ where
                pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
        -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> {
+               log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
+                       log_bytes!(channel.channel_id()),
+                       if raa.is_some() { "an" } else { "no" },
+                       if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
+                       if funding_broadcastable.is_some() { "" } else { "not " },
+                       if channel_ready.is_some() { "sending" } else { "without" },
+                       if announcement_sigs.is_some() { "sending" } else { "without" });
+
                let mut htlc_forwards = None;
 
                let counterparty_node_id = channel.get_counterparty_node_id();
@@ -4266,65 +4287,36 @@ where
        fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               let htlc_forwards;
-               let (mut pending_failures, finalized_claims, counterparty_node_id) = {
-                       let counterparty_node_id = match counterparty_node_id {
-                               Some(cp_id) => cp_id.clone(),
-                               None => {
-                                       // TODO: Once we can rely on the counterparty_node_id from the
-                                       // monitor event, this and the id_to_peer map should be removed.
-                                       let id_to_peer = self.id_to_peer.lock().unwrap();
-                                       match id_to_peer.get(&funding_txo.to_channel_id()) {
-                                               Some(cp_id) => cp_id.clone(),
-                                               None => return,
-                                       }
-                               }
-                       };
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let mut peer_state_lock;
-                       let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                       if peer_state_mutex_opt.is_none() { return }
-                       peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       let mut channel = {
-                               match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
-                                       hash_map::Entry::Occupied(chan) => chan,
-                                       hash_map::Entry::Vacant(_) => return,
+               let counterparty_node_id = match counterparty_node_id {
+                       Some(cp_id) => cp_id.clone(),
+                       None => {
+                               // TODO: Once we can rely on the counterparty_node_id from the
+                               // monitor event, this and the id_to_peer map should be removed.
+                               let id_to_peer = self.id_to_peer.lock().unwrap();
+                               match id_to_peer.get(&funding_txo.to_channel_id()) {
+                                       Some(cp_id) => cp_id.clone(),
+                                       None => return,
                                }
-                       };
-                       if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
-                               return;
                        }
-
-                       let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height());
-                       let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() {
-                               // We only send a channel_update in the case where we are just now sending a
-                               // channel_ready and the channel is in a usable state. We may re-send a
-                               // channel_update later through the announcement_signatures process for public
-                               // channels, but there's no reason not to just inform our counterparty of our fees
-                               // now.
-                               if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) {
-                                       Some(events::MessageSendEvent::SendChannelUpdate {
-                                               node_id: channel.get().get_counterparty_node_id(),
-                                               msg,
-                                       })
-                               } else { None }
-                       } else { None };
-                       htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
-                       if let Some(upd) = channel_update {
-                               peer_state.pending_msg_events.push(upd);
+               };
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               let mut peer_state_lock;
+               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+               if peer_state_mutex_opt.is_none() { return }
+               peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               let mut channel = {
+                       match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
+                               hash_map::Entry::Occupied(chan) => chan,
+                               hash_map::Entry::Vacant(_) => return,
                        }
-
-                       (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id)
                };
-               if let Some(forwards) = htlc_forwards {
-                       self.forward_htlcs(&mut [forwards][..]);
-               }
-               self.finalize_claims(finalized_claims);
-               for failure in pending_failures.drain(..) {
-                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() };
-                       self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+               log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
+                       highest_applied_update_id, channel.get().get_latest_monitor_update_id());
+               if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
+                       return;
                }
+               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut());
        }
 
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].