Merge pull request #1019 from jkczyz/2021-07-shutdown-pubkey
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 9 Aug 2021 21:41:02 +0000 (21:41 +0000)
committer GitHub <noreply@github.com>
Mon, 9 Aug 2021 21:41:02 +0000 (21:41 +0000)
Fetch shutdown script based on `commit_upfront_shutdown_pubkey`
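The gist of the change, sketched roughly below: when `commit_upfront_shutdown_pubkey` is set and the peer advertises upfront-shutdown support, the shutdown script is fetched from the signer and committed to at channel open; otherwise it is only fetched when the channel is actually closed. The names in this sketch (`ChannelConfigSketch`, `choose_upfront_shutdown_script`, the closure argument) are illustrative and not the exact API introduced by this PR.

    // Illustrative sketch only; the real change threads `their_features` and the
    // keys manager through Channel::new_outbound/new_from_req and get_shutdown().
    struct ChannelConfigSketch { commit_upfront_shutdown_pubkey: bool }

    fn choose_upfront_shutdown_script(
        config: &ChannelConfigSketch,
        counterparty_supports_upfront: bool,   // from the peer's InitFeatures
        fetch_script: impl Fn() -> Vec<u8>,    // e.g. backed by the keys manager
    ) -> Option<Vec<u8>> {
        if config.commit_upfront_shutdown_pubkey && counterparty_supports_upfront {
            // Commit to the script in open_channel/accept_channel so the peer can
            // enforce it when the channel is later closed co-operatively.
            Some(fetch_script())
        } else {
            // Otherwise the script is only fetched when shutdown is actually sent.
            None
        }
    }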

lightning-background-processor/src/lib.rs
lightning/src/ln/channelmanager.rs

index 8af4cc6b3e51727c7c8261604342a8485bd6cc24,7c1da8cd9d3a1178e00a98eb7bfcf9cb09b00246..e73ddeb709c11aca96ea04887c4eca2cb6cea388
@@@ -39,7 -39,6 +39,7 @@@ use std::ops::Deref
  /// then there is a risk of channels force-closing on startup when the manager realizes it's
  /// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used
  /// for unilateral chain closure fees are at risk.
 +#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
  pub struct BackgroundProcessor {
        stop_thread: Arc<AtomicBool>,
        thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
@@@ -50,8 -49,6 +50,8 @@@ const FRESHNESS_TIMER: u64 = 60
  #[cfg(test)]
  const FRESHNESS_TIMER: u64 = 1;
  
 +const PING_TIMER: u64 = 5;
 +
  /// Trait which handles persisting a [`ChannelManager`] to disk.
  ///
  /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
@@@ -140,8 -137,7 +140,8 @@@ impl BackgroundProcessor 
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
 -                      let mut current_time = Instant::now();
 +                      let mut last_freshness_call = Instant::now();
 +                      let mut last_ping_call = Instant::now();
                        loop {
                                peer_manager.process_events();
                                channel_manager.process_pending_events(&event_handler);
                                        log_trace!(logger, "Terminating background processor.");
                                        return Ok(());
                                }
 -                              if current_time.elapsed().as_secs() > FRESHNESS_TIMER {
 -                                      log_trace!(logger, "Calling ChannelManager's and PeerManager's timer_tick_occurred");
 +                              if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
 +                                      log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
                                        channel_manager.timer_tick_occurred();
 +                                      last_freshness_call = Instant::now();
 +                              }
 +                              if last_ping_call.elapsed().as_secs() > PING_TIMER * 2 {
 +                                      // On various platforms, we may be starved of CPU cycles for several reasons.
 +                                      // E.g. on iOS, if we've been in the background, we will be entirely paused.
 +                                      // Similarly, if we're on a desktop platform and the device has been asleep, we
 +                                      // may not get any cycles.
 +                                      // In any case, if we've been entirely paused for more than double our ping
 +                                      // timer, we should have disconnected all sockets by now (and they're probably
 +                                      // dead anyway), so disconnect them by calling `timer_tick_occurred()` twice.
 +                                      log_trace!(logger, "Awoke after more than double our ping timer, disconnecting peers.");
 +                                      peer_manager.timer_tick_occurred();
 +                                      peer_manager.timer_tick_occurred();
 +                                      last_ping_call = Instant::now();
 +                              } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
 +                                      log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
                                        peer_manager.timer_tick_occurred();
 -                                      current_time = Instant::now();
 +                                      last_ping_call = Instant::now();
                                }
                        }
                });
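Condensed from the interleaved +/- lines above, the post-merge loop body handles its two timers roughly as follows. This is a sketch of the logic, not a verbatim copy; the tick closures stand in for ChannelManager::timer_tick_occurred and PeerManager::timer_tick_occurred.

    use std::time::Instant;

    const FRESHNESS_TIMER: u64 = 60;
    const PING_TIMER: u64 = 5;

    fn handle_timers(last_freshness_call: &mut Instant, last_ping_call: &mut Instant,
                     channel_manager_tick: impl Fn(), peer_manager_tick: impl Fn()) {
        if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
            channel_manager_tick();
            *last_freshness_call = Instant::now();
        }
        if last_ping_call.elapsed().as_secs() > PING_TIMER * 2 {
            // Starved of CPU (iOS background, machine asleep, ...): any pings are
            // long overdue, so tick twice to disconnect the (likely dead) sockets.
            peer_manager_tick();
            peer_manager_tick();
            *last_ping_call = Instant::now();
        } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
            peer_manager_tick();
            *last_ping_call = Instant::now();
        }
    }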
@@@ -243,7 -223,7 +243,7 @@@ mod tests 
        use lightning::get_event_msg;
        use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
        use lightning::ln::features::InitFeatures;
-       use lightning::ln::msgs::ChannelMessageHandler;
+       use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
        use lightning::util::config::UserConfig;
        use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
                        let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block };
                        nodes.push(node);
                }
+               for i in 0..num_nodes {
+                       for j in (i+1)..num_nodes {
+                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known() });
+                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known() });
+                       }
+               }
                nodes
        }
  
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
 -                      let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string();
 -                      if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() {
 +                      let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
 +                      let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
 +                      if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
 +                                      log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
                                break
                        }
                }
index 24c0c688cf0843274a61954433098d31324810a5,dc2cc54b53de422942e9718762f8eb7f286a3e18..09383d40301b9f387cd00ec6060dffc5d6674374
@@@ -491,6 -491,8 +491,8 @@@ pub struct ChannelManager<Signer: Sign
        /// Because adding or removing an entry is rare, we usually take an outer read lock and then
        /// operate on the inner value freely. Sadly, this prevents parallel operation when opening a
        /// new channel.
+       ///
+       /// If also holding `channel_state` lock, must lock `channel_state` prior to `per_peer_state`.
        per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
  
        pending_events: Mutex<Vec<events::Event>>,
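A minimal illustration of the lock-order rule documented above, with simplified field types (the real struct keys on `PublicKey` and holds a richer `PeerState`):

    use std::collections::HashMap;
    use std::sync::{Mutex, RwLock};

    struct ChannelHolder;
    struct PeerState;

    struct ManagerSketch {
        channel_state: Mutex<ChannelHolder>,
        per_peer_state: RwLock<HashMap<[u8; 33], Mutex<PeerState>>>,
    }

    impl ManagerSketch {
        fn needs_both(&self) {
            // Per the comment above: when both locks are needed, take
            // channel_state first...
            let _channel_state = self.channel_state.lock().unwrap();
            // ...and only then per_peer_state. Acquiring them in the opposite
            // order on another thread could deadlock.
            let _per_peer_state = self.per_peer_state.read().unwrap();
        }
    }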
@@@ -871,6 -873,18 +873,18 @@@ macro_rules! try_chan_entry 
        }
  }
  
+ macro_rules! remove_channel {
+       ($channel_state: expr, $entry: expr) => {
+               {
+                       let channel = $entry.remove_entry().1;
+                       if let Some(short_id) = channel.get_short_channel_id() {
+                               $channel_state.short_to_id.remove(&short_id);
+                       }
+                       channel
+               }
+       }
+ }
  macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
@@@ -1165,8 -1179,18 +1179,18 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
                        return Err(APIError::APIMisuseError { err: format!("Channel value must be at least 1000 satoshis. It was {}", channel_value_satoshis) });
                }
  
-               let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
-               let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, config)?;
+               let channel = {
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       match per_peer_state.get(&their_network_key) {
+                               Some(peer_state) => {
+                                       let peer_state = peer_state.lock().unwrap();
+                                       let their_features = &peer_state.latest_features;
+                                       let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
+                                       Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, their_features, channel_value_satoshis, push_msat, user_id, config)?
+                               },
+                               None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", their_network_key) }),
+                       }
+               };
                let res = channel.get_open_channel(self.genesis_hash.clone());
  
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
        pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
  
-               let (mut failed_htlcs, chan_option) = {
+               let counterparty_node_id;
+               let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
+               let result: Result<(), _> = loop {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
                        match channel_state.by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?;
+                                       counterparty_node_id = chan_entry.get().get_counterparty_node_id();
+                                       let per_peer_state = self.per_peer_state.read().unwrap();
+                                       let (shutdown_msg, monitor_update, htlcs) = match per_peer_state.get(&counterparty_node_id) {
+                                               Some(peer_state) => {
+                                                       let peer_state = peer_state.lock().unwrap();
+                                                       let their_features = &peer_state.latest_features;
+                                                       chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features)?
+                                               },
+                                               None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }),
+                                       };
+                                       failed_htlcs = htlcs;
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update {
+                                               if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
+                                                       let (result, is_permanent) =
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                       if is_permanent {
+                                                               remove_channel!(channel_state, chan_entry);
+                                                               break result;
+                                                       }
+                                               }
+                                       }
                                        channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                               node_id: chan_entry.get().get_counterparty_node_id(),
+                                               node_id: counterparty_node_id,
                                                msg: shutdown_msg
                                        });
                                        if chan_entry.get().is_shutdown() {
-                                               if let Some(short_id) = chan_entry.get().get_short_channel_id() {
-                                                       channel_state.short_to_id.remove(&short_id);
+                                               let channel = remove_channel!(channel_state, chan_entry);
+                                               if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
+                                                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                               msg: channel_update
+                                                       });
                                                }
-                                               (failed_htlcs, Some(chan_entry.remove_entry().1))
-                                       } else { (failed_htlcs, None) }
+                                       }
+                                       break Ok(());
                                },
                                hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()})
                        }
                };
                for htlc_source in failed_htlcs.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
-               let chan_update = if let Some(chan) = chan_option {
-                       self.get_channel_update_for_broadcast(&chan).ok()
-               } else { None };
-               if let Some(update) = chan_update {
-                       let mut channel_state = self.channel_state.lock().unwrap();
-                       channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                               msg: update
-                       });
-               }
  
+               let _ = handle_error!(self, result, counterparty_node_id);
                Ok(())
        }
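Both close_channel above and internal_shutdown below follow the same pattern when pushing the shutdown script into the ChannelMonitor: apply the update, and only drop the channel if the monitor failure is permanent. A stripped-down sketch of that control flow, with closures standing in for chain_monitor.update_channel, the handle_monitor_err! macro, and remove_channel!:

    fn apply_shutdown_monitor_update<E>(
        update_channel: impl FnOnce() -> Result<(), E>,
        handle_monitor_err: impl FnOnce(E) -> (Result<(), E>, bool), // (result, is_permanent)
        remove_channel: impl FnOnce(),
    ) -> Result<(), E> {
        if let Err(e) = update_channel() {
            let (result, is_permanent) = handle_monitor_err(e);
            if is_permanent {
                // A permanent monitor failure means the channel must be dropped
                // (and its short_channel_id mapping removed) before bubbling up.
                remove_channel();
                return result;
            }
            // Temporary failures fall through; the shutdown message is still queued.
        }
        Ok(())
    }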
  
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
                }
  
-               let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, counterparty_node_id.clone(), their_features, msg, 0, &self.default_configuration)
+               let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, counterparty_node_id.clone(), &their_features, msg, 0, &self.default_configuration)
                        .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?;
                let mut channel_state_lock = self.channel_state.lock().unwrap();
                let channel_state = &mut *channel_state_lock;
                                        if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id));
                                        }
-                                       try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration, their_features), channel_state, chan);
+                                       try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration, &their_features), channel_state, chan);
                                        (chan.get().get_value_satoshis(), chan.get().get_funding_redeemscript().to_v0_p2wsh(), chan.get().get_user_id())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id))
        }
  
        fn internal_shutdown(&self, counterparty_node_id: &PublicKey, their_features: &InitFeatures, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
-               let (mut dropped_htlcs, chan_option) = {
+               let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>;
+               let result: Result<(), _> = loop {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
  
                                        if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
-                                       let (shutdown, closing_signed, dropped_htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &their_features, &msg), channel_state, chan_entry);
+                                       let (shutdown, closing_signed, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &self.keys_manager, &their_features, &msg), channel_state, chan_entry);
+                                       dropped_htlcs = htlcs;
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update {
+                                               if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
+                                                       let (result, is_permanent) =
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                       if is_permanent {
+                                                               remove_channel!(channel_state, chan_entry);
+                                                               break result;
+                                                       }
+                                               }
+                                       }
                                        if let Some(msg) = shutdown {
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                                       node_id: counterparty_node_id.clone(),
+                                                       node_id: *counterparty_node_id,
                                                        msg,
                                                });
                                        }
                                        if let Some(msg) = closing_signed {
+                                               // TODO: Do not send this if the monitor update failed.
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
-                                                       node_id: counterparty_node_id.clone(),
+                                                       node_id: *counterparty_node_id,
                                                        msg,
                                                });
                                        }
-                                       if chan_entry.get().is_shutdown() {
-                                               if let Some(short_id) = chan_entry.get().get_short_channel_id() {
-                                                       channel_state.short_to_id.remove(&short_id);
-                                               }
-                                               (dropped_htlcs, Some(chan_entry.remove_entry().1))
-                                       } else { (dropped_htlcs, None) }
+                                       break Ok(());
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
                for htlc_source in dropped_htlcs.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                }
-               if let Some(chan) = chan_option {
-                       if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                               let mut channel_state = self.channel_state.lock().unwrap();
-                               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                       msg: update
-                               });
-                       }
-               }
+               let _ = handle_error!(self, result, *counterparty_node_id);
                Ok(())
        }
  
@@@ -5000,10 -5052,6 +5052,10 @@@ impl<'a, Signer: Sign, M: Deref, T: Der
                                                channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
                                                channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
                                        // But if the channel is behind the monitor, close the channel:
 +                                      log_error!(args.logger, "A ChannelManager is stale compared to the current ChannelMonitor!");
 +                                      log_error!(args.logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast.");
 +                                      log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
 +                                              log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id());
                                        let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
                                        failed_htlcs.append(&mut new_failed_htlcs);
                                        monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
@@@ -5563,7 -5611,7 +5615,7 @@@ pub mod bench 
        use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage};
        use ln::features::{InitFeatures, InvoiceFeatures};
        use ln::functional_test_utils::*;
-       use ln::msgs::ChannelMessageHandler;
+       use ln::msgs::{ChannelMessageHandler, Init};
        use routing::network_graph::NetworkGraph;
        use routing::router::get_route;
        use util::test_utils;
                });
                let node_b_holder = NodeHolder { node: &node_b };
  
+               node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: InitFeatures::known() });
+               node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: InitFeatures::known() });
                node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap();
                node_b.handle_open_channel(&node_a.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id()));
                node_a.handle_accept_channel(&node_b.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id()));