Merge pull request #2413 from valentinewallace/2023-07-route-blinding
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Wed, 13 Sep 2023 20:51:59 +0000 (20:51 +0000)
committer GitHub <noreply@github.com>
Wed, 13 Sep 2023 20:51:59 +0000 (20:51 +0000)
Route blinding MVP

lightning/src/ln/channelmanager.rs

index 918aca8e77a705201c3afab9e0890df02d280d30,d0dff57ed3b0c86642264188097a799f5b49cc23..90451d59d66d635821ffbe3ce32790d471290c6c
@@@ -177,7 -177,7 +177,7 @@@ pub(super) enum HTLCForwardInfo 
  }
  
  /// Tracks the inbound corresponding to an outbound HTLC
 -#[derive(Clone, Hash, PartialEq, Eq)]
 +#[derive(Clone, Debug, Hash, PartialEq, Eq)]
  pub(crate) struct HTLCPreviousHopData {
        // Note that this may be an outbound SCID alias for the associated channel.
        short_channel_id: u64,
@@@ -233,8 -233,7 +233,8 @@@ impl From<&ClaimableHTLC> for events::C
        }
  }
  
 -/// A payment identifier used to uniquely identify a payment to LDK.
 +/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
 +/// a payment and ensure idempotency in LDK.
  ///
  /// This is not exported to bindings users as we just use [u8; 32] directly
  #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
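For illustration: the reworded `PaymentId` docs above stress that the identifier is chosen by the caller of [`ChannelManager::send_payment`] and is what makes retries idempotent. A minimal sketch of deriving a stable id from application data; the helper and the order-id scheme are hypothetical and not part of this change:

    use bitcoin::hashes::{Hash, sha256::Hash as Sha256};
    use lightning::ln::channelmanager::PaymentId;

    // Hypothetical helper: any deterministic, unique-per-logical-payment 32 bytes
    // works. Reusing the same id on a retry makes LDK treat it as the same payment
    // rather than paying twice.
    fn payment_id_for_order(order_id: &str) -> PaymentId {
        PaymentId(Sha256::hash(order_id.as_bytes()).into_inner())
    }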
@@@ -283,7 -282,7 +283,7 @@@ impl Readable for InterceptId 
        }
  }
  
 -#[derive(Clone, Copy, PartialEq, Eq, Hash)]
 +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
  /// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`].
  pub(crate) enum SentHTLCId {
        PreviousHopData { short_channel_id: u64, htlc_id: u64 },
@@@ -314,7 -313,7 +314,7 @@@ impl_writeable_tlv_based_enum!(SentHTLC
  
  /// Tracks the inbound corresponding to an outbound HTLC
  #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
 -#[derive(Clone, PartialEq, Eq)]
 +#[derive(Clone, Debug, PartialEq, Eq)]
  pub(crate) enum HTLCSource {
        PreviousHopData(HTLCPreviousHopData),
        OutboundRoute {
@@@ -495,10 -494,6 +495,10 @@@ impl MsgHandleErrInternal 
                        channel_capacity: None,
                }
        }
 +
 +      fn closes_channel(&self) -> bool {
 +              self.chan_id.is_some()
 +      }
  }
  
  /// We hold back HTLCs we intend to relay for a random interval greater than this (see
@@@ -660,6 -655,7 +660,6 @@@ pub(crate) enum RAAMonitorUpdateBlockin
  }
  
  impl RAAMonitorUpdateBlockingAction {
 -      #[allow(unused)]
        fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
                Self::ForwardedPaymentInboundClaim {
                        channel_id: prev_hop.outpoint.to_channel_id(),
@@@ -1189,8 -1185,7 +1189,8 @@@ wher
  
        background_events_processed_since_startup: AtomicBool,
  
 -      persistence_notifier: Notifier,
 +      event_persist_notifier: Notifier,
 +      needs_persist_flag: AtomicBool,
  
        entropy_source: ES,
        node_signer: NS,
@@@ -1219,8 -1214,7 +1219,8 @@@ pub struct ChainParameters 
  #[must_use]
  enum NotifyOption {
        DoPersist,
 -      SkipPersist,
 +      SkipPersistHandleEvents,
 +      SkipPersistNoEvents,
  }
  
  /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is
  /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
  /// notify or not based on whether relevant changes have been made, providing a closure to
  /// `optionally_notify` which returns a `NotifyOption`.
 -struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
 -      persistence_notifier: &'a Notifier,
 +struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> {
 +      event_persist_notifier: &'a Notifier,
 +      needs_persist_flag: &'a AtomicBool,
        should_persist: F,
        // We hold onto this result so the lock doesn't get released immediately.
        _read_guard: RwLockReadGuard<'a, ()>,
  }
  
  impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
 -      fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
 +      /// Notifies any waiters and indicates that we need to persist, in addition to possibly having
 +      /// events to handle.
 +      ///
 +      /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
 +      /// other cases where losing the changes on restart may result in a force-close or otherwise
 +      /// isn't ideal.
 +      fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
 +              Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
 +      }
 +
 +      fn optionally_notify<F: FnMut() -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
 +      -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
                let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
 -              let _ = cm.get_cm().process_background_events(); // We always persist
 +              let force_notify = cm.get_cm().process_background_events();
  
                PersistenceNotifierGuard {
 -                      persistence_notifier: &cm.get_cm().persistence_notifier,
 -                      should_persist: || -> NotifyOption { NotifyOption::DoPersist },
 +                      event_persist_notifier: &cm.get_cm().event_persist_notifier,
 +                      needs_persist_flag: &cm.get_cm().needs_persist_flag,
 +                      should_persist: move || {
 +                              // Pick the "most" action between `persist_check` and the background events
 +                              // processing and return that.
 +                              let notify = persist_check();
 +                              match (notify, force_notify) {
 +                                      (NotifyOption::DoPersist, _) => NotifyOption::DoPersist,
 +                                      (_, NotifyOption::DoPersist) => NotifyOption::DoPersist,
 +                                      (NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents,
 +                                      (_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents,
 +                                      _ => NotifyOption::SkipPersistNoEvents,
 +                              }
 +                      },
                        _read_guard: read_guard,
                }
 -
        }
  
        /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated,
 -      /// [`ChannelManager::process_background_events`] MUST be called first.
 -      fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
 -              let read_guard = lock.read().unwrap();
 +      /// [`ChannelManager::process_background_events`] MUST be called first (or
 +      /// [`Self::optionally_notify`] used).
 +      fn optionally_notify_skipping_background_events<F: Fn() -> NotifyOption, C: AChannelManager>
 +      (cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
 +              let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
  
                PersistenceNotifierGuard {
 -                      persistence_notifier: notifier,
 +                      event_persist_notifier: &cm.get_cm().event_persist_notifier,
 +                      needs_persist_flag: &cm.get_cm().needs_persist_flag,
                        should_persist: persist_check,
                        _read_guard: read_guard,
                }
        }
  }
  
 -impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
 +impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
        fn drop(&mut self) {
 -              if (self.should_persist)() == NotifyOption::DoPersist {
 -                      self.persistence_notifier.notify();
 +              match (self.should_persist)() {
 +                      NotifyOption::DoPersist => {
 +                              self.needs_persist_flag.store(true, Ordering::Release);
 +                              self.event_persist_notifier.notify()
 +                      },
 +                      NotifyOption::SkipPersistHandleEvents =>
 +                              self.event_persist_notifier.notify(),
 +                      NotifyOption::SkipPersistNoEvents => {},
                }
        }
  }
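The guard's `Drop` impl above distinguishes the three `NotifyOption`s: `DoPersist` sets the new `needs_persist_flag` and wakes waiters, `SkipPersistHandleEvents` only wakes waiters, and `SkipPersistNoEvents` does nothing. The closure built in `optionally_notify` merges the caller's choice with the background-event result by preferring the strongest option. A standalone sketch of that preference order, illustrative only (this free function is not in the diff):

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum NotifyOption { DoPersist, SkipPersistHandleEvents, SkipPersistNoEvents }

    // Persisting implies there may be events to handle, which in turn implies more
    // than nothing to do, so DoPersist > SkipPersistHandleEvents > SkipPersistNoEvents.
    fn strongest(a: NotifyOption, b: NotifyOption) -> NotifyOption {
        use NotifyOption::*;
        match (a, b) {
            (DoPersist, _) | (_, DoPersist) => DoPersist,
            (SkipPersistHandleEvents, _) | (_, SkipPersistHandleEvents) => SkipPersistHandleEvents,
            _ => SkipPersistNoEvents,
        }
    }

    #[test]
    fn persist_always_wins() {
        assert_eq!(strongest(NotifyOption::SkipPersistNoEvents, NotifyOption::DoPersist),
            NotifyOption::DoPersist);
    }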
@@@ -1707,15 -1669,11 +1707,15 @@@ pub enum ChannelShutdownState 
  pub enum RecentPaymentDetails {
        /// When an invoice was requested and thus a payment has not yet been sent.
        AwaitingInvoice {
 -              /// Identifier for the payment to ensure idempotency.
 +              /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
 +              /// a payment and ensure idempotency in LDK.
                payment_id: PaymentId,
        },
        /// When a payment is still being sent and awaiting successful delivery.
        Pending {
 +              /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
 +              /// a payment and ensure idempotency in LDK.
 +              payment_id: PaymentId,
                /// Hash of the payment that is currently being sent but has yet to be fulfilled or
                /// abandoned.
                payment_hash: PaymentHash,
        /// been resolved. Upon receiving [`Event::PaymentSent`], we delay for a few minutes before the
        /// payment is removed from tracking.
        Fulfilled {
 +              /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
 +              /// a payment and ensure idempotency in LDK.
 +              payment_id: PaymentId,
                /// Hash of the payment that was claimed. `None` for serializations of [`ChannelManager`]
                /// made before LDK version 0.0.104.
                payment_hash: Option<PaymentHash>,
        /// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all
        /// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated.
        Abandoned {
 +              /// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
 +              /// a payment and ensure idempotency in LDK.
 +              payment_id: PaymentId,
                /// Hash of the payment that we have given up trying to send.
                payment_hash: PaymentHash,
        },
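With `payment_id` now carried by every `RecentPaymentDetails` variant, callers can map the output of `ChannelManager::list_recent_payments` back to the ids they passed to `send_payment`. A small sketch, assuming the enum remains exhaustively matchable as shown in this hunk:

    use lightning::ln::channelmanager::{PaymentId, RecentPaymentDetails};

    // Collect the user-provided ids so they can be matched against
    // application-side bookkeeping.
    fn recent_payment_ids(details: Vec<RecentPaymentDetails>) -> Vec<PaymentId> {
        details.into_iter().map(|d| match d {
            RecentPaymentDetails::AwaitingInvoice { payment_id } => payment_id,
            RecentPaymentDetails::Pending { payment_id, .. } => payment_id,
            RecentPaymentDetails::Fulfilled { payment_id, .. } => payment_id,
            RecentPaymentDetails::Abandoned { payment_id, .. } => payment_id,
        }).collect()
    }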
@@@ -2123,7 -2075,7 +2123,7 @@@ macro_rules! process_events_body 
                                return;
                        }
  
 -                      let mut result = NotifyOption::SkipPersist;
 +                      let mut result;
  
                        {
                                // We'll acquire our total consistency lock so that we can be sure no other
  
                                // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must
                                // ensure any startup-generated background events are handled first.
 -                              if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; }
 +                              result = $self.process_background_events();
  
                                // TODO: This behavior should be documented. It's unintuitive that we query
                                // ChannelMonitors when clearing other events.
                                processed_all_events = false;
                        }
  
 -                      if result == NotifyOption::DoPersist {
 -                              $self.persistence_notifier.notify();
 +                      match result {
 +                              NotifyOption::DoPersist => {
 +                                      $self.needs_persist_flag.store(true, Ordering::Release);
 +                                      $self.event_persist_notifier.notify();
 +                              },
 +                              NotifyOption::SkipPersistHandleEvents =>
 +                                      $self.event_persist_notifier.notify(),
 +                              NotifyOption::SkipPersistNoEvents => {},
                        }
                }
        }
@@@ -2258,9 -2204,7 +2258,9 @@@ wher
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
 -                      persistence_notifier: Notifier::new(),
 +
 +                      event_persist_notifier: Notifier::new(),
 +                      needs_persist_flag: AtomicBool::new(false),
  
                        entropy_source,
                        node_signer,
                                },
                                PendingOutboundPayment::Retryable { payment_hash, total_msat, .. } => {
                                        Some(RecentPaymentDetails::Pending {
 +                                              payment_id: *payment_id,
                                                payment_hash: *payment_hash,
                                                total_msat: *total_msat,
                                        })
                                },
                                PendingOutboundPayment::Abandoned { payment_hash, .. } => {
 -                                      Some(RecentPaymentDetails::Abandoned { payment_hash: *payment_hash })
 +                                      Some(RecentPaymentDetails::Abandoned { payment_id: *payment_id, payment_hash: *payment_hash })
                                },
                                PendingOutboundPayment::Fulfilled { payment_hash, .. } => {
 -                                      Some(RecentPaymentDetails::Fulfilled { payment_hash: *payment_hash })
 +                                      Some(RecentPaymentDetails::Fulfilled { payment_id: *payment_id, payment_hash: *payment_hash })
                                },
                                PendingOutboundPayment::Legacy { .. } => None
                        })
                let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data {
                        msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } =>
                                (short_channel_id, amt_to_forward, outgoing_cltv_value),
-                       msgs::InboundOnionPayload::Receive { .. } =>
+                       msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } =>
                                return Err(InboundOnionErr {
                                        msg: "Final Node OnionHopData provided for us as an intermediary node",
                                        err_code: 0x4000 | 22,
                                payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, ..
                        } =>
                                (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata),
-                       _ =>
+                       msgs::InboundOnionPayload::BlindedReceive {
+                               amt_msat, total_msat, outgoing_cltv_value, payment_secret, ..
+                       } => {
+                               let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat };
+                               (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None)
+                       }
+                       msgs::InboundOnionPayload::Forward { .. } => {
                                return Err(InboundOnionErr {
                                        err_code: 0x4000|22,
                                        err_data: Vec::new(),
                                        msg: "Got non final data with an HMAC of 0",
-                               }),
+                               })
+                       },
                };
                // final_incorrect_cltv_expiry
                if outgoing_cltv_value > cltv_expiry {
                        }
                }
  
-               let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) {
+               let next_hop = match onion_utils::decode_next_payment_hop(
+                       shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac,
+                       msg.payment_hash, &self.node_signer
+               ) {
                        Ok(res) => res,
                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                return_malformed_err!(err_msg, err_code);
                        // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the
                        // inbound channel's state.
                        onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)),
-                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => {
+                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } |
+                               onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } =>
+                       {
                                return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]);
                        }
                };
                                                                                        let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode);
                                                                                        if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) {
                                                                                                let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes();
-                                                                                               let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) {
+                                                                                               let next_hop = match onion_utils::decode_next_payment_hop(
+                                                                                                       phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac,
+                                                                                                       payment_hash, &self.node_signer
+                                                                                               ) {
                                                                                                        Ok(res) => res,
                                                                                                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                                                                                                let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner();
                let mut background_events = Vec::new();
                mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
                if background_events.is_empty() {
 -                      return NotifyOption::SkipPersist;
 +                      return NotifyOption::SkipPersistNoEvents;
                }
  
                for event in background_events.drain(..) {
        }
  
        fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel<SP>, new_feerate: u32) -> NotifyOption {
 -              if !chan.context.is_outbound() { return NotifyOption::SkipPersist; }
 +              if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; }
                // If the feerate has decreased by less than half, don't bother
                if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
 -                              &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 -                      return NotifyOption::SkipPersist;
 +                              chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 +                      return NotifyOption::SkipPersistNoEvents;
                }
                if !chan.context.is_live() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
 -                              &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 -                      return NotifyOption::SkipPersist;
 +                              chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 +                      return NotifyOption::SkipPersistNoEvents;
                }
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
        /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what
        /// it wants to detect). Thus, we have a variant exposed here for its benefit.
        pub fn maybe_update_chan_fees(&self) {
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let mut should_persist = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let mut should_persist = NotifyOption::SkipPersistNoEvents;
  
                        let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
                        let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
        /// [`ChannelUpdate`]: msgs::ChannelUpdate
        /// [`ChannelConfig`]: crate::util::config::ChannelConfig
        pub fn timer_tick_occurred(&self) {
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let mut should_persist = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let mut should_persist = NotifyOption::SkipPersistNoEvents;
  
                        let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
                        let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
                self.pending_outbound_payments.finalize_claims(sources, &self.pending_events);
        }
  
 -      fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_outpoint: OutPoint) {
 +      fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
 +              forwarded_htlc_value_msat: Option<u64>, from_onchain: bool,
 +              next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
 +      ) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
                                debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
                                        "We don't support claim_htlc claims during startup - monitors may not be available yet");
 +                              if let Some(pubkey) = next_channel_counterparty_node_id {
 +                                      debug_assert_eq!(pubkey, path.hops[0].pubkey);
 +                              }
                                let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
                                        channel_funding_outpoint: next_channel_outpoint,
                                        counterparty_node_id: path.hops[0].pubkey,
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
 +                              let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
                                let res = self.claim_funds_from_hop(hop_data, payment_preimage,
                                        |htlc_claim_value_msat| {
                                                if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                                                                        next_channel_id: Some(next_channel_outpoint.to_channel_id()),
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
 -                                                              downstream_counterparty_and_funding_outpoint: None,
 +                                                              downstream_counterparty_and_funding_outpoint:
 +                                                                      if let Some(node_id) = next_channel_counterparty_node_id {
 +                                                                              Some((node_id, next_channel_outpoint, completed_blocker))
 +                                                                      } else {
 +                                                                              // We can only get `None` here if we are processing a
 +                                                                              // `ChannelMonitor`-originated event, in which case we
 +                                                                              // don't care about ensuring we wake the downstream
 +                                                                              // channel's monitor updating - the channel is already
 +                                                                              // closed.
 +                                                                              None
 +                                                                      },
                                                        })
                                                } else { None }
                                        });
        }
  
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
 +              // likely to be lost on restart!
                if msg.chain_hash != self.genesis_hash {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
                }
        }
  
        fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
 +              // likely to be lost on restart!
                let (value, output_script, user_id) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
        }
  
        fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                //encrypted with the same key. It's not immediately obvious how to usefully exploit that,
                //but we should prevent it anyway.
  
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
 +
                let decoded_hop_res = self.decode_update_add_htlc_onion(msg);
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                                hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry);
 +                                              if let HTLCSource::PreviousHopData(prev_hop) = &res.0 {
 +                                                      peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id)
 +                                                              .or_insert_with(Vec::new)
 +                                                              .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop));
 +                                              }
 +                                              // Note that we do not need to push an `actions_blocking_raa_monitor_updates`
 +                                              // entry here, even though we *do* need to block the next RAA monitor update.
 +                                              // We do this instead in the `claim_funds_internal` by attaching a
 +                                              // `ReleaseRAAChannelMonitorUpdate` action to the event generated when the
 +                                              // outbound HTLC is claimed. This is guaranteed to all complete before we
 +                                              // process the RAA as messages are processed from single peers serially.
                                                funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded");
                                                res
                                        } else {
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
 -              self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo);
 +              self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo);
                Ok(())
        }
  
        fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
        }
  
        fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                })
        }
  
 +      #[cfg(any(test, feature = "_test_utils"))]
 +      pub(crate) fn test_raa_monitor_updates_held(&self,
 +              counterparty_node_id: PublicKey, channel_id: ChannelId
 +      ) -> bool {
 +              let per_peer_state = self.per_peer_state.read().unwrap();
 +              if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
 +                      let mut peer_state_lck = peer_state_mtx.lock().unwrap();
 +                      let peer_state = &mut *peer_state_lck;
 +
 +                      if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
 +                              return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
 +                                      chan.context().get_funding_txo().unwrap(), counterparty_node_id);
 +                      }
 +              }
 +              false
 +      }
 +
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
                let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                Ok(())
        }
  
 -      /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err.
 +      /// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err.
        fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result<NotifyOption, MsgHandleErrInternal> {
                let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) {
                        Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
                        None => {
                                // It's not a local channel
 -                              return Ok(NotifyOption::SkipPersist)
 +                              return Ok(NotifyOption::SkipPersistNoEvents)
                        }
                };
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id);
                if peer_state_mutex_opt.is_none() {
 -                      return Ok(NotifyOption::SkipPersist)
 +                      return Ok(NotifyOption::SkipPersistNoEvents)
                }
                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                                                        // If the announcement is about a channel of ours which is public, some
                                                        // other peer may simply be forwarding all its gossip to us. Don't provide
                                                        // a scary-looking error message and return Ok instead.
 -                                                      return Ok(NotifyOption::SkipPersist);
 +                                                      return Ok(NotifyOption::SkipPersistNoEvents);
                                                }
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
                                        }
                                        let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..];
                                        let msg_from_node_one = msg.contents.flags & 1 == 0;
                                        if were_node_one == msg_from_node_one {
 -                                              return Ok(NotifyOption::SkipPersist);
 +                                              return Ok(NotifyOption::SkipPersistNoEvents);
                                        } else {
                                                log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
                                                try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
                                                "Got a channel_update for an unfunded channel!".into())), chan_phase_entry);
                                }
                        },
 -                      hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist)
 +                      hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents)
                }
                Ok(NotifyOption::DoPersist)
        }
  
 -      fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
 +      fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
                let htlc_forwards;
                let need_lnd_workaround = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        }
                };
  
 +              let mut persist = NotifyOption::SkipPersistHandleEvents;
                if let Some(forwards) = htlc_forwards {
                        self.forward_htlcs(&mut [forwards][..]);
 +                      persist = NotifyOption::DoPersist;
                }
  
                if let Some(channel_ready_msg) = need_lnd_workaround {
                        self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
                }
 -              Ok(())
 +              Ok(persist)
        }
  
        /// Process pending events from the [`chain::Watch`], returning whether any events were processed.
                                match monitor_event {
                                        MonitorEvent::HTLCEvent(htlc_update) => {
                                                if let Some(preimage) = htlc_update.payment_preimage {
 -                                                      log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", &preimage);
 -                                                      self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint);
 +                                                      log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
 +                                                      self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
                                                } else {
                                                        log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
                                                        let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
@@@ -7117,8 -7015,8 +7132,8 @@@ wher
        /// the `MessageSendEvent`s to the specific peer they were generated under.
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let events = RefCell::new(Vec::new());
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let mut result = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let mut result = NotifyOption::SkipPersistNoEvents;
  
                        // TODO: This behavior should be documented. It's unintuitive that we query
                        // ChannelMonitors when clearing other events.
@@@ -7199,9 -7097,8 +7214,9 @@@ wher
        }
  
        fn block_disconnected(&self, header: &BlockHeader, height: u32) {
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                let new_height = height - 1;
                {
                        let mut best_block = self.best_block.write().unwrap();
@@@ -7235,9 -7132,8 +7250,9 @@@ wher
                let block_hash = header.block_hash();
                log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height);
  
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)
                        .map(|(a, b)| (a, Vec::new(), b)));
  
                let block_hash = header.block_hash();
                log_trace!(self.logger, "New best block: {} at height {}", block_hash, height);
  
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                *self.best_block.write().unwrap() = BestBlock::new(block_hash, height);
  
                self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger));
        }
  
        fn transaction_unconfirmed(&self, txid: &Txid) {
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                self.do_chain_event(None, |channel| {
                        if let Some(funding_txo) = channel.context.get_funding_txo() {
                                if funding_txo.txid == *txid {
@@@ -7486,26 -7380,18 +7501,26 @@@ wher
                }
        }
  
 -      /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
 +      /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or
 +      /// may have events that need processing.
 +      ///
 +      /// In order to check if this [`ChannelManager`] needs persisting, call
 +      /// [`Self::get_and_clear_needs_persistence`].
        ///
        /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
        /// [`ChannelManager`] and should instead register actions to be taken later.
 -      ///
 -      pub fn get_persistable_update_future(&self) -> Future {
 -              self.persistence_notifier.get_future()
 +      pub fn get_event_or_persistence_needed_future(&self) -> Future {
 +              self.event_persist_notifier.get_future()
 +      }
 +
 +      /// Returns true if this [`ChannelManager`] needs to be persisted.
 +      pub fn get_and_clear_needs_persistence(&self) -> bool {
 +              self.needs_persist_flag.swap(false, Ordering::AcqRel)
        }
  
        #[cfg(any(test, feature = "_test_utils"))]
 -      pub fn get_persistence_condvar_value(&self) -> bool {
 -              self.persistence_notifier.notify_pending()
 +      pub fn get_event_or_persist_condvar_value(&self) -> bool {
 +              self.event_persist_notifier.notify_pending()
        }
  
        /// Gets the latest best block which was connected either via the [`chain::Listen`] or
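Together, `get_event_or_persistence_needed_future` and `get_and_clear_needs_persistence` split "wake me up" from "actually write me to disk". A loose sketch of a persister loop built on the pair; `channel_manager`, `persist_manager`, and the blocking `wait()` on the returned future are assumptions standing in for application code (the `lightning-background-processor` crate is the canonical consumer):

    loop {
        // Block until the manager signals it may have events or may need persisting.
        channel_manager.get_event_or_persistence_needed_future().wait();
        // Event handling happens elsewhere; only persist when the manager actually
        // flagged a persist-worthy change, avoiding redundant writes on event-only wakeups.
        if channel_manager.get_and_clear_needs_persistence() {
            persist_manager(&channel_manager);
        }
    }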
@@@ -7562,21 -7448,8 +7577,21 @@@ wher
        L::Target: Logger,
  {
        fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // open_channel message - pre-funded channels are never written so there should be no
 +              // change to the contents.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_open_channel(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => {
 +                                      debug_assert!(false, "We shouldn't close a new channel");
 +                                      NotifyOption::DoPersist
 +                              },
 +                              _ => NotifyOption::SkipPersistHandleEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) {
        }
  
        fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // accept_channel message - pre-funded channels are never written so there should be no
 +              // change to the contents.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
 +                      NotifyOption::SkipPersistHandleEvents
 +              });
        }
  
        fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) {
        }
  
        fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // channel_ready message - while the channel's state will change, any channel_ready message
 +              // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we
 +              // will not force-close the channel on startup.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_channel_ready(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              _ => NotifyOption::SkipPersistHandleEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) {
        }
  
        fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_add_htlc message - the message itself doesn't change our channel state; only the
 +              // `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_add_htlc(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
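The comment above captures the reasoning: an inbound update_add_htlc only matters once the following commitment_signed is processed. The same result-to-`NotifyOption` mapping recurs in the fail, fail-malformed, and fee handlers below; as a purely illustrative helper (not present in the diff) it amounts to:

    // Hypothetical helper capturing the repeated match over handler results.
    fn persist_for_handler_result<T>(res: &Result<T, MsgHandleErrInternal>) -> NotifyOption {
        match res {
            // Errors that close a channel must be persisted so the closure survives restart.
            Err(e) if e.closes_channel() => NotifyOption::DoPersist,
            // Other errors queue outbound error messages/events but change no channel state.
            Err(_) => NotifyOption::SkipPersistHandleEvents,
            // Success for these messages changes nothing until the commitment_signed arrives.
            Ok(_) => NotifyOption::SkipPersistNoEvents,
        }
    }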
  
        fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
        }
  
        fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_fail_htlc message - the message itself doesn't change our channel state; only the
 +              // `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_fail_htlc(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_fail_malformed_htlc message - the message itself doesn't change our channel state;
 +              // only the `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
        }
  
        fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_fee message - the message itself doesn't change our channel state; only the
 +              // `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_fee(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
        }
  
        fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) {
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let force_persist = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
                        if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) {
 -                              if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist }
 +                              persist
                        } else {
 -                              NotifyOption::SkipPersist
 +                              NotifyOption::DoPersist
                        }
                });
        }
  
        fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_channel_reestablish(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(persist) => *persist,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
 +                      self, || NotifyOption::SkipPersistHandleEvents);
 +
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
                        return Err(());
                }
  
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 +              let mut res = Ok(());
  
 -              // If we have too many peers connected which don't have funded channels, disconnect the
 -              // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
 -              // unfunded channels taking up space in memory for disconnected peers, we still let new
 -              // peers connect, but we'll reject new channels from them.
 -              let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
 -              let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      // If we have too many peers connected which don't have funded channels, disconnect the
 +                      // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
 +                      // unfunded channels taking up space in memory for disconnected peers, we still let new
 +                      // peers connect, but we'll reject new channels from them.
 +                      let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
 +                      let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
  
 -              {
 -                      let mut peer_state_lock = self.per_peer_state.write().unwrap();
 -                      match peer_state_lock.entry(counterparty_node_id.clone()) {
 -                              hash_map::Entry::Vacant(e) => {
 -                                      if inbound_peer_limited {
 -                                              return Err(());
 -                                      }
 -                                      e.insert(Mutex::new(PeerState {
 -                                              channel_by_id: HashMap::new(),
 -                                              inbound_channel_request_by_id: HashMap::new(),
 -                                              latest_features: init_msg.features.clone(),
 -                                              pending_msg_events: Vec::new(),
 -                                              in_flight_monitor_updates: BTreeMap::new(),
 -                                              monitor_update_blocked_actions: BTreeMap::new(),
 -                                              actions_blocking_raa_monitor_updates: BTreeMap::new(),
 -                                              is_connected: true,
 -                                      }));
 -                              },
 -                              hash_map::Entry::Occupied(e) => {
 -                                      let mut peer_state = e.get().lock().unwrap();
 -                                      peer_state.latest_features = init_msg.features.clone();
 -
 -                                      let best_block_height = self.best_block.read().unwrap().height();
 -                                      if inbound_peer_limited &&
 -                                              Self::unfunded_channel_count(&*peer_state, best_block_height) ==
 -                                              peer_state.channel_by_id.len()
 -                                      {
 -                                              return Err(());
 -                                      }
 +                      {
 +                              let mut peer_state_lock = self.per_peer_state.write().unwrap();
 +                              match peer_state_lock.entry(counterparty_node_id.clone()) {
 +                                      hash_map::Entry::Vacant(e) => {
 +                                              if inbound_peer_limited {
 +                                                      res = Err(());
 +                                                      return NotifyOption::SkipPersistNoEvents;
 +                                              }
 +                                              e.insert(Mutex::new(PeerState {
 +                                                      channel_by_id: HashMap::new(),
 +                                                      inbound_channel_request_by_id: HashMap::new(),
 +                                                      latest_features: init_msg.features.clone(),
 +                                                      pending_msg_events: Vec::new(),
 +                                                      in_flight_monitor_updates: BTreeMap::new(),
 +                                                      monitor_update_blocked_actions: BTreeMap::new(),
 +                                                      actions_blocking_raa_monitor_updates: BTreeMap::new(),
 +                                                      is_connected: true,
 +                                              }));
 +                                      },
 +                                      hash_map::Entry::Occupied(e) => {
 +                                              let mut peer_state = e.get().lock().unwrap();
 +                                              peer_state.latest_features = init_msg.features.clone();
 +
 +                                              let best_block_height = self.best_block.read().unwrap().height();
 +                                              if inbound_peer_limited &&
 +                                                      Self::unfunded_channel_count(&*peer_state, best_block_height) ==
 +                                                      peer_state.channel_by_id.len()
 +                                              {
 +                                                      res = Err(());
 +                                                      return NotifyOption::SkipPersistNoEvents;
 +                                              }
  
 -                                      debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
 -                                      peer_state.is_connected = true;
 -                              },
 +                                              debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
 +                                              peer_state.is_connected = true;
 +                                      },
 +                              }
                        }
 -              }
  
 -              log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 +                      log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
  
 -              let per_peer_state = self.per_peer_state.read().unwrap();
 -              if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
 -                      let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 -                      let peer_state = &mut *peer_state_lock;
 -                      let pending_msg_events = &mut peer_state.pending_msg_events;
 +                      let per_peer_state = self.per_peer_state.read().unwrap();
 +                      if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
 +                              let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 +                              let peer_state = &mut *peer_state_lock;
 +                              let pending_msg_events = &mut peer_state.pending_msg_events;
  
 -                      peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
 -                              if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
 -                                      // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
 -                                      // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
 -                                      // worry about closing and removing them.
 -                                      debug_assert!(false);
 -                                      None
 -                              }
 -                      ).for_each(|chan| {
 -                              pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
 -                                      node_id: chan.context.get_counterparty_node_id(),
 -                                      msg: chan.get_channel_reestablish(&self.logger),
 +                              peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
 +                                      if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
 +                                              // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
 +                                              // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
 +                                              // worry about closing and removing them.
 +                                              debug_assert!(false);
 +                                              None
 +                                      }
 +                              ).for_each(|chan| {
 +                                      pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
 +                                              node_id: chan.context.get_counterparty_node_id(),
 +                                              msg: chan.get_channel_reestablish(&self.logger),
 +                                      });
                                });
 -                      });
 -              }
 -              //TODO: Also re-broadcast announcement_signatures
 -              Ok(())
 +                      }
 +
 +                      //TODO: Also re-broadcast announcement_signatures
 +                      return NotifyOption::SkipPersistHandleEvents;
 +              });
 +              res
        }
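
Because peer_connected now does its work inside the optionally_notify closure, it can no longer `return Err(())` directly; the rejection is written into a captured `res` while the closure returns SkipPersistNoEvents. A small standalone sketch of that control-flow shape, with hypothetical names (this is not the LDK API, just the pattern):

    // Standalone sketch (hypothetical names) of returning a caller-visible Result from inside
    // a closure that must itself hand back a persistence decision.
    enum Notify { DoPersist, SkipPersistNoEvents }

    fn optionally_notify<F: FnOnce() -> Notify>(f: F) -> Notify { f() }

    fn connect(inbound_peer_limited: bool) -> Result<(), ()> {
        let mut res = Ok(());
        optionally_notify(|| {
            if inbound_peer_limited {
                // Record the failure for the caller, then leave the closure without asking
                // for a persist - the same `res = Err(()); return SkipPersistNoEvents` shape
                // used in peer_connected above.
                res = Err(());
                return Notify::SkipPersistNoEvents;
            }
            // The happy path returns whichever NotifyOption fits; DoPersist here just keeps
            // the sketch small.
            Notify::DoPersist
        });
        res
    }

    fn main() {
        assert_eq!(connect(true), Err(()));
        assert_eq!(connect(false), Ok(()));
    }

The captured-`res` indirection keeps the caller-visible Result while still letting the notifier guard decide whether anything needs persisting.
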
  
        fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@@ -9503,7 -9301,6 +9518,7 @@@ wher
                                                                        // downstream chan is closed (because we don't have a
                                                                        // channel_id -> peer map entry).
                                                                        counterparty_opt.is_none(),
 +                                                                      counterparty_opt.cloned().or(monitor.get_counterparty_node_id()),
                                                                        monitor.get_funding_txo().0))
                                                        } else { None }
                                                } else {
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
 -                      persistence_notifier: Notifier::new(),
 +
 +                      event_persist_notifier: Notifier::new(),
 +                      needs_persist_flag: AtomicBool::new(false),
  
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
  
 -              for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay {
 +              for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay {
                        // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
                        // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
                        // channel is closed we just assume that it probably came from an on-chain claim.
                        channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
 -                              downstream_closed, downstream_funding);
 +                              downstream_closed, downstream_node_id, downstream_funding);
                }
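
The replay loop above now also threads a downstream_node_id through to claim_funds_internal, derived earlier as `counterparty_opt.cloned().or(monitor.get_counterparty_node_id())`: prefer the live channel-id-to-peer mapping, and fall back to the counterparty the ChannelMonitor itself recorded (both sides are Options, so the claim still proceeds when neither is known). A toy sketch of that fallback, with hypothetical stand-ins for PublicKey and the monitor:

    // Hypothetical stand-ins for PublicKey and ChannelMonitor, only to show the
    // "prefer the in-memory map, fall back to the monitor" choice made above.
    #[derive(Clone, Copy, Debug, PartialEq)]
    struct NodeId(u8);

    struct Monitor { counterparty: Option<NodeId> }
    impl Monitor {
        fn get_counterparty_node_id(&self) -> Option<NodeId> { self.counterparty }
    }

    fn downstream_node_id(counterparty_opt: Option<&NodeId>, monitor: &Monitor) -> Option<NodeId> {
        // Same shape as `counterparty_opt.cloned().or(monitor.get_counterparty_node_id())`:
        // use the channel_id -> peer map entry when present, otherwise whatever the monitor
        // remembered (either side may be None, and the claim still proceeds).
        counterparty_opt.cloned().or(monitor.get_counterparty_node_id())
    }

    fn main() {
        let monitor = Monitor { counterparty: Some(NodeId(7)) };
        assert_eq!(downstream_node_id(None, &monitor), Some(NodeId(7)));
        assert_eq!(downstream_node_id(Some(&NodeId(3)), &monitor), Some(NodeId(3)));
    }
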
  
                //TODO: Broadcast channel update for closed channels, but only after we've made a
@@@ -9829,9 -9624,9 +9844,9 @@@ mod tests 
  
                // All nodes start with a persistable update pending as `create_network` connects each node
                // with all other nodes to make most tests simpler.
 -              assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[2].node.get_persistable_update_future().poll_is_complete());
 +              assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1);
  
                        &nodes[0].node.get_our_node_id()).pop().unwrap();
  
                // The first two nodes (which opened a channel) should now require fresh persistence
 -              assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                // ... but the last node should not.
 -              assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
                // After persisting the first two nodes they should no longer need fresh persistence.
 -              assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update
                // about the channel.
                nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0);
                nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1);
 -              assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                // The nodes which are a party to the channel should also ignore messages from unrelated
                // parties.
                nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
                nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0);
                nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
 -              assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                // At this point the channel info given by peers should still be the same.
                assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
                // persisted and that its channel info remains the same.
                nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update);
                nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update);
 -              assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
                assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info);
  
                // the channel info has updated.
                nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
                nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
 -              assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info);
                assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info);
        }
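
The renamed get_event_or_persistence_needed_future() assertions above rely on poll-and-clear behaviour: a completed poll reports that a persist (or event handling) is pending and resets the signal, so the next poll returns false until another handler dirties the state. A toy sketch of just that behaviour, with hypothetical names (the real implementation uses the Notifier plus the needs_persist_flag added earlier in this diff, not this code):

    use std::sync::atomic::{AtomicBool, Ordering};

    // Hypothetical miniature of a poll-and-clear "needs persist" signal: handlers set the
    // flag when they change state that must be written, and polling it clears it.
    struct Flag { needs_persist: AtomicBool }

    impl Flag {
        fn mark_dirty(&self) { self.needs_persist.store(true, Ordering::Release); }
        // Plays the role of poll_is_complete() in the assertions above: true exactly when a
        // persist is pending, and the act of observing it resets the signal.
        fn poll_is_complete(&self) -> bool { self.needs_persist.swap(false, Ordering::AcqRel) }
    }

    fn main() {
        let flag = Flag { needs_persist: AtomicBool::new(true) };
        assert!(flag.poll_is_complete());  // fresh state: one persist is pending...
        assert!(!flag.poll_is_complete()); // ...and nothing further until state changes again
        flag.mark_dirty();
        assert!(flag.poll_is_complete());  // a handler touched persisted state, so poll again
    }
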
  
                // To start (1), send a regular payment but don't claim it.
                let expected_route = [&nodes[1]];
 -              let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000);
 +              let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &expected_route, 100_000);
  
                // Next, attempt a keysend payment and make sure it fails.
                let route_params = RouteParameters::from_payment_params_and_value(