From 9802afa53bac30e8bb831d267868b2aaabb61668 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 27 Jan 2023 06:14:18 +0000 Subject: [PATCH] Handle `MonitorUpdateCompletionAction`s after monitor update sync In a previous PR, we added a `MonitorUpdateCompletionAction` enum which described actions to take after a `ChannelMonitorUpdate` persistence completes. At the time, it was only used to execute actions in-line, however in the next commit we'll start (correctly) leaving the existing actions until after monitor updates complete. --- lightning/src/ln/channelmanager.rs | 174 ++++++++++++++--------------- 1 file changed, 83 insertions(+), 91 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6b9faa56..7e9dc77a 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -466,6 +466,7 @@ enum BackgroundEvent { ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)), } +#[derive(Debug)] pub(crate) enum MonitorUpdateCompletionAction { /// Indicates that a payment ultimately destined for us was claimed and we should emit an /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for @@ -1476,6 +1477,54 @@ macro_rules! emit_channel_ready_event { } } +macro_rules! handle_monitor_update_completion { + ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { { + let mut updates = $chan.monitor_updating_restored(&$self.logger, + &$self.node_signer, $self.genesis_hash, &$self.default_configuration, + $self.best_block.read().unwrap().height()); + let counterparty_node_id = $chan.get_counterparty_node_id(); + let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() { + // We only send a channel_update in the case where we are just now sending a + // channel_ready and the channel is in a usable state. We may re-send a + // channel_update later through the announcement_signatures process for public + // channels, but there's no reason not to just inform our counterparty of our fees + // now. + if let Ok(msg) = $self.get_channel_update_for_unicast($chan) { + Some(events::MessageSendEvent::SendChannelUpdate { + node_id: counterparty_node_id, + msg, + }) + } else { None } + } else { None }; + + let update_actions = $peer_state.monitor_update_blocked_actions + .remove(&$chan.channel_id()).unwrap_or(Vec::new()); + + let htlc_forwards = $self.handle_channel_resumption( + &mut $peer_state.pending_msg_events, $chan, updates.raa, + updates.commitment_update, updates.order, updates.accepted_htlcs, + updates.funding_broadcastable, updates.channel_ready, + updates.announcement_sigs); + if let Some(upd) = channel_update { + $peer_state.pending_msg_events.push(upd); + } + + let channel_id = $chan.channel_id(); + core::mem::drop($peer_state_lock); + + $self.handle_monitor_update_completion_actions(update_actions); + + if let Some(forwards) = htlc_forwards { + $self.forward_htlcs(&mut [forwards][..]); + } + $self.finalize_claims(updates.finalized_claimed_htlcs); + for failure in updates.failed_htlcs.drain(..) { + let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; + $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); + } + } } +} + macro_rules! handle_new_monitor_update { ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in @@ -1504,43 +1553,7 @@ macro_rules! handle_new_monitor_update { .update_id == $update_id) && $chan.get_latest_monitor_update_id() == $update_id { - let mut updates = $chan.monitor_updating_restored(&$self.logger, - &$self.node_signer, $self.genesis_hash, &$self.default_configuration, - $self.best_block.read().unwrap().height()); - let counterparty_node_id = $chan.get_counterparty_node_id(); - let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() { - // We only send a channel_update in the case where we are just now sending a - // channel_ready and the channel is in a usable state. We may re-send a - // channel_update later through the announcement_signatures process for public - // channels, but there's no reason not to just inform our counterparty of our fees - // now. - if let Ok(msg) = $self.get_channel_update_for_unicast($chan) { - Some(events::MessageSendEvent::SendChannelUpdate { - node_id: counterparty_node_id, - msg, - }) - } else { None } - } else { None }; - let htlc_forwards = $self.handle_channel_resumption( - &mut $peer_state.pending_msg_events, $chan, updates.raa, - updates.commitment_update, updates.order, updates.accepted_htlcs, - updates.funding_broadcastable, updates.channel_ready, - updates.announcement_sigs); - if let Some(upd) = channel_update { - $peer_state.pending_msg_events.push(upd); - } - - let channel_id = $chan.channel_id(); - core::mem::drop($peer_state_lock); - - if let Some(forwards) = htlc_forwards { - $self.forward_htlcs(&mut [forwards][..]); - } - $self.finalize_claims(updates.finalized_claimed_htlcs); - for failure in updates.failed_htlcs.drain(..) { - let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; - $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); - } + handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan); } Ok(()) }, @@ -4208,6 +4221,14 @@ where pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option) -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> { + log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement", + log_bytes!(channel.channel_id()), + if raa.is_some() { "an" } else { "no" }, + if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(), + if funding_broadcastable.is_some() { "" } else { "not " }, + if channel_ready.is_some() { "sending" } else { "without" }, + if announcement_sigs.is_some() { "sending" } else { "without" }); + let mut htlc_forwards = None; let counterparty_node_id = channel.get_counterparty_node_id(); @@ -4266,65 +4287,36 @@ where fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let htlc_forwards; - let (mut pending_failures, finalized_claims, counterparty_node_id) = { - let counterparty_node_id = match counterparty_node_id { - Some(cp_id) => cp_id.clone(), - None => { - // TODO: Once we can rely on the counterparty_node_id from the - // monitor event, this and the id_to_peer map should be removed. - let id_to_peer = self.id_to_peer.lock().unwrap(); - match id_to_peer.get(&funding_txo.to_channel_id()) { - Some(cp_id) => cp_id.clone(), - None => return, - } - } - }; - let per_peer_state = self.per_peer_state.read().unwrap(); - let mut peer_state_lock; - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if peer_state_mutex_opt.is_none() { return } - peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let mut channel = { - match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ - hash_map::Entry::Occupied(chan) => chan, - hash_map::Entry::Vacant(_) => return, + let counterparty_node_id = match counterparty_node_id { + Some(cp_id) => cp_id.clone(), + None => { + // TODO: Once we can rely on the counterparty_node_id from the + // monitor event, this and the id_to_peer map should be removed. + let id_to_peer = self.id_to_peer.lock().unwrap(); + match id_to_peer.get(&funding_txo.to_channel_id()) { + Some(cp_id) => cp_id.clone(), + None => return, } - }; - if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id { - return; } - - let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height()); - let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() { - // We only send a channel_update in the case where we are just now sending a - // channel_ready and the channel is in a usable state. We may re-send a - // channel_update later through the announcement_signatures process for public - // channels, but there's no reason not to just inform our counterparty of our fees - // now. - if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) { - Some(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.get().get_counterparty_node_id(), - msg, - }) - } else { None } - } else { None }; - htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); - if let Some(upd) = channel_update { - peer_state.pending_msg_events.push(upd); + }; + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state_lock; + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { return } + peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let mut channel = { + match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ + hash_map::Entry::Occupied(chan) => chan, + hash_map::Entry::Vacant(_) => return, } - - (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id) }; - if let Some(forwards) = htlc_forwards { - self.forward_htlcs(&mut [forwards][..]); - } - self.finalize_claims(finalized_claims); - for failure in pending_failures.drain(..) { - let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() }; - self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver); + log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}", + highest_applied_update_id, channel.get().get_latest_monitor_update_id()); + if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id { + return; } + handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut()); } /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`]. -- 2.30.2