X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=90451d59d66d635821ffbe3ce32790d471290c6c;hb=24db35eeea7049a3abde9766baf8dda69b462c3f;hp=64d5521dcb95f78ba3317398e19438f074b19dbc;hpb=2c4f82478e83ccfc8948b24d7cb14ef9913e011f;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 64d5521d..90451d59 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -40,7 +40,7 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::{inbound_payment, ChannelId, PaymentHash, PaymentPreimage, PaymentSecret}; -use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel}; +use crate::ln::channel::{Channel, ChannelPhase, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel}; use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures}; #[cfg(any(feature = "_test_utils", test))] use crate::ln::features::Bolt11InvoiceFeatures; @@ -177,7 +177,7 @@ pub(super) enum HTLCForwardInfo { } /// Tracks the inbound corresponding to an outbound HTLC -#[derive(Clone, Hash, PartialEq, Eq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub(crate) struct HTLCPreviousHopData { // Note that this may be an outbound SCID alias for the associated channel. short_channel_id: u64, @@ -233,11 +233,17 @@ impl From<&ClaimableHTLC> for events::ClaimedHTLC { } } -/// A payment identifier used to uniquely identify a payment to LDK. +/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify +/// a payment and ensure idempotency in LDK. /// /// This is not exported to bindings users as we just use [u8; 32] directly #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)] -pub struct PaymentId(pub [u8; 32]); +pub struct PaymentId(pub [u8; Self::LENGTH]); + +impl PaymentId { + /// Number of bytes in the id. + pub const LENGTH: usize = 32; +} impl Writeable for PaymentId { fn write(&self, w: &mut W) -> Result<(), io::Error> { @@ -277,7 +283,7 @@ impl Readable for InterceptId { } } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] /// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`]. pub(crate) enum SentHTLCId { PreviousHopData { short_channel_id: u64, htlc_id: u64 }, @@ -308,7 +314,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId, /// Tracks the inbound corresponding to an outbound HTLC #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash -#[derive(Clone, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum HTLCSource { PreviousHopData(HTLCPreviousHopData), OutboundRoute { @@ -489,6 +495,10 @@ impl MsgHandleErrInternal { channel_capacity: None, } } + + fn closes_channel(&self) -> bool { + self.chan_id.is_some() + } } /// We hold back HTLCs we intend to relay for a random interval greater than this (see @@ -650,7 +660,6 @@ pub(crate) enum RAAMonitorUpdateBlockingAction { } impl RAAMonitorUpdateBlockingAction { - #[allow(unused)] fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self { Self::ForwardedPaymentInboundClaim { channel_id: prev_hop.outpoint.to_channel_id(), @@ -666,22 +675,10 @@ impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction, /// State we hold per-peer. pub(super) struct PeerState where SP::Target: SignerProvider { - /// `channel_id` -> `Channel`. - /// - /// Holds all funded channels where the peer is the counterparty. - pub(super) channel_by_id: HashMap>, - /// `temporary_channel_id` -> `OutboundV1Channel`. - /// - /// Holds all outbound V1 channels where the peer is the counterparty. Once an outbound channel has - /// been assigned a `channel_id`, the entry in this map is removed and one is created in - /// `channel_by_id`. - pub(super) outbound_v1_channel_by_id: HashMap>, - /// `temporary_channel_id` -> `InboundV1Channel`. - /// - /// Holds all inbound V1 channels where the peer is the counterparty. Once an inbound channel has - /// been assigned a `channel_id`, the entry in this map is removed and one is created in - /// `channel_by_id`. - pub(super) inbound_v1_channel_by_id: HashMap>, + /// `channel_id` -> `ChannelPhase` + /// + /// Holds all channels within corresponding `ChannelPhase`s where the peer is the counterparty. + pub(super) channel_by_id: HashMap>, /// `temporary_channel_id` -> `InboundChannelRequest`. /// /// When manual channel acceptance is enabled, this holds all unaccepted inbound channels where @@ -735,24 +732,20 @@ impl PeerState where SP::Target: SignerProvider { if require_disconnected && self.is_connected { return false } - self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty() + self.channel_by_id.iter().filter(|(_, phase)| matches!(phase, ChannelPhase::Funded(_))).count() == 0 + && self.monitor_update_blocked_actions.is_empty() && self.in_flight_monitor_updates.is_empty() } // Returns a count of all channels we have with this peer, including unfunded channels. fn total_channel_count(&self) -> usize { - self.channel_by_id.len() + - self.outbound_v1_channel_by_id.len() + - self.inbound_v1_channel_by_id.len() + - self.inbound_channel_request_by_id.len() + self.channel_by_id.len() + self.inbound_channel_request_by_id.len() } // Returns a bool indicating if the given `channel_id` matches a channel we have with this peer. fn has_channel(&self, channel_id: &ChannelId) -> bool { - self.channel_by_id.contains_key(&channel_id) || - self.outbound_v1_channel_by_id.contains_key(&channel_id) || - self.inbound_v1_channel_by_id.contains_key(&channel_id) || - self.inbound_channel_request_by_id.contains_key(&channel_id) + self.channel_by_id.contains_key(channel_id) || + self.inbound_channel_request_by_id.contains_key(channel_id) } } @@ -1196,7 +1189,8 @@ where background_events_processed_since_startup: AtomicBool, - persistence_notifier: Notifier, + event_persist_notifier: Notifier, + needs_persist_flag: AtomicBool, entropy_source: ES, node_signer: NS, @@ -1225,7 +1219,8 @@ pub struct ChainParameters { #[must_use] enum NotifyOption { DoPersist, - SkipPersist, + SkipPersistHandleEvents, + SkipPersistNoEvents, } /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is @@ -1238,43 +1233,75 @@ enum NotifyOption { /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to /// notify or not based on whether relevant changes have been made, providing a closure to /// `optionally_notify` which returns a `NotifyOption`. -struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { - persistence_notifier: &'a Notifier, +struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> { + event_persist_notifier: &'a Notifier, + needs_persist_flag: &'a AtomicBool, should_persist: F, // We hold onto this result so the lock doesn't get released immediately. _read_guard: RwLockReadGuard<'a, ()>, } impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused - fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + /// Notifies any waiters and indicates that we need to persist, in addition to possibly having + /// events to handle. + /// + /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in + /// other cases where losing the changes on restart may result in a force-close or otherwise + /// isn't ideal. + fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { + Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist }) + } + + fn optionally_notify NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F) + -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> { let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); - let _ = cm.get_cm().process_background_events(); // We always persist + let force_notify = cm.get_cm().process_background_events(); PersistenceNotifierGuard { - persistence_notifier: &cm.get_cm().persistence_notifier, - should_persist: || -> NotifyOption { NotifyOption::DoPersist }, + event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, + should_persist: move || { + // Pick the "most" action between `persist_check` and the background events + // processing and return that. + let notify = persist_check(); + match (notify, force_notify) { + (NotifyOption::DoPersist, _) => NotifyOption::DoPersist, + (_, NotifyOption::DoPersist) => NotifyOption::DoPersist, + (NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents, + (_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents, + _ => NotifyOption::SkipPersistNoEvents, + } + }, _read_guard: read_guard, } - } /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated, - /// [`ChannelManager::process_background_events`] MUST be called first. - fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { - let read_guard = lock.read().unwrap(); + /// [`ChannelManager::process_background_events`] MUST be called first (or + /// [`Self::optionally_notify`] used). + fn optionally_notify_skipping_background_events NotifyOption, C: AChannelManager> + (cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> { + let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); PersistenceNotifierGuard { - persistence_notifier: notifier, + event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, should_persist: persist_check, _read_guard: read_guard, } } } -impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { +impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { - if (self.should_persist)() == NotifyOption::DoPersist { - self.persistence_notifier.notify(); + match (self.should_persist)() { + NotifyOption::DoPersist => { + self.needs_persist_flag.store(true, Ordering::Release); + self.event_persist_notifier.notify() + }, + NotifyOption::SkipPersistHandleEvents => + self.event_persist_notifier.notify(), + NotifyOption::SkipPersistNoEvents => {}, } } } @@ -1336,11 +1363,6 @@ const CHECK_CLTV_EXPIRY_SANITY_2: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_G /// The number of ticks of [`ChannelManager::timer_tick_occurred`] until expiry of incomplete MPPs pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3; -/// The number of ticks of [`ChannelManager::timer_tick_occurred`] until we time-out the -/// idempotency of payments by [`PaymentId`]. See -/// [`OutboundPayments::remove_stale_resolved_payments`]. -pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7; - /// The number of ticks of [`ChannelManager::timer_tick_occurred`] where a peer is disconnected /// until we mark the channel disabled and gossip the update. pub(crate) const DISABLE_GOSSIP_TICKS: u8 = 10; @@ -1683,8 +1705,17 @@ pub enum ChannelShutdownState { /// These include payments that have yet to find a successful path, or have unresolved HTLCs. #[derive(Debug, PartialEq)] pub enum RecentPaymentDetails { + /// When an invoice was requested and thus a payment has not yet been sent. + AwaitingInvoice { + /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify + /// a payment and ensure idempotency in LDK. + payment_id: PaymentId, + }, /// When a payment is still being sent and awaiting successful delivery. Pending { + /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify + /// a payment and ensure idempotency in LDK. + payment_id: PaymentId, /// Hash of the payment that is currently being sent but has yet to be fulfilled or /// abandoned. payment_hash: PaymentHash, @@ -1696,6 +1727,9 @@ pub enum RecentPaymentDetails { /// been resolved. Upon receiving [`Event::PaymentSent`], we delay for a few minutes before the /// payment is removed from tracking. Fulfilled { + /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify + /// a payment and ensure idempotency in LDK. + payment_id: PaymentId, /// Hash of the payment that was claimed. `None` for serializations of [`ChannelManager`] /// made before LDK version 0.0.104. payment_hash: Option, @@ -1704,6 +1738,9 @@ pub enum RecentPaymentDetails { /// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all /// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated. Abandoned { + /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify + /// a payment and ensure idempotency in LDK. + payment_id: PaymentId, /// Hash of the payment that we have given up trying to send. payment_hash: PaymentHash, }, @@ -1806,45 +1843,55 @@ macro_rules! update_maps_on_chan_removal { } /// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error) -macro_rules! convert_chan_err { - ($self: ident, $err: expr, $channel: expr, $channel_id: expr) => { +macro_rules! convert_chan_phase_err { + ($self: ident, $err: expr, $channel: expr, $channel_id: expr, MANUAL_CHANNEL_UPDATE, $channel_update: expr) => { match $err { ChannelError::Warn(msg) => { - (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(msg), $channel_id.clone())) + (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(msg), *$channel_id)) }, ChannelError::Ignore(msg) => { - (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone())) + (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), *$channel_id)) }, ChannelError::Close(msg) => { - log_error!($self.logger, "Closing channel {} due to close-required error: {}", &$channel_id, msg); - update_maps_on_chan_removal!($self, &$channel.context); + log_error!($self.logger, "Closing channel {} due to close-required error: {}", $channel_id, msg); + update_maps_on_chan_removal!($self, $channel.context); let shutdown_res = $channel.context.force_shutdown(true); - (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel.context.get_user_id(), - shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok(), $channel.context.get_value_satoshis())) + let user_id = $channel.context.get_user_id(); + let channel_capacity_satoshis = $channel.context.get_value_satoshis(); + + (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, user_id, + shutdown_res, $channel_update, channel_capacity_satoshis)) }, } }; - ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, UNFUNDED) => { - match $err { - // We should only ever have `ChannelError::Close` when unfunded channels error. - // In any case, just close the channel. - ChannelError::Warn(msg) | ChannelError::Ignore(msg) | ChannelError::Close(msg) => { - log_error!($self.logger, "Closing unfunded channel {} due to an error: {}", &$channel_id, msg); - update_maps_on_chan_removal!($self, &$channel_context); - let shutdown_res = $channel_context.force_shutdown(false); - (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel_context.get_user_id(), - shutdown_res, None, $channel_context.get_value_satoshis())) + ($self: ident, $err: expr, $channel: expr, $channel_id: expr, FUNDED_CHANNEL) => { + convert_chan_phase_err!($self, $err, $channel, $channel_id, MANUAL_CHANNEL_UPDATE, { $self.get_channel_update_for_broadcast($channel).ok() }) + }; + ($self: ident, $err: expr, $channel: expr, $channel_id: expr, UNFUNDED_CHANNEL) => { + convert_chan_phase_err!($self, $err, $channel, $channel_id, MANUAL_CHANNEL_UPDATE, None) + }; + ($self: ident, $err: expr, $channel_phase: expr, $channel_id: expr) => { + match $channel_phase { + ChannelPhase::Funded(channel) => { + convert_chan_phase_err!($self, $err, channel, $channel_id, FUNDED_CHANNEL) + }, + ChannelPhase::UnfundedOutboundV1(channel) => { + convert_chan_phase_err!($self, $err, channel, $channel_id, UNFUNDED_CHANNEL) + }, + ChannelPhase::UnfundedInboundV1(channel) => { + convert_chan_phase_err!($self, $err, channel, $channel_id, UNFUNDED_CHANNEL) }, } - } + }; } -macro_rules! break_chan_entry { +macro_rules! break_chan_phase_entry { ($self: ident, $res: expr, $entry: expr) => { match $res { Ok(res) => res, Err(e) => { - let (drop, res) = convert_chan_err!($self, e, $entry.get_mut(), $entry.key()); + let key = *$entry.key(); + let (drop, res) = convert_chan_phase_err!($self, e, $entry.get_mut(), &key); if drop { $entry.remove_entry(); } @@ -1854,27 +1901,13 @@ macro_rules! break_chan_entry { } } -macro_rules! try_v1_outbound_chan_entry { - ($self: ident, $res: expr, $entry: expr) => { - match $res { - Ok(res) => res, - Err(e) => { - let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), UNFUNDED); - if drop { - $entry.remove_entry(); - } - return Err(res); - } - } - } -} - -macro_rules! try_chan_entry { +macro_rules! try_chan_phase_entry { ($self: ident, $res: expr, $entry: expr) => { match $res { Ok(res) => res, Err(e) => { - let (drop, res) = convert_chan_err!($self, e, $entry.get_mut(), $entry.key()); + let key = *$entry.key(); + let (drop, res) = convert_chan_phase_err!($self, e, $entry.get_mut(), &key); if drop { $entry.remove_entry(); } @@ -1884,11 +1917,11 @@ macro_rules! try_chan_entry { } } -macro_rules! remove_channel { +macro_rules! remove_channel_phase { ($self: expr, $entry: expr) => { { let channel = $entry.remove_entry().1; - update_maps_on_chan_removal!($self, &channel.context); + update_maps_on_chan_removal!($self, &channel.context()); channel } } @@ -2028,7 +2061,20 @@ macro_rules! handle_new_monitor_update { handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) }; ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { - handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry()) + if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() { + handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, + $per_peer_state_lock, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { $chan_entry.remove() }) + } else { + // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to + // update). + debug_assert!(false); + let channel_id = *$chan_entry.key(); + let (_, err) = convert_chan_phase_err!($self, ChannelError::Close( + "Cannot update monitor for unfunded channels as they don't have monitors yet".into()), + $chan_entry.get_mut(), &channel_id); + $chan_entry.remove(); + Err(err) + } }; ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) @@ -2052,7 +2098,20 @@ macro_rules! handle_new_monitor_update { }) } }; ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry()) + if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() { + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, + $per_peer_state_lock, chan, MANUALLY_REMOVING, { $chan_entry.remove() }) + } else { + // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to + // update). + debug_assert!(false); + let channel_id = *$chan_entry.key(); + let (_, err) = convert_chan_phase_err!($self, ChannelError::Close( + "Cannot update monitor for unfunded channels as they don't have monitors yet".into()), + $chan_entry.get_mut(), &channel_id); + $chan_entry.remove(); + Err(err) + } } } @@ -2064,7 +2123,7 @@ macro_rules! process_events_body { return; } - let mut result = NotifyOption::SkipPersist; + let mut result; { // We'll acquire our total consistency lock so that we can be sure no other @@ -2073,7 +2132,7 @@ macro_rules! process_events_body { // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must // ensure any startup-generated background events are handled first. - if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; } + result = $self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -2113,8 +2172,14 @@ macro_rules! process_events_body { processed_all_events = false; } - if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + match result { + NotifyOption::DoPersist => { + $self.needs_persist_flag.store(true, Ordering::Release); + $self.event_persist_notifier.notify(); + }, + NotifyOption::SkipPersistHandleEvents => + $self.event_persist_notifier.notify(), + NotifyOption::SkipPersistNoEvents => {}, } } } @@ -2193,7 +2258,9 @@ where pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source, node_signer, @@ -2289,7 +2356,7 @@ where let res = channel.get_open_channel(self.genesis_hash.clone()); let temporary_channel_id = channel.context.channel_id(); - match peer_state.outbound_v1_channel_by_id.entry(temporary_channel_id) { + match peer_state.channel_by_id.entry(temporary_channel_id) { hash_map::Entry::Occupied(_) => { if cfg!(fuzzing) { return Err(APIError::APIMisuseError { err: "Fuzzy bad RNG".to_owned() }); @@ -2297,7 +2364,7 @@ where panic!("RNG is bad???"); } }, - hash_map::Entry::Vacant(entry) => { entry.insert(channel); } + hash_map::Entry::Vacant(entry) => { entry.insert(ChannelPhase::UnfundedOutboundV1(channel)); } } peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { @@ -2321,12 +2388,18 @@ where for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - // Only `Channels` in the channel_by_id map can be considered funded. - for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) { - let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone(), &self.fee_estimator); - res.push(details); - } + res.extend(peer_state.channel_by_id.iter() + .filter_map(|(chan_id, phase)| match phase { + // Only `Channels` in the `ChannelPhase::Funded` phase can be considered funded. + ChannelPhase::Funded(chan) => Some((chan_id, chan)), + _ => None, + }) + .filter(f) + .map(|(_channel_id, channel)| { + ChannelDetails::from_channel_context(&channel.context, best_block_height, + peer_state.latest_features.clone(), &self.fee_estimator) + }) + ); } } res @@ -2348,18 +2421,8 @@ where for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - for (_channel_id, channel) in peer_state.channel_by_id.iter() { - let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone(), &self.fee_estimator); - res.push(details); - } - for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() { - let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone(), &self.fee_estimator); - res.push(details); - } - for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() { - let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, + for context in peer_state.channel_by_id.iter().map(|(_, phase)| phase.context()) { + let details = ChannelDetails::from_channel_context(context, best_block_height, peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } @@ -2390,15 +2453,13 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let features = &peer_state.latest_features; - let chan_context_to_details = |context| { + let context_to_details = |context| { ChannelDetails::from_channel_context(context, best_block_height, features.clone(), &self.fee_estimator) }; return peer_state.channel_by_id .iter() - .map(|(_, channel)| &channel.context) - .chain(peer_state.outbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) - .chain(peer_state.inbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) - .map(chan_context_to_details) + .map(|(_, phase)| phase.context()) + .map(context_to_details) .collect(); } vec![] @@ -2414,18 +2475,26 @@ where /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn list_recent_payments(&self) -> Vec { self.pending_outbound_payments.pending_outbound_payments.lock().unwrap().iter() - .filter_map(|(_, pending_outbound_payment)| match pending_outbound_payment { + .filter_map(|(payment_id, pending_outbound_payment)| match pending_outbound_payment { + PendingOutboundPayment::AwaitingInvoice { .. } => { + Some(RecentPaymentDetails::AwaitingInvoice { payment_id: *payment_id }) + }, + // InvoiceReceived is an intermediate state and doesn't need to be exposed + PendingOutboundPayment::InvoiceReceived { .. } => { + Some(RecentPaymentDetails::AwaitingInvoice { payment_id: *payment_id }) + }, PendingOutboundPayment::Retryable { payment_hash, total_msat, .. } => { Some(RecentPaymentDetails::Pending { + payment_id: *payment_id, payment_hash: *payment_hash, total_msat: *total_msat, }) }, PendingOutboundPayment::Abandoned { payment_hash, .. } => { - Some(RecentPaymentDetails::Abandoned { payment_hash: *payment_hash }) + Some(RecentPaymentDetails::Abandoned { payment_id: *payment_id, payment_hash: *payment_hash }) }, PendingOutboundPayment::Fulfilled { payment_hash, .. } => { - Some(RecentPaymentDetails::Fulfilled { payment_hash: *payment_hash }) + Some(RecentPaymentDetails::Fulfilled { payment_id: *payment_id, payment_hash: *payment_hash }) }, PendingOutboundPayment::Legacy { .. } => None }) @@ -2467,37 +2536,40 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(channel_id.clone()) { - hash_map::Entry::Occupied(mut chan_entry) => { - let funding_txo_opt = chan_entry.get().context.get_funding_txo(); - let their_features = &peer_state.latest_features; - let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() - .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; - failed_htlcs = htlcs; - - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg: shutdown_msg, - }); + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let funding_txo_opt = chan.context.get_funding_txo(); + let their_features = &peer_state.latest_features; + let (shutdown_msg, mut monitor_update_opt, htlcs) = + chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; + failed_htlcs = htlcs; + + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg: shutdown_msg, + }); - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt.take() { - break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); - } + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt.take() { + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()); + } - if chan_entry.get().is_shutdown() { - let channel = remove_channel!(self, chan_entry); - if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: channel_update - }); + if chan.is_shutdown() { + if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) { + if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: channel_update + }); + } + self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed); + } } - self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed); + break Ok(()); } - break Ok(()); }, hash_map::Entry::Vacant(_) => (), } @@ -2507,8 +2579,6 @@ where // // An appropriate error will be returned for non-existence of the channel if that's the case. return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ()) - // TODO(dunxen): This is still not ideal as we're doing some extra lookups. - // Fix this with https://github.com/lightningdevkit/rust-lightning/issues/2422 }; for htlc_source in failed_htlcs.drain(..) { @@ -2616,26 +2686,21 @@ where } else { ClosureReason::HolderForceClosed }; - if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) { - log_error!(self.logger, "Force-closing channel {}", &channel_id); - self.issue_channel_close_events(&chan.get().context, closure_reason); - let mut chan = remove_channel!(self, chan); - self.finish_force_close_channel(chan.context.force_shutdown(broadcast)); - (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id()) - } else if let hash_map::Entry::Occupied(chan) = peer_state.outbound_v1_channel_by_id.entry(channel_id.clone()) { - log_error!(self.logger, "Force-closing channel {}", &channel_id); - self.issue_channel_close_events(&chan.get().context, closure_reason); - let mut chan = remove_channel!(self, chan); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - // Unfunded channel has no update - (None, chan.context.get_counterparty_node_id()) - } else if let hash_map::Entry::Occupied(chan) = peer_state.inbound_v1_channel_by_id.entry(channel_id.clone()) { - log_error!(self.logger, "Force-closing channel {}", &channel_id); - self.issue_channel_close_events(&chan.get().context, closure_reason); - let mut chan = remove_channel!(self, chan); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - // Unfunded channel has no update - (None, chan.context.get_counterparty_node_id()) + if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id.clone()) { + log_error!(self.logger, "Force-closing channel {}", channel_id); + self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason); + let mut chan_phase = remove_channel_phase!(self, chan_phase_entry); + match chan_phase { + ChannelPhase::Funded(mut chan) => { + self.finish_force_close_channel(chan.context.force_shutdown(broadcast)); + (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id()) + }, + ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => { + self.finish_force_close_channel(chan_phase.context_mut().force_shutdown(false)); + // Unfunded channel has no update + (None, chan_phase.context().get_counterparty_node_id()) + }, + } } else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() { log_error!(self.logger, "Force-closing channel {}", &channel_id); // N.B. that we don't send any channel close event here: we @@ -2730,7 +2795,7 @@ where let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data { msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } => (short_channel_id, amt_to_forward, outgoing_cltv_value), - msgs::InboundOnionPayload::Receive { .. } => + msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } => return Err(InboundOnionErr { msg: "Final Node OnionHopData provided for us as an intermediary node", err_code: 0x4000 | 22, @@ -2762,12 +2827,19 @@ where payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, .. } => (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata), - _ => + msgs::InboundOnionPayload::BlindedReceive { + amt_msat, total_msat, outgoing_cltv_value, payment_secret, .. + } => { + let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat }; + (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None) + } + msgs::InboundOnionPayload::Forward { .. } => { return Err(InboundOnionErr { err_code: 0x4000|22, err_data: Vec::new(), msg: "Got non final data with an HMAC of 0", - }), + }) + }, }; // final_incorrect_cltv_expiry if outgoing_cltv_value > cltv_expiry { @@ -2907,7 +2979,10 @@ where } } - let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) { + let next_hop = match onion_utils::decode_next_payment_hop( + shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, + msg.payment_hash, &self.node_signer + ) { Ok(res) => res, Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => { return_malformed_err!(err_msg, err_code); @@ -2929,7 +3004,9 @@ where // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the // inbound channel's state. onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)), - onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => { + onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } | + onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } => + { return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]); } }; @@ -2961,7 +3038,9 @@ where } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) { + let chan = match peer_state.channel_by_id.get_mut(&forwarding_id).map( + |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None } + ).flatten() { None => { // Channel was removed. The short_to_chan_info and channel_by_id maps // have no consistency guarantees. @@ -3234,36 +3313,41 @@ where .ok_or_else(|| APIError::ChannelUnavailable{err: "No peer matching the path's first hop found!".to_owned() })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) { - if !chan.get().context.is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()}); - } - let funding_txo = chan.get().context.get_funding_txo().unwrap(); - let send_res = chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), - htlc_cltv, HTLCSource::OutboundRoute { - path: path.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - payment_id, - }, onion_packet, None, &self.fee_estimator, &self.logger); - match break_chan_entry!(self, send_res, chan) { - Some(monitor_update) => { - match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { - Err(e) => break Err(e), - Ok(false) => { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(id) { + match chan_phase_entry.get_mut() { + ChannelPhase::Funded(chan) => { + if !chan.context.is_live() { + return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()}); + } + let funding_txo = chan.context.get_funding_txo().unwrap(); + let send_res = chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), + htlc_cltv, HTLCSource::OutboundRoute { + path: path.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + payment_id, + }, onion_packet, None, &self.fee_estimator, &self.logger); + match break_chan_phase_entry!(self, send_res, chan_phase_entry) { + Some(monitor_update) => { + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan_phase_entry) { + Err(e) => break Err(e), + Ok(false) => { + // Note that MonitorUpdateInProgress here indicates (per function + // docs) that we will resend the commitment update once monitor + // updating completes. Therefore, we must return an error + // indicating that it is unsafe to retry the payment wholesale, + // which we do in the send_payment check for + // MonitorUpdateInProgress, below. + return Err(APIError::MonitorUpdateInProgress); + }, + Ok(true) => {}, + } }, - Ok(true) => {}, + None => {}, } }, - None => { }, - } + _ => return Err(APIError::ChannelUnavailable{err: "Channel to first hop is unfunded".to_owned()}), + }; } else { // The channel was likely removed after we fetched the id from the // `short_to_chan_info` map, but before we successfully locked the @@ -3376,10 +3460,12 @@ where } - /// Signals that no further retries for the given payment should occur. Useful if you have a + /// Signals that no further attempts for the given payment should occur. Useful if you have a /// pending outbound payment with retries remaining, but wish to stop retrying the payment before /// retries are exhausted. /// + /// # Event Generation + /// /// If no [`Event::PaymentFailed`] event had been generated before, one will be generated as soon /// as there are no remaining pending HTLCs for this payment. /// @@ -3387,11 +3473,19 @@ where /// wait until you receive either a [`Event::PaymentFailed`] or [`Event::PaymentSent`] event to /// determine the ultimate status of a payment. /// - /// If an [`Event::PaymentFailed`] event is generated and we restart without this - /// [`ChannelManager`] having been persisted, another [`Event::PaymentFailed`] may be generated. + /// # Requested Invoices /// - /// [`Event::PaymentFailed`]: events::Event::PaymentFailed - /// [`Event::PaymentSent`]: events::Event::PaymentSent + /// In the case of paying a [`Bolt12Invoice`], abandoning the payment prior to receiving the + /// invoice will result in an [`Event::InvoiceRequestFailed`] and prevent any attempts at paying + /// it once received. The other events may only be generated once the invoice has been received. + /// + /// # Restart Behavior + /// + /// If an [`Event::PaymentFailed`] is generated and we restart without first persisting the + /// [`ChannelManager`], another [`Event::PaymentFailed`] may be generated; likewise for + /// [`Event::InvoiceRequestFailed`]. + /// + /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice pub fn abandon_payment(&self, payment_id: PaymentId) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.abandon_payment(payment_id, PaymentFailureReason::UserAbandoned, &self.pending_events); @@ -3463,8 +3557,8 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - let (chan, msg) = match peer_state.outbound_v1_channel_by_id.remove(&temporary_channel_id) { - Some(chan) => { + let (chan, msg) = match peer_state.channel_by_id.remove(temporary_channel_id) { + Some(ChannelPhase::UnfundedOutboundV1(chan)) => { let funding_txo = find_funding_output(&chan, &funding_transaction)?; let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger) @@ -3488,13 +3582,18 @@ where }, } }, - None => { - return Err(APIError::ChannelUnavailable { + Some(phase) => { + peer_state.channel_by_id.insert(*temporary_channel_id, phase); + return Err(APIError::APIMisuseError { err: format!( - "Channel with id {} not found for the passed counterparty node_id {}", + "Channel with id {} for the passed counterparty node_id {} is not an unfunded, outbound V1 channel", temporary_channel_id, counterparty_node_id), }) }, + None => return Err(APIError::ChannelUnavailable {err: format!( + "Channel with id {} not found for the passed counterparty node_id {}", + temporary_channel_id, counterparty_node_id), + }), }; peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { @@ -3510,7 +3609,7 @@ where if id_to_peer.insert(chan.context.channel_id(), chan.context.get_counterparty_node_id()).is_some() { panic!("id_to_peer map already contained funding txid, which shouldn't be possible"); } - e.insert(chan); + e.insert(ChannelPhase::Funded(chan)); } } Ok(()) @@ -3556,11 +3655,13 @@ where pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - for inp in funding_transaction.input.iter() { - if inp.witness.is_empty() { - return Err(APIError::APIMisuseError { - err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned() - }); + if !funding_transaction.is_coin_base() { + for inp in funding_transaction.input.iter() { + if inp.witness.is_empty() { + return Err(APIError::APIMisuseError { + err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned() + }); + } } } { @@ -3648,27 +3749,23 @@ where }; } for channel_id in channel_ids { - if let Some(channel) = peer_state.channel_by_id.get_mut(channel_id) { - let mut config = channel.context.config(); + if let Some(channel_phase) = peer_state.channel_by_id.get_mut(channel_id) { + let mut config = channel_phase.context().config(); config.apply(config_update); - if !channel.context.update_config(&config) { + if !channel_phase.context_mut().update_config(&config) { continue; } - if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); - } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.context.get_counterparty_node_id(), - msg, - }); + if let ChannelPhase::Funded(channel) = channel_phase { + if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); + } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + node_id: channel.context.get_counterparty_node_id(), + msg, + }); + } } continue; - } - - let context = if let Some(channel) = peer_state.inbound_v1_channel_by_id.get_mut(channel_id) { - &mut channel.context - } else if let Some(channel) = peer_state.outbound_v1_channel_by_id.get_mut(channel_id) { - &mut channel.context } else { // This should not be reachable as we've already checked for non-existence in the previous channel_id loop. debug_assert!(false); @@ -3678,11 +3775,6 @@ where channel_id, counterparty_node_id), }); }; - let mut config = context.config(); - config.apply(config_update); - // We update the config, but we MUST NOT broadcast a `channel_update` before `channel_ready` - // which would be the case for pending inbound/outbound channels. - context.update_config(&config); } Ok(()) } @@ -3749,8 +3841,8 @@ where .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.get(&next_hop_channel_id) { - Some(chan) => { + match peer_state.channel_by_id.get(next_hop_channel_id) { + Some(ChannelPhase::Funded(chan)) => { if !chan.context.is_usable() { return Err(APIError::ChannelUnavailable { err: format!("Channel with id {} not fully established", next_hop_channel_id) @@ -3758,8 +3850,12 @@ where } chan.context.get_short_channel_id().unwrap_or(chan.context.outbound_scid_alias()) }, + Some(_) => return Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} for the passed counterparty node_id {} is still opening.", + next_hop_channel_id, next_node_id) + }), None => return Err(APIError::ChannelUnavailable { - err: format!("Funded channel with id {} not found for the passed counterparty node_id {}. Channel may still be opening.", + err: format!("Channel with id {} not found for the passed counterparty node_id {}.", next_hop_channel_id, next_node_id) }) } @@ -3897,7 +3993,10 @@ where let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode); if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) { let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes(); - let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) { + let next_hop = match onion_utils::decode_next_payment_hop( + phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, + payment_hash, &self.node_signer + ) { Ok(res) => res, Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => { let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner(); @@ -3955,71 +4054,68 @@ where } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - match peer_state.channel_by_id.entry(forward_chan_id) { - hash_map::Entry::Vacant(_) => { - forwarding_channel_not_found!(); - continue; - }, - hash_map::Entry::Occupied(mut chan) => { - for forward_info in pending_forwards.drain(..) { - match forward_info { - HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { - prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, - forward_info: PendingHTLCInfo { - incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, - routing: PendingHTLCRouting::Forward { onion_packet, .. }, skimmed_fee_msat, .. - }, - }) => { - log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, &payment_hash, short_chan_id); - let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { - short_channel_id: prev_short_channel_id, - user_channel_id: Some(prev_user_channel_id), - outpoint: prev_funding_outpoint, - htlc_id: prev_htlc_id, - incoming_packet_shared_secret: incoming_shared_secret, - // Phantom payments are only PendingHTLCRouting::Receive. - phantom_shared_secret: None, - }); - if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat, - payment_hash, outgoing_cltv_value, htlc_source.clone(), - onion_packet, skimmed_fee_msat, &self.fee_estimator, - &self.logger) - { - if let ChannelError::Ignore(msg) = e { - log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", &payment_hash, msg); - } else { - panic!("Stated return value requirements in send_htlc() were not met"); - } - let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get()); - failed_forwards.push((htlc_source, payment_hash, - HTLCFailReason::reason(failure_code, data), - HTLCDestination::NextHopChannel { node_id: Some(chan.get().context.get_counterparty_node_id()), channel_id: forward_chan_id } - )); - continue; - } - }, - HTLCForwardInfo::AddHTLC { .. } => { - panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward"); + if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { + for forward_info in pending_forwards.drain(..) { + match forward_info { + HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { + prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, + forward_info: PendingHTLCInfo { + incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, + routing: PendingHTLCRouting::Forward { onion_packet, .. }, skimmed_fee_msat, .. }, - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => { - log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); - if let Err(e) = chan.get_mut().queue_fail_htlc( - htlc_id, err_packet, &self.logger - ) { - if let ChannelError::Ignore(msg) = e { - log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg); - } else { - panic!("Stated return value requirements in queue_fail_htlc() were not met"); - } - // fail-backs are best-effort, we probably already have one - // pending, and if not that's OK, if not, the channel is on - // the chain and sending the HTLC-Timeout is their problem. - continue; + }) => { + log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, &payment_hash, short_chan_id); + let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { + short_channel_id: prev_short_channel_id, + user_channel_id: Some(prev_user_channel_id), + outpoint: prev_funding_outpoint, + htlc_id: prev_htlc_id, + incoming_packet_shared_secret: incoming_shared_secret, + // Phantom payments are only PendingHTLCRouting::Receive. + phantom_shared_secret: None, + }); + if let Err(e) = chan.queue_add_htlc(outgoing_amt_msat, + payment_hash, outgoing_cltv_value, htlc_source.clone(), + onion_packet, skimmed_fee_msat, &self.fee_estimator, + &self.logger) + { + if let ChannelError::Ignore(msg) = e { + log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", &payment_hash, msg); + } else { + panic!("Stated return value requirements in send_htlc() were not met"); } - }, - } + let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan); + failed_forwards.push((htlc_source, payment_hash, + HTLCFailReason::reason(failure_code, data), + HTLCDestination::NextHopChannel { node_id: Some(chan.context.get_counterparty_node_id()), channel_id: forward_chan_id } + )); + continue; + } + }, + HTLCForwardInfo::AddHTLC { .. } => { + panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward"); + }, + HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => { + log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); + if let Err(e) = chan.queue_fail_htlc( + htlc_id, err_packet, &self.logger + ) { + if let ChannelError::Ignore(msg) = e { + log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg); + } else { + panic!("Stated return value requirements in queue_fail_htlc() were not met"); + } + // fail-backs are best-effort, we probably already have one + // pending, and if not that's OK, if not, the channel is on + // the chain and sending the HTLC-Timeout is their problem. + continue; + } + }, } } + } else { + forwarding_channel_not_found!(); + continue; } } else { 'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) { @@ -4304,7 +4400,7 @@ where let mut background_events = Vec::new(); mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events); if background_events.is_empty() { - return NotifyOption::SkipPersist; + return NotifyOption::SkipPersistNoEvents; } for event in background_events.drain(..) { @@ -4322,10 +4418,10 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { - hash_map::Entry::Occupied(mut chan) => { + hash_map::Entry::Occupied(mut chan_phase) => { updated_chan = true; handle_new_monitor_update!(self, funding_txo, update.clone(), - peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) + peer_state_lock, peer_state, per_peer_state, chan_phase).map(|_| ()) }, hash_map::Entry::Vacant(_) => Ok(()), } @@ -4349,7 +4445,7 @@ where if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) { + if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) { handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan); } else { let update_actions = peer_state.monitor_update_blocked_actions @@ -4373,17 +4469,17 @@ where } fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel, new_feerate: u32) -> NotifyOption { - if !chan.context.is_outbound() { return NotifyOption::SkipPersist; } + if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; } // If the feerate has decreased by less than half, don't bother if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() { log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.", - &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - return NotifyOption::SkipPersist; + chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); + return NotifyOption::SkipPersistNoEvents; } if !chan.context.is_live() { log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).", - &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); - return NotifyOption::SkipPersist; + chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); + return NotifyOption::SkipPersistNoEvents; } log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.", &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate); @@ -4398,8 +4494,8 @@ where /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { + let mut should_persist = NotifyOption::SkipPersistNoEvents; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -4408,7 +4504,9 @@ where for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - for (chan_id, chan) in peer_state.channel_by_id.iter_mut() { + for (chan_id, chan) in peer_state.channel_by_id.iter_mut().filter_map( + |(chan_id, phase)| if let ChannelPhase::Funded(chan) = phase { Some((chan_id, chan)) } else { None } + ) { let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() { min_mempool_feerate } else { @@ -4441,8 +4539,8 @@ where /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { + let mut should_persist = NotifyOption::SkipPersistNoEvents; let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum); @@ -4450,6 +4548,36 @@ where let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); let mut pending_peers_awaiting_removal = Vec::new(); + + let process_unfunded_channel_tick = | + chan_id: &ChannelId, + context: &mut ChannelContext, + unfunded_context: &mut UnfundedChannelContext, + pending_msg_events: &mut Vec, + counterparty_node_id: PublicKey, + | { + context.maybe_expire_prev_config(); + if unfunded_context.should_expire_unfunded_channel() { + log_error!(self.logger, + "Force-closing pending channel with ID {} for not establishing in a timely manner", chan_id); + update_maps_on_chan_removal!(self, &context); + self.issue_channel_close_events(&context, ClosureReason::HolderForceClosed); + self.finish_force_close_channel(context.force_shutdown(false)); + pending_msg_events.push(MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { + channel_id: *chan_id, + data: "Force-closing pending channel due to timeout awaiting establishment handshake".to_owned(), + }, + }, + }); + false + } else { + true + } + }; + { let per_peer_state = self.per_peer_state.read().unwrap(); for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() { @@ -4457,110 +4585,89 @@ where let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; let counterparty_node_id = *counterparty_node_id; - peer_state.channel_by_id.retain(|chan_id, chan| { - let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() { - min_mempool_feerate - } else { - normal_feerate - }; - let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); - if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } - - if let Err(e) = chan.timer_check_closing_negotiation_progress() { - let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id); - handle_errors.push((Err(err), counterparty_node_id)); - if needs_close { return false; } - } - - match chan.channel_update_status() { - ChannelUpdateStatus::Enabled if !chan.context.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(0)), - ChannelUpdateStatus::Disabled if chan.context.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(0)), - ChannelUpdateStatus::DisabledStaged(_) if chan.context.is_live() - => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), - ChannelUpdateStatus::EnabledStaged(_) if !chan.context.is_live() - => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), - ChannelUpdateStatus::DisabledStaged(mut n) if !chan.context.is_live() => { - n += 1; - if n >= DISABLE_GOSSIP_TICKS { - chan.set_channel_update_status(ChannelUpdateStatus::Disabled); - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - should_persist = NotifyOption::DoPersist; - } else { - chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(n)); - } - }, - ChannelUpdateStatus::EnabledStaged(mut n) if chan.context.is_live() => { - n += 1; - if n >= ENABLE_GOSSIP_TICKS { - chan.set_channel_update_status(ChannelUpdateStatus::Enabled); - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - should_persist = NotifyOption::DoPersist; + peer_state.channel_by_id.retain(|chan_id, phase| { + match phase { + ChannelPhase::Funded(chan) => { + let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() { + min_mempool_feerate } else { - chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(n)); + normal_feerate + }; + let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); + if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } + + if let Err(e) = chan.timer_check_closing_negotiation_progress() { + let (needs_close, err) = convert_chan_phase_err!(self, e, chan, chan_id, FUNDED_CHANNEL); + handle_errors.push((Err(err), counterparty_node_id)); + if needs_close { return false; } } - }, - _ => {}, - } - chan.context.maybe_expire_prev_config(); - - if chan.should_disconnect_peer_awaiting_response() { - log_debug!(self.logger, "Disconnecting peer {} due to not making any progress on channel {}", - counterparty_node_id, chan_id); - pending_msg_events.push(MessageSendEvent::HandleError { - node_id: counterparty_node_id, - action: msgs::ErrorAction::DisconnectPeerWithWarning { - msg: msgs::WarningMessage { - channel_id: *chan_id, - data: "Disconnecting due to timeout awaiting response".to_owned(), + match chan.channel_update_status() { + ChannelUpdateStatus::Enabled if !chan.context.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(0)), + ChannelUpdateStatus::Disabled if chan.context.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(0)), + ChannelUpdateStatus::DisabledStaged(_) if chan.context.is_live() + => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), + ChannelUpdateStatus::EnabledStaged(_) if !chan.context.is_live() + => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), + ChannelUpdateStatus::DisabledStaged(mut n) if !chan.context.is_live() => { + n += 1; + if n >= DISABLE_GOSSIP_TICKS { + chan.set_channel_update_status(ChannelUpdateStatus::Disabled); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + } else { + chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(n)); + } }, - }, - }); - } + ChannelUpdateStatus::EnabledStaged(mut n) if chan.context.is_live() => { + n += 1; + if n >= ENABLE_GOSSIP_TICKS { + chan.set_channel_update_status(ChannelUpdateStatus::Enabled); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + } else { + chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(n)); + } + }, + _ => {}, + } - true - }); + chan.context.maybe_expire_prev_config(); + + if chan.should_disconnect_peer_awaiting_response() { + log_debug!(self.logger, "Disconnecting peer {} due to not making any progress on channel {}", + counterparty_node_id, chan_id); + pending_msg_events.push(MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: *chan_id, + data: "Disconnecting due to timeout awaiting response".to_owned(), + }, + }, + }); + } - let process_unfunded_channel_tick = | - chan_id: &ChannelId, - chan_context: &mut ChannelContext, - unfunded_chan_context: &mut UnfundedChannelContext, - pending_msg_events: &mut Vec, - | { - chan_context.maybe_expire_prev_config(); - if unfunded_chan_context.should_expire_unfunded_channel() { - log_error!(self.logger, - "Force-closing pending channel with ID {} for not establishing in a timely manner", - &chan_id); - update_maps_on_chan_removal!(self, &chan_context); - self.issue_channel_close_events(&chan_context, ClosureReason::HolderForceClosed); - self.finish_force_close_channel(chan_context.force_shutdown(false)); - pending_msg_events.push(MessageSendEvent::HandleError { - node_id: counterparty_node_id, - action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { - channel_id: *chan_id, - data: "Force-closing pending channel due to timeout awaiting establishment handshake".to_owned(), - }, - }, - }); - false - } else { - true + true + }, + ChannelPhase::UnfundedInboundV1(chan) => { + process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context, + pending_msg_events, counterparty_node_id) + }, + ChannelPhase::UnfundedOutboundV1(chan) => { + process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context, + pending_msg_events, counterparty_node_id) + }, } - }; - peer_state.outbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick( - chan_id, &mut chan.context, &mut chan.unfunded_context, pending_msg_events)); - peer_state.inbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick( - chan_id, &mut chan.context, &mut chan.unfunded_context, pending_msg_events)); + }); for (chan_id, req) in peer_state.inbound_channel_request_by_id.iter_mut() { if { req.ticks_remaining -= 1 ; req.ticks_remaining } <= 0 { @@ -4648,7 +4755,7 @@ where let _ = handle_error!(self, err, counterparty_node_id); } - self.pending_outbound_payments.remove_stale_resolved_payments(&self.pending_events); + self.pending_outbound_payments.remove_stale_payments(&self.pending_events); // Technically we don't need to do this here, but if we have holding cell entries in a // channel that need freeing, it's better to do that here and block a background task @@ -4776,8 +4883,14 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(channel_id) { - hash_map::Entry::Occupied(chan_entry) => { - self.get_htlc_inbound_temp_fail_err_and_data(0x1000|7, &chan_entry.get()) + hash_map::Entry::Occupied(chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get() { + self.get_htlc_inbound_temp_fail_err_and_data(0x1000|7, &chan) + } else { + // We shouldn't be trying to fail holding cell HTLCs on an unfunded channel. + debug_assert!(false); + (0x4000|10, Vec::new()) + } }, hash_map::Entry::Vacant(_) => (0x4000|10, Vec::new()) } @@ -5036,36 +5149,38 @@ where if peer_state_opt.is_some() { let mut peer_state_lock = peer_state_opt.unwrap(); let peer_state = &mut *peer_state_lock; - if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { - let counterparty_node_id = chan.get().context.get_counterparty_node_id(); - let fulfill_res = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger); - - if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res { - if let Some(action) = completion_action(Some(htlc_value_msat)) { - log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}", - &chan_id, action); - peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); - } - if !during_init { - let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, - peer_state, per_peer_state, chan); - if let Err(e) = res { - // TODO: This is a *critical* error - we probably updated the outbound edge - // of the HTLC's monitor with a preimage. We should retry this monitor - // update over and over again until morale improves. - log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); - return Err((counterparty_node_id, e)); + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let counterparty_node_id = chan.context.get_counterparty_node_id(); + let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger); + + if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res { + if let Some(action) = completion_action(Some(htlc_value_msat)) { + log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}", + chan_id, action); + peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); + } + if !during_init { + let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, + peer_state, per_peer_state, chan_phase_entry); + if let Err(e) = res { + // TODO: This is a *critical* error - we probably updated the outbound edge + // of the HTLC's monitor with a preimage. We should retry this monitor + // update over and over again until morale improves. + log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage); + return Err((counterparty_node_id, e)); + } + } else { + // If we're running during init we cannot update a monitor directly - + // they probably haven't actually been loaded yet. Instead, push the + // monitor update as a background event. + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, + funding_txo: prev_hop.outpoint, + update: monitor_update.clone(), + }); } - } else { - // If we're running during init we cannot update a monitor directly - - // they probably haven't actually been loaded yet. Instead, push the - // monitor update as a background event. - self.pending_background_events.lock().unwrap().push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id, - funding_txo: prev_hop.outpoint, - update: monitor_update.clone(), - }); } } return Ok(()); @@ -5120,11 +5235,17 @@ where self.pending_outbound_payments.finalize_claims(sources, &self.pending_events); } - fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_outpoint: OutPoint) { + fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, + forwarded_htlc_value_msat: Option, from_onchain: bool, + next_channel_counterparty_node_id: Option, next_channel_outpoint: OutPoint + ) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire), "We don't support claim_htlc claims during startup - monitors may not be available yet"); + if let Some(pubkey) = next_channel_counterparty_node_id { + debug_assert_eq!(pubkey, path.hops[0].pubkey); + } let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate { channel_funding_outpoint: next_channel_outpoint, counterparty_node_id: path.hops[0].pubkey, @@ -5135,6 +5256,7 @@ where }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; + let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat| { if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { @@ -5150,7 +5272,17 @@ where next_channel_id: Some(next_channel_outpoint.to_channel_id()), outbound_amount_forwarded_msat: forwarded_htlc_value_msat, }, - downstream_counterparty_and_funding_outpoint: None, + downstream_counterparty_and_funding_outpoint: + if let Some(node_id) = next_channel_counterparty_node_id { + Some((node_id, next_channel_outpoint, completed_blocker)) + } else { + // We can only get `None` here if we are processing a + // `ChannelMonitor`-originated event, in which case we + // don't care about ensuring we wake the downstream + // channel's monitor updating - the channel is already + // closed. + None + }, }) } else { None } }); @@ -5298,7 +5430,7 @@ where peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; let channel = - if let Some(chan) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) { + if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) { chan } else { let update_actions = peer_state.monitor_update_blocked_actions @@ -5428,7 +5560,7 @@ where msg: channel.accept_inbound_channel(), }); - peer_state.inbound_v1_channel_by_id.insert(temporary_channel_id.clone(), channel); + peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel)); Ok(()) } @@ -5460,24 +5592,34 @@ where peer: &PeerState, best_block_height: u32 ) -> usize { let mut num_unfunded_channels = 0; - for (_, chan) in peer.channel_by_id.iter() { - // This covers non-zero-conf inbound `Channel`s that we are currently monitoring, but those - // which have not yet had any confirmations on-chain. - if !chan.context.is_outbound() && chan.context.minimum_depth().unwrap_or(1) != 0 && - chan.context.get_funding_tx_confirmations(best_block_height) == 0 - { - num_unfunded_channels += 1; - } - } - for (_, chan) in peer.inbound_v1_channel_by_id.iter() { - if chan.context.minimum_depth().unwrap_or(1) != 0 { - num_unfunded_channels += 1; + for (_, phase) in peer.channel_by_id.iter() { + match phase { + ChannelPhase::Funded(chan) => { + // This covers non-zero-conf inbound `Channel`s that we are currently monitoring, but those + // which have not yet had any confirmations on-chain. + if !chan.context.is_outbound() && chan.context.minimum_depth().unwrap_or(1) != 0 && + chan.context.get_funding_tx_confirmations(best_block_height) == 0 + { + num_unfunded_channels += 1; + } + }, + ChannelPhase::UnfundedInboundV1(chan) => { + if chan.context.minimum_depth().unwrap_or(1) != 0 { + num_unfunded_channels += 1; + } + }, + ChannelPhase::UnfundedOutboundV1(_) => { + // Outbound channels don't contribute to the unfunded count in the DoS context. + continue; + } } } num_unfunded_channels + peer.inbound_channel_request_by_id.len() } fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); } @@ -5572,11 +5714,13 @@ where node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(), }); - peer_state.inbound_v1_channel_by_id.insert(channel_id, channel); + peer_state.channel_by_id.insert(channel_id, ChannelPhase::UnfundedInboundV1(channel)); Ok(()) } fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -5586,10 +5730,17 @@ where })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - match peer_state.outbound_v1_channel_by_id.entry(msg.temporary_channel_id) { - hash_map::Entry::Occupied(mut chan) => { - try_v1_outbound_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &peer_state.latest_features), chan); - (chan.get().context.get_value_satoshis(), chan.get().context.get_funding_redeemscript().to_v0_p2wsh(), chan.get().context.get_user_id()) + match peer_state.channel_by_id.entry(msg.temporary_channel_id) { + hash_map::Entry::Occupied(mut phase) => { + match phase.get_mut() { + ChannelPhase::UnfundedOutboundV1(chan) => { + try_chan_phase_entry!(self, chan.accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &peer_state.latest_features), phase); + (chan.context.get_value_satoshis(), chan.context.get_funding_redeemscript().to_v0_p2wsh(), chan.context.get_user_id()) + }, + _ => { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got an unexpected accept_channel message from peer with counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)); + } + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) } @@ -5618,8 +5769,8 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let (chan, funding_msg, monitor) = - match peer_state.inbound_v1_channel_by_id.remove(&msg.temporary_channel_id) { - Some(inbound_chan) => { + match peer_state.channel_by_id.remove(&msg.temporary_channel_id) { + Some(ChannelPhase::UnfundedInboundV1(inbound_chan)) => { match inbound_chan.funding_created(msg, best_block, &self.signer_provider, &self.logger) { Ok(res) => res, Err((mut inbound_chan, err)) => { @@ -5634,6 +5785,9 @@ where }, } }, + Some(ChannelPhase::Funded(_)) | Some(ChannelPhase::UnfundedOutboundV1(_)) => { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got an unexpected funding_created message from peer with counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)); + }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) }; @@ -5665,22 +5819,25 @@ where let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); - let chan = e.insert(chan); - let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, - { peer_state.channel_by_id.remove(&new_channel_id) }); - - // Note that we reply with the new channel_id in error messages if we gave up on the - // channel, not the temporary_channel_id. This is compatible with ourselves, but the - // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for - // any messages referencing a previously-closed channel anyway. - // We do not propagate the monitor update to the user as it would be for a monitor - // that we didn't manage to store (and that we don't care about - we don't respond - // with the funding_signed so the channel can never go on chain). - if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { - res.0 = None; + if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) { + let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state, + per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, + { peer_state.channel_by_id.remove(&new_channel_id) }); + + // Note that we reply with the new channel_id in error messages if we gave up on the + // channel, not the temporary_channel_id. This is compatible with ourselves, but the + // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for + // any messages referencing a previously-closed channel anyway. + // We do not propagate the monitor update to the user as it would be for a monitor + // that we didn't manage to store (and that we don't care about - we don't respond + // with the funding_signed so the channel can never go on chain). + if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res { + res.0 = None; + } + res.map(|_| ()) + } else { + unreachable!("This must be a funded channel as we just inserted it."); } - res.map(|_| ()) } } } @@ -5697,26 +5854,35 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - let monitor = try_chan_entry!(self, - chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan); - let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor); - let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); - if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { - // We weren't able to watch the channel to begin with, so no updates should be made on - // it. Previously, full_stack_target found an (unreachable) panic when the - // monitor update contained within `shutdown_finish` was applied. - if let Some((ref mut shutdown_finish, _)) = shutdown_finish { - shutdown_finish.0.take(); - } + hash_map::Entry::Occupied(mut chan_phase_entry) => { + match chan_phase_entry.get_mut() { + ChannelPhase::Funded(ref mut chan) => { + let monitor = try_chan_phase_entry!(self, + chan.funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan_phase_entry); + let update_res = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor); + let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan_phase_entry, INITIAL_MONITOR); + if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res { + // We weren't able to watch the channel to begin with, so no updates should be made on + // it. Previously, full_stack_target found an (unreachable) panic when the + // monitor update contained within `shutdown_finish` was applied. + if let Some((ref mut shutdown_finish, _)) = shutdown_finish { + shutdown_finish.0.take(); + } + } + res.map(|_| ()) + }, + _ => { + return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)); + }, } - res.map(|_| ()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } } fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5726,38 +5892,45 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - let announcement_sigs_opt = try_chan_entry!(self, chan.get_mut().channel_ready(&msg, &self.node_signer, - self.genesis_hash.clone(), &self.default_configuration, &self.best_block.read().unwrap(), &self.logger), chan); - if let Some(announcement_sigs) = announcement_sigs_opt { - log_trace!(self.logger, "Sending announcement_signatures for channel {}", &chan.get().context.channel_id()); - peer_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { - node_id: counterparty_node_id.clone(), - msg: announcement_sigs, - }); - } else if chan.get().context.is_usable() { - // If we're sending an announcement_signatures, we'll send the (public) - // channel_update after sending a channel_announcement when we receive our - // counterparty's announcement_signatures. Thus, we only bother to send a - // channel_update here if the channel is not public, i.e. we're not sending an - // announcement_signatures. - log_trace!(self.logger, "Sending private initial channel_update for our counterparty on channel {}", &chan.get().context.channel_id()); - if let Ok(msg) = self.get_channel_update_for_unicast(chan.get()) { - peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let announcement_sigs_opt = try_chan_phase_entry!(self, chan.channel_ready(&msg, &self.node_signer, + self.genesis_hash.clone(), &self.default_configuration, &self.best_block.read().unwrap(), &self.logger), chan_phase_entry); + if let Some(announcement_sigs) = announcement_sigs_opt { + log_trace!(self.logger, "Sending announcement_signatures for channel {}", chan.context.channel_id()); + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { node_id: counterparty_node_id.clone(), - msg, + msg: announcement_sigs, }); + } else if chan.context.is_usable() { + // If we're sending an announcement_signatures, we'll send the (public) + // channel_update after sending a channel_announcement when we receive our + // counterparty's announcement_signatures. Thus, we only bother to send a + // channel_update here if the channel is not public, i.e. we're not sending an + // announcement_signatures. + log_trace!(self.logger, "Sending private initial channel_update for our counterparty on channel {}", chan.context.channel_id()); + if let Ok(msg) = self.get_channel_update_for_unicast(chan) { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + node_id: counterparty_node_id.clone(), + msg, + }); + } } - } - { - let mut pending_events = self.pending_events.lock().unwrap(); - emit_channel_ready_event!(pending_events, chan.get_mut()); - } + { + let mut pending_events = self.pending_events.lock().unwrap(); + emit_channel_ready_event!(pending_events, chan); + } - Ok(()) + Ok(()) + } else { + try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got a channel_ready message for an unfunded channel!".into())), chan_phase_entry) + } }, - hash_map::Entry::Vacant(_) => Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) + hash_map::Entry::Vacant(_) => { + Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) + } } } @@ -5772,48 +5945,46 @@ where })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - // TODO(dunxen): Fix this duplication when we switch to a single map with enums as per - // https://github.com/lightningdevkit/rust-lightning/issues/2422 - if let hash_map::Entry::Occupied(chan_entry) = peer_state.outbound_v1_channel_by_id.entry(msg.channel_id.clone()) { - log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); - self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); - let mut chan = remove_channel!(self, chan_entry); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - return Ok(()); - } else if let hash_map::Entry::Occupied(chan_entry) = peer_state.inbound_v1_channel_by_id.entry(msg.channel_id.clone()) { - log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); - self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); - let mut chan = remove_channel!(self, chan_entry); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - return Ok(()); - } else if let hash_map::Entry::Occupied(mut chan_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) { - if !chan_entry.get().received_shutdown() { - log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", - &msg.channel_id, - if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); - } + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) { + let phase = chan_phase_entry.get_mut(); + match phase { + ChannelPhase::Funded(chan) => { + if !chan.received_shutdown() { + log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", + msg.channel_id, + if chan.sent_shutdown() { " after we initiated shutdown" } else { "" }); + } - let funding_txo_opt = chan_entry.get().context.get_funding_txo(); - let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self, - chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry); - dropped_htlcs = htlcs; - - if let Some(msg) = shutdown { - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg, - }); - } + let funding_txo_opt = chan.context.get_funding_txo(); + let (shutdown, monitor_update_opt, htlcs) = try_chan_phase_entry!(self, + chan.shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_phase_entry); + dropped_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt { - break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ()); + if let Some(msg) = shutdown { + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg, + }); + } + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt { + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()); + } + break Ok(()); + }, + ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) => { + let context = phase.context_mut(); + log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); + self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); + let mut chan = remove_channel_phase!(self, chan_phase_entry); + self.finish_force_close_channel(chan.context_mut().force_shutdown(false)); + return Ok(()); + }, } - break Ok(()); } else { return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5838,22 +6009,27 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { - hash_map::Entry::Occupied(mut chan_entry) => { - let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), chan_entry); - if let Some(msg) = closing_signed { - peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: counterparty_node_id.clone(), - msg, - }); + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry); + if let Some(msg) = closing_signed { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: counterparty_node_id.clone(), + msg, + }); + } + if tx.is_some() { + // We're done with this channel, we've got a signed closing transaction and + // will send the closing_signed back to the remote peer upon return. This + // also implies there are no pending HTLCs left on the channel, so we can + // fully delete it from tracking (the channel monitor is still around to + // watch for old state broadcasts)! + (tx, Some(remove_channel_phase!(self, chan_phase_entry))) + } else { (tx, None) } + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got a closing_signed message for an unfunded channel!".into())), chan_phase_entry); } - if tx.is_some() { - // We're done with this channel, we've got a signed closing transaction and - // will send the closing_signed back to the remote peer upon return. This - // also implies there are no pending HTLCs left on the channel, so we can - // fully delete it from tracking (the channel monitor is still around to - // watch for old state broadcasts)! - (tx, Some(remove_channel!(self, chan_entry))) - } else { (tx, None) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5862,7 +6038,7 @@ where log_info!(self.logger, "Broadcasting {}", log_tx!(broadcast_tx)); self.tx_broadcaster.broadcast_transactions(&[&broadcast_tx]); } - if let Some(chan) = chan_option { + if let Some(ChannelPhase::Funded(chan)) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; @@ -5885,6 +6061,9 @@ where //encrypted with the same key. It's not immediately obvious how to usefully exploit that, //but we should prevent it anyway. + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! + let decoded_hop_res = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -5895,37 +6074,41 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - - let pending_forward_info = match decoded_hop_res { - Ok((next_hop, shared_secret, next_packet_pk_opt)) => - self.construct_pending_htlc_status(msg, shared_secret, next_hop, - chan.get().context.config().accept_underpaying_htlcs, next_packet_pk_opt), - Err(e) => PendingHTLCStatus::Fail(e) - }; - let create_pending_htlc_status = |chan: &Channel, pending_forward_info: PendingHTLCStatus, error_code: u16| { - // If the update_add is completely bogus, the call will Err and we will close, - // but if we've sent a shutdown and they haven't acknowledged it yet, we just - // want to reject the new HTLC and fail it backwards instead of forwarding. - match pending_forward_info { - PendingHTLCStatus::Forward(PendingHTLCInfo { ref incoming_shared_secret, .. }) => { - let reason = if (error_code & 0x1000) != 0 { - let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan); - HTLCFailReason::reason(real_code, error_data) - } else { - HTLCFailReason::from_failure_code(error_code) - }.get_encrypted_failure_packet(incoming_shared_secret, &None); - let msg = msgs::UpdateFailHTLC { - channel_id: msg.channel_id, - htlc_id: msg.htlc_id, - reason - }; - PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg)) - }, - _ => pending_forward_info - } - }; - try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan); + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let pending_forward_info = match decoded_hop_res { + Ok((next_hop, shared_secret, next_packet_pk_opt)) => + self.construct_pending_htlc_status(msg, shared_secret, next_hop, + chan.context.config().accept_underpaying_htlcs, next_packet_pk_opt), + Err(e) => PendingHTLCStatus::Fail(e) + }; + let create_pending_htlc_status = |chan: &Channel, pending_forward_info: PendingHTLCStatus, error_code: u16| { + // If the update_add is completely bogus, the call will Err and we will close, + // but if we've sent a shutdown and they haven't acknowledged it yet, we just + // want to reject the new HTLC and fail it backwards instead of forwarding. + match pending_forward_info { + PendingHTLCStatus::Forward(PendingHTLCInfo { ref incoming_shared_secret, .. }) => { + let reason = if (error_code & 0x1000) != 0 { + let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan); + HTLCFailReason::reason(real_code, error_data) + } else { + HTLCFailReason::from_failure_code(error_code) + }.get_encrypted_failure_packet(incoming_shared_secret, &None); + let msg = msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason + }; + PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg)) + }, + _ => pending_forward_info + } + }; + try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan_phase_entry); + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got an update_add_htlc message for an unfunded channel!".into())), chan_phase_entry); + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5944,19 +6127,37 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - let res = try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), chan); - funding_txo = chan.get().context.get_funding_txo().expect("We won't accept a fulfill until funded"); - res + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry); + if let HTLCSource::PreviousHopData(prev_hop) = &res.0 { + peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id) + .or_insert_with(Vec::new) + .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop)); + } + // Note that we do not need to push an `actions_blocking_raa_monitor_updates` + // entry here, even though we *do* need to block the next RAA monitor update. + // We do this instead in the `claim_funds_internal` by attaching a + // `ReleaseRAAChannelMonitorUpdate` action to the event generated when the + // outbound HTLC is claimed. This is guaranteed to all complete before we + // process the RAA as messages are processed from single peers serially. + funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded"); + res + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got an update_fulfill_htlc message for an unfunded channel!".into())), chan_phase_entry); + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; - self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo); + self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo); Ok(()) } fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5966,8 +6167,13 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - try_chan_entry!(self, chan.get_mut().update_fail_htlc(&msg, HTLCFailReason::from_msg(msg)), chan); + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + try_chan_phase_entry!(self, chan.update_fail_htlc(&msg, HTLCFailReason::from_msg(msg)), chan_phase_entry); + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got an update_fail_htlc message for an unfunded channel!".into())), chan_phase_entry); + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5975,6 +6181,8 @@ where } fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5984,12 +6192,17 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { + hash_map::Entry::Occupied(mut chan_phase_entry) => { if (msg.failure_code & 0x8000) == 0 { let chan_err: ChannelError = ChannelError::Close("Got update_fail_malformed_htlc with BADONION not set".to_owned()); - try_chan_entry!(self, Err(chan_err), chan); + try_chan_phase_entry!(self, Err(chan_err), chan_phase_entry); + } + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + try_chan_phase_entry!(self, chan.update_fail_malformed_htlc(&msg, HTLCFailReason::reason(msg.failure_code, msg.sha256_of_onion.to_vec())), chan_phase_entry); + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got an update_fail_malformed_htlc message for an unfunded channel!".into())), chan_phase_entry); } - try_chan_entry!(self, chan.get_mut().update_fail_malformed_htlc(&msg, HTLCFailReason::reason(msg.failure_code, msg.sha256_of_onion.to_vec())), chan); Ok(()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -6006,13 +6219,18 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - let funding_txo = chan.get().context.get_funding_txo(); - let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); - if let Some(monitor_update) = monitor_update_opt { - handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, - peer_state, per_peer_state, chan).map(|_| ()) - } else { Ok(()) } + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let funding_txo = chan.context.get_funding_txo(); + let monitor_update_opt = try_chan_phase_entry!(self, chan.commitment_signed(&msg, &self.logger), chan_phase_entry); + if let Some(monitor_update) = monitor_update_opt { + handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, + peer_state, per_peer_state, chan_phase_entry).map(|_| ()) + } else { Ok(()) } + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got a commitment_signed message for an unfunded channel!".into())), chan_phase_entry); + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -6139,6 +6357,23 @@ where }) } + #[cfg(any(test, feature = "_test_utils"))] + pub(crate) fn test_raa_monitor_updates_held(&self, + counterparty_node_id: PublicKey, channel_id: ChannelId + ) -> bool { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lck = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lck; + + if let Some(chan) = peer_state.channel_by_id.get(&channel_id) { + return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates, + chan.context().get_funding_txo().unwrap(), counterparty_node_id); + } + } + false + } + fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { let (htlcs_to_fail, res) = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6149,22 +6384,27 @@ where }).map(|mtx| mtx.lock().unwrap())?; let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - let funding_txo_opt = chan.get().context.get_funding_txo(); - let mon_update_blocked = if let Some(funding_txo) = funding_txo_opt { - self.raa_monitor_updates_held( - &peer_state.actions_blocking_raa_monitor_updates, funding_txo, - *counterparty_node_id) - } else { false }; - let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, - chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger, mon_update_blocked), chan); - let res = if let Some(monitor_update) = monitor_update_opt { - let funding_txo = funding_txo_opt - .expect("Funding outpoint must have been set for RAA handling to succeed"); - handle_new_monitor_update!(self, funding_txo, monitor_update, - peer_state_lock, peer_state, per_peer_state, chan).map(|_| ()) - } else { Ok(()) }; - (htlcs_to_fail, res) + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let funding_txo_opt = chan.context.get_funding_txo(); + let mon_update_blocked = if let Some(funding_txo) = funding_txo_opt { + self.raa_monitor_updates_held( + &peer_state.actions_blocking_raa_monitor_updates, funding_txo, + *counterparty_node_id) + } else { false }; + let (htlcs_to_fail, monitor_update_opt) = try_chan_phase_entry!(self, + chan.revoke_and_ack(&msg, &self.fee_estimator, &self.logger, mon_update_blocked), chan_phase_entry); + let res = if let Some(monitor_update) = monitor_update_opt { + let funding_txo = funding_txo_opt + .expect("Funding outpoint must have been set for RAA handling to succeed"); + handle_new_monitor_update!(self, funding_txo, monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()) + } else { Ok(()) }; + (htlcs_to_fail, res) + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got a revoke_and_ack message for an unfunded channel!".into())), chan_phase_entry); + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -6183,8 +6423,13 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg, &self.logger), chan); + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + try_chan_phase_entry!(self, chan.update_fee(&self.fee_estimator, &msg, &self.logger), chan_phase_entry); + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got an update_fee message for an unfunded channel!".into())), chan_phase_entry); + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -6201,68 +6446,78 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - if !chan.get().context.is_usable() { - return Err(MsgHandleErrInternal::from_no_close(LightningError{err: "Got an announcement_signatures before we were ready for it".to_owned(), action: msgs::ErrorAction::IgnoreError})); - } + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + if !chan.context.is_usable() { + return Err(MsgHandleErrInternal::from_no_close(LightningError{err: "Got an announcement_signatures before we were ready for it".to_owned(), action: msgs::ErrorAction::IgnoreError})); + } - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { - msg: try_chan_entry!(self, chan.get_mut().announcement_signatures( - &self.node_signer, self.genesis_hash.clone(), self.best_block.read().unwrap().height(), - msg, &self.default_configuration - ), chan), - // Note that announcement_signatures fails if the channel cannot be announced, - // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()), - }); + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + msg: try_chan_phase_entry!(self, chan.announcement_signatures( + &self.node_signer, self.genesis_hash.clone(), self.best_block.read().unwrap().height(), + msg, &self.default_configuration + ), chan_phase_entry), + // Note that announcement_signatures fails if the channel cannot be announced, + // so get_channel_update_for_broadcast will never fail by the time we get here. + update_msg: Some(self.get_channel_update_for_broadcast(chan).unwrap()), + }); + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got an announcement_signatures message for an unfunded channel!".into())), chan_phase_entry); + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } Ok(()) } - /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err. + /// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err. fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result { let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) { Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), None => { // It's not a local channel - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); if peer_state_mutex_opt.is_none() { - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(chan_id) { - hash_map::Entry::Occupied(mut chan) => { - if chan.get().context.get_counterparty_node_id() != *counterparty_node_id { - if chan.get().context.should_announce() { - // If the announcement is about a channel of ours which is public, some - // other peer may simply be forwarding all its gossip to us. Don't provide - // a scary-looking error message and return Ok instead. - return Ok(NotifyOption::SkipPersist); + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + if chan.context.get_counterparty_node_id() != *counterparty_node_id { + if chan.context.should_announce() { + // If the announcement is about a channel of ours which is public, some + // other peer may simply be forwarding all its gossip to us. Don't provide + // a scary-looking error message and return Ok instead. + return Ok(NotifyOption::SkipPersistNoEvents); + } + return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id)); + } + let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..]; + let msg_from_node_one = msg.contents.flags & 1 == 0; + if were_node_one == msg_from_node_one { + return Ok(NotifyOption::SkipPersistNoEvents); + } else { + log_debug!(self.logger, "Received channel_update for channel {}.", chan_id); + try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry); } - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id)); - } - let were_node_one = self.get_our_node_id().serialize()[..] < chan.get().context.get_counterparty_node_id().serialize()[..]; - let msg_from_node_one = msg.contents.flags & 1 == 0; - if were_node_one == msg_from_node_one { - return Ok(NotifyOption::SkipPersist); } else { - log_debug!(self.logger, "Received channel_update for channel {}.", &chan_id); - try_chan_entry!(self, chan.get_mut().channel_update(&msg), chan); + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got a channel_update for an unfunded channel!".into())), chan_phase_entry); } }, - hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist) + hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents) } Ok(NotifyOption::DoPersist) } - fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> { + fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result { let htlc_forwards; let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6275,52 +6530,59 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id) { - hash_map::Entry::Occupied(mut chan) => { - // Currently, we expect all holding cell update_adds to be dropped on peer - // disconnect, so Channel's reestablish will never hand us any holding cell - // freed HTLCs to fail backwards. If in the future we no longer drop pending - // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here. - let responses = try_chan_entry!(self, chan.get_mut().channel_reestablish( - msg, &self.logger, &self.node_signer, self.genesis_hash, - &self.default_configuration, &*self.best_block.read().unwrap()), chan); - let mut channel_update = None; - if let Some(msg) = responses.shutdown_msg { - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: counterparty_node_id.clone(), - msg, - }); - } else if chan.get().context.is_usable() { - // If the channel is in a usable state (ie the channel is not being shut - // down), send a unicast channel_update to our counterparty to make sure - // they have the latest channel parameters. - if let Ok(msg) = self.get_channel_update_for_unicast(chan.get()) { - channel_update = Some(events::MessageSendEvent::SendChannelUpdate { - node_id: chan.get().context.get_counterparty_node_id(), + hash_map::Entry::Occupied(mut chan_phase_entry) => { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + // Currently, we expect all holding cell update_adds to be dropped on peer + // disconnect, so Channel's reestablish will never hand us any holding cell + // freed HTLCs to fail backwards. If in the future we no longer drop pending + // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here. + let responses = try_chan_phase_entry!(self, chan.channel_reestablish( + msg, &self.logger, &self.node_signer, self.genesis_hash, + &self.default_configuration, &*self.best_block.read().unwrap()), chan_phase_entry); + let mut channel_update = None; + if let Some(msg) = responses.shutdown_msg { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: counterparty_node_id.clone(), msg, }); + } else if chan.context.is_usable() { + // If the channel is in a usable state (ie the channel is not being shut + // down), send a unicast channel_update to our counterparty to make sure + // they have the latest channel parameters. + if let Ok(msg) = self.get_channel_update_for_unicast(chan) { + channel_update = Some(events::MessageSendEvent::SendChannelUpdate { + node_id: chan.context.get_counterparty_node_id(), + msg, + }); + } } + let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take(); + htlc_forwards = self.handle_channel_resumption( + &mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order, + Vec::new(), None, responses.channel_ready, responses.announcement_sigs); + if let Some(upd) = channel_update { + peer_state.pending_msg_events.push(upd); + } + need_lnd_workaround + } else { + return try_chan_phase_entry!(self, Err(ChannelError::Close( + "Got a channel_reestablish message for an unfunded channel!".into())), chan_phase_entry); } - let need_lnd_workaround = chan.get_mut().context.workaround_lnd_bug_4006.take(); - htlc_forwards = self.handle_channel_resumption( - &mut peer_state.pending_msg_events, chan.get_mut(), responses.raa, responses.commitment_update, responses.order, - Vec::new(), None, responses.channel_ready, responses.announcement_sigs); - if let Some(upd) = channel_update { - peer_state.pending_msg_events.push(upd); - } - need_lnd_workaround }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; + let mut persist = NotifyOption::SkipPersistHandleEvents; if let Some(forwards) = htlc_forwards { self.forward_htlcs(&mut [forwards][..]); + persist = NotifyOption::DoPersist; } if let Some(channel_ready_msg) = need_lnd_workaround { self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?; } - Ok(()) + Ok(persist) } /// Process pending events from the [`chain::Watch`], returning whether any events were processed. @@ -6335,8 +6597,8 @@ where match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { if let Some(preimage) = htlc_update.payment_preimage { - log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", &preimage); - self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint); + log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage); + self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint); } else { log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash); let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() }; @@ -6361,26 +6623,27 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; - if let hash_map::Entry::Occupied(chan_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) { - let mut chan = remove_channel!(self, chan_entry); - failed_channels.push(chan.context.force_shutdown(false)); - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update + if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) { + if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) { + failed_channels.push(chan.context.force_shutdown(false)); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event { + ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() } + } else { + ClosureReason::CommitmentTxConfirmed + }; + self.issue_channel_close_events(&chan.context, reason); + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: chan.context.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: "Channel force-closed".to_owned() } + }, }); } - let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event { - ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() } - } else { - ClosureReason::CommitmentTxConfirmed - }; - self.issue_channel_close_events(&chan.context, reason); - pending_msg_events.push(events::MessageSendEvent::HandleError { - node_id: chan.context.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: "Channel force-closed".to_owned() } - }, - }); } } } @@ -6426,7 +6689,9 @@ where 'chan_loop: loop { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state: &mut PeerState<_> = &mut *peer_state_lock; - for (channel_id, chan) in peer_state.channel_by_id.iter_mut() { + for (channel_id, chan) in peer_state.channel_by_id.iter_mut().filter_map( + |(chan_id, phase)| if let ChannelPhase::Funded(chan) = phase { Some((chan_id, chan)) } else { None } + ) { let counterparty_node_id = chan.context.get_counterparty_node_id(); let funding_txo = chan.context.get_funding_txo(); let (monitor_opt, holding_cell_failed_htlcs) = @@ -6478,38 +6743,43 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.retain(|channel_id, chan| { - match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) { - Ok((msg_opt, tx_opt)) => { - if let Some(msg) = msg_opt { - has_update = true; - pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: chan.context.get_counterparty_node_id(), msg, - }); - } - if let Some(tx) = tx_opt { - // We're done with this channel. We got a closing_signed and sent back - // a closing_signed with a closing transaction to broadcast. - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } + peer_state.channel_by_id.retain(|channel_id, phase| { + match phase { + ChannelPhase::Funded(chan) => { + match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) { + Ok((msg_opt, tx_opt)) => { + if let Some(msg) = msg_opt { + has_update = true; + pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: chan.context.get_counterparty_node_id(), msg, + }); + } + if let Some(tx) = tx_opt { + // We're done with this channel. We got a closing_signed and sent back + // a closing_signed with a closing transaction to broadcast. + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } - self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure); + self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure); - log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); - self.tx_broadcaster.broadcast_transactions(&[&tx]); - update_maps_on_chan_removal!(self, &chan.context); - false - } else { true } + log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); + self.tx_broadcaster.broadcast_transactions(&[&tx]); + update_maps_on_chan_removal!(self, &chan.context); + false + } else { true } + }, + Err(e) => { + has_update = true; + let (close_channel, res) = convert_chan_phase_err!(self, e, chan, channel_id, FUNDED_CHANNEL); + handle_errors.push((chan.context.get_counterparty_node_id(), Err(res))); + !close_channel + } + } }, - Err(e) => { - has_update = true; - let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id); - handle_errors.push((chan.context.get_counterparty_node_id(), Err(res))); - !close_channel - } + _ => true, // Retain unfunded channels if present. } }); } @@ -6702,7 +6972,9 @@ where for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - for chan in peer_state.channel_by_id.values() { + for chan in peer_state.channel_by_id.values().filter_map( + |phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None } + ) { for (htlc_source, _) in chan.inflight_htlc_sources() { if let HTLCSource::OutboundRoute { path, .. } = htlc_source { inflight_htlcs.process_path(path, self.get_our_node_id()); @@ -6775,24 +7047,26 @@ where break; } - if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) { - debug_assert_eq!(chan.get().context.get_funding_txo().unwrap(), channel_funding_outpoint); - if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { - log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", - &channel_funding_outpoint.to_channel_id()); - if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, - peer_state_lck, peer_state, per_peer_state, chan) - { - errors.push((e, counterparty_node_id)); - } - if further_update_exists { - // If there are more `ChannelMonitorUpdate`s to process, restart at the - // top of the loop. - continue; + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + debug_assert_eq!(chan.context.get_funding_txo().unwrap(), channel_funding_outpoint); + if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() { + log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", + channel_funding_outpoint.to_channel_id()); + if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, + peer_state_lck, peer_state, per_peer_state, chan_phase_entry) + { + errors.push((e, counterparty_node_id)); + } + if further_update_exists { + // If there are more `ChannelMonitorUpdate`s to process, restart at the + // top of the loop. + continue; + } + } else { + log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update", + channel_funding_outpoint.to_channel_id()); } - } else { - log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update", - &channel_funding_outpoint.to_channel_id()); } } } else { @@ -6858,8 +7132,8 @@ where /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut result = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { + let mut result = NotifyOption::SkipPersistNoEvents; // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -6940,8 +7214,9 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -6975,8 +7250,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -6995,8 +7271,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -7029,7 +7306,7 @@ where for (_cp_id, peer_state_mutex) in self.per_peer_state.read().unwrap().iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - for chan in peer_state.channel_by_id.values() { + for chan in peer_state.channel_by_id.values().filter_map(|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }) { if let (Some(funding_txo), Some(block_hash)) = (chan.context.get_funding_txo(), chan.context.get_funding_tx_confirmed_in()) { res.push((funding_txo.txid, Some(block_hash))); } @@ -7039,8 +7316,9 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.context.get_funding_txo() { if funding_txo.txid == *txid { @@ -7079,88 +7357,94 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.retain(|_, channel| { - let res = f(channel); - if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res { - for (source, payment_hash) in timed_out_pending_htlcs.drain(..) { - let (failure_code, data) = self.get_htlc_inbound_temp_fail_err_and_data(0x1000|14 /* expiry_too_soon */, &channel); - timed_out_htlcs.push((source, payment_hash, HTLCFailReason::reason(failure_code, data), - HTLCDestination::NextHopChannel { node_id: Some(channel.context.get_counterparty_node_id()), channel_id: channel.context.channel_id() })); - } - if let Some(channel_ready) = channel_ready_opt { - send_channel_ready!(self, pending_msg_events, channel, channel_ready); - if channel.context.is_usable() { - log_trace!(self.logger, "Sending channel_ready with private initial channel_update for our counterparty on channel {}", &channel.context.channel_id()); - if let Ok(msg) = self.get_channel_update_for_unicast(channel) { - pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { - node_id: channel.context.get_counterparty_node_id(), - msg, - }); + peer_state.channel_by_id.retain(|_, phase| { + match phase { + // Retain unfunded channels. + ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => true, + ChannelPhase::Funded(channel) => { + let res = f(channel); + if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res { + for (source, payment_hash) in timed_out_pending_htlcs.drain(..) { + let (failure_code, data) = self.get_htlc_inbound_temp_fail_err_and_data(0x1000|14 /* expiry_too_soon */, &channel); + timed_out_htlcs.push((source, payment_hash, HTLCFailReason::reason(failure_code, data), + HTLCDestination::NextHopChannel { node_id: Some(channel.context.get_counterparty_node_id()), channel_id: channel.context.channel_id() })); + } + if let Some(channel_ready) = channel_ready_opt { + send_channel_ready!(self, pending_msg_events, channel, channel_ready); + if channel.context.is_usable() { + log_trace!(self.logger, "Sending channel_ready with private initial channel_update for our counterparty on channel {}", channel.context.channel_id()); + if let Ok(msg) = self.get_channel_update_for_unicast(channel) { + pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + node_id: channel.context.get_counterparty_node_id(), + msg, + }); + } + } else { + log_trace!(self.logger, "Sending channel_ready WITHOUT channel_update for {}", channel.context.channel_id()); + } } - } else { - log_trace!(self.logger, "Sending channel_ready WITHOUT channel_update for {}", &channel.context.channel_id()); - } - } - { - let mut pending_events = self.pending_events.lock().unwrap(); - emit_channel_ready_event!(pending_events, channel); - } + { + let mut pending_events = self.pending_events.lock().unwrap(); + emit_channel_ready_event!(pending_events, channel); + } - if let Some(announcement_sigs) = announcement_sigs { - log_trace!(self.logger, "Sending announcement_signatures for channel {}", &channel.context.channel_id()); - pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { - node_id: channel.context.get_counterparty_node_id(), - msg: announcement_sigs, - }); - if let Some(height) = height_opt { - if let Some(announcement) = channel.get_signed_channel_announcement(&self.node_signer, self.genesis_hash, height, &self.default_configuration) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { - msg: announcement, - // Note that announcement_signatures fails if the channel cannot be announced, - // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()), + if let Some(announcement_sigs) = announcement_sigs { + log_trace!(self.logger, "Sending announcement_signatures for channel {}", channel.context.channel_id()); + pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: channel.context.get_counterparty_node_id(), + msg: announcement_sigs, }); + if let Some(height) = height_opt { + if let Some(announcement) = channel.get_signed_channel_announcement(&self.node_signer, self.genesis_hash, height, &self.default_configuration) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + msg: announcement, + // Note that announcement_signatures fails if the channel cannot be announced, + // so get_channel_update_for_broadcast will never fail by the time we get here. + update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()), + }); + } + } } + if channel.is_our_channel_ready() { + if let Some(real_scid) = channel.context.get_short_channel_id() { + // If we sent a 0conf channel_ready, and now have an SCID, we add it + // to the short_to_chan_info map here. Note that we check whether we + // can relay using the real SCID at relay-time (i.e. + // enforce option_scid_alias then), and if the funding tx is ever + // un-confirmed we force-close the channel, ensuring short_to_chan_info + // is always consistent. + let mut short_to_chan_info = self.short_to_chan_info.write().unwrap(); + let scid_insert = short_to_chan_info.insert(real_scid, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); + assert!(scid_insert.is_none() || scid_insert.unwrap() == (channel.context.get_counterparty_node_id(), channel.context.channel_id()), + "SCIDs should never collide - ensure you weren't behind by a full {} blocks when creating channels", + fake_scid::MAX_SCID_BLOCKS_FROM_NOW); + } + } + } else if let Err(reason) = res { + update_maps_on_chan_removal!(self, &channel.context); + // It looks like our counterparty went on-chain or funding transaction was + // reorged out of the main chain. Close the channel. + failed_channels.push(channel.context.force_shutdown(true)); + if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + let reason_message = format!("{}", reason); + self.issue_channel_close_events(&channel.context, reason); + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: channel.context.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { + channel_id: channel.context.channel_id(), + data: reason_message, + } }, + }); + return false; } + true } - if channel.is_our_channel_ready() { - if let Some(real_scid) = channel.context.get_short_channel_id() { - // If we sent a 0conf channel_ready, and now have an SCID, we add it - // to the short_to_chan_info map here. Note that we check whether we - // can relay using the real SCID at relay-time (i.e. - // enforce option_scid_alias then), and if the funding tx is ever - // un-confirmed we force-close the channel, ensuring short_to_chan_info - // is always consistent. - let mut short_to_chan_info = self.short_to_chan_info.write().unwrap(); - let scid_insert = short_to_chan_info.insert(real_scid, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); - assert!(scid_insert.is_none() || scid_insert.unwrap() == (channel.context.get_counterparty_node_id(), channel.context.channel_id()), - "SCIDs should never collide - ensure you weren't behind by a full {} blocks when creating channels", - fake_scid::MAX_SCID_BLOCKS_FROM_NOW); - } - } - } else if let Err(reason) = res { - update_maps_on_chan_removal!(self, &channel.context); - // It looks like our counterparty went on-chain or funding transaction was - // reorged out of the main chain. Close the channel. - failed_channels.push(channel.context.force_shutdown(true)); - if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - let reason_message = format!("{}", reason); - self.issue_channel_close_events(&channel.context, reason); - pending_msg_events.push(events::MessageSendEvent::HandleError { - node_id: channel.context.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { - channel_id: channel.context.channel_id(), - data: reason_message, - } }, - }); - return false; } - true }); } } @@ -7217,18 +7501,26 @@ where } } - /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted. + /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or + /// may have events that need processing. + /// + /// In order to check if this [`ChannelManager`] needs persisting, call + /// [`Self::get_and_clear_needs_persistence`]. /// /// Note that callbacks registered on the [`Future`] MUST NOT call back into this /// [`ChannelManager`] and should instead register actions to be taken later. - /// - pub fn get_persistable_update_future(&self) -> Future { - self.persistence_notifier.get_future() + pub fn get_event_or_persistence_needed_future(&self) -> Future { + self.event_persist_notifier.get_future() + } + + /// Returns true if this [`ChannelManager`] needs to be persisted. + pub fn get_and_clear_needs_persistence(&self) -> bool { + self.needs_persist_flag.swap(false, Ordering::AcqRel) } #[cfg(any(test, feature = "_test_utils"))] - pub fn get_persistence_condvar_value(&self) -> bool { - self.persistence_notifier.notify_pending() + pub fn get_event_or_persist_condvar_value(&self) -> bool { + self.event_persist_notifier.notify_pending() } /// Gets the latest best block which was connected either via the [`chain::Listen`] or @@ -7285,8 +7577,21 @@ where L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // open_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_open_channel(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => { + debug_assert!(false, "We shouldn't close a new channel"); + NotifyOption::DoPersist + }, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { @@ -7296,8 +7601,13 @@ where } fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // accept_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + NotifyOption::SkipPersistHandleEvents + }); } fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { @@ -7317,8 +7627,19 @@ where } fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // channel_ready message - while the channel's state will change, any channel_ready message + // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we + // will not force-close the channel on startup. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_ready(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { @@ -7332,8 +7653,19 @@ where } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_add_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_add_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { @@ -7342,13 +7674,35 @@ where } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_malformed_htlc message - the message itself doesn't change our channel state + // only the `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { @@ -7362,8 +7716,19 @@ where } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fee message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fee(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { @@ -7372,23 +7737,32 @@ where } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let force_persist = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { - if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } + persist } else { - NotifyOption::SkipPersist + NotifyOption::DoPersist } }); } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_reestablish(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(persist) => *persist, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify( + self, || NotifyOption::SkipPersistHandleEvents); + let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { @@ -7398,23 +7772,27 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.retain(|_, chan| { - chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); - if chan.is_shutdown() { - update_maps_on_chan_removal!(self, &chan.context); - self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); - return false; - } - true - }); - peer_state.inbound_v1_channel_by_id.retain(|_, chan| { - update_maps_on_chan_removal!(self, &chan.context); - self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); - false - }); - peer_state.outbound_v1_channel_by_id.retain(|_, chan| { - update_maps_on_chan_removal!(self, &chan.context); - self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); + peer_state.channel_by_id.retain(|_, phase| { + let context = match phase { + ChannelPhase::Funded(chan) => { + chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); + // We only retain funded channels that are not shutdown. + if !chan.is_shutdown() { + return true; + } + &chan.context + }, + // Unfunded channels will always be removed. + ChannelPhase::UnfundedOutboundV1(chan) => { + &chan.context + }, + ChannelPhase::UnfundedInboundV1(chan) => { + &chan.context + }, + }; + // Clean up for removal. + update_maps_on_chan_removal!(self, &context); + self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer); false }); // Note that we don't bother generating any events for pre-accept channels - @@ -7483,74 +7861,82 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut res = Ok(()); - // If we have too many peers connected which don't have funded channels, disconnect the - // peer immediately (as long as it doesn't have funded channels). If we have a bunch of - // unfunded channels taking up space in memory for disconnected peers, we still let new - // peers connect, but we'll reject new channels from them. - let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); - let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; + PersistenceNotifierGuard::optionally_notify(self, || { + // If we have too many peers connected which don't have funded channels, disconnect the + // peer immediately (as long as it doesn't have funded channels). If we have a bunch of + // unfunded channels taking up space in memory for disconnected peers, we still let new + // peers connect, but we'll reject new channels from them. + let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); + let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; - { - let mut peer_state_lock = self.per_peer_state.write().unwrap(); - match peer_state_lock.entry(counterparty_node_id.clone()) { - hash_map::Entry::Vacant(e) => { - if inbound_peer_limited { - return Err(()); - } - e.insert(Mutex::new(PeerState { - channel_by_id: HashMap::new(), - outbound_v1_channel_by_id: HashMap::new(), - inbound_v1_channel_by_id: HashMap::new(), - inbound_channel_request_by_id: HashMap::new(), - latest_features: init_msg.features.clone(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - is_connected: true, - })); - }, - hash_map::Entry::Occupied(e) => { - let mut peer_state = e.get().lock().unwrap(); - peer_state.latest_features = init_msg.features.clone(); - - let best_block_height = self.best_block.read().unwrap().height(); - if inbound_peer_limited && - Self::unfunded_channel_count(&*peer_state, best_block_height) == - peer_state.channel_by_id.len() - { - return Err(()); - } + { + let mut peer_state_lock = self.per_peer_state.write().unwrap(); + match peer_state_lock.entry(counterparty_node_id.clone()) { + hash_map::Entry::Vacant(e) => { + if inbound_peer_limited { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } + e.insert(Mutex::new(PeerState { + channel_by_id: HashMap::new(), + inbound_channel_request_by_id: HashMap::new(), + latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: true, + })); + }, + hash_map::Entry::Occupied(e) => { + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); + + let best_block_height = self.best_block.read().unwrap().height(); + if inbound_peer_limited && + Self::unfunded_channel_count(&*peer_state, best_block_height) == + peer_state.channel_by_id.len() + { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } - debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); - peer_state.is_connected = true; - }, + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; + }, + } } - } - log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); + log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - let per_peer_state = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; - - // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted - // (so won't be recovered after a crash) we don't need to bother closing unfunded channels and - // clearing their maps here. Instead we can just send queue channel_reestablish messages for - // channels in the channel_by_id map. - peer_state.channel_by_id.iter_mut().for_each(|(_, chan)| { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + + peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| + if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash), they shouldn't exist here and we would never need to + // worry about closing and removing them. + debug_assert!(false); + None + } + ).for_each(|chan| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); - }); - } - //TODO: Also re-broadcast announcement_signatures - Ok(()) + } + + return NotifyOption::SkipPersistHandleEvents; + //TODO: Also re-broadcast announcement_signatures + }); + res } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { @@ -7572,7 +7958,7 @@ where let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); if peer_state_mutex_opt.is_none() { return; } let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); - if let Some(chan) = peer_state.channel_by_id.get(&msg.channel_id) { + if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get(&msg.channel_id) { if let Some(msg) = chan.get_outbound_shutdown() { peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: *counterparty_node_id, @@ -7606,9 +7992,7 @@ where // Note that we don't bother generating any events for pre-accept channels - // they're not considered "channels" yet from the PoV of our events interface. peer_state.inbound_channel_request_by_id.clear(); - peer_state.channel_by_id.keys().cloned() - .chain(peer_state.outbound_v1_channel_by_id.keys().cloned()) - .chain(peer_state.inbound_v1_channel_by_id.keys().cloned()).collect() + peer_state.channel_by_id.keys().cloned().collect() }; for channel_id in channel_ids { // Untrusted messages from peer, we throw away the error if id points to a non-existent channel @@ -7622,7 +8006,7 @@ where if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let Some(chan) = peer_state.outbound_v1_channel_by_id.get_mut(&msg.channel_id) { + if let Some(ChannelPhase::UnfundedOutboundV1(chan)) = peer_state.channel_by_id.get_mut(&msg.channel_id) { if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash, &self.fee_estimator) { peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: *counterparty_node_id, @@ -8203,31 +8587,30 @@ where let mut serializable_peer_count: u64 = 0; { let per_peer_state = self.per_peer_state.read().unwrap(); - let mut unfunded_channels = 0; - let mut number_of_channels = 0; + let mut number_of_funded_channels = 0; for (_, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; if !peer_state.ok_to_remove(false) { serializable_peer_count += 1; } - number_of_channels += peer_state.channel_by_id.len(); - for (_, channel) in peer_state.channel_by_id.iter() { - if !channel.context.is_funding_initiated() { - unfunded_channels += 1; - } - } + + number_of_funded_channels += peer_state.channel_by_id.iter().filter( + |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false } + ).count(); } - ((number_of_channels - unfunded_channels) as u64).write(writer)?; + (number_of_funded_channels as u64).write(writer)?; for (_, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - for (_, channel) in peer_state.channel_by_id.iter() { - if channel.context.is_funding_initiated() { - channel.write(writer)?; - } + for channel in peer_state.channel_by_id.iter().filter_map( + |(_, phase)| if let ChannelPhase::Funded(channel) = phase { + if channel.context.is_funding_initiated() { Some(channel) } else { None } + } else { None } + ) { + channel.write(writer)?; } } } @@ -8340,6 +8723,8 @@ where session_priv.write(writer)?; } } + PendingOutboundPayment::AwaitingInvoice { .. } => {}, + PendingOutboundPayment::InvoiceReceived { .. } => {}, PendingOutboundPayment::Fulfilled { .. } => {}, PendingOutboundPayment::Abandoned { .. } => {}, } @@ -8611,7 +8996,7 @@ where let channel_count: u64 = Readable::read(reader)?; let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128)); - let mut peer_channels: HashMap>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); + let mut funded_peer_channels: HashMap>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut channel_closures = VecDeque::new(); @@ -8630,8 +9015,22 @@ where // But if the channel is behind of the monitor, close the channel: log_error!(args.logger, "A ChannelManager is stale compared to the current ChannelMonitor!"); log_error!(args.logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast."); - log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - &channel.context.channel_id(), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id()); + if channel.context.get_latest_monitor_update_id() < monitor.get_latest_update_id() { + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", + &channel.context.channel_id(), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id()); + } + if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() { + log_error!(args.logger, " The ChannelMonitor for channel {} is at holder commitment number {} but the ChannelManager is at holder commitment number {}.", + &channel.context.channel_id(), monitor.get_cur_holder_commitment_number(), channel.get_cur_holder_commitment_transaction_number()); + } + if channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() { + log_error!(args.logger, " The ChannelMonitor for channel {} is at revoked counterparty transaction number {} but the ChannelManager is at revoked counterparty transaction number {}.", + &channel.context.channel_id(), monitor.get_min_seen_secret(), channel.get_revoked_counterparty_commitment_transaction_number()); + } + if channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() { + log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.", + &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number()); + } let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { @@ -8675,14 +9074,14 @@ where if channel.context.is_funding_initiated() { id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id()); } - match peer_channels.entry(channel.context.get_counterparty_node_id()) { + match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) { hash_map::Entry::Occupied(mut entry) => { let by_id_map = entry.get_mut(); - by_id_map.insert(channel.context.channel_id(), channel); + by_id_map.insert(channel.context.channel_id(), ChannelPhase::Funded(channel)); }, hash_map::Entry::Vacant(entry) => { let mut by_id_map = HashMap::new(); - by_id_map.insert(channel.context.channel_id(), channel); + by_id_map.insert(channel.context.channel_id(), ChannelPhase::Funded(channel)); entry.insert(by_id_map); } } @@ -8749,8 +9148,6 @@ where let peer_state_from_chans = |channel_by_id| { PeerState { channel_by_id, - outbound_v1_channel_by_id: HashMap::new(), - inbound_v1_channel_by_id: HashMap::new(), inbound_channel_request_by_id: HashMap::new(), latest_features: InitFeatures::empty(), pending_msg_events: Vec::new(), @@ -8765,7 +9162,7 @@ where let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex>)>())); for _ in 0..peer_count { let peer_pubkey = Readable::read(reader)?; - let peer_chans = peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()); + let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()); let mut peer_state = peer_state_from_chans(peer_chans); peer_state.latest_features = Readable::read(reader)?; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); @@ -8926,30 +9323,37 @@ where for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() { let mut peer_state_lock = peer_state_mtx.lock().unwrap(); let peer_state = &mut *peer_state_lock; - for (_, chan) in peer_state.channel_by_id.iter() { - // Channels that were persisted have to be funded, otherwise they should have been - // discarded. - let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; - let monitor = args.channel_monitors.get(&funding_txo) - .expect("We already checked for monitor presence when loading channels"); - let mut max_in_flight_update_id = monitor.get_latest_update_id(); - if let Some(in_flight_upds) = &mut in_flight_monitor_updates { - if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) { - max_in_flight_update_id = cmp::max(max_in_flight_update_id, - handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds, - funding_txo, monitor, peer_state, "")); + for phase in peer_state.channel_by_id.values() { + if let ChannelPhase::Funded(chan) = phase { + // Channels that were persisted have to be funded, otherwise they should have been + // discarded. + let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?; + let monitor = args.channel_monitors.get(&funding_txo) + .expect("We already checked for monitor presence when loading channels"); + let mut max_in_flight_update_id = monitor.get_latest_update_id(); + if let Some(in_flight_upds) = &mut in_flight_monitor_updates { + if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) { + max_in_flight_update_id = cmp::max(max_in_flight_update_id, + handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds, + funding_txo, monitor, peer_state, "")); + } } - } - if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { - // If the channel is ahead of the monitor, return InvalidValue: - log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); - log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", - &chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id); - log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); - log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); - log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); - log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); - log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { + // If the channel is ahead of the monitor, return InvalidValue: + log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", + chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id); + log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id()); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); + log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); + log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); + return Err(DecodeError::InvalidValue); + } + } else { + // We shouldn't have persisted (or read) any unfunded channel types so none should have been + // created in this `channel_by_id` map. + debug_assert!(false); return Err(DecodeError::InvalidValue); } } @@ -9114,6 +9518,7 @@ where // downstream chan is closed (because we don't have a // channel_id -> peer map entry). counterparty_opt.is_none(), + counterparty_opt.cloned().or(monitor.get_counterparty_node_id()), monitor.get_funding_txo().0)) } else { None } } else { @@ -9221,28 +9626,35 @@ where for (_peer_node_id, peer_state_mutex) in per_peer_state.iter_mut() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - for (chan_id, chan) in peer_state.channel_by_id.iter_mut() { - if chan.context.outbound_scid_alias() == 0 { - let mut outbound_scid_alias; - loop { - outbound_scid_alias = fake_scid::Namespace::OutboundAlias - .get_fake_scid(best_block_height, &genesis_hash, fake_scid_rand_bytes.as_ref().unwrap(), &args.entropy_source); - if outbound_scid_aliases.insert(outbound_scid_alias) { break; } - } - chan.context.set_outbound_scid_alias(outbound_scid_alias); - } else if !outbound_scid_aliases.insert(chan.context.outbound_scid_alias()) { - // Note that in rare cases its possible to hit this while reading an older - // channel if we just happened to pick a colliding outbound alias above. - log_error!(args.logger, "Got duplicate outbound SCID alias; {}", chan.context.outbound_scid_alias()); - return Err(DecodeError::InvalidValue); - } - if chan.context.is_usable() { - if short_to_chan_info.insert(chan.context.outbound_scid_alias(), (chan.context.get_counterparty_node_id(), *chan_id)).is_some() { + for (chan_id, phase) in peer_state.channel_by_id.iter_mut() { + if let ChannelPhase::Funded(chan) = phase { + if chan.context.outbound_scid_alias() == 0 { + let mut outbound_scid_alias; + loop { + outbound_scid_alias = fake_scid::Namespace::OutboundAlias + .get_fake_scid(best_block_height, &genesis_hash, fake_scid_rand_bytes.as_ref().unwrap(), &args.entropy_source); + if outbound_scid_aliases.insert(outbound_scid_alias) { break; } + } + chan.context.set_outbound_scid_alias(outbound_scid_alias); + } else if !outbound_scid_aliases.insert(chan.context.outbound_scid_alias()) { // Note that in rare cases its possible to hit this while reading an older // channel if we just happened to pick a colliding outbound alias above. log_error!(args.logger, "Got duplicate outbound SCID alias; {}", chan.context.outbound_scid_alias()); return Err(DecodeError::InvalidValue); } + if chan.context.is_usable() { + if short_to_chan_info.insert(chan.context.outbound_scid_alias(), (chan.context.get_counterparty_node_id(), *chan_id)).is_some() { + // Note that in rare cases its possible to hit this while reading an older + // channel if we just happened to pick a colliding outbound alias above. + log_error!(args.logger, "Got duplicate outbound SCID alias; {}", chan.context.outbound_scid_alias()); + return Err(DecodeError::InvalidValue); + } + } + } else { + // We shouldn't have persisted (or read) any unfunded channel types so none should have been + // created in this `channel_by_id` map. + debug_assert!(false); + return Err(DecodeError::InvalidValue); } } } @@ -9284,7 +9696,7 @@ where let peer_state_mutex = per_peer_state.get(peer_node_id).unwrap(); let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let Some(channel) = peer_state.channel_by_id.get_mut(&previous_channel_id) { + if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) { channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &args.logger); } } @@ -9368,7 +9780,9 @@ where pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source: args.entropy_source, node_signer: args.node_signer, @@ -9385,12 +9799,12 @@ where channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } - for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay { + for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay { // We use `downstream_closed` in place of `from_onchain` here just as a guess - we // don't remember in the `ChannelMonitor` where we got a preimage from, but if the // channel is closed we just assume that it probably came from an on-chain claim. channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), - downstream_closed, downstream_funding); + downstream_closed, downstream_node_id, downstream_funding); } //TODO: Broadcast channel update for closed channels, but only after we've made a @@ -9430,9 +9844,9 @@ mod tests { // All nodes start with a persistable update pending as `create_network` connects each node // with all other nodes to make most tests simpler. - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1); @@ -9446,19 +9860,19 @@ mod tests { &nodes[0].node.get_our_node_id()).pop().unwrap(); // The first two nodes (which opened a channel) should now require fresh persistence - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // ... but the last node should not. - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // After persisting the first two nodes they should no longer need fresh persistence. - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update // about the channel. nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0); nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1); - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // The nodes which are a party to the channel should also ignore messages from unrelated // parties. @@ -9466,8 +9880,8 @@ mod tests { nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // At this point the channel info given by peers should still be the same. assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); @@ -9484,8 +9898,8 @@ mod tests { // persisted and that its channel info remains the same. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info); @@ -9493,8 +9907,8 @@ mod tests { // the channel info has updated. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update); - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info); } @@ -9645,13 +10059,12 @@ mod tests { // To start (1), send a regular payment but don't claim it. let expected_route = [&nodes[1]]; - let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000); + let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &expected_route, 100_000); // Next, attempt a keysend payment and make sure it fails. - let route_params = RouteParameters { - payment_params: PaymentParameters::for_keysend(expected_route.last().unwrap().node.get_our_node_id(), TEST_FINAL_CLTV, false), - final_value_msat: 100_000, - }; + let route_params = RouteParameters::from_payment_params_and_value( + PaymentParameters::for_keysend(expected_route.last().unwrap().node.get_our_node_id(), + TEST_FINAL_CLTV, false), 100_000); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, None, nodes[0].logger, &scorer, &(), &random_seed_bytes @@ -9739,10 +10152,10 @@ mod tests { pass_along_path(&nodes[0], &path, 100_000, payment_hash, None, event, true, Some(payment_preimage)); // Next, attempt a keysend payment and make sure it fails. - let route_params = RouteParameters { - payment_params: PaymentParameters::for_keysend(expected_route.last().unwrap().node.get_our_node_id(), TEST_FINAL_CLTV, false), - final_value_msat: 100_000, - }; + let route_params = RouteParameters::from_payment_params_and_value( + PaymentParameters::for_keysend(expected_route.last().unwrap().node.get_our_node_id(), TEST_FINAL_CLTV, false), + 100_000 + ); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, None, nodes[0].logger, &scorer, &(), &random_seed_bytes @@ -9788,10 +10201,8 @@ mod tests { let payee_pubkey = nodes[1].node.get_our_node_id(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); - let route_params = RouteParameters { - payment_params: PaymentParameters::for_keysend(payee_pubkey, 40, false), - final_value_msat: 10_000, - }; + let route_params = RouteParameters::from_payment_params_and_value( + PaymentParameters::for_keysend(payee_pubkey, 40, false), 10_000); let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); let scorer = test_utils::TestScorer::new(); @@ -9835,10 +10246,8 @@ mod tests { let payee_pubkey = nodes[1].node.get_our_node_id(); let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]); - let route_params = RouteParameters { - payment_params: PaymentParameters::for_keysend(payee_pubkey, 40, false), - final_value_msat: 10_000, - }; + let route_params = RouteParameters::from_payment_params_and_value( + PaymentParameters::for_keysend(payee_pubkey, 40, false), 10_000); let network_graph = nodes[0].network_graph.clone(); let first_hops = nodes[0].node.list_usable_channels(); let scorer = test_utils::TestScorer::new(); @@ -10735,9 +11144,9 @@ pub mod bench { let payment_secret = $node_b.create_inbound_payment_for_hash(payment_hash, None, 7200, None).unwrap(); $node_a.send_payment(payment_hash, RecipientOnionFields::secret_only(payment_secret), - PaymentId(payment_hash.0), RouteParameters { - payment_params, final_value_msat: 10_000, - }, Retry::Attempts(0)).unwrap(); + PaymentId(payment_hash.0), + RouteParameters::from_payment_params_and_value(payment_params, 10_000), + Retry::Attempts(0)).unwrap(); let payment_event = SendEvent::from_event($node_a.get_and_clear_pending_msg_events().pop().unwrap()); $node_b.handle_update_add_htlc(&$node_a.get_our_node_id(), &payment_event.msgs[0]); $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &payment_event.commitment_msg);