Merge pull request #2014 from valentinewallace/2023-02-rework-partial-pmt-fail
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 2071fd51bbb4a29de0f910d0de26e3b34c305dc4..577e0984448a257a2ff9a23f305aa2684ef968dc 100644
@@ -78,7 +78,7 @@ use core::time::Duration;
 use core::ops::Deref;
 
 // Re-export this for use in the public API.
-pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry};
+pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure};
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
 //
@@ -605,6 +605,15 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
 /// offline for a full minute. In order to track this, you must call
 /// timer_tick_occurred roughly once per minute, though it doesn't have to be perfect.
 ///
+/// To avoid trivial DoS issues, ChannelManager limits the number of inbound connections and
+/// inbound channels without confirmed funding transactions. This may result in nodes which we do
+/// not have a channel with being unable to connect to us or open new channels with us if we have
+/// many peers with unfunded channels.
+///
+/// Because it is an indication of trust, inbound channels which we've accepted as 0conf are
+/// exempted from the count of unfunded channels. Similarly, outbound channels and connections are
+/// never limited. Please ensure you limit the count of such channels yourself.
+///
 /// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager
 /// or a SimpleRefChannelManager, for conciseness. See their documentation for more details, but
 /// essentially you should default to using a SimpleRefChannelManager, and use a
@@ -955,6 +964,19 @@ pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3;
 /// [`OutboundPayments::remove_stale_resolved_payments`].
 pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7;
 
+/// The maximum number of unfunded channels we can have per-peer before we start rejecting new
+/// (inbound) ones. The number of peers with unfunded channels is limited separately in
+/// [`MAX_UNFUNDED_CHANNEL_PEERS`].
+const MAX_UNFUNDED_CHANS_PER_PEER: usize = 4;
+
+/// The maximum number of peers from which we will allow pending unfunded channels. Once we reach
+/// this many peers we reject new (inbound) channels from peers with which we don't have a channel.
+const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50;
+
+/// The maximum number of peers which we do not have a (funded) channel with. Once we reach this
+/// many peers we reject new (inbound) connections.
+const MAX_NO_CHANNEL_PEERS: usize = 250;
+
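Since outbound and trusted-0conf channels are exempt from the caps above, the ChannelManager docs ask the application to bound those itself. A minimal sketch of such a check, assuming ChannelDetails exposes the is_outbound and is_channel_ready fields and using a hypothetical, application-chosen cap:

use lightning::ln::channelmanager::ChannelDetails;

/// Hypothetical, application-chosen cap on our own not-yet-ready outbound channels.
const MY_MAX_PENDING_OUTBOUND_CHANNELS: usize = 8;

/// Counts outbound channels that are not yet usable; feed it the output of
/// `ChannelManager::list_channels`.
fn pending_outbound_channels(channels: &[ChannelDetails]) -> usize {
    channels.iter()
        .filter(|chan| chan.is_outbound && !chan.is_channel_ready)
        .count()
}

/// Returns whether we should allow ourselves to call `create_channel` again.
fn may_open_another_outbound(channels: &[ChannelDetails]) -> bool {
    pending_outbound_channels(channels) < MY_MAX_PENDING_OUTBOUND_CHANNELS
}
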
 /// Information needed for constructing an invoice route hint for this channel.
 #[derive(Clone, Debug, PartialEq)]
 pub struct CounterpartyForwardingInfo {
@@ -2415,7 +2437,7 @@ where
 
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
-                               .ok_or_else(|| APIError::InvalidRoute{err: "No peer matching the path's first hop found!" })?;
+                               .ok_or_else(|| APIError::ChannelUnavailable{err: "No peer matching the path's first hop found!".to_owned() })?;
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) {
@@ -2541,12 +2563,13 @@ where
 
        /// Similar to [`ChannelManager::send_payment`], but will automatically find a route based on
        /// `route_params` and retry failed payment paths based on `retry_strategy`.
-       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), PaymentSendFailure> {
+       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                self.pending_outbound_payments
                        .send_payment(payment_hash, payment_secret, payment_id, retry_strategy, route_params,
                                &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(),
                                &self.entropy_source, &self.node_signer, best_block_height, &self.logger,
+                               &self.pending_events,
                                |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                                self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
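
For callers, the visible change is the return type: send_payment_with_retry now reports only failures detected before any HTLC is sent, via RetryableSendFailure, while the final outcome still arrives as Event::PaymentSent or Event::PaymentFailed. A minimal sketch of handling the new result, assuming RetryableSendFailure implements Debug and that the payment arguments are built elsewhere:

use lightning::ln::channelmanager::RetryableSendFailure;

/// Illustrative handler; the call itself would look like
/// `channel_manager.send_payment_with_retry(payment_hash, &payment_secret,
/// payment_id, route_params, Retry::Attempts(3))`.
fn handle_send_result(res: Result<(), RetryableSendFailure>) {
    match res {
        // The payment is pending; watch for Event::PaymentSent / Event::PaymentFailed.
        Ok(()) => println!("payment initiated"),
        // Nothing was sent, so the error can be surfaced to the user immediately.
        Err(e) => eprintln!("payment could not be initiated: {:?}", e),
    }
}
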
@@ -2618,12 +2641,12 @@ where
        /// payments.
        ///
        /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend
-       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, PaymentSendFailure> {
+       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, payment_id,
                        retry_strategy, route_params, &self.router, self.list_usable_channels(),
                        || self.compute_inflight_htlcs(),  &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.logger,
+                       &self.logger, &self.pending_events,
                        |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
                        self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
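
send_spontaneous_payment_with_retry changes the same way, except that on success it returns the PaymentHash of the (possibly freshly generated) keysend preimage. A sketch of a call site, assuming channel_manager and route_params already exist in the caller; passing None lets LDK pick a preimage:

// Fragment only: `channel_manager` and `route_params` are assumed to be in scope,
// with `Retry` and `PaymentId` imported from `lightning::ln::channelmanager`.
let payment_id = PaymentId([42; 32]); // caller-chosen idempotency key
match channel_manager.send_spontaneous_payment_with_retry(
    None, payment_id, route_params, Retry::Attempts(3))
{
    Ok(payment_hash) => println!("keysend in flight, payment_hash {:?}", payment_hash.0),
    Err(e) => eprintln!("keysend could not be initiated: {:?}", e),
}
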
@@ -2676,7 +2699,7 @@ where
                                        (chan, funding_msg)
                                },
                                Err(_) => { return Err(APIError::ChannelUnavailable {
-                                       err: "Error deriving keys or signing initial commitment transactions - either our RNG or our counterparty's RNG is broken or the Signer refused to sign".to_owned()
+                                       err: "Signer refused to sign the initial commitment transaction".to_owned()
                                }) },
                        }
                };
@@ -3747,16 +3770,19 @@ where
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
                match source {
                        HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, ref payment_params, .. } => {
-                               self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx, &self.pending_events, &self.logger);
+                               if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
+                                       session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx,
+                                       &self.pending_events, &self.logger)
+                               { self.push_pending_forwards_ev(); }
                        },
                        HTLCSource::PreviousHopData(HTLCPreviousHopData { ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret, ref phantom_shared_secret, ref outpoint }) => {
                                log_trace!(self.logger, "Failing HTLC with payment_hash {} backwards from us with {:?}", log_bytes!(payment_hash.0), onion_error);
                                let err_packet = onion_error.get_encrypted_failure_packet(incoming_packet_shared_secret, phantom_shared_secret);
 
-                               let mut forward_event = None;
+                               let mut push_forward_ev = false;
                                let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
                                if forward_htlcs.is_empty() {
-                                       forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS));
+                                       push_forward_ev = true;
                                }
                                match forward_htlcs.entry(*short_channel_id) {
                                        hash_map::Entry::Occupied(mut entry) => {
@@ -3767,12 +3793,8 @@ where
                                        }
                                }
                                mem::drop(forward_htlcs);
+                               if push_forward_ev { self.push_pending_forwards_ev(); }
                                let mut pending_events = self.pending_events.lock().unwrap();
-                               if let Some(time) = forward_event {
-                                       pending_events.push(events::Event::PendingHTLCsForwardable {
-                                               time_forwardable: time
-                                       });
-                               }
                                pending_events.push(events::Event::HTLCHandlingFailed {
                                        prev_channel_id: outpoint.to_channel_id(),
                                        failed_next_destination: destination,
@@ -4221,11 +4243,13 @@ where
        fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
+               let peers_without_funded_channels = self.peers_without_funded_channels(|peer| !peer.channel_by_id.is_empty());
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
+               let is_only_peer_channel = peer_state.channel_by_id.len() == 1;
                match peer_state.channel_by_id.entry(temporary_channel_id.clone()) {
                        hash_map::Entry::Occupied(mut channel) => {
                                if !channel.get().inbound_is_awaiting_accept() {
@@ -4243,6 +4267,21 @@ where
                                        peer_state.pending_msg_events.push(send_msg_err_event);
                                        let _ = remove_channel!(self, channel);
                                        return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() });
+                               } else {
+                                       // If this peer already has some channels, a new channel won't increase our number of peers
+                                       // with unfunded channels, so as long as we aren't over the maximum number of unfunded
+                                       // channels per-peer we can accept channels from a peer with existing ones.
+                                       if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS {
+                                               let send_msg_err_event = events::MessageSendEvent::HandleError {
+                                                       node_id: channel.get().get_counterparty_node_id(),
+                                                       action: msgs::ErrorAction::SendErrorMessage{
+                                                               msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), }
+                                                       }
+                                               };
+                                               peer_state.pending_msg_events.push(send_msg_err_event);
+                                               let _ = remove_channel!(self, channel);
+                                               return Err(APIError::APIMisuseError { err: "Too many peers with unfunded channels, refusing to accept new ones".to_owned() });
+                                       }
                                }
 
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
@@ -4257,6 +4296,43 @@ where
                Ok(())
        }
 
+       /// Gets the number of peers which match the given filter and do not have any funded, outbound,
+       /// or 0-conf channels.
+       ///
+       /// The filter is called once for each peer and is passed a reference to that peer's
+       /// `PeerState`.
+       fn peers_without_funded_channels<Filter>(&self, maybe_count_peer: Filter) -> usize
+       where Filter: Fn(&PeerState<<SP::Target as SignerProvider>::Signer>) -> bool {
+               let mut peers_without_funded_channels = 0;
+               let best_block_height = self.best_block.read().unwrap().height();
+               {
+                       let peer_state_lock = self.per_peer_state.read().unwrap();
+                       for (_, peer_mtx) in peer_state_lock.iter() {
+                               let peer = peer_mtx.lock().unwrap();
+                               if !maybe_count_peer(&*peer) { continue; }
+                               let num_unfunded_channels = Self::unfunded_channel_count(&peer, best_block_height);
+                               if num_unfunded_channels == peer.channel_by_id.len() {
+                                       peers_without_funded_channels += 1;
+                               }
+                       }
+               }
+               return peers_without_funded_channels;
+       }
+
+       fn unfunded_channel_count(
+               peer: &PeerState<<SP::Target as SignerProvider>::Signer>, best_block_height: u32
+       ) -> usize {
+               let mut num_unfunded_channels = 0;
+               for (_, chan) in peer.channel_by_id.iter() {
+                       if !chan.is_outbound() && chan.minimum_depth().unwrap_or(1) != 0 &&
+                               chan.get_funding_tx_confirmations(best_block_height) == 0
+                       {
+                               num_unfunded_channels += 1;
+                       }
+               }
+               num_unfunded_channels
+       }
+
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
                if msg.chain_hash != self.genesis_hash {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
@@ -4269,8 +4345,13 @@ where
                let mut random_bytes = [0u8; 16];
                random_bytes.copy_from_slice(&self.entropy_source.get_secure_random_bytes()[..16]);
                let user_channel_id = u128::from_be_bytes(random_bytes);
-
                let outbound_scid_alias = self.create_and_insert_outbound_scid_alias();
+
+               // Get the number of peers with channels, but without funded ones. We don't care too much
+               // about peers that never open a channel, so we filter by peers that have at least one
+               // channel, and then limit the number of those with unfunded channels.
+               let channeled_peers_without_funding = self.peers_without_funded_channels(|node| !node.channel_by_id.is_empty());
+
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                    .ok_or_else(|| {
@@ -4279,9 +4360,29 @@ where
                        })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
+
+               // If this peer already has some channels, a new channel won't increase our number of peers
+               // with unfunded channels, so as long as we aren't over the maximum number of unfunded
+               // channels per-peer we can accept channels from a peer with existing ones.
+               if peer_state.channel_by_id.is_empty() &&
+                       channeled_peers_without_funding >= MAX_UNFUNDED_CHANNEL_PEERS &&
+                       !self.default_configuration.manually_accept_inbound_channels
+               {
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                               "Have too many peers with unfunded channels, not accepting new ones".to_owned(),
+                               msg.temporary_channel_id.clone()));
+               }
+
+               let best_block_height = self.best_block.read().unwrap().height();
+               if Self::unfunded_channel_count(peer_state, best_block_height) >= MAX_UNFUNDED_CHANS_PER_PEER {
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                               format!("Refusing more than {} unfunded channels.", MAX_UNFUNDED_CHANS_PER_PEER),
+                               msg.temporary_channel_id.clone()));
+               }
+
                let mut channel = match Channel::new_from_req(&self.fee_estimator, &self.entropy_source, &self.signer_provider,
-                       counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id, &self.default_configuration,
-                       self.best_block.read().unwrap().height(), &self.logger, outbound_scid_alias)
+                       counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, msg, user_channel_id,
+                       &self.default_configuration, best_block_height, &self.logger, outbound_scid_alias)
                {
                        Err(e) => {
                                self.outbound_scid_aliases.lock().unwrap().remove(&outbound_scid_alias);
@@ -4352,60 +4453,31 @@ where
        }
 
        fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
+               let best_block = *self.best_block.read().unwrap();
+
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                                debug_assert!(false);
                                MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)
                        })?;
-               let ((funding_msg, monitor, mut channel_ready), mut chan) = {
-                       let best_block = *self.best_block.read().unwrap();
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
+
+               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               let ((funding_msg, monitor), chan) =
                        match peer_state.channel_by_id.entry(msg.temporary_channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.signer_provider, &self.logger), chan), chan.remove())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
-                       }
-               };
-               // Because we have exclusive ownership of the channel here we can release the peer_state
-               // lock before watch_channel
-               match self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) {
-                       ChannelMonitorUpdateStatus::Completed => {},
-                       ChannelMonitorUpdateStatus::PermanentFailure => {
-                               // Note that we reply with the new channel_id in error messages if we gave up on the
-                               // channel, not the temporary_channel_id. This is compatible with ourselves, but the
-                               // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
-                               // any messages referencing a previously-closed channel anyway.
-                               // We do not propagate the monitor update to the user as it would be for a monitor
-                               // that we didn't manage to store (and that we don't care about - we don't respond
-                               // with the funding_signed so the channel can never go on chain).
-                               let (_monitor_update, failed_htlcs) = chan.force_shutdown(false);
-                               assert!(failed_htlcs.is_empty());
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id));
-                       },
-                       ChannelMonitorUpdateStatus::InProgress => {
-                               // There's no problem signing a counterparty's funding transaction if our monitor
-                               // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
-                               // accepted payment from yet. We do, however, need to wait to send our channel_ready
-                               // until we have persisted our monitor.
-                               chan.monitor_updating_paused(false, false, channel_ready.is_some(), Vec::new(), Vec::new(), Vec::new());
-                               channel_ready = None; // Don't send the channel_ready now
-                       },
-               }
-               // It's safe to unwrap as we've held the `per_peer_state` read lock since checking that the
-               // peer exists, despite the inner PeerState potentially having no channels after removing
-               // the channel above.
-               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-               let peer_state = &mut *peer_state_lock;
+                       };
+
                match peer_state.channel_by_id.entry(funding_msg.channel_id) {
                        hash_map::Entry::Occupied(_) => {
-                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
+                               Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
                        },
                        hash_map::Entry::Vacant(e) => {
-                               let mut id_to_peer = self.id_to_peer.lock().unwrap();
-                               match id_to_peer.entry(chan.channel_id()) {
+                               match self.id_to_peer.lock().unwrap().entry(chan.channel_id()) {
                                        hash_map::Entry::Occupied(_) => {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                                        "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
@@ -4415,17 +4487,35 @@ where
                                                i_e.insert(chan.get_counterparty_node_id());
                                        }
                                }
+
+                               // There's no problem signing a counterparty's funding transaction if our monitor
+                               // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
+                               // accepted payment from yet. We do, however, need to wait to send our channel_ready
+                               // until we have persisted our monitor.
+                               let new_channel_id = funding_msg.channel_id;
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
                                        node_id: counterparty_node_id.clone(),
                                        msg: funding_msg,
                                });
-                               if let Some(msg) = channel_ready {
-                                       send_channel_ready!(self, peer_state.pending_msg_events, chan, msg);
+
+                               let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+
+                               let chan = e.insert(chan);
+                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
+
+                               // Note that we reply with the new channel_id in error messages if we gave up on the
+                               // channel, not the temporary_channel_id. This is compatible with ourselves, but the
+                               // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
+                               // any messages referencing a previously-closed channel anyway.
+                               // We do not propagate the monitor update to the user as it would be for a monitor
+                               // that we didn't manage to store (and that we don't care about - we don't respond
+                               // with the funding_signed so the channel can never go on chain).
+                               if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
+                                       res.0 = None;
                                }
-                               e.insert(chan);
+                               res
                        }
                }
-               Ok(())
        }
 
        fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
@@ -4741,7 +4831,7 @@ where
        #[inline]
        fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
                for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
-                       let mut forward_event = None;
+                       let mut push_forward_event = false;
                        let mut new_intercept_events = Vec::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
@@ -4799,7 +4889,7 @@ where
                                                                // We don't want to generate a PendingHTLCsForwardable event if only intercepted
                                                                // payments are being processed.
                                                                if forward_htlcs_empty {
-                                                                       forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS));
+                                                                       push_forward_event = true;
                                                                }
                                                                entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                        prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info })));
@@ -4817,16 +4907,21 @@ where
                                let mut events = self.pending_events.lock().unwrap();
                                events.append(&mut new_intercept_events);
                        }
+                       if push_forward_event { self.push_pending_forwards_ev() }
+               }
+       }
 
-                       match forward_event {
-                               Some(time) => {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::PendingHTLCsForwardable {
-                                               time_forwardable: time
-                                       });
-                               }
-                               None => {},
-                       }
+       // We only want to push a PendingHTLCsForwardable event if no others are queued.
+       fn push_pending_forwards_ev(&self) {
+               let mut pending_events = self.pending_events.lock().unwrap();
+               let forward_ev_exists = pending_events.iter()
+                       .find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
+                       .is_some();
+               if !forward_ev_exists {
+                       pending_events.push(events::Event::PendingHTLCsForwardable {
+                               time_forwardable:
+                                       Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
+                       });
                }
        }
 
@@ -6090,13 +6185,13 @@ where
                let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
        }
 
-       fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) {
+       fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
-                       log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.",
-                               log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" });
+                       log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates.",
+                               log_pubkey!(counterparty_node_id));
                        if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
@@ -6138,7 +6233,7 @@ where
                                debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
                                peer_state.is_connected = false;
                                peer_state.ok_to_remove(true)
-                       } else { true }
+                       } else { debug_assert!(false, "Unconnected peer disconnected"); true }
                };
                if remove_peer {
                        per_peer_state.remove(counterparty_node_id);
@@ -6150,20 +6245,28 @@ where
                }
        }
 
-       fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) -> Result<(), ()> {
+       fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init, inbound: bool) -> Result<(), ()> {
                if !init_msg.features.supports_static_remote_key() {
-                       log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(counterparty_node_id));
+                       log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting", log_pubkey!(counterparty_node_id));
                        return Err(());
                }
 
-               log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
-
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
+               // If we have too many peers connected which don't have funded channels, disconnect the
+               // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
+               // unfunded channels taking up space in memory for disconnected peers, we still let new
+               // peers connect, but we'll reject new channels from them.
+               let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
+               let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
+
                {
                        let mut peer_state_lock = self.per_peer_state.write().unwrap();
                        match peer_state_lock.entry(counterparty_node_id.clone()) {
                                hash_map::Entry::Vacant(e) => {
+                                       if inbound_peer_limited {
+                                               return Err(());
+                                       }
                                        e.insert(Mutex::new(PeerState {
                                                channel_by_id: HashMap::new(),
                                                latest_features: init_msg.features.clone(),
@@ -6175,14 +6278,24 @@ where
                                hash_map::Entry::Occupied(e) => {
                                        let mut peer_state = e.get().lock().unwrap();
                                        peer_state.latest_features = init_msg.features.clone();
+
+                                       let best_block_height = self.best_block.read().unwrap().height();
+                                       if inbound_peer_limited &&
+                                               Self::unfunded_channel_count(&*peer_state, best_block_height) ==
+                                               peer_state.channel_by_id.len()
+                                       {
+                                               return Err(());
+                                       }
+
                                        debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
                                        peer_state.is_connected = true;
                                },
                        }
                }
 
-               let per_peer_state = self.per_peer_state.read().unwrap();
+               log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 
+               let per_peer_state = self.per_peer_state.read().unwrap();
                for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
@@ -7387,7 +7500,8 @@ where
                        }
                }
 
-               if !forward_htlcs.is_empty() {
+               let pending_outbounds = OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), retry_lock: Mutex::new(()) };
+               if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
                        // If we have pending HTLCs to forward, assume we either dropped a
                        // `PendingHTLCsForwardable` or the user received it but never processed it as they
                        // shut down before the timer hit. Either way, set the time_forwardable to a small
@@ -7564,7 +7678,7 @@ where
 
                        inbound_payment_key: expanded_inbound_key,
                        pending_inbound_payments: Mutex::new(pending_inbound_payments),
-                       pending_outbound_payments: OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), retry_lock: Mutex::new(()), },
+                       pending_outbound_payments: pending_outbounds,
                        pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
 
                        forward_htlcs: Mutex::new(forward_htlcs),
@@ -8051,8 +8165,8 @@ mod tests {
 
                let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
 
-               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
-               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
 
                nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap();
                check_closed_broadcast!(nodes[0], true);
@@ -8272,6 +8386,213 @@ mod tests {
                check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key);
        }
 
+       #[test]
+       fn test_connection_limiting() {
+               // Test that we limit un-channel'd peers and un-funded channels properly.
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+               // Note that create_network connects the nodes together for us
+
+               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+
+               let mut funding_tx = None;
+               for idx in 0..super::MAX_UNFUNDED_CHANS_PER_PEER {
+                       nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+                       let accept_channel = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+
+                       if idx == 0 {
+                               nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), &accept_channel);
+                               let (temporary_channel_id, tx, _) = create_funding_transaction(&nodes[0], &nodes[1].node.get_our_node_id(), 100_000, 42);
+                               funding_tx = Some(tx.clone());
+                               nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), tx).unwrap();
+                               let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
+
+                               nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg);
+                               check_added_monitors!(nodes[1], 1);
+                               let funding_signed = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+
+                               nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed);
+                               check_added_monitors!(nodes[0], 1);
+                       }
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
+
+               // A MAX_UNFUNDED_CHANS_PER_PEER + 1 channel will be summarily rejected
+               open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+                       open_channel_msg.temporary_channel_id);
+
+               // Further, because all of our channels with nodes[0] are inbound, and none of them funded,
+               // it doesn't count as a "protected" peer, i.e. it counts towards the MAX_NO_CHANNEL_PEERS
+               // limit.
+               let mut peer_pks = Vec::with_capacity(super::MAX_NO_CHANNEL_PEERS);
+               for _ in 1..super::MAX_NO_CHANNEL_PEERS {
+                       let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                               &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+                       peer_pks.push(random_pk);
+                       nodes[1].node.peer_connected(&random_pk, &msgs::Init {
+                               features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               }
+               let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                       &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+               nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err();
+
+               // Also importantly, because nodes[0] isn't "protected", we will refuse a reconnection from
+               // them if we have too many un-channel'd peers.
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+               let chan_closed_events = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(chan_closed_events.len(), super::MAX_UNFUNDED_CHANS_PER_PEER - 1);
+               for ev in chan_closed_events {
+                       if let Event::ChannelClosed { .. } = ev { } else { panic!(); }
+               }
+               nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap_err();
+
+               // but of course if the connection is outbound it's allowed...
+               nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, false).unwrap();
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               // Now nodes[0] is disconnected but still has a pending, un-funded channel lying around.
+               // Even though we accept one more connection from new peers, we won't actually let them
+               // open channels.
+               assert!(peer_pks.len() > super::MAX_UNFUNDED_CHANNEL_PEERS - 1);
+               for i in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 {
+                       nodes[1].node.handle_open_channel(&peer_pks[i], &open_channel_msg);
+                       get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, peer_pks[i]);
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
+                       open_channel_msg.temporary_channel_id);
+
+               // Of course, however, outbound channels are always allowed
+               nodes[1].node.create_channel(last_random_pk, 100_000, 0, 42, None).unwrap();
+               get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, last_random_pk);
+
+               // If we fund the first channel, nodes[0] has a live on-chain channel with us; it is now
+               // "protected" and can connect again.
+               mine_transaction(&nodes[1], funding_tx.as_ref().unwrap());
+               nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());
+
+               // Further, because the first channel was funded, we can open another channel with
+               // last_random_pk.
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk);
+       }
+
+       #[test]
+       fn test_outbound_chans_unlimited() {
+               // Test that we never refuse an outbound channel even if a peer is unfunded-channel-limited
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+               // Note that create_network connects the nodes together for us
+
+               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+
+               for _ in 0..super::MAX_UNFUNDED_CHANS_PER_PEER {
+                       nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+                       get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
+
+               // Once we have MAX_UNFUNDED_CHANS_PER_PEER unfunded channels, new inbound channels will be
+               // rejected.
+               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+                       open_channel_msg.temporary_channel_id);
+
+               // but we can still open an outbound channel.
+               nodes[1].node.create_channel(nodes[0].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               get_event_msg!(nodes[1], MessageSendEvent::SendOpenChannel, nodes[0].node.get_our_node_id());
+
+               // but even with such an outbound channel, additional inbound channels will still fail.
+               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+                       open_channel_msg.temporary_channel_id);
+       }
+
+       #[test]
+       fn test_0conf_limiting() {
+               // Tests that we properly limit inbound channels when we have the manual-channel-acceptance
+               // flag set and (sometimes) accept channels as 0conf.
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let mut settings = test_default_channel_config();
+               settings.manually_accept_inbound_channels = true;
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, Some(settings)]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+               // Note that create_network connects the nodes together for us
+
+               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               let mut open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+
+               // First, get us up to MAX_UNFUNDED_CHANNEL_PEERS so we can test at the edge
+               for _ in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 {
+                       let random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                               &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+                       nodes[1].node.peer_connected(&random_pk, &msgs::Init {
+                               features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+
+                       nodes[1].node.handle_open_channel(&random_pk, &open_channel_msg);
+                       let events = nodes[1].node.get_and_clear_pending_events();
+                       match events[0] {
+                               Event::OpenChannelRequest { temporary_channel_id, .. } => {
+                                       nodes[1].node.accept_inbound_channel(&temporary_channel_id, &random_pk, 23).unwrap();
+                               }
+                               _ => panic!("Unexpected event"),
+                       }
+                       get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, random_pk);
+                       open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
+               }
+
+               // If we try to accept a channel from another peer as non-0conf, it will fail.
+               let last_random_pk = PublicKey::from_secret_key(&nodes[0].node.secp_ctx,
+                       &SecretKey::from_slice(&nodes[1].keys_manager.get_secure_random_bytes()).unwrap());
+               nodes[1].node.peer_connected(&last_random_pk, &msgs::Init {
+                       features: nodes[0].node.init_features(), remote_network_address: None }, true).unwrap();
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               let events = nodes[1].node.get_and_clear_pending_events();
+               match events[0] {
+                       Event::OpenChannelRequest { temporary_channel_id, .. } => {
+                               match nodes[1].node.accept_inbound_channel(&temporary_channel_id, &last_random_pk, 23) {
+                                       Err(APIError::APIMisuseError { err }) =>
+                                               assert_eq!(err, "Too many peers with unfunded channels, refusing to accept new ones"),
+                                       _ => panic!(),
+                               }
+                       }
+                       _ => panic!("Unexpected event"),
+               }
+               assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
+                       open_channel_msg.temporary_channel_id);
+
+               // ...however if we accept the same channel as 0conf it should work just fine.
+               nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
+               let events = nodes[1].node.get_and_clear_pending_events();
+               match events[0] {
+                       Event::OpenChannelRequest { temporary_channel_id, .. } => {
+                               nodes[1].node.accept_inbound_channel_from_trusted_peer_0conf(&temporary_channel_id, &last_random_pk, 23).unwrap();
+                       }
+                       _ => panic!("Unexpected event"),
+               }
+               get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk);
+       }
+
        #[cfg(anchors)]
        #[test]
        fn test_anchors_zero_fee_htlc_tx_fallback() {
@@ -8382,8 +8703,8 @@ pub mod bench {
                });
                let node_b_holder = NodeHolder { node: &node_b };
 
-               node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }).unwrap();
-               node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }).unwrap();
+               node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }, true).unwrap();
+               node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }, false).unwrap();
                node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap();
                node_b.handle_open_channel(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id()));
                node_a.handle_accept_channel(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id()));