Merge pull request #2611 from TheBlueMatt/2023-09-no-close-on-bad-update
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 918aca8e77a705201c3afab9e0890df02d280d30..392f3e1cb052cf196b50b2b7e3734b62ab167e4c 100644 (file)
@@ -64,7 +64,7 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe
 use crate::util::logger::{Level, Logger};
 use crate::util::errors::APIError;
 
-use alloc::collections::BTreeMap;
+use alloc::collections::{btree_map, BTreeMap};
 
 use crate::io;
 use crate::prelude::*;
@@ -77,7 +77,7 @@ use core::time::Duration;
 use core::ops::Deref;
 
 // Re-export this for use in the public API.
-pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
+pub use crate::ln::outbound_payment::{PaymentSendFailure, ProbeSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
 use crate::ln::script::ShutdownScript;
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
@@ -839,33 +839,46 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> =
                &'g L
        >;
 
-macro_rules! define_test_pub_trait { ($vis: vis) => {
-/// A trivial trait which describes any [`ChannelManager`] used in testing.
-$vis trait AChannelManager {
+/// A trivial trait which describes any [`ChannelManager`].
+pub trait AChannelManager {
+       /// A type implementing [`chain::Watch`].
        type Watch: chain::Watch<Self::Signer> + ?Sized;
+       /// A type that may be dereferenced to [`Self::Watch`].
        type M: Deref<Target = Self::Watch>;
+       /// A type implementing [`BroadcasterInterface`].
        type Broadcaster: BroadcasterInterface + ?Sized;
+       /// A type that may be dereferenced to [`Self::Broadcaster`].
        type T: Deref<Target = Self::Broadcaster>;
+       /// A type implementing [`EntropySource`].
        type EntropySource: EntropySource + ?Sized;
+       /// A type that may be dereferenced to [`Self::EntropySource`].
        type ES: Deref<Target = Self::EntropySource>;
+       /// A type implementing [`NodeSigner`].
        type NodeSigner: NodeSigner + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeSigner`].
        type NS: Deref<Target = Self::NodeSigner>;
+       /// A type implementing [`WriteableEcdsaChannelSigner`].
        type Signer: WriteableEcdsaChannelSigner + Sized;
+       /// A type implementing [`SignerProvider`] for [`Self::Signer`].
        type SignerProvider: SignerProvider<Signer = Self::Signer> + ?Sized;
+       /// A type that may be dereferenced to [`Self::SignerProvider`].
        type SP: Deref<Target = Self::SignerProvider>;
+       /// A type implementing [`FeeEstimator`].
        type FeeEstimator: FeeEstimator + ?Sized;
+       /// A type that may be dereferenced to [`Self::FeeEstimator`].
        type F: Deref<Target = Self::FeeEstimator>;
+       /// A type implementing [`Router`].
        type Router: Router + ?Sized;
+       /// A type that may be dereferenced to [`Self::Router`].
        type R: Deref<Target = Self::Router>;
+       /// A type implementing [`Logger`].
        type Logger: Logger + ?Sized;
+       /// A type that may be dereferenced to [`Self::Logger`].
        type L: Deref<Target = Self::Logger>;
+       /// Returns a reference to the actual [`ChannelManager`] object.
        fn get_cm(&self) -> &ChannelManager<Self::M, Self::T, Self::ES, Self::NS, Self::SP, Self::F, Self::R, Self::L>;
 }
-} }
-#[cfg(any(test, feature = "_test_utils"))]
-define_test_pub_trait!(pub);
-#[cfg(not(any(test, feature = "_test_utils")))]
-define_test_pub_trait!(pub(crate));
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> AChannelManager
 for ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
@@ -910,12 +923,14 @@ where
 /// called [`funding_transaction_generated`] for outbound channels) being closed.
 ///
 /// Note that you can be a bit lazier about writing out `ChannelManager` than you can be with
-/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST write each monitor update out to disk before
-/// returning from [`chain::Watch::watch_channel`]/[`update_channel`], with ChannelManagers, writing updates
-/// happens out-of-band (and will prevent any other `ChannelManager` operations from occurring during
-/// the serialization process). If the deserialized version is out-of-date compared to the
-/// [`ChannelMonitor`] passed by reference to [`read`], those channels will be force-closed based on the
-/// `ChannelMonitor` state and no funds will be lost (mod on-chain transaction fees).
+/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST durably write each
+/// [`ChannelMonitorUpdate`] before returning from
+/// [`chain::Watch::watch_channel`]/[`update_channel`] or before completing async writes. With
+/// `ChannelManager`s, writing updates happens out-of-band (and will prevent any other
+/// `ChannelManager` operations from occurring during the serialization process). If the
+/// deserialized version is out-of-date compared to the [`ChannelMonitor`] passed by reference to
+/// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds
+/// will be lost (modulo on-chain transaction fees).
 ///
 /// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which
 /// tells you the last block hash which was connected. You should get the best block tip before using the manager.
@@ -1186,6 +1201,12 @@ where
        /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
+       /// Tracks the progress of channels going through batch funding by whether funding_signed was
+       /// received and the monitor has been persisted.
+       ///
+       /// This information does not need to be persisted as funding nodes can forget
+       /// unfunded channels upon disconnection.
+       funding_batch_states: Mutex<BTreeMap<Txid, Vec<(ChannelId, PublicKey, bool)>>>,
 
        background_events_processed_since_startup: AtomicBool,
 
@@ -1773,7 +1794,7 @@ macro_rules! handle_error {
                                let mut msg_events = Vec::with_capacity(2);
 
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
-                                       $self.finish_force_close_channel(shutdown_res);
+                                       $self.finish_close_channel(shutdown_res);
                                        if let Some(update) = update_option {
                                                msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
@@ -2010,9 +2031,54 @@ macro_rules! handle_monitor_update_completion {
                }
 
                let channel_id = $chan.context.channel_id();
+               let unbroadcasted_batch_funding_txid = $chan.context.unbroadcasted_batch_funding_txid();
                core::mem::drop($peer_state_lock);
                core::mem::drop($per_peer_state_lock);
 
+               // If the channel belongs to a batch funding transaction, the progress of the batch
+               // should be updated as we have received funding_signed and persisted the monitor.
+               if let Some(txid) = unbroadcasted_batch_funding_txid {
+                       let mut funding_batch_states = $self.funding_batch_states.lock().unwrap();
+                       let mut batch_completed = false;
+                       if let Some(batch_state) = funding_batch_states.get_mut(&txid) {
+                               let channel_state = batch_state.iter_mut().find(|(chan_id, pubkey, _)| (
+                                       *chan_id == channel_id &&
+                                       *pubkey == counterparty_node_id
+                               ));
+                               if let Some(channel_state) = channel_state {
+                                       channel_state.2 = true;
+                               } else {
+                                       debug_assert!(false, "Missing channel batch state for channel which completed initial monitor update");
+                               }
+                               batch_completed = batch_state.iter().all(|(_, _, completed)| *completed);
+                       } else {
+                               debug_assert!(false, "Missing batch state for channel which completed initial monitor update");
+                       }
+
+                       // When all channels in a batched funding transaction have become ready, it is not necessary
+                       // to track the progress of the batch anymore and the state of the channels can be updated.
+                       if batch_completed {
+                               let removed_batch_state = funding_batch_states.remove(&txid).into_iter().flatten();
+                               let per_peer_state = $self.per_peer_state.read().unwrap();
+                               let mut batch_funding_tx = None;
+                               for (channel_id, counterparty_node_id, _) in removed_batch_state {
+                                       if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                               let mut peer_state = peer_state_mutex.lock().unwrap();
+                                               if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) {
+                                                       batch_funding_tx = batch_funding_tx.or_else(|| chan.context.unbroadcasted_funding());
+                                                       chan.set_batch_ready();
+                                                       let mut pending_events = $self.pending_events.lock().unwrap();
+                                                       emit_channel_pending_event!(pending_events, chan);
+                                               }
+                                       }
+                               }
+                               if let Some(tx) = batch_funding_tx {
+                                       log_info!($self.logger, "Broadcasting batch funding transaction with txid {}", tx.txid());
+                                       $self.tx_broadcaster.broadcast_transactions(&[&tx]);
+                               }
+                       }
+               }
+
                $self.handle_monitor_update_completion_actions(update_actions);
 
                if let Some(forwards) = htlc_forwards {
@@ -2027,56 +2093,30 @@ macro_rules! handle_monitor_update_completion {
 }
 
 macro_rules! handle_new_monitor_update {
-       ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { {
-               // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
-               // any case so that it won't deadlock.
-               debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
+       ($self: ident, $update_res: expr, $chan: expr, _internal, $completed: expr) => { {
                debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
                match $update_res {
+                       ChannelMonitorUpdateStatus::UnrecoverableError => {
+                               let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+                               log_error!($self.logger, "{}", err_str);
+                               panic!("{}", err_str);
+                       },
                        ChannelMonitorUpdateStatus::InProgress => {
                                log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
                                        &$chan.context.channel_id());
-                               Ok(false)
-                       },
-                       ChannelMonitorUpdateStatus::PermanentFailure => {
-                               log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
-                                       &$chan.context.channel_id());
-                               update_maps_on_chan_removal!($self, &$chan.context);
-                               let res = Err(MsgHandleErrInternal::from_finish_shutdown(
-                                       "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(),
-                                       $chan.context.get_user_id(), $chan.context.force_shutdown(false),
-                                       $self.get_channel_update_for_broadcast(&$chan).ok(), $chan.context.get_value_satoshis()));
-                               $remove;
-                               res
+                               false
                        },
                        ChannelMonitorUpdateStatus::Completed => {
                                $completed;
-                               Ok(true)
+                               true
                        },
                }
        } };
-       ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => {
-               handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
-                       $per_peer_state_lock, $chan, _internal, $remove,
+       ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => {
+               handle_new_monitor_update!($self, $update_res, $chan, _internal,
                        handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan))
        };
-       ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => {
-               if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() {
-                       handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
-                               $per_peer_state_lock, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { $chan_entry.remove() })
-               } else {
-                       // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to
-                       // update).
-                       debug_assert!(false);
-                       let channel_id = *$chan_entry.key();
-                       let (_, err) = convert_chan_phase_err!($self, ChannelError::Close(
-                               "Cannot update monitor for unfunded channels as they don't have monitors yet".into()),
-                               $chan_entry.get_mut(), &channel_id);
-                       $chan_entry.remove();
-                       Err(err)
-               }
-       };
-       ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+       ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
                let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
                        .or_insert_with(Vec::new);
                // During startup, we push monitor updates as background events through to here in
@@ -2088,8 +2128,7 @@ macro_rules! handle_new_monitor_update {
                                in_flight_updates.len() - 1
                        });
                let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]);
-               handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state,
-                       $per_peer_state_lock, $chan, _internal, $remove,
+               handle_new_monitor_update!($self, update_res, $chan, _internal,
                        {
                                let _ = in_flight_updates.remove(idx);
                                if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
@@ -2097,22 +2136,6 @@ macro_rules! handle_new_monitor_update {
                                }
                        })
        } };
-       ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
-               if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() {
-                       handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state,
-                               $per_peer_state_lock, chan, MANUALLY_REMOVING, { $chan_entry.remove() })
-               } else {
-                       // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to
-                       // update).
-                       debug_assert!(false);
-                       let channel_id = *$chan_entry.key();
-                       let (_, err) = convert_chan_phase_err!($self, ChannelError::Close(
-                               "Cannot update monitor for unfunded channels as they don't have monitors yet".into()),
-                               $chan_entry.get_mut(), &channel_id);
-                       $chan_entry.remove();
-                       Err(err)
-               }
-       }
 }
 
 macro_rules! process_events_body {
@@ -2258,9 +2281,9 @@ where
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
-
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
+                       funding_batch_states: Mutex::new(BTreeMap::new()),
 
                        entropy_source,
                        node_signer,
@@ -2525,61 +2548,67 @@ where
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
                let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
-               let result: Result<(), _> = loop {
-                       {
-                               let per_peer_state = self.per_peer_state.read().unwrap();
+               let mut shutdown_result = None;
+               loop {
+                       let per_peer_state = self.per_peer_state.read().unwrap();
 
-                               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
-                                       .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
+                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                               .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
 
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                       let peer_state = &mut *peer_state_lock;
 
-                               match peer_state.channel_by_id.entry(channel_id.clone()) {
-                                       hash_map::Entry::Occupied(mut chan_phase_entry) => {
-                                               if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
-                                                       let funding_txo_opt = chan.context.get_funding_txo();
-                                                       let their_features = &peer_state.latest_features;
-                                                       let (shutdown_msg, mut monitor_update_opt, htlcs) =
-                                                               chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
-                                                       failed_htlcs = htlcs;
+                       match peer_state.channel_by_id.entry(channel_id.clone()) {
+                               hash_map::Entry::Occupied(mut chan_phase_entry) => {
+                                       if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
+                                               let funding_txo_opt = chan.context.get_funding_txo();
+                                               let their_features = &peer_state.latest_features;
+                                               let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
+                                               let (shutdown_msg, mut monitor_update_opt, htlcs) =
+                                                       chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
+                                               failed_htlcs = htlcs;
+
+                                               // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                               // here as we don't need the monitor update to complete until we send a
+                                               // `shutdown_signed`, which we'll delay if we're pending a monitor update.
+                                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                                       node_id: *counterparty_node_id,
+                                                       msg: shutdown_msg,
+                                               });
 
-                                                       // We can send the `shutdown` message before updating the `ChannelMonitor`
-                                                       // here as we don't need the monitor update to complete until we send a
-                                                       // `shutdown_signed`, which we'll delay if we're pending a monitor update.
-                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                                               node_id: *counterparty_node_id,
-                                                               msg: shutdown_msg,
-                                                       });
+                                               debug_assert!(monitor_update_opt.is_none() || !chan.is_shutdown(),
+                                                       "We can't both complete shutdown and generate a monitor update");
 
-                                                       // Update the monitor with the shutdown script if necessary.
-                                                       if let Some(monitor_update) = monitor_update_opt.take() {
-                                                               break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
-                                                                       peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ());
-                                                       }
+                                               // Update the monitor with the shutdown script if necessary.
+                                               if let Some(monitor_update) = monitor_update_opt.take() {
+                                                       handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+                                                               peer_state_lock, peer_state, per_peer_state, chan);
+                                                       break;
+                                               }
 
-                                                       if chan.is_shutdown() {
-                                                               if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) {
-                                                                       if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                                                       msg: channel_update
-                                                                               });
-                                                                       }
-                                                                       self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
+                                               if chan.is_shutdown() {
+                                                       if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) {
+                                                               if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) {
+                                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                               msg: channel_update
+                                                                       });
                                                                }
+                                                               self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
+                                                               shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                        }
-                                                       break Ok(());
                                                }
-                                       },
-                                       hash_map::Entry::Vacant(_) => (),
-                               }
+                                               break;
+                                       }
+                               },
+                               hash_map::Entry::Vacant(_) => {
+                                       // If we reach this point, it means that the channel_id either refers to an unfunded channel or
+                                       // it does not exist for this peer. Either way, we can attempt to force-close it.
+                                       //
+                                       // An appropriate error will be returned for non-existence of the channel if that's the case.
+                                       return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
+                               },
                        }
-                       // If we reach this point, it means that the channel_id either refers to an unfunded channel or
-                       // it does not exist for this peer. Either way, we can attempt to force-close it.
-                       //
-                       // An appropriate error will be returned for non-existence of the channel if that's the case.
-                       return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
-               };
+               }
 
                for htlc_source in failed_htlcs.drain(..) {
                        let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
@@ -2587,7 +2616,10 @@ where
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
 
-               let _ = handle_error!(self, result, *counterparty_node_id);
+               if let Some(shutdown_result) = shutdown_result {
+                       self.finish_close_channel(shutdown_result);
+               }
+
                Ok(())
        }
 
@@ -2652,9 +2684,14 @@ where
                self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
        }
 
-       #[inline]
-       fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
-               let (monitor_update_option, mut failed_htlcs) = shutdown_res;
+       fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
+               debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+               #[cfg(debug_assertions)]
+               for (_, peer) in self.per_peer_state.read().unwrap().iter() {
+                       debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
+               }
+
+               let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
                log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
                for htlc_source in failed_htlcs.drain(..) {
                        let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
@@ -2669,6 +2706,31 @@ where
                        // ignore the result here.
                        let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
                }
+               let mut shutdown_results = Vec::new();
+               if let Some(txid) = unbroadcasted_batch_funding_txid {
+                       let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
+                       let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       let mut has_uncompleted_channel = None;
+                       for (channel_id, counterparty_node_id, state) in affected_channels {
+                               if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                       let mut peer_state = peer_state_mutex.lock().unwrap();
+                                       if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) {
+                                               update_maps_on_chan_removal!(self, &chan.context());
+                                               self.issue_channel_close_events(&chan.context(), ClosureReason::FundingBatchClosure);
+                                               shutdown_results.push(chan.context_mut().force_shutdown(false));
+                                       }
+                               }
+                               has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state));
+                       }
+                       debug_assert!(
+                               has_uncompleted_channel.unwrap_or(true),
+                               "Closing a batch where all channels have completed initial monitor update",
+                       );
+               }
+               for shutdown_result in shutdown_results.drain(..) {
+                       self.finish_close_channel(shutdown_result);
+               }
        }
 
        /// `peer_msg` should be set when we receive a message from a peer, but not set when the
@@ -2679,8 +2741,7 @@ where
                let peer_state_mutex = per_peer_state.get(peer_node_id)
                        .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?;
                let (update_opt, counterparty_node_id) = {
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
+                       let mut peer_state = peer_state_mutex.lock().unwrap();
                        let closure_reason = if let Some(peer_msg) = peer_msg {
                                ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) }
                        } else {
@@ -2690,13 +2751,15 @@ where
                                log_error!(self.logger, "Force-closing channel {}", channel_id);
                                self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason);
                                let mut chan_phase = remove_channel_phase!(self, chan_phase_entry);
+                               mem::drop(peer_state);
+                               mem::drop(per_peer_state);
                                match chan_phase {
                                        ChannelPhase::Funded(mut chan) => {
-                                               self.finish_force_close_channel(chan.context.force_shutdown(broadcast));
+                                               self.finish_close_channel(chan.context.force_shutdown(broadcast));
                                                (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
                                        },
                                        ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => {
-                                               self.finish_force_close_channel(chan_phase.context_mut().force_shutdown(false));
+                                               self.finish_close_channel(chan_phase.context_mut().force_shutdown(false));
                                                // Unfunded channel has no update
                                                (None, chan_phase.context().get_counterparty_node_id())
                                        },
@@ -2712,10 +2775,17 @@ where
                        }
                };
                if let Some(update) = update_opt {
-                       let mut peer_state = peer_state_mutex.lock().unwrap();
-                       peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                               msg: update
-                       });
+                       // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
+                       // not try to broadcast it via whatever peer we have.
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       let a_peer_state_opt = per_peer_state.get(peer_node_id)
+                               .ok_or(per_peer_state.values().next());
+                       if let Ok(a_peer_state_mutex) = a_peer_state_opt {
+                               let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
+                               a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                       msg: update
+                               });
+                       }
                }
 
                Ok(counterparty_node_id)
@@ -2795,7 +2865,7 @@ where
                let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data {
                        msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } =>
                                (short_channel_id, amt_to_forward, outgoing_cltv_value),
-                       msgs::InboundOnionPayload::Receive { .. } =>
+                       msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } =>
                                return Err(InboundOnionErr {
                                        msg: "Final Node OnionHopData provided for us as an intermediary node",
                                        err_code: 0x4000 | 22,
@@ -2827,12 +2897,19 @@ where
                                payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, ..
                        } =>
                                (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata),
-                       _ =>
+                       msgs::InboundOnionPayload::BlindedReceive {
+                               amt_msat, total_msat, outgoing_cltv_value, payment_secret, ..
+                       } => {
+                               let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat };
+                               (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None)
+                       }
+                       msgs::InboundOnionPayload::Forward { .. } => {
                                return Err(InboundOnionErr {
                                        err_code: 0x4000|22,
                                        err_data: Vec::new(),
                                        msg: "Got non final data with an HMAC of 0",
-                               }),
+                               })
+                       },
                };
                // final_incorrect_cltv_expiry
                if outgoing_cltv_value > cltv_expiry {
@@ -2972,7 +3049,10 @@ where
                        }
                }
 
-               let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) {
+               let next_hop = match onion_utils::decode_next_payment_hop(
+                       shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac,
+                       msg.payment_hash, &self.node_signer
+               ) {
                        Ok(res) => res,
                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                return_malformed_err!(err_msg, err_code);
@@ -2994,7 +3074,9 @@ where
                        // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the
                        // inbound channel's state.
                        onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)),
-                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => {
+                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } |
+                               onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } =>
+                       {
                                return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]);
                        }
                };
@@ -3317,9 +3399,8 @@ where
                                                        }, onion_packet, None, &self.fee_estimator, &self.logger);
                                                match break_chan_phase_entry!(self, send_res, chan_phase_entry) {
                                                        Some(monitor_update) => {
-                                                               match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan_phase_entry) {
-                                                                       Err(e) => break Err(e),
-                                                                       Ok(false) => {
+                                                               match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) {
+                                                                       false => {
                                                                                // Note that MonitorUpdateInProgress here indicates (per function
                                                                                // docs) that we will resend the commitment update once monitor
                                                                                // updating completes. Therefore, we must return an error
@@ -3328,7 +3409,7 @@ where
                                                                                // MonitorUpdateInProgress, below.
                                                                                return Err(APIError::MonitorUpdateInProgress);
                                                                        },
-                                                                       Ok(true) => {},
+                                                                       true => {},
                                                                }
                                                        },
                                                        None => {},
@@ -3534,10 +3615,121 @@ where
                outbound_payment::payment_is_probe(payment_hash, payment_id, self.probing_cookie_secret)
        }
 
+       /// Sends payment probes over all paths of a route that would be used to pay the given
+       /// amount to the given `node_id`.
+       ///
+       /// See [`ChannelManager::send_preflight_probes`] for more information.
+       pub fn send_spontaneous_preflight_probes(
+               &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32,
+               liquidity_limit_multiplier: Option<u64>,
+       ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+               let payment_params =
+                       PaymentParameters::from_node_id(node_id, final_cltv_expiry_delta);
+
+               let route_params = RouteParameters::from_payment_params_and_value(payment_params, amount_msat);
+
+               self.send_preflight_probes(route_params, liquidity_limit_multiplier)
+       }
+
+       /// Sends payment probes over all paths of a route that would be used to pay a route found
+       /// according to the given [`RouteParameters`].
+       ///
+       /// This may be used to send "pre-flight" probes, i.e., to train our scorer before conducting
+       /// the actual payment. Note this is only useful if there likely is sufficient time for the
+       /// probe to settle before sending out the actual payment, e.g., when waiting for user
+       /// confirmation in a wallet UI.
+       ///
+       /// Otherwise, there is a chance the probe could take up some liquidity needed to complete the
+       /// actual payment. Users should therefore be cautious and might avoid sending probes if
+       /// liquidity is scarce and/or they don't expect the probe to return before they send the
+       /// payment. To mitigate this issue, channels with available liquidity less than the required
+       /// amount times the given `liquidity_limit_multiplier` won't be used to send pre-flight
+       /// probes. If `None` is given as `liquidity_limit_multiplier`, it defaults to `3`.
+       pub fn send_preflight_probes(
+               &self, route_params: RouteParameters, liquidity_limit_multiplier: Option<u64>,
+       ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+               let liquidity_limit_multiplier = liquidity_limit_multiplier.unwrap_or(3);
+
+               let payer = self.get_our_node_id();
+               let usable_channels = self.list_usable_channels();
+               let first_hops = usable_channels.iter().collect::<Vec<_>>();
+               let inflight_htlcs = self.compute_inflight_htlcs();
+
+               let route = self
+                       .router
+                       .find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs)
+                       .map_err(|e| {
+                               log_error!(self.logger, "Failed to find path for payment probe: {:?}", e);
+                               ProbeSendFailure::RouteNotFound
+                       })?;
+
+               let mut used_liquidity_map = HashMap::with_capacity(first_hops.len());
+
+               let mut res = Vec::new();
+
+               for mut path in route.paths {
+                       // If the last hop is probably an unannounced channel we refrain from probing all the
+                       // way through to the end and instead probe up to the second-to-last channel.
+                       while let Some(last_path_hop) = path.hops.last() {
+                               if last_path_hop.maybe_announced_channel {
+                                       // We found a potentially announced last hop.
+                                       break;
+                               } else {
+                                       // Drop the last hop, as it's likely unannounced.
+                                       log_debug!(
+                                               self.logger,
+                                               "Avoided sending payment probe all the way to last hop {} as it is likely unannounced.",
+                                               last_path_hop.short_channel_id
+                                       );
+                                       let final_value_msat = path.final_value_msat();
+                                       path.hops.pop();
+                                       if let Some(new_last) = path.hops.last_mut() {
+                                               new_last.fee_msat += final_value_msat;
+                                       }
+                               }
+                       }
+
+                       if path.hops.len() < 2 {
+                               log_debug!(
+                                       self.logger,
+                                       "Skipped sending payment probe over path with less than two hops."
+                               );
+                               continue;
+                       }
+
+                       if let Some(first_path_hop) = path.hops.first() {
+                               if let Some(first_hop) = first_hops.iter().find(|h| {
+                                       h.get_outbound_payment_scid() == Some(first_path_hop.short_channel_id)
+                               }) {
+                                       let path_value = path.final_value_msat() + path.fee_msat();
+                                       let used_liquidity =
+                                               used_liquidity_map.entry(first_path_hop.short_channel_id).or_insert(0);
+
+                                       if first_hop.next_outbound_htlc_limit_msat
+                                               < (*used_liquidity + path_value) * liquidity_limit_multiplier
+                                       {
+                                               log_debug!(self.logger, "Skipped sending payment probe to avoid putting channel {} under the liquidity limit.", first_path_hop.short_channel_id);
+                                               continue;
+                                       } else {
+                                               *used_liquidity += path_value;
+                                       }
+                               }
+                       }
+
+                       res.push(self.send_probe(path).map_err(|e| {
+                               log_error!(self.logger, "Failed to send pre-flight probe: {:?}", e);
+                               ProbeSendFailure::SendingFailed(e)
+                       })?);
+               }
+
+               Ok(res)
+       }
+
        /// Handles the generation of a funding transaction, optionally (for tests) with a function
        /// which checks the correctness of the funding transaction given the associated channel.
-       fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
-               &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
+       fn funding_transaction_generated_intern<FundingOutput: FnMut(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
+               &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batch_funding: bool,
+               mut find_funding_output: FundingOutput,
        ) -> Result<(), APIError> {
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@@ -3549,7 +3741,7 @@ where
                        Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
                                let funding_txo = find_funding_output(&chan, &funding_transaction)?;
 
-                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
+                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batch_funding, &self.logger)
                                        .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
                                                let channel_id = chan.context.channel_id();
                                                let user_id = chan.context.get_user_id();
@@ -3605,7 +3797,7 @@ where
 
        #[cfg(test)]
        pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
-               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| {
+               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| {
                        Ok(OutPoint { txid: tx.txid(), index: output_index })
                })
        }
@@ -3641,17 +3833,37 @@ where
        /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
        /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
        pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
+               self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction)
+       }
+
+       /// Call this upon creation of a batch funding transaction for the given channels.
+       ///
+       /// Return values are identical to [`Self::funding_transaction_generated`], respective to
+       /// each individual channel and transaction output.
+       ///
+       /// Do NOT broadcast the funding transaction yourself. This batch funding transcaction
+       /// will only be broadcast when we have safely received and persisted the counterparty's
+       /// signature for each channel.
+       ///
+       /// If there is an error, all channels in the batch are to be considered closed.
+       pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&ChannelId, &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+               let mut result = Ok(());
 
                if !funding_transaction.is_coin_base() {
                        for inp in funding_transaction.input.iter() {
                                if inp.witness.is_empty() {
-                                       return Err(APIError::APIMisuseError {
+                                       result = result.and(Err(APIError::APIMisuseError {
                                                err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned()
-                                       });
+                                       }));
                                }
                        }
                }
+               if funding_transaction.output.len() > u16::max_value() as usize {
+                       result = result.and(Err(APIError::APIMisuseError {
+                               err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
+                       }));
+               }
                {
                        let height = self.best_block.read().unwrap().height();
                        // Transactions are evaluated as final by network mempools if their locktime is strictly
@@ -3659,37 +3871,93 @@ where
                        // node might not have perfect sync about their blockchain views. Thus, if the wallet
                        // module is ahead of LDK, only allow one more block of headroom.
                        if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 {
-                               return Err(APIError::APIMisuseError {
+                               result = result.and(Err(APIError::APIMisuseError {
                                        err: "Funding transaction absolute timelock is non-final".to_owned()
-                               });
+                               }));
                        }
                }
-               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
-                       if tx.output.len() > u16::max_value() as usize {
-                               return Err(APIError::APIMisuseError {
-                                       err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
-                               });
-                       }
 
-                       let mut output_index = None;
-                       let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
-                       for (idx, outp) in tx.output.iter().enumerate() {
-                               if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
-                                       if output_index.is_some() {
+               let txid = funding_transaction.txid();
+               let is_batch_funding = temporary_channels.len() > 1;
+               let mut funding_batch_states = if is_batch_funding {
+                       Some(self.funding_batch_states.lock().unwrap())
+               } else {
+                       None
+               };
+               let mut funding_batch_state = funding_batch_states.as_mut().and_then(|states| {
+                       match states.entry(txid) {
+                               btree_map::Entry::Occupied(_) => {
+                                       result = result.clone().and(Err(APIError::APIMisuseError {
+                                               err: "Batch funding transaction with the same txid already exists".to_owned()
+                                       }));
+                                       None
+                               },
+                               btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())),
+                       }
+               });
+               for (channel_idx, &(temporary_channel_id, counterparty_node_id)) in temporary_channels.iter().enumerate() {
+                       result = result.and_then(|_| self.funding_transaction_generated_intern(
+                               temporary_channel_id,
+                               counterparty_node_id,
+                               funding_transaction.clone(),
+                               is_batch_funding,
+                               |chan, tx| {
+                                       let mut output_index = None;
+                                       let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
+                                       for (idx, outp) in tx.output.iter().enumerate() {
+                                               if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
+                                                       if output_index.is_some() {
+                                                               return Err(APIError::APIMisuseError {
+                                                                       err: "Multiple outputs matched the expected script and value".to_owned()
+                                                               });
+                                                       }
+                                                       output_index = Some(idx as u16);
+                                               }
+                                       }
+                                       if output_index.is_none() {
                                                return Err(APIError::APIMisuseError {
-                                                       err: "Multiple outputs matched the expected script and value".to_owned()
+                                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
                                                });
                                        }
-                                       output_index = Some(idx as u16);
+                                       let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
+                                       if let Some(funding_batch_state) = funding_batch_state.as_mut() {
+                                               funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false));
+                                       }
+                                       Ok(outpoint)
+                               })
+                       );
+               }
+               if let Err(ref e) = result {
+                       // Remaining channels need to be removed on any error.
+                       let e = format!("Error in transaction funding: {:?}", e);
+                       let mut channels_to_remove = Vec::new();
+                       channels_to_remove.extend(funding_batch_states.as_mut()
+                               .and_then(|states| states.remove(&txid))
+                               .into_iter().flatten()
+                               .map(|(chan_id, node_id, _state)| (chan_id, node_id))
+                       );
+                       channels_to_remove.extend(temporary_channels.iter()
+                               .map(|(&chan_id, &node_id)| (chan_id, node_id))
+                       );
+                       let mut shutdown_results = Vec::new();
+                       {
+                               let per_peer_state = self.per_peer_state.read().unwrap();
+                               for (channel_id, counterparty_node_id) in channels_to_remove {
+                                       per_peer_state.get(&counterparty_node_id)
+                                               .map(|peer_state_mutex| peer_state_mutex.lock().unwrap())
+                                               .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id))
+                                               .map(|mut chan| {
+                                                       update_maps_on_chan_removal!(self, &chan.context());
+                                                       self.issue_channel_close_events(&chan.context(), ClosureReason::ProcessingError { err: e.clone() });
+                                                       shutdown_results.push(chan.context_mut().force_shutdown(false));
+                                               });
                                }
                        }
-                       if output_index.is_none() {
-                               return Err(APIError::APIMisuseError {
-                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
-                               });
+                       for shutdown_result in shutdown_results.drain(..) {
+                               self.finish_close_channel(shutdown_result);
                        }
-                       Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
-               })
+               }
+               result
        }
 
        /// Atomically applies partial updates to the [`ChannelConfig`] of the given channels.
@@ -3981,7 +4249,10 @@ where
                                                                                        let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode);
                                                                                        if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) {
                                                                                                let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes();
-                                                                                               let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) {
+                                                                                               let next_hop = match onion_utils::decode_next_payment_hop(
+                                                                                                       phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac,
+                                                                                                       payment_hash, &self.node_signer
+                                                                                               ) {
                                                                                                        Ok(res) => res,
                                                                                                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                                                                                                let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner();
@@ -4397,33 +4668,29 @@ where
                                },
                                BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => {
                                        let mut updated_chan = false;
-                                       let res = {
+                                       {
                                                let per_peer_state = self.per_peer_state.read().unwrap();
                                                if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
                                                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                                        let peer_state = &mut *peer_state_lock;
                                                        match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) {
                                                                hash_map::Entry::Occupied(mut chan_phase) => {
-                                                                       updated_chan = true;
-                                                                       handle_new_monitor_update!(self, funding_txo, update.clone(),
-                                                                               peer_state_lock, peer_state, per_peer_state, chan_phase).map(|_| ())
+                                                                       if let ChannelPhase::Funded(chan) = chan_phase.get_mut() {
+                                                                               updated_chan = true;
+                                                                               handle_new_monitor_update!(self, funding_txo, update.clone(),
+                                                                                       peer_state_lock, peer_state, per_peer_state, chan);
+                                                                       } else {
+                                                                               debug_assert!(false, "We shouldn't have an update for a non-funded channel");
+                                                                       }
                                                                },
-                                                               hash_map::Entry::Vacant(_) => Ok(()),
+                                                               hash_map::Entry::Vacant(_) => {},
                                                        }
-                                               } else { Ok(()) }
-                                       };
+                                               }
+                                       }
                                        if !updated_chan {
                                                // TODO: Track this as in-flight even though the channel is closed.
                                                let _ = self.chain_monitor.update_channel(funding_txo, &update);
                                        }
-                                       // TODO: If this channel has since closed, we're likely providing a payment
-                                       // preimage update, which we must ensure is durable! We currently don't,
-                                       // however, ensure that.
-                                       if res.is_err() {
-                                               log_error!(self.logger,
-                                                       "Failed to provide ChannelMonitorUpdate to closed channel! This likely lost us a payment preimage!");
-                                       }
-                                       let _ = handle_error!(self, res, counterparty_node_id);
                                },
                                BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
                                        let per_peer_state = self.per_peer_state.read().unwrap();
@@ -4533,8 +4800,9 @@ where
                        let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
                        let mut timed_out_mpp_htlcs = Vec::new();
                        let mut pending_peers_awaiting_removal = Vec::new();
+                       let mut shutdown_channels = Vec::new();
 
-                       let process_unfunded_channel_tick = |
+                       let mut process_unfunded_channel_tick = |
                                chan_id: &ChannelId,
                                context: &mut ChannelContext<SP>,
                                unfunded_context: &mut UnfundedChannelContext,
@@ -4547,7 +4815,7 @@ where
                                                "Force-closing pending channel with ID {} for not establishing in a timely manner", chan_id);
                                        update_maps_on_chan_removal!(self, &context);
                                        self.issue_channel_close_events(&context, ClosureReason::HolderForceClosed);
-                                       self.finish_force_close_channel(context.force_shutdown(false));
+                                       shutdown_channels.push(context.force_shutdown(false));
                                        pending_msg_events.push(MessageSendEvent::HandleError {
                                                node_id: counterparty_node_id,
                                                action: msgs::ErrorAction::SendErrorMessage {
@@ -4740,6 +5008,10 @@ where
                                let _ = handle_error!(self, err, counterparty_node_id);
                        }
 
+                       for shutdown_res in shutdown_channels {
+                               self.finish_close_channel(shutdown_res);
+                       }
+
                        self.pending_outbound_payments.remove_stale_payments(&self.pending_events);
 
                        // Technically we don't need to do this here, but if we have holding cell entries in a
@@ -4896,6 +5168,7 @@ where
                // This ensures that future code doesn't introduce a lock-order requirement for
                // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
                // this function with any `per_peer_state` peer lock acquired would.
+               #[cfg(debug_assertions)]
                for (_, peer) in self.per_peer_state.read().unwrap().iter() {
                        debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
                }
@@ -5146,15 +5419,8 @@ where
                                                                peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                        }
                                                        if !during_init {
-                                                               let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
-                                                                       peer_state, per_peer_state, chan_phase_entry);
-                                                               if let Err(e) = res {
-                                                                       // TODO: This is a *critical* error - we probably updated the outbound edge
-                                                                       // of the HTLC's monitor with a preimage. We should retry this monitor
-                                                                       // update over and over again until morale improves.
-                                                                       log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
-                                                                       return Err((counterparty_node_id, e));
-                                                               }
+                                                               handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+                                                                       peer_state, per_peer_state, chan);
                                                        } else {
                                                                // If we're running during init we cannot update a monitor directly -
                                                                // they probably haven't actually been loaded yet. Instead, push the
@@ -5781,47 +6047,42 @@ where
                                Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
                        },
                        hash_map::Entry::Vacant(e) => {
-                               match self.id_to_peer.lock().unwrap().entry(chan.context.channel_id()) {
+                               let mut id_to_peer_lock = self.id_to_peer.lock().unwrap();
+                               match id_to_peer_lock.entry(chan.context.channel_id()) {
                                        hash_map::Entry::Occupied(_) => {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                                        "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
                                                        funding_msg.channel_id))
                                        },
                                        hash_map::Entry::Vacant(i_e) => {
-                                               i_e.insert(chan.context.get_counterparty_node_id());
-                                       }
-                               }
-
-                               // There's no problem signing a counterparty's funding transaction if our monitor
-                               // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
-                               // accepted payment from yet. We do, however, need to wait to send our channel_ready
-                               // until we have persisted our monitor.
-                               let new_channel_id = funding_msg.channel_id;
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
-                                       node_id: counterparty_node_id.clone(),
-                                       msg: funding_msg,
-                               });
+                                               let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+                                               if let Ok(persist_state) = monitor_res {
+                                                       i_e.insert(chan.context.get_counterparty_node_id());
+                                                       mem::drop(id_to_peer_lock);
+
+                                                       // There's no problem signing a counterparty's funding transaction if our monitor
+                                                       // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
+                                                       // accepted payment from yet. We do, however, need to wait to send our channel_ready
+                                                       // until we have persisted our monitor.
+                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
+                                                               node_id: counterparty_node_id.clone(),
+                                                               msg: funding_msg,
+                                                       });
 
-                               let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
-
-                               if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
-                                       let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state,
-                                               per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR,
-                                               { peer_state.channel_by_id.remove(&new_channel_id) });
-
-                                       // Note that we reply with the new channel_id in error messages if we gave up on the
-                                       // channel, not the temporary_channel_id. This is compatible with ourselves, but the
-                                       // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
-                                       // any messages referencing a previously-closed channel anyway.
-                                       // We do not propagate the monitor update to the user as it would be for a monitor
-                                       // that we didn't manage to store (and that we don't care about - we don't respond
-                                       // with the funding_signed so the channel can never go on chain).
-                                       if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
-                                               res.0 = None;
+                                                       if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
+                                                               handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
+                                                                       per_peer_state, chan, INITIAL_MONITOR);
+                                                       } else {
+                                                               unreachable!("This must be a funded channel as we just inserted it.");
+                                                       }
+                                                       Ok(())
+                                               } else {
+                                                       log_error!(self.logger, "Persisting initial ChannelMonitor failed, implying the funding outpoint was duplicated");
+                                                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                                                               "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
+                                                               funding_msg.channel_id));
+                                               }
                                        }
-                                       res.map(|_| ())
-                               } else {
-                                       unreachable!("This must be a funded channel as we just inserted it.");
                                }
                        }
                }
@@ -5844,17 +6105,12 @@ where
                                        ChannelPhase::Funded(ref mut chan) => {
                                                let monitor = try_chan_phase_entry!(self,
                                                        chan.funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan_phase_entry);
-                                               let update_res = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor);
-                                               let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan_phase_entry, INITIAL_MONITOR);
-                                               if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
-                                                       // We weren't able to watch the channel to begin with, so no updates should be made on
-                                                       // it. Previously, full_stack_target found an (unreachable) panic when the
-                                                       // monitor update contained within `shutdown_finish` was applied.
-                                                       if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
-                                                               shutdown_finish.0.take();
-                                                       }
+                                               if let Ok(persist_status) = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor) {
+                                                       handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR);
+                                                       Ok(())
+                                               } else {
+                                                       try_chan_phase_entry!(self, Err(ChannelError::Close("Channel funding outpoint was a duplicate".to_owned())), chan_phase_entry)
                                                }
-                                               res.map(|_| ())
                                        },
                                        _ => {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id));
@@ -5920,8 +6176,9 @@ where
        }
 
        fn internal_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
-               let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>;
-               let result: Result<(), _> = loop {
+               let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new();
+               let mut finish_shutdown = None;
+               {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
@@ -5956,34 +6213,37 @@ where
                                                }
                                                // Update the monitor with the shutdown script if necessary.
                                                if let Some(monitor_update) = monitor_update_opt {
-                                                       break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
-                                                               peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ());
+                                                       handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+                                                               peer_state_lock, peer_state, per_peer_state, chan);
                                                }
-                                               break Ok(());
                                        },
                                        ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) => {
                                                let context = phase.context_mut();
                                                log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id);
                                                self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
                                                let mut chan = remove_channel_phase!(self, chan_phase_entry);
-                                               self.finish_force_close_channel(chan.context_mut().force_shutdown(false));
-                                               return Ok(());
+                                               finish_shutdown = Some(chan.context_mut().force_shutdown(false));
                                        },
                                }
                        } else {
                                return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
-               };
+               }
                for htlc_source in dropped_htlcs.drain(..) {
                        let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id: msg.channel_id };
                        let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
+               if let Some(shutdown_res) = finish_shutdown {
+                       self.finish_close_channel(shutdown_res);
+               }
 
-               result
+               Ok(())
        }
 
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
+               let mut shutdown_result = None;
+               let unbroadcasted_batch_funding_txid;
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
@@ -5996,6 +6256,7 @@ where
                        match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
+                                               unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
                                                if let Some(msg) = closing_signed {
                                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
@@ -6032,6 +6293,11 @@ where
                                });
                        }
                        self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
+                       shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
+               }
+               mem::drop(per_peer_state);
+               if let Some(shutdown_result) = shutdown_result {
+                       self.finish_close_channel(shutdown_result);
                }
                Ok(())
        }
@@ -6210,8 +6476,9 @@ where
                                        let monitor_update_opt = try_chan_phase_entry!(self, chan.commitment_signed(&msg, &self.logger), chan_phase_entry);
                                        if let Some(monitor_update) = monitor_update_opt {
                                                handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock,
-                                                       peer_state, per_peer_state, chan_phase_entry).map(|_| ())
-                                       } else { Ok(()) }
+                                                       peer_state, per_peer_state, chan);
+                                       }
+                                       Ok(())
                                } else {
                                        return try_chan_phase_entry!(self, Err(ChannelError::Close(
                                                "Got a commitment_signed message for an unfunded channel!".into())), chan_phase_entry);
@@ -6360,7 +6627,7 @@ where
        }
 
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
-               let (htlcs_to_fail, res) = {
+               let htlcs_to_fail = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let mut peer_state_lock = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
@@ -6379,13 +6646,13 @@ where
                                                } else { false };
                                                let (htlcs_to_fail, monitor_update_opt) = try_chan_phase_entry!(self,
                                                        chan.revoke_and_ack(&msg, &self.fee_estimator, &self.logger, mon_update_blocked), chan_phase_entry);
-                                               let res = if let Some(monitor_update) = monitor_update_opt {
+                                               if let Some(monitor_update) = monitor_update_opt {
                                                        let funding_txo = funding_txo_opt
                                                                .expect("Funding outpoint must have been set for RAA handling to succeed");
                                                        handle_new_monitor_update!(self, funding_txo, monitor_update,
-                                                               peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ())
-                                               } else { Ok(()) };
-                                               (htlcs_to_fail, res)
+                                                               peer_state_lock, peer_state, per_peer_state, chan);
+                                               }
+                                               htlcs_to_fail
                                        } else {
                                                return try_chan_phase_entry!(self, Err(ChannelError::Close(
                                                        "Got a revoke_and_ack message for an unfunded channel!".into())), chan_phase_entry);
@@ -6395,7 +6662,7 @@ where
                        }
                };
                self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id);
-               res
+               Ok(())
        }
 
        fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
@@ -6489,7 +6756,7 @@ where
                                        if were_node_one == msg_from_node_one {
                                                return Ok(NotifyOption::SkipPersistNoEvents);
                                        } else {
-                                               log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
+                                               log_debug!(self.logger, "Received channel_update {:?} for channel {}.", msg, chan_id);
                                                try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
                                        }
                                } else {
@@ -6591,8 +6858,7 @@ where
                                                        self.fail_htlc_backwards_internal(&htlc_update.source, &htlc_update.payment_hash, &reason, receiver);
                                                }
                                        },
-                                       MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
-                                       MonitorEvent::UpdateFailed(funding_outpoint) => {
+                                       MonitorEvent::HolderForceClosed(funding_outpoint) => {
                                                let counterparty_node_id_opt = match counterparty_node_id {
                                                        Some(cp_id) => Some(cp_id),
                                                        None => {
@@ -6616,12 +6882,7 @@ where
                                                                                                msg: update
                                                                                        });
                                                                                }
-                                                                               let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
-                                                                                       ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
-                                                                               } else {
-                                                                                       ClosureReason::CommitmentTxConfirmed
-                                                                               };
-                                                                               self.issue_channel_close_events(&chan.context, reason);
+                                                                               self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
                                                                                pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                                                        node_id: chan.context.get_counterparty_node_id(),
                                                                                        action: msgs::ErrorAction::SendErrorMessage {
@@ -6641,7 +6902,7 @@ where
                }
 
                for failure in failed_channels.drain(..) {
-                       self.finish_force_close_channel(failure);
+                       self.finish_close_channel(failure);
                }
 
                has_pending_monitor_events
@@ -6662,7 +6923,6 @@ where
        fn check_free_holding_cells(&self) -> bool {
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();
-               let mut handle_errors = Vec::new();
 
                // Walk our list of channels and find any that need to update. Note that when we do find an
                // update, if it includes actions that must be taken afterwards, we have to drop the
@@ -6687,13 +6947,8 @@ where
                                                if let Some(monitor_update) = monitor_opt {
                                                        has_monitor_update = true;
 
-                                                       let channel_id: ChannelId = *channel_id;
-                                                       let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
-                                                               peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING,
-                                                               peer_state.channel_by_id.remove(&channel_id));
-                                                       if res.is_err() {
-                                                               handle_errors.push((counterparty_node_id, res));
-                                                       }
+                                                       handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
+                                                               peer_state_lock, peer_state, per_peer_state, chan);
                                                        continue 'peer_loop;
                                                }
                                        }
@@ -6703,15 +6958,11 @@ where
                        break 'peer_loop;
                }
 
-               let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
+               let has_update = has_monitor_update || !failed_htlcs.is_empty();
                for (failures, channel_id, counterparty_node_id) in failed_htlcs.drain(..) {
                        self.fail_holding_cell_htlcs(failures, channel_id, &counterparty_node_id);
                }
 
-               for (counterparty_node_id, err) in handle_errors.drain(..) {
-                       let _ = handle_error!(self, err, counterparty_node_id);
-               }
-
                has_update
        }
 
@@ -6721,6 +6972,8 @@ where
        fn maybe_generate_initial_closing_signed(&self) -> bool {
                let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
                let mut has_update = false;
+               let mut shutdown_result = None;
+               let mut unbroadcasted_batch_funding_txid = None;
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
@@ -6731,6 +6984,7 @@ where
                                peer_state.channel_by_id.retain(|channel_id, phase| {
                                        match phase {
                                                ChannelPhase::Funded(chan) => {
+                                                       unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                        match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
                                                                Ok((msg_opt, tx_opt)) => {
                                                                        if let Some(msg) = msg_opt {
@@ -6753,6 +7007,7 @@ where
                                                                                log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                                                self.tx_broadcaster.broadcast_transactions(&[&tx]);
                                                                                update_maps_on_chan_removal!(self, &chan.context);
+                                                                               shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                                                false
                                                                        } else { true }
                                                                },
@@ -6774,6 +7029,10 @@ where
                        let _ = handle_error!(self, err, counterparty_node_id);
                }
 
+               if let Some(shutdown_result) = shutdown_result {
+                       self.finish_close_channel(shutdown_result);
+               }
+
                has_update
        }
 
@@ -6799,7 +7058,7 @@ where
                                                counterparty_node_id, funding_txo, update
                                        });
                        }
-                       self.finish_force_close_channel(failure);
+                       self.finish_close_channel(failure);
                }
        }
 
@@ -7006,7 +7265,6 @@ where
        /// operation. It will double-check that nothing *else* is also blocking the same channel from
        /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
        fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
-               let mut errors = Vec::new();
                loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
@@ -7038,11 +7296,8 @@ where
                                                if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() {
                                                        log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
                                                                channel_funding_outpoint.to_channel_id());
-                                                       if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
-                                                               peer_state_lck, peer_state, per_peer_state, chan_phase_entry)
-                                                       {
-                                                               errors.push((e, counterparty_node_id));
-                                                       }
+                                                       handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
+                                                               peer_state_lck, peer_state, per_peer_state, chan);
                                                        if further_update_exists {
                                                                // If there are more `ChannelMonitorUpdate`s to process, restart at the
                                                                // top of the loop.
@@ -7061,10 +7316,6 @@ where
                        }
                        break;
                }
-               for (err, counterparty_node_id) in errors {
-                       let res = Err::<(), _>(err);
-                       let _ = handle_error!(self, res, counterparty_node_id);
-               }
        }
 
        fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
@@ -7747,7 +7998,6 @@ where
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
                let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
                        self, || NotifyOption::SkipPersistHandleEvents);
-
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
@@ -7760,24 +8010,24 @@ where
                                peer_state.channel_by_id.retain(|_, phase| {
                                        let context = match phase {
                                                ChannelPhase::Funded(chan) => {
-                                                       chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
-                                                       // We only retain funded channels that are not shutdown.
-                                                       if !chan.is_shutdown() {
+                                                       if chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger).is_ok() {
+                                                               // We only retain funded channels that are not shutdown.
                                                                return true;
                                                        }
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                                // Unfunded channels will always be removed.
                                                ChannelPhase::UnfundedOutboundV1(chan) => {
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                                ChannelPhase::UnfundedInboundV1(chan) => {
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                        };
                                        // Clean up for removal.
                                        update_maps_on_chan_removal!(self, &context);
                                        self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer);
+                                       failed_channels.push(context.force_shutdown(false));
                                        false
                                });
                                // Note that we don't bother generating any events for pre-accept channels -
@@ -7836,7 +8086,7 @@ where
                mem::drop(per_peer_state);
 
                for failure in failed_channels.drain(..) {
-                       self.finish_force_close_channel(failure);
+                       self.finish_close_channel(failure);
                }
        }
 
@@ -8581,7 +8831,7 @@ where
                                }
 
                                number_of_funded_channels += peer_state.channel_by_id.iter().filter(
-                                       |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false }
+                                       |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_broadcast() } else { false }
                                ).count();
                        }
 
@@ -8592,7 +8842,7 @@ where
                                let peer_state = &mut *peer_state_lock;
                                for channel in peer_state.channel_by_id.iter().filter_map(
                                        |(_, phase)| if let ChannelPhase::Funded(channel) = phase {
-                                               if channel.context.is_funding_initiated() { Some(channel) } else { None }
+                                               if channel.context.is_funding_broadcast() { Some(channel) } else { None }
                                        } else { None }
                                ) {
                                        channel.write(writer)?;
@@ -9016,7 +9266,10 @@ where
                                                log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
                                                        &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
                                        }
-                                       let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true);
+                                       let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
+                                       if batch_funding_txid.is_some() {
+                                               return Err(DecodeError::InvalidValue);
+                                       }
                                        if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
                                                close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                        counterparty_node_id, funding_txo, update
@@ -9056,7 +9309,7 @@ where
                                        if let Some(short_channel_id) = channel.context.get_short_channel_id() {
                                                short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
                                        }
-                                       if channel.context.is_funding_initiated() {
+                                       if channel.context.is_funding_broadcast() {
                                                id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id());
                                        }
                                        match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) {
@@ -9421,6 +9674,7 @@ where
                                                                                pending_fee_msat: Some(path_fee),
                                                                                total_msat: path_amt,
                                                                                starting_block_height: best_block_height,
+                                                                               remaining_max_total_routing_fee_msat: None, // only used for retries, and we'll never retry on startup
                                                                        });
                                                                        log_info!(args.logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}",
                                                                                path_amt, &htlc.payment_hash,  log_bytes!(session_priv_bytes));
@@ -9769,6 +10023,8 @@ where
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
 
+                       funding_batch_states: Mutex::new(BTreeMap::new()),
+
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        signer_provider: args.signer_provider,
@@ -10052,7 +10308,7 @@ mod tests {
                        TEST_FINAL_CLTV, false), 100_000);
                let route = find_route(
                        &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
-                       None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+                       None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
                ).unwrap();
                nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
                        RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
@@ -10086,7 +10342,7 @@ mod tests {
                let payment_preimage = PaymentPreimage([42; 32]);
                let route = find_route(
                        &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
-                       None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+                       None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
                ).unwrap();
                let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
                        RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
@@ -10143,7 +10399,7 @@ mod tests {
                );
                let route = find_route(
                        &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
-                       None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+                       None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
                ).unwrap();
                let payment_id_2 = PaymentId([45; 32]);
                nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
@@ -10194,7 +10450,7 @@ mod tests {
                let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
                let route = find_route(
                        &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
-                       nodes[0].logger, &scorer, &(), &random_seed_bytes
+                       nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
                ).unwrap();
 
                let test_preimage = PaymentPreimage([42; 32]);
@@ -10239,7 +10495,7 @@ mod tests {
                let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
                let route = find_route(
                        &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
-                       nodes[0].logger, &scorer, &(), &random_seed_bytes
+                       nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
                ).unwrap();
 
                let test_preimage = PaymentPreimage([42; 32]);