diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index dbd0c8d1..918aca8e 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -177,7 +177,7 @@ pub(super) enum HTLCForwardInfo {
 }
 
 /// Tracks the inbound corresponding to an outbound HTLC
-#[derive(Clone, Hash, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub(crate) struct HTLCPreviousHopData {
 	// Note that this may be an outbound SCID alias for the associated channel.
 	short_channel_id: u64,
@@ -233,7 +233,8 @@ impl From<&ClaimableHTLC> for events::ClaimedHTLC {
 	}
 }
 
-/// A payment identifier used to uniquely identify a payment to LDK.
+/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+/// a payment and ensure idempotency in LDK.
 ///
 /// This is not exported to bindings users as we just use [u8; 32] directly
 #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
@@ -282,7 +283,7 @@ impl Readable for InterceptId {
 	}
 }
 
-#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 /// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`].
 pub(crate) enum SentHTLCId {
 	PreviousHopData { short_channel_id: u64, htlc_id: u64 },
@@ -313,7 +314,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId,
 /// Tracks the inbound corresponding to an outbound HTLC
 #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
-#[derive(Clone, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub(crate) enum HTLCSource {
 	PreviousHopData(HTLCPreviousHopData),
 	OutboundRoute {
@@ -494,6 +495,10 @@ impl MsgHandleErrInternal {
 			channel_capacity: None,
 		}
 	}
+
+	fn closes_channel(&self) -> bool {
+		self.chan_id.is_some()
+	}
 }
 
 /// We hold back HTLCs we intend to relay for a random interval greater than this (see
@@ -655,7 +660,6 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
 }
 
 impl RAAMonitorUpdateBlockingAction {
-	#[allow(unused)]
 	fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
 		Self::ForwardedPaymentInboundClaim {
 			channel_id: prev_hop.outpoint.to_channel_id(),
@@ -675,18 +679,6 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	///
 	/// Holds all channels within corresponding `ChannelPhase`s where the peer is the counterparty.
 	pub(super) channel_by_id: HashMap<ChannelId, ChannelPhase<SP>>,
-	/// `temporary_channel_id` -> `OutboundV1Channel`.
-	///
-	/// Holds all outbound V1 channels where the peer is the counterparty. Once an outbound channel has
-	/// been assigned a `channel_id`, the entry in this map is removed and one is created in
-	/// `channel_by_id`.
-	pub(super) outbound_v1_channel_by_id: HashMap<ChannelId, OutboundV1Channel<SP>>,
-	/// `temporary_channel_id` -> `InboundV1Channel`.
-	///
-	/// Holds all inbound V1 channels where the peer is the counterparty. Once an inbound channel has
-	/// been assigned a `channel_id`, the entry in this map is removed and one is created in
-	/// `channel_by_id`.
-	pub(super) inbound_v1_channel_by_id: HashMap<ChannelId, InboundV1Channel<SP>>,
 	/// `temporary_channel_id` -> `InboundChannelRequest`.
 	///
 	/// When manual channel acceptance is enabled, this holds all unaccepted inbound channels where
@@ -740,24 +732,20 @@ impl<SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
 		if require_disconnected && self.is_connected {
 			return false
 		}
-		self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty()
+		self.channel_by_id.iter().filter(|(_, phase)| matches!(phase, ChannelPhase::Funded(_))).count() == 0
+			&& self.monitor_update_blocked_actions.is_empty()
 			&& self.in_flight_monitor_updates.is_empty()
 	}
 
 	// Returns a count of all channels we have with this peer, including unfunded channels.
 	fn total_channel_count(&self) -> usize {
-		self.channel_by_id.len() +
-			self.outbound_v1_channel_by_id.len() +
-			self.inbound_v1_channel_by_id.len() +
-			self.inbound_channel_request_by_id.len()
+		self.channel_by_id.len() + self.inbound_channel_request_by_id.len()
 	}
 
 	// Returns a bool indicating if the given `channel_id` matches a channel we have with this peer.
 	fn has_channel(&self, channel_id: &ChannelId) -> bool {
-		self.channel_by_id.contains_key(&channel_id) ||
-			self.outbound_v1_channel_by_id.contains_key(&channel_id) ||
-			self.inbound_v1_channel_by_id.contains_key(&channel_id) ||
-			self.inbound_channel_request_by_id.contains_key(&channel_id)
+		self.channel_by_id.contains_key(channel_id) ||
+			self.inbound_channel_request_by_id.contains_key(channel_id)
 	}
 }
 
@@ -1201,7 +1189,8 @@ where
 
 	background_events_processed_since_startup: AtomicBool,
 
-	persistence_notifier: Notifier,
+	event_persist_notifier: Notifier,
+	needs_persist_flag: AtomicBool,
 
 	entropy_source: ES,
 	node_signer: NS,
@@ -1230,7 +1219,8 @@ pub struct ChainParameters {
 #[must_use]
 enum NotifyOption {
 	DoPersist,
-	SkipPersist,
+	SkipPersistHandleEvents,
+	SkipPersistNoEvents,
 }
 
 /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is
@@ -1243,43 +1233,75 @@ enum NotifyOption {
 /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
 /// notify or not based on whether relevant changes have been made, providing a closure to
 /// `optionally_notify` which returns a `NotifyOption`.
-struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
-	persistence_notifier: &'a Notifier,
+struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> {
+	event_persist_notifier: &'a Notifier,
+	needs_persist_flag: &'a AtomicBool,
 	should_persist: F,
 
 	// We hold onto this result so the lock doesn't get released immediately.
 	_read_guard: RwLockReadGuard<'a, ()>,
 }
 
 impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
-	fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+	/// Notifies any waiters and indicates that we need to persist, in addition to possibly having
+	/// events to handle.
+	///
+	/// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
+	/// other cases where losing the changes on restart may result in a force-close or otherwise
+	/// isn't ideal.
+	fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
+		Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
+	}
+
+	fn optionally_notify<F: FnMut() -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
+	-> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
 		let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
-		let _ = cm.get_cm().process_background_events(); // We always persist
+		let force_notify = cm.get_cm().process_background_events();
 
 		PersistenceNotifierGuard {
-			persistence_notifier: &cm.get_cm().persistence_notifier,
-			should_persist: || -> NotifyOption { NotifyOption::DoPersist },
+			event_persist_notifier: &cm.get_cm().event_persist_notifier,
+			needs_persist_flag: &cm.get_cm().needs_persist_flag,
+			should_persist: move || {
+				// Pick the "most" action between `persist_check` and the background events
+				// processing and return that.
+				let notify = persist_check();
+				match (notify, force_notify) {
+					(NotifyOption::DoPersist, _) => NotifyOption::DoPersist,
+					(_, NotifyOption::DoPersist) => NotifyOption::DoPersist,
+					(NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents,
+					(_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents,
+					_ => NotifyOption::SkipPersistNoEvents,
+				}
+			},
 			_read_guard: read_guard,
 		}
-	}
 
 	/// Note that if any [`ChannelMonitorUpdate`]s are possibly generated,
-	/// [`ChannelManager::process_background_events`] MUST be called first.
-	fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
-		let read_guard = lock.read().unwrap();
+	/// [`ChannelManager::process_background_events`] MUST be called first (or
+	/// [`Self::optionally_notify`] used).
+	fn optionally_notify_skipping_background_events<F: Fn() -> NotifyOption, C: AChannelManager>
+	(cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
+		let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
 
 		PersistenceNotifierGuard {
-			persistence_notifier: notifier,
+			event_persist_notifier: &cm.get_cm().event_persist_notifier,
+			needs_persist_flag: &cm.get_cm().needs_persist_flag,
 			should_persist: persist_check,
 			_read_guard: read_guard,
 		}
 	}
 }
 
-impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
+impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
 	fn drop(&mut self) {
-		if (self.should_persist)() == NotifyOption::DoPersist {
-			self.persistence_notifier.notify();
+		match (self.should_persist)() {
+			NotifyOption::DoPersist => {
+				self.needs_persist_flag.store(true, Ordering::Release);
+				self.event_persist_notifier.notify()
+			},
+			NotifyOption::SkipPersistHandleEvents =>
+				self.event_persist_notifier.notify(),
+			NotifyOption::SkipPersistNoEvents => {},
 		}
 	}
 }
@@ -1685,11 +1707,15 @@ pub enum ChannelShutdownState {
 pub enum RecentPaymentDetails {
 	/// When an invoice was requested and thus a payment has not yet been sent.
 	AwaitingInvoice {
-		/// Identifier for the payment to ensure idempotency.
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
 		payment_id: PaymentId,
 	},
 	/// When a payment is still being sent and awaiting successful delivery.
 	Pending {
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
+		payment_id: PaymentId,
 		/// Hash of the payment that is currently being sent but has yet to be fulfilled or
 		/// abandoned.
 		payment_hash: PaymentHash,
@@ -1701,6 +1727,9 @@
 	/// been resolved. Upon receiving [`Event::PaymentSent`], we delay for a few minutes before the
 	/// payment is removed from tracking.
 	Fulfilled {
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
+		payment_id: PaymentId,
 		/// Hash of the payment that was claimed. `None` for serializations of [`ChannelManager`]
 		/// made before LDK version 0.0.104.
 		payment_hash: Option<PaymentHash>,
@@ -1709,6 +1738,9 @@
 	/// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all
 	/// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated.
 	Abandoned {
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
+		payment_id: PaymentId,
 		/// Hash of the payment that we have given up trying to send.
 		payment_hash: PaymentHash,
 	},
@@ -1810,28 +1842,6 @@ macro_rules! update_maps_on_chan_removal {
 	}}
 }
 
-/// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error)
-macro_rules! convert_unfunded_chan_err {
-	($self: ident, $err: expr, $channel: expr, $channel_id: expr) => {
-		match $err {
-			ChannelError::Warn(msg) => {
-				(false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Warn(msg), $channel_id.clone()))
-			},
-			ChannelError::Ignore(msg) => {
-				(false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone()))
-			},
-			ChannelError::Close(msg) => {
-				log_error!($self.logger, "Closing channel {} due to close-required error: {}", &$channel_id, msg);
-				update_maps_on_chan_removal!($self, &$channel.context);
-				let shutdown_res = $channel.context.force_shutdown(true);
-
-				(true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel.context.get_user_id(),
-					shutdown_res, None, $channel.context.get_value_satoshis()))
-			},
-		}
-	};
-}
-
 /// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error)
 macro_rules! convert_chan_phase_err {
 	($self: ident, $err: expr, $channel: expr, $channel_id: expr, MANUAL_CHANNEL_UPDATE, $channel_update: expr) => {
@@ -1907,31 +1917,6 @@ macro_rules! try_chan_phase_entry {
 	}
 }
 
-macro_rules! try_unfunded_chan_entry {
-	($self: ident, $res: expr, $entry: expr) => {
-		match $res {
-			Ok(res) => res,
-			Err(e) => {
-				let (drop, res) = convert_unfunded_chan_err!($self, e, $entry.get_mut(), $entry.key());
-				if drop {
-					$entry.remove_entry();
-				}
-				return Err(res);
-			}
-		}
-	}
-}
-
-macro_rules! remove_channel {
-	($self: expr, $entry: expr) => {
-		{
-			let channel = $entry.remove_entry().1;
-			update_maps_on_chan_removal!($self, &channel.context);
-			channel
-		}
-	}
-}
-
 macro_rules! remove_channel_phase {
 	($self: expr, $entry: expr) => {
 		{
@@ -2138,7 +2123,7 @@ macro_rules! process_events_body {
 				return;
 			}
 
-			let mut result = NotifyOption::SkipPersist;
+			let mut result;
 
 			{
 				// We'll acquire our total consistency lock so that we can be sure no other
@@ -2147,7 +2132,7 @@ macro_rules! process_events_body {
 				// Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must
 				// ensure any startup-generated background events are handled first.
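 				// Seeding `result` from `process_background_events()` below means a persist
 				// required by a startup-generated background event can never be lost.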
- if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; } + result = $self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -2187,8 +2172,14 @@ macro_rules! process_events_body { processed_all_events = false; } - if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + match result { + NotifyOption::DoPersist => { + $self.needs_persist_flag.store(true, Ordering::Release); + $self.event_persist_notifier.notify(); + }, + NotifyOption::SkipPersistHandleEvents => + $self.event_persist_notifier.notify(), + NotifyOption::SkipPersistNoEvents => {}, } } } @@ -2267,7 +2258,9 @@ where pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source, node_signer, @@ -2363,7 +2356,7 @@ where let res = channel.get_open_channel(self.genesis_hash.clone()); let temporary_channel_id = channel.context.channel_id(); - match peer_state.outbound_v1_channel_by_id.entry(temporary_channel_id) { + match peer_state.channel_by_id.entry(temporary_channel_id) { hash_map::Entry::Occupied(_) => { if cfg!(fuzzing) { return Err(APIError::APIMisuseError { err: "Fuzzy bad RNG".to_owned() }); @@ -2371,7 +2364,7 @@ where panic!("RNG is bad???"); } }, - hash_map::Entry::Vacant(entry) => { entry.insert(channel); } + hash_map::Entry::Vacant(entry) => { entry.insert(ChannelPhase::UnfundedOutboundV1(channel)); } } peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { @@ -2433,16 +2426,6 @@ where peer_state.latest_features.clone(), &self.fee_estimator); res.push(details); } - for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() { - let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone(), &self.fee_estimator); - res.push(details); - } - for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() { - let details = ChannelDetails::from_channel_context(&channel.context, best_block_height, - peer_state.latest_features.clone(), &self.fee_estimator); - res.push(details); - } } } res @@ -2476,8 +2459,6 @@ where return peer_state.channel_by_id .iter() .map(|(_, phase)| phase.context()) - .chain(peer_state.outbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) - .chain(peer_state.inbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context)) .map(context_to_details) .collect(); } @@ -2504,15 +2485,16 @@ where }, PendingOutboundPayment::Retryable { payment_hash, total_msat, .. } => { Some(RecentPaymentDetails::Pending { + payment_id: *payment_id, payment_hash: *payment_hash, total_msat: *total_msat, }) }, PendingOutboundPayment::Abandoned { payment_hash, .. } => { - Some(RecentPaymentDetails::Abandoned { payment_hash: *payment_hash }) + Some(RecentPaymentDetails::Abandoned { payment_id: *payment_id, payment_hash: *payment_hash }) }, PendingOutboundPayment::Fulfilled { payment_hash, .. } => { - Some(RecentPaymentDetails::Fulfilled { payment_hash: *payment_hash }) + Some(RecentPaymentDetails::Fulfilled { payment_id: *payment_id, payment_hash: *payment_hash }) }, PendingOutboundPayment::Legacy { .. 
} => None }) @@ -2719,20 +2701,6 @@ where (None, chan_phase.context().get_counterparty_node_id()) }, } - } else if let hash_map::Entry::Occupied(chan) = peer_state.outbound_v1_channel_by_id.entry(channel_id.clone()) { - log_error!(self.logger, "Force-closing channel {}", &channel_id); - self.issue_channel_close_events(&chan.get().context, closure_reason); - let mut chan = remove_channel!(self, chan); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - // Unfunded channel has no update - (None, chan.context.get_counterparty_node_id()) - } else if let hash_map::Entry::Occupied(chan) = peer_state.inbound_v1_channel_by_id.entry(channel_id.clone()) { - log_error!(self.logger, "Force-closing channel {}", &channel_id); - self.issue_channel_close_events(&chan.get().context, closure_reason); - let mut chan = remove_channel!(self, chan); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - // Unfunded channel has no update - (None, chan.context.get_counterparty_node_id()) } else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() { log_error!(self.logger, "Force-closing channel {}", &channel_id); // N.B. that we don't send any channel close event here: we @@ -3577,8 +3545,8 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - let (chan, msg) = match peer_state.outbound_v1_channel_by_id.remove(&temporary_channel_id) { - Some(chan) => { + let (chan, msg) = match peer_state.channel_by_id.remove(temporary_channel_id) { + Some(ChannelPhase::UnfundedOutboundV1(chan)) => { let funding_txo = find_funding_output(&chan, &funding_transaction)?; let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger) @@ -3602,13 +3570,18 @@ where }, } }, - None => { - return Err(APIError::ChannelUnavailable { + Some(phase) => { + peer_state.channel_by_id.insert(*temporary_channel_id, phase); + return Err(APIError::APIMisuseError { err: format!( - "Channel with id {} not found for the passed counterparty node_id {}", + "Channel with id {} for the passed counterparty node_id {} is not an unfunded, outbound V1 channel", temporary_channel_id, counterparty_node_id), }) }, + None => return Err(APIError::ChannelUnavailable {err: format!( + "Channel with id {} not found for the passed counterparty node_id {}", + temporary_channel_id, counterparty_node_id), + }), }; peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { @@ -3781,12 +3754,6 @@ where } } continue; - } - - let context = if let Some(channel) = peer_state.inbound_v1_channel_by_id.get_mut(channel_id) { - &mut channel.context - } else if let Some(channel) = peer_state.outbound_v1_channel_by_id.get_mut(channel_id) { - &mut channel.context } else { // This should not be reachable as we've already checked for non-existence in the previous channel_id loop. debug_assert!(false); @@ -3796,11 +3763,6 @@ where channel_id, counterparty_node_id), }); }; - let mut config = context.config(); - config.apply(config_update); - // We update the config, but we MUST NOT broadcast a `channel_update` before `channel_ready` - // which would be the case for pending inbound/outbound channels. 
-			context.update_config(&config);
 		}
 		Ok(())
 	}
@@ -4423,7 +4385,7 @@ where
 		let mut background_events = Vec::new();
 		mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
 		if background_events.is_empty() {
-			return NotifyOption::SkipPersist;
+			return NotifyOption::SkipPersistNoEvents;
 		}
 
 		for event in background_events.drain(..) {
@@ -4492,17 +4454,17 @@
 	}
 
 	fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel<SP>, new_feerate: u32) -> NotifyOption {
-		if !chan.context.is_outbound() { return NotifyOption::SkipPersist; }
+		if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; }
 		// If the feerate has decreased by less than half, don't bother
 		if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() {
 			log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
-				&chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
-			return NotifyOption::SkipPersist;
+				chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
+			return NotifyOption::SkipPersistNoEvents;
 		}
 		if !chan.context.is_live() {
 			log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
-				&chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
-			return NotifyOption::SkipPersist;
+				chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
+			return NotifyOption::SkipPersistNoEvents;
 		}
 		log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
 			&chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
@@ -4517,8 +4479,8 @@
 	/// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what
 	/// it wants to detect). Thus, we have a variant exposed here for its benefit.
 	pub fn maybe_update_chan_fees(&self) {
-		PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
-			let mut should_persist = self.process_background_events();
+		PersistenceNotifierGuard::optionally_notify(self, || {
+			let mut should_persist = NotifyOption::SkipPersistNoEvents;
 
 			let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
 			let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
@@ -4562,8 +4524,8 @@
 	/// [`ChannelUpdate`]: msgs::ChannelUpdate
 	/// [`ChannelConfig`]: crate::util::config::ChannelConfig
 	pub fn timer_tick_occurred(&self) {
-		PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
-			let mut should_persist = self.process_background_events();
+		PersistenceNotifierGuard::optionally_notify(self, || {
+			let mut should_persist = NotifyOption::SkipPersistNoEvents;
 
 			let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
 			let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
@@ -4692,13 +4654,6 @@
 					}
 				});
 
-				peer_state.outbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(
-					chan_id, &mut chan.context, &mut chan.unfunded_context, pending_msg_events,
-					counterparty_node_id));
-				peer_state.inbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(
-					chan_id, &mut chan.context, &mut chan.unfunded_context, pending_msg_events,
-					counterparty_node_id));
-
 				for (chan_id, req) in peer_state.inbound_channel_request_by_id.iter_mut() {
 					if { req.ticks_remaining -= 1 ; req.ticks_remaining } <= 0 {
 						log_error!(self.logger, "Force-closing unaccepted inbound channel {} for not accepting in a timely manner", &chan_id);
@@ -5265,11 +5220,17 @@
 		self.pending_outbound_payments.finalize_claims(sources, &self.pending_events);
 	}
 
-	fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_outpoint: OutPoint) {
+	fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
+		forwarded_htlc_value_msat: Option<u64>, from_onchain: bool,
+		next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
+	) {
 		match source {
 			HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
 				debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
 					"We don't support claim_htlc claims during startup - monitors may not be available yet");
+				if let Some(pubkey) = next_channel_counterparty_node_id {
+					debug_assert_eq!(pubkey, path.hops[0].pubkey);
+				}
 				let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
 					channel_funding_outpoint: next_channel_outpoint,
 					counterparty_node_id: path.hops[0].pubkey,
@@ -5280,6 +5241,7 @@
 			},
 			HTLCSource::PreviousHopData(hop_data) => {
 				let prev_outpoint = hop_data.outpoint;
+				let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
 				let res = self.claim_funds_from_hop(hop_data, payment_preimage,
 					|htlc_claim_value_msat| {
 						if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
@@ -5295,7 +5257,17 @@
 								next_channel_id: Some(next_channel_outpoint.to_channel_id()),
 								outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
 							},
-							downstream_counterparty_and_funding_outpoint: None,
+							downstream_counterparty_and_funding_outpoint:
+								if let Some(node_id) = next_channel_counterparty_node_id {
+									Some((node_id, next_channel_outpoint, completed_blocker))
+								} else {
+									// We can only get `None` here if we are processing a
+									// `ChannelMonitor`-originated event, in which case we
+									// don't care about ensuring we wake the downstream
+									// channel's monitor updating - the channel is already
+									// closed.
+									None
+								},
 						})
 					} else { None }
 				});
@@ -5573,7 +5545,7 @@
 			msg: channel.accept_inbound_channel(),
 		});
 
-		peer_state.inbound_v1_channel_by_id.insert(temporary_channel_id.clone(), channel);
+		peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel));
 
 		Ok(())
 	}
@@ -5605,26 +5577,34 @@
 		peer: &PeerState<SP>, best_block_height: u32
 	) -> usize {
 		let mut num_unfunded_channels = 0;
-		for chan in peer.channel_by_id.iter().filter_map(
-			|(_, phase)| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
-		) {
-			// This covers non-zero-conf inbound `Channel`s that we are currently monitoring, but those
-			// which have not yet had any confirmations on-chain.
-			if !chan.context.is_outbound() && chan.context.minimum_depth().unwrap_or(1) != 0 &&
-				chan.context.get_funding_tx_confirmations(best_block_height) == 0
-			{
-				num_unfunded_channels += 1;
-			}
-		}
-		for (_, chan) in peer.inbound_v1_channel_by_id.iter() {
-			if chan.context.minimum_depth().unwrap_or(1) != 0 {
-				num_unfunded_channels += 1;
+		for (_, phase) in peer.channel_by_id.iter() {
+			match phase {
+				ChannelPhase::Funded(chan) => {
+					// This covers non-zero-conf inbound `Channel`s that we are currently monitoring, but those
+					// which have not yet had any confirmations on-chain.
+					if !chan.context.is_outbound() && chan.context.minimum_depth().unwrap_or(1) != 0 &&
+						chan.context.get_funding_tx_confirmations(best_block_height) == 0
+					{
+						num_unfunded_channels += 1;
+					}
+				},
+				ChannelPhase::UnfundedInboundV1(chan) => {
+					if chan.context.minimum_depth().unwrap_or(1) != 0 {
+						num_unfunded_channels += 1;
+					}
+				},
+				ChannelPhase::UnfundedOutboundV1(_) => {
+					// Outbound channels don't contribute to the unfunded count in the DoS context.
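+					// Unfunded outbound channels were initiated by us, so a counterparty cannot
+					// use them to bloat our memory; only inbound unfunded channels and the
+					// unaccepted channel requests counted below feed into the DoS limit.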
+ continue; + } } } num_unfunded_channels + peer.inbound_channel_request_by_id.len() } fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); } @@ -5719,11 +5699,13 @@ where node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(), }); - peer_state.inbound_v1_channel_by_id.insert(channel_id, channel); + peer_state.channel_by_id.insert(channel_id, ChannelPhase::UnfundedInboundV1(channel)); Ok(()) } fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -5733,10 +5715,17 @@ where })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - match peer_state.outbound_v1_channel_by_id.entry(msg.temporary_channel_id) { - hash_map::Entry::Occupied(mut chan) => { - try_unfunded_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &peer_state.latest_features), chan); - (chan.get().context.get_value_satoshis(), chan.get().context.get_funding_redeemscript().to_v0_p2wsh(), chan.get().context.get_user_id()) + match peer_state.channel_by_id.entry(msg.temporary_channel_id) { + hash_map::Entry::Occupied(mut phase) => { + match phase.get_mut() { + ChannelPhase::UnfundedOutboundV1(chan) => { + try_chan_phase_entry!(self, chan.accept_channel(&msg, &self.default_configuration.channel_handshake_limits, &peer_state.latest_features), phase); + (chan.context.get_value_satoshis(), chan.context.get_funding_redeemscript().to_v0_p2wsh(), chan.context.get_user_id()) + }, + _ => { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got an unexpected accept_channel message from peer with counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)); + } + } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) } @@ -5765,8 +5754,8 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let (chan, funding_msg, monitor) = - match peer_state.inbound_v1_channel_by_id.remove(&msg.temporary_channel_id) { - Some(inbound_chan) => { + match peer_state.channel_by_id.remove(&msg.temporary_channel_id) { + Some(ChannelPhase::UnfundedInboundV1(inbound_chan)) => { match inbound_chan.funding_created(msg, best_block, &self.signer_provider, &self.logger) { Ok(res) => res, Err((mut inbound_chan, err)) => { @@ -5781,6 +5770,9 @@ where }, } }, + Some(ChannelPhase::Funded(_)) | Some(ChannelPhase::UnfundedOutboundV1(_)) => { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got an unexpected funding_created message from peer with counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)); + }, None => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id)) }; @@ -5874,6 +5866,8 @@ where } fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5936,49 +5930,45 @@ where })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - // TODO(dunxen): Fix this duplication when we switch to a single map with enums as per - // https://github.com/lightningdevkit/rust-lightning/issues/2422 - if let hash_map::Entry::Occupied(chan_entry) = peer_state.outbound_v1_channel_by_id.entry(msg.channel_id.clone()) { - log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); - self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); - let mut chan = remove_channel!(self, chan_entry); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - return Ok(()); - } else if let hash_map::Entry::Occupied(chan_entry) = peer_state.inbound_v1_channel_by_id.entry(msg.channel_id.clone()) { - log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); - self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); - let mut chan = remove_channel!(self, chan_entry); - self.finish_force_close_channel(chan.context.force_shutdown(false)); - return Ok(()); - } else if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) { - if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { - if !chan.received_shutdown() { - log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", - msg.channel_id, - if chan.sent_shutdown() { " after we initiated shutdown" } else { "" }); - } - - let funding_txo_opt = chan.context.get_funding_txo(); - let (shutdown, monitor_update_opt, htlcs) = 
try_chan_phase_entry!(self, - chan.shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_phase_entry); - dropped_htlcs = htlcs; + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) { + let phase = chan_phase_entry.get_mut(); + match phase { + ChannelPhase::Funded(chan) => { + if !chan.received_shutdown() { + log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.", + msg.channel_id, + if chan.sent_shutdown() { " after we initiated shutdown" } else { "" }); + } - if let Some(msg) = shutdown { - // We can send the `shutdown` message before updating the `ChannelMonitor` - // here as we don't need the monitor update to complete until we send a - // `shutdown_signed`, which we'll delay if we're pending a monitor update. - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg, - }); - } + let funding_txo_opt = chan.context.get_funding_txo(); + let (shutdown, monitor_update_opt, htlcs) = try_chan_phase_entry!(self, + chan.shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_phase_entry); + dropped_htlcs = htlcs; - // Update the monitor with the shutdown script if necessary. - if let Some(monitor_update) = monitor_update_opt { - break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()); - } - break Ok(()); + if let Some(msg) = shutdown { + // We can send the `shutdown` message before updating the `ChannelMonitor` + // here as we don't need the monitor update to complete until we send a + // `shutdown_signed`, which we'll delay if we're pending a monitor update. + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg, + }); + } + // Update the monitor with the shutdown script if necessary. + if let Some(monitor_update) = monitor_update_opt { + break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, + peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ()); + } + break Ok(()); + }, + ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) => { + let context = phase.context_mut(); + log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); + self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); + let mut chan = remove_channel_phase!(self, chan_phase_entry); + self.finish_force_close_channel(chan.context_mut().force_shutdown(false)); + return Ok(()); + }, } } else { return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -6056,6 +6046,9 @@ where //encrypted with the same key. It's not immediately obvious how to usefully exploit that, //but we should prevent it anyway. + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! 
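+		// Decoding the onion here, before the per-peer channel-state lock is taken below,
+		// keeps the onion crypto work outside of that lock.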
+ let decoded_hop_res = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -6122,6 +6115,17 @@ where hash_map::Entry::Occupied(mut chan_phase_entry) => { if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry); + if let HTLCSource::PreviousHopData(prev_hop) = &res.0 { + peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id) + .or_insert_with(Vec::new) + .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop)); + } + // Note that we do not need to push an `actions_blocking_raa_monitor_updates` + // entry here, even though we *do* need to block the next RAA monitor update. + // We do this instead in the `claim_funds_internal` by attaching a + // `ReleaseRAAChannelMonitorUpdate` action to the event generated when the + // outbound HTLC is claimed. This is guaranteed to all complete before we + // process the RAA as messages are processed from single peers serially. funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded"); res } else { @@ -6132,11 +6136,13 @@ where hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; - self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo); + self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo); Ok(()) } fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6160,6 +6166,8 @@ where } fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! 
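+		// As with `update_fail_htlc` above, the failure only takes effect on our channel
+		// state once the counterparty's subsequent `commitment_signed` is processed.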
let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6334,6 +6342,23 @@ where }) } + #[cfg(any(test, feature = "_test_utils"))] + pub(crate) fn test_raa_monitor_updates_held(&self, + counterparty_node_id: PublicKey, channel_id: ChannelId + ) -> bool { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lck = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lck; + + if let Some(chan) = peer_state.channel_by_id.get(&channel_id) { + return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates, + chan.context().get_funding_txo().unwrap(), counterparty_node_id); + } + } + false + } + fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { let (htlcs_to_fail, res) = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6431,19 +6456,19 @@ where Ok(()) } - /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err. + /// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err. fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result { let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) { Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), None => { // It's not a local channel - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } }; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id); if peer_state_mutex_opt.is_none() { - return Ok(NotifyOption::SkipPersist) + return Ok(NotifyOption::SkipPersistNoEvents) } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; @@ -6455,14 +6480,14 @@ where // If the announcement is about a channel of ours which is public, some // other peer may simply be forwarding all its gossip to us. Don't provide // a scary-looking error message and return Ok instead. - return Ok(NotifyOption::SkipPersist); + return Ok(NotifyOption::SkipPersistNoEvents); } return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id)); } let were_node_one = self.get_our_node_id().serialize()[..] 
						< chan.context.get_counterparty_node_id().serialize()[..];
 				let msg_from_node_one = msg.contents.flags & 1 == 0;
 				if were_node_one == msg_from_node_one {
-					return Ok(NotifyOption::SkipPersist);
+					return Ok(NotifyOption::SkipPersistNoEvents);
 				} else {
 					log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
 					try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
@@ -6472,12 +6497,12 @@
 						"Got a channel_update for an unfunded channel!".into())), chan_phase_entry);
 				}
 			},
-			hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist)
+			hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents)
 		}
 		Ok(NotifyOption::DoPersist)
 	}
 
-	fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
+	fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
 		let htlc_forwards;
 		let need_lnd_workaround = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6533,14 +6558,16 @@
 			}
 		};
 
+		let mut persist = NotifyOption::SkipPersistHandleEvents;
 		if let Some(forwards) = htlc_forwards {
 			self.forward_htlcs(&mut [forwards][..]);
+			persist = NotifyOption::DoPersist;
 		}
 
 		if let Some(channel_ready_msg) = need_lnd_workaround {
 			self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
 		}
-		Ok(())
+		Ok(persist)
 	}
 
 	/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
@@ -6555,8 +6582,8 @@
 			match monitor_event {
 				MonitorEvent::HTLCEvent(htlc_update) => {
 					if let Some(preimage) = htlc_update.payment_preimage {
-						log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", &preimage);
-						self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint);
+						log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
+						self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
 					} else {
 						log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
 						let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
@@ -7090,8 +7117,8 @@
 	/// the `MessageSendEvent`s to the specific peer they were generated under.
 	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
 		let events = RefCell::new(Vec::new());
-		PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
-			let mut result = self.process_background_events();
+		PersistenceNotifierGuard::optionally_notify(self, || {
+			let mut result = NotifyOption::SkipPersistNoEvents;
 
 			// TODO: This behavior should be documented. It's unintuitive that we query
 			// ChannelMonitors when clearing other events.
@@ -7172,8 +7199,9 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -7207,8 +7235,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -7227,8 +7256,9 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -7271,8 +7301,9 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, - &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); + let _persistence_guard = + PersistenceNotifierGuard::optionally_notify_skipping_background_events( + self, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.context.get_funding_txo() { if funding_txo.txid == *txid { @@ -7455,18 +7486,26 @@ where } } - /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted. + /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or + /// may have events that need processing. + /// + /// In order to check if this [`ChannelManager`] needs persisting, call + /// [`Self::get_and_clear_needs_persistence`]. /// /// Note that callbacks registered on the [`Future`] MUST NOT call back into this /// [`ChannelManager`] and should instead register actions to be taken later. - /// - pub fn get_persistable_update_future(&self) -> Future { - self.persistence_notifier.get_future() + pub fn get_event_or_persistence_needed_future(&self) -> Future { + self.event_persist_notifier.get_future() + } + + /// Returns true if this [`ChannelManager`] needs to be persisted. 
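+	///
+	/// A minimal sketch of how this is intended to pair with
+	/// [`Self::get_event_or_persistence_needed_future`] in a background loop, assuming an
+	/// `event_handler` and a `persist_manager` routine of your own:
+	///
+	/// ```ignore
+	/// loop {
+	///     channel_manager.get_event_or_persistence_needed_future().await;
+	///     channel_manager.process_pending_events(&event_handler);
+	///     if channel_manager.get_and_clear_needs_persistence() {
+	///         persist_manager(&channel_manager);
+	///     }
+	/// }
+	/// ```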
+ pub fn get_and_clear_needs_persistence(&self) -> bool { + self.needs_persist_flag.swap(false, Ordering::AcqRel) } #[cfg(any(test, feature = "_test_utils"))] - pub fn get_persistence_condvar_value(&self) -> bool { - self.persistence_notifier.notify_pending() + pub fn get_event_or_persist_condvar_value(&self) -> bool { + self.event_persist_notifier.notify_pending() } /// Gets the latest best block which was connected either via the [`chain::Listen`] or @@ -7523,8 +7562,21 @@ where L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // open_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_open_channel(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => { + debug_assert!(false, "We shouldn't close a new channel"); + NotifyOption::DoPersist + }, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { @@ -7534,8 +7586,13 @@ where } fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // accept_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + NotifyOption::SkipPersistHandleEvents + }); } fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { @@ -7555,8 +7612,19 @@ where } fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // channel_ready message - while the channel's state will change, any channel_ready message + // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we + // will not force-close the channel on startup. 
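+		// The mapping below recurs in most of the message handlers: an error which closes a
+		// channel forces a persist, while anything else at most requires handling events.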
+ let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_ready(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { @@ -7570,8 +7638,19 @@ where } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_add_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_add_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { @@ -7580,13 +7659,35 @@ where } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_malformed_htlc message - the message itself doesn't change our channel state + // only the `commitment_signed` message afterwards will. 
+ let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { @@ -7600,8 +7701,19 @@ where } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fee message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fee(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { @@ -7610,23 +7722,32 @@ where } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let force_persist = self.process_background_events(); + PersistenceNotifierGuard::optionally_notify(self, || { if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { - if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } + persist } else { - NotifyOption::SkipPersist + NotifyOption::DoPersist } }); } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_reestablish(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(persist) => *persist, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify( + self, || NotifyOption::SkipPersistHandleEvents); + let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { @@ -7646,6 +7767,7 @@ where } &chan.context }, + // Unfunded channels will always be removed. ChannelPhase::UnfundedOutboundV1(chan) => { &chan.context }, @@ -7653,22 +7775,11 @@ where &chan.context }, }; - // Clean up for removal. 
update_maps_on_chan_removal!(self, &context); self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer); false }); - peer_state.inbound_v1_channel_by_id.retain(|_, chan| { - update_maps_on_chan_removal!(self, &chan.context); - self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); - false - }); - peer_state.outbound_v1_channel_by_id.retain(|_, chan| { - update_maps_on_chan_removal!(self, &chan.context); - self.issue_channel_close_events(&chan.context, ClosureReason::DisconnectedPeer); - false - }); // Note that we don't bother generating any events for pre-accept channels - // they're not considered "channels" yet from the PoV of our events interface. peer_state.inbound_channel_request_by_id.clear(); @@ -7735,78 +7846,82 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut res = Ok(()); - // If we have too many peers connected which don't have funded channels, disconnect the - // peer immediately (as long as it doesn't have funded channels). If we have a bunch of - // unfunded channels taking up space in memory for disconnected peers, we still let new - // peers connect, but we'll reject new channels from them. - let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); - let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; + PersistenceNotifierGuard::optionally_notify(self, || { + // If we have too many peers connected which don't have funded channels, disconnect the + // peer immediately (as long as it doesn't have funded channels). If we have a bunch of + // unfunded channels taking up space in memory for disconnected peers, we still let new + // peers connect, but we'll reject new channels from them. 
+ let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); + let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; - { - let mut peer_state_lock = self.per_peer_state.write().unwrap(); - match peer_state_lock.entry(counterparty_node_id.clone()) { - hash_map::Entry::Vacant(e) => { - if inbound_peer_limited { - return Err(()); - } - e.insert(Mutex::new(PeerState { - channel_by_id: HashMap::new(), - outbound_v1_channel_by_id: HashMap::new(), - inbound_v1_channel_by_id: HashMap::new(), - inbound_channel_request_by_id: HashMap::new(), - latest_features: init_msg.features.clone(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - is_connected: true, - })); - }, - hash_map::Entry::Occupied(e) => { - let mut peer_state = e.get().lock().unwrap(); - peer_state.latest_features = init_msg.features.clone(); - - let best_block_height = self.best_block.read().unwrap().height(); - if inbound_peer_limited && - Self::unfunded_channel_count(&*peer_state, best_block_height) == - peer_state.channel_by_id.len() - { - return Err(()); - } + { + let mut peer_state_lock = self.per_peer_state.write().unwrap(); + match peer_state_lock.entry(counterparty_node_id.clone()) { + hash_map::Entry::Vacant(e) => { + if inbound_peer_limited { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } + e.insert(Mutex::new(PeerState { + channel_by_id: HashMap::new(), + inbound_channel_request_by_id: HashMap::new(), + latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: true, + })); + }, + hash_map::Entry::Occupied(e) => { + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); + + let best_block_height = self.best_block.read().unwrap().height(); + if inbound_peer_limited && + Self::unfunded_channel_count(&*peer_state, best_block_height) == + peer_state.channel_by_id.len() + { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } - debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); - peer_state.is_connected = true; - }, + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; + }, + } } - } - log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); + log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - let per_peer_state = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| - if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { - // Since unfunded 
channel maps are cleared upon disconnecting a peer, and they're not persisted - // (so won't be recovered after a crash), they shouldn't exist here and we would never need to - // worry about closing and removing them. - debug_assert!(false); - None - } - ).for_each(|chan| { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), + peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| + if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash), they shouldn't exist here and we would never need to + // worry about closing and removing them. + debug_assert!(false); + None + } + ).for_each(|chan| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); - }); - } - //TODO: Also re-broadcast announcement_signatures - Ok(()) + } + + return NotifyOption::SkipPersistHandleEvents; + //TODO: Also re-broadcast announcement_signatures + }); + res } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { @@ -7862,9 +7977,7 @@ where // Note that we don't bother generating any events for pre-accept channels - // they're not considered "channels" yet from the PoV of our events interface. peer_state.inbound_channel_request_by_id.clear(); - peer_state.channel_by_id.keys().cloned() - .chain(peer_state.outbound_v1_channel_by_id.keys().cloned()) - .chain(peer_state.inbound_v1_channel_by_id.keys().cloned()).collect() + peer_state.channel_by_id.keys().cloned().collect() }; for channel_id in channel_ids { // Untrusted messages from peer, we throw away the error if id points to a non-existent channel @@ -7878,7 +7991,7 @@ where if peer_state_mutex_opt.is_none() { return; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let Some(chan) = peer_state.outbound_v1_channel_by_id.get_mut(&msg.channel_id) { + if let Some(ChannelPhase::UnfundedOutboundV1(chan)) = peer_state.channel_by_id.get_mut(&msg.channel_id) { if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash, &self.fee_estimator) { peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: *counterparty_node_id, @@ -9020,8 +9133,6 @@ where let peer_state_from_chans = |channel_by_id| { PeerState { channel_by_id, - outbound_v1_channel_by_id: HashMap::new(), - inbound_v1_channel_by_id: HashMap::new(), inbound_channel_request_by_id: HashMap::new(), latest_features: InitFeatures::empty(), pending_msg_events: Vec::new(), @@ -9392,6 +9503,7 @@ where // downstream chan is closed (because we don't have a // channel_id -> peer map entry). 
counterparty_opt.is_none(), + counterparty_opt.cloned().or(monitor.get_counterparty_node_id()), monitor.get_funding_txo().0)) } else { None } } else { @@ -9653,7 +9765,9 @@ where pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - persistence_notifier: Notifier::new(), + + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source: args.entropy_source, node_signer: args.node_signer, @@ -9670,12 +9784,12 @@ where channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } - for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay { + for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay { // We use `downstream_closed` in place of `from_onchain` here just as a guess - we // don't remember in the `ChannelMonitor` where we got a preimage from, but if the // channel is closed we just assume that it probably came from an on-chain claim. channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), - downstream_closed, downstream_funding); + downstream_closed, downstream_node_id, downstream_funding); } //TODO: Broadcast channel update for closed channels, but only after we've made a @@ -9715,9 +9829,9 @@ mod tests { // All nodes start with a persistable update pending as `create_network` connects each node // with all other nodes to make most tests simpler. - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1); @@ -9731,19 +9845,19 @@ mod tests { &nodes[0].node.get_our_node_id()).pop().unwrap(); // The first two nodes (which opened a channel) should now require fresh persistence - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // ... but the last node should not. - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // After persisting the first two nodes they should no longer need fresh persistence. - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update // about the channel. 
nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0); nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1); - assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete()); // The nodes which are a party to the channel should also ignore messages from unrelated // parties. @@ -9751,8 +9865,8 @@ mod tests { nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); // At this point the channel info given by peers should still be the same. assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); @@ -9769,8 +9883,8 @@ mod tests { // persisted and that its channel info remains the same. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update); - assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info); @@ -9778,8 +9892,8 @@ mod tests { // the channel info has updated. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update); - assert!(nodes[0].node.get_persistable_update_future().poll_is_complete()); - assert!(nodes[1].node.get_persistable_update_future().poll_is_complete()); + assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete()); + assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete()); assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info); } @@ -9930,7 +10044,7 @@ mod tests { // To start (1), send a regular payment but don't claim it. let expected_route = [&nodes[1]]; - let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000); + let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &expected_route, 100_000); // Next, attempt a keysend payment and make sure it fails. let route_params = RouteParameters::from_payment_params_and_value(
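The `update_*` handlers patched above all share one shape: run the internal handler, inspect the `Result` to choose a `NotifyOption`, then let `handle_error!` consume the `Result`. The standalone sketch below isolates that shape with simplified stand-in types - `HandlerError` and `persistence_decision` are illustrative, not LDK's actual items:

use std::fmt;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NotifyOption {
    // The ChannelManager must be re-persisted before the next restart.
    DoPersist,
    // No re-persistence is needed, but pending events may need processing.
    SkipPersistHandleEvents,
    // Neither persistence nor event processing is needed.
    SkipPersistNoEvents,
}

// Stand-in for MsgHandleErrInternal: an error that may or may not close a channel.
struct HandlerError { closes_channel: bool }

impl HandlerError {
    fn closes_channel(&self) -> bool { self.closes_channel }
}

impl fmt::Debug for HandlerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "HandlerError(closes_channel={})", self.closes_channel)
    }
}

// Mirrors the match used by handle_update_add_htlc and friends: a channel-closing
// error forces a persist, any other error may have queued events (e.g. an error
// message to send), and success changes nothing durable on its own.
fn persistence_decision(res: &Result<(), HandlerError>) -> NotifyOption {
    match res {
        Err(e) if e.closes_channel() => NotifyOption::DoPersist,
        Err(_) => NotifyOption::SkipPersistHandleEvents,
        Ok(()) => NotifyOption::SkipPersistNoEvents,
    }
}

fn main() {
    assert_eq!(persistence_decision(&Ok(())), NotifyOption::SkipPersistNoEvents);
    assert_eq!(
        persistence_decision(&Err(HandlerError { closes_channel: true })),
        NotifyOption::DoPersist
    );
    assert_eq!(
        persistence_decision(&Err(HandlerError { closes_channel: false })),
        NotifyOption::SkipPersistHandleEvents
    );
}

The key design point, as the diff's comments note, is that `Ok(())` maps to `SkipPersistNoEvents`: an inbound `update_*` message alone changes nothing durable until the `commitment_signed` that follows it.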
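The diff also splits the old `persistence_notifier` into an `event_persist_notifier` plus a `needs_persist_flag`, and the tests switch to `get_event_or_persistence_needed_future`. A rough sketch of how a background task could consume that split follows; `Notifier`, `Manager`, and the method names are simplified assumptions for illustration, not LDK's background-processor API:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};

// A toy wake-up primitive standing in for LDK's Notifier.
struct Notifier {
    woken: Mutex<bool>,
    cv: Condvar,
}

impl Notifier {
    fn new() -> Self { Notifier { woken: Mutex::new(false), cv: Condvar::new() } }
    fn notify(&self) {
        *self.woken.lock().unwrap() = true;
        self.cv.notify_all();
    }
    fn wait(&self) {
        let mut woken = self.woken.lock().unwrap();
        while !*woken { woken = self.cv.wait(woken).unwrap(); }
        *woken = false;
    }
}

struct Manager {
    event_persist_notifier: Notifier,
    needs_persist_flag: AtomicBool,
}

impl Manager {
    // Called by handlers whose changes are durable (NotifyOption::DoPersist).
    fn note_persist_needed(&self) {
        self.needs_persist_flag.store(true, Ordering::Release);
        self.event_persist_notifier.notify();
    }
    // Called by handlers that only queued events (SkipPersistHandleEvents).
    fn note_events_pending(&self) {
        self.event_persist_notifier.notify();
    }
    // The background task swaps the flag to decide whether a disk write is due.
    fn get_and_clear_needs_persistence(&self) -> bool {
        self.needs_persist_flag.swap(false, Ordering::AcqRel)
    }
}

fn main() {
    let mgr = Arc::new(Manager {
        event_persist_notifier: Notifier::new(),
        needs_persist_flag: AtomicBool::new(false),
    });
    let bg = {
        let mgr = Arc::clone(&mgr);
        std::thread::spawn(move || {
            mgr.event_persist_notifier.wait();
            // Process events unconditionally, persist only if flagged.
            if mgr.get_and_clear_needs_persistence() {
                println!("persisting manager");
            } else {
                println!("events only, skipping persist");
            }
        })
    };
    mgr.note_persist_needed();
    bg.join().unwrap();
}

The separation is what lets `SkipPersistHandleEvents` wake the event processor without also triggering a full `ChannelManager` write, which a single combined notifier could not express.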
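In `peer_connected`, the inbound-peer admission check now runs inside the `optionally_notify` closure, recording `Err(())` in `res` and returning `SkipPersistNoEvents` when the limit is hit. Below is a self-contained approximation of just that check; the types are stand-ins, and while LDK's `MAX_NO_CHANNEL_PEERS` is believed to be 250, treat the value here as illustrative:

use std::collections::HashMap;

const MAX_NO_CHANNEL_PEERS: usize = 250; // stand-in for LDK's constant

struct PeerState {
    funded_channel_count: usize,
    is_connected: bool,
}

fn accept_connection(
    peers: &HashMap<u64, PeerState>, // keyed by a stand-in peer id
    inbound: bool,
) -> Result<(), ()> {
    // Count connected peers holding no funded channels, as
    // peers_without_funded_channels does in the patched code.
    let connected_peers_without_funded_channels = peers
        .values()
        .filter(|p| p.is_connected && p.funded_channel_count == 0)
        .count();
    if inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS {
        // Mirrors the `res = Err(()); return NotifyOption::SkipPersistNoEvents;`
        // early-out in the patched peer_connected.
        return Err(());
    }
    Ok(())
}

fn main() {
    let mut peers = HashMap::new();
    for id in 0..MAX_NO_CHANNEL_PEERS as u64 {
        peers.insert(id, PeerState { funded_channel_count: 0, is_connected: true });
    }
    assert!(accept_connection(&peers, true).is_err()); // inbound refused at the limit
    assert!(accept_connection(&peers, false).is_ok()); // outbound still allowed
}

Outbound connections bypass the limit because the local node initiated them; only unsolicited inbound peers can exhaust the no-channel slots.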
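Finally, the claim-replay path on deserialization now threads a counterparty node id through to `claim_funds_internal`, preferring the live channel-map entry and falling back to whatever the `ChannelMonitor` recorded. A minimal sketch of that `Option::or` fallback in isolation, with stand-in names:

// `counterparty_for_claim` is a hypothetical helper mirroring the expression
// `counterparty_opt.cloned().or(monitor.get_counterparty_node_id())`.
fn counterparty_for_claim(
    map_entry: Option<&[u8; 33]>,       // from the channel_id -> peer map, if present
    monitor_recorded: Option<[u8; 33]>, // from the ChannelMonitor, if it stored one
) -> Option<[u8; 33]> {
    // Option::or keeps the first Some, so the in-memory map wins when available.
    map_entry.cloned().or(monitor_recorded)
}

fn main() {
    let from_map = Some([2u8; 33]);
    let from_monitor = Some([3u8; 33]);
    assert_eq!(counterparty_for_claim(from_map.as_ref(), from_monitor), Some([2u8; 33]));
    assert_eq!(counterparty_for_claim(None, from_monitor), Some([3u8; 33]));
}

The fallback matters for monitors whose channel was already closed: no `channel_id -> peer` map entry exists, yet the claim must still be routed to the right counterparty on replay.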