Merge pull request #2006 from TheBlueMatt/2023-02-no-recursive-read-locks
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 59d8e387529412a5d0dcd8adc61050791d8643fe..0757e117ce2e7660648856053070c12773b2b6b1 100644 (file)
@@ -65,18 +65,20 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe
 use crate::util::logger::{Level, Logger};
 use crate::util::errors::APIError;
 
+use alloc::collections::BTreeMap;
+
 use crate::io;
 use crate::prelude::*;
 use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
-use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock};
+use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
 // Re-export this for use in the public API.
-pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry};
+pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure};
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
 //
@@ -343,17 +345,6 @@ impl MsgHandleErrInternal {
                }
        }
        #[inline]
-       fn ignore_no_close(err: String) -> Self {
-               Self {
-                       err: LightningError {
-                               err,
-                               action: msgs::ErrorAction::IgnoreError,
-                       },
-                       chan_id: None,
-                       shutdown_finish: None,
-               }
-       }
-       #[inline]
        fn from_no_close(err: msgs::LightningError) -> Self {
                Self { err, chan_id: None, shutdown_finish: None }
        }
@@ -464,6 +455,7 @@ enum BackgroundEvent {
        ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
 }
 
+#[derive(Debug)]
 pub(crate) enum MonitorUpdateCompletionAction {
        /// Indicates that a payment ultimately destined for us was claimed and we should emit an
        /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
@@ -474,6 +466,11 @@ pub(crate) enum MonitorUpdateCompletionAction {
        EmitEvent { event: events::Event },
 }
 
+impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
+       (0, PaymentClaimed) => { (0, payment_hash, required) },
+       (2, EmitEvent) => { (0, event, upgradable_required) },
+);
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
        /// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -487,6 +484,37 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
        /// Messages to send to the peer - pushed to in the same lock that they are generated in (except
        /// for broadcast messages, where ordering isn't as strict).
        pub(super) pending_msg_events: Vec<MessageSendEvent>,
+       /// Map from a specific channel to some action(s) that should be taken when all pending
+       /// [`ChannelMonitorUpdate`]s for the channel complete updating.
+       ///
+       /// Note that because we generally only have one entry here a HashMap is pretty overkill. A
+       /// BTreeMap currently stores more than ten elements per leaf node, so even up to a few
+       /// channels with a peer this will just be one allocation and will amount to a linear list of
+       /// channels to walk, avoiding the whole hashing rigmarole.
+       ///
+       /// Note that the channel may no longer exist. For example, if a channel was closed but we
+       /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
+       /// for a missing channel. While a malicious peer could construct a second channel with the
+       /// same `temporary_channel_id` (or final `channel_id` in the case of 0conf channels or prior
+       /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
+       /// duplicates do not occur, so such channels should fail without a monitor update completing.
+       monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
+       /// The peer is currently connected (i.e. we've seen a
+       /// [`ChannelMessageHandler::peer_connected`] and no corresponding
+       /// [`ChannelMessageHandler::peer_disconnected`].
+       is_connected: bool,
+}
+
+impl <Signer: ChannelSigner> PeerState<Signer> {
+       /// Indicates that a peer meets the criteria where we're ok to remove it from our storage.
+       /// If true is passed for `require_disconnected`, the function will return false if we haven't
+       /// disconnected from the node already, ie. `PeerState::is_connected` is set to `true`.
+       fn ok_to_remove(&self, require_disconnected: bool) -> bool {
+               if require_disconnected && self.is_connected {
+                       return false
+               }
+               self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty()
+       }
 }
 
 /// Stores a PaymentSecret and any other data we may need to validate an inbound payment is
@@ -577,6 +605,15 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
 /// offline for a full minute. In order to track this, you must call
 /// timer_tick_occurred roughly once per minute, though it doesn't have to be perfect.
 ///
+/// To avoid trivial DoS issues, ChannelManager limits the number of inbound connections and
+/// inbound channels without confirmed funding transactions. This may result in nodes which we do
+/// not have a channel with being unable to connect to us or open new channels with us if we have
+/// many peers with unfunded channels.
+///
+/// Because it is an indication of trust, inbound channels which we've accepted as 0conf are
+/// exempted from the count of unfunded channels. Similarly, outbound channels and connections are
+/// never limited. Please ensure you limit the count of such channels yourself.
+///
 /// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager
 /// a SimpleRefChannelManager, for conciseness. See their documentation for more details, but
 /// essentially you should default to using a SimpleRefChannelManager, and use a
@@ -763,9 +800,8 @@ where
        /// very far in the past, and can only ever be up to two hours in the future.
        highest_seen_timestamp: AtomicUsize,
 
-       /// The bulk of our storage will eventually be here (message queues and the like). Currently
-       /// the `per_peer_state` stores our channels on a per-peer basis, as well as the peer's latest
-       /// features.
+       /// The bulk of our storage. Currently the `per_peer_state` stores our channels on a per-peer
+       /// basis, as well as the peer's latest features.
        ///
        /// If we are connected to a peer we always at least have an entry here, even if no channels
        /// are currently open with that peer.
@@ -928,6 +964,19 @@ pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3;
 /// [`OutboundPayments::remove_stale_resolved_payments`].
 pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7;
 
+/// The maximum number of unfunded channels we can have per-peer before we start rejecting new
+/// (inbound) ones. The number of peers with unfunded channels is limited separately in
+/// [`MAX_UNFUNDED_CHANNEL_PEERS`].
+const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
+
+/// The maximum number of peers from which we will allow pending unfunded channels. Once we reach
+/// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
+const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;
+
+/// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
+/// many peers we reject new (inbound) connections.
+const MAX_NO_CHANNEL_PEERS: usize = 250;
+
 /// Information needed for constructing an invoice route hint for this channel.
 #[derive(Clone, Debug, PartialEq)]
 pub struct CounterpartyForwardingInfo {
@@ -1175,9 +1224,9 @@ pub enum RecentPaymentDetails {
                /// made before LDK version 0.0.104.
                payment_hash: Option<PaymentHash>,
        },
-       /// After a payment is explicitly abandoned by calling [`ChannelManager::abandon_payment`], it
-       /// is marked as abandoned until an [`Event::PaymentFailed`] is generated. A payment could also
-       /// be marked as abandoned if pathfinding fails repeatedly or retries have been exhausted.
+       /// After a payment's retries are exhausted per the provided [`Retry`], or it is explicitly
+       /// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all
+       /// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated.
        Abandoned {
                /// Hash of the payment that we have given up trying to send.
                payment_hash: PaymentHash,
@@ -1203,13 +1252,10 @@ macro_rules! handle_error {
                match $internal {
                        Ok(msg) => Ok(msg),
                        Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => {
-                               #[cfg(any(feature = "_test_utils", test))]
-                               {
-                                       // In testing, ensure there are no deadlocks where the lock is already held upon
-                                       // entering the macro.
-                                       debug_assert!($self.pending_events.try_lock().is_ok());
-                                       debug_assert!($self.per_peer_state.try_write().is_ok());
-                               }
+                               // In testing, ensure there are no deadlocks where the lock is already held upon
+                               // entering the macro.
+                               debug_assert_ne!($self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+                               debug_assert_ne!($self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 
                                let mut msg_events = Vec::with_capacity(2);
 
@@ -1243,26 +1289,6 @@ macro_rules! handle_error {
                                                let mut peer_state = peer_state_mutex.lock().unwrap();
                                                peer_state.pending_msg_events.append(&mut msg_events);
                                        }
-                                       #[cfg(any(feature = "_test_utils", test))]
-                                       {
-                                               if let None = per_peer_state.get(&$counterparty_node_id) {
-                                                       // This shouldn't occour in tests unless an unkown counterparty_node_id
-                                                       // has been passed to our message handling functions.
-                                                       let expected_error_str = format!("Can't find a peer matching the passed counterparty node_id {}", $counterparty_node_id);
-                                                       match err.action {
-                                                               msgs::ErrorAction::SendErrorMessage {
-                                                                       msg: msgs::ErrorMessage { ref channel_id, ref data }
-                                                               }
-                                                               => {
-                                                                       assert_eq!(*data, expected_error_str);
-                                                                       if let Some((err_channel_id, _user_channel_id)) = chan_id {
-                                                                               debug_assert_eq!(*channel_id, err_channel_id);
-                                                                       }
-                                                               }
-                                                               _ => debug_assert!(false, "Unexpected event"),
-                                                       }
-                                               }
-                                       }
                                }
 
                                // Return error in case higher-API need one
@@ -1353,78 +1379,6 @@ macro_rules! remove_channel {
        }
 }
 
-macro_rules! handle_monitor_update_res {
-       ($self: ident, $err: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
-               match $err {
-                       ChannelMonitorUpdateStatus::PermanentFailure => {
-                               log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..]));
-                               update_maps_on_chan_removal!($self, $chan);
-                               // TODO: $failed_fails is dropped here, which will cause other channels to hit the
-                               // chain in a confused state! We need to move them into the ChannelMonitor which
-                               // will be responsible for failing backwards once things confirm on-chain.
-                               // It's ok that we drop $failed_forwards here - at this point we'd rather they
-                               // broadcast HTLC-Timeout and pay the associated fees to get their funds back than
-                               // us bother trying to claim it just to forward on to another peer. If we're
-                               // splitting hairs we'd prefer to claim payments that were to us, but we haven't
-                               // given up the preimage yet, so might as well just wait until the payment is
-                               // retried, avoiding the on-chain fees.
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(),
-                                               $chan.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok() ));
-                               (res, true)
-                       },
-                       ChannelMonitorUpdateStatus::InProgress => {
-                               log_info!($self.logger, "Disabling channel {} due to monitor update in progress. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
-                                               log_bytes!($chan_id[..]),
-                                               if $resend_commitment && $resend_raa {
-                                                               match $action_type {
-                                                                       RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
-                                                                       RAACommitmentOrder::RevokeAndACKFirst => { "RAA then commitment" },
-                                                               }
-                                                       } else if $resend_commitment { "commitment" }
-                                                       else if $resend_raa { "RAA" }
-                                                       else { "nothing" },
-                                               (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
-                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
-                                               (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
-                               if !$resend_commitment {
-                                       debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
-                               }
-                               if !$resend_raa {
-                                       debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
-                               }
-                               $chan.monitor_updating_paused($resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
-                               (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
-                       },
-                       ChannelMonitorUpdateStatus::Completed => {
-                               (Ok(()), false)
-                       },
-               }
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
-               let (res, drop) = handle_monitor_update_res!($self, $err, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
-               if drop {
-                       $entry.remove_entry();
-               }
-               res
-       } };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { {
-               debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst);
-               handle_monitor_update_res!($self, $err, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
-       } };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new())
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new())
-       };
-       ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
-               handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new())
-       };
-}
-
 macro_rules! send_channel_ready {
        ($self: ident, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => {{
                $pending_msg_events.push(events::MessageSendEvent::SendChannelReady {
@@ -1462,6 +1416,94 @@ macro_rules! emit_channel_ready_event {
        }
 }
 
+macro_rules! handle_monitor_update_completion {
+       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
+               let mut updates = $chan.monitor_updating_restored(&$self.logger,
+                       &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
+                       $self.best_block.read().unwrap().height());
+               let counterparty_node_id = $chan.get_counterparty_node_id();
+               let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() {
+                       // We only send a channel_update in the case where we are just now sending a
+                       // channel_ready and the channel is in a usable state. We may re-send a
+                       // channel_update later through the announcement_signatures process for public
+                       // channels, but there's no reason not to just inform our counterparty of our fees
+                       // now.
+                       if let Ok(msg) = $self.get_channel_update_for_unicast($chan) {
+                               Some(events::MessageSendEvent::SendChannelUpdate {
+                                       node_id: counterparty_node_id,
+                                       msg,
+                               })
+                       } else { None }
+               } else { None };
+
+               let update_actions = $peer_state.monitor_update_blocked_actions
+                       .remove(&$chan.channel_id()).unwrap_or(Vec::new());
+
+               let htlc_forwards = $self.handle_channel_resumption(
+                       &mut $peer_state.pending_msg_events, $chan, updates.raa,
+                       updates.commitment_update, updates.order, updates.accepted_htlcs,
+                       updates.funding_broadcastable, updates.channel_ready,
+                       updates.announcement_sigs);
+               if let Some(upd) = channel_update {
+                       $peer_state.pending_msg_events.push(upd);
+               }
+
+               let channel_id = $chan.channel_id();
+               core::mem::drop($peer_state_lock);
+               core::mem::drop($per_peer_state_lock);
+
+               $self.handle_monitor_update_completion_actions(update_actions);
+
+               if let Some(forwards) = htlc_forwards {
+                       $self.forward_htlcs(&mut [forwards][..]);
+               }
+               $self.finalize_claims(updates.finalized_claimed_htlcs);
+               for failure in updates.failed_htlcs.drain(..) {
+                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
+                       $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+               }
+       } }
+}
+
+macro_rules! handle_new_monitor_update {
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+               // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
+               // any case so that it won't deadlock.
+               debug_assert!($self.id_to_peer.try_lock().is_ok());
+               match $update_res {
+                       ChannelMonitorUpdateStatus::InProgress => {
+                               log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
+                                       log_bytes!($chan.channel_id()[..]));
+                               Ok(())
+                       },
+                       ChannelMonitorUpdateStatus::PermanentFailure => {
+                               log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
+                                       log_bytes!($chan.channel_id()[..]));
+                               update_maps_on_chan_removal!($self, $chan);
+                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown(
+                                       "ChannelMonitor storage failure".to_owned(), $chan.channel_id(),
+                                       $chan.get_user_id(), $chan.force_shutdown(false),
+                                       $self.get_channel_update_for_broadcast(&$chan).ok()));
+                               $remove;
+                               res
+                       },
+                       ChannelMonitorUpdateStatus::Completed => {
+                               if ($update_id == 0 || $chan.get_next_monitor_update()
+                                       .expect("We can't be processing a monitor update if it isn't queued")
+                                       .update_id == $update_id) &&
+                                       $chan.get_latest_monitor_update_id() == $update_id
+                               {
+                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
+                               }
+                               Ok(())
+                       },
+               }
+       } };
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
+               handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
+       }
+}
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
        M::Target: chain::Watch<<SP::Target as SignerProvider>::Signer>,
@@ -1592,12 +1634,10 @@ where
 
                let per_peer_state = self.per_peer_state.read().unwrap();
 
-               let peer_state_mutex_opt = per_peer_state.get(&their_network_key);
-               if let None = peer_state_mutex_opt {
-                       return Err(APIError::APIMisuseError { err: format!("Not connected to node: {}", their_network_key) });
-               }
+               let peer_state_mutex = per_peer_state.get(&their_network_key)
+                       .ok_or_else(|| APIError::APIMisuseError{ err: format!("Not connected to node: {}", their_network_key) })?;
 
-               let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let mut peer_state = peer_state_mutex.lock().unwrap();
                let channel = {
                        let outbound_scid_alias = self.create_and_insert_outbound_scid_alias();
                        let their_features = &peer_state.latest_features;
@@ -1635,14 +1675,13 @@ where
        }
 
        fn list_channels_with_filter<Fn: FnMut(&(&[u8; 32], &Channel<<SP::Target as SignerProvider>::Signer>)) -> bool + Copy>(&self, f: Fn) -> Vec<ChannelDetails> {
-               let mut res = Vec::new();
                // Allocate our best estimate of the number of channels we have in the `res`
                // Vec. Sadly the `short_to_chan_info` map doesn't cover channels without
                // a scid or a scid alias, and the `id_to_peer` shouldn't be used outside
                // of the ChannelMonitor handling. Therefore reallocations may still occur, but is
                // unlikely as the `short_to_chan_info` map often contains 2 entries for
                // the same channel.
-               res.reserve(self.short_to_chan_info.read().unwrap().len());
+               let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len());
                {
                        let best_block_height = self.best_block.read().unwrap().height();
                        let per_peer_state = self.per_peer_state.read().unwrap();
@@ -1726,7 +1765,7 @@ where
        ///
        /// This can be useful for payments that may have been prepared, but ultimately not sent, as a
        /// result of a crash. If such a payment exists, is not listed here, and an
-       /// [`Event::PaymentSent`] has not been received, you may consider retrying the payment.
+       /// [`Event::PaymentSent`] has not been received, you may consider resending the payment.
        ///
        /// [`Event::PaymentSent`]: events::Event::PaymentSent
        pub fn list_recent_payments(&self) -> Vec<RecentPaymentDetails> {
@@ -1772,34 +1811,34 @@ where
                let result: Result<(), _> = loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
-                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) });
-                       }
+                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                               .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
 
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?;
+                                       let funding_txo_opt = chan_entry.get().get_funding_txo();
+                                       let their_features = &peer_state.latest_features;
+                                       let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+                                               .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?;
                                        failed_htlcs = htlcs;
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update {
-                                               let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-                                               let (result, is_permanent) =
-                                                       handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-                                               if is_permanent {
-                                                       remove_channel!(self, chan_entry);
-                                                       break result;
-                                               }
-                                       }
-
+                                       // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                       // here as we don't need the monitor update to complete until we send a
+                                       // `shutdown_signed`, which we'll delay if we're pending a monitor update.
                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                node_id: *counterparty_node_id,
-                                               msg: shutdown_msg
+                                               msg: shutdown_msg,
                                        });
 
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update_opt.take() {
+                                               let update_id = monitor_update.update_id;
+                                               let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
+                                       }
+
                                        if chan_entry.get().is_shutdown() {
                                                let channel = remove_channel!(self, chan_entry);
                                                if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
@@ -1893,12 +1932,10 @@ where
        fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool)
        -> Result<PublicKey, APIError> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(peer_node_id);
+               let peer_state_mutex = per_peer_state.get(peer_node_id)
+                       .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?;
                let mut chan = {
-                       if let None = peer_state_mutex_opt {
-                               return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) });
-                       }
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) {
                                if let Some(peer_msg) = peer_msg {
@@ -1914,7 +1951,7 @@ where
                log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..]));
                self.finish_force_close_channel(chan.force_shutdown(broadcast));
                if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                       let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let mut peer_state = peer_state_mutex.lock().unwrap();
                        peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                msg: update
                        });
@@ -2201,7 +2238,7 @@ where
                                        let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt {
                                                let per_peer_state = self.per_peer_state.read().unwrap();
                                                let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                                               if let None = peer_state_mutex_opt {
+                                               if peer_state_mutex_opt.is_none() {
                                                        break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
                                                }
                                                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
@@ -2315,7 +2352,9 @@ where
        /// public, and thus should be called whenever the result is going to be passed out in a
        /// [`MessageSendEvent::BroadcastChannelUpdate`] event.
        ///
-       /// May be called with peer_state already locked!
+       /// Note that in `internal_closing_signed`, this function is called without the `peer_state`
+       /// corresponding to the channel's counterparty locked, as the channel been removed from the
+       /// storage and the `peer_state` lock has been dropped.
        fn get_channel_update_for_broadcast(&self, chan: &Channel<<SP::Target as SignerProvider>::Signer>) -> Result<msgs::ChannelUpdate, LightningError> {
                if !chan.should_announce() {
                        return Err(LightningError {
@@ -2334,7 +2373,10 @@ where
        /// is public (only returning an Err if the channel does not yet have an assigned short_id),
        /// and thus MUST NOT be called unless the recipient of the resulting message has already
        /// provided evidence that they know about the existence of the channel.
-       /// May be called with peer_state already locked!
+       ///
+       /// Note that through `internal_closing_signed`, this function is called without the
+       /// `peer_state`  corresponding to the channel's counterparty locked, as the channel been
+       /// removed from the storage and the `peer_state` lock has been dropped.
        fn get_channel_update_for_unicast(&self, chan: &Channel<<SP::Target as SignerProvider>::Signer>) -> Result<msgs::ChannelUpdate, LightningError> {
                log_trace!(self.logger, "Attempting to generate channel update for channel {}", log_bytes!(chan.channel_id()));
                let short_channel_id = match chan.get_short_channel_id().or(chan.latest_inbound_scid_alias()) {
@@ -2372,22 +2414,28 @@ where
                })
        }
 
-       // Only public for testing, this should otherwise never be called direcly
-       pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_params: &Option<PaymentParameters>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+       #[cfg(test)]
+       pub(crate) fn test_send_payment_along_path(&self, path: &Vec<RouteHop>, payment_params: &Option<PaymentParameters>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+               let _lck = self.total_consistency_lock.read().unwrap();
+               self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes)
+       }
+
+       fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_params: &Option<PaymentParameters>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+               // The top-level caller should hold the total_consistency_lock read lock.
+               debug_assert!(self.total_consistency_lock.try_write().is_err());
+
                log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id);
                let prng_seed = self.entropy_source.get_secure_random_bytes();
                let session_priv = SecretKey::from_slice(&session_priv_bytes[..]).expect("RNG is busted");
 
                let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv)
-                       .map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected"})?;
+                       .map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected".to_owned()})?;
                let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(path, total_value, payment_secret, cur_height, keysend_preimage)?;
                if onion_utils::route_size_insane(&onion_payloads) {
-                       return Err(APIError::InvalidRoute{err: "Route size too large considering onion data"});
+                       return Err(APIError::InvalidRoute{err: "Route size too large considering onion data".to_owned()});
                }
                let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
 
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-
                let err: Result<(), _> = loop {
                        let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.first().unwrap().short_channel_id) {
                                None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}),
@@ -2395,61 +2443,40 @@ where
                        };
 
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               return Err(APIError::InvalidRoute{err: "No peer matching the path's first hop found!" });
-                       }
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
+                               .ok_or_else(|| APIError::ChannelUnavailable{err: "No peer matching the path's first hop found!".to_owned() })?;
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) {
-                               match {
-                                       if !chan.get().is_live() {
-                                               return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()});
-                                       }
-                                       break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(
-                                               htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                                       path: path.clone(),
-                                                       session_priv: session_priv.clone(),
-                                                       first_hop_htlc_msat: htlc_msat,
-                                                       payment_id,
-                                                       payment_secret: payment_secret.clone(),
-                                                       payment_params: payment_params.clone(),
-                                               }, onion_packet, &self.logger),
-                                               chan)
-                               } {
-                                       Some((update_add, commitment_signed, monitor_update)) => {
-                                               let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
-                                               let chan_id = chan.get().channel_id();
-                                               match (update_err,
-                                                       handle_monitor_update_res!(self, update_err, chan,
-                                                               RAACommitmentOrder::CommitmentFirst, false, true))
-                                               {
-                                                       (ChannelMonitorUpdateStatus::PermanentFailure, Err(e)) => break Err(e),
-                                                       (ChannelMonitorUpdateStatus::Completed, Ok(())) => {},
-                                                       (ChannelMonitorUpdateStatus::InProgress, Err(_)) => {
-                                                               // Note that MonitorUpdateInProgress here indicates (per function
-                                                               // docs) that we will resend the commitment update once monitor
-                                                               // updating completes. Therefore, we must return an error
-                                                               // indicating that it is unsafe to retry the payment wholesale,
-                                                               // which we do in the send_payment check for
-                                                               // MonitorUpdateInProgress, below.
-                                                               return Err(APIError::MonitorUpdateInProgress);
-                                                       },
-                                                       _ => unreachable!(),
+                               if !chan.get().is_live() {
+                                       return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()});
+                               }
+                               let funding_txo = chan.get().get_funding_txo().unwrap();
+                               let send_res = chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(),
+                                       htlc_cltv, HTLCSource::OutboundRoute {
+                                               path: path.clone(),
+                                               session_priv: session_priv.clone(),
+                                               first_hop_htlc_msat: htlc_msat,
+                                               payment_id,
+                                               payment_secret: payment_secret.clone(),
+                                               payment_params: payment_params.clone(),
+                                       }, onion_packet, &self.logger);
+                               match break_chan_entry!(self, send_res, chan) {
+                                       Some(monitor_update) => {
+                                               let update_id = monitor_update.update_id;
+                                               let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update);
+                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) {
+                                                       break Err(e);
+                                               }
+                                               if update_res == ChannelMonitorUpdateStatus::InProgress {
+                                                       // Note that MonitorUpdateInProgress here indicates (per function
+                                                       // docs) that we will resend the commitment update once monitor
+                                                       // updating completes. Therefore, we must return an error
+                                                       // indicating that it is unsafe to retry the payment wholesale,
+                                                       // which we do in the send_payment check for
+                                                       // MonitorUpdateInProgress, below.
+                                                       return Err(APIError::MonitorUpdateInProgress);
                                                }
-
-                                               log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan_id));
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id: path.first().unwrap().pubkey,
-                                                       updates: msgs::CommitmentUpdate {
-                                                               update_add_htlcs: vec![update_add],
-                                                               update_fulfill_htlcs: Vec::new(),
-                                                               update_fail_htlcs: Vec::new(),
-                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                               update_fee: None,
-                                                               commitment_signed,
-                                                       },
-                                               });
                                        },
                                        None => { },
                                }
@@ -2484,8 +2511,8 @@ where
        /// If a pending payment is currently in-flight with the same [`PaymentId`] provided, this
        /// method will error with an [`APIError::InvalidRoute`]. Note, however, that once a payment
        /// is no longer pending (either via [`ChannelManager::abandon_payment`], or handling of an
-       /// [`Event::PaymentSent`]) LDK will not stop you from sending a second payment with the same
-       /// [`PaymentId`].
+       /// [`Event::PaymentSent`] or [`Event::PaymentFailed`]) LDK will not stop you from sending a
+       /// second payment with the same [`PaymentId`].
        ///
        /// Thus, in order to ensure duplicate payments are not sent, you should implement your own
        /// tracking of payments, including state to indicate once a payment has completed. Because you
@@ -2530,10 +2557,12 @@ where
        /// [`Route`], we assume the invoice had the basic_mpp feature set.
        ///
        /// [`Event::PaymentSent`]: events::Event::PaymentSent
+       /// [`Event::PaymentFailed`]: events::Event::PaymentFailed
        /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
        /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
        pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId) -> Result<(), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments
                        .send_payment_with_route(route, payment_hash, payment_secret, payment_id, &self.entropy_source, &self.node_signer, best_block_height,
                                |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
@@ -2542,12 +2571,14 @@ where
 
        /// Similar to [`ChannelManager::send_payment`], but will automatically find a route based on
        /// `route_params` and retry failed payment paths based on `retry_strategy`.
-       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), PaymentSendFailure> {
+       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments
                        .send_payment(payment_hash, payment_secret, payment_id, retry_strategy, route_params,
-                               &self.router, self.list_usable_channels(), self.compute_inflight_htlcs(),
+                               &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(),
                                &self.entropy_source, &self.node_signer, best_block_height, &self.logger,
+                               &self.pending_events,
                                |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                                self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
@@ -2555,6 +2586,7 @@ where
        #[cfg(test)]
        fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, keysend_preimage: Option<PaymentPreimage>, payment_id: PaymentId, recv_value_msat: Option<u64>, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, payment_secret, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height,
                        |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                        self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
@@ -2567,48 +2599,25 @@ where
        }
 
 
-       /// Retries a payment along the given [`Route`].
-       ///
-       /// Errors returned are a superset of those returned from [`send_payment`], so see
-       /// [`send_payment`] documentation for more details on errors. This method will also error if the
-       /// retry amount puts the payment more than 10% over the payment's total amount, if the payment
-       /// for the given `payment_id` cannot be found (likely due to timeout or success), or if
-       /// further retries have been disabled with [`abandon_payment`].
-       ///
-       /// [`send_payment`]: [`ChannelManager::send_payment`]
-       /// [`abandon_payment`]: [`ChannelManager::abandon_payment`]
-       pub fn retry_payment(&self, route: &Route, payment_id: PaymentId) -> Result<(), PaymentSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
-               self.pending_outbound_payments.retry_payment_with_route(route, payment_id, &self.entropy_source, &self.node_signer, best_block_height,
-                       |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
-       }
-
-       /// Signals that no further retries for the given payment will occur.
+       /// Signals that no further retries for the given payment should occur. Useful if you have a
+       /// pending outbound payment with retries remaining, but wish to stop retrying the payment before
+       /// retries are exhausted.
        ///
-       /// After this method returns, no future calls to [`retry_payment`] for the given `payment_id`
-       /// are allowed. If no [`Event::PaymentFailed`] event had been generated before, one will be
-       /// generated as soon as there are no remaining pending HTLCs for this payment.
+       /// If no [`Event::PaymentFailed`] event had been generated before, one will be generated as soon
+       /// as there are no remaining pending HTLCs for this payment.
        ///
        /// Note that calling this method does *not* prevent a payment from succeeding. You must still
        /// wait until you receive either a [`Event::PaymentFailed`] or [`Event::PaymentSent`] event to
        /// determine the ultimate status of a payment.
        ///
        /// If an [`Event::PaymentFailed`] event is generated and we restart without this
-       /// [`ChannelManager`] having been persisted, the payment may still be in the pending state
-       /// upon restart. This allows further calls to [`retry_payment`] (and requiring a second call
-       /// to [`abandon_payment`] to mark the payment as failed again). Otherwise, future calls to
-       /// [`retry_payment`] will fail with [`PaymentSendFailure::ParameterError`].
+       /// [`ChannelManager`] having been persisted, another [`Event::PaymentFailed`] may be generated.
        ///
-       /// [`abandon_payment`]: Self::abandon_payment
-       /// [`retry_payment`]: Self::retry_payment
        /// [`Event::PaymentFailed`]: events::Event::PaymentFailed
        /// [`Event::PaymentSent`]: events::Event::PaymentSent
        pub fn abandon_payment(&self, payment_id: PaymentId) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-               if let Some(payment_failed_ev) = self.pending_outbound_payments.abandon_payment(payment_id) {
-                       self.pending_events.lock().unwrap().push(payment_failed_ev);
-               }
+               self.pending_outbound_payments.abandon_payment(payment_id, &self.pending_events);
        }
 
        /// Send a spontaneous payment, which is a payment that does not require the recipient to have
@@ -2628,6 +2637,7 @@ where
        /// [`send_payment`]: Self::send_payment
        pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId) -> Result<PaymentHash, PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.send_spontaneous_payment_with_route(
                        route, payment_preimage, payment_id, &self.entropy_source, &self.node_signer,
                        best_block_height,
@@ -2642,12 +2652,13 @@ where
        /// payments.
        ///
        /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend
-       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, PaymentSendFailure> {
+       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, payment_id,
                        retry_strategy, route_params, &self.router, self.list_usable_channels(),
-                       self.compute_inflight_htlcs(),  &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.logger,
+                       || self.compute_inflight_htlcs(),  &self.entropy_source, &self.node_signer, best_block_height,
+                       &self.logger, &self.pending_events,
                        |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                        self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
@@ -2657,6 +2668,7 @@ where
        /// us to easily discern them from real payments.
        pub fn send_probe(&self, hops: Vec<RouteHop>) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.send_probe(hops, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height,
                        |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                        self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
@@ -2675,12 +2687,10 @@ where
                &self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
        ) -> Result<(), APIError> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })
-               }
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
 
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                let (chan, msg) = {
                        let (res, chan) = {
@@ -2702,7 +2712,7 @@ where
                                        (chan, funding_msg)
                                },
                                Err(_) => { return Err(APIError::ChannelUnavailable {
-                                       err: "Error deriving keys or signing initial commitment transactions - either our RNG or our counterparty's RNG is broken or the Signer refused to sign".to_owned()
+                                       err: "Signer refused to sign the initial commitment transaction".to_owned()
                                }) },
                        }
                };
@@ -2846,11 +2856,9 @@ where
                        &self.total_consistency_lock, &self.persistence_notifier,
                );
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) });
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                for channel_id in channel_ids {
                        if !peer_state.channel_by_id.contains_key(channel_id) {
@@ -2903,24 +2911,22 @@ where
 
                let next_hop_scid = {
                        let peer_state_lock = self.per_peer_state.read().unwrap();
-                       if let Some(peer_state_mutex) = peer_state_lock.get(&next_node_id) {
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               match peer_state.channel_by_id.get(next_hop_channel_id) {
-                                       Some(chan) => {
-                                               if !chan.is_usable() {
-                                                       return Err(APIError::ChannelUnavailable {
-                                                               err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id))
-                                                       })
-                                               }
-                                               chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias())
-                                       },
-                                       None => return Err(APIError::ChannelUnavailable {
-                                               err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id)
-                                       })
-                               }
-                       } else {
-                               return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) });
+                       let peer_state_mutex = peer_state_lock.get(&next_node_id)
+                               .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", next_node_id) })?;
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                       let peer_state = &mut *peer_state_lock;
+                       match peer_state.channel_by_id.get(next_hop_channel_id) {
+                               Some(chan) => {
+                                       if !chan.is_usable() {
+                                               return Err(APIError::ChannelUnavailable {
+                                                       err: format!("Channel with id {} not fully established", log_bytes!(*next_hop_channel_id))
+                                               })
+                                       }
+                                       chan.get_short_channel_id().unwrap_or(chan.outbound_scid_alias())
+                               },
+                               None => return Err(APIError::ChannelUnavailable {
+                                       err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*next_hop_channel_id), next_node_id)
+                               })
                        }
                };
 
@@ -3100,7 +3106,7 @@ where
                                        };
                                        let per_peer_state = self.per_peer_state.read().unwrap();
                                        let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                                       if let None = peer_state_mutex_opt {
+                                       if peer_state_mutex_opt.is_none() {
                                                forwarding_channel_not_found!();
                                                continue;
                                        }
@@ -3387,7 +3393,8 @@ where
 
                let best_block_height = self.best_block.read().unwrap().height();
                self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(),
-                       || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.logger,
+                       || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height,
+                       &self.pending_events, &self.logger,
                        |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                        self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv));
 
@@ -3492,6 +3499,7 @@ where
        ///    the channel.
        ///  * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs
        ///    with the current `ChannelConfig`.
+       ///  * Removing peers which have disconnected but and no longer have any channels.
        ///
        /// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate
        /// estimate fetches.
@@ -3504,19 +3512,21 @@ where
 
                        let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
                        let mut timed_out_mpp_htlcs = Vec::new();
+                       let mut pending_peers_awaiting_removal = Vec::new();
                        {
                                let per_peer_state = self.per_peer_state.read().unwrap();
                                for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
                                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                        let peer_state = &mut *peer_state_lock;
                                        let pending_msg_events = &mut peer_state.pending_msg_events;
+                                       let counterparty_node_id = *counterparty_node_id;
                                        peer_state.channel_by_id.retain(|chan_id, chan| {
                                                let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
                                                if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
 
                                                if let Err(e) = chan.timer_check_closing_negotiation_progress() {
                                                        let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id);
-                                                       handle_errors.push((Err(err), *counterparty_node_id));
+                                                       handle_errors.push((Err(err), counterparty_node_id));
                                                        if needs_close { return false; }
                                                }
 
@@ -3550,6 +3560,36 @@ where
 
                                                true
                                        });
+                                       if peer_state.ok_to_remove(true) {
+                                               pending_peers_awaiting_removal.push(counterparty_node_id);
+                                       }
+                               }
+                       }
+
+                       // When a peer disconnects but still has channels, the peer's `peer_state` entry in the
+                       // `per_peer_state` is not removed by the `peer_disconnected` function. If the channels
+                       // of to that peer is later closed while still being disconnected (i.e. force closed),
+                       // we therefore need to remove the peer from `peer_state` separately.
+                       // To avoid having to take the `per_peer_state` `write` lock once the channels are
+                       // closed, we instead remove such peers awaiting removal here on a timer, to limit the
+                       // negative effects on parallelism as much as possible.
+                       if pending_peers_awaiting_removal.len() > 0 {
+                               let mut per_peer_state = self.per_peer_state.write().unwrap();
+                               for counterparty_node_id in pending_peers_awaiting_removal {
+                                       match per_peer_state.entry(counterparty_node_id) {
+                                               hash_map::Entry::Occupied(entry) => {
+                                                       // Remove the entry if the peer is still disconnected and we still
+                                                       // have no channels to the peer.
+                                                       let remove_entry = {
+                                                               let peer_state = entry.get().lock().unwrap();
+                                                               peer_state.ok_to_remove(true)
+                                                       };
+                                                       if remove_entry {
+                                                               entry.remove_entry();
+                                                       }
+                                               },
+                                               hash_map::Entry::Vacant(_) => { /* The PeerState has already been removed */ }
+                                       }
                                }
                        }
 
@@ -3725,17 +3765,12 @@ where
        /// Fails an HTLC backwards to the sender of it to us.
        /// Note that we do not assume that channels corresponding to failed HTLCs are still available.
        fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
-               #[cfg(any(feature = "_test_utils", test))]
-               {
-                       // Ensure that no peer state channel storage lock is not held when calling this
-                       // function.
-                       // This ensures that future code doesn't introduce a lock_order requirement for
-                       // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
-                       // this function with any `per_peer_state` peer lock aquired would.
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       for (_, peer) in per_peer_state.iter() {
-                               debug_assert!(peer.try_lock().is_ok());
-                       }
+               // Ensure that no peer state channel storage lock is held when calling this function.
+               // This ensures that future code doesn't introduce a lock-order requirement for
+               // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
+               // this function with any `per_peer_state` peer lock acquired would.
+               for (_, peer) in self.per_peer_state.read().unwrap().iter() {
+                       debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
                }
 
                //TODO: There is a timing attack here where if a node fails an HTLC back to us they can
@@ -3748,16 +3783,19 @@ where
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
                match source {
                        HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, ref payment_params, .. } => {
-                               self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx, &self.pending_events, &self.logger);
+                               if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
+                                       session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx,
+                                       &self.pending_events, &self.logger)
+                               { self.push_pending_forwards_ev(); }
                        },
                        HTLCSource::PreviousHopData(HTLCPreviousHopData { ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret, ref phantom_shared_secret, ref outpoint }) => {
                                log_trace!(self.logger, "Failing HTLC with payment_hash {} backwards from us with {:?}", log_bytes!(payment_hash.0), onion_error);
                                let err_packet = onion_error.get_encrypted_failure_packet(incoming_packet_shared_secret, phantom_shared_secret);
 
-                               let mut forward_event = None;
+                               let mut push_forward_ev = false;
                                let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
                                if forward_htlcs.is_empty() {
-                                       forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS));
+                                       push_forward_ev = true;
                                }
                                match forward_htlcs.entry(*short_channel_id) {
                                        hash_map::Entry::Occupied(mut entry) => {
@@ -3768,12 +3806,8 @@ where
                                        }
                                }
                                mem::drop(forward_htlcs);
+                               if push_forward_ev { self.push_pending_forwards_ev(); }
                                let mut pending_events = self.pending_events.lock().unwrap();
-                               if let Some(time) = forward_event {
-                                       pending_events.push(events::Event::PendingHTLCsForwardable {
-                                               time_forwardable: time
-                                       });
-                               }
                                pending_events.push(events::Event::HTLCHandlingFailed {
                                        prev_channel_id: outpoint.to_channel_id(),
                                        failed_next_destination: destination,
@@ -3850,7 +3884,7 @@ where
                let mut expected_amt_msat = None;
                let mut valid_mpp = true;
                let mut errs = Vec::new();
-               let mut per_peer_state = Some(self.per_peer_state.read().unwrap());
+               let per_peer_state = self.per_peer_state.read().unwrap();
                for htlc in sources.iter() {
                        let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) {
                                Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
@@ -3860,16 +3894,16 @@ where
                                }
                        };
 
-                       if let None = per_peer_state.as_ref().unwrap().get(&counterparty_node_id) {
+                       let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+                       if peer_state_mutex_opt.is_none() {
                                valid_mpp = false;
                                break;
                        }
 
-                       let peer_state_mutex = per_peer_state.as_ref().unwrap().get(&counterparty_node_id).unwrap();
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
 
-                       if let None = peer_state.channel_by_id.get(&chan_id) {
+                       if peer_state.channel_by_id.get(&chan_id).is_none() {
                                valid_mpp = false;
                                break;
                        }
@@ -3895,14 +3929,13 @@ where
 
                        claimable_amt_msat += htlc.value;
                }
+               mem::drop(per_peer_state);
                if sources.is_empty() || expected_amt_msat.is_none() {
-                       mem::drop(per_peer_state);
                        self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                        log_info!(self.logger, "Attempted to claim an incomplete payment which no longer had any available HTLCs!");
                        return;
                }
                if claimable_amt_msat != expected_amt_msat.unwrap() {
-                       mem::drop(per_peer_state);
                        self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                        log_info!(self.logger, "Attempted to claim an incomplete payment, expected {} msat, had {} available to claim.",
                                expected_amt_msat.unwrap(), claimable_amt_msat);
@@ -3910,8 +3943,7 @@ where
                }
                if valid_mpp {
                        for htlc in sources.drain(..) {
-                               if per_peer_state.is_none() { per_peer_state = Some(self.per_peer_state.read().unwrap()); }
-                               if let Err((pk, err)) = self.claim_funds_from_hop(per_peer_state.take().unwrap(),
+                               if let Err((pk, err)) = self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
                                        |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
                                {
@@ -3923,7 +3955,6 @@ where
                                }
                        }
                }
-               mem::drop(per_peer_state);
                if !valid_mpp {
                        for htlc in sources.drain(..) {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
@@ -3944,124 +3975,75 @@ where
        }
 
        fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
-               per_peer_state_lock: RwLockReadGuard<HashMap<PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>>>,
                prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
+               let per_peer_state = self.per_peer_state.read().unwrap();
                let chan_id = prev_hop.outpoint.to_channel_id();
-
                let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
                        Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
                        None => None
                };
 
-               let (found_channel, mut peer_state_opt) = if counterparty_node_id_opt.is_some() && per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).is_some() {
-                       let peer_mutex = per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).unwrap();
-                       let peer_state = peer_mutex.lock().unwrap();
-                       let found_channel = peer_state.channel_by_id.contains_key(&chan_id);
-                       (found_channel, Some(peer_state))
-               }  else { (false, None) };
+               let mut peer_state_opt = counterparty_node_id_opt.as_ref().map(
+                       |counterparty_node_id| per_peer_state.get(counterparty_node_id).map(
+                               |peer_mutex| peer_mutex.lock().unwrap()
+                       )
+               ).unwrap_or(None);
 
-               if found_channel {
-                       let peer_state = &mut *peer_state_opt.as_mut().unwrap();
+               if peer_state_opt.is_some() {
+                       let mut peer_state_lock = peer_state_opt.unwrap();
+                       let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
                                let counterparty_node_id = chan.get().get_counterparty_node_id();
-                               match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
-                                       Ok(msgs_monitor_option) => {
-                                               if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
-                                                       match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
-                                                               ChannelMonitorUpdateStatus::Completed => {},
-                                                               e => {
-                                                                       log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
-                                                                               "Failed to update channel monitor with preimage {:?}: {:?}",
-                                                                               payment_preimage, e);
-                                                                       let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
-                                                                       mem::drop(peer_state_opt);
-                                                                       mem::drop(per_peer_state_lock);
-                                                                       self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-                                                                       return Err((counterparty_node_id, err));
-                                                               }
-                                                       }
-                                                       if let Some((msg, commitment_signed)) = msgs {
-                                                               log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
-                                                                       log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
-                                                               peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                       node_id: counterparty_node_id,
-                                                                       updates: msgs::CommitmentUpdate {
-                                                                               update_add_htlcs: Vec::new(),
-                                                                               update_fulfill_htlcs: vec![msg],
-                                                                               update_fail_htlcs: Vec::new(),
-                                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                                               update_fee: None,
-                                                                               commitment_signed,
-                                                                       }
-                                                               });
-                                                       }
-                                                       mem::drop(peer_state_opt);
-                                                       mem::drop(per_peer_state_lock);
-                                                       self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-                                                       Ok(())
-                                               } else {
-                                                       Ok(())
-                                               }
-                                       },
-                                       Err((e, monitor_update)) => {
-                                               match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
-                                                       ChannelMonitorUpdateStatus::Completed => {},
-                                                       e => {
-                                                               // TODO: This needs to be handled somehow - if we receive a monitor update
-                                                               // with a preimage we *must* somehow manage to propagate it to the upstream
-                                                               // channel, or we must have an ability to receive the same update and try
-                                                               // again on restart.
-                                                               log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
-                                                                       "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
-                                                                       payment_preimage, e);
-                                                       },
-                                               }
-                                               let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
-                                               if drop {
-                                                       chan.remove_entry();
-                                               }
-                                               mem::drop(peer_state_opt);
-                                               mem::drop(per_peer_state_lock);
-                                               self.handle_monitor_update_completion_actions(completion_action(None));
-                                               Err((counterparty_node_id, res))
-                                       },
+                               let fulfill_res = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
+
+                               if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
+                                       if let Some(action) = completion_action(Some(htlc_value_msat)) {
+                                               log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+                                                       log_bytes!(chan_id), action);
+                                               peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+                                       }
+                                       let update_id = monitor_update.update_id;
+                                       let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
+                                       let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+                                               peer_state, per_peer_state, chan);
+                                       if let Err(e) = res {
+                                               // TODO: This is a *critical* error - we probably updated the outbound edge
+                                               // of the HTLC's monitor with a preimage. We should retry this monitor
+                                               // update over and over again until morale improves.
+                                               log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
+                                               return Err((counterparty_node_id, e));
+                                       }
                                }
-                       } else {
-                               // We've held the peer_state mutex since finding the channel and setting
-                               // found_channel to true, so the channel can't have been dropped.
-                               unreachable!()
-                       }
-               } else {
-                       let preimage_update = ChannelMonitorUpdate {
-                               update_id: CLOSED_CHANNEL_UPDATE_ID,
-                               updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
-                                       payment_preimage,
-                               }],
-                       };
-                       // We update the ChannelMonitor on the backward link, after
-                       // receiving an `update_fulfill_htlc` from the forward link.
-                       let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
-                       if update_res != ChannelMonitorUpdateStatus::Completed {
-                               // TODO: This needs to be handled somehow - if we receive a monitor update
-                               // with a preimage we *must* somehow manage to propagate it to the upstream
-                               // channel, or we must have an ability to receive the same event and try
-                               // again on restart.
-                               log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
-                                       payment_preimage, update_res);
+                               return Ok(());
                        }
-                       mem::drop(peer_state_opt);
-                       mem::drop(per_peer_state_lock);
-                       // Note that we do process the completion action here. This totally could be a
-                       // duplicate claim, but we have no way of knowing without interrogating the
-                       // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
-                       // generally always allowed to be duplicative (and it's specifically noted in
-                       // `PaymentForwarded`).
-                       self.handle_monitor_update_completion_actions(completion_action(None));
-                       Ok(())
                }
+               let preimage_update = ChannelMonitorUpdate {
+                       update_id: CLOSED_CHANNEL_UPDATE_ID,
+                       updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
+                               payment_preimage,
+                       }],
+               };
+               // We update the ChannelMonitor on the backward link, after
+               // receiving an `update_fulfill_htlc` from the forward link.
+               let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+               if update_res != ChannelMonitorUpdateStatus::Completed {
+                       // TODO: This needs to be handled somehow - if we receive a monitor update
+                       // with a preimage we *must* somehow manage to propagate it to the upstream
+                       // channel, or we must have an ability to receive the same event and try
+                       // again on restart.
+                       log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+                               payment_preimage, update_res);
+               }
+               // Note that we do process the completion action here. This totally could be a
+               // duplicate claim, but we have no way of knowing without interrogating the
+               // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
+               // generally always allowed to be duplicative (and it's specifically noted in
+               // `PaymentForwarded`).
+               self.handle_monitor_update_completion_actions(completion_action(None));
+               Ok(())
        }
 
        fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -4075,7 +4057,7 @@ where
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
-                               let res = self.claim_funds_from_hop(self.per_peer_state.read().unwrap(), hop_data, payment_preimage,
+                               let res = self.claim_funds_from_hop(hop_data, payment_preimage,
                                        |htlc_claim_value_msat| {
                                                if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                                                        let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -4132,6 +4114,14 @@ where
                pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
        -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> {
+               log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
+                       log_bytes!(channel.channel_id()),
+                       if raa.is_some() { "an" } else { "no" },
+                       if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
+                       if funding_broadcastable.is_some() { "" } else { "not " },
+                       if channel_ready.is_some() { "sending" } else { "without" },
+                       if announcement_sigs.is_some() { "sending" } else { "without" });
+
                let mut htlc_forwards = None;
 
                let counterparty_node_id = channel.get_counterparty_node_id();
@@ -4188,67 +4178,38 @@ where
        }
 
        fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+               debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
 
-               let htlc_forwards;
-               let (mut pending_failures, finalized_claims, counterparty_node_id) = {
-                       let counterparty_node_id = match counterparty_node_id {
-                               Some(cp_id) => cp_id.clone(),
-                               None => {
-                                       // TODO: Once we can rely on the counterparty_node_id from the
-                                       // monitor event, this and the id_to_peer map should be removed.
-                                       let id_to_peer = self.id_to_peer.lock().unwrap();
-                                       match id_to_peer.get(&funding_txo.to_channel_id()) {
-                                               Some(cp_id) => cp_id.clone(),
-                                               None => return,
-                                       }
-                               }
-                       };
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let mut peer_state_lock;
-                       let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                       if let None = peer_state_mutex_opt { return }
-                       peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       let mut channel = {
-                               match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
-                                       hash_map::Entry::Occupied(chan) => chan,
-                                       hash_map::Entry::Vacant(_) => return,
+               let counterparty_node_id = match counterparty_node_id {
+                       Some(cp_id) => cp_id.clone(),
+                       None => {
+                               // TODO: Once we can rely on the counterparty_node_id from the
+                               // monitor event, this and the id_to_peer map should be removed.
+                               let id_to_peer = self.id_to_peer.lock().unwrap();
+                               match id_to_peer.get(&funding_txo.to_channel_id()) {
+                                       Some(cp_id) => cp_id.clone(),
+                                       None => return,
                                }
-                       };
-                       if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
-                               return;
                        }
-
-                       let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height());
-                       let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() {
-                               // We only send a channel_update in the case where we are just now sending a
-                               // channel_ready and the channel is in a usable state. We may re-send a
-                               // channel_update later through the announcement_signatures process for public
-                               // channels, but there's no reason not to just inform our counterparty of our fees
-                               // now.
-                               if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) {
-                                       Some(events::MessageSendEvent::SendChannelUpdate {
-                                               node_id: channel.get().get_counterparty_node_id(),
-                                               msg,
-                                       })
-                               } else { None }
-                       } else { None };
-                       htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
-                       if let Some(upd) = channel_update {
-                               peer_state.pending_msg_events.push(upd);
+               };
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               let mut peer_state_lock;
+               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+               if peer_state_mutex_opt.is_none() { return }
+               peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               let mut channel = {
+                       match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
+                               hash_map::Entry::Occupied(chan) => chan,
+                               hash_map::Entry::Vacant(_) => return,
                        }
-
-                       (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id)
                };
-               if let Some(forwards) = htlc_forwards {
-                       self.forward_htlcs(&mut [forwards][..]);
-               }
-               self.finalize_claims(finalized_claims);
-               for failure in pending_failures.drain(..) {
-                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() };
-                       self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+               log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
+                       highest_applied_update_id, channel.get().get_latest_monitor_update_id());
+               if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
+                       return;
                }
+               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut());
        }
 
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
@@ -4296,13 +4257,13 @@ where
        fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
+               let peers_without_funded_channels = self.peers_without_funded_channels(|peer| !peer.channel_by_id.is_empty());
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(APIError::APIMisuseError { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) });
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
+               let is_only_peer_channel = peer_state.channel_by_id.len() == 1;
                match peer_state.channel_by_id.entry(temporary_channel_id.clone()) {
                        hash_map::Entry::Occupied(mut channel) => {
                                if !channel.get().inbound_is_awaiting_accept() {
@@ -4320,6 +4281,21 @@ where
                                        peer_state.pending_msg_events.push(send_msg_err_event);
                                        let _ = remove_channel!(self, channel);
                                        return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() });
+                               } else {
+                                       // If this peer already has some channels, a new channel won't increase our number of peers
+                                       // with unfunded channels, so as long as we aren't over the maximum number of unfunded
+                                       // channels per-peer we can accept channels from a peer with existing ones.
+                                       if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS {
+                                               let send_msg_err_event = events::MessageSendEvent::HandleError {
+                                                       node_id: channel.get().get_counterparty_node_id(),
+                                                       action: msgs::ErrorAction::SendErrorMessage{
+                                                               msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), }
+                                                       }
+                                               };
+                                               peer_state.pending_msg_events.push(send_msg_err_event);
+                                               let _ = remove_channel!(self, channel);
+                                               return Err(APIError::APIMisuseError { err: "Too many peers with unfunded channels, refusing to accept new ones".to_owned() });
+                                       }
                                }
 
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
@@ -4334,6 +4310,43 @@ where
                Ok(())
        }
 
+       /// Gets the number of peers which match the given filter and do not have any funded, outbound,
+       /// or 0-conf channels.
+       ///
+       /// The filter is called for each peer and provided with the number of unfunded, inbound, and
+       /// non-0-conf channels we have with the peer.
+       fn peers_without_funded_channels<Filter>(&self, maybe_count_peer: Filter) -> usize
+       where Filter: Fn(&PeerState<<SP::Target as SignerProvider>::Signer>) -> bool {
+               let mut peers_without_funded_channels = 0;
+               let best_block_height = self.best_block.read().unwrap().height();
+               {
+                       let peer_state_lock = self.per_peer_state.read().unwrap();
+                       for (_, peer_mtx) in peer_state_lock.iter() {
+                               let peer = peer_mtx.lock().unwrap();
+                               if !maybe_count_peer(&*peer) { continue; }
+                               let num_unfunded_channels = Self::unfunded_channel_count(&peer, best_block_height);
+                               if num_unfunded_channels == peer.channel_by_id.len() {
+                                       peers_without_funded_channels += 1;
+                               }
+                       }
+               }
+               return peers_without_funded_channels;
+       }
+
+       fn unfunded_channel_count(
+               peer: &PeerState<<SP::Target as SignerProvider>::Signer>, best_block_height: u32
+       ) -> usize {
+               let mut num_unfunded_channels = 0;
+               for (_, chan) in peer.channel_by_id.iter() {
+                       if !chan.is_outbound() && chan.minimum_depth().unwrap_or(1) != 0 &&
+                               chan.get_funding_tx_confirmations(best_block_height) == 0
+                       {
+                               num_unfunded_channels += 1;
+                       }
+               }
+               num_unfunded_channels
+       }
+
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
                if msg.chain_hash != self.genesis_hash {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
@@ -4346,18 +4359,44 @@ where
                let mut random_bytes = [0u8; 16];
                random_bytes.copy_from_slice(&self.entropy_source.get_secure_random_bytes()[..16]);
                let user_channel_id = u128::from_be_bytes(random_bytes);
-
                let outbound_scid_alias = self.create_and_insert_outbound_scid_alias();
+
+               // Get the number of peers with channels, but without funded ones. We don't care too much
+               // about peers that never open a channel, so we filter by peers that have at least one
+               // channel, and then limit the number of those with unfunded channels.
+               let channeled_peers_without_funding = self.peers_without_funded_channels(|node| !node.channel_by_id.is_empty());
+
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone()))
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                   .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone())
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
+
+               // If this peer already has some channels, a new channel won't increase our number of peers
+               // with unfunded channels, so as long as we aren't over the maximum number of unfunded
+               // channels per-peer we can accept channels from a peer with existing ones.
+               if peer_state.channel_by_id.is_empty() &&
+                       channeled_peers_without_funding >= MAX_UNFUNDED_CHANNEL_PEERS &&
+                       !self.default_configuration.manually_accept_inbound_channels
+               {
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                               "Have too many peers with unfunded channels, not accepting new ones".to_owned(),
+                               msg.temporary_channel_id.clone()));
+               }
+
+               let best_block_height = self.best_block.read().unwrap().height();
+               if Self::unfunded_channel_count(peer_state, best_block_height) >= MAX_UNFUNDED_CHANS_PER_PEER {
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                               format!("Refusing more than {} unfunded channels.", MAX_UNFUNDED_CHANS_PER_PEER),
+                               msg.temporary_channel_id.clone()));
+               }
+
                let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider,
-                       counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration,
-                       self.best_block.read().unwrap().height(), &self.logger, outbound_scid_alias)
+                       counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id,
+                       &self.default_configuration, best_block_height, &self.logger, outbound_scid_alias)
                {
                        Err(e) => {
                                self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias);
@@ -4401,11 +4440,12 @@ where
        fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> {
                let (value, output_script, user_id) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id))
-                       }
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                               .ok_or_else(|| {
+                                       debug_assert!(false);
+                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)
+                               })?;
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.temporary_channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
@@ -4427,59 +4467,31 @@ where
        }
 
        fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
+               let best_block = *self.best_block.read().unwrap();
+
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id))
-               }
-               let ((funding_msg, monitor, mut channel_ready), mut chan) = {
-                       let best_block = *self.best_block.read().unwrap();
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)
+                       })?;
+
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               let ((funding_msg, monitor), chan) =
                        match peer_state.channel_by_id.entry(msg.temporary_channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
-                       }
-               };
-               // Because we have exclusive ownership of the channel here we can release the peer_state
-               // lock before watch_channel
-               match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) {
-                       ChannelMonitorUpdateStatus::Completed => {},
-                       ChannelMonitorUpdateStatus::PermanentFailure => {
-                               // Note that we reply with the new channel_id in error messages if we gave up on the
-                               // channel, not the temporary_channel_id. This is compatible with ourselves, but the
-                               // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
-                               // any messages referencing a previously-closed channel anyway.
-                               // We do not propagate the monitor update to the user as it would be for a monitor
-                               // that we didn't manage to store (and that we don't care about - we don't respond
-                               // with the funding_signed so the channel can never go on chain).
-                               let (_monitor_update, failed_htlcs) = chan.force_shutdown(false);
-                               assert!(failed_htlcs.is_empty());
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id));
-                       },
-                       ChannelMonitorUpdateStatus::InProgress => {
-                               // There's no problem signing a counterparty's funding transaction if our monitor
-                               // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
-                               // accepted payment from yet. We do, however, need to wait to send our channel_ready
-                               // until we have persisted our monitor.
-                               chan.monitor_updating_paused(false, false, channel_ready.is_some(), Vec::new(), Vec::new(), Vec::new());
-                               channel_ready = None; // Don't send the channel_ready now
-                       },
-               }
-               // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the
-               // peer exists, despite the inner PeerState potentially having no channels after removing
-               // the channel above.
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-               let peer_state = &mut *peer_state_lock;
+                       };
+
                match peer_state.channel_by_id.entry(funding_msg.channel_id) {
                        hash_map::Entry::Occupied(_) => {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
+                               Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
                        },
                        hash_map::Entry::Vacant(e) => {
-                               let mut id_to_peer = self.id_to_peer.lock().unwrap();
-                               match id_to_peer.entry(chan.channel_id()) {
+                               match self.id_to_peer.lock().unwrap().entry(chan.channel_id()) {
                                        hash_map::Entry::Occupied(_) => {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                                        "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
@@ -4489,71 +4501,77 @@ where
                                                i_e.insert(chan.get_counterparty_node_id());
                                        }
                                }
+
+                               // There's no problem signing a counterparty's funding transaction if our monitor
+                               // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
+                               // accepted payment from yet. We do, however, need to wait to send our channel_ready
+                               // until we have persisted our monitor.
+                               let new_channel_id = funding_msg.channel_id;
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
                                        node_id: counterparty_node_id.clone(),
                                        msg: funding_msg,
                                });
-                               if let Some(msg) = channel_ready {
-                                       send_channel_ready!(self, peer_state.pending_msg_events, chan, msg);
+
+                               let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+
+                               let chan = e.insert(chan);
+                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state,
+                                       per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
+
+                               // Note that we reply with the new channel_id in error messages if we gave up on the
+                               // channel, not the temporary_channel_id. This is compatible with ourselves, but the
+                               // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
+                               // any messages referencing a previously-closed channel anyway.
+                               // We do not propagate the monitor update to the user as it would be for a monitor
+                               // that we didn't manage to store (and that we don't care about - we don't respond
+                               // with the funding_signed so the channel can never go on chain).
+                               if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
+                                       res.0 = None;
                                }
-                               e.insert(chan);
+                               res
                        }
                }
-               Ok(())
        }
 
        fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
-               let funding_tx = {
-                       let best_block = *self.best_block.read().unwrap();
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id))
-                       }
+               let best_block = *self.best_block.read().unwrap();
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
 
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       match peer_state.channel_by_id.entry(msg.channel_id) {
-                               hash_map::Entry::Occupied(mut chan) => {
-                                       let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger) {
-                                               Ok(update) => update,
-                                               Err(e) => try_chan_entry!(self, Err(e), chan),
-                                       };
-                                       match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
-                                               ChannelMonitorUpdateStatus::Completed => {},
-                                               e => {
-                                                       let mut res = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED);
-                                                       if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
-                                                               // We weren't able to watch the channel to begin with, so no updates should be made on
-                                                               // it. Previously, full_stack_target found an (unreachable) panic when the
-                                                               // monitor update contained within `shutdown_finish` was applied.
-                                                               if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
-                                                                       shutdown_finish.0.take();
-                                                               }
-                                                       }
-                                                       return res
-                                               },
-                                       }
-                                       if let Some(msg) = channel_ready {
-                                               send_channel_ready!(self, peer_state.pending_msg_events, chan.get(), msg);
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               match peer_state.channel_by_id.entry(msg.channel_id) {
+                       hash_map::Entry::Occupied(mut chan) => {
+                               let monitor = try_chan_entry!(self,
+                                       chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan);
+                               let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor);
+                               let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan);
+                               if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
+                                       // We weren't able to watch the channel to begin with, so no updates should be made on
+                                       // it. Previously, full_stack_target found an (unreachable) panic when the
+                                       // monitor update contained within `shutdown_finish` was applied.
+                                       if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
+                                               shutdown_finish.0.take();
                                        }
-                                       funding_tx
-                               },
-                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
-                       }
-               };
-               log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid());
-               self.tx_broadcaster.broadcast_transaction(&funding_tx);
-               Ok(())
+                               }
+                               res
+                       },
+                       hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+               }
        }
 
        fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id));
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
@@ -4592,11 +4610,12 @@ where
                let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>;
                let result: Result<(), _> = loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id))
-                       }
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                               .ok_or_else(|| {
+                                       debug_assert!(false);
+                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                               })?;
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
@@ -4607,27 +4626,27 @@ where
                                                        if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
                                        }
 
-                                       let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
+                                       let funding_txo_opt = chan_entry.get().get_funding_txo();
+                                       let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
+                                               chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
                                        dropped_htlcs = htlcs;
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update {
-                                               let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-                                               let (result, is_permanent) =
-                                                       handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-                                               if is_permanent {
-                                                       remove_channel!(self, chan_entry);
-                                                       break result;
-                                               }
-                                       }
-
                                        if let Some(msg) = shutdown {
+                                               // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                               // here as we don't need the monitor update to complete until we send a
+                                               // `shutdown_signed`, which we'll delay if we're pending a monitor update.
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                        node_id: *counterparty_node_id,
                                                        msg,
                                                });
                                        }
 
+                                       // Update the monitor with the shutdown script if necessary.
+                                       if let Some(monitor_update) = monitor_update_opt {
+                                               let update_id = monitor_update.update_id;
+                                               let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
+                                       }
                                        break Ok(());
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -4639,18 +4658,18 @@ where
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
 
-               let _ = handle_error!(self, result, *counterparty_node_id);
-               Ok(())
+               result
        }
 
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id))
-               }
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
                let (tx, chan_option) = {
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
@@ -4679,7 +4698,7 @@ where
                }
                if let Some(chan) = chan_option {
                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
@@ -4702,11 +4721,12 @@ where
 
                let pending_forward_info = self.decode_update_add_htlc_onion(msg);
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id))
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
@@ -4743,11 +4763,12 @@ where
        fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> {
                let (htlc_source, forwarded_htlc_value) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id));
-                       }
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                               .ok_or_else(|| {
+                                       debug_assert!(false);
+                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                               })?;
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
@@ -4762,11 +4783,12 @@ where
 
        fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id));
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
@@ -4779,11 +4801,12 @@ where
 
        fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id))
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
@@ -4800,48 +4823,21 @@ where
 
        fn internal_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id))
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
-                               let (revoke_and_ack, commitment_signed, monitor_update) =
-                                       match chan.get_mut().commitment_signed(&msg, &self.logger) {
-                                               Err((None, e)) => try_chan_entry!(self, Err(e), chan),
-                                               Err((Some(update), e)) => {
-                                                       assert!(chan.get().is_awaiting_monitor_update());
-                                                       let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update);
-                                                       try_chan_entry!(self, Err(e), chan);
-                                                       unreachable!();
-                                               },
-                                               Ok(res) => res
-                                       };
-                               let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
-                               if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) {
-                                       return Err(e);
-                               }
-
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-                                       node_id: counterparty_node_id.clone(),
-                                       msg: revoke_and_ack,
-                               });
-                               if let Some(msg) = commitment_signed {
-                                       peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                               node_id: counterparty_node_id.clone(),
-                                               updates: msgs::CommitmentUpdate {
-                                                       update_add_htlcs: Vec::new(),
-                                                       update_fulfill_htlcs: Vec::new(),
-                                                       update_fail_htlcs: Vec::new(),
-                                                       update_fail_malformed_htlcs: Vec::new(),
-                                                       update_fee: None,
-                                                       commitment_signed: msg,
-                                               },
-                                       });
-                               }
-                               Ok(())
+                               let funding_txo = chan.get().get_funding_txo();
+                               let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
+                               let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+                               let update_id = monitor_update.update_id;
+                               handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+                                       peer_state, per_peer_state, chan)
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
@@ -4850,7 +4846,7 @@ where
        #[inline]
        fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
                for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
-                       let mut forward_event = None;
+                       let mut push_forward_event = false;
                        let mut new_intercept_events = Vec::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
@@ -4908,7 +4904,7 @@ where
                                                                // We don't want to generate a PendingHTLCsForwardable event if only intercepted
                                                                // payments are being processed.
                                                                if forward_htlcs_empty {
-                                                                       forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS));
+                                                                       push_forward_event = true;
                                                                }
                                                                entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                        prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info })));
@@ -4926,93 +4922,58 @@ where
                                let mut events = self.pending_events.lock().unwrap();
                                events.append(&mut new_intercept_events);
                        }
+                       if push_forward_event { self.push_pending_forwards_ev() }
+               }
+       }
 
-                       match forward_event {
-                               Some(time) => {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::PendingHTLCsForwardable {
-                                               time_forwardable: time
-                                       });
-                               }
-                               None => {},
-                       }
+       // We only want to push a PendingHTLCsForwardable event if no others are queued.
+       fn push_pending_forwards_ev(&self) {
+               let mut pending_events = self.pending_events.lock().unwrap();
+               let forward_ev_exists = pending_events.iter()
+                       .find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
+                       .is_some();
+               if !forward_ev_exists {
+                       pending_events.push(events::Event::PendingHTLCsForwardable {
+                               time_forwardable:
+                                       Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
+                       });
                }
        }
 
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
-               let mut htlcs_to_fail = Vec::new();
-               let res = loop {
+               let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id))
-                       }
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let mut peer_state_lock = per_peer_state.get(counterparty_node_id)
+                               .ok_or_else(|| {
+                                       debug_assert!(false);
+                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                               }).map(|mtx| mtx.lock().unwrap())?;
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
-                                       let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update();
-                                       let raa_updates = break_chan_entry!(self,
-                                               chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
-                                       htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
-                                       let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update);
-                                       if was_paused_for_mon_update {
-                                               assert!(update_res != ChannelMonitorUpdateStatus::Completed);
-                                               assert!(raa_updates.commitment_update.is_none());
-                                               assert!(raa_updates.accepted_htlcs.is_empty());
-                                               assert!(raa_updates.failed_htlcs.is_empty());
-                                               assert!(raa_updates.finalized_claimed_htlcs.is_empty());
-                                               break Err(MsgHandleErrInternal::ignore_no_close("Existing pending monitor update prevented responses to RAA".to_owned()));
-                                       }
-                                       if update_res != ChannelMonitorUpdateStatus::Completed {
-                                               if let Err(e) = handle_monitor_update_res!(self, update_res, chan,
-                                                               RAACommitmentOrder::CommitmentFirst, false,
-                                                               raa_updates.commitment_update.is_some(), false,
-                                                               raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
-                                                               raa_updates.finalized_claimed_htlcs) {
-                                                       break Err(e);
-                                               } else { unreachable!(); }
-                                       }
-                                       if let Some(updates) = raa_updates.commitment_update {
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id: counterparty_node_id.clone(),
-                                                       updates,
-                                               });
-                                       }
-                                       break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
-                                                       raa_updates.finalized_claimed_htlcs,
-                                                       chan.get().get_short_channel_id()
-                                                               .unwrap_or(chan.get().outbound_scid_alias()),
-                                                       chan.get().get_funding_txo().unwrap(),
-                                                       chan.get().get_user_id()))
+                                       let funding_txo = chan.get().get_funding_txo();
+                                       let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
+                                       let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+                                       let update_id = monitor_update.update_id;
+                                       let res = handle_new_monitor_update!(self, update_res, update_id,
+                                               peer_state_lock, peer_state, per_peer_state, chan);
+                                       (htlcs_to_fail, res)
                                },
-                               hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
+                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
                self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id);
-               match res {
-                       Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
-                               short_channel_id, channel_outpoint, user_channel_id)) =>
-                       {
-                               for failure in pending_failures.drain(..) {
-                                       let receiver = HTLCDestination::NextHopChannel { node_id: Some(*counterparty_node_id), channel_id: channel_outpoint.to_channel_id() };
-                                       self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
-                               }
-                               self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, user_channel_id, pending_forwards)]);
-                               self.finalize_claims(finalized_claim_htlcs);
-                               Ok(())
-                       },
-                       Err(e) => Err(e)
-               }
+               res
        }
 
        fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id));
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
@@ -5025,11 +4986,12 @@ where
 
        fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> {
                let per_peer_state = self.per_peer_state.read().unwrap();
-               let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-               if let None = peer_state_mutex_opt {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id));
-               }
-               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       .ok_or_else(|| {
+                               debug_assert!(false);
+                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                       })?;
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
@@ -5044,7 +5006,7 @@ where
                                        ), chan),
                                        // Note that announcement_signatures fails if the channel cannot be announced,
                                        // so get_channel_update_for_broadcast will never fail by the time we get here.
-                                       update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(),
+                                       update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()),
                                });
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -5063,7 +5025,7 @@ where
                };
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id);
-               if let None = peer_state_mutex_opt {
+               if peer_state_mutex_opt.is_none() {
                        return Ok(NotifyOption::SkipPersist)
                }
                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
@@ -5098,11 +5060,12 @@ where
                let need_lnd_workaround = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
-                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                       if let None = peer_state_mutex_opt {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id));
-                       }
-                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                               .ok_or_else(|| {
+                                       debug_assert!(false);
+                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
+                               })?;
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
@@ -5155,6 +5118,8 @@ where
 
        /// Process pending events from the `chain::Watch`, returning whether any events were processed.
        fn process_pending_monitor_events(&self) -> bool {
+               debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
+
                let mut failed_channels = Vec::new();
                let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
                let has_pending_monitor_events = !pending_monitor_events.is_empty();
@@ -5232,7 +5197,13 @@ where
        /// update events as a separate process method here.
        #[cfg(fuzzing)]
        pub fn process_monitor_events(&self) {
-               self.process_pending_monitor_events();
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       if self.process_pending_monitor_events() {
+                               NotifyOption::DoPersist
+                       } else {
+                               NotifyOption::SkipPersist
+                       }
+               });
        }
 
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
@@ -5242,50 +5213,45 @@ where
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();
                let mut handle_errors = Vec::new();
-               {
-                       let per_peer_state = self.per_peer_state.read().unwrap();
 
+               // Walk our list of channels and find any that need to update. Note that when we do find an
+               // update, if it includes actions that must be taken afterwards, we have to drop the
+               // per-peer state lock as well as the top level per_peer_state lock. Thus, we loop until we
+               // manage to go through all our peers without finding a single channel to update.
+               'peer_loop: loop {
+                       let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               let pending_msg_events = &mut peer_state.pending_msg_events;
-                               peer_state.channel_by_id.retain(|channel_id, chan| {
-                                       match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-                                               Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-                                                       if !holding_cell_failed_htlcs.is_empty() {
-                                                               failed_htlcs.push((
-                                                                       holding_cell_failed_htlcs,
-                                                                       *channel_id,
-                                                                       chan.get_counterparty_node_id()
-                                                               ));
-                                                       }
-                                                       if let Some((commitment_update, monitor_update)) = commitment_opt {
-                                                               match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
-                                                                       ChannelMonitorUpdateStatus::Completed => {
-                                                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                                       node_id: chan.get_counterparty_node_id(),
-                                                                                       updates: commitment_update,
-                                                                               });
-                                                                       },
-                                                                       e => {
-                                                                               has_monitor_update = true;
-                                                                               let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
-                                                                               handle_errors.push((chan.get_counterparty_node_id(), res));
-                                                                               if close_channel { return false; }
-                                                                       },
-                                                               }
+                               'chan_loop: loop {
+                                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                                       let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+                                       for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+                                               let counterparty_node_id = chan.get_counterparty_node_id();
+                                               let funding_txo = chan.get_funding_txo();
+                                               let (monitor_opt, holding_cell_failed_htlcs) =
+                                                       chan.maybe_free_holding_cell_htlcs(&self.logger);
+                                               if !holding_cell_failed_htlcs.is_empty() {
+                                                       failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+                                               }
+                                               if let Some(monitor_update) = monitor_opt {
+                                                       has_monitor_update = true;
+
+                                                       let update_res = self.chain_monitor.update_channel(
+                                                               funding_txo.expect("channel is live"), monitor_update);
+                                                       let update_id = monitor_update.update_id;
+                                                       let channel_id: [u8; 32] = *channel_id;
+                                                       let res = handle_new_monitor_update!(self, update_res, update_id,
+                                                               peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING,
+                                                               peer_state.channel_by_id.remove(&channel_id));
+                                                       if res.is_err() {
+                                                               handle_errors.push((counterparty_node_id, res));
                                                        }
-                                                       true
-                                               },
-                                               Err(e) => {
-                                                       let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
-                                                       handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
-                                                       // ChannelClosed event is generated by handle_error for us
-                                                       !close_channel
+                                                       continue 'peer_loop;
                                                }
                                        }
-                               });
+                                       break 'chan_loop;
+                               }
                        }
+                       break 'peer_loop;
                }
 
                let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
@@ -5418,7 +5384,8 @@ where
        /// [`PaymentHash`] and [`PaymentPreimage`] for you.
        ///
        /// The [`PaymentPreimage`] will ultimately be returned to you in the [`PaymentClaimable`], which
-       /// will have the [`PaymentClaimable::payment_preimage`] field filled in. That should then be
+       /// will have the [`PaymentClaimable::purpose`] be [`PaymentPurpose::InvoicePayment`] with
+       /// its [`PaymentPurpose::InvoicePayment::payment_preimage`] field filled in. That should then be
        /// passed directly to [`claim_funds`].
        ///
        /// See [`create_inbound_payment_for_hash`] for detailed documentation on behavior and requirements.
@@ -5438,7 +5405,9 @@ where
        ///
        /// [`claim_funds`]: Self::claim_funds
        /// [`PaymentClaimable`]: events::Event::PaymentClaimable
-       /// [`PaymentClaimable::payment_preimage`]: events::Event::PaymentClaimable::payment_preimage
+       /// [`PaymentClaimable::purpose`]: events::Event::PaymentClaimable::purpose
+       /// [`PaymentPurpose::InvoicePayment`]: events::PaymentPurpose::InvoicePayment
+       /// [`PaymentPurpose::InvoicePayment::payment_preimage`]: events::PaymentPurpose::InvoicePayment::payment_preimage
        /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
        pub fn create_inbound_payment(&self, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32,
                min_final_cltv_expiry_delta: Option<u16>) -> Result<(PaymentHash, PaymentSecret), ()> {
@@ -5717,9 +5686,7 @@ where
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                if peer_state.pending_msg_events.len() > 0 {
-                                       let mut peer_pending_events = Vec::new();
-                                       mem::swap(&mut peer_pending_events, &mut peer_state.pending_msg_events);
-                                       pending_events.append(&mut peer_pending_events);
+                                       pending_events.append(&mut peer_state.pending_msg_events);
                                }
                        }
 
@@ -5970,7 +5937,7 @@ where
                                                                                msg: announcement,
                                                                                // Note that announcement_signatures fails if the channel cannot be announced,
                                                                                // so get_channel_update_for_broadcast will never fail by the time we get here.
-                                                                               update_msg: self.get_channel_update_for_broadcast(channel).unwrap(),
+                                                                               update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()),
                                                                        });
                                                                }
                                                        }
@@ -6247,14 +6214,13 @@ where
                let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
        }
 
-       fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) {
+       fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let mut failed_channels = Vec::new();
-               let mut no_channels_remain = true;
                let mut per_peer_state = self.per_peer_state.write().unwrap();
-               {
-                       log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.",
-                               log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" });
+               let remove_peer = {
+                       log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates.",
+                               log_pubkey!(counterparty_node_id));
                        if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
@@ -6265,8 +6231,6 @@ where
                                                update_maps_on_chan_removal!(self, chan);
                                                self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer);
                                                return false;
-                                       } else {
-                                               no_channels_remain = false;
                                        }
                                        true
                                });
@@ -6286,6 +6250,7 @@ where
                                                &events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
                                                &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
                                                &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
+                                               &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
                                                &events::MessageSendEvent::SendChannelUpdate { .. } => false,
                                                &events::MessageSendEvent::HandleError { .. } => false,
                                                &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
@@ -6294,9 +6259,12 @@ where
                                                &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
                                        }
                                });
-                       }
-               }
-               if no_channels_remain {
+                               debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
+                               peer_state.is_connected = false;
+                               peer_state.ok_to_remove(true)
+                       } else { debug_assert!(false, "Unconnected peer disconnected"); true }
+               };
+               if remove_peer {
                        per_peer_state.remove(counterparty_node_id);
                }
                mem::drop(per_peer_state);
@@ -6306,34 +6274,57 @@ where
                }
        }
 
-       fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) -> Result<(), ()> {
+       fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init, inbound: bool) -> Result<(), ()> {
                if !init_msg.features.supports_static_remote_key() {
-                       log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(counterparty_node_id));
+                       log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting", log_pubkey!(counterparty_node_id));
                        return Err(());
                }
 
-               log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
-
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
+               // If we have too many peers connected which don't have funded channels, disconnect the
+               // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
+               // unfunded channels taking up space in memory for disconnected peers, we still let new
+               // peers connect, but we'll reject new channels from them.
+               let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
+               let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
+
                {
                        let mut peer_state_lock = self.per_peer_state.write().unwrap();
                        match peer_state_lock.entry(counterparty_node_id.clone()) {
                                hash_map::Entry::Vacant(e) => {
+                                       if inbound_peer_limited {
+                                               return Err(());
+                                       }
                                        e.insert(Mutex::new(PeerState {
                                                channel_by_id: HashMap::new(),
                                                latest_features: init_msg.features.clone(),
                                                pending_msg_events: Vec::new(),
+                                               monitor_update_blocked_actions: BTreeMap::new(),
+                                               is_connected: true,
                                        }));
                                },
                                hash_map::Entry::Occupied(e) => {
-                                       e.get().lock().unwrap().latest_features = init_msg.features.clone();
+                                       let mut peer_state = e.get().lock().unwrap();
+                                       peer_state.latest_features = init_msg.features.clone();
+
+                                       let best_block_height = self.best_block.read().unwrap().height();
+                                       if inbound_peer_limited &&
+                                               Self::unfunded_channel_count(&*peer_state, best_block_height) ==
+                                               peer_state.channel_by_id.len()
+                                       {
+                                               return Err(());
+                                       }
+
+                                       debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
+                                       peer_state.is_connected = true;
                                },
                        }
                }
 
-               let per_peer_state = self.per_peer_state.read().unwrap();
+               log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 
+               let per_peer_state = self.per_peer_state.read().unwrap();
                for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
@@ -6378,7 +6369,7 @@ where
                        let channel_ids: Vec<[u8; 32]> = {
                                let per_peer_state = self.per_peer_state.read().unwrap();
                                let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                               if let None = peer_state_mutex_opt { return; }
+                               if peer_state_mutex_opt.is_none() { return; }
                                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                peer_state.channel_by_id.keys().cloned().collect()
@@ -6392,7 +6383,7 @@ where
                                // First check if we can advance the channel type and try again.
                                let per_peer_state = self.per_peer_state.read().unwrap();
                                let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                               if let None = peer_state_mutex_opt { return; }
+                               if peer_state_mutex_opt.is_none() { return; }
                                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                if let Some(chan) = peer_state.channel_by_id.get_mut(&msg.channel_id) {
@@ -6731,7 +6722,7 @@ impl Writeable for ClaimableHTLC {
 
 impl Readable for ClaimableHTLC {
        fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
-               let mut prev_hop = crate::util::ser::OptionDeserWrapper(None);
+               let mut prev_hop = crate::util::ser::RequiredWrapper(None);
                let mut value = 0;
                let mut payment_data: Option<msgs::FinalOnionHopData> = None;
                let mut cltv_expiry = 0;
@@ -6781,29 +6772,38 @@ impl Readable for HTLCSource {
                let id: u8 = Readable::read(reader)?;
                match id {
                        0 => {
-                               let mut session_priv: crate::util::ser::OptionDeserWrapper<SecretKey> = crate::util::ser::OptionDeserWrapper(None);
+                               let mut session_priv: crate::util::ser::RequiredWrapper<SecretKey> = crate::util::ser::RequiredWrapper(None);
                                let mut first_hop_htlc_msat: u64 = 0;
-                               let mut path = Some(Vec::new());
+                               let mut path: Option<Vec<RouteHop>> = Some(Vec::new());
                                let mut payment_id = None;
                                let mut payment_secret = None;
-                               let mut payment_params = None;
+                               let mut payment_params: Option<PaymentParameters> = None;
                                read_tlv_fields!(reader, {
                                        (0, session_priv, required),
                                        (1, payment_id, option),
                                        (2, first_hop_htlc_msat, required),
                                        (3, payment_secret, option),
                                        (4, path, vec_type),
-                                       (5, payment_params, option),
+                                       (5, payment_params, (option: ReadableArgs, 0)),
                                });
                                if payment_id.is_none() {
                                        // For backwards compat, if there was no payment_id written, use the session_priv bytes
                                        // instead.
                                        payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
                                }
+                               if path.is_none() || path.as_ref().unwrap().is_empty() {
+                                       return Err(DecodeError::InvalidValue);
+                               }
+                               let path = path.unwrap();
+                               if let Some(params) = payment_params.as_mut() {
+                                       if params.final_cltv_expiry_delta == 0 {
+                                               params.final_cltv_expiry_delta = path.last().unwrap().cltv_expiry_delta;
+                                       }
+                               }
                                Ok(HTLCSource::OutboundRoute {
                                        session_priv: session_priv.0.unwrap(),
                                        first_hop_htlc_msat,
-                                       path: path.unwrap(),
+                                       path,
                                        payment_id: payment_id.unwrap(),
                                        payment_secret,
                                        payment_params,
@@ -6886,6 +6886,7 @@ where
                        best_block.block_hash().write(writer)?;
                }
 
+               let mut serializable_peer_count: u64 = 0;
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let mut unfunded_channels = 0;
@@ -6893,6 +6894,9 @@ where
                        for (_, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
+                               if !peer_state.ok_to_remove(false) {
+                                       serializable_peer_count += 1;
+                               }
                                number_of_channels += peer_state.channel_by_id.len();
                                for (_, channel) in peer_state.channel_by_id.iter() {
                                        if !channel.is_funding_initiated() {
@@ -6943,11 +6947,30 @@ where
                        htlc_purposes.push(purpose);
                }
 
-               (per_peer_state.len() as u64).write(writer)?;
-               for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() {
-                       peer_pubkey.write(writer)?;
-                       let peer_state = peer_state_mutex.lock().unwrap();
-                       peer_state.latest_features.write(writer)?;
+               let mut monitor_update_blocked_actions_per_peer = None;
+               let mut peer_states = Vec::new();
+               for (_, peer_state_mutex) in per_peer_state.iter() {
+                       // Because we're holding the owning `per_peer_state` write lock here there's no chance
+                       // of a lockorder violation deadlock - no other thread can be holding any
+                       // per_peer_state lock at all.
+                       peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
+               }
+
+               (serializable_peer_count).write(writer)?;
+               for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
+                       // Peers which we have no channels to should be dropped once disconnected. As we
+                       // disconnect all peers when shutting down and serializing the ChannelManager, we
+                       // consider all peers as disconnected here. There's therefore no need write peers with
+                       // no channels.
+                       if !peer_state.ok_to_remove(false) {
+                               peer_pubkey.write(writer)?;
+                               peer_state.latest_features.write(writer)?;
+                               if !peer_state.monitor_update_blocked_actions.is_empty() {
+                                       monitor_update_blocked_actions_per_peer
+                                               .get_or_insert_with(Vec::new)
+                                               .push((*peer_pubkey, &peer_state.monitor_update_blocked_actions));
+                               }
+                       }
                }
 
                let events = self.pending_events.lock().unwrap();
@@ -7024,8 +7047,6 @@ where
                        // LDK versions prior to 0.0.113 do not know how to read the pending claimed payments
                        // map. Thus, if there are no entries we skip writing a TLV for it.
                        pending_claiming_payments = None;
-               } else {
-                       debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR");
                }
 
                write_tlv_fields!(writer, {
@@ -7034,6 +7055,7 @@ where
                        (3, pending_outbound_payments, required),
                        (4, pending_claiming_payments, option),
                        (5, self.our_network_pubkey, required),
+                       (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
                        (9, htlc_purposes, vec_type),
                        (11, self.probing_cookie_secret, required),
@@ -7346,6 +7368,8 @@ where
                                channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()),
                                latest_features: Readable::read(reader)?,
                                pending_msg_events: Vec::new(),
+                               monitor_update_blocked_actions: BTreeMap::new(),
+                               is_connected: false,
                        };
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
@@ -7401,12 +7425,14 @@ where
                let mut probing_cookie_secret: Option<[u8; 32]> = None;
                let mut claimable_htlc_purposes = None;
                let mut pending_claiming_payments = Some(HashMap::new());
+               let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
                        (3, pending_outbound_payments, option),
                        (4, pending_claiming_payments, option),
                        (5, received_network_pubkey, option),
+                       (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
                        (9, claimable_htlc_purposes, vec_type),
                        (11, probing_cookie_secret, option),
@@ -7515,7 +7541,11 @@ where
                        }
                }
 
-               if !forward_htlcs.is_empty() {
+               let pending_outbounds = OutboundPayments {
+                       pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
+                       retry_lock: Mutex::new(())
+               };
+               if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
                        // If we have pending HTLCs to forward, assume we either dropped a
                        // `PendingHTLCsForwardable` or the user received it but never processed it as they
                        // shut down before the timer hit. Either way, set the time_forwardable to a small
@@ -7672,6 +7702,15 @@ where
                        }
                }
 
+               for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
+                       if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+                               peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
+                       } else {
+                               log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: bounded_fee_estimator,
@@ -7683,7 +7722,7 @@ where
 
                        inbound_payment_key: expanded_inbound_key,
                        pending_inbound_payments: Mutex::new(pending_inbound_payments),
-                       pending_outbound_payments: OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()) },
+                       pending_outbound_payments: pending_outbounds,
                        pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
 
                        forward_htlcs: Mutex::new(forward_htlcs),
@@ -7733,19 +7772,14 @@ where
 mod tests {
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
-       use bitcoin::hashes::hex::FromHex;
        use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
-       use bitcoin::secp256k1::ecdsa::Signature;
-       use bitcoin::secp256k1::ffi::Signature as FFISignature;
-       use bitcoin::blockdata::script::Script;
-       use bitcoin::Txid;
        use core::time::Duration;
        use core::sync::atomic::Ordering;
        use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};
        use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, InterceptId};
        use crate::ln::functional_test_utils::*;
        use crate::ln::msgs;
-       use crate::ln::msgs::{ChannelMessageHandler, OptionalField};
+       use crate::ln::msgs::ChannelMessageHandler;
        use crate::routing::router::{PaymentParameters, RouteParameters, find_route};
        use crate::util::errors::APIError;
        use crate::util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
@@ -7851,7 +7885,7 @@ mod tests {
                // indicates there are more HTLCs coming.
                let cur_height = CHAN_CONFIRM_DEPTH + 1; // route_payment calls send_payment, which adds 1 to the current height. So we do the same here to match.
                let session_privs = nodes[0].node.test_add_new_pending_payment(our_payment_hash, Some(payment_secret), payment_id, &mpp_route).unwrap();
-               nodes[0].node.send_payment_along_path(&mpp_route.paths[0], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[0]).unwrap();
+               nodes[0].node.test_send_payment_along_path(&mpp_route.paths[0], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[0]).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -7881,7 +7915,7 @@ mod tests {
                expect_payment_failed!(nodes[0], our_payment_hash, true);
 
                // Send the second half of the original MPP payment.
-               nodes[0].node.send_payment_along_path(&mpp_route.paths[1], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[1]).unwrap();
+               nodes[0].node.test_send_payment_along_path(&mpp_route.paths[1], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[1]).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -7962,7 +7996,7 @@ mod tests {
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
                create_announced_chan_between_nodes(&nodes, 0, 1);
-               let scorer = test_utils::TestScorer::with_penalty(0);
+               let scorer = test_utils::TestScorer::new();
                let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
 
                // To start (1), send a regular payment but don't claim it.
@@ -7973,7 +8007,6 @@ mod tests {
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(expected_route.last().unwrap().node.get_our_node_id(), TEST_FINAL_CLTV),
                        final_value_msat: 100_000,
-                       final_cltv_expiry_delta: TEST_FINAL_CLTV,
                };
                let route = find_route(
                        &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
@@ -8059,18 +8092,15 @@ mod tests {
 
                let payer_pubkey = nodes[0].node.get_our_node_id();
                let payee_pubkey = nodes[1].node.get_our_node_id();
-               nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap();
-               nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap();
 
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]);
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(payee_pubkey, 40),
                        final_value_msat: 10_000,
-                       final_cltv_expiry_delta: 40,
                };
                let network_graph = nodes[0].network_graph.clone();
                let first_hops = nodes[0].node.list_usable_channels();
-               let scorer = test_utils::TestScorer::with_penalty(0);
+               let scorer = test_utils::TestScorer::new();
                let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
                let route = find_route(
                        &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
@@ -8104,18 +8134,15 @@ mod tests {
 
                let payer_pubkey = nodes[0].node.get_our_node_id();
                let payee_pubkey = nodes[1].node.get_our_node_id();
-               nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap();
-               nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap();
 
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]);
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(payee_pubkey, 40),
                        final_value_msat: 10_000,
-                       final_cltv_expiry_delta: 40,
                };
                let network_graph = nodes[0].network_graph.clone();
                let first_hops = nodes[0].node.list_usable_channels();
-               let scorer = test_utils::TestScorer::with_penalty(0);
+               let scorer = test_utils::TestScorer::new();
                let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
                let route = find_route(
                        &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
@@ -8170,6 +8197,40 @@ mod tests {
                }
        }
 
+       #[test]
+       fn test_drop_disconnected_peers_when_removing_channels() {
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+               let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap();
+               check_closed_broadcast!(nodes[0], true);
+               check_added_monitors!(nodes[0], 1);
+               check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
+
+               {
+                       // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
+                       // disconnected and the channel between has been force closed.
+                       let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
+                       // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
+                       assert_eq!(nodes_0_per_peer_state.len(), 1);
+                       assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some());
+               }
+
+               nodes[0].node.timer_tick_occurred();
+
+               {
+                       // Assert that nodes[1] has now been removed.
+                       assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0);
+               }
+       }
+
        #[test]
        fn bad_inbound_payment_hash() {
                // Add coverage for checking that a user-provided payment hash matches the payment secret.
@@ -8231,10 +8292,10 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
-
-                       assert_eq!(nodes[1].node.id_to_peer.lock().unwrap().len(), 0);
                }
 
+               assert_eq!(nodes[1].node.id_to_peer.lock().unwrap().len(), 0);
+
                let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
 
                nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg);
@@ -8242,7 +8303,9 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
+               }
 
+               {
                        // Assert that `nodes[1]`'s `id_to_peer` map is populated with the channel as soon as
                        // as it has the funding transaction.
                        let nodes_1_lock = nodes[1].node.id_to_peer.lock().unwrap();
@@ -8272,7 +8335,9 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
+               }
 
+               {
                        // At this stage, `nodes[1]` has proposed a fee for the closing transaction in the
                        // `handle_closing_signed` call above. As `nodes[1]` has not yet received the signature
                        // from `nodes[0]` for the closing transaction with the proposed fee, the channel is
@@ -8314,19 +8379,22 @@ mod tests {
 
        fn check_not_connected_to_peer_error<T>(res_err: Result<T, APIError>, expected_public_key: PublicKey) {
                let expected_message = format!("Not connected to node: {}", expected_public_key);
-               check_api_misuse_error_message(expected_message, res_err)
+               check_api_error_message(expected_message, res_err)
        }
 
        fn check_unkown_peer_error<T>(res_err: Result<T, APIError>, expected_public_key: PublicKey) {
                let expected_message = format!("Can't find a peer matching the passed counterparty node_id {}", expected_public_key);
-               check_api_misuse_error_message(expected_message, res_err)
+               check_api_error_message(expected_message, res_err)
        }
 
-       fn check_api_misuse_error_message<T>(expected_err_message: String, res_err: Result<T, APIError>) {
+       fn check_api_error_message<T>(expected_err_message: String, res_err: Result<T, APIError>) {
                match res_err {
                        Err(APIError::APIMisuseError { err }) => {
                                assert_eq!(err, expected_err_message);
                        },
+                       Err(APIError::ChannelUnavailable { err }) => {
+                               assert_eq!(err, expected_err_message);
+                       },
                        Ok(_) => panic!("Unexpected Ok"),
                        Err(_) => panic!("Unexpected Error"),
                }
@@ -8334,170 +8402,240 @@ mod tests {
 
        #[test]
        fn test_api_calls_with_unkown_counterparty_node() {
-               // Tests that our API functions and message handlers that expects a `counterparty_node_id`
-               // as input, behaves as expected if the `counterparty_node_id` is an unkown peer in the
+               // Tests that our API functions that expects a `counterparty_node_id` as input, behaves as
+               // expected if the `counterparty_node_id` is an unkown peer in the
                // `ChannelManager::per_peer_state` map.
                let chanmon_cfg = create_chanmon_cfgs(2);
                let node_cfg = create_node_cfgs(2, &chanmon_cfg);
                let node_chanmgr = create_node_chanmgrs(2, &node_cfg, &[None, None]);
                let nodes = create_network(2, &node_cfg, &node_chanmgr);
 
-               // Boilerplate code to produce `open_channel` and `accept_channel` msgs more densly than
-               // creating dummy ones.
-               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 1_000_000, 500_000_000, 42, None).unwrap();
-               let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
-               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
-               let accept_channel_msg = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
-
                // Dummy values
                let channel_id = [4; 32];
-               let signature = Signature::from(unsafe { FFISignature::new() });
                let unkown_public_key = PublicKey::from_secret_key(&Secp256k1::signing_only(), &SecretKey::from_slice(&[42; 32]).unwrap());
                let intercept_id = InterceptId([0; 32]);
 
-               // Dummy msgs
-               let funding_created_msg = msgs::FundingCreated {
-                       temporary_channel_id: open_channel_msg.temporary_channel_id,
-                       funding_txid: Txid::from_hex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(),
-                       funding_output_index: 0,
-                       signature: signature,
-               };
-
-               let funding_signed_msg = msgs::FundingSigned {
-                       channel_id: channel_id,
-                       signature: signature,
-               };
-
-               let channel_ready_msg = msgs::ChannelReady {
-                       channel_id: channel_id,
-                       next_per_commitment_point: unkown_public_key,
-                       short_channel_id_alias: None,
-               };
-
-               let announcement_signatures_msg = msgs::AnnouncementSignatures {
-                       channel_id: channel_id,
-                       short_channel_id: 0,
-                       node_signature: signature,
-                       bitcoin_signature: signature,
-               };
-
-               let channel_reestablish_msg = msgs::ChannelReestablish {
-                       channel_id: channel_id,
-                       next_local_commitment_number: 0,
-                       next_remote_commitment_number: 0,
-                       data_loss_protect: OptionalField::Absent,
-               };
-
-               let closing_signed_msg = msgs::ClosingSigned {
-                       channel_id: channel_id,
-                       fee_satoshis: 1000,
-                       signature: signature,
-                       fee_range: None,
-               };
-
-               let shutdown_msg = msgs::Shutdown {
-                       channel_id: channel_id,
-                       scriptpubkey: Script::new(),
-               };
-
-               let onion_routing_packet = msgs::OnionPacket {
-                       version: 255,
-                       public_key: Ok(unkown_public_key),
-                       hop_data: [1; 20*65],
-                       hmac: [2; 32]
-               };
-
-               let update_add_htlc_msg = msgs::UpdateAddHTLC {
-                       channel_id: channel_id,
-                       htlc_id: 0,
-                       amount_msat: 1000000,
-                       payment_hash: PaymentHash([1; 32]),
-                       cltv_expiry: 821716,
-                       onion_routing_packet
-               };
-
-               let commitment_signed_msg = msgs::CommitmentSigned {
-                       channel_id: channel_id,
-                       signature: signature,
-                       htlc_signatures: Vec::new(),
-               };
+               // Test the API functions.
+               check_not_connected_to_peer_error(nodes[0].node.create_channel(unkown_public_key, 1_000_000, 500_000_000, 42, None), unkown_public_key);
 
-               let update_fee_msg = msgs::UpdateFee {
-                       channel_id: channel_id,
-                       feerate_per_kw: 1000,
-               };
+               check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&channel_id, &unkown_public_key, 42), unkown_public_key);
 
-               let malformed_update_msg = msgs::UpdateFailMalformedHTLC{
-                       channel_id: channel_id,
-                       htlc_id: 0,
-                       sha256_of_onion: [1; 32],
-                       failure_code: 0x8000,
-               };
+               check_unkown_peer_error(nodes[0].node.close_channel(&channel_id, &unkown_public_key), unkown_public_key);
 
-               let fulfill_update_msg = msgs::UpdateFulfillHTLC{
-                       channel_id: channel_id,
-                       htlc_id: 0,
-                       payment_preimage: PaymentPreimage([1; 32]),
-               };
+               check_unkown_peer_error(nodes[0].node.force_close_broadcasting_latest_txn(&channel_id, &unkown_public_key), unkown_public_key);
 
-               let fail_update_msg = msgs::UpdateFailHTLC{
-                       channel_id: channel_id,
-                       htlc_id: 0,
-                       reason: msgs::OnionErrorPacket { data: Vec::new()},
-               };
+               check_unkown_peer_error(nodes[0].node.force_close_without_broadcasting_txn(&channel_id, &unkown_public_key), unkown_public_key);
 
-               let revoke_and_ack_msg = msgs::RevokeAndACK {
-                       channel_id: channel_id,
-                       per_commitment_secret: [1; 32],
-                       next_per_commitment_point: unkown_public_key,
-               };
+               check_unkown_peer_error(nodes[0].node.forward_intercepted_htlc(intercept_id, &channel_id, unkown_public_key, 1_000_000), unkown_public_key);
 
-               // Test the API functions and message handlers.
-               check_not_connected_to_peer_error(nodes[0].node.create_channel(unkown_public_key, 1_000_000, 500_000_000, 42, None), unkown_public_key);
+               check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key);
+       }
 
-               nodes[1].node.handle_open_channel(&unkown_public_key, &open_channel_msg);
+       #[test]
+       fn test_connection_limiting() {
+               // Test that we limit un-channel'd peers and un-funded channels properly.
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 
-               nodes[0].node.handle_accept_channel(&unkown_public_key, &accept_channel_msg);
+               // Note that create_network connects the nodes together for us
 
-               check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&open_channel_msg.temporary_channel_id, &unkown_public_key, 42), unkown_public_key);
+               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
 
-               nodes[1].node.handle_funding_created(&unkown_public_key, &funding_created_msg);
+               let mut funding_tx = None;
+               for idx in 0..super::MAX_UNFUNDED_CHANS_PER_PEER {
+                       nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+                       let accept_channel = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
 
-               nodes[0].node.handle_funding_signed(&unkown_public_key, &funding_signed_msg);
+                       if idx == 0 {
+                               nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), &accept_channel);
+                               let (temporary_channel_id, tx, _) = create_funding_transaction(&nodes[0], &nodes[1].node.get_our_node_id(), 100_000, 42);
+                               funding_tx = Some(tx.clone());
+                               nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), tx).unwrap();
+                               let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
 
-               nodes[0].node.handle_channel_ready(&unkown_public_key, &channel_ready_msg);
+                               nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg);
+                               check_added_monitors!(nodes[1], 1);
+                               let funding_signed = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
 
-               nodes[1].node.handle_announcement_signatures(&unkown_public_key, &announcement_signatures_msg);
+                               nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed);
+                               check_added_monitors!(nodes[0], 1);
+                       }
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
 
-               check_unkown_peer_error(nodes[0].node.close_channel(&channel_id, &unkown_public_key), unkown_public_key);
+               // A MAX_UNFUNDED_CHANS_PER_PEER + 1 channel will be summarily rejected
+               open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+                       open_channel_msg.temporary_channel_id);
+
+               // Further, because all of our channels with nodes[0] are inbound, and none of them funded,
+               // it doesn't count as a "protected" peer, i.e. it counts towards the MAX_NO_CHANNEL_PEERS
+               // limit.
+               let mut peer_pks = Vec::with_capacity(super::MAX_NO_CHANNEL_PEERS);
+               for _ in 1..super::MAX_NO_CHANNEL_PEERS {
+                       let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                               &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+                       peer_pks.push(random_pk);
+                       nodes[1].node.peer_connected(&random_pk, &msgs::Init {
+                               features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               }
+               let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                       &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+               nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err();
+
+               // Also importantly, because nodes[0] isn't "protected", we will refuse a reconnection from
+               // them if we have too many un-channel'd peers.
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+               let chan_closed_events = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(chan_closed_events.len(), super::MAX_UNFUNDED_CHANS_PER_PEER - 1);
+               for ev in chan_closed_events {
+                       if let Event::ChannelClosed { .. } = ev { } else { panic!(); }
+               }
+               nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err();
+
+               // but of course if the connection is outbound its allowed...
+               nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, false).unwrap();
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               // Now nodes[0] is disconnected but still has a pending, un-funded channel lying around.
+               // Even though we accept one more connection from new peers, we won't actually let them
+               // open channels.
+               assert!(peer_pks.len() > super::MAX_UNFUNDED_CHANNEL_PEERS - 1);
+               for i in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 {
+                       nodes[1].node.handle_open_channel(&peer_pks[i], &open_channel_msg);
+                       get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, peer_pks[i]);
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
+                       open_channel_msg.temporary_channel_id);
+
+               // Of course, however, outbound channels are always allowed
+               nodes[1].node.create_channel(last_random_pk, 100_000, 0, 42, None).unwrap();
+               get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, last_random_pk);
+
+               // If we fund the first channel, nodes[0] has a live on-chain channel with us, it is now
+               // "protected" and can connect again.
+               mine_transaction(&nodes[1], funding_tx.as_ref().unwrap());
+               nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());
+
+               // Further, because the first channel was funded, we can open another channel with
+               // last_random_pk.
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk);
+       }
 
-               check_unkown_peer_error(nodes[0].node.force_close_broadcasting_latest_txn(&channel_id, &unkown_public_key), unkown_public_key);
+       #[test]
+       fn test_outbound_chans_unlimited() {
+               // Test that we never refuse an outbound channel even if a peer is unfuned-channel-limited
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 
-               check_unkown_peer_error(nodes[0].node.force_close_without_broadcasting_txn(&channel_id, &unkown_public_key), unkown_public_key);
+               // Note that create_network connects the nodes together for us
 
-               check_unkown_peer_error(nodes[0].node.forward_intercepted_htlc(intercept_id, &channel_id, unkown_public_key, 1_000_000), unkown_public_key);
+               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
 
-               check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key);
+               for _ in 0..super::MAX_UNFUNDED_CHANS_PER_PEER {
+                       nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+                       get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
 
-               nodes[0].node.handle_shutdown(&unkown_public_key, &shutdown_msg);
+               // Once we have MAX_UNFUNDED_CHANS_PER_PEER unfunded channels, new inbound channels will be
+               // rejected.
+               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+                       open_channel_msg.temporary_channel_id);
 
-               nodes[1].node.handle_closing_signed(&unkown_public_key, &closing_signed_msg);
+               // but we can still open an outbound channel.
+               nodes[1].node.create_channel(nodes[0].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, nodes[0].node.get_our_node_id());
 
-               nodes[0].node.handle_channel_reestablish(&unkown_public_key, &channel_reestablish_msg);
+               // but even with such an outbound channel, additional inbound channels will still fail.
+               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+                       open_channel_msg.temporary_channel_id);
+       }
 
-               nodes[1].node.handle_update_add_htlc(&unkown_public_key, &update_add_htlc_msg);
+       #[test]
+       fn test_0conf_limiting() {
+               // Tests that we properly limit inbound channels when we have the manual-channel-acceptance
+               // flag set and (sometimes) accept channels as 0conf.
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let mut settings = test_default_channel_config();
+               settings.manually_accept_inbound_channels = true;
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, Some(settings)]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 
-               nodes[1].node.handle_commitment_signed(&unkown_public_key, &commitment_signed_msg);
+               // Note that create_network connects the nodes together for us
 
-               nodes[1].node.handle_update_fail_malformed_htlc(&unkown_public_key, &malformed_update_msg);
+               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
 
-               nodes[1].node.handle_update_fail_htlc(&unkown_public_key, &fail_update_msg);
+               // First, get us up to MAX_UNFUNDED_CHANNEL_PEERS so we can test at the edge
+               for _ in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 {
+                       let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                               &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+                       nodes[1].node.peer_connected(&random_pk, &msgs::Init {
+                               features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
 
-               nodes[1].node.handle_update_fulfill_htlc(&unkown_public_key, &fulfill_update_msg);
+                       nodes[1].node.handle_open_channel(&random_pk, &open_channel_msg);
+                       let events = nodes[1].node.get_and_clear_pending_events();
+                       match events[0] {
+                               Event::OpenChannelRequest { temporary_channel_id, .. } => {
+                                       nodes[1].node.accept_inbound_channel(&temporary_channel_id, &random_pk, 23).unwrap();
+                               }
+                               _ => panic!("Unexpected event"),
+                       }
+                       get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, random_pk);
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
 
-               nodes[1].node.handle_revoke_and_ack(&unkown_public_key, &revoke_and_ack_msg);
+               // If we try to accept a channel from another peer non-0conf it will fail.
+               let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                       &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+               nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               let events = nodes[1].node.get_and_clear_pending_events();
+               match events[0] {
+                       Event::OpenChannelRequest { temporary_channel_id, .. } => {
+                               match nodes[1].node.accept_inbound_channel(&temporary_channel_id, &last_random_pk, 23) {
+                                       Err(APIError::APIMisuseError { err }) =>
+                                               assert_eq!(err, "Too many peers with unfunded channels, refusing to accept new ones"),
+                                       _ => panic!(),
+                               }
+                       }
+                       _ => panic!("Unexpected event"),
+               }
+               assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
+                       open_channel_msg.temporary_channel_id);
 
-               nodes[1].node.handle_update_fee(&unkown_public_key, &update_fee_msg);
+               // ...however if we accept the same channel 0conf it should work just fine.
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               let events = nodes[1].node.get_and_clear_pending_events();
+               match events[0] {
+                       Event::OpenChannelRequest { temporary_channel_id, .. } => {
+                               nodes[1].node.accept_inbound_channel_from_trusted_peer_0conf(&temporary_channel_id, &last_random_pk, 23).unwrap();
+                       }
+                       _ => panic!("Unexpected event"),
+               }
+               get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk);
        }
 
        #[cfg(anchors)]
@@ -8580,12 +8718,12 @@ pub mod bench {
                // Note that this is unrealistic as each payment send will require at least two fsync
                // calls per node.
                let network = bitcoin::Network::Testnet;
-               let genesis_hash = bitcoin::blockdata::constants::genesis_block(network).header.block_hash();
 
                let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))};
                let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
                let logger_a = test_utils::TestLogger::with_id("node a".to_owned());
-               let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(genesis_hash, &logger_a)));
+               let scorer = Mutex::new(test_utils::TestScorer::new());
+               let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer);
 
                let mut config: UserConfig = Default::default();
                config.channel_handshake_config.minimum_depth = 1;
@@ -8595,7 +8733,7 @@ pub mod bench {
                let keys_manager_a = KeysManager::new(&seed_a, 42, 42);
                let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters {
                        network,
-                       best_block: BestBlock::from_genesis(network),
+                       best_block: BestBlock::from_network(network),
                });
                let node_a_holder = NodeHolder { node: &node_a };
 
@@ -8605,12 +8743,12 @@ pub mod bench {
                let keys_manager_b = KeysManager::new(&seed_b, 42, 42);
                let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters {
                        network,
-                       best_block: BestBlock::from_genesis(network),
+                       best_block: BestBlock::from_network(network),
                });
                let node_b_holder = NodeHolder { node: &node_b };
 
-               node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }).unwrap();
-               node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }).unwrap();
+               node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }, true).unwrap();
+               node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }, false).unwrap();
                node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap();
                node_b.handle_open_channel(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id()));
                node_a.handle_accept_channel(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id()));
@@ -8629,7 +8767,7 @@ pub mod bench {
                assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]);
 
                let block = Block {
-                       header: BlockHeader { version: 0x20000000, prev_blockhash: genesis_hash, merkle_root: TxMerkleNode::all_zeros(), time: 42, bits: 42, nonce: 42 },
+                       header: BlockHeader { version: 0x20000000, prev_blockhash: BestBlock::from_network(network).block_hash(), merkle_root: TxMerkleNode::all_zeros(), time: 42, bits: 42, nonce: 42 },
                        txdata: vec![tx],
                };
                Listen::block_connected(&node_a, &block, 1);
@@ -8668,7 +8806,7 @@ pub mod bench {
                        _ => panic!("Unexpected event"),
                }
 
-               let dummy_graph = NetworkGraph::new(genesis_hash, &logger_a);
+               let dummy_graph = NetworkGraph::new(network, &logger_a);
 
                let mut payment_count: u64 = 0;
                macro_rules! send_payment {
@@ -8676,7 +8814,7 @@ pub mod bench {
                                let usable_channels = $node_a.list_usable_channels();
                                let payment_params = PaymentParameters::from_node_id($node_b.get_our_node_id(), TEST_FINAL_CLTV)
                                        .with_features($node_b.invoice_features());
-                               let scorer = test_utils::TestScorer::with_penalty(0);
+                               let scorer = test_utils::TestScorer::new();
                                let seed = [3u8; 32];
                                let keys_manager = KeysManager::new(&seed, 42, 42);
                                let random_seed_bytes = keys_manager.get_secure_random_bytes();