Merge pull request #2014 from valentinewallace/2023-02-rework-partial-pmt-fail
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Thu, 23 Feb 2023 21:54:16 +0000 (21:54 +0000)
committer GitHub <noreply@github.com>
Thu, 23 Feb 2023 21:54:16 +0000 (21:54 +0000)
Rework auto-retry send errors
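
With this rework, the auto-retrying send APIs (`send_payment_with_retry` and
`send_spontaneous_payment_with_retry`) return the narrower `RetryableSendFailure`
(re-exported from `ln::outbound_payment` alongside `Retry` and
`PaymentSendFailure`) rather than `PaymentSendFailure`, since partial path
failures are now retried internally instead of being surfaced to the caller. A
rough caller sketch (hedged: the `RetryableSendFailure` variant names below are
assumptions based on this branch's `outbound_payment` module, and
`channel_manager`, `payment_hash`, `payment_id`, and `route_params` are
hypothetical bindings assumed to be in scope):

    use lightning::ln::channelmanager::{Retry, RetryableSendFailure};

    // Kick off a payment, letting ChannelManager retry failed paths up to 3 times.
    match channel_manager.send_payment_with_retry(
        payment_hash, &None, payment_id, route_params, Retry::Attempts(3),
    ) {
        // The payment is now pending; completion or failure is surfaced via events.
        Ok(()) => {},
        // No route to the destination could be found; nothing was sent.
        Err(RetryableSendFailure::RouteNotFound) => { /* notify the user */ },
        // A payment with this PaymentId was already attempted; use a fresh id to retry.
        Err(RetryableSendFailure::DuplicatePayment) => {},
        // The payment parameters' expiry time has already passed.
        Err(RetryableSendFailure::PaymentExpired) => {},
    }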

lightning/src/ln/channelmanager.rs
lightning/src/ln/payment_tests.rs

index 61de296c1442f3096fcb8b88efbd341157c4de6d,3fd472925cf6948ec56a88986ed97a1368e99963..577e0984448a257a2ff9a23f305aa2684ef968dc
@@@ -65,8 -65,6 +65,8 @@@ use crate::util::ser::{BigSize, FixedLe
  use crate::util::logger::{Level, Logger};
  use crate::util::errors::APIError;
  
 +use alloc::collections::BTreeMap;
 +
  use crate::io;
  use crate::prelude::*;
  use core::{cmp, mem};
@@@ -78,7 -76,7 +78,7 @@@ use core::time::Duration
  use core::ops::Deref;
  
  // Re-export this for use in the public API.
- pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry};
+ pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure};
  
  // We hold various information about HTLC relay in the HTLC objects in Channel itself:
  //
@@@ -345,6 -343,17 +345,6 @@@ impl MsgHandleErrInternal 
                }
        }
        #[inline]
 -      fn ignore_no_close(err: String) -> Self {
 -              Self {
 -                      err: LightningError {
 -                              err,
 -                              action: msgs::ErrorAction::IgnoreError,
 -                      },
 -                      chan_id: None,
 -                      shutdown_finish: None,
 -              }
 -      }
 -      #[inline]
        fn from_no_close(err: msgs::LightningError) -> Self {
                Self { err, chan_id: None, shutdown_finish: None }
        }
@@@ -455,7 -464,6 +455,7 @@@ enum BackgroundEvent 
        ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
  }
  
 +#[derive(Debug)]
  pub(crate) enum MonitorUpdateCompletionAction {
        /// Indicates that a payment ultimately destined for us was claimed and we should emit an
  	/// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
  	/// this payment.
  	PaymentClaimed { payment_hash: PaymentHash },
  	/// Indicates an [`events::Event`] should be surfaced to the user.
  	EmitEvent { event: events::Event },
  }
  
 +impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 +      (0, PaymentClaimed) => { (0, payment_hash, required) },
 +      (2, EmitEvent) => { (0, event, ignorable) },
 +);
 +
  /// State we hold per-peer.
  pub(super) struct PeerState<Signer: ChannelSigner> {
  	/// `temporary_channel_id` or `channel_id` -> `channel`.
  	pub(super) channel_by_id: HashMap<[u8; 32], Channel<Signer>>,
        /// Messages to send to the peer - pushed to in the same lock that they are generated in (except
        /// for broadcast messages, where ordering isn't as strict).
        pub(super) pending_msg_events: Vec<MessageSendEvent>,
 +      /// Map from a specific channel to some action(s) that should be taken when all pending
 +      /// [`ChannelMonitorUpdate`]s for the channel complete updating.
 +      ///
 +	/// Note that because we generally only have one entry here, a HashMap is pretty overkill. A
 +	/// BTreeMap currently stores more than ten elements per leaf node, so even with up to a few
 +	/// channels per peer this will just be one allocation and will amount to a linear list of
 +      /// channels to walk, avoiding the whole hashing rigmarole.
 +      ///
 +      /// Note that the channel may no longer exist. For example, if a channel was closed but we
 +      /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
 +      /// for a missing channel. While a malicious peer could construct a second channel with the
 +      /// same `temporary_channel_id` (or final `channel_id` in the case of 0conf channels or prior
 +      /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
 +      /// duplicates do not occur, so such channels should fail without a monitor update completing.
 +      monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
        /// The peer is currently connected (i.e. we've seen a
        /// [`ChannelMessageHandler::peer_connected`] and no corresponding
  	/// [`ChannelMessageHandler::peer_disconnected`]).
@@@ -513,7 -501,7 +513,7 @@@ impl <Signer: ChannelSigner> PeerState<
                if require_disconnected && self.is_connected {
                        return false
                }
 -              self.channel_by_id.len() == 0
 +              self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty()
        }
  }
  
@@@ -605,15 -593,6 +605,15 @@@ pub type SimpleRefChannelManager<'a, 'b
  /// offline for a full minute. In order to track this, you must call
  /// timer_tick_occurred roughly once per minute, though it doesn't have to be perfect.
  ///
 +/// To avoid trivial DoS issues, ChannelManager limits the number of inbound connections and
 +/// inbound channels without confirmed funding transactions. This may prevent nodes with which we
 +/// do not have a channel from connecting to us or opening new channels with us if we already
 +/// have many peers with unfunded channels.
 +///
 +/// Because it is an indication of trust, inbound channels which we've accepted as 0conf are
 +/// exempted from the count of unfunded channels. Similarly, outbound channels and connections are
 +/// never limited. Please ensure you limit the count of such channels yourself.
 +///
  /// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager
  /// or a SimpleRefChannelManager, for conciseness. See their documentation for more details, but
  /// essentially you should default to using a SimpleRefChannelManager, and use a
@@@ -964,19 -943,6 +964,19 @@@ pub(crate) const MPP_TIMEOUT_TICKS: u8 
  /// [`OutboundPayments::remove_stale_resolved_payments`].
  pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7;
  
 +/// The maximum number of unfunded channels we can have per-peer before we start rejecting new
 +/// (inbound) ones. The number of peers with unfunded channels is limited separately in
 +/// [`MAX_UNFUNDED_CHANNEL_PEERS`].
 +const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
 +
 +/// The maximum number of peers from which we will allow pending unfunded channels. Once we reach
 +/// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
 +const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;
 +
 +/// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
 +/// many peers we reject new (inbound) connections.
 +const MAX_NO_CHANNEL_PEERS: usize = 250;
 +
  /// Information needed for constructing an invoice route hint for this channel.
  #[derive(Clone, Debug, PartialEq)]
  pub struct CounterpartyForwardingInfo {
@@@ -1379,6 -1345,78 +1379,6 @@@ macro_rules! remove_channel 
        }
  }
  
 -macro_rules! handle_monitor_update_res {
 -      ($self: ident, $err: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
 -              match $err {
 -                      ChannelMonitorUpdateStatus::PermanentFailure => {
 -                              log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure", log_bytes!($chan_id[..]));
 -                              update_maps_on_chan_removal!($self, $chan);
 -                              // TODO: $failed_fails is dropped here, which will cause other channels to hit the
 -                              // chain in a confused state! We need to move them into the ChannelMonitor which
 -                              // will be responsible for failing backwards once things confirm on-chain.
 -                              // It's ok that we drop $failed_forwards here - at this point we'd rather they
 -                              // broadcast HTLC-Timeout and pay the associated fees to get their funds back than
 -                              // us bother trying to claim it just to forward on to another peer. If we're
 -                              // splitting hairs we'd prefer to claim payments that were to us, but we haven't
 -                              // given up the preimage yet, so might as well just wait until the payment is
 -                              // retried, avoiding the on-chain fees.
 -                              let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(),
 -                                              $chan.force_shutdown(false), $self.get_channel_update_for_broadcast(&$chan).ok() ));
 -                              (res, true)
 -                      },
 -                      ChannelMonitorUpdateStatus::InProgress => {
 -                              log_info!($self.logger, "Disabling channel {} due to monitor update in progress. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
 -                                              log_bytes!($chan_id[..]),
 -                                              if $resend_commitment && $resend_raa {
 -                                                              match $action_type {
 -                                                                      RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
 -                                                                      RAACommitmentOrder::RevokeAndACKFirst => { "RAA then commitment" },
 -                                                              }
 -                                                      } else if $resend_commitment { "commitment" }
 -                                                      else if $resend_raa { "RAA" }
 -                                                      else { "nothing" },
 -                                              (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
 -                                              (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
 -                                              (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
 -                              if !$resend_commitment {
 -                                      debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
 -                              }
 -                              if !$resend_raa {
 -                                      debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
 -                              }
 -                              $chan.monitor_updating_paused($resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
 -                              (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
 -                      },
 -                      ChannelMonitorUpdateStatus::Completed => {
 -                              (Ok(()), false)
 -                      },
 -              }
 -      };
 -      ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $resend_channel_ready: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
 -              let (res, drop) = handle_monitor_update_res!($self, $err, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $resend_channel_ready, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
 -              if drop {
 -                      $entry.remove_entry();
 -              }
 -              res
 -      } };
 -      ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, COMMITMENT_UPDATE_ONLY) => { {
 -              debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst);
 -              handle_monitor_update_res!($self, $err, $entry, $action_type, false, true, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
 -      } };
 -      ($self: ident, $err: expr, $entry: expr, $action_type: path, $chan_id: expr, NO_UPDATE) => {
 -              handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, false, Vec::new(), Vec::new(), Vec::new(), $chan_id)
 -      };
 -      ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_channel_ready: expr, OPTIONALLY_RESEND_FUNDING_LOCKED) => {
 -              handle_monitor_update_res!($self, $err, $entry, $action_type, false, false, $resend_channel_ready, Vec::new(), Vec::new(), Vec::new())
 -      };
 -      ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
 -              handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, Vec::new(), Vec::new(), Vec::new())
 -      };
 -      ($self: ident, $err: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
 -              handle_monitor_update_res!($self, $err, $entry, $action_type, $resend_raa, $resend_commitment, false, $failed_forwards, $failed_fails, Vec::new())
 -      };
 -}
 -
  macro_rules! send_channel_ready {
        ($self: ident, $pending_msg_events: expr, $channel: expr, $channel_ready_msg: expr) => {{
                $pending_msg_events.push(events::MessageSendEvent::SendChannelReady {
@@@ -1416,93 -1454,6 +1416,93 @@@ macro_rules! emit_channel_ready_event 
        }
  }
  
 +macro_rules! handle_monitor_update_completion {
 +      ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { {
 +              let mut updates = $chan.monitor_updating_restored(&$self.logger,
 +                      &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
 +                      $self.best_block.read().unwrap().height());
 +              let counterparty_node_id = $chan.get_counterparty_node_id();
 +              let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() {
 +                      // We only send a channel_update in the case where we are just now sending a
 +                      // channel_ready and the channel is in a usable state. We may re-send a
 +                      // channel_update later through the announcement_signatures process for public
 +                      // channels, but there's no reason not to just inform our counterparty of our fees
 +                      // now.
 +                      if let Ok(msg) = $self.get_channel_update_for_unicast($chan) {
 +                              Some(events::MessageSendEvent::SendChannelUpdate {
 +                                      node_id: counterparty_node_id,
 +                                      msg,
 +                              })
 +                      } else { None }
 +              } else { None };
 +
 +              let update_actions = $peer_state.monitor_update_blocked_actions
 +                      .remove(&$chan.channel_id()).unwrap_or(Vec::new());
 +
 +              let htlc_forwards = $self.handle_channel_resumption(
 +                      &mut $peer_state.pending_msg_events, $chan, updates.raa,
 +                      updates.commitment_update, updates.order, updates.accepted_htlcs,
 +                      updates.funding_broadcastable, updates.channel_ready,
 +                      updates.announcement_sigs);
 +              if let Some(upd) = channel_update {
 +                      $peer_state.pending_msg_events.push(upd);
 +              }
 +
 +              let channel_id = $chan.channel_id();
 +              core::mem::drop($peer_state_lock);
 +
 +              $self.handle_monitor_update_completion_actions(update_actions);
 +
 +              if let Some(forwards) = htlc_forwards {
 +                      $self.forward_htlcs(&mut [forwards][..]);
 +              }
 +              $self.finalize_claims(updates.finalized_claimed_htlcs);
 +              for failure in updates.failed_htlcs.drain(..) {
 +                      let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
 +                      $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
 +              }
 +      } }
 +}
 +
 +macro_rules! handle_new_monitor_update {
 +      ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
 +		// update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can
 +		// take that lock here, ensuring we won't deadlock if removal is required below.
 +              debug_assert!($self.id_to_peer.try_lock().is_ok());
 +              match $update_res {
 +                      ChannelMonitorUpdateStatus::InProgress => {
 +                              log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
 +                                      log_bytes!($chan.channel_id()[..]));
 +                              Ok(())
 +                      },
 +                      ChannelMonitorUpdateStatus::PermanentFailure => {
 +                              log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
 +                                      log_bytes!($chan.channel_id()[..]));
 +                              update_maps_on_chan_removal!($self, $chan);
 +                              let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown(
 +                                      "ChannelMonitor storage failure".to_owned(), $chan.channel_id(),
 +                                      $chan.get_user_id(), $chan.force_shutdown(false),
 +                                      $self.get_channel_update_for_broadcast(&$chan).ok()));
 +                              $remove;
 +                              res
 +                      },
 +                      ChannelMonitorUpdateStatus::Completed => {
 +                              if ($update_id == 0 || $chan.get_next_monitor_update()
 +                                      .expect("We can't be processing a monitor update if it isn't queued")
 +                                      .update_id == $update_id) &&
 +                                      $chan.get_latest_monitor_update_id() == $update_id
 +                              {
 +                                      handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan);
 +                              }
 +                              Ok(())
 +                      },
 +              }
 +      } };
 +      ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan_entry: expr) => {
 +              handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
 +      }
 +}
 +
  impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
  where
        M::Target: chain::Watch<<SP::Target as SignerProvider>::Signer>,
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_entry) => {
 -                                      let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?;
 +                                      let funding_txo_opt = chan_entry.get().get_funding_txo();
 +                                      let their_features = &peer_state.latest_features;
 +                                      let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
 +                                              .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?;
                                        failed_htlcs = htlcs;
  
 -                                      // Update the monitor with the shutdown script if necessary.
 -                                      if let Some(monitor_update) = monitor_update {
 -                                              let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
 -                                              let (result, is_permanent) =
 -                                                      handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
 -                                              if is_permanent {
 -                                                      remove_channel!(self, chan_entry);
 -                                                      break result;
 -                                              }
 -                                      }
 -
 +                                      // We can send the `shutdown` message before updating the `ChannelMonitor`
 +                                      // here as we don't need the monitor update to complete until we send a
 +					// `closing_signed`, which we'll delay if we're pending a monitor update.
                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                node_id: *counterparty_node_id,
 -                                              msg: shutdown_msg
 +                                              msg: shutdown_msg,
                                        });
  
 +                                      // Update the monitor with the shutdown script if necessary.
 +                                      if let Some(monitor_update) = monitor_update_opt.take() {
 +                                              let update_id = monitor_update.update_id;
 +                                              let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
 +                                              break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
 +                                      }
 +
                                        if chan_entry.get().is_shutdown() {
                                                let channel = remove_channel!(self, chan_entry);
                                                if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
  
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
-                               .ok_or_else(|| APIError::InvalidRoute{err: "No peer matching the path's first hop found!" })?;
+                               .ok_or_else(|| APIError::ChannelUnavailable{err: "No peer matching the path's first hop found!".to_owned() })?;
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) {
 -                              match {
 -                                      if !chan.get().is_live() {
 -                                              return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()});
 -                                      }
 -                                      break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(
 -                                              htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
 -                                                      path: path.clone(),
 -                                                      session_priv: session_priv.clone(),
 -                                                      first_hop_htlc_msat: htlc_msat,
 -                                                      payment_id,
 -                                                      payment_secret: payment_secret.clone(),
 -                                                      payment_params: payment_params.clone(),
 -                                              }, onion_packet, &self.logger),
 -                                              chan)
 -                              } {
 -                                      Some((update_add, commitment_signed, monitor_update)) => {
 -                                              let update_err = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
 -                                              let chan_id = chan.get().channel_id();
 -                                              match (update_err,
 -                                                      handle_monitor_update_res!(self, update_err, chan,
 -                                                              RAACommitmentOrder::CommitmentFirst, false, true))
 -                                              {
 -                                                      (ChannelMonitorUpdateStatus::PermanentFailure, Err(e)) => break Err(e),
 -                                                      (ChannelMonitorUpdateStatus::Completed, Ok(())) => {},
 -                                                      (ChannelMonitorUpdateStatus::InProgress, Err(_)) => {
 -                                                              // Note that MonitorUpdateInProgress here indicates (per function
 -                                                              // docs) that we will resend the commitment update once monitor
 -                                                              // updating completes. Therefore, we must return an error
 -                                                              // indicating that it is unsafe to retry the payment wholesale,
 -                                                              // which we do in the send_payment check for
 -                                                              // MonitorUpdateInProgress, below.
 -                                                              return Err(APIError::MonitorUpdateInProgress);
 -                                                      },
 -                                                      _ => unreachable!(),
 +                              if !chan.get().is_live() {
 +                                      return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected".to_owned()});
 +                              }
 +                              let funding_txo = chan.get().get_funding_txo().unwrap();
 +                              let send_res = chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(),
 +                                      htlc_cltv, HTLCSource::OutboundRoute {
 +                                              path: path.clone(),
 +                                              session_priv: session_priv.clone(),
 +                                              first_hop_htlc_msat: htlc_msat,
 +                                              payment_id,
 +                                              payment_secret: payment_secret.clone(),
 +                                              payment_params: payment_params.clone(),
 +                                      }, onion_packet, &self.logger);
 +                              match break_chan_entry!(self, send_res, chan) {
 +                                      Some(monitor_update) => {
 +                                              let update_id = monitor_update.update_id;
 +                                              let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update);
 +                                              if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan) {
 +                                                      break Err(e);
 +                                              }
 +                                              if update_res == ChannelMonitorUpdateStatus::InProgress {
 +                                                      // Note that MonitorUpdateInProgress here indicates (per function
 +                                                      // docs) that we will resend the commitment update once monitor
 +                                                      // updating completes. Therefore, we must return an error
 +                                                      // indicating that it is unsafe to retry the payment wholesale,
 +                                                      // which we do in the send_payment check for
 +                                                      // MonitorUpdateInProgress, below.
 +                                                      return Err(APIError::MonitorUpdateInProgress);
                                                }
 -
 -                                              log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan_id));
 -                                              peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
 -                                                      node_id: path.first().unwrap().pubkey,
 -                                                      updates: msgs::CommitmentUpdate {
 -                                                              update_add_htlcs: vec![update_add],
 -                                                              update_fulfill_htlcs: Vec::new(),
 -                                                              update_fail_htlcs: Vec::new(),
 -                                                              update_fail_malformed_htlcs: Vec::new(),
 -                                                              update_fee: None,
 -                                                              commitment_signed,
 -                                                      },
 -                                              });
                                        },
                                        None => { },
                                }
  
        /// Similar to [`ChannelManager::send_payment`], but will automatically find a route based on
        /// `route_params` and retry failed payment paths based on `retry_strategy`.
-       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), PaymentSendFailure> {
+       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                self.pending_outbound_payments
                        .send_payment(payment_hash, payment_secret, payment_id, retry_strategy, route_params,
                                &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(),
                                &self.entropy_source, &self.node_signer, best_block_height, &self.logger,
+                               &self.pending_events,
                                |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                                self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
        /// payments.
        ///
        /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend
-       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, PaymentSendFailure> {
+       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, payment_id,
                        retry_strategy, route_params, &self.router, self.list_usable_channels(),
                        || self.compute_inflight_htlcs(),  &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.logger,
+                       &self.logger, &self.pending_events,
                        |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                        self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
                                        (chan, funding_msg)
                                },
                                Err(_) => { return Err(APIError::ChannelUnavailable {
 -                                      err: "Error deriving keys or signing initial commitment transactions - either our RNG or our counterparty's RNG is broken or the Signer refused to sign".to_owned()
 +                                      err: "Signer refused to sign the initial commitment transaction".to_owned()
                                }) },
                        }
                };
  
                let per_peer_state = self.per_peer_state.read().unwrap();
                let chan_id = prev_hop.outpoint.to_channel_id();
 -
                let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
                        Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
                        None => None
                        )
                ).unwrap_or(None);
  
 -              if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id))
 -              {
 -                      let counterparty_node_id = chan.get().get_counterparty_node_id();
 -                      match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
 -                              Ok(msgs_monitor_option) => {
 -                                      if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
 -                                              match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
 -                                                      ChannelMonitorUpdateStatus::Completed => {},
 -                                                      e => {
 -                                                              log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
 -                                                                      "Failed to update channel monitor with preimage {:?}: {:?}",
 -                                                                      payment_preimage, e);
 -                                                              let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
 -                                                              mem::drop(peer_state_opt);
 -                                                              mem::drop(per_peer_state);
 -                                                              self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
 -                                                              return Err((counterparty_node_id, err));
 -                                                      }
 -                                              }
 -                                              if let Some((msg, commitment_signed)) = msgs {
 -                                                      log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
 -                                                              log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
 -                                                      peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
 -                                                              node_id: counterparty_node_id,
 -                                                              updates: msgs::CommitmentUpdate {
 -                                                                      update_add_htlcs: Vec::new(),
 -                                                                      update_fulfill_htlcs: vec![msg],
 -                                                                      update_fail_htlcs: Vec::new(),
 -                                                                      update_fail_malformed_htlcs: Vec::new(),
 -                                                                      update_fee: None,
 -                                                                      commitment_signed,
 -                                                              }
 -                                                      });
 -                                              }
 -                                              mem::drop(peer_state_opt);
 -                                              mem::drop(per_peer_state);
 -                                              self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
 -                                              Ok(())
 -                                      } else {
 -                                              Ok(())
 -                                      }
 -                              },
 -                              Err((e, monitor_update)) => {
 -                                      match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
 -                                              ChannelMonitorUpdateStatus::Completed => {},
 -                                              e => {
 -                                                      // TODO: This needs to be handled somehow - if we receive a monitor update
 -                                                      // with a preimage we *must* somehow manage to propagate it to the upstream
 -                                                      // channel, or we must have an ability to receive the same update and try
 -                                                      // again on restart.
 -                                                      log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
 -                                                              "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
 -                                                              payment_preimage, e);
 -                                              },
 +              if let Some(mut peer_state_lock) = peer_state_opt.take() {
 +                      let peer_state = &mut *peer_state_lock;
 +                      if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
 +                              let counterparty_node_id = chan.get().get_counterparty_node_id();
 +                              let fulfill_res = chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
 +
 +                              if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
 +                                      if let Some(action) = completion_action(Some(htlc_value_msat)) {
 +                                              log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
 +                                                      log_bytes!(chan_id), action);
 +                                              peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                        }
 -                                      let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
 -                                      if drop {
 -                                              chan.remove_entry();
 +                                      let update_id = monitor_update.update_id;
 +                                      let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
 +                                      let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
 +                                              peer_state, chan);
 +                                      if let Err(e) = res {
 +                                              // TODO: This is a *critical* error - we probably updated the outbound edge
 +                                              // of the HTLC's monitor with a preimage. We should retry this monitor
 +                                              // update over and over again until morale improves.
 +                                              log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
 +                                              return Err((counterparty_node_id, e));
                                        }
 -                                      mem::drop(peer_state_opt);
 -                                      mem::drop(per_peer_state);
 -                                      self.handle_monitor_update_completion_actions(completion_action(None));
 -                                      Err((counterparty_node_id, res))
 -                              },
 -                      }
 -              } else {
 -                      let preimage_update = ChannelMonitorUpdate {
 -                              update_id: CLOSED_CHANNEL_UPDATE_ID,
 -                              updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 -                                      payment_preimage,
 -                              }],
 -                      };
 -                      // We update the ChannelMonitor on the backward link, after
 -                      // receiving an `update_fulfill_htlc` from the forward link.
 -                      let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
 -                      if update_res != ChannelMonitorUpdateStatus::Completed {
 -                              // TODO: This needs to be handled somehow - if we receive a monitor update
 -                              // with a preimage we *must* somehow manage to propagate it to the upstream
 -                              // channel, or we must have an ability to receive the same event and try
 -                              // again on restart.
 -                              log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
 -                                      payment_preimage, update_res);
 +                              }
 +                              return Ok(());
                        }
 -                      mem::drop(peer_state_opt);
 -                      mem::drop(per_peer_state);
 -                      // Note that we do process the completion action here. This totally could be a
 -                      // duplicate claim, but we have no way of knowing without interrogating the
 -                      // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
 -                      // generally always allowed to be duplicative (and it's specifically noted in
 -                      // `PaymentForwarded`).
 -                      self.handle_monitor_update_completion_actions(completion_action(None));
 -                      Ok(())
                }
 +              let preimage_update = ChannelMonitorUpdate {
 +                      update_id: CLOSED_CHANNEL_UPDATE_ID,
 +                      updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 +                              payment_preimage,
 +                      }],
 +              };
 +              // We update the ChannelMonitor on the backward link, after
 +              // receiving an `update_fulfill_htlc` from the forward link.
 +              let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
 +              if update_res != ChannelMonitorUpdateStatus::Completed {
 +                      // TODO: This needs to be handled somehow - if we receive a monitor update
 +                      // with a preimage we *must* somehow manage to propagate it to the upstream
 +                      // channel, or we must have an ability to receive the same event and try
 +                      // again on restart.
 +                      log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
 +                              payment_preimage, update_res);
 +              }
 +              // Note that we do process the completion action here. This totally could be a
 +              // duplicate claim, but we have no way of knowing without interrogating the
 +              // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
 +              // generally always allowed to be duplicative (and it's specifically noted in
 +              // `PaymentForwarded`).
 +              self.handle_monitor_update_completion_actions(completion_action(None));
 +              Ok(())
        }
  
        fn finalize_claims(&self, sources: Vec<HTLCSource>) {
                pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
        -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> {
 +              log_trace!(self.logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
 +                      log_bytes!(channel.channel_id()),
 +                      if raa.is_some() { "an" } else { "no" },
 +                      if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
 +                      if funding_broadcastable.is_some() { "" } else { "not " },
 +                      if channel_ready.is_some() { "sending" } else { "without" },
 +                      if announcement_sigs.is_some() { "sending" } else { "without" });
 +
                let mut htlc_forwards = None;
  
                let counterparty_node_id = channel.get_counterparty_node_id();
        fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
  
 -              let htlc_forwards;
 -              let (mut pending_failures, finalized_claims, counterparty_node_id) = {
 -                      let counterparty_node_id = match counterparty_node_id {
 -                              Some(cp_id) => cp_id.clone(),
 -                              None => {
 -                                      // TODO: Once we can rely on the counterparty_node_id from the
 -                                      // monitor event, this and the id_to_peer map should be removed.
 -                                      let id_to_peer = self.id_to_peer.lock().unwrap();
 -                                      match id_to_peer.get(&funding_txo.to_channel_id()) {
 -                                              Some(cp_id) => cp_id.clone(),
 -                                              None => return,
 -                                      }
 -                              }
 -                      };
 -                      let per_peer_state = self.per_peer_state.read().unwrap();
 -                      let mut peer_state_lock;
 -                      let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
 -                      if peer_state_mutex_opt.is_none() { return }
 -                      peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
 -                      let peer_state = &mut *peer_state_lock;
 -                      let mut channel = {
 -                              match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
 -                                      hash_map::Entry::Occupied(chan) => chan,
 -                                      hash_map::Entry::Vacant(_) => return,
 +              let counterparty_node_id = match counterparty_node_id {
 +                      Some(cp_id) => cp_id.clone(),
 +                      None => {
 +                              // TODO: Once we can rely on the counterparty_node_id from the
 +                              // monitor event, this and the id_to_peer map should be removed.
 +                              let id_to_peer = self.id_to_peer.lock().unwrap();
 +                              match id_to_peer.get(&funding_txo.to_channel_id()) {
 +                                      Some(cp_id) => cp_id.clone(),
 +                                      None => return,
                                }
 -                      };
 -                      if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
 -                              return;
                        }
 -
 -                      let updates = channel.get_mut().monitor_updating_restored(&self.logger, &self.node_signer, self.genesis_hash, &self.default_configuration, self.best_block.read().unwrap().height());
 -                      let channel_update = if updates.channel_ready.is_some() && channel.get().is_usable() {
 -                              // We only send a channel_update in the case where we are just now sending a
 -                              // channel_ready and the channel is in a usable state. We may re-send a
 -                              // channel_update later through the announcement_signatures process for public
 -                              // channels, but there's no reason not to just inform our counterparty of our fees
 -                              // now.
 -                              if let Ok(msg) = self.get_channel_update_for_unicast(channel.get()) {
 -                                      Some(events::MessageSendEvent::SendChannelUpdate {
 -                                              node_id: channel.get().get_counterparty_node_id(),
 -                                              msg,
 -                                      })
 -                              } else { None }
 -                      } else { None };
 -                      htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
 -                      if let Some(upd) = channel_update {
 -                              peer_state.pending_msg_events.push(upd);
 +              };
 +              let per_peer_state = self.per_peer_state.read().unwrap();
 +              let mut peer_state_lock;
 +              let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
 +              if peer_state_mutex_opt.is_none() { return }
 +              peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
 +              let peer_state = &mut *peer_state_lock;
 +              let mut channel = {
 +                      match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
 +                              hash_map::Entry::Occupied(chan) => chan,
 +                              hash_map::Entry::Vacant(_) => return,
                        }
 -
 -                      (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id)
                };
 -              if let Some(forwards) = htlc_forwards {
 -                      self.forward_htlcs(&mut [forwards][..]);
 -              }
 -              self.finalize_claims(finalized_claims);
 -              for failure in pending_failures.drain(..) {
 -                      let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id: funding_txo.to_channel_id() };
 -                      self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
 +              log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
 +                      highest_applied_update_id, channel.get().get_latest_monitor_update_id());
 +              if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
 +                      return;
                }
 +              handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut());
        }
  
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
        fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
  
 +              let peers_without_funded_channels = self.peers_without_funded_channels(|peer| !peer.channel_by_id.is_empty());
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
 +              let is_only_peer_channel = peer_state.channel_by_id.len() == 1;
                match peer_state.channel_by_id.entry(temporary_channel_id.clone()) {
                        hash_map::Entry::Occupied(mut channel) => {
                                if !channel.get().inbound_is_awaiting_accept() {
                                        peer_state.pending_msg_events.push(send_msg_err_event);
                                        let _ = remove_channel!(self, channel);
                                        return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() });
 +                              } else {
 +                                      // If this peer already has some channels, a new channel won't increase our number of peers
 +                                      // with unfunded channels, so as long as we aren't over the maximum number of unfunded
 +                                      // channels per-peer we can accept channels from a peer with existing ones.
 +                                      if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS {
 +                                              let send_msg_err_event = events::MessageSendEvent::HandleError {
 +                                                      node_id: channel.get().get_counterparty_node_id(),
 +                                                      action: msgs::ErrorAction::SendErrorMessage{
 +                                                              msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), }
 +                                                      }
 +                                              };
 +                                              peer_state.pending_msg_events.push(send_msg_err_event);
 +                                              let _ = remove_channel!(self, channel);
 +                                              return Err(APIError::APIMisuseError { err: "Too many peers with unfunded channels, refusing to accept new ones".to_owned() });
 +                                      }
                                }
  
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
                Ok(())
        }
  
 +      /// Gets the number of peers which match the given filter and do not have any funded, outbound,
 +      /// or 0-conf channels.
 +      ///
 +      /// The filter is called for each peer and is passed that peer's state; only peers for which
 +      /// it returns true are considered for counting at all.
 +      fn peers_without_funded_channels<Filter>(&self, maybe_count_peer: Filter) -> usize
 +      where Filter: Fn(&PeerState<<SP::Target as SignerProvider>::Signer>) -> bool {
 +              let mut peers_without_funded_channels = 0;
 +              let best_block_height = self.best_block.read().unwrap().height();
 +              {
 +                      let peer_state_lock = self.per_peer_state.read().unwrap();
 +                      for (_, peer_mtx) in peer_state_lock.iter() {
 +                              let peer = peer_mtx.lock().unwrap();
 +                              if !maybe_count_peer(&*peer) { continue; }
 +                              let num_unfunded_channels = Self::unfunded_channel_count(&peer, best_block_height);
 +                              if num_unfunded_channels == peer.channel_by_id.len() {
 +                                      peers_without_funded_channels += 1;
 +                              }
 +                      }
 +              }
 +              return peers_without_funded_channels;
 +      }
 +
 +      fn unfunded_channel_count(
 +              peer: &PeerState<<SP::Target as SignerProvider>::Signer>, best_block_height: u32
 +      ) -> usize {
 +              let mut num_unfunded_channels = 0;
 +              for (_, chan) in peer.channel_by_id.iter() {
 +                      if !chan.is_outbound() && chan.minimum_depth().unwrap_or(1) != 0 &&
 +                              chan.get_funding_tx_confirmations(best_block_height) == 0
 +                      {
 +                              num_unfunded_channels += 1;
 +                      }
 +              }
 +              num_unfunded_channels
 +      }
 +
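
As a standalone illustration of the two counters above, using hypothetical `Chan`/`Peer` stand-ins rather than LDK's real `Channel`/`PeerState`: a peer only counts as "without funded channels" when every channel it has with us is inbound, non-0-conf, and still unconfirmed.

    struct Chan { outbound: bool, minimum_depth: Option<u32>, funding_confs: u32 }
    struct Peer { channels: Vec<Chan> }

    // Mirrors unfunded_channel_count(): a channel is "unfunded" if it is inbound,
    // not 0-conf (a missing minimum_depth is treated as 1, as with unwrap_or(1)),
    // and its funding transaction has no confirmations yet.
    fn unfunded_channel_count(peer: &Peer) -> usize {
        peer.channels.iter()
            .filter(|c| !c.outbound && c.minimum_depth.unwrap_or(1) != 0 && c.funding_confs == 0)
            .count()
    }

    // Mirrors peers_without_funded_channels() with the `!channel_by_id.is_empty()`
    // filter that the callers above pass in.
    fn peers_without_funded_channels(peers: &[Peer]) -> usize {
        peers.iter()
            .filter(|p| !p.channels.is_empty())
            .filter(|p| unfunded_channel_count(p) == p.channels.len())
            .count()
    }

    fn main() {
        let peers = vec![
            Peer { channels: vec![Chan { outbound: false, minimum_depth: Some(3), funding_confs: 0 }] },
            Peer { channels: vec![Chan { outbound: true, minimum_depth: Some(3), funding_confs: 0 }] },
        ];
        assert_eq!(peers_without_funded_channels(&peers), 1); // only the first peer counts
    }
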
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
                if msg.chain_hash != self.genesis_hash {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
                let mut random_bytes = [0u8; 16];
                random_bytes.copy_from_slice(&self.entropy_source.get_secure_random_bytes()[..16]);
                let user_channel_id = u128::from_be_bytes(random_bytes);
 -
                let outbound_scid_alias = self.create_and_insert_outbound_scid_alias();
 +
 +              // Get the number of peers with channels, but without funded ones. We don't care too much
 +              // about peers that never open a channel, so we filter by peers that have at least one
 +              // channel, and then limit the number of those with unfunded channels.
 +              let channeled_peers_without_funding = self.peers_without_funded_channels(|node| !node.channel_by_id.is_empty());
 +
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                    .ok_or_else(|| {
                        })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
 +
 +              // If this peer already has some channels, a new channel won't increase our number of peers
 +              // with unfunded channels, so as long as we aren't over the maximum number of peers with
 +              // unfunded channels we can accept a new channel from a peer with existing ones.
 +              if peer_state.channel_by_id.is_empty() &&
 +                      channeled_peers_without_funding >= MAX_UNFUNDED_CHANNEL_PEERS &&
 +                      !self.default_configuration.manually_accept_inbound_channels
 +              {
 +                      return Err(MsgHandleErrInternal::send_err_msg_no_close(
 +                              "Have too many peers with unfunded channels, not accepting new ones".to_owned(),
 +                              msg.temporary_channel_id.clone()));
 +              }
 +
 +              let best_block_height = self.best_block.read().unwrap().height();
 +              if Self::unfunded_channel_count(peer_state, best_block_height) >= MAX_UNFUNDED_CHANS_PER_PEER {
 +                      return Err(MsgHandleErrInternal::send_err_msg_no_close(
 +                              format!("Refusing more than {} unfunded channels.", MAX_UNFUNDED_CHANS_PER_PEER),
 +                              msg.temporary_channel_id.clone()));
 +              }
 +
                let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider,
 -                      counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration,
 -                      self.best_block.read().unwrap().height(), &self.logger, outbound_scid_alias)
 +                      counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id,
 +                      &self.default_configuration, best_block_height, &self.logger, outbound_scid_alias)
                {
                        Err(e) => {
                                self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias);
        }
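
The two rejections above form layered DoS limits: a global cap on the number of peers that have only unfunded channels, applied only when this would be the peer's first channel, plus a per-peer cap on unfunded channels. A condensed sketch of the decision, with stand-in constants (the real values are defined elsewhere in channelmanager.rs and may differ):

    // Stand-in values for illustration only; see MAX_UNFUNDED_CHANS_PER_PEER
    // and MAX_UNFUNDED_CHANNEL_PEERS in channelmanager.rs for the real limits.
    const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
    const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;

    fn check_new_inbound_channel(
        peer_has_channels: bool,
        peer_unfunded_count: usize,
        peers_with_only_unfunded: usize,
        manually_accept: bool,
    ) -> Result<(), &'static str> {
        // Only a peer's first channel can push us over the global count of peers
        // with unfunded channels, so the peer-count limit applies just then.
        if !peer_has_channels && peers_with_only_unfunded >= MAX_UNFUNDED_CHANNEL_PEERS
            && !manually_accept
        {
            return Err("Have too many peers with unfunded channels, not accepting new ones");
        }
        // Independently, cap the number of unfunded channels any one peer may open.
        if peer_unfunded_count >= MAX_UNFUNDED_CHANS_PER_PEER {
            return Err("Refusing more unfunded channels from this peer");
        }
        Ok(())
    }

    fn main() {
        assert!(check_new_inbound_channel(true, 0, MAX_UNFUNDED_CHANNEL_PEERS, false).is_ok());
        assert!(check_new_inbound_channel(false, 0, MAX_UNFUNDED_CHANNEL_PEERS, false).is_err());
        assert!(check_new_inbound_channel(true, MAX_UNFUNDED_CHANS_PER_PEER, 0, false).is_err());
    }
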
  
        fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
 +              let best_block = *self.best_block.read().unwrap();
 +
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                                debug_assert!(false);
                                MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)
                        })?;
 -              let ((funding_msg, monitor, mut channel_ready), mut chan) = {
 -                      let best_block = *self.best_block.read().unwrap();
 -                      let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 -                      let peer_state = &mut *peer_state_lock;
 +
 +              let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 +              let peer_state = &mut *peer_state_lock;
 +              let ((funding_msg, monitor), chan) =
                        match peer_state.channel_by_id.entry(msg.temporary_channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
 -                      }
 -              };
 -              // Because we have exclusive ownership of the channel here we can release the peer_state
 -              // lock before watch_channel
 -              match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) {
 -                      ChannelMonitorUpdateStatus::Completed => {},
 -                      ChannelMonitorUpdateStatus::PermanentFailure => {
 -                              // Note that we reply with the new channel_id in error messages if we gave up on the
 -                              // channel, not the temporary_channel_id. This is compatible with ourselves, but the
 -                              // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
 -                              // any messages referencing a previously-closed channel anyway.
 -                              // We do not propagate the monitor update to the user as it would be for a monitor
 -                              // that we didn't manage to store (and that we don't care about - we don't respond
 -                              // with the funding_signed so the channel can never go on chain).
 -                              let (_monitor_update, failed_htlcs) = chan.force_shutdown(false);
 -                              assert!(failed_htlcs.is_empty());
 -                              return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id));
 -                      },
 -                      ChannelMonitorUpdateStatus::InProgress => {
 -                              // There's no problem signing a counterparty's funding transaction if our monitor
 -                              // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
 -                              // accepted payment from yet. We do, however, need to wait to send our channel_ready
 -                              // until we have persisted our monitor.
 -                              chan.monitor_updating_paused(false, false, channel_ready.is_some(), Vec::new(), Vec::new(), Vec::new());
 -                              channel_ready = None; // Don't send the channel_ready now
 -                      },
 -              }
 -              // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the
 -              // peer exists, despite the inner PeerState potentially having no channels after removing
 -              // the channel above.
 -              let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 -              let peer_state = &mut *peer_state_lock;
 +                      };
 +
                match peer_state.channel_by_id.entry(funding_msg.channel_id) {
                        hash_map::Entry::Occupied(_) => {
 -                              return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
 +                              Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
                        },
                        hash_map::Entry::Vacant(e) => {
 -                              let mut id_to_peer = self.id_to_peer.lock().unwrap();
 -                              match id_to_peer.entry(chan.channel_id()) {
 +                              match self.id_to_peer.lock().unwrap().entry(chan.channel_id()) {
                                        hash_map::Entry::Occupied(_) => {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                                        "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
                                                i_e.insert(chan.get_counterparty_node_id());
                                        }
                                }
 +
 +                              // There's no problem signing a counterparty's funding transaction if our monitor
 +                              // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
 +                              // accepted payment from yet. We do, however, need to wait to send our channel_ready
 +                              // until we have persisted our monitor.
 +                              let new_channel_id = funding_msg.channel_id;
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
                                        node_id: counterparty_node_id.clone(),
                                        msg: funding_msg,
                                });
 -                              if let Some(msg) = channel_ready {
 -                                      send_channel_ready!(self, peer_state.pending_msg_events, chan, msg);
 +
 +                              let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
 +
 +                              let chan = e.insert(chan);
 +                              let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
 +
 +                              // Note that we reply with the new channel_id in error messages if we gave up on the
 +                              // channel, not the temporary_channel_id. This is compatible with ourselves, but the
 +                              // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
 +                              // any messages referencing a previously-closed channel anyway.
 +                              // We do not propagate the monitor update to the user as it would be for a monitor
 +                              // that we didn't manage to store (and that we don't care about - we don't respond
 +                              // with the funding_signed so the channel can never go on chain).
 +                              if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
 +                                      res.0 = None;
                                }
 -                              e.insert(chan);
 +                              res
                        }
                }
 -              Ok(())
        }
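
One subtlety worth calling out in the rework above: when the very first `watch_channel` fails, the returned error carries a shutdown monitor update for a monitor that was never actually stored, so it is blanked out in place before the error propagates. A minimal sketch of that scrub, with a stand-in type rather than LDK's real MsgHandleErrInternal:

    // Stand-in for the (monitor_update, htlcs) payload carried in shutdown_finish.
    struct ShutdownFinish(Option<String>);

    fn scrub_unstored_monitor_update(res: &mut Result<(), ShutdownFinish>) {
        if let Err(shutdown_finish) = res {
            // The monitor was never persisted, so nothing may be applied to it;
            // without our funding_signed the channel can also never go on chain.
            shutdown_finish.0 = None;
        }
    }

    fn main() {
        let mut res: Result<(), ShutdownFinish> = Err(ShutdownFinish(Some("update".to_owned())));
        scrub_unstored_monitor_update(&mut res);
        assert!(matches!(res, Err(ShutdownFinish(None))));
    }
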
  
        fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
 -              let funding_tx = {
 -                      let best_block = *self.best_block.read().unwrap();
 -                      let per_peer_state = self.per_peer_state.read().unwrap();
 -                      let peer_state_mutex = per_peer_state.get(counterparty_node_id)
 -                              .ok_or_else(|| {
 -                                      debug_assert!(false);
 -                                      MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
 -                              })?;
 +              let best_block = *self.best_block.read().unwrap();
 +              let per_peer_state = self.per_peer_state.read().unwrap();
 +              let peer_state_mutex = per_peer_state.get(counterparty_node_id)
 +                      .ok_or_else(|| {
 +                              debug_assert!(false);
 +                              MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
 +                      })?;
  
 -                      let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 -                      let peer_state = &mut *peer_state_lock;
 -                      match peer_state.channel_by_id.entry(msg.channel_id) {
 -                              hash_map::Entry::Occupied(mut chan) => {
 -                                      let (monitor, funding_tx, channel_ready) = match chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger) {
 -                                              Ok(update) => update,
 -                                              Err(e) => try_chan_entry!(self, Err(e), chan),
 -                                      };
 -                                      match self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
 -                                              ChannelMonitorUpdateStatus::Completed => {},
 -                                              e => {
 -                                                      let mut res = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::RevokeAndACKFirst, channel_ready.is_some(), OPTIONALLY_RESEND_FUNDING_LOCKED);
 -                                                      if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
 -                                                              // We weren't able to watch the channel to begin with, so no updates should be made on
 -                                                              // it. Previously, full_stack_target found an (unreachable) panic when the
 -                                                              // monitor update contained within `shutdown_finish` was applied.
 -                                                              if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
 -                                                                      shutdown_finish.0.take();
 -                                                              }
 -                                                      }
 -                                                      return res
 -                                              },
 -                                      }
 -                                      if let Some(msg) = channel_ready {
 -                                              send_channel_ready!(self, peer_state.pending_msg_events, chan.get(), msg);
 +              let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 +              let peer_state = &mut *peer_state_lock;
 +              match peer_state.channel_by_id.entry(msg.channel_id) {
 +                      hash_map::Entry::Occupied(mut chan) => {
 +                              let monitor = try_chan_entry!(self,
 +                                      chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan);
 +                              let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor);
 +                              let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, chan);
 +                              if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
 +                                      // We weren't able to watch the channel to begin with, so no updates should be made on
 +                                      // it. Previously, full_stack_target found an (unreachable) panic when the
 +                                      // monitor update contained within `shutdown_finish` was applied.
 +                                      if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
 +                                              shutdown_finish.0.take();
                                        }
 -                                      funding_tx
 -                              },
 -                              hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
 -                      }
 -              };
 -              log_info!(self.logger, "Broadcasting funding transaction with txid {}", funding_tx.txid());
 -              self.tx_broadcaster.broadcast_transaction(&funding_tx);
 -              Ok(())
 +                              }
 +                              res
 +                      },
 +                      hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
 +              }
        }
  
        fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
                                                        if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
                                        }
  
 -                                      let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
 +                                      let funding_txo_opt = chan_entry.get().get_funding_txo();
 +                                      let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
 +                                              chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
                                        dropped_htlcs = htlcs;
  
 -                                      // Update the monitor with the shutdown script if necessary.
 -                                      if let Some(monitor_update) = monitor_update {
 -                                              let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
 -                                              let (result, is_permanent) =
 -                                                      handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
 -                                              if is_permanent {
 -                                                      remove_channel!(self, chan_entry);
 -                                                      break result;
 -                                              }
 -                                      }
 -
                                        if let Some(msg) = shutdown {
 +                                              // We can send the `shutdown` message before updating the `ChannelMonitor`
 +                                              // here as we don't need the monitor update to complete until we send a
 +                                              // `closing_signed`, which we'll delay if we're pending a monitor update.
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                        node_id: *counterparty_node_id,
                                                        msg,
                                                });
                                        }
  
 +                                      // Update the monitor with the shutdown script if necessary.
 +                                      if let Some(monitor_update) = monitor_update_opt {
 +                                              let update_id = monitor_update.update_id;
 +                                              let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
 +                                              break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
 +                                      }
                                        break Ok(());
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
  
 -              let _ = handle_error!(self, result, *counterparty_node_id);
 -              Ok(())
 +              result
        }
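
The reordering here is deliberate: the `shutdown` reply can be queued immediately, and only the eventual `closing_signed` has to wait on monitor persistence. A toy model of the ordering, with stand-in types rather than LDK's API:

    #[derive(Debug, PartialEq)]
    enum Step { QueueShutdownReply, PersistMonitorUpdate }

    // The shutdown reply never waits on the monitor: persistence only gates the
    // later closing_signed, so the message is queued before the update is applied.
    fn handle_shutdown(send_reply: bool, have_monitor_update: bool) -> Vec<Step> {
        let mut steps = Vec::new();
        if send_reply { steps.push(Step::QueueShutdownReply); }
        if have_monitor_update { steps.push(Step::PersistMonitorUpdate); }
        steps
    }

    fn main() {
        assert_eq!(handle_shutdown(true, true),
            vec![Step::QueueShutdownReply, Step::PersistMonitorUpdate]);
    }
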
  
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
                let peer_state = &mut *peer_state_lock;
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
 -                              let (revoke_and_ack, commitment_signed, monitor_update) =
 -                                      match chan.get_mut().commitment_signed(&msg, &self.logger) {
 -                                              Err((None, e)) => try_chan_entry!(self, Err(e), chan),
 -                                              Err((Some(update), e)) => {
 -                                                      assert!(chan.get().is_awaiting_monitor_update());
 -                                                      let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &update);
 -                                                      try_chan_entry!(self, Err(e), chan);
 -                                                      unreachable!();
 -                                              },
 -                                              Ok(res) => res
 -                                      };
 -                              let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update);
 -                              if let Err(e) = handle_monitor_update_res!(self, update_res, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()) {
 -                                      return Err(e);
 -                              }
 -
 -                              peer_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
 -                                      node_id: counterparty_node_id.clone(),
 -                                      msg: revoke_and_ack,
 -                              });
 -                              if let Some(msg) = commitment_signed {
 -                                      peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
 -                                              node_id: counterparty_node_id.clone(),
 -                                              updates: msgs::CommitmentUpdate {
 -                                                      update_add_htlcs: Vec::new(),
 -                                                      update_fulfill_htlcs: Vec::new(),
 -                                                      update_fail_htlcs: Vec::new(),
 -                                                      update_fail_malformed_htlcs: Vec::new(),
 -                                                      update_fee: None,
 -                                                      commitment_signed: msg,
 -                                              },
 -                                      });
 -                              }
 -                              Ok(())
 +                              let funding_txo = chan.get().get_funding_txo();
 +                              let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
 +                              let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
 +                              let update_id = monitor_update.update_id;
 +                              handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
 +                                      peer_state, chan)
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
        }
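
The shape of the new `commitment_signed` path generalizes: the channel hands back a single monitor update, and the revoke_and_ack/commitment_signed responses are only released once the persister reports that update complete. A rough model under the assumption of a two-state persist status (Completed/InProgress), which is a simplification of LDK's actual status enum:

    #[derive(Clone, Copy, PartialEq)]
    enum UpdateStatus { Completed, InProgress }

    struct Channel { held_msgs: Vec<&'static str> }

    impl Channel {
        // Either release the response messages now (update persisted synchronously)
        // or hold them until the async persist completes.
        fn handle_new_monitor_update(
            &mut self, status: UpdateStatus, msgs: Vec<&'static str>,
        ) -> Vec<&'static str> {
            match status {
                UpdateStatus::Completed => msgs,
                UpdateStatus::InProgress => { self.held_msgs = msgs; Vec::new() }
            }
        }
    }

    fn main() {
        let mut chan = Channel { held_msgs: Vec::new() };
        let sent = chan.handle_new_monitor_update(
            UpdateStatus::InProgress, vec!["revoke_and_ack", "commitment_signed"]);
        assert!(sent.is_empty()); // responses are withheld until the monitor persists
        assert_eq!(chan.held_msgs.len(), 2);
    }
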
  
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
 -              let mut htlcs_to_fail = Vec::new();
 -              let res = loop {
 +              let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
 -                                      let was_paused_for_mon_update = chan.get().is_awaiting_monitor_update();
 -                                      let raa_updates = break_chan_entry!(self,
 -                                              chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
 -                                      htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
 -                                      let update_res = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &raa_updates.monitor_update);
 -                                      if was_paused_for_mon_update {
 -                                              assert!(update_res != ChannelMonitorUpdateStatus::Completed);
 -                                              assert!(raa_updates.commitment_update.is_none());
 -                                              assert!(raa_updates.accepted_htlcs.is_empty());
 -                                              assert!(raa_updates.failed_htlcs.is_empty());
 -                                              assert!(raa_updates.finalized_claimed_htlcs.is_empty());
 -                                              break Err(MsgHandleErrInternal::ignore_no_close("Existing pending monitor update prevented responses to RAA".to_owned()));
 -                                      }
 -                                      if update_res != ChannelMonitorUpdateStatus::Completed {
 -                                              if let Err(e) = handle_monitor_update_res!(self, update_res, chan,
 -                                                              RAACommitmentOrder::CommitmentFirst, false,
 -                                                              raa_updates.commitment_update.is_some(), false,
 -                                                              raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
 -                                                              raa_updates.finalized_claimed_htlcs) {
 -                                                      break Err(e);
 -                                              } else { unreachable!(); }
 -                                      }
 -                                      if let Some(updates) = raa_updates.commitment_update {
 -                                              peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
 -                                                      node_id: counterparty_node_id.clone(),
 -                                                      updates,
 -                                              });
 -                                      }
 -                                      break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
 -                                                      raa_updates.finalized_claimed_htlcs,
 -                                                      chan.get().get_short_channel_id()
 -                                                              .unwrap_or(chan.get().outbound_scid_alias()),
 -                                                      chan.get().get_funding_txo().unwrap(),
 -                                                      chan.get().get_user_id()))
 +                                      let funding_txo = chan.get().get_funding_txo();
 +                                      let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
 +                                      let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
 +                                      let update_id = monitor_update.update_id;
 +                                      let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
 +                                              peer_state, chan);
 +                                      (htlcs_to_fail, res)
                                },
 -                              hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
 +                              hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
                self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id);
 -              match res {
 -                      Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
 -                              short_channel_id, channel_outpoint, user_channel_id)) =>
 -                      {
 -                              for failure in pending_failures.drain(..) {
 -                                      let receiver = HTLCDestination::NextHopChannel { node_id: Some(*counterparty_node_id), channel_id: channel_outpoint.to_channel_id() };
 -                                      self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
 -                              }
 -                              self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, user_channel_id, pending_forwards)]);
 -                              self.finalize_claims(finalized_claim_htlcs);
 -                              Ok(())
 -                      },
 -                      Err(e) => Err(e)
 -              }
 +              res
        }
  
        fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();
                let mut handle_errors = Vec::new();
 -              {
 -                      let per_peer_state = self.per_peer_state.read().unwrap();
 +              let per_peer_state = self.per_peer_state.read().unwrap();
  
 -                      for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
 +              for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
 +                      'chan_loop: loop {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 -                              let peer_state = &mut *peer_state_lock;
 -                              let pending_msg_events = &mut peer_state.pending_msg_events;
 -                              peer_state.channel_by_id.retain(|channel_id, chan| {
 -                                      match chan.maybe_free_holding_cell_htlcs(&self.logger) {
 -                                              Ok((commitment_opt, holding_cell_failed_htlcs)) => {
 -                                                      if !holding_cell_failed_htlcs.is_empty() {
 -                                                              failed_htlcs.push((
 -                                                                      holding_cell_failed_htlcs,
 -                                                                      *channel_id,
 -                                                                      chan.get_counterparty_node_id()
 -                                                              ));
 -                                                      }
 -                                                      if let Some((commitment_update, monitor_update)) = commitment_opt {
 -                                                              match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
 -                                                                      ChannelMonitorUpdateStatus::Completed => {
 -                                                                              pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
 -                                                                                      node_id: chan.get_counterparty_node_id(),
 -                                                                                      updates: commitment_update,
 -                                                                              });
 -                                                                      },
 -                                                                      e => {
 -                                                                              has_monitor_update = true;
 -                                                                              let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
 -                                                                              handle_errors.push((chan.get_counterparty_node_id(), res));
 -                                                                              if close_channel { return false; }
 -                                                                      },
 -                                                              }
 -                                                      }
 -                                                      true
 -                                              },
 -                                              Err(e) => {
 -                                                      let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
 -                                                      handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
 -                                                      // ChannelClosed event is generated by handle_error for us
 -                                                      !close_channel
 +                              let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
 +                              for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
 +                                      let counterparty_node_id = chan.get_counterparty_node_id();
 +                                      let funding_txo = chan.get_funding_txo();
 +                                      let (monitor_opt, holding_cell_failed_htlcs) =
 +                                              chan.maybe_free_holding_cell_htlcs(&self.logger);
 +                                      if !holding_cell_failed_htlcs.is_empty() {
 +                                              failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
 +                                      }
 +                                      if let Some(monitor_update) = monitor_opt {
 +                                              has_monitor_update = true;
 +
 +                                              let update_res = self.chain_monitor.update_channel(
 +                                                      funding_txo.expect("channel is live"), monitor_update);
 +                                              let update_id = monitor_update.update_id;
 +                                              let channel_id: [u8; 32] = *channel_id;
 +                                              let res = handle_new_monitor_update!(self, update_res, update_id,
 +                                                      peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
 +                                                      peer_state.channel_by_id.remove(&channel_id));
 +                                              if res.is_err() {
 +                                                      handle_errors.push((counterparty_node_id, res));
                                                }
 +                                              continue 'chan_loop;
                                        }
 -                              });
 +                              }
 +                              break 'chan_loop;
                        }
                }
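
The labeled 'chan_loop exists because handle_new_monitor_update! may remove the current channel from channel_by_id, which would invalidate the iterator; the loop therefore restarts iteration from scratch after any monitor update. The same restart-on-mutation pattern in miniature, with hypothetical types:

    use std::collections::HashMap;

    // Process entries of a map where handling an entry may remove it: rather than
    // holding an iterator across the mutation, restart the scan from the top.
    fn drain_flagged(chans: &mut HashMap<u32, bool /* should_remove */>) -> usize {
        let mut removed = 0;
        'chan_loop: loop {
            let mut to_remove = None;
            for (&id, &flag) in chans.iter() {
                if flag { to_remove = Some(id); break; }
            }
            if let Some(id) = to_remove {
                chans.remove(&id);
                removed += 1;
                continue 'chan_loop; // the iterator is gone; start over
            }
            break 'chan_loop;
        }
        removed
    }

    fn main() {
        let mut chans = HashMap::from([(1, true), (2, false), (3, true)]);
        assert_eq!(drain_flagged(&mut chans), 2);
        assert_eq!(chans.len(), 1);
    }
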
  
@@@ -6184,13 -6246,13 +6185,13 @@@ wher
                let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
        }
  
 -      fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) {
 +      fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
 -                      log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.",
 -                              log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" });
 +                      log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates.",
 +                              log_pubkey!(counterparty_node_id));
                        if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
                                peer_state.is_connected = false;
                                peer_state.ok_to_remove(true)
 -                      } else { true }
 +                      } else { debug_assert!(false, "Unconnected peer disconnected"); true }
                };
                if remove_peer {
                        per_peer_state.remove(counterparty_node_id);
                }
        }
  
 -      fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) -> Result<(), ()> {
 +      fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init, inbound: bool) -> Result<(), ()> {
                if !init_msg.features.supports_static_remote_key() {
 -                      log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(counterparty_node_id));
 +                      log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting", log_pubkey!(counterparty_node_id));
                        return Err(());
                }
  
 -              log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 -
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
  
 +              // If we have too many peers connected which don't have funded channels, disconnect the
 +              // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
 +              // unfunded channels taking up space in memory for disconnected peers, we still let new
 +              // peers connect, but we'll reject new channels from them.
 +              let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
 +              let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
 +
                {
                        let mut peer_state_lock = self.per_peer_state.write().unwrap();
                        match peer_state_lock.entry(counterparty_node_id.clone()) {
                                hash_map::Entry::Vacant(e) => {
 +                                      if inbound_peer_limited {
 +                                              return Err(());
 +                                      }
                                        e.insert(Mutex::new(PeerState {
                                                channel_by_id: HashMap::new(),
                                                latest_features: init_msg.features.clone(),
                                                pending_msg_events: Vec::new(),
 +                                              monitor_update_blocked_actions: BTreeMap::new(),
                                                is_connected: true,
                                        }));
                                },
                                hash_map::Entry::Occupied(e) => {
                                        let mut peer_state = e.get().lock().unwrap();
                                        peer_state.latest_features = init_msg.features.clone();
 +
 +                                      let best_block_height = self.best_block.read().unwrap().height();
 +                                      if inbound_peer_limited &&
 +                                              Self::unfunded_channel_count(&*peer_state, best_block_height) ==
 +                                              peer_state.channel_by_id.len()
 +                                      {
 +                                              return Err(());
 +                                      }
 +
                                        debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
                                        peer_state.is_connected = true;
                                },
                        }
                }
  
 -              let per_peer_state = self.per_peer_state.read().unwrap();
 +              log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
  
 +              let per_peer_state = self.per_peer_state.read().unwrap();
                for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
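
The new `inbound` flag feeds a simple admission rule: outbound connections are always allowed, while inbound connections are refused once too many connected peers have no funded channels, unless the reconnecting peer itself has at least one funded channel. A condensed sketch of that rule (constants and types here are stand-ins):

    const MAX_NO_CHANNEL_PEERS: usize = 250; // stand-in; see channelmanager.rs

    fn allow_connection(
        inbound: bool,
        connected_peers_without_funded: usize,
        peer_funded_channels: usize, // funded channels we already have with this peer
    ) -> bool {
        // We only ever refuse inbound connections, and only while we're at the
        // limit of connected peers that have nothing funded with us.
        let limited = inbound && connected_peers_without_funded >= MAX_NO_CHANNEL_PEERS;
        // A peer with a funded channel is "protected" and always allowed back.
        !limited || peer_funded_channels > 0
    }

    fn main() {
        assert!(allow_connection(false, MAX_NO_CHANNEL_PEERS, 0)); // outbound: always ok
        assert!(!allow_connection(true, MAX_NO_CHANNEL_PEERS, 0)); // inbound, unprotected
        assert!(allow_connection(true, MAX_NO_CHANNEL_PEERS, 1));  // inbound, protected
    }
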
@@@ -6908,14 -6951,10 +6909,14 @@@ wher
                        htlc_purposes.push(purpose);
                }
  
 +              let mut monitor_update_blocked_actions_per_peer = None;
 +              let mut peer_states = Vec::new();
 +              for (_, peer_state_mutex) in per_peer_state.iter() {
 +                      peer_states.push(peer_state_mutex.lock().unwrap());
 +              }
 +
                (serializable_peer_count).write(writer)?;
 -              for (peer_pubkey, peer_state_mutex) in per_peer_state.iter() {
 -                      let peer_state_lock = peer_state_mutex.lock().unwrap();
 -                      let peer_state = &*peer_state_lock;
 +              for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
                        // Peers which we have no channels to should be dropped once disconnected. As we
                        // disconnect all peers when shutting down and serializing the ChannelManager, we
                        // consider all peers as disconnected here. There's therefore no need write peers with
                        if !peer_state.ok_to_remove(false) {
                                peer_pubkey.write(writer)?;
                                peer_state.latest_features.write(writer)?;
 +                              if !peer_state.monitor_update_blocked_actions.is_empty() {
 +                                      monitor_update_blocked_actions_per_peer
 +                                              .get_or_insert_with(Vec::new)
 +                                              .push((*peer_pubkey, &peer_state.monitor_update_blocked_actions));
 +                              }
                        }
                }
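
Taking every per-peer mutex up front, before writing anything, gives the serializer one consistent snapshot: no peer's state can change between the peer count being written and that peer's entries being written. A small sketch of the pattern (the Mutex contents are placeholders):

    use std::sync::Mutex;

    // Take every per-peer lock before writing anything so the count written up
    // front and the per-peer entries written afterwards describe the same state.
    fn snapshot_peers(peers: &[(u8, Mutex<Vec<u32>>)]) -> Vec<(u8, Vec<u32>)> {
        let guards: Vec<_> = peers.iter().map(|(_, m)| m.lock().unwrap()).collect();
        let mut out = Vec::new();
        for ((pubkey, _), state) in peers.iter().zip(guards.iter()) {
            out.push((*pubkey, (**state).clone()));
        }
        out
    }

    fn main() {
        let peers = vec![(1u8, Mutex::new(vec![10, 20])), (2u8, Mutex::new(Vec::new()))];
        let snapshot = snapshot_peers(&peers);
        assert_eq!(snapshot, vec![(1, vec![10, 20]), (2, vec![])]);
    }
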
  
                        // LDK versions prior to 0.0.113 do not know how to read the pending claimed payments
                        // map. Thus, if there are no entries we skip writing a TLV for it.
                        pending_claiming_payments = None;
 -              } else {
 -                      debug_assert!(false, "While we have code to serialize pending_claiming_payments, the map should always be empty until a later PR");
                }
  
                write_tlv_fields!(writer, {
                        (3, pending_outbound_payments, required),
                        (4, pending_claiming_payments, option),
                        (5, self.our_network_pubkey, required),
 +                      (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
                        (9, htlc_purposes, vec_type),
                        (11, self.probing_cookie_secret, required),
@@@ -7326,7 -7361,6 +7327,7 @@@ wher
                                channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()),
                                latest_features: Readable::read(reader)?,
                                pending_msg_events: Vec::new(),
 +                              monitor_update_blocked_actions: BTreeMap::new(),
                                is_connected: false,
                        };
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                let mut probing_cookie_secret: Option<[u8; 32]> = None;
                let mut claimable_htlc_purposes = None;
                let mut pending_claiming_payments = Some(HashMap::new());
 +              let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
                        (3, pending_outbound_payments, option),
                        (4, pending_claiming_payments, option),
                        (5, received_network_pubkey, option),
 +                      (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
                        (9, claimable_htlc_purposes, vec_type),
                        (11, probing_cookie_secret, option),
                        }
                }
  
 +              for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
 +                      if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
 +                              peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
 +                      } else {
 +                              log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
 +                              return Err(DecodeError::InvalidValue);
 +                      }
 +              }
 +
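
On the read side the inverse mapping is applied: each (node_id, actions) pair read from the TLV must match a peer that actually exists, otherwise the serialized state is inconsistent and deserialization fails. In sketch form, with stand-in types (DecodeError here stands in for LDK's error of the same name):

    use std::collections::HashMap;

    #[derive(Debug)]
    struct DecodeError; // stand-in for ln::msgs::DecodeError::InvalidValue

    fn apply_blocked_actions(
        per_peer: &mut HashMap<u8, Vec<&'static str>>,
        read_actions: Vec<(u8, Vec<&'static str>)>,
    ) -> Result<(), DecodeError> {
        for (node_id, actions) in read_actions {
            match per_peer.get_mut(&node_id) {
                Some(peer_actions) => *peer_actions = actions,
                // Blocked actions for an unknown peer mean the data is corrupt.
                None => return Err(DecodeError),
            }
        }
        Ok(())
    }

    fn main() {
        let mut peers = HashMap::from([(7u8, Vec::new())]);
        assert!(apply_blocked_actions(&mut peers, vec![(7, vec!["claim"])]).is_ok());
        assert!(apply_blocked_actions(&mut peers, vec![(9, vec![])]).is_err());
    }
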
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: bounded_fee_estimator,
@@@ -8164,8 -8187,8 +8165,8 @@@ mod tests 
  
                let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
  
 -              nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
 -              nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 +              nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
 +              nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
  
                nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap();
                check_closed_broadcast!(nodes[0], true);
                check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key);
        }
  
 +      #[test]
 +      fn test_connection_limiting() {
 +              // Test that we limit un-channel'd peers and un-funded channels properly.
 +              let chanmon_cfgs = create_chanmon_cfgs(2);
 +              let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
 +              let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 +              let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 +
 +              // Note that create_network connects the nodes together for us
 +
 +              nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
 +              let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
 +
 +              let mut funding_tx = None;
 +              for idx in 0..super::MAX_UNFUNDED_CHANS_PER_PEER {
 +                      nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
 +                      let accept_channel = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
 +
 +                      if idx == 0 {
 +                              nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), &accept_channel);
 +                              let (temporary_channel_id, tx, _) = create_funding_transaction(&nodes[0], &nodes[1].node.get_our_node_id(), 100_000, 42);
 +                              funding_tx = Some(tx.clone());
 +                              nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), tx).unwrap();
 +                              let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
 +
 +                              nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg);
 +                              check_added_monitors!(nodes[1], 1);
 +                              let funding_signed = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
 +
 +                              nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed);
 +                              check_added_monitors!(nodes[0], 1);
 +                      }
 +                      open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
 +              }
 +
 +              // An additional channel beyond MAX_UNFUNDED_CHANS_PER_PEER will be summarily rejected
 +              open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
 +              nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
 +              assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
 +                      open_channel_msg.temporary_channel_id);
 +
 +              // Further, because all of our channels with nodes[0] are inbound, and none of them are
 +              // funded, nodes[0] doesn't count as a "protected" peer, i.e. it counts towards the
 +              // MAX_NO_CHANNEL_PEERS limit.
 +              let mut peer_pks = Vec::with_capacity(super::MAX_NO_CHANNEL_PEERS);
 +              for _ in 1..super::MAX_NO_CHANNEL_PEERS {
 +                      let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
 +                              &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
 +                      peer_pks.push(random_pk);
 +                      nodes[1].node.peer_connected(&random_pk, &msgs::Init {
 +                              features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
 +              }
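 +              // nodes[0] plus the MAX_NO_CHANNEL_PEERS - 1 peers above fill the no-channel-peer
 +              // limit, so one more inbound connection (the trailing `true`) must be refused.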
 +              let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
 +                      &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
 +              nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
 +                      features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err();
 +
 +              // Also importantly, because nodes[0] isn't "protected", we will refuse a reconnection from
 +              // them if we have too many un-channel'd peers.
 +              nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
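 +              // Disconnecting drops all of nodes[0]'s unfunded inbound channels; only the one
 +              // channel that exchanged funding messages survives, hence the
 +              // MAX_UNFUNDED_CHANS_PER_PEER - 1 close events.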
 +              let chan_closed_events = nodes[1].node.get_and_clear_pending_events();
 +              assert_eq!(chan_closed_events.len(), super::MAX_UNFUNDED_CHANS_PER_PEER - 1);
 +              for ev in chan_closed_events {
 +                      if let Event::ChannelClosed { .. } = ev { } else { panic!(); }
 +              }
 +              nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
 +                      features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
 +              nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
 +                      features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err();
 +
 +              // But of course, if the connection is outbound, it's allowed...
 +              nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
 +                      features: nodes[0].node.init_features(), remote_network_address: None }, false).unwrap();
 +              nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
 +
 +              // Now nodes[0] is disconnected but still has a pending, un-funded channel lying around.
 +              // Even though we accept one more connection from new peers, we won't actually let them
 +              // open channels.
 +              assert!(peer_pks.len() > super::MAX_UNFUNDED_CHANNEL_PEERS - 1);
 +              for i in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 {
 +                      nodes[1].node.handle_open_channel(&peer_pks[i], &open_channel_msg);
 +                      get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, peer_pks[i]);
 +                      open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
 +              }
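 +              // nodes[0]'s leftover pending channel still counts towards the limit, so the
 +              // unfunded-channel-peer slots are now full and the next open_channel is rejected.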
 +              nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
 +              assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
 +                      open_channel_msg.temporary_channel_id);
 +
 +              // Outbound channels, of course, are always allowed
 +              nodes[1].node.create_channel(last_random_pk, 100_000, 0, 42, None).unwrap();
 +              get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, last_random_pk);
 +
 +              // If we fund the first channel, nodes[0] has a live on-chain channel with us; it is
 +              // now "protected" and can connect again.
 +              mine_transaction(&nodes[1], funding_tx.as_ref().unwrap());
 +              nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
 +                      features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
 +              get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());
 +
 +              // Further, because the first channel was funded, we can open another channel with
 +              // last_random_pk.
 +              nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
 +              get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk);
 +      }
 +
 +      #[test]
 +      fn test_outbound_chans_unlimited() {
 +              // Test that we never refuse an outbound channel even if a peer is unfunded-channel-limited
 +              let chanmon_cfgs = create_chanmon_cfgs(2);
 +              let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
 +              let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 +              let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 +
 +              // Note that create_network connects the nodes together for us
 +
 +              nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
 +              let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
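 +              // Reuse one open_channel message with fresh temporary_channel_ids to fill nodes[1]'s
 +              // per-peer unfunded-channel limit.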
 +
 +              for _ in 0..super::MAX_UNFUNDED_CHANS_PER_PEER {
 +                      nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
 +                      get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
 +                      open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
 +              }
 +
 +              // Once we have MAX_UNFUNDED_CHANS_PER_PEER unfunded channels, new inbound channels will be
 +              // rejected.
 +              nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
 +              assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
 +                      open_channel_msg.temporary_channel_id);
 +
 +              // but we can still open an outbound channel.
 +              nodes[1].node.create_channel(nodes[0].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
 +              get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, nodes[0].node.get_our_node_id());
 +
 +              // but even with such an outbound channel, additional inbound channels will still fail.
 +              nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
 +              assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
 +                      open_channel_msg.temporary_channel_id);
 +      }
 +
 +      #[test]
 +      fn test_0conf_limiting() {
 +              // Tests that we properly limit inbound channels when we have the manual-channel-acceptance
 +              // flag set and (sometimes) accept channels as 0conf.
 +              let chanmon_cfgs = create_chanmon_cfgs(2);
 +              let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
 +              let mut settings = test_default_channel_config();
 +              settings.manually_accept_inbound_channels = true;
 +              let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, Some(settings)]);
 +              let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 +
 +              // Note that create_network connects the nodes together for us
 +
 +              nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
 +              let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
 +
 +              // First, get us up to MAX_UNFUNDED_CHANNEL_PEERS so we can test at the edge
 +              for _ in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 {
 +                      let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
 +                              &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
 +                      nodes[1].node.peer_connected(&random_pk, &msgs::Init {
 +                              features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
 +
 +                      nodes[1].node.handle_open_channel(&random_pk, &open_channel_msg);
 +                      let events = nodes[1].node.get_and_clear_pending_events();
 +                      match events[0] {
 +                              Event::OpenChannelRequest { temporary_channel_id, .. } => {
 +                                      nodes[1].node.accept_inbound_channel(&temporary_channel_id, &random_pk, 23).unwrap();
 +                              }
 +                              _ => panic!("Unexpected event"),
 +                      }
 +                      get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, random_pk);
 +                      open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
 +              }
 +
 +              // If we try to accept a channel from another peer non-0conf it will fail.
 +              let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
 +                      &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
 +              nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
 +                      features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
 +              nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
 +              let events = nodes[1].node.get_and_clear_pending_events();
 +              match events[0] {
 +                      Event::OpenChannelRequest { temporary_channel_id, .. } => {
 +                              match nodes[1].node.accept_inbound_channel(&temporary_channel_id, &last_random_pk, 23) {
 +                                      Err(APIError::APIMisuseError { err }) =>
 +                                              assert_eq!(err, "Too many peers with unfunded channels, refusing to accept new ones"),
 +                                      _ => panic!(),
 +                              }
 +                      }
 +                      _ => panic!("Unexpected event"),
 +              }
 +              assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
 +                      open_channel_msg.temporary_channel_id);
 +
 +              // ...however, if we accept the same channel as 0conf, it should work just fine.
 +              nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
 +              let events = nodes[1].node.get_and_clear_pending_events();
 +              match events[0] {
 +                      Event::OpenChannelRequest { temporary_channel_id, .. } => {
 +                              nodes[1].node.accept_inbound_channel_from_trusted_peer_0conf(&temporary_channel_id, &last_random_pk, 23).unwrap();
 +                      }
 +                      _ => panic!("Unexpected event"),
 +              }
 +              get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk);
 +      }
 +
        #[cfg(anchors)]
        #[test]
        fn test_anchors_zero_fee_htlc_tx_fallback() {
@@@ -8702,8 -8518,8 +8703,8 @@@ pub mod bench 
                });
                let node_b_holder = NodeHolder { node: &node_b };
  
 -              node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }).unwrap();
 -              node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }).unwrap();
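 +              // peer_connected now takes a flag indicating whether the connection is inbound:
 +              // true for node_a's side of the connection, false for node_b's.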
 +              node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }, true).unwrap();
 +              node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }, false).unwrap();
                node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap();
                node_b.handle_open_channel(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id()));
                node_a.handle_accept_channel(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id()));
index 96a04f417d031978c1a7f6116cf04e133c225d11,1d6eba5ade6507ee92c7ba7a3ddbd3f02dfd6200..86648e5fb724d39b4e027f2152ea002b32166f79
@@@ -253,12 -253,12 +253,12 @@@ fn no_pending_leak_on_initial_send_fail
  
        let (route, payment_hash, _, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
  
 -      nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
 -      nodes[1].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
 +      nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
 +      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
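 +      // (peer_disconnected no longer takes the `no_connection_possible` flag.)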
  
        unwrap_send_err!(nodes[0].node.send_payment(&route, payment_hash, &Some(payment_secret), PaymentId(payment_hash.0)),
                true, APIError::ChannelUnavailable { ref err },
 -              assert_eq!(err, "Peer for first hop currently disconnected/pending monitor update!"));
 +              assert_eq!(err, "Peer for first hop currently disconnected"));
  
        assert!(!nodes[0].node.has_pending_payments());
  }
@@@ -310,8 -310,8 +310,8 @@@ fn do_retry_with_no_persist(confirm_bef
        // We relay the payment to nodes[1] while it's disconnected from nodes[2], causing the payment
        // to be returned immediately to nodes[0], without having nodes[2] fail the inbound payment
        // which would prevent retry.
 -      nodes[1].node.peer_disconnected(&nodes[2].node.get_our_node_id(), false);
 -      nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
 +      nodes[1].node.peer_disconnected(&nodes[2].node.get_our_node_id());
 +      nodes[2].node.peer_disconnected(&nodes[1].node.get_our_node_id());
  
        nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
        commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false, true);
        assert_eq!(as_broadcasted_txn.len(), 1);
        assert_eq!(as_broadcasted_txn[0], as_commitment_tx);
  
 -      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 -      nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap();
 +      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
 +      nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }, true).unwrap();
        assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
  
        // Now nodes[1] should send a channel reestablish, which nodes[0] will respond to with an
        // error, as the channel has hit the chain.
 -      nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap();
 +      nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }, false).unwrap();
        let bs_reestablish = get_chan_reestablish_msgs!(nodes[1], nodes[0]).pop().unwrap();
        nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &bs_reestablish);
        let as_err = nodes[0].node.get_and_clear_pending_msg_events();
@@@ -496,7 -496,7 +496,7 @@@ fn do_test_completed_payment_not_retrya
        let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode();
  
        reload_node!(nodes[0], test_default_channel_config(), nodes_0_serialized, &[&chan_0_monitor_serialized], first_persister, first_new_chain_monitor, first_nodes_0_deserialized);
 -      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 +      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
  
        // On reload, the ChannelManager should realize it is stale compared to the ChannelMonitor and
        // force-close the channel.
        assert!(nodes[0].node.has_pending_payments());
        assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0).len(), 1);
  
 -      nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap();
 +      nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }, true).unwrap();
        assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
  
        // Now nodes[1] should send a channel reestablish, which nodes[0] will respond to with an
        // error, as the channel has hit the chain.
 -      nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap();
 +      nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }, false).unwrap();
        let bs_reestablish = get_chan_reestablish_msgs!(nodes[1], nodes[0]).pop().unwrap();
        nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &bs_reestablish);
        let as_err = nodes[0].node.get_and_clear_pending_msg_events();
        assert!(!nodes[0].node.get_and_clear_pending_msg_events().is_empty());
  
        reload_node!(nodes[0], test_default_channel_config(), nodes_0_serialized, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], second_persister, second_new_chain_monitor, second_nodes_0_deserialized);
 -      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 +      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
  
        reconnect_nodes(&nodes[0], &nodes[1], (true, true), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
  
        // Check that after reload we can send the payment again (though we shouldn't, since it was
        // claimed previously).
        reload_node!(nodes[0], test_default_channel_config(), nodes_0_serialized, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], third_persister, third_new_chain_monitor, third_nodes_0_deserialized);
 -      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 +      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
  
        reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
  
@@@ -660,8 -660,8 +660,8 @@@ fn do_test_dup_htlc_onchain_fails_on_re
        check_added_monitors!(nodes[0], 1);
        check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
  
 -      nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
 -      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 +      nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
 +      nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
  
        // Connect blocks until the CLTV timeout is up so that we get an HTLC-Timeout transaction
        connect_blocks(&nodes[0], TEST_FINAL_CLTV + LATENCY_GRACE_PERIOD_BLOCKS + 1);
@@@ -812,7 -812,7 +812,7 @@@ fn test_fulfill_restart_failure() 
        // Now reload nodes[1]...
        reload_node!(nodes[1], &chan_manager_serialized, &[&chan_0_monitor_serialized], persister, new_chain_monitor, nodes_1_deserialized);
  
 -      nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
 +      nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
        reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false));
  
        nodes[1].node.fail_htlc_backwards(&payment_hash);
@@@ -1857,27 -1857,37 +1857,37 @@@ fn auto_retry_partial_failure() 
                payment_params: Some(route_params.payment_params.clone()),
        };
        nodes[0].router.expect_find_route(route_params.clone(), Ok(send_route));
+       let mut payment_params = route_params.payment_params.clone();
+       payment_params.previously_failed_channels.push(chan_2_id);
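+       // Each retry's route request is expected to carry the scid of the channel that just
+       // failed in `previously_failed_channels`, which the mock router asserts on.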
        nodes[0].router.expect_find_route(RouteParameters {
-                       payment_params: route_params.payment_params.clone(),
-                       final_value_msat: amt_msat / 2, final_cltv_expiry_delta: TEST_FINAL_CLTV
+                       payment_params, final_value_msat: amt_msat / 2, final_cltv_expiry_delta: TEST_FINAL_CLTV
                }, Ok(retry_1_route));
+       let mut payment_params = route_params.payment_params.clone();
+       payment_params.previously_failed_channels.push(chan_3_id);
        nodes[0].router.expect_find_route(RouteParameters {
-                       payment_params: route_params.payment_params.clone(),
-                       final_value_msat: amt_msat / 4, final_cltv_expiry_delta: TEST_FINAL_CLTV
+                       payment_params, final_value_msat: amt_msat / 4, final_cltv_expiry_delta: TEST_FINAL_CLTV
                }, Ok(retry_2_route));
  
        // Send a payment that will partially fail on send, then partially fail on retry, then succeed.
        nodes[0].node.send_payment_with_retry(payment_hash, &Some(payment_secret), PaymentId(payment_hash.0), route_params, Retry::Attempts(3)).unwrap();
        let closed_chan_events = nodes[0].node.get_and_clear_pending_events();
-       assert_eq!(closed_chan_events.len(), 2);
+       assert_eq!(closed_chan_events.len(), 4);
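+       // Two paths fail permanently on send; each now yields a ChannelClosed plus a
+       // PaymentPathFailed event, hence four events instead of two.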
        match closed_chan_events[0] {
                Event::ChannelClosed { .. } => {},
                _ => panic!("Unexpected event"),
        }
        match closed_chan_events[1] {
+               Event::PaymentPathFailed { .. } => {},
+               _ => panic!("Unexpected event"),
+       }
+       match closed_chan_events[2] {
                Event::ChannelClosed { .. } => {},
                _ => panic!("Unexpected event"),
        }
+       match closed_chan_events[3] {
+               Event::PaymentPathFailed { .. } => {},
+               _ => panic!("Unexpected event"),
+       }
  
        // Pass the first part of the payment along the path.
        check_added_monitors!(nodes[0], 5); // three outbound channel updates succeeded, two permanently failed
@@@ -1993,11 -2003,13 +2003,13 @@@ fn auto_retry_zero_attempts_send_error(
        };
  
        chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
-       let err = nodes[0].node.send_payment_with_retry(payment_hash, &Some(payment_secret), PaymentId(payment_hash.0), route_params, Retry::Attempts(0)).unwrap_err();
-       if let PaymentSendFailure::AllFailedResendSafe(_) = err {
-       } else { panic!("Unexpected error"); }
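+       // With the reworked send errors, the send call itself now succeeds; the failure
+       // surfaces asynchronously via the PaymentPathFailed/PaymentFailed events below.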
+       nodes[0].node.send_payment_with_retry(payment_hash, &Some(payment_secret), PaymentId(payment_hash.0), route_params, Retry::Attempts(0)).unwrap();
        assert_eq!(nodes[0].node.get_and_clear_pending_msg_events().len(), 2); // channel close messages
-       assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 1); // channel close event
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 3);
+       if let Event::ChannelClosed { .. } = events[0] { } else { panic!(); }
+       if let Event::PaymentPathFailed { .. } = events[1] { } else { panic!(); }
+       if let Event::PaymentFailed { .. } = events[2] { } else { panic!(); }
        check_added_monitors!(nodes[0], 2);
  }
  
@@@ -2103,8 -2115,10 +2115,10 @@@ fn retry_multi_path_single_failed_payme
        // On retry, split the payment across both channels.
        route.paths[0][0].fee_msat = 50_000_001;
        route.paths[1][0].fee_msat = 50_000_000;
+       let mut pay_params = route.payment_params.clone().unwrap();
+       pay_params.previously_failed_channels.push(chans[1].short_channel_id.unwrap());
        nodes[0].router.expect_find_route(RouteParameters {
-                       payment_params: route.payment_params.clone().unwrap(),
+                       payment_params: pay_params,
                        // Note that the second request here requests the amount we originally failed to send,
                        // not the amount remaining on the full payment (a behavior which should eventually
                        // change).
                        final_value_msat: 100_000_001, final_cltv_expiry_delta: TEST_FINAL_CLTV
        }
  
        nodes[0].node.send_payment_with_retry(payment_hash, &Some(payment_secret), PaymentId(payment_hash.0), route_params, Retry::Attempts(1)).unwrap();
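+       // The failed path immediately yields a transient PaymentPathFailed event before the
+       // retry goes out.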
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::PaymentPathFailed { payment_hash: ev_payment_hash, payment_failed_permanently: false,
+                       network_update: None, all_paths_failed: false, short_channel_id: Some(expected_scid), .. } => {
+                       assert_eq!(payment_hash, ev_payment_hash);
+                       assert_eq!(expected_scid, route.paths[1][0].short_channel_id);
+               },
+               _ => panic!("Unexpected event"),
+       }
        let htlc_msgs = nodes[0].node.get_and_clear_pending_msg_events();
        assert_eq!(htlc_msgs.len(), 2);
        check_added_monitors!(nodes[0], 2);
@@@ -2176,12 -2200,24 +2200,24 @@@ fn immediate_retry_on_failure() 
        route.paths[0][0].short_channel_id = chans[1].short_channel_id.unwrap();
        route.paths[0][0].fee_msat = 50_000_000;
        route.paths[1][0].fee_msat = 50_000_001;
+       let mut pay_params = route_params.payment_params.clone();
+       pay_params.previously_failed_channels.push(chans[0].short_channel_id.unwrap());
        nodes[0].router.expect_find_route(RouteParameters {
-                       payment_params: route_params.payment_params.clone(),
-                       final_value_msat: amt_msat, final_cltv_expiry_delta: TEST_FINAL_CLTV
+                       payment_params: pay_params, final_value_msat: amt_msat,
+                       final_cltv_expiry_delta: TEST_FINAL_CLTV
                }, Ok(route.clone()));
  
        nodes[0].node.send_payment_with_retry(payment_hash, &Some(payment_secret), PaymentId(payment_hash.0), route_params, Retry::Attempts(1)).unwrap();
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::PaymentPathFailed { payment_hash: ev_payment_hash, payment_failed_permanently: false,
+               network_update: None, all_paths_failed: false, short_channel_id: Some(expected_scid), .. } => {
+                       assert_eq!(payment_hash, ev_payment_hash);
+                       assert_eq!(expected_scid, route.paths[1][0].short_channel_id);
+               },
+               _ => panic!("Unexpected event"),
+       }
        let htlc_msgs = nodes[0].node.get_and_clear_pending_msg_events();
        assert_eq!(htlc_msgs.len(), 2);
        check_added_monitors!(nodes[0], 2);