Merge pull request #2128 from valentinewallace/2023-03-route-blinding-groundwork
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 6bfa4f0b03a7c225a84b556d15a6cc6343b8d180..3757b06b3c94dc3b5f2bfca20d6096796980f9a1 100644
@@ -40,10 +40,10 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret};
-use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel};
+use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel};
 use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures};
 #[cfg(any(feature = "_test_utils", test))]
-use crate::ln::features::InvoiceFeatures;
+use crate::ln::features::Bolt11InvoiceFeatures;
 use crate::routing::gossip::NetworkGraph;
 use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router};
 use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
@@ -53,7 +53,7 @@ use crate::ln::onion_utils::HTLCFailReason;
 use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
 #[cfg(test)]
 use crate::ln::outbound_payment;
-use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment};
+use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs};
 use crate::ln::wire::Encode;
 use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner};
 use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate};
@@ -685,7 +685,7 @@ impl <Signer: ChannelSigner> PeerState<Signer> {
                        && self.in_flight_monitor_updates.is_empty()
        }
 
-       // Returns a count of all channels we have with this peer, including pending channels.
+       // Returns a count of all channels we have with this peer, including unfunded channels.
        fn total_channel_count(&self) -> usize {
                self.channel_by_id.len() +
                        self.outbound_v1_channel_by_id.len() +
@@ -1749,12 +1749,12 @@ macro_rules! convert_chan_err {
                        },
                }
        };
-       ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, PREFUNDED) => {
+       ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, UNFUNDED) => {
                match $err {
-                       // We should only ever have `ChannelError::Close` when prefunded channels error.
+                       // We should only ever have `ChannelError::Close` when unfunded channels error.
                        // In any case, just close the channel.
                        ChannelError::Warn(msg) | ChannelError::Ignore(msg) | ChannelError::Close(msg) => {
-                               log_error!($self.logger, "Closing prefunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg);
+                               log_error!($self.logger, "Closing unfunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg);
                                update_maps_on_chan_removal!($self, &$channel_context);
                                let shutdown_res = $channel_context.force_shutdown(false);
                                (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel_context.get_user_id(),
@@ -1784,7 +1784,7 @@ macro_rules! try_v1_outbound_chan_entry {
                match $res {
                        Ok(res) => res,
                        Err(e) => {
-                               let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), PREFUNDED);
+                               let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), UNFUNDED);
                                if drop {
                                        $entry.remove_entry();
                                }
@@ -2246,6 +2246,7 @@ where
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
+                               // Only `Channel`s in the channel_by_id map can be considered funded.
                                for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
                                                peer_state.latest_features.clone(), &self.fee_estimator);
@@ -2314,11 +2315,15 @@ where
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        let features = &peer_state.latest_features;
+                       let chan_context_to_details = |context| {
+                               ChannelDetails::from_channel_context(context, best_block_height, features.clone(), &self.fee_estimator)
+                       };
                        return peer_state.channel_by_id
                                .iter()
-                               .map(|(_, channel)|
-                                       ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                       features.clone(), &self.fee_estimator))
+                               .map(|(_, channel)| &channel.context)
+                               .chain(peer_state.outbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context))
+                               .chain(peer_state.inbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context))
+                               .map(chan_context_to_details)
                                .collect();
                }
                vec![]
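
The refactored `list_channels` body above folds all three channel maps into a single iterator chain before mapping each context into a `ChannelDetails`. A minimal standalone sketch of that pattern, using simplified, hypothetical types rather than LDK's actual `Channel`/`OutboundV1Channel`/`InboundV1Channel`:

    use std::collections::HashMap;

    struct Context { user_id: u64 }

    // Chaining the values of the funded and unfunded maps yields every
    // channel context for a peer in one pass, as the diff above does.
    fn all_contexts<'a>(
        funded: &'a HashMap<[u8; 32], Context>,
        outbound_v1: &'a HashMap<[u8; 32], Context>,
        inbound_v1: &'a HashMap<[u8; 32], Context>,
    ) -> Vec<&'a Context> {
        funded.values()
            .chain(outbound_v1.values())
            .chain(inbound_v1.values())
            .collect()
    }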
@@ -2375,48 +2380,58 @@ where
 
                let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
                let result: Result<(), _> = loop {
-                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       {
+                               let per_peer_state = self.per_peer_state.read().unwrap();
 
-                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
-                               .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
+                               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                                       .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
 
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       match peer_state.channel_by_id.entry(channel_id.clone()) {
-                               hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let funding_txo_opt = chan_entry.get().context.get_funding_txo();
-                                       let their_features = &peer_state.latest_features;
-                                       let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
-                                               .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
-                                       failed_htlcs = htlcs;
+                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                               let peer_state = &mut *peer_state_lock;
 
-                                       // We can send the `shutdown` message before updating the `ChannelMonitor`
-                                       // here as we don't need the monitor update to complete until we send a
-                                       // `shutdown_signed`, which we'll delay if we're pending a monitor update.
-                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                               node_id: *counterparty_node_id,
-                                               msg: shutdown_msg,
-                                       });
+                               match peer_state.channel_by_id.entry(channel_id.clone()) {
+                                       hash_map::Entry::Occupied(mut chan_entry) => {
+                                               let funding_txo_opt = chan_entry.get().context.get_funding_txo();
+                                               let their_features = &peer_state.latest_features;
+                                               let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+                                                       .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
+                                               failed_htlcs = htlcs;
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update_opt.take() {
-                                               break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
-                                                       peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ());
-                                       }
+                                               // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                               // here as we don't need the monitor update to complete until we send a
+                                               // `shutdown_signed`, which we'll delay if we're pending a monitor update.
+                                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                                       node_id: *counterparty_node_id,
+                                                       msg: shutdown_msg,
+                                               });
 
-                                       if chan_entry.get().is_shutdown() {
-                                               let channel = remove_channel!(self, chan_entry);
-                                               if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
-                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                               msg: channel_update
-                                                       });
+                                               // Update the monitor with the shutdown script if necessary.
+                                               if let Some(monitor_update) = monitor_update_opt.take() {
+                                                       break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+                                                               peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ());
                                                }
-                                               self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed);
-                                       }
-                                       break Ok(());
-                               },
-                               hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), counterparty_node_id) })
+
+                                               if chan_entry.get().is_shutdown() {
+                                                       let channel = remove_channel!(self, chan_entry);
+                                                       if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
+                                                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       msg: channel_update
+                                                               });
+                                                       }
+                                                       self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed);
+                                               }
+                                               break Ok(());
+                                       },
+                                       hash_map::Entry::Vacant(_) => (),
+                               }
                        }
+                       // If we reach this point, it means that the channel_id either refers to an unfunded channel or
+                       // it does not exist for this peer. Either way, we can attempt to force-close it.
+                       //
+                       // If the channel does not exist for this peer, an appropriate error will be returned.
+                       return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
+                       // TODO(dunxen): This is still not ideal as we're doing some extra lookups.
+                       // Fix this with https://github.com/lightningdevkit/rust-lightning/issues/2422
                };
 
                for htlc_source in failed_htlcs.drain(..) {
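
Note the restructuring here: the `per_peer_state` read lock is now taken inside an inner block, so its guard is dropped before `force_close_channel_with_peer` (which re-acquires the same locks) runs. A minimal sketch of that lock-scoping pattern, under simplified, assumed types:

    use std::sync::RwLock;

    fn close_or_force_close(channels: &RwLock<Vec<u32>>, id: u32) -> Result<(), ()> {
        {
            let guard = channels.read().unwrap();
            if guard.contains(&id) {
                // Funded channel found: run the cooperative shutdown path.
                return Ok(());
            }
        } // read guard dropped here, releasing the lock
        // Unfunded or unknown channel: fall through to a helper that
        // re-acquires the lock itself, as force_close_channel_with_peer does.
        force_close(channels, id)
    }

    fn force_close(channels: &RwLock<Vec<u32>>, id: u32) -> Result<(), ()> {
        channels.write().unwrap().retain(|&c| c != id);
        Ok(())
    }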
@@ -2535,14 +2550,14 @@ where
                                self.issue_channel_close_events(&chan.get().context, closure_reason);
                                let mut chan = remove_channel!(self, chan);
                                self.finish_force_close_channel(chan.context.force_shutdown(false));
-                               // Prefunded channel has no update
+                               // Unfunded channel has no update
                                (None, chan.context.get_counterparty_node_id())
                        } else if let hash_map::Entry::Occupied(chan) = peer_state.inbound_v1_channel_by_id.entry(channel_id.clone()) {
                                log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..]));
                                self.issue_channel_close_events(&chan.get().context, closure_reason);
                                let mut chan = remove_channel!(self, chan);
                                self.finish_force_close_channel(chan.context.force_shutdown(false));
-                               // Prefunded channel has no update
+                               // Unfunded channel has no update
                                (None, chan.context.get_counterparty_node_id())
                        } else {
                                return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) });
@@ -3096,10 +3111,17 @@ where
        #[cfg(test)]
        pub(crate) fn test_send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
                let _lck = self.total_consistency_lock.read().unwrap();
-               self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes)
+               self.send_payment_along_path(SendAlongPathArgs {
+                       path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage,
+                       session_priv_bytes
+               })
        }
 
-       fn send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+       fn send_payment_along_path(&self, args: SendAlongPathArgs) -> Result<(), APIError> {
+               let SendAlongPathArgs {
+                       path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage,
+                       session_priv_bytes
+               } = args;
                // The top-level caller should hold the total_consistency_lock read lock.
                debug_assert!(self.total_consistency_lock.try_write().is_err());
 
@@ -3229,9 +3251,9 @@ where
                let best_block_height = self.best_block.read().unwrap().height();
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments
-                       .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height,
-                               |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                               self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       .send_payment_with_route(route, payment_hash, recipient_onion, payment_id,
+                               &self.entropy_source, &self.node_signer, best_block_height,
+                               |args| self.send_payment_along_path(args))
        }
 
        /// Similar to [`ChannelManager::send_payment_with_route`], but will automatically find a route based on
@@ -3243,18 +3265,16 @@ where
                        .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params,
                                &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(),
                                &self.entropy_source, &self.node_signer, best_block_height, &self.logger,
-                               &self.pending_events,
-                               |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                               self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                               &self.pending_events, |args| self.send_payment_along_path(args))
        }
 
        #[cfg(test)]
        pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option<PaymentPreimage>, payment_id: PaymentId, recv_value_msat: Option<u64>, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+               self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion,
+                       keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer,
+                       best_block_height, |args| self.send_payment_along_path(args))
        }
 
        #[cfg(test)]
@@ -3308,9 +3328,7 @@ where
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments.send_spontaneous_payment_with_route(
                        route, payment_preimage, recipient_onion, payment_id, &self.entropy_source,
-                       &self.node_signer, best_block_height,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       &self.node_signer, best_block_height, |args| self.send_payment_along_path(args))
        }
 
        /// Similar to [`ChannelManager::send_spontaneous_payment`], but will automatically find a route
@@ -3326,9 +3344,7 @@ where
                self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion,
                        payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(),
                        || self.compute_inflight_htlcs(),  &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.logger, &self.pending_events,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       &self.logger, &self.pending_events, |args| self.send_payment_along_path(args))
        }
 
        /// Send a payment that is probing the given route for liquidity. We calculate the
@@ -3337,9 +3353,9 @@ where
        pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+               self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret,
+                       &self.entropy_source, &self.node_signer, best_block_height,
+                       |args| self.send_payment_along_path(args))
        }
 
        /// Returns whether a payment with the given [`PaymentHash`] and [`PaymentId`] is, in fact, a
@@ -3364,7 +3380,7 @@ where
                        Some(chan) => {
                                let funding_txo = find_funding_output(&chan, &funding_transaction)?;
 
-                               let funding_res = chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger)
+                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
                                        .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
                                                let channel_id = chan.context.channel_id();
                                                let user_id = chan.context.get_user_id();
@@ -3537,27 +3553,48 @@ where
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                for channel_id in channel_ids {
-                       if !peer_state.channel_by_id.contains_key(channel_id) {
+                       if !peer_state.has_channel(channel_id) {
                                return Err(APIError::ChannelUnavailable {
                                        err: format!("Channel with ID {} was not found for the passed counterparty_node_id {}", log_bytes!(*channel_id), counterparty_node_id),
                                });
-                       }
+                       };
                }
                for channel_id in channel_ids {
-                       let channel = peer_state.channel_by_id.get_mut(channel_id).unwrap();
-                       let mut config = channel.context.config();
-                       config.apply(config_update);
-                       if !channel.context.update_config(&config) {
+                       if let Some(channel) = peer_state.channel_by_id.get_mut(channel_id) {
+                               let mut config = channel.context.config();
+                               config.apply(config_update);
+                               if !channel.context.update_config(&config) {
+                                       continue;
+                               }
+                               if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
+                                       peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+                               } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
+                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
+                                               node_id: channel.context.get_counterparty_node_id(),
+                                               msg,
+                                       });
+                               }
                                continue;
                        }
-                       if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
-                       } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
-                                       node_id: channel.context.get_counterparty_node_id(),
-                                       msg,
+
+                       let context = if let Some(channel) = peer_state.inbound_v1_channel_by_id.get_mut(channel_id) {
+                               &mut channel.context
+                       } else if let Some(channel) = peer_state.outbound_v1_channel_by_id.get_mut(channel_id) {
+                               &mut channel.context
+                       } else {
+                               // This should not be reachable as we've already checked for non-existence in the previous channel_id loop.
+                               debug_assert!(false);
+                               return Err(APIError::ChannelUnavailable {
+                                       err: format!(
+                                               "Channel with ID {} for passed counterparty_node_id {} disappeared after we confirmed its existence - this should not be reachable!",
+                                               log_bytes!(*channel_id), counterparty_node_id),
                                });
-                       }
+                       };
+                       let mut config = context.config();
+                       config.apply(config_update);
+                       // We update the config, but we MUST NOT broadcast a `channel_update` before `channel_ready`,
+                       // which these pending inbound/outbound channels have not yet reached.
+                       context.update_config(&config);
                }
                Ok(())
        }
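
For funded channels, the loop above applies the partial update to a copy of the current config and only emits gossip when `update_config` reports a change; for unfunded channels it stores the new config without broadcasting. A minimal sketch of that copy-apply-compare flow, with hypothetical config fields:

    #[derive(Clone, PartialEq)]
    struct Config { fee_ppm: u32, cltv_delta: u16 }

    struct ConfigUpdate { fee_ppm: Option<u32>, cltv_delta: Option<u16> }

    impl Config {
        // Overlay only the fields the partial update actually sets.
        fn apply(&mut self, update: &ConfigUpdate) {
            if let Some(fee) = update.fee_ppm { self.fee_ppm = fee; }
            if let Some(delta) = update.cltv_delta { self.cltv_delta = delta; }
        }
    }

    // Returns true when the stored config changed, i.e. when a funded, ready
    // channel may need a fresh channel_update broadcast.
    fn update_config(current: &mut Config, update: &ConfigUpdate) -> bool {
        let mut new_config = current.clone();
        new_config.apply(update);
        if new_config == *current { return false; }
        *current = new_config;
        true
    }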
@@ -4144,9 +4181,7 @@ where
                let best_block_height = self.best_block.read().unwrap().height();
                self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(),
                        || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.pending_events, &self.logger,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv));
+                       &self.pending_events, &self.logger, |args| self.send_payment_along_path(args));
 
                for (htlc_source, payment_hash, failure_reason, destination) in failed_forwards.drain(..) {
                        self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination);
@@ -4272,13 +4307,19 @@ where
                PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
                        let mut should_persist = self.process_background_events();
 
-                       let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
 
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                for (chan_id, chan) in peer_state.channel_by_id.iter_mut() {
+                                       let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
+                                               min_mempool_feerate
+                                       } else {
+                                               normal_feerate
+                                       };
                                        let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
                                        if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
                                }
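
The feerate choice above keys off the channel type: anchor-output channels can have their commitment fees bumped at broadcast time via CPFP, so background fee updates only need to clear the mempool floor. A minimal sketch, with a stand-in trait rather than LDK's actual fee-estimator API:

    enum ConfirmationTarget { MempoolMinimum, Normal }

    trait Estimator {
        fn bounded_sat_per_1000_weight(&self, target: ConfirmationTarget) -> u32;
    }

    fn feerate_for_channel<E: Estimator>(est: &E, supports_anchors: bool) -> u32 {
        if supports_anchors {
            // Anchors: fees can be bumped later via CPFP, so the floor suffices.
            est.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum)
        } else {
            // Non-anchor: the commitment tx must confirm at its built-in feerate.
            est.bounded_sat_per_1000_weight(ConfirmationTarget::Normal)
        }
    }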
@@ -4298,6 +4339,7 @@ where
        ///  * Expiring a channel's previous [`ChannelConfig`] if necessary to only allow forwarding HTLCs
        ///    with the current [`ChannelConfig`].
 ///  * Removing peers which have disconnected and no longer have any channels.
+       ///  * Force-closing and removing channels which have not completed establishment in a timely manner.
        ///
        /// Note that this may cause reentrancy through [`chain::Watch::update_channel`] calls or feerate
        /// estimate fetches.
@@ -4308,7 +4350,8 @@ where
                PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
                        let mut should_persist = self.process_background_events();
 
-                       let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
 
                        let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
                        let mut timed_out_mpp_htlcs = Vec::new();
@@ -4321,6 +4364,11 @@ where
                                        let pending_msg_events = &mut peer_state.pending_msg_events;
                                        let counterparty_node_id = *counterparty_node_id;
                                        peer_state.channel_by_id.retain(|chan_id, chan| {
+                                               let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
+                                                       min_mempool_feerate
+                                               } else {
+                                                       normal_feerate
+                                               };
                                                let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
                                                if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
 
@@ -4386,6 +4434,26 @@ where
 
                                                true
                                        });
+
+                                       let process_unfunded_channel_tick = |
+                                               chan_id: &[u8; 32],
+                                               chan_context: &mut ChannelContext<<SP::Target as SignerProvider>::Signer>,
+                                               unfunded_chan_context: &mut UnfundedChannelContext,
+                                       | {
+                                               chan_context.maybe_expire_prev_config();
+                                               if unfunded_chan_context.should_expire_unfunded_channel() {
+                                                       log_error!(self.logger, "Force-closing pending channel {} for not establishing in a timely manner", log_bytes!(&chan_id[..]));
+                                                       update_maps_on_chan_removal!(self, &chan_context);
+                                                       self.issue_channel_close_events(&chan_context, ClosureReason::HolderForceClosed);
+                                                       self.finish_force_close_channel(chan_context.force_shutdown(false));
+                                                       false
+                                               } else {
+                                                       true
+                                               }
+                                       };
+                                       peer_state.outbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context));
+                                       peer_state.inbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context));
+
                                        if peer_state.ok_to_remove(true) {
                                                pending_peers_awaiting_removal.push(counterparty_node_id);
                                        }
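
`process_unfunded_channel_tick` lets both unfunded maps share one expiry routine via `retain`. A minimal sketch of the same tick-and-expire pattern; the age counter and limit are assumptions standing in for `UnfundedChannelContext`'s internals:

    use std::collections::HashMap;

    struct UnfundedChannel { ticks_since_open: u32 }

    // Illustrative limit; LDK keys this off its own constant.
    const AGE_LIMIT_TICKS: u32 = 60;

    // One timer tick: age every unfunded channel and drop those whose
    // establishment has stalled, mirroring both retain calls above.
    fn tick(unfunded: &mut HashMap<[u8; 32], UnfundedChannel>) {
        unfunded.retain(|_id, chan| {
            chan.ticks_since_open += 1;
            chan.ticks_since_open < AGE_LIMIT_TICKS
        });
    }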
@@ -5496,38 +5564,50 @@ where
                                })?;
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
-                       match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
-                               hash_map::Entry::Occupied(mut chan_entry) => {
-
-                                       if !chan_entry.get().received_shutdown() {
-                                               log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.",
-                                                       log_bytes!(msg.channel_id),
-                                                       if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
-                                       }
+                       // TODO(dunxen): Fix this duplication when we switch to a single map with enums as per
+                       // https://github.com/lightningdevkit/rust-lightning/issues/2422
+                       if let hash_map::Entry::Occupied(chan_entry) = peer_state.outbound_v1_channel_by_id.entry(msg.channel_id.clone()) {
+                               log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..]));
+                               self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
+                               let mut chan = remove_channel!(self, chan_entry);
+                               self.finish_force_close_channel(chan.context.force_shutdown(false));
+                               return Ok(());
+                       } else if let hash_map::Entry::Occupied(chan_entry) = peer_state.inbound_v1_channel_by_id.entry(msg.channel_id.clone()) {
+                               log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..]));
+                               self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
+                               let mut chan = remove_channel!(self, chan_entry);
+                               self.finish_force_close_channel(chan.context.force_shutdown(false));
+                               return Ok(());
+                       } else if let hash_map::Entry::Occupied(mut chan_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) {
+                               if !chan_entry.get().received_shutdown() {
+                                       log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.",
+                                               log_bytes!(msg.channel_id),
+                                               if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
+                               }
 
-                                       let funding_txo_opt = chan_entry.get().context.get_funding_txo();
-                                       let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
-                                               chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
-                                       dropped_htlcs = htlcs;
+                               let funding_txo_opt = chan_entry.get().context.get_funding_txo();
+                               let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
+                                       chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
+                               dropped_htlcs = htlcs;
 
-                                       if let Some(msg) = shutdown {
-                                               // We can send the `shutdown` message before updating the `ChannelMonitor`
-                                               // here as we don't need the monitor update to complete until we send a
-                                               // `shutdown_signed`, which we'll delay if we're pending a monitor update.
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                                       node_id: *counterparty_node_id,
-                                                       msg,
-                                               });
-                                       }
+                               if let Some(msg) = shutdown {
+                                       // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                       // here as we don't need the monitor update to complete until we send a
+                                       // `shutdown_signed`, which we'll delay if we're pending a monitor update.
+                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                               node_id: *counterparty_node_id,
+                                               msg,
+                                       });
+                               }
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update_opt {
-                                               break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
-                                                       peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ());
-                                       }
-                                       break Ok(());
-                               },
-                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
+                               // Update the monitor with the shutdown script if necessary.
+                               if let Some(monitor_update) = monitor_update_opt {
+                                       break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+                                               peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ());
+                               }
+                               break Ok(());
+                       } else {
+                               return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
                for htlc_source in dropped_htlcs.drain(..) {
@@ -6942,13 +7022,13 @@ where
                provided_node_features(&self.default_configuration)
        }
 
-       /// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by
+       /// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by
        /// [`ChannelManager`].
        ///
        /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice"
        /// or not. Thus, this method is not public.
        #[cfg(any(feature = "_test_utils", test))]
-       pub fn invoice_features(&self) -> InvoiceFeatures {
+       pub fn invoice_features(&self) -> Bolt11InvoiceFeatures {
                provided_invoice_features(&self.default_configuration)
        }
 
@@ -7228,37 +7308,20 @@ where
                log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 
                let per_peer_state = self.per_peer_state.read().unwrap();
-               for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+               if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        let pending_msg_events = &mut peer_state.pending_msg_events;
-                       peer_state.channel_by_id.retain(|_, chan| {
-                               let retain = if chan.context.get_counterparty_node_id() == *counterparty_node_id {
-                                       if !chan.context.have_received_message() {
-                                               // If we created this (outbound) channel while we were disconnected from the
-                                               // peer we probably failed to send the open_channel message, which is now
-                                               // lost. We can't have had anything pending related to this channel, so we just
-                                               // drop it.
-                                               false
-                                       } else {
-                                               pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
-                                                       node_id: chan.context.get_counterparty_node_id(),
-                                                       msg: chan.get_channel_reestablish(&self.logger),
-                                               });
-                                               true
-                                       }
-                               } else { true };
-                               if retain && chan.context.get_counterparty_node_id() != *counterparty_node_id {
-                                       if let Some(msg) = chan.get_signed_channel_announcement(&self.node_signer, self.genesis_hash.clone(), self.best_block.read().unwrap().height(), &self.default_configuration) {
-                                               if let Ok(update_msg) = self.get_channel_update_for_broadcast(chan) {
-                                                       pending_msg_events.push(events::MessageSendEvent::SendChannelAnnouncement {
-                                                               node_id: *counterparty_node_id,
-                                                               msg, update_msg,
-                                                       });
-                                               }
-                                       }
-                               }
-                               retain
+
+                       // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
+                       // (so won't be recovered after a crash), we don't need to bother closing unfunded channels and
+                       // clearing their maps here. Instead we can just queue channel_reestablish messages for
+                       // channels in the channel_by_id map.
+                       peer_state.channel_by_id.iter_mut().for_each(|(_, chan)| {
+                               pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
+                                       node_id: chan.context.get_counterparty_node_id(),
+                                       msg: chan.get_channel_reestablish(&self.logger),
+                               });
                        });
                }
                //TODO: Also re-broadcast announcement_signatures
@@ -7292,7 +7355,7 @@ where
                                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                if let Some(chan) = peer_state.outbound_v1_channel_by_id.get_mut(&msg.channel_id) {
-                                       if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash) {
+                                       if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash, &self.fee_estimator) {
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
                                                        node_id: *counterparty_node_id,
                                                        msg,
@@ -7377,16 +7440,18 @@ where
 /// Fetches the set of [`NodeFeatures`] flags which are provided by or required by
 /// [`ChannelManager`].
 pub(crate) fn provided_node_features(config: &UserConfig) -> NodeFeatures {
-       provided_init_features(config).to_context()
+       let mut node_features = provided_init_features(config).to_context();
+       node_features.set_keysend_optional();
+       node_features
 }
 
-/// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by
+/// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by
 /// [`ChannelManager`].
 ///
 /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice"
 /// or not. Thus, this method is not public.
 #[cfg(any(feature = "_test_utils", test))]
-pub(crate) fn provided_invoice_features(config: &UserConfig) -> InvoiceFeatures {
+pub(crate) fn provided_invoice_features(config: &UserConfig) -> Bolt11InvoiceFeatures {
        provided_init_features(config).to_context()
 }
 
@@ -10184,6 +10249,25 @@ mod tests {
                        MessageSendEvent::BroadcastChannelUpdate { .. } => {},
                        _ => panic!("expected BroadcastChannelUpdate event"),
                }
+
+               // If we provide a channel_id not associated with the peer, we should get an error, and no
+               // updates should be applied, to ensure update atomicity as specified in the API docs.
+               let bad_channel_id = [10; 32];
+               let current_fee = nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths;
+               let new_fee = current_fee + 100;
+               assert!(
+                       matches!(
+                               nodes[0].node.update_partial_channel_config(&channel.counterparty.node_id, &[channel.channel_id, bad_channel_id], &ChannelConfigUpdate {
+                                       forwarding_fee_proportional_millionths: Some(new_fee),
+                                       ..Default::default()
+                               }),
+                               Err(APIError::ChannelUnavailable { err: _ }),
+                       )
+               );
+               // Check that the fee hasn't changed for the channel that exists.
+               assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths, current_fee);
+               let events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(events.len(), 0);
        }
 }