From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Mon, 9 Aug 2021 21:41:02 +0000 (+0000) Subject: Merge pull request #1019 from jkczyz/2021-07-shutdown-pubkey X-Git-Tag: v0.0.100~7 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=767f12030b2e33cc8c47b4394401f2b4ba62a3e9;hp=-c;p=rust-lightning Merge pull request #1019 from jkczyz/2021-07-shutdown-pubkey Fetch shutdown script based on `commit_upfront_shutdown_pubkey` --- 767f12030b2e33cc8c47b4394401f2b4ba62a3e9 diff --combined lightning-background-processor/src/lib.rs index 8af4cc6b,7c1da8cd..e73ddeb7 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@@ -39,7 -39,6 +39,7 @@@ use std::ops::Deref /// then there is a risk of channels force-closing on startup when the manager realizes it's /// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used /// for unilateral chain closure fees are at risk. +#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."] pub struct BackgroundProcessor { stop_thread: Arc, thread_handle: Option>>, @@@ -50,8 -49,6 +50,8 @@@ const FRESHNESS_TIMER: u64 = 60 #[cfg(test)] const FRESHNESS_TIMER: u64 = 1; +const PING_TIMER: u64 = 5; + /// Trait which handles persisting a [`ChannelManager`] to disk. /// /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager @@@ -140,8 -137,7 +140,8 @@@ impl BackgroundProcessor let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - let mut current_time = Instant::now(); + let mut last_freshness_call = Instant::now(); + let mut last_ping_call = Instant::now(); loop { peer_manager.process_events(); channel_manager.process_pending_events(&event_handler); @@@ -156,27 -152,11 +156,27 @@@ log_trace!(logger, "Terminating background processor."); return Ok(()); } - if current_time.elapsed().as_secs() > FRESHNESS_TIMER { - log_trace!(logger, "Calling ChannelManager's and PeerManager's timer_tick_occurred"); + if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); channel_manager.timer_tick_occurred(); + last_freshness_call = Instant::now(); + } + if last_ping_call.elapsed().as_secs() > PING_TIMER * 2 { + // On various platforms, we may be starved of CPU cycles for several reasons. + // E.g. on iOS, if we've been in the background, we will be entirely paused. + // Similarly, if we're on a desktop platform and the device has been asleep, we + // may not get any cycles. + // In any case, if we've been entirely paused for more than double our ping + // timer, we should have disconnected all sockets by now (and they're probably + // dead anyway), so disconnect them by calling `timer_tick_occurred()` twice. + log_trace!(logger, "Awoke after more than double our ping timer, disconnecting peers."); + peer_manager.timer_tick_occurred(); + peer_manager.timer_tick_occurred(); + last_ping_call = Instant::now(); + } else if last_ping_call.elapsed().as_secs() > PING_TIMER { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); peer_manager.timer_tick_occurred(); - current_time = Instant::now(); + last_ping_call = Instant::now(); } } }); @@@ -243,7 -223,7 +243,7 @@@ mod tests use lightning::get_event_msg; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; - use lightning::ln::msgs::ChannelMessageHandler; + use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; @@@ -317,6 -297,14 +317,14 @@@ let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } + + for i in 0..num_nodes { + for j in (i+1)..num_nodes { + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known() }); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known() }); + } + } + nodes } @@@ -460,10 -448,8 +468,10 @@@ let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() { + let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string(); + let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string(); + if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() && + log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() { break } } diff --combined lightning/src/ln/channelmanager.rs index 24c0c688,dc2cc54b..09383d40 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@@ -491,6 -491,8 +491,8 @@@ pub struct ChannelManager>>, pending_events: Mutex>, @@@ -871,6 -873,18 +873,18 @@@ macro_rules! try_chan_entry } } + macro_rules! remove_channel { + ($channel_state: expr, $entry: expr) => { + { + let channel = $entry.remove_entry().1; + if let Some(short_id) = channel.get_short_channel_id() { + $channel_state.short_to_id.remove(&short_id); + } + channel + } + } + } + macro_rules! handle_monitor_err { ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => { handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new()) @@@ -1165,8 -1179,18 +1179,18 @@@ impl { + let peer_state = peer_state.lock().unwrap(); + let their_features = &peer_state.latest_features; + let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration }; + Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, their_features, channel_value_satoshis, push_msat, user_id, config)? + }, + None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", their_network_key) }), + } + }; let res = channel.get_open_channel(self.genesis_hash.clone()); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); @@@ -1260,40 -1284,61 +1284,61 @@@ pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let (mut failed_htlcs, chan_option) = { + let counterparty_node_id; + let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; + let result: Result<(), _> = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?; + counterparty_node_id = chan_entry.get().get_counterparty_node_id(); + let per_peer_state = self.per_peer_state.read().unwrap(); + let (shutdown_msg, monitor_update, htlcs) = match per_peer_state.get(&counterparty_node_id) { + Some(peer_state) => { + let peer_state = peer_state.lock().unwrap(); + let their_features = &peer_state.latest_features; + chan_entry.get_mut().get_shutdown(&self.keys_manager, their_features)? + }, + None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", counterparty_node_id) }), + }; + failed_htlcs = htlcs; + + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update { + if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) { + let (result, is_permanent) = + handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key()); + if is_permanent { + remove_channel!(channel_state, chan_entry); + break result; + } + } + } + channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: chan_entry.get().get_counterparty_node_id(), + node_id: counterparty_node_id, msg: shutdown_msg }); + if chan_entry.get().is_shutdown() { - if let Some(short_id) = chan_entry.get().get_short_channel_id() { - channel_state.short_to_id.remove(&short_id); + let channel = remove_channel!(channel_state, chan_entry); + if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: channel_update + }); } - (failed_htlcs, Some(chan_entry.remove_entry().1)) - } else { (failed_htlcs, None) } + } + break Ok(()); }, hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()}) } }; + for htlc_source in failed_htlcs.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); } - let chan_update = if let Some(chan) = chan_option { - self.get_channel_update_for_broadcast(&chan).ok() - } else { None }; - - if let Some(update) = chan_update { - let mut channel_state = self.channel_state.lock().unwrap(); - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } + let _ = handle_error!(self, result, counterparty_node_id); Ok(()) } @@@ -3027,7 -3072,7 +3072,7 @@@ return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); } - let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, counterparty_node_id.clone(), their_features, msg, 0, &self.default_configuration) + let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, counterparty_node_id.clone(), &their_features, msg, 0, &self.default_configuration) .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?; let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; @@@ -3053,7 -3098,7 +3098,7 @@@ if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); } - try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration, their_features), channel_state, chan); + try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration, &their_features), channel_state, chan); (chan.get().get_value_satoshis(), chan.get().get_funding_redeemscript().to_v0_p2wsh(), chan.get().get_user_id()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id)) @@@ -3190,7 -3235,8 +3235,8 @@@ } fn internal_shutdown(&self, counterparty_node_id: &PublicKey, their_features: &InitFeatures, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { - let (mut dropped_htlcs, chan_option) = { + let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>; + let result: Result<(), _> = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; @@@ -3199,25 -3245,37 +3245,37 @@@ if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - let (shutdown, closing_signed, dropped_htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &their_features, &msg), channel_state, chan_entry); + + let (shutdown, closing_signed, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &self.keys_manager, &their_features, &msg), channel_state, chan_entry); + dropped_htlcs = htlcs; + + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update { + if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) { + let (result, is_permanent) = + handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key()); + if is_permanent { + remove_channel!(channel_state, chan_entry); + break result; + } + } + } + if let Some(msg) = shutdown { channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: counterparty_node_id.clone(), + node_id: *counterparty_node_id, msg, }); } if let Some(msg) = closing_signed { + // TODO: Do not send this if the monitor update failed. channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: counterparty_node_id.clone(), + node_id: *counterparty_node_id, msg, }); } - if chan_entry.get().is_shutdown() { - if let Some(short_id) = chan_entry.get().get_short_channel_id() { - channel_state.short_to_id.remove(&short_id); - } - (dropped_htlcs, Some(chan_entry.remove_entry().1)) - } else { (dropped_htlcs, None) } + + break Ok(()); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } @@@ -3225,14 -3283,8 +3283,8 @@@ for htlc_source in dropped_htlcs.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); } - if let Some(chan) = chan_option { - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut channel_state = self.channel_state.lock().unwrap(); - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - } + + let _ = handle_error!(self, result, *counterparty_node_id); Ok(()) } @@@ -5000,10 -5052,6 +5052,10 @@@ impl<'a, Signer: Sign, M: Deref, T: Der channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() { // But if the channel is behind of the monitor, close the channel: + log_error!(args.logger, "A ChannelManager is stale compared to the current ChannelMonitor!"); + log_error!(args.logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast."); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", + log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id()); let (_, mut new_failed_htlcs) = channel.force_shutdown(true); failed_htlcs.append(&mut new_failed_htlcs); monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger); @@@ -5563,7 -5611,7 +5615,7 @@@ pub mod bench use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage}; use ln::features::{InitFeatures, InvoiceFeatures}; use ln::functional_test_utils::*; - use ln::msgs::ChannelMessageHandler; + use ln::msgs::{ChannelMessageHandler, Init}; use routing::network_graph::NetworkGraph; use routing::router::get_route; use util::test_utils; @@@ -5626,6 -5674,8 +5678,8 @@@ }); let node_b_holder = NodeHolder { node: &node_b }; + node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: InitFeatures::known() }); + node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: InitFeatures::known() }); node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap(); node_b.handle_open_channel(&node_a.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id())); node_a.handle_accept_channel(&node_b.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id()));