Merge pull request #2534 from tnull/2023-08-upstream-preflight-probing
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 785595a89ef2f82b1fba5f50484171fe5b4d4086..0cadbd41a29d21ccb99de1e0d7bf84319e33971c 100644 (file)
@@ -495,6 +495,10 @@ impl MsgHandleErrInternal {
                        channel_capacity: None,
                }
        }
+
+       fn closes_channel(&self) -> bool {
+               self.chan_id.is_some()
+       }
 }
 
 /// We hold back HTLCs we intend to relay for a random interval greater than this (see
@@ -1198,7 +1202,8 @@ where
 
        background_events_processed_since_startup: AtomicBool,
 
-       persistence_notifier: Notifier,
+       event_persist_notifier: Notifier,
+       needs_persist_flag: AtomicBool,
 
        entropy_source: ES,
        node_signer: NS,
@@ -1227,7 +1232,8 @@ pub struct ChainParameters {
 #[must_use]
 enum NotifyOption {
        DoPersist,
-       SkipPersist,
+       SkipPersistHandleEvents,
+       SkipPersistNoEvents,
 }
 
 /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is
@@ -1240,43 +1246,75 @@ enum NotifyOption {
 /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
 /// notify or not based on whether relevant changes have been made, providing a closure to
 /// `optionally_notify` which returns a `NotifyOption`.
-struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
-       persistence_notifier: &'a Notifier,
+struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> {
+       event_persist_notifier: &'a Notifier,
+       needs_persist_flag: &'a AtomicBool,
        should_persist: F,
        // We hold onto this result so the lock doesn't get released immediately.
        _read_guard: RwLockReadGuard<'a, ()>,
 }
 
 impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
-       fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+       /// Notifies any waiters and indicates that we need to persist, in addition to possibly having
+       /// events to handle.
+       ///
+       /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
+       /// other cases where losing the changes on restart may result in a force-close or otherwise
+       /// isn't ideal.
+       fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
+               Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
+       }
+
+       fn optionally_notify<F: FnMut() -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
+       -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
                let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
-               let _ = cm.get_cm().process_background_events(); // We always persist
+               let force_notify = cm.get_cm().process_background_events();
 
                PersistenceNotifierGuard {
-                       persistence_notifier: &cm.get_cm().persistence_notifier,
-                       should_persist: || -> NotifyOption { NotifyOption::DoPersist },
+                       event_persist_notifier: &cm.get_cm().event_persist_notifier,
+                       needs_persist_flag: &cm.get_cm().needs_persist_flag,
+                       should_persist: move || {
+                               // Pick the "most" action between `persist_check` and the background events
+                               // processing and return that.
+                               let notify = persist_check();
+                               match (notify, force_notify) {
+                                       (NotifyOption::DoPersist, _) => NotifyOption::DoPersist,
+                                       (_, NotifyOption::DoPersist) => NotifyOption::DoPersist,
+                                       (NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents,
+                                       (_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents,
+                                       _ => NotifyOption::SkipPersistNoEvents,
+                               }
+                       },
                        _read_guard: read_guard,
                }
-
        }
 
        /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated,
-       /// [`ChannelManager::process_background_events`] MUST be called first.
-       fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
-               let read_guard = lock.read().unwrap();
+       /// [`ChannelManager::process_background_events`] MUST be called first (or
+       /// [`Self::optionally_notify`] used).
+       fn optionally_notify_skipping_background_events<F: Fn() -> NotifyOption, C: AChannelManager>
+       (cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
+               let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
 
                PersistenceNotifierGuard {
-                       persistence_notifier: notifier,
+                       event_persist_notifier: &cm.get_cm().event_persist_notifier,
+                       needs_persist_flag: &cm.get_cm().needs_persist_flag,
                        should_persist: persist_check,
                        _read_guard: read_guard,
                }
        }
 }
 
-impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
+impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
        fn drop(&mut self) {
-               if (self.should_persist)() == NotifyOption::DoPersist {
-                       self.persistence_notifier.notify();
+               match (self.should_persist)() {
+                       NotifyOption::DoPersist => {
+                               self.needs_persist_flag.store(true, Ordering::Release);
+                               self.event_persist_notifier.notify()
+                       },
+                       NotifyOption::SkipPersistHandleEvents =>
+                               self.event_persist_notifier.notify(),
+                       NotifyOption::SkipPersistNoEvents => {},
                }
        }
 }
@@ -2098,7 +2136,7 @@ macro_rules! process_events_body {
                                return;
                        }
 
-                       let mut result = NotifyOption::SkipPersist;
+                       let mut result;
 
                        {
                                // We'll acquire our total consistency lock so that we can be sure no other
@@ -2107,7 +2145,7 @@ macro_rules! process_events_body {
 
                                // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must
                                // ensure any startup-generated background events are handled first.
-                               if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; }
+                               result = $self.process_background_events();
 
                                // TODO: This behavior should be documented. It's unintuitive that we query
                                // ChannelMonitors when clearing other events.
@@ -2147,8 +2185,14 @@ macro_rules! process_events_body {
                                processed_all_events = false;
                        }
 
-                       if result == NotifyOption::DoPersist {
-                               $self.persistence_notifier.notify();
+                       match result {
+                               NotifyOption::DoPersist => {
+                                       $self.needs_persist_flag.store(true, Ordering::Release);
+                                       $self.event_persist_notifier.notify();
+                               },
+                               NotifyOption::SkipPersistHandleEvents =>
+                                       $self.event_persist_notifier.notify(),
+                               NotifyOption::SkipPersistNoEvents => {},
                        }
                }
        }
@@ -2227,7 +2271,9 @@ where
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
-                       persistence_notifier: Notifier::new(),
+
+                       event_persist_notifier: Notifier::new(),
+                       needs_persist_flag: AtomicBool::new(false),
 
                        entropy_source,
                        node_signer,
@@ -2762,7 +2808,7 @@ where
                let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data {
                        msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } =>
                                (short_channel_id, amt_to_forward, outgoing_cltv_value),
-                       msgs::InboundOnionPayload::Receive { .. } =>
+                       msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } =>
                                return Err(InboundOnionErr {
                                        msg: "Final Node OnionHopData provided for us as an intermediary node",
                                        err_code: 0x4000 | 22,
@@ -2794,12 +2840,19 @@ where
                                payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, ..
                        } =>
                                (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata),
-                       _ =>
+                       msgs::InboundOnionPayload::BlindedReceive {
+                               amt_msat, total_msat, outgoing_cltv_value, payment_secret, ..
+                       } => {
+                               let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat };
+                               (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None)
+                       }
+                       msgs::InboundOnionPayload::Forward { .. } => {
                                return Err(InboundOnionErr {
                                        err_code: 0x4000|22,
                                        err_data: Vec::new(),
                                        msg: "Got non final data with an HMAC of 0",
-                               }),
+                               })
+                       },
                };
                // final_incorrect_cltv_expiry
                if outgoing_cltv_value > cltv_expiry {
@@ -2939,7 +2992,10 @@ where
                        }
                }
 
-               let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) {
+               let next_hop = match onion_utils::decode_next_payment_hop(
+                       shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac,
+                       msg.payment_hash, &self.node_signer
+               ) {
                        Ok(res) => res,
                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                return_malformed_err!(err_msg, err_code);
@@ -2961,7 +3017,9 @@ where
                        // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the
                        // inbound channel's state.
                        onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)),
-                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => {
+                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } |
+                               onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } =>
+                       {
                                return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]);
                        }
                };
@@ -4058,7 +4116,10 @@ where
                                                                                        let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode);
                                                                                        if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) {
                                                                                                let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes();
-                                                                                               let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) {
+                                                                                               let next_hop = match onion_utils::decode_next_payment_hop(
+                                                                                                       phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac,
+                                                                                                       payment_hash, &self.node_signer
+                                                                                               ) {
                                                                                                        Ok(res) => res,
                                                                                                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                                                                                                let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner();
@@ -4462,7 +4523,7 @@ where
                let mut background_events = Vec::new();
                mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
                if background_events.is_empty() {
-                       return NotifyOption::SkipPersist;
+                       return NotifyOption::SkipPersistNoEvents;
                }
 
                for event in background_events.drain(..) {
@@ -4531,17 +4592,17 @@ where
        }
 
        fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel<SP>, new_feerate: u32) -> NotifyOption {
-               if !chan.context.is_outbound() { return NotifyOption::SkipPersist; }
+               if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; }
                // If the feerate has decreased by less than half, don't bother
                if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
-                               &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
-                       return NotifyOption::SkipPersist;
+                               chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
+                       return NotifyOption::SkipPersistNoEvents;
                }
                if !chan.context.is_live() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
-                               &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
-                       return NotifyOption::SkipPersist;
+                               chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
+                       return NotifyOption::SkipPersistNoEvents;
                }
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
@@ -4556,8 +4617,8 @@ where
        /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what
        /// it wants to detect). Thus, we have a variant exposed here for its benefit.
        pub fn maybe_update_chan_fees(&self) {
-               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
-                       let mut should_persist = self.process_background_events();
+               PersistenceNotifierGuard::optionally_notify(self, || {
+                       let mut should_persist = NotifyOption::SkipPersistNoEvents;
 
                        let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
                        let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
@@ -4601,8 +4662,8 @@ where
        /// [`ChannelUpdate`]: msgs::ChannelUpdate
        /// [`ChannelConfig`]: crate::util::config::ChannelConfig
        pub fn timer_tick_occurred(&self) {
-               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
-                       let mut should_persist = self.process_background_events();
+               PersistenceNotifierGuard::optionally_notify(self, || {
+                       let mut should_persist = NotifyOption::SkipPersistNoEvents;
 
                        let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
                        let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
@@ -5680,6 +5741,8 @@ where
        }
 
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
+               // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
+               // likely to be lost on restart!
                if msg.chain_hash != self.genesis_hash {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
                }
@@ -5779,6 +5842,8 @@ where
        }
 
        fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> {
+               // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
+               // likely to be lost on restart!
                let (value, output_script, user_id) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@@ -5939,6 +6004,8 @@ where
        }
 
        fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
+               // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
+               // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
@@ -6117,6 +6184,9 @@ where
                //encrypted with the same key. It's not immediately obvious how to usefully exploit that,
                //but we should prevent it anyway.
 
+               // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
+               // closing a channel), so any changes are likely to be lost on restart!
+
                let decoded_hop_res = self.decode_update_add_htlc_onion(msg);
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@@ -6209,6 +6279,8 @@ where
        }
 
        fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> {
+               // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
+               // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
@@ -6232,6 +6304,8 @@ where
        }
 
        fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> {
+               // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
+               // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
@@ -6520,19 +6594,19 @@ where
                Ok(())
        }
 
-       /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err.
+       /// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err.
        fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result<NotifyOption, MsgHandleErrInternal> {
                let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) {
                        Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
                        None => {
                                // It's not a local channel
-                               return Ok(NotifyOption::SkipPersist)
+                               return Ok(NotifyOption::SkipPersistNoEvents)
                        }
                };
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id);
                if peer_state_mutex_opt.is_none() {
-                       return Ok(NotifyOption::SkipPersist)
+                       return Ok(NotifyOption::SkipPersistNoEvents)
                }
                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                let peer_state = &mut *peer_state_lock;
@@ -6544,14 +6618,14 @@ where
                                                        // If the announcement is about a channel of ours which is public, some
                                                        // other peer may simply be forwarding all its gossip to us. Don't provide
                                                        // a scary-looking error message and return Ok instead.
-                                                       return Ok(NotifyOption::SkipPersist);
+                                                       return Ok(NotifyOption::SkipPersistNoEvents);
                                                }
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
                                        }
                                        let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..];
                                        let msg_from_node_one = msg.contents.flags & 1 == 0;
                                        if were_node_one == msg_from_node_one {
-                                               return Ok(NotifyOption::SkipPersist);
+                                               return Ok(NotifyOption::SkipPersistNoEvents);
                                        } else {
                                                log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
                                                try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
@@ -6561,12 +6635,12 @@ where
                                                "Got a channel_update for an unfunded channel!".into())), chan_phase_entry);
                                }
                        },
-                       hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist)
+                       hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents)
                }
                Ok(NotifyOption::DoPersist)
        }
 
-       fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
+       fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
                let htlc_forwards;
                let need_lnd_workaround = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6622,14 +6696,16 @@ where
                        }
                };
 
+               let mut persist = NotifyOption::SkipPersistHandleEvents;
                if let Some(forwards) = htlc_forwards {
                        self.forward_htlcs(&mut [forwards][..]);
+                       persist = NotifyOption::DoPersist;
                }
 
                if let Some(channel_ready_msg) = need_lnd_workaround {
                        self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
                }
-               Ok(())
+               Ok(persist)
        }
 
        /// Process pending events from the [`chain::Watch`], returning whether any events were processed.
@@ -7179,8 +7255,8 @@ where
        /// the `MessageSendEvent`s to the specific peer they were generated under.
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let events = RefCell::new(Vec::new());
-               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
-                       let mut result = self.process_background_events();
+               PersistenceNotifierGuard::optionally_notify(self, || {
+                       let mut result = NotifyOption::SkipPersistNoEvents;
 
                        // TODO: This behavior should be documented. It's unintuitive that we query
                        // ChannelMonitors when clearing other events.
@@ -7261,8 +7337,9 @@ where
        }
 
        fn block_disconnected(&self, header: &BlockHeader, height: u32) {
-               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
-                       &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
+               let _persistence_guard =
+                       PersistenceNotifierGuard::optionally_notify_skipping_background_events(
+                               self, || -> NotifyOption { NotifyOption::DoPersist });
                let new_height = height - 1;
                {
                        let mut best_block = self.best_block.write().unwrap();
@@ -7296,8 +7373,9 @@ where
                let block_hash = header.block_hash();
                log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height);
 
-               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
-                       &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
+               let _persistence_guard =
+                       PersistenceNotifierGuard::optionally_notify_skipping_background_events(
+                               self, || -> NotifyOption { NotifyOption::DoPersist });
                self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)
                        .map(|(a, b)| (a, Vec::new(), b)));
 
@@ -7316,8 +7394,9 @@ where
                let block_hash = header.block_hash();
                log_trace!(self.logger, "New best block: {} at height {}", block_hash, height);
 
-               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
-                       &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
+               let _persistence_guard =
+                       PersistenceNotifierGuard::optionally_notify_skipping_background_events(
+                               self, || -> NotifyOption { NotifyOption::DoPersist });
                *self.best_block.write().unwrap() = BestBlock::new(block_hash, height);
 
                self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger));
@@ -7360,8 +7439,9 @@ where
        }
 
        fn transaction_unconfirmed(&self, txid: &Txid) {
-               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
-                       &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
+               let _persistence_guard =
+                       PersistenceNotifierGuard::optionally_notify_skipping_background_events(
+                               self, || -> NotifyOption { NotifyOption::DoPersist });
                self.do_chain_event(None, |channel| {
                        if let Some(funding_txo) = channel.context.get_funding_txo() {
                                if funding_txo.txid == *txid {
@@ -7544,18 +7624,26 @@ where
                }
        }
 
-       /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
+       /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or
+       /// may have events that need processing.
+       ///
+       /// In order to check if this [`ChannelManager`] needs persisting, call
+       /// [`Self::get_and_clear_needs_persistence`].
        ///
        /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
        /// [`ChannelManager`] and should instead register actions to be taken later.
-       ///
-       pub fn get_persistable_update_future(&self) -> Future {
-               self.persistence_notifier.get_future()
+       pub fn get_event_or_persistence_needed_future(&self) -> Future {
+               self.event_persist_notifier.get_future()
+       }
+
+       /// Returns true if this [`ChannelManager`] needs to be persisted.
+       pub fn get_and_clear_needs_persistence(&self) -> bool {
+               self.needs_persist_flag.swap(false, Ordering::AcqRel)
        }
 
        #[cfg(any(test, feature = "_test_utils"))]
-       pub fn get_persistence_condvar_value(&self) -> bool {
-               self.persistence_notifier.notify_pending()
+       pub fn get_event_or_persist_condvar_value(&self) -> bool {
+               self.event_persist_notifier.notify_pending()
        }
 
        /// Gets the latest best block which was connected either via the [`chain::Listen`] or
@@ -7612,8 +7700,21 @@ where
        L::Target: Logger,
 {
        fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id);
+               // Note that we never need to persist the updated ChannelManager for an inbound
+               // open_channel message - pre-funded channels are never written so there should be no
+               // change to the contents.
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let res = self.internal_open_channel(counterparty_node_id, msg);
+                       let persist = match &res {
+                               Err(e) if e.closes_channel() => {
+                                       debug_assert!(false, "We shouldn't close a new channel");
+                                       NotifyOption::DoPersist
+                               },
+                               _ => NotifyOption::SkipPersistHandleEvents,
+                       };
+                       let _ = handle_error!(self, res, *counterparty_node_id);
+                       persist
+               });
        }
 
        fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) {
@@ -7623,8 +7724,13 @@ where
        }
 
        fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
+               // Note that we never need to persist the updated ChannelManager for an inbound
+               // accept_channel message - pre-funded channels are never written so there should be no
+               // change to the contents.
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
+                       NotifyOption::SkipPersistHandleEvents
+               });
        }
 
        fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) {
@@ -7644,8 +7750,19 @@ where
        }
 
        fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id);
+               // Note that we never need to persist the updated ChannelManager for an inbound
+               // channel_ready message - while the channel's state will change, any channel_ready message
+               // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we
+               // will not force-close the channel on startup.
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let res = self.internal_channel_ready(counterparty_node_id, msg);
+                       let persist = match &res {
+                               Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+                               _ => NotifyOption::SkipPersistHandleEvents,
+                       };
+                       let _ = handle_error!(self, res, *counterparty_node_id);
+                       persist
+               });
        }
 
        fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) {
@@ -7659,8 +7776,19 @@ where
        }
 
        fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id);
+               // Note that we never need to persist the updated ChannelManager for an inbound
+               // update_add_htlc message - the message itself doesn't change our channel state only the
+               // `commitment_signed` message afterwards will.
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let res = self.internal_update_add_htlc(counterparty_node_id, msg);
+                       let persist = match &res {
+                               Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+                               Err(_) => NotifyOption::SkipPersistHandleEvents,
+                               Ok(()) => NotifyOption::SkipPersistNoEvents,
+                       };
+                       let _ = handle_error!(self, res, *counterparty_node_id);
+                       persist
+               });
        }
 
        fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
@@ -7669,13 +7797,35 @@ where
        }
 
        fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id);
+               // Note that we never need to persist the updated ChannelManager for an inbound
+               // update_fail_htlc message - the message itself doesn't change our channel state only the
+               // `commitment_signed` message afterwards will.
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let res = self.internal_update_fail_htlc(counterparty_node_id, msg);
+                       let persist = match &res {
+                               Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+                               Err(_) => NotifyOption::SkipPersistHandleEvents,
+                               Ok(()) => NotifyOption::SkipPersistNoEvents,
+                       };
+                       let _ = handle_error!(self, res, *counterparty_node_id);
+                       persist
+               });
        }
 
        fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id);
+               // Note that we never need to persist the updated ChannelManager for an inbound
+               // update_fail_malformed_htlc message - the message itself doesn't change our channel state
+               // only the `commitment_signed` message afterwards will.
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg);
+                       let persist = match &res {
+                               Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+                               Err(_) => NotifyOption::SkipPersistHandleEvents,
+                               Ok(()) => NotifyOption::SkipPersistNoEvents,
+                       };
+                       let _ = handle_error!(self, res, *counterparty_node_id);
+                       persist
+               });
        }
 
        fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
@@ -7689,8 +7839,19 @@ where
        }
 
        fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id);
+               // Note that we never need to persist the updated ChannelManager for an inbound
+               // update_fee message - the message itself doesn't change our channel state only the
+               // `commitment_signed` message afterwards will.
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let res = self.internal_update_fee(counterparty_node_id, msg);
+                       let persist = match &res {
+                               Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+                               Err(_) => NotifyOption::SkipPersistHandleEvents,
+                               Ok(()) => NotifyOption::SkipPersistNoEvents,
+                       };
+                       let _ = handle_error!(self, res, *counterparty_node_id);
+                       persist
+               });
        }
 
        fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
@@ -7699,23 +7860,32 @@ where
        }
 
        fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) {
-               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
-                       let force_persist = self.process_background_events();
+               PersistenceNotifierGuard::optionally_notify(self, || {
                        if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) {
-                               if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist }
+                               persist
                        } else {
-                               NotifyOption::SkipPersist
+                               NotifyOption::DoPersist
                        }
                });
        }
 
        fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+                       let res = self.internal_channel_reestablish(counterparty_node_id, msg);
+                       let persist = match &res {
+                               Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+                               Err(_) => NotifyOption::SkipPersistHandleEvents,
+                               Ok(persist) => *persist,
+                       };
+                       let _ = handle_error!(self, res, *counterparty_node_id);
+                       persist
+               });
        }
 
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+               let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
+                       self, || NotifyOption::SkipPersistHandleEvents);
+
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
@@ -7814,76 +7984,82 @@ where
                        return Err(());
                }
 
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+               let mut res = Ok(());
 
-               // If we have too many peers connected which don't have funded channels, disconnect the
-               // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
-               // unfunded channels taking up space in memory for disconnected peers, we still let new
-               // peers connect, but we'll reject new channels from them.
-               let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
-               let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
+               PersistenceNotifierGuard::optionally_notify(self, || {
+                       // If we have too many peers connected which don't have funded channels, disconnect the
+                       // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
+                       // unfunded channels taking up space in memory for disconnected peers, we still let new
+                       // peers connect, but we'll reject new channels from them.
+                       let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
+                       let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
 
-               {
-                       let mut peer_state_lock = self.per_peer_state.write().unwrap();
-                       match peer_state_lock.entry(counterparty_node_id.clone()) {
-                               hash_map::Entry::Vacant(e) => {
-                                       if inbound_peer_limited {
-                                               return Err(());
-                                       }
-                                       e.insert(Mutex::new(PeerState {
-                                               channel_by_id: HashMap::new(),
-                                               inbound_channel_request_by_id: HashMap::new(),
-                                               latest_features: init_msg.features.clone(),
-                                               pending_msg_events: Vec::new(),
-                                               in_flight_monitor_updates: BTreeMap::new(),
-                                               monitor_update_blocked_actions: BTreeMap::new(),
-                                               actions_blocking_raa_monitor_updates: BTreeMap::new(),
-                                               is_connected: true,
-                                       }));
-                               },
-                               hash_map::Entry::Occupied(e) => {
-                                       let mut peer_state = e.get().lock().unwrap();
-                                       peer_state.latest_features = init_msg.features.clone();
-
-                                       let best_block_height = self.best_block.read().unwrap().height();
-                                       if inbound_peer_limited &&
-                                               Self::unfunded_channel_count(&*peer_state, best_block_height) ==
-                                               peer_state.channel_by_id.len()
-                                       {
-                                               return Err(());
-                                       }
+                       {
+                               let mut peer_state_lock = self.per_peer_state.write().unwrap();
+                               match peer_state_lock.entry(counterparty_node_id.clone()) {
+                                       hash_map::Entry::Vacant(e) => {
+                                               if inbound_peer_limited {
+                                                       res = Err(());
+                                                       return NotifyOption::SkipPersistNoEvents;
+                                               }
+                                               e.insert(Mutex::new(PeerState {
+                                                       channel_by_id: HashMap::new(),
+                                                       inbound_channel_request_by_id: HashMap::new(),
+                                                       latest_features: init_msg.features.clone(),
+                                                       pending_msg_events: Vec::new(),
+                                                       in_flight_monitor_updates: BTreeMap::new(),
+                                                       monitor_update_blocked_actions: BTreeMap::new(),
+                                                       actions_blocking_raa_monitor_updates: BTreeMap::new(),
+                                                       is_connected: true,
+                                               }));
+                                       },
+                                       hash_map::Entry::Occupied(e) => {
+                                               let mut peer_state = e.get().lock().unwrap();
+                                               peer_state.latest_features = init_msg.features.clone();
+
+                                               let best_block_height = self.best_block.read().unwrap().height();
+                                               if inbound_peer_limited &&
+                                                       Self::unfunded_channel_count(&*peer_state, best_block_height) ==
+                                                       peer_state.channel_by_id.len()
+                                               {
+                                                       res = Err(());
+                                                       return NotifyOption::SkipPersistNoEvents;
+                                               }
 
-                                       debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
-                                       peer_state.is_connected = true;
-                               },
+                                               debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
+                                               peer_state.is_connected = true;
+                                       },
+                               }
                        }
-               }
 
-               log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
+                       log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 
-               let per_peer_state = self.per_peer_state.read().unwrap();
-               if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       let pending_msg_events = &mut peer_state.pending_msg_events;
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
+                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                               let peer_state = &mut *peer_state_lock;
+                               let pending_msg_events = &mut peer_state.pending_msg_events;
 
-                       peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
-                               if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
-                                       // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
-                                       // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
-                                       // worry about closing and removing them.
-                                       debug_assert!(false);
-                                       None
-                               }
-                       ).for_each(|chan| {
-                               pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
-                                       node_id: chan.context.get_counterparty_node_id(),
-                                       msg: chan.get_channel_reestablish(&self.logger),
+                               peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
+                                       if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
+                                               // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
+                                               // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
+                                               // worry about closing and removing them.
+                                               debug_assert!(false);
+                                               None
+                                       }
+                               ).for_each(|chan| {
+                                       pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
+                                               node_id: chan.context.get_counterparty_node_id(),
+                                               msg: chan.get_channel_reestablish(&self.logger),
+                                       });
                                });
-                       });
-               }
-               //TODO: Also re-broadcast announcement_signatures
-               Ok(())
+                       }
+
+                       return NotifyOption::SkipPersistHandleEvents;
+                       //TODO: Also re-broadcast announcement_signatures
+               });
+               res
        }
 
        fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@ -9727,7 +9903,9 @@ where
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
-                       persistence_notifier: Notifier::new(),
+
+                       event_persist_notifier: Notifier::new(),
+                       needs_persist_flag: AtomicBool::new(false),
 
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
@@ -9789,9 +9967,9 @@ mod tests {
 
                // All nodes start with a persistable update pending as `create_network` connects each node
                // with all other nodes to make most tests simpler.
-               assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
-               assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
-               assert!(nodes[2].node.get_persistable_update_future().poll_is_complete());
+               assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
+               assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
+               assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
 
                let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1);
 
@@ -9805,19 +9983,19 @@ mod tests {
                        &nodes[0].node.get_our_node_id()).pop().unwrap();
 
                // The first two nodes (which opened a channel) should now require fresh persistence
-               assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
-               assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
+               assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
+               assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                // ... but the last node should not.
-               assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
+               assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
                // After persisting the first two nodes they should no longer need fresh persistence.
-               assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
-               assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
+               assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
+               assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
 
                // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update
                // about the channel.
                nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0);
                nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1);
-               assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
+               assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
 
                // The nodes which are a party to the channel should also ignore messages from unrelated
                // parties.
@@ -9825,8 +10003,8 @@ mod tests {
                nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
                nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0);
                nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
-               assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
-               assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
+               assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
+               assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
 
                // At this point the channel info given by peers should still be the same.
                assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
@@ -9843,8 +10021,8 @@ mod tests {
                // persisted and that its channel info remains the same.
                nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update);
                nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update);
-               assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
-               assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
+               assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
+               assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
                assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info);
 
@@ -9852,8 +10030,8 @@ mod tests {
                // the channel info has updated.
                nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
                nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
-               assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
-               assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
+               assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
+               assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info);
                assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info);
        }