X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=392f3e1cb052cf196b50b2b7e3734b62ab167e4c;hb=4ab6c551a00ce167abfad2ea97310e8043e08375;hp=ed00282fb718d526aebb5ea3d41b4614732d4b67;hpb=f6a45056791b2dbbd0b41f581080d9884cda1e3a;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index ed00282f..392f3e1c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -64,7 +64,7 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe use crate::util::logger::{Level, Logger}; use crate::util::errors::APIError; -use alloc::collections::BTreeMap; +use alloc::collections::{btree_map, BTreeMap}; use crate::io; use crate::prelude::*; @@ -77,7 +77,7 @@ use core::time::Duration; use core::ops::Deref; // Re-export this for use in the public API. -pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure, RecipientOnionFields}; +pub use crate::ln::outbound_payment::{PaymentSendFailure, ProbeSendFailure, Retry, RetryableSendFailure, RecipientOnionFields}; use crate::ln::script::ShutdownScript; // We hold various information about HTLC relay in the HTLC objects in Channel itself: @@ -177,7 +177,7 @@ pub(super) enum HTLCForwardInfo { } /// Tracks the inbound corresponding to an outbound HTLC -#[derive(Clone, Hash, PartialEq, Eq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub(crate) struct HTLCPreviousHopData { // Note that this may be an outbound SCID alias for the associated channel. short_channel_id: u64, @@ -283,7 +283,7 @@ impl Readable for InterceptId { } } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] /// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`]. pub(crate) enum SentHTLCId { PreviousHopData { short_channel_id: u64, htlc_id: u64 }, @@ -314,7 +314,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId, /// Tracks the inbound corresponding to an outbound HTLC #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash -#[derive(Clone, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum HTLCSource { PreviousHopData(HTLCPreviousHopData), OutboundRoute { @@ -495,6 +495,10 @@ impl MsgHandleErrInternal { channel_capacity: None, } } + + fn closes_channel(&self) -> bool { + self.chan_id.is_some() + } } /// We hold back HTLCs we intend to relay for a random interval greater than this (see @@ -656,7 +660,6 @@ pub(crate) enum RAAMonitorUpdateBlockingAction { } impl RAAMonitorUpdateBlockingAction { - #[allow(unused)] fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self { Self::ForwardedPaymentInboundClaim { channel_id: prev_hop.outpoint.to_channel_id(), @@ -836,33 +839,46 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = &'g L >; -macro_rules! define_test_pub_trait { ($vis: vis) => { -/// A trivial trait which describes any [`ChannelManager`] used in testing. -$vis trait AChannelManager { +/// A trivial trait which describes any [`ChannelManager`]. +pub trait AChannelManager { + /// A type implementing [`chain::Watch`]. type Watch: chain::Watch + ?Sized; + /// A type that may be dereferenced to [`Self::Watch`]. type M: Deref; + /// A type implementing [`BroadcasterInterface`]. type Broadcaster: BroadcasterInterface + ?Sized; + /// A type that may be dereferenced to [`Self::Broadcaster`]. type T: Deref; + /// A type implementing [`EntropySource`]. type EntropySource: EntropySource + ?Sized; + /// A type that may be dereferenced to [`Self::EntropySource`]. type ES: Deref; + /// A type implementing [`NodeSigner`]. type NodeSigner: NodeSigner + ?Sized; + /// A type that may be dereferenced to [`Self::NodeSigner`]. type NS: Deref; + /// A type implementing [`WriteableEcdsaChannelSigner`]. type Signer: WriteableEcdsaChannelSigner + Sized; + /// A type implementing [`SignerProvider`] for [`Self::Signer`]. type SignerProvider: SignerProvider + ?Sized; + /// A type that may be dereferenced to [`Self::SignerProvider`]. type SP: Deref; + /// A type implementing [`FeeEstimator`]. type FeeEstimator: FeeEstimator + ?Sized; + /// A type that may be dereferenced to [`Self::FeeEstimator`]. type F: Deref; + /// A type implementing [`Router`]. type Router: Router + ?Sized; + /// A type that may be dereferenced to [`Self::Router`]. type R: Deref; + /// A type implementing [`Logger`]. type Logger: Logger + ?Sized; + /// A type that may be dereferenced to [`Self::Logger`]. type L: Deref; + /// Returns a reference to the actual [`ChannelManager`] object. fn get_cm(&self) -> &ChannelManager; } -} } -#[cfg(any(test, feature = "_test_utils"))] -define_test_pub_trait!(pub); -#[cfg(not(any(test, feature = "_test_utils")))] -define_test_pub_trait!(pub(crate)); + impl AChannelManager for ChannelManager where @@ -907,12 +923,14 @@ where /// called [`funding_transaction_generated`] for outbound channels) being closed. /// /// Note that you can be a bit lazier about writing out `ChannelManager` than you can be with -/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST write each monitor update out to disk before -/// returning from [`chain::Watch::watch_channel`]/[`update_channel`], with ChannelManagers, writing updates -/// happens out-of-band (and will prevent any other `ChannelManager` operations from occurring during -/// the serialization process). If the deserialized version is out-of-date compared to the -/// [`ChannelMonitor`] passed by reference to [`read`], those channels will be force-closed based on the -/// `ChannelMonitor` state and no funds will be lost (mod on-chain transaction fees). +/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST durably write each +/// [`ChannelMonitorUpdate`] before returning from +/// [`chain::Watch::watch_channel`]/[`update_channel`] or before completing async writes. With +/// `ChannelManager`s, writing updates happens out-of-band (and will prevent any other +/// `ChannelManager` operations from occurring during the serialization process). If the +/// deserialized version is out-of-date compared to the [`ChannelMonitor`] passed by reference to +/// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds +/// will be lost (modulo on-chain transaction fees). /// /// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which /// tells you the last block hash which was connected. You should get the best block tip before using the manager. @@ -1183,10 +1201,17 @@ where /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the /// Notifier the lock contains sends out a notification when the lock is released. total_consistency_lock: RwLock<()>, + /// Tracks the progress of channels going through batch funding by whether funding_signed was + /// received and the monitor has been persisted. + /// + /// This information does not need to be persisted as funding nodes can forget + /// unfunded channels upon disconnection. + funding_batch_states: Mutex>>, background_events_processed_since_startup: AtomicBool, - persistence_notifier: Notifier, + event_persist_notifier: Notifier, + needs_persist_flag: AtomicBool, entropy_source: ES, node_signer: NS, @@ -1215,7 +1240,8 @@ pub struct ChainParameters { #[must_use] enum NotifyOption { DoPersist, - SkipPersist, + SkipPersistHandleEvents, + SkipPersistNoEvents, } /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is @@ -1228,43 +1254,75 @@ enum NotifyOption { /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to /// notify or not based on whether relevant changes have been made, providing a closure to /// `optionally_notify` which returns a `NotifyOption`. -struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { - persistence_notifier: &'a Notifier, +struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> { + event_persist_notifier: &'a Notifier, + needs_persist_flag: &'a AtomicBool, should_persist: F, // We hold onto this result so the lock doesn't get released immediately. _read_guard: RwLockReadGuard<'a, ()>, } impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused - fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + /// Notifies any waiters and indicates that we need to persist, in addition to possibly having + /// events to handle. + /// + /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in + /// other cases where losing the changes on restart may result in a force-close or otherwise + /// isn't ideal. + fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { + Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist }) + } + + fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F) + -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); - let _ = cm.get_cm().process_background_events(); // We always persist + let force_notify = cm.get_cm().process_background_events(); PersistenceNotifierGuard { - persistence_notifier: &cm.get_cm().persistence_notifier, - should_persist: || -> NotifyOption { NotifyOption::DoPersist }, + event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, + should_persist: move || { + // Pick the "most" action between `persist_check` and the background events + // processing and return that. + let notify = persist_check(); + match (notify, force_notify) { + (NotifyOption::DoPersist, _) => NotifyOption::DoPersist, + (_, NotifyOption::DoPersist) => NotifyOption::DoPersist, + (NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents, + (_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents, + _ => NotifyOption::SkipPersistNoEvents, + } + }, _read_guard: read_guard, } - } /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated, - /// [`ChannelManager::process_background_events`] MUST be called first. - fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { - let read_guard = lock.read().unwrap(); + /// [`ChannelManager::process_background_events`] MUST be called first (or + /// [`Self::optionally_notify`] used). + fn optionally_notify_skipping_background_events NotifyOption, C: AChannelManager> + (cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> { + let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); PersistenceNotifierGuard { - persistence_notifier: notifier, + event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, should_persist: persist_check, _read_guard: read_guard, } } } -impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { +impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { - if (self.should_persist)() == NotifyOption::DoPersist { - self.persistence_notifier.notify(); + match (self.should_persist)() { + NotifyOption::DoPersist => { + self.needs_persist_flag.store(true, Ordering::Release); + self.event_persist_notifier.notify() + }, + NotifyOption::SkipPersistHandleEvents => + self.event_persist_notifier.notify(), + NotifyOption::SkipPersistNoEvents => {}, } } } @@ -1736,7 +1794,7 @@ macro_rules! handle_error { let mut msg_events = Vec::with_capacity(2); if let Some((shutdown_res, update_option)) = shutdown_finish { - $self.finish_force_close_channel(shutdown_res); + $self.finish_close_channel(shutdown_res); if let Some(update) = update_option { msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update @@ -1973,9 +2031,54 @@ macro_rules! handle_monitor_update_completion { } let channel_id = $chan.context.channel_id(); + let unbroadcasted_batch_funding_txid = $chan.context.unbroadcasted_batch_funding_txid(); core::mem::drop($peer_state_lock); core::mem::drop($per_peer_state_lock); + // If the channel belongs to a batch funding transaction, the progress of the batch + // should be updated as we have received funding_signed and persisted the monitor. + if let Some(txid) = unbroadcasted_batch_funding_txid { + let mut funding_batch_states = $self.funding_batch_states.lock().unwrap(); + let mut batch_completed = false; + if let Some(batch_state) = funding_batch_states.get_mut(&txid) { + let channel_state = batch_state.iter_mut().find(|(chan_id, pubkey, _)| ( + *chan_id == channel_id && + *pubkey == counterparty_node_id + )); + if let Some(channel_state) = channel_state { + channel_state.2 = true; + } else { + debug_assert!(false, "Missing channel batch state for channel which completed initial monitor update"); + } + batch_completed = batch_state.iter().all(|(_, _, completed)| *completed); + } else { + debug_assert!(false, "Missing batch state for channel which completed initial monitor update"); + } + + // When all channels in a batched funding transaction have become ready, it is not necessary + // to track the progress of the batch anymore and the state of the channels can be updated. + if batch_completed { + let removed_batch_state = funding_batch_states.remove(&txid).into_iter().flatten(); + let per_peer_state = $self.per_peer_state.read().unwrap(); + let mut batch_funding_tx = None; + for (channel_id, counterparty_node_id, _) in removed_batch_state { + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state = peer_state_mutex.lock().unwrap(); + if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) { + batch_funding_tx = batch_funding_tx.or_else(|| chan.context.unbroadcasted_funding()); + chan.set_batch_ready(); + let mut pending_events = $self.pending_events.lock().unwrap(); + emit_channel_pending_event!(pending_events, chan); + } + } + } + if let Some(tx) = batch_funding_tx { + log_info!($self.logger, "Broadcasting batch funding transaction with txid {}", tx.txid()); + $self.tx_broadcaster.broadcast_transactions(&[&tx]); + } + } + } + $self.handle_monitor_update_completion_actions(update_actions); if let Some(forwards) = htlc_forwards { @@ -1990,56 +2093,30 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { { - // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in - // any case so that it won't deadlock. - debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); + ($self: ident, $update_res: expr, $chan: expr, _internal, $completed: expr) => { { debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); match $update_res { + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!($self.logger, "{}", err_str); + panic!("{}", err_str); + }, ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", &$chan.context.channel_id()); - Ok(false) - }, - ChannelMonitorUpdateStatus::PermanentFailure => { - log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", - &$chan.context.channel_id()); - update_maps_on_chan_removal!($self, &$chan.context); - let res = Err(MsgHandleErrInternal::from_finish_shutdown( - "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(), - $chan.context.get_user_id(), $chan.context.force_shutdown(false), - $self.get_channel_update_for_broadcast(&$chan).ok(), $chan.context.get_value_satoshis())); - $remove; - res + false }, ChannelMonitorUpdateStatus::Completed => { $completed; - Ok(true) + true }, } } }; - ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { - handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, - $per_peer_state_lock, $chan, _internal, $remove, + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $chan, _internal, handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) }; - ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { - if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() { - handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, - $per_peer_state_lock, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { $chan_entry.remove() }) - } else { - // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to - // update). - debug_assert!(false); - let channel_id = *$chan_entry.key(); - let (_, err) = convert_chan_phase_err!($self, ChannelError::Close( - "Cannot update monitor for unfunded channels as they don't have monitors yet".into()), - $chan_entry.get_mut(), &channel_id); - $chan_entry.remove(); - Err(err) - } - }; - ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) .or_insert_with(Vec::new); // During startup, we push monitor updates as background events through to here in @@ -2051,8 +2128,7 @@ macro_rules! handle_new_monitor_update { in_flight_updates.len() - 1 }); let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); - handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state, - $per_peer_state_lock, $chan, _internal, $remove, + handle_new_monitor_update!($self, update_res, $chan, _internal, { let _ = in_flight_updates.remove(idx); if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { @@ -2060,22 +2136,6 @@ macro_rules! handle_new_monitor_update { } }) } }; - ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() { - handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, - $per_peer_state_lock, chan, MANUALLY_REMOVING, { $chan_entry.remove() }) - } else { - // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to - // update). - debug_assert!(false); - let channel_id = *$chan_entry.key(); - let (_, err) = convert_chan_phase_err!($self, ChannelError::Close( - "Cannot update monitor for unfunded channels as they don't have monitors yet".into()), - $chan_entry.get_mut(), &channel_id); - $chan_entry.remove(); - Err(err) - } - } } macro_rules! process_events_body { @@ -2086,7 +2146,7 @@ macro_rules! process_events_body { return; } - let mut result = NotifyOption::SkipPersist; + let mut result; { // We'll acquire our total consistency lock so that we can be sure no other @@ -2095,7 +2155,7 @@ macro_rules! process_events_body { // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must // ensure any startup-generated background events are handled first. - if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; } + result = $self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -2135,8 +2195,14 @@ macro_rules! process_events_body { processed_all_events = false; } - if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + match result { + NotifyOption::DoPersist => { + $self.needs_persist_flag.store(true, Ordering::Release); + $self.event_persist_notifier.notify(); + }, + NotifyOption::SkipPersistHandleEvents => + $self.event_persist_notifier.notify(), + NotifyOption::SkipPersistNoEvents => {}, } } } @@ -2215,7 +2281,9 @@ where pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), + funding_batch_states: Mutex::new(BTreeMap::new()), entropy_source, node_signer, @@ -2480,61 +2548,67 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; - let result: Result<(), _> = loop { - { - let per_peer_state = self.per_peer_state.read().unwrap(); + let mut shutdown_result = None; + loop { + let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex = per_peer_state.get(counterparty_node_id) - .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; + let peer_state_mutex = per_peer_state.get(counterparty_node_id) + .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(channel_id.clone()) { - hash_map::Entry::Occupied(mut chan_phase_entry) => { - if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { - let funding_txo_opt = chan.context.get_funding_txo(); - let their_features = &peer_state.latest_features; - let (shutdown_msg, mut monitor_update_opt, htlcs) = - chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; - failed_htlcs = htlcs; + match peer_state.channel_by_id.entry(channel_id.clone()) { + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let funding_txo_opt = chan.context.get_funding_txo(); + let their_features = &peer_state.latest_features; + let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid(); + let (shutdown_msg, mut monitor_update_opt, htlcs) = + chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; + failed_htlcs = htlcs; + + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg: shutdown_msg, + }); - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg: shutdown_msg, - }); + debug_assert!(monitor_update_opt.is_none() || !chan.is_shutdown(), + "We can't both complete shutdown and generate a monitor update"); - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt.take() { - break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()); - } + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt.take() { + handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan); + break; + } - if chan.is_shutdown() { - if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) { - if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: channel_update - }); - } - self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed); + if chan.is_shutdown() { + if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) { + if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: channel_update + }); } + self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed); + shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid)); } - break Ok(()); } - }, - hash_map::Entry::Vacant(_) => (), - } + break; + } + }, + hash_map::Entry::Vacant(_) => { + // If we reach this point, it means that the channel_id either refers to an unfunded channel or + // it does not exist for this peer. Either way, we can attempt to force-close it. + // + // An appropriate error will be returned for non-existence of the channel if that's the case. + return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ()) + }, } - // If we reach this point, it means that the channel_id either refers to an unfunded channel or - // it does not exist for this peer. Either way, we can attempt to force-close it. - // - // An appropriate error will be returned for non-existence of the channel if that's the case. - return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ()) - }; + } for htlc_source in failed_htlcs.drain(..) { let reason = HTLCFailReason::from_failure_code(0x4000 | 8); @@ -2542,7 +2616,10 @@ where self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver); } - let _ = handle_error!(self, result, *counterparty_node_id); + if let Some(shutdown_result) = shutdown_result { + self.finish_close_channel(shutdown_result); + } + Ok(()) } @@ -2607,9 +2684,14 @@ where self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } - #[inline] - fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) { - let (monitor_update_option, mut failed_htlcs) = shutdown_res; + fn finish_close_channel(&self, shutdown_res: ShutdownResult) { + debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread); + #[cfg(debug_assertions)] + for (_, peer) in self.per_peer_state.read().unwrap().iter() { + debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread); + } + + let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res; log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len()); for htlc_source in failed_htlcs.drain(..) { let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source; @@ -2624,6 +2706,31 @@ where // ignore the result here. let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update); } + let mut shutdown_results = Vec::new(); + if let Some(txid) = unbroadcasted_batch_funding_txid { + let mut funding_batch_states = self.funding_batch_states.lock().unwrap(); + let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten(); + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut has_uncompleted_channel = None; + for (channel_id, counterparty_node_id, state) in affected_channels { + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state = peer_state_mutex.lock().unwrap(); + if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) { + update_maps_on_chan_removal!(self, &chan.context()); + self.issue_channel_close_events(&chan.context(), ClosureReason::FundingBatchClosure); + shutdown_results.push(chan.context_mut().force_shutdown(false)); + } + } + has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state)); + } + debug_assert!( + has_uncompleted_channel.unwrap_or(true), + "Closing a batch where all channels have completed initial monitor update", + ); + } + for shutdown_result in shutdown_results.drain(..) { + self.finish_close_channel(shutdown_result); + } } /// `peer_msg` should be set when we receive a message from a peer, but not set when the @@ -2634,8 +2741,7 @@ where let peer_state_mutex = per_peer_state.get(peer_node_id) .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?; let (update_opt, counterparty_node_id) = { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + let mut peer_state = peer_state_mutex.lock().unwrap(); let closure_reason = if let Some(peer_msg) = peer_msg { ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) } } else { @@ -2645,13 +2751,15 @@ where log_error!(self.logger, "Force-closing channel {}", channel_id); self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason); let mut chan_phase = remove_channel_phase!(self, chan_phase_entry); + mem::drop(peer_state); + mem::drop(per_peer_state); match chan_phase { ChannelPhase::Funded(mut chan) => { - self.finish_force_close_channel(chan.context.force_shutdown(broadcast)); + self.finish_close_channel(chan.context.force_shutdown(broadcast)); (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id()) }, ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => { - self.finish_force_close_channel(chan_phase.context_mut().force_shutdown(false)); + self.finish_close_channel(chan_phase.context_mut().force_shutdown(false)); // Unfunded channel has no update (None, chan_phase.context().get_counterparty_node_id()) }, @@ -2667,10 +2775,17 @@ where } }; if let Some(update) = update_opt { - let mut peer_state = peer_state_mutex.lock().unwrap(); - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if + // not try to broadcast it via whatever peer we have. + let per_peer_state = self.per_peer_state.read().unwrap(); + let a_peer_state_opt = per_peer_state.get(peer_node_id) + .ok_or(per_peer_state.values().next()); + if let Ok(a_peer_state_mutex) = a_peer_state_opt { + let mut a_peer_state = a_peer_state_mutex.lock().unwrap(); + a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } } Ok(counterparty_node_id) @@ -2750,7 +2865,7 @@ where let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data { msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } => (short_channel_id, amt_to_forward, outgoing_cltv_value), - msgs::InboundOnionPayload::Receive { .. } => + msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } => return Err(InboundOnionErr { msg: "Final Node OnionHopData provided for us as an intermediary node", err_code: 0x4000 | 22, @@ -2782,12 +2897,19 @@ where payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, .. } => (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata), - _ => + msgs::InboundOnionPayload::BlindedReceive { + amt_msat, total_msat, outgoing_cltv_value, payment_secret, .. + } => { + let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat }; + (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None) + } + msgs::InboundOnionPayload::Forward { .. } => { return Err(InboundOnionErr { err_code: 0x4000|22, err_data: Vec::new(), msg: "Got non final data with an HMAC of 0", - }), + }) + }, }; // final_incorrect_cltv_expiry if outgoing_cltv_value > cltv_expiry { @@ -2927,7 +3049,10 @@ where } } - let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) { + let next_hop = match onion_utils::decode_next_payment_hop( + shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, + msg.payment_hash, &self.node_signer + ) { Ok(res) => res, Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => { return_malformed_err!(err_msg, err_code); @@ -2949,7 +3074,9 @@ where // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the // inbound channel's state. onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)), - onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => { + onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } | + onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } => + { return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]); } }; @@ -3272,9 +3399,8 @@ where }, onion_packet, None, &self.fee_estimator, &self.logger); match break_chan_phase_entry!(self, send_res, chan_phase_entry) { Some(monitor_update) => { - match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan_phase_entry) { - Err(e) => break Err(e), - Ok(false) => { + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { + false => { // Note that MonitorUpdateInProgress here indicates (per function // docs) that we will resend the commitment update once monitor // updating completes. Therefore, we must return an error @@ -3283,7 +3409,7 @@ where // MonitorUpdateInProgress, below. return Err(APIError::MonitorUpdateInProgress); }, - Ok(true) => {}, + true => {}, } }, None => {}, @@ -3489,10 +3615,121 @@ where outbound_payment::payment_is_probe(payment_hash, payment_id, self.probing_cookie_secret) } + /// Sends payment probes over all paths of a route that would be used to pay the given + /// amount to the given `node_id`. + /// + /// See [`ChannelManager::send_preflight_probes`] for more information. + pub fn send_spontaneous_preflight_probes( + &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32, + liquidity_limit_multiplier: Option, + ) -> Result, ProbeSendFailure> { + let payment_params = + PaymentParameters::from_node_id(node_id, final_cltv_expiry_delta); + + let route_params = RouteParameters::from_payment_params_and_value(payment_params, amount_msat); + + self.send_preflight_probes(route_params, liquidity_limit_multiplier) + } + + /// Sends payment probes over all paths of a route that would be used to pay a route found + /// according to the given [`RouteParameters`]. + /// + /// This may be used to send "pre-flight" probes, i.e., to train our scorer before conducting + /// the actual payment. Note this is only useful if there likely is sufficient time for the + /// probe to settle before sending out the actual payment, e.g., when waiting for user + /// confirmation in a wallet UI. + /// + /// Otherwise, there is a chance the probe could take up some liquidity needed to complete the + /// actual payment. Users should therefore be cautious and might avoid sending probes if + /// liquidity is scarce and/or they don't expect the probe to return before they send the + /// payment. To mitigate this issue, channels with available liquidity less than the required + /// amount times the given `liquidity_limit_multiplier` won't be used to send pre-flight + /// probes. If `None` is given as `liquidity_limit_multiplier`, it defaults to `3`. + pub fn send_preflight_probes( + &self, route_params: RouteParameters, liquidity_limit_multiplier: Option, + ) -> Result, ProbeSendFailure> { + let liquidity_limit_multiplier = liquidity_limit_multiplier.unwrap_or(3); + + let payer = self.get_our_node_id(); + let usable_channels = self.list_usable_channels(); + let first_hops = usable_channels.iter().collect::>(); + let inflight_htlcs = self.compute_inflight_htlcs(); + + let route = self + .router + .find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs) + .map_err(|e| { + log_error!(self.logger, "Failed to find path for payment probe: {:?}", e); + ProbeSendFailure::RouteNotFound + })?; + + let mut used_liquidity_map = HashMap::with_capacity(first_hops.len()); + + let mut res = Vec::new(); + + for mut path in route.paths { + // If the last hop is probably an unannounced channel we refrain from probing all the + // way through to the end and instead probe up to the second-to-last channel. + while let Some(last_path_hop) = path.hops.last() { + if last_path_hop.maybe_announced_channel { + // We found a potentially announced last hop. + break; + } else { + // Drop the last hop, as it's likely unannounced. + log_debug!( + self.logger, + "Avoided sending payment probe all the way to last hop {} as it is likely unannounced.", + last_path_hop.short_channel_id + ); + let final_value_msat = path.final_value_msat(); + path.hops.pop(); + if let Some(new_last) = path.hops.last_mut() { + new_last.fee_msat += final_value_msat; + } + } + } + + if path.hops.len() < 2 { + log_debug!( + self.logger, + "Skipped sending payment probe over path with less than two hops." + ); + continue; + } + + if let Some(first_path_hop) = path.hops.first() { + if let Some(first_hop) = first_hops.iter().find(|h| { + h.get_outbound_payment_scid() == Some(first_path_hop.short_channel_id) + }) { + let path_value = path.final_value_msat() + path.fee_msat(); + let used_liquidity = + used_liquidity_map.entry(first_path_hop.short_channel_id).or_insert(0); + + if first_hop.next_outbound_htlc_limit_msat + < (*used_liquidity + path_value) * liquidity_limit_multiplier + { + log_debug!(self.logger, "Skipped sending payment probe to avoid putting channel {} under the liquidity limit.", first_path_hop.short_channel_id); + continue; + } else { + *used_liquidity += path_value; + } + } + } + + res.push(self.send_probe(path).map_err(|e| { + log_error!(self.logger, "Failed to send pre-flight probe: {:?}", e); + ProbeSendFailure::SendingFailed(e) + })?); + } + + Ok(res) + } + /// Handles the generation of a funding transaction, optionally (for tests) with a function /// which checks the correctness of the funding transaction given the associated channel. - fn funding_transaction_generated_intern, &Transaction) -> Result>( - &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput + fn funding_transaction_generated_intern, &Transaction) -> Result>( + &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batch_funding: bool, + mut find_funding_output: FundingOutput, ) -> Result<(), APIError> { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -3504,7 +3741,7 @@ where Some(ChannelPhase::UnfundedOutboundV1(chan)) => { let funding_txo = find_funding_output(&chan, &funding_transaction)?; - let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger) + let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batch_funding, &self.logger) .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e { let channel_id = chan.context.channel_id(); let user_id = chan.context.get_user_id(); @@ -3560,7 +3797,7 @@ where #[cfg(test)] pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> { - self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| { + self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| { Ok(OutPoint { txid: tx.txid(), index: output_index }) }) } @@ -3596,17 +3833,37 @@ where /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> { + self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction) + } + + /// Call this upon creation of a batch funding transaction for the given channels. + /// + /// Return values are identical to [`Self::funding_transaction_generated`], respective to + /// each individual channel and transaction output. + /// + /// Do NOT broadcast the funding transaction yourself. This batch funding transcaction + /// will only be broadcast when we have safely received and persisted the counterparty's + /// signature for each channel. + /// + /// If there is an error, all channels in the batch are to be considered closed. + pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&ChannelId, &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut result = Ok(()); if !funding_transaction.is_coin_base() { for inp in funding_transaction.input.iter() { if inp.witness.is_empty() { - return Err(APIError::APIMisuseError { + result = result.and(Err(APIError::APIMisuseError { err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned() - }); + })); } } } + if funding_transaction.output.len() > u16::max_value() as usize { + result = result.and(Err(APIError::APIMisuseError { + err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() + })); + } { let height = self.best_block.read().unwrap().height(); // Transactions are evaluated as final by network mempools if their locktime is strictly @@ -3614,37 +3871,93 @@ where // node might not have perfect sync about their blockchain views. Thus, if the wallet // module is ahead of LDK, only allow one more block of headroom. if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 { - return Err(APIError::APIMisuseError { + result = result.and(Err(APIError::APIMisuseError { err: "Funding transaction absolute timelock is non-final".to_owned() - }); + })); } } - self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| { - if tx.output.len() > u16::max_value() as usize { - return Err(APIError::APIMisuseError { - err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() - }); - } - let mut output_index = None; - let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh(); - for (idx, outp) in tx.output.iter().enumerate() { - if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() { - if output_index.is_some() { + let txid = funding_transaction.txid(); + let is_batch_funding = temporary_channels.len() > 1; + let mut funding_batch_states = if is_batch_funding { + Some(self.funding_batch_states.lock().unwrap()) + } else { + None + }; + let mut funding_batch_state = funding_batch_states.as_mut().and_then(|states| { + match states.entry(txid) { + btree_map::Entry::Occupied(_) => { + result = result.clone().and(Err(APIError::APIMisuseError { + err: "Batch funding transaction with the same txid already exists".to_owned() + })); + None + }, + btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())), + } + }); + for (channel_idx, &(temporary_channel_id, counterparty_node_id)) in temporary_channels.iter().enumerate() { + result = result.and_then(|_| self.funding_transaction_generated_intern( + temporary_channel_id, + counterparty_node_id, + funding_transaction.clone(), + is_batch_funding, + |chan, tx| { + let mut output_index = None; + let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh(); + for (idx, outp) in tx.output.iter().enumerate() { + if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() { + if output_index.is_some() { + return Err(APIError::APIMisuseError { + err: "Multiple outputs matched the expected script and value".to_owned() + }); + } + output_index = Some(idx as u16); + } + } + if output_index.is_none() { return Err(APIError::APIMisuseError { - err: "Multiple outputs matched the expected script and value".to_owned() + err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned() }); } - output_index = Some(idx as u16); + let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() }; + if let Some(funding_batch_state) = funding_batch_state.as_mut() { + funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false)); + } + Ok(outpoint) + }) + ); + } + if let Err(ref e) = result { + // Remaining channels need to be removed on any error. + let e = format!("Error in transaction funding: {:?}", e); + let mut channels_to_remove = Vec::new(); + channels_to_remove.extend(funding_batch_states.as_mut() + .and_then(|states| states.remove(&txid)) + .into_iter().flatten() + .map(|(chan_id, node_id, _state)| (chan_id, node_id)) + ); + channels_to_remove.extend(temporary_channels.iter() + .map(|(&chan_id, &node_id)| (chan_id, node_id)) + ); + let mut shutdown_results = Vec::new(); + { + let per_peer_state = self.per_peer_state.read().unwrap(); + for (channel_id, counterparty_node_id) in channels_to_remove { + per_peer_state.get(&counterparty_node_id) + .map(|peer_state_mutex| peer_state_mutex.lock().unwrap()) + .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id)) + .map(|mut chan| { + update_maps_on_chan_removal!(self, &chan.context()); + self.issue_channel_close_events(&chan.context(), ClosureReason::ProcessingError { err: e.clone() }); + shutdown_results.push(chan.context_mut().force_shutdown(false)); + }); } } - if output_index.is_none() { - return Err(APIError::APIMisuseError { - err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned() - }); + for shutdown_result in shutdown_results.drain(..) { + self.finish_close_channel(shutdown_result); } - Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() }) - }) + } + result } /// Atomically applies partial updates to the [`ChannelConfig`] of the given channels. @@ -3936,7 +4249,10 @@ where let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode); if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) { let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes(); - let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) { + let next_hop = match onion_utils::decode_next_payment_hop( + phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, + payment_hash, &self.node_signer + ) { Ok(res) => res, Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => { let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner(); @@ -4340,7 +4656,7 @@ where let mut background_events = Vec::new(); mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events); if background_events.is_empty() { - return NotifyOption::SkipPersist; + return NotifyOption::SkipPersistNoEvents; } for event in background_events.drain(..) { @@ -4352,33 +4668,29 @@ where }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { let mut updated_chan = false; - let res = { + { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan_phase) => { - updated_chan = true; - handle_new_monitor_update!(self, funding_txo, update.clone(), - peer_state_lock, peer_state, per_peer_state, chan_phase).map(|_| ()) + if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { + updated_chan = true; + handle_new_monitor_update!(self, funding_txo, update.clone(), + peer_state_lock, peer_state, per_peer_state, chan); + } else { + debug_assert!(false, "We shouldn't have an update for a non-funded channel"); + } }, - hash_map::Entry::Vacant(_) => Ok(()), + hash_map::Entry::Vacant(_) => {}, } - } else { Ok(()) } - }; + } + } if !updated_chan { // TODO: Track this as in-flight even though the channel is closed. let _ = self.chain_monitor.update_channel(funding_txo, &update); } - // TODO: If this channel has since closed, we're likely providing a payment - // preimage update, which we must ensure is durable! We currently don't, - // however, ensure that. - if res.is_err() { - log_error!(self.logger, - "Failed to provide ChannelMonitorUpdate to closed channel! This likely lost us a payment preimage!"); - } - let _ = handle_error!(self, res, counterparty_node_id); }, BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -4409,17 +4721,17 @@ where } fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel, new_feerate: u32) -> NotifyOption { - if !chan.context.is_outbound() { return NotifyOption::SkipPersist; } + if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; } // If the feerate has decreased by less than half, don't bother if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() { log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.", - &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - return NotifyOption::SkipPersist; + chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); + return NotifyOption::SkipPersistNoEvents; } if !chan.context.is_live() { log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).", - &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - return NotifyOption::SkipPersist; + chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); + return NotifyOption::SkipPersistNoEvents; } log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); @@ -4434,8 +4746,8 @@ where /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { + let mut should_persist = NotifyOption::SkipPersistNoEvents; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -4479,8 +4791,8 @@ where /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { + let mut should_persist = NotifyOption::SkipPersistNoEvents; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -4488,8 +4800,9 @@ where let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); let mut pending_peers_awaiting_removal = Vec::new(); + let mut shutdown_channels = Vec::new(); - let process_unfunded_channel_tick = | + let mut process_unfunded_channel_tick = | chan_id: &ChannelId, context: &mut ChannelContext, unfunded_context: &mut UnfundedChannelContext, @@ -4502,7 +4815,7 @@ where "Force-closing pending channel with ID {} for not establishing in a timely manner", chan_id); update_maps_on_chan_removal!(self, &context); self.issue_channel_close_events(&context, ClosureReason::HolderForceClosed); - self.finish_force_close_channel(context.force_shutdown(false)); + shutdown_channels.push(context.force_shutdown(false)); pending_msg_events.push(MessageSendEvent::HandleError { node_id: counterparty_node_id, action: msgs::ErrorAction::SendErrorMessage { @@ -4695,6 +5008,10 @@ where let _ = handle_error!(self, err, counterparty_node_id); } + for shutdown_res in shutdown_channels { + self.finish_close_channel(shutdown_res); + } + self.pending_outbound_payments.remove_stale_payments(&self.pending_events); // Technically we don't need to do this here, but if we have holding cell entries in a @@ -4851,6 +5168,7 @@ where // This ensures that future code doesn't introduce a lock-order requirement for // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling // this function with any `per_peer_state` peer lock acquired would. + #[cfg(debug_assertions)] for (_, peer) in self.per_peer_state.read().unwrap().iter() { debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread); } @@ -5101,15 +5419,8 @@ where peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); } if !during_init { - let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, - peer_state, per_peer_state, chan_phase_entry); - if let Err(e) = res { - // TODO: This is a *critical* error - we probably updated the outbound edge - // of the HTLC's monitor with a preimage. We should retry this monitor - // update over and over again until morale improves. - log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); - return Err((counterparty_node_id, e)); - } + handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, + peer_state, per_peer_state, chan); } else { // If we're running during init we cannot update a monitor directly - // they probably haven't actually been loaded yet. Instead, push the @@ -5175,11 +5486,17 @@ where self.pending_outbound_payments.finalize_claims(sources, &self.pending_events); } - fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_outpoint: OutPoint) { + fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, + forwarded_htlc_value_msat: Option, from_onchain: bool, + next_channel_counterparty_node_id: Option, next_channel_outpoint: OutPoint + ) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire), "We don't support claim_htlc claims during startup - monitors may not be available yet"); + if let Some(pubkey) = next_channel_counterparty_node_id { + debug_assert_eq!(pubkey, path.hops[0].pubkey); + } let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate { channel_funding_outpoint: next_channel_outpoint, counterparty_node_id: path.hops[0].pubkey, @@ -5190,6 +5507,7 @@ where }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; + let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat| { if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { @@ -5205,7 +5523,17 @@ where next_channel_id: Some(next_channel_outpoint.to_channel_id()), outbound_amount_forwarded_msat: forwarded_htlc_value_msat, }, - downstream_counterparty_and_funding_outpoint: None, + downstream_counterparty_and_funding_outpoint: + if let Some(node_id) = next_channel_counterparty_node_id { + Some((node_id, next_channel_outpoint, completed_blocker)) + } else { + // We can only get `None` here if we are processing a + // `ChannelMonitor`-originated event, in which case we + // don't care about ensuring we wake the downstream + // channel's monitor updating - the channel is already + // closed. + None + }, }) } else { None } }); @@ -5541,6 +5869,8 @@ where } fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); } @@ -5640,6 +5970,8 @@ where } fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -5715,47 +6047,42 @@ where Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id)) }, hash_map::Entry::Vacant(e) => { - match self.id_to_peer.lock().unwrap().entry(chan.context.channel_id()) { + let mut id_to_peer_lock = self.id_to_peer.lock().unwrap(); + match id_to_peer_lock.entry(chan.context.channel_id()) { hash_map::Entry::Occupied(_) => { return Err(MsgHandleErrInternal::send_err_msg_no_close( "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(), funding_msg.channel_id)) }, hash_map::Entry::Vacant(i_e) => { - i_e.insert(chan.context.get_counterparty_node_id()); - } - } - - // There's no problem signing a counterparty's funding transaction if our monitor - // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't - // accepted payment from yet. We do, however, need to wait to send our channel_ready - // until we have persisted our monitor. - let new_channel_id = funding_msg.channel_id; - peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { - node_id: counterparty_node_id.clone(), - msg: funding_msg, - }); + let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); + if let Ok(persist_state) = monitor_res { + i_e.insert(chan.context.get_counterparty_node_id()); + mem::drop(id_to_peer_lock); + + // There's no problem signing a counterparty's funding transaction if our monitor + // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't + // accepted payment from yet. We do, however, need to wait to send our channel_ready + // until we have persisted our monitor. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { + node_id: counterparty_node_id.clone(), + msg: funding_msg, + }); - let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); - - if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) { - let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, - { peer_state.channel_by_id.remove(&new_channel_id) }); - - // Note that we reply with the new channel_id in error messages if we gave up on the - // channel, not the temporary_channel_id. This is compatible with ourselves, but the - // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for - // any messages referencing a previously-closed channel anyway. - // We do not propagate the monitor update to the user as it would be for a monitor - // that we didn't manage to store (and that we don't care about - we don't respond - // with the funding_signed so the channel can never go on chain). - if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { - res.0 = None; + if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) { + handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state, + per_peer_state, chan, INITIAL_MONITOR); + } else { + unreachable!("This must be a funded channel as we just inserted it."); + } + Ok(()) + } else { + log_error!(self.logger, "Persisting initial ChannelMonitor failed, implying the funding outpoint was duplicated"); + return Err(MsgHandleErrInternal::send_err_msg_no_close( + "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(), + funding_msg.channel_id)); + } } - res.map(|_| ()) - } else { - unreachable!("This must be a funded channel as we just inserted it."); } } } @@ -5778,17 +6105,12 @@ where ChannelPhase::Funded(ref mut chan) => { let monitor = try_chan_phase_entry!(self, chan.funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan_phase_entry); - let update_res = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan_phase_entry, INITIAL_MONITOR); - if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { - // We weren't able to watch the channel to begin with, so no updates should be made on - // it. Previously, full_stack_target found an (unreachable) panic when the - // monitor update contained within `shutdown_finish` was applied. - if let Some((ref mut shutdown_finish, _)) = shutdown_finish { - shutdown_finish.0.take(); - } + if let Ok(persist_status) = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor) { + handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); + Ok(()) + } else { + try_chan_phase_entry!(self, Err(ChannelError::Close("Channel funding outpoint was a duplicate".to_owned())), chan_phase_entry) } - res.map(|_| ()) }, _ => { return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)); @@ -5800,6 +6122,8 @@ where } fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5852,8 +6176,9 @@ where } fn internal_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { - let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>; - let result: Result<(), _> = loop { + let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new(); + let mut finish_shutdown = None; + { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5888,34 +6213,37 @@ where } // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()); + handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan); } - break Ok(()); }, ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) => { let context = phase.context_mut(); log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); let mut chan = remove_channel_phase!(self, chan_phase_entry); - self.finish_force_close_channel(chan.context_mut().force_shutdown(false)); - return Ok(()); + finish_shutdown = Some(chan.context_mut().force_shutdown(false)); }, } } else { return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } - }; + } for htlc_source in dropped_htlcs.drain(..) { let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id: msg.channel_id }; let reason = HTLCFailReason::from_failure_code(0x4000 | 8); self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver); } + if let Some(shutdown_res) = finish_shutdown { + self.finish_close_channel(shutdown_res); + } - result + Ok(()) } fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { + let mut shutdown_result = None; + let unbroadcasted_batch_funding_txid; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5928,6 +6256,7 @@ where match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_phase_entry) => { if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid(); let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry); if let Some(msg) = closing_signed { peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { @@ -5964,6 +6293,11 @@ where }); } self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure); + shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid)); + } + mem::drop(per_peer_state); + if let Some(shutdown_result) = shutdown_result { + self.finish_close_channel(shutdown_result); } Ok(()) } @@ -5978,6 +6312,9 @@ where //encrypted with the same key. It's not immediately obvious how to usefully exploit that, //but we should prevent it anyway. + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! + let decoded_hop_res = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -6044,6 +6381,17 @@ where hash_map::Entry::Occupied(mut chan_phase_entry) => { if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry); + if let HTLCSource::PreviousHopData(prev_hop) = &res.0 { + peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id) + .or_insert_with(Vec::new) + .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop)); + } + // Note that we do not need to push an `actions_blocking_raa_monitor_updates` + // entry here, even though we *do* need to block the next RAA monitor update. + // We do this instead in the `claim_funds_internal` by attaching a + // `ReleaseRAAChannelMonitorUpdate` action to the event generated when the + // outbound HTLC is claimed. This is guaranteed to all complete before we + // process the RAA as messages are processed from single peers serially. funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded"); res } else { @@ -6054,11 +6402,13 @@ where hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; - self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo); + self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo); Ok(()) } fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6082,6 +6432,8 @@ where } fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6124,8 +6476,9 @@ where let monitor_update_opt = try_chan_phase_entry!(self, chan.commitment_signed(&msg, &self.logger), chan_phase_entry); if let Some(monitor_update) = monitor_update_opt { handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, - peer_state, per_peer_state, chan_phase_entry).map(|_| ()) - } else { Ok(()) } + peer_state, per_peer_state, chan); + } + Ok(()) } else { return try_chan_phase_entry!(self, Err(ChannelError::Close( "Got a commitment_signed message for an unfunded channel!".into())), chan_phase_entry); @@ -6256,8 +6609,25 @@ where }) } + #[cfg(any(test, feature = "_test_utils"))] + pub(crate) fn test_raa_monitor_updates_held(&self, + counterparty_node_id: PublicKey, channel_id: ChannelId + ) -> bool { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lck = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lck; + + if let Some(chan) = peer_state.channel_by_id.get(&channel_id) { + return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates, + chan.context().get_funding_txo().unwrap(), counterparty_node_id); + } + } + false + } + fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { - let (htlcs_to_fail, res) = { + let htlcs_to_fail = { let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6276,13 +6646,13 @@ where } else { false }; let (htlcs_to_fail, monitor_update_opt) = try_chan_phase_entry!(self, chan.revoke_and_ack(&msg, &self.fee_estimator, &self.logger, mon_update_blocked), chan_phase_entry); - let res = if let Some(monitor_update) = monitor_update_opt { + if let Some(monitor_update) = monitor_update_opt { let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); handle_new_monitor_update!(self, funding_txo, monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()) - } else { Ok(()) }; - (htlcs_to_fail, res) + peer_state_lock, peer_state, per_peer_state, chan); + } + htlcs_to_fail } else { return try_chan_phase_entry!(self, Err(ChannelError::Close( "Got a revoke_and_ack message for an unfunded channel!".into())), chan_phase_entry); @@ -6292,7 +6662,7 @@ where } }; self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id); - res + Ok(()) } fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { @@ -6353,19 +6723,19 @@ where Ok(()) } - /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err. + /// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err. fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result { let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) { Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), None => { // It's not a local channel - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); if peer_state_mutex_opt.is_none() { - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; @@ -6377,16 +6747,16 @@ where // If the announcement is about a channel of ours which is public, some // other peer may simply be forwarding all its gossip to us. Don't provide // a scary-looking error message and return Ok instead. - return Ok(NotifyOption::SkipPersist); + return Ok(NotifyOption::SkipPersistNoEvents); } return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id)); } let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..]; let msg_from_node_one = msg.contents.flags & 1 == 0; if were_node_one == msg_from_node_one { - return Ok(NotifyOption::SkipPersist); + return Ok(NotifyOption::SkipPersistNoEvents); } else { - log_debug!(self.logger, "Received channel_update for channel {}.", chan_id); + log_debug!(self.logger, "Received channel_update {:?} for channel {}.", msg, chan_id); try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry); } } else { @@ -6394,12 +6764,12 @@ where "Got a channel_update for an unfunded channel!".into())), chan_phase_entry); } }, - hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist) + hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents) } Ok(NotifyOption::DoPersist) } - fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> { + fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result { let htlc_forwards; let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6455,14 +6825,16 @@ where } }; + let mut persist = NotifyOption::SkipPersistHandleEvents; if let Some(forwards) = htlc_forwards { self.forward_htlcs(&mut [forwards][..]); + persist = NotifyOption::DoPersist; } if let Some(channel_ready_msg) = need_lnd_workaround { self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?; } - Ok(()) + Ok(persist) } /// Process pending events from the [`chain::Watch`], returning whether any events were processed. @@ -6477,8 +6849,8 @@ where match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { if let Some(preimage) = htlc_update.payment_preimage { - log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", &preimage); - self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint); + log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage); + self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint); } else { log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash); let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() }; @@ -6486,8 +6858,7 @@ where self.fail_htlc_backwards_internal(&htlc_update.source, &htlc_update.payment_hash, &reason, receiver); } }, - MonitorEvent::CommitmentTxConfirmed(funding_outpoint) | - MonitorEvent::UpdateFailed(funding_outpoint) => { + MonitorEvent::HolderForceClosed(funding_outpoint) => { let counterparty_node_id_opt = match counterparty_node_id { Some(cp_id) => Some(cp_id), None => { @@ -6511,12 +6882,7 @@ where msg: update }); } - let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event { - ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() } - } else { - ClosureReason::CommitmentTxConfirmed - }; - self.issue_channel_close_events(&chan.context, reason); + self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed); pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.context.get_counterparty_node_id(), action: msgs::ErrorAction::SendErrorMessage { @@ -6536,7 +6902,7 @@ where } for failure in failed_channels.drain(..) { - self.finish_force_close_channel(failure); + self.finish_close_channel(failure); } has_pending_monitor_events @@ -6557,7 +6923,6 @@ where fn check_free_holding_cells(&self) -> bool { let mut has_monitor_update = false; let mut failed_htlcs = Vec::new(); - let mut handle_errors = Vec::new(); // Walk our list of channels and find any that need to update. Note that when we do find an // update, if it includes actions that must be taken afterwards, we have to drop the @@ -6582,13 +6947,8 @@ where if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - let channel_id: ChannelId = *channel_id; - let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING, - peer_state.channel_by_id.remove(&channel_id)); - if res.is_err() { - handle_errors.push((counterparty_node_id, res)); - } + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan); continue 'peer_loop; } } @@ -6598,15 +6958,11 @@ where break 'peer_loop; } - let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty(); + let has_update = has_monitor_update || !failed_htlcs.is_empty(); for (failures, channel_id, counterparty_node_id) in failed_htlcs.drain(..) { self.fail_holding_cell_htlcs(failures, channel_id, &counterparty_node_id); } - for (counterparty_node_id, err) in handle_errors.drain(..) { - let _ = handle_error!(self, err, counterparty_node_id); - } - has_update } @@ -6616,6 +6972,8 @@ where fn maybe_generate_initial_closing_signed(&self) -> bool { let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new(); let mut has_update = false; + let mut shutdown_result = None; + let mut unbroadcasted_batch_funding_txid = None; { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6626,6 +6984,7 @@ where peer_state.channel_by_id.retain(|channel_id, phase| { match phase { ChannelPhase::Funded(chan) => { + unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid(); match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) { Ok((msg_opt, tx_opt)) => { if let Some(msg) = msg_opt { @@ -6648,6 +7007,7 @@ where log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); self.tx_broadcaster.broadcast_transactions(&[&tx]); update_maps_on_chan_removal!(self, &chan.context); + shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid)); false } else { true } }, @@ -6669,6 +7029,10 @@ where let _ = handle_error!(self, err, counterparty_node_id); } + if let Some(shutdown_result) = shutdown_result { + self.finish_close_channel(shutdown_result); + } + has_update } @@ -6694,7 +7058,7 @@ where counterparty_node_id, funding_txo, update }); } - self.finish_force_close_channel(failure); + self.finish_close_channel(failure); } } @@ -6901,7 +7265,6 @@ where /// operation. It will double-check that nothing *else* is also blocking the same channel from /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly. fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { - let mut errors = Vec::new(); loop { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { @@ -6933,11 +7296,8 @@ where if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() { log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", channel_funding_outpoint.to_channel_id()); - if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, - peer_state_lck, peer_state, per_peer_state, chan_phase_entry) - { - errors.push((e, counterparty_node_id)); - } + handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, + peer_state_lck, peer_state, per_peer_state, chan); if further_update_exists { // If there are more `ChannelMonitorUpdate`s to process, restart at the // top of the loop. @@ -6956,10 +7316,6 @@ where } break; } - for (err, counterparty_node_id) in errors { - let res = Err::<(), _>(err); - let _ = handle_error!(self, res, counterparty_node_id); - } } fn handle_post_event_actions(&self, actions: Vec) { @@ -7012,8 +7368,8 @@ where /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut result = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { + let mut result = NotifyOption::SkipPersistNoEvents; // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -7094,8 +7450,9 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -7129,8 +7486,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -7149,8 +7507,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -7193,8 +7552,9 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.context.get_funding_txo() { if funding_txo.txid == *txid { @@ -7377,18 +7737,26 @@ where } } - /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted. + /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or + /// may have events that need processing. + /// + /// In order to check if this [`ChannelManager`] needs persisting, call + /// [`Self::get_and_clear_needs_persistence`]. /// /// Note that callbacks registered on the [`Future`] MUST NOT call back into this /// [`ChannelManager`] and should instead register actions to be taken later. - /// - pub fn get_persistable_update_future(&self) -> Future { - self.persistence_notifier.get_future() + pub fn get_event_or_persistence_needed_future(&self) -> Future { + self.event_persist_notifier.get_future() + } + + /// Returns true if this [`ChannelManager`] needs to be persisted. + pub fn get_and_clear_needs_persistence(&self) -> bool { + self.needs_persist_flag.swap(false, Ordering::AcqRel) } #[cfg(any(test, feature = "_test_utils"))] - pub fn get_persistence_condvar_value(&self) -> bool { - self.persistence_notifier.notify_pending() + pub fn get_event_or_persist_condvar_value(&self) -> bool { + self.event_persist_notifier.notify_pending() } /// Gets the latest best block which was connected either via the [`chain::Listen`] or @@ -7445,8 +7813,21 @@ where L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // open_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_open_channel(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => { + debug_assert!(false, "We shouldn't close a new channel"); + NotifyOption::DoPersist + }, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { @@ -7456,8 +7837,13 @@ where } fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // accept_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + NotifyOption::SkipPersistHandleEvents + }); } fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { @@ -7477,8 +7863,19 @@ where } fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // channel_ready message - while the channel's state will change, any channel_ready message + // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we + // will not force-close the channel on startup. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_ready(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { @@ -7492,8 +7889,19 @@ where } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_add_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_add_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { @@ -7502,13 +7910,35 @@ where } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_malformed_htlc message - the message itself doesn't change our channel state + // only the `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { @@ -7522,8 +7952,19 @@ where } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fee message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fee(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { @@ -7532,23 +7973,31 @@ where } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let force_persist = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { - if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } + persist } else { - NotifyOption::SkipPersist + NotifyOption::DoPersist } }); } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_reestablish(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(persist) => *persist, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify( + self, || NotifyOption::SkipPersistHandleEvents); let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { @@ -7561,24 +8010,24 @@ where peer_state.channel_by_id.retain(|_, phase| { let context = match phase { ChannelPhase::Funded(chan) => { - chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); - // We only retain funded channels that are not shutdown. - if !chan.is_shutdown() { + if chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger).is_ok() { + // We only retain funded channels that are not shutdown. return true; } - &chan.context + &mut chan.context }, // Unfunded channels will always be removed. ChannelPhase::UnfundedOutboundV1(chan) => { - &chan.context + &mut chan.context }, ChannelPhase::UnfundedInboundV1(chan) => { - &chan.context + &mut chan.context }, }; // Clean up for removal. update_maps_on_chan_removal!(self, &context); self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer); + failed_channels.push(context.force_shutdown(false)); false }); // Note that we don't bother generating any events for pre-accept channels - @@ -7637,7 +8086,7 @@ where mem::drop(per_peer_state); for failure in failed_channels.drain(..) { - self.finish_force_close_channel(failure); + self.finish_close_channel(failure); } } @@ -7647,76 +8096,82 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut res = Ok(()); - // If we have too many peers connected which don't have funded channels, disconnect the - // peer immediately (as long as it doesn't have funded channels). If we have a bunch of - // unfunded channels taking up space in memory for disconnected peers, we still let new - // peers connect, but we'll reject new channels from them. - let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); - let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; + PersistenceNotifierGuard::optionally_notify(self, || { + // If we have too many peers connected which don't have funded channels, disconnect the + // peer immediately (as long as it doesn't have funded channels). If we have a bunch of + // unfunded channels taking up space in memory for disconnected peers, we still let new + // peers connect, but we'll reject new channels from them. + let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); + let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; - { - let mut peer_state_lock = self.per_peer_state.write().unwrap(); - match peer_state_lock.entry(counterparty_node_id.clone()) { - hash_map::Entry::Vacant(e) => { - if inbound_peer_limited { - return Err(()); - } - e.insert(Mutex::new(PeerState { - channel_by_id: HashMap::new(), - inbound_channel_request_by_id: HashMap::new(), - latest_features: init_msg.features.clone(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - is_connected: true, - })); - }, - hash_map::Entry::Occupied(e) => { - let mut peer_state = e.get().lock().unwrap(); - peer_state.latest_features = init_msg.features.clone(); - - let best_block_height = self.best_block.read().unwrap().height(); - if inbound_peer_limited && - Self::unfunded_channel_count(&*peer_state, best_block_height) == - peer_state.channel_by_id.len() - { - return Err(()); - } + { + let mut peer_state_lock = self.per_peer_state.write().unwrap(); + match peer_state_lock.entry(counterparty_node_id.clone()) { + hash_map::Entry::Vacant(e) => { + if inbound_peer_limited { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } + e.insert(Mutex::new(PeerState { + channel_by_id: HashMap::new(), + inbound_channel_request_by_id: HashMap::new(), + latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: true, + })); + }, + hash_map::Entry::Occupied(e) => { + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); + + let best_block_height = self.best_block.read().unwrap().height(); + if inbound_peer_limited && + Self::unfunded_channel_count(&*peer_state, best_block_height) == + peer_state.channel_by_id.len() + { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } - debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); - peer_state.is_connected = true; - }, + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; + }, + } } - } - log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); + log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - let per_peer_state = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| - if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { - // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted - // (so won't be recovered after a crash), they shouldn't exist here and we would never need to - // worry about closing and removing them. - debug_assert!(false); - None - } - ).for_each(|chan| { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), + peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| + if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash), they shouldn't exist here and we would never need to + // worry about closing and removing them. + debug_assert!(false); + None + } + ).for_each(|chan| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); - }); - } - //TODO: Also re-broadcast announcement_signatures - Ok(()) + } + + return NotifyOption::SkipPersistHandleEvents; + //TODO: Also re-broadcast announcement_signatures + }); + res } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { @@ -8376,7 +8831,7 @@ where } number_of_funded_channels += peer_state.channel_by_id.iter().filter( - |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false } + |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_broadcast() } else { false } ).count(); } @@ -8387,7 +8842,7 @@ where let peer_state = &mut *peer_state_lock; for channel in peer_state.channel_by_id.iter().filter_map( |(_, phase)| if let ChannelPhase::Funded(channel) = phase { - if channel.context.is_funding_initiated() { Some(channel) } else { None } + if channel.context.is_funding_broadcast() { Some(channel) } else { None } } else { None } ) { channel.write(writer)?; @@ -8811,7 +9266,10 @@ where log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.", &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number()); } - let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); + let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true); + if batch_funding_txid.is_some() { + return Err(DecodeError::InvalidValue); + } if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update @@ -8851,7 +9309,7 @@ where if let Some(short_channel_id) = channel.context.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); } - if channel.context.is_funding_initiated() { + if channel.context.is_funding_broadcast() { id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id()); } match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) { @@ -9216,6 +9674,7 @@ where pending_fee_msat: Some(path_fee), total_msat: path_amt, starting_block_height: best_block_height, + remaining_max_total_routing_fee_msat: None, // only used for retries, and we'll never retry on startup }); log_info!(args.logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}", path_amt, &htlc.payment_hash, log_bytes!(session_priv_bytes)); @@ -9298,6 +9757,7 @@ where // downstream chan is closed (because we don't have a // channel_id -> peer map entry). counterparty_opt.is_none(), + counterparty_opt.cloned().or(monitor.get_counterparty_node_id()), monitor.get_funding_txo().0)) } else { None } } else { @@ -9559,7 +10019,11 @@ where pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), + + funding_batch_states: Mutex::new(BTreeMap::new()), entropy_source: args.entropy_source, node_signer: args.node_signer, @@ -9576,12 +10040,12 @@ where channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } - for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay { + for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay { // We use `downstream_closed` in place of `from_onchain` here just as a guess - we // don't remember in the `ChannelMonitor` where we got a preimage from, but if the // channel is closed we just assume that it probably came from an on-chain claim. channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), - downstream_closed, downstream_funding); + downstream_closed, downstream_node_id, downstream_funding); } //TODO: Broadcast channel update for closed channels, but only after we've made a @@ -9621,9 +10085,9 @@ mod tests { // All nodes start with a persistable update pending as `create_network` connects each node // with all other nodes to make most tests simpler. - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1); @@ -9637,19 +10101,19 @@ mod tests { &nodes[0].node.get_our_node_id()).pop().unwrap(); // The first two nodes (which opened a channel) should now require fresh persistence - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // ... but the last node should not. - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // After persisting the first two nodes they should no longer need fresh persistence. - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update // about the channel. nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0); nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1); - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // The nodes which are a party to the channel should also ignore messages from unrelated // parties. @@ -9657,8 +10121,8 @@ mod tests { nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // At this point the channel info given by peers should still be the same. assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); @@ -9675,8 +10139,8 @@ mod tests { // persisted and that its channel info remains the same. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info); @@ -9684,8 +10148,8 @@ mod tests { // the channel info has updated. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update); - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info); } @@ -9844,7 +10308,7 @@ mod tests { TEST_FINAL_CLTV, false), 100_000); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -9878,7 +10342,7 @@ mod tests { let payment_preimage = PaymentPreimage([42; 32]); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -9935,7 +10399,7 @@ mod tests { ); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let payment_id_2 = PaymentId([45; 32]); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), @@ -9986,7 +10450,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &(), &random_seed_bytes + nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -10031,7 +10495,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &(), &random_seed_bytes + nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]);