Merge pull request #2387 from vladimirfomene/add_extra_fields_to_ChannelClosed_event
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 7bcc75c01c3e1c8be16f758eeee763e8219c7eb4..961f25cc664ad3425d39dcf5e099b66c44eb4949 100644 (file)
@@ -40,12 +40,12 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret};
-use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel};
+use crate::ln::channel::{Channel, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel};
 use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures};
 #[cfg(any(feature = "_test_utils", test))]
-use crate::ln::features::InvoiceFeatures;
+use crate::ln::features::Bolt11InvoiceFeatures;
 use crate::routing::gossip::NetworkGraph;
-use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router};
+use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router};
 use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
 use crate::ln::msgs;
 use crate::ln::onion_utils;
@@ -53,7 +53,7 @@ use crate::ln::onion_utils::HTLCFailReason;
 use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
 #[cfg(test)]
 use crate::ln::outbound_payment;
-use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment};
+use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs};
 use crate::ln::wire::Encode;
 use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner};
 use crate::util::config::{UserConfig, ChannelConfig, ChannelConfigUpdate};
@@ -110,6 +110,8 @@ pub(super) enum PendingHTLCRouting {
                payment_metadata: Option<Vec<u8>>,
                incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed
                phantom_shared_secret: Option<[u8; 32]>,
+               /// See [`RecipientOnionFields::custom_tlvs`] for more info.
+               custom_tlvs: Vec<(u64, Vec<u8>)>,
        },
        ReceiveKeysend {
                /// This was added in 0.0.116 and will break deserialization on downgrades.
@@ -117,6 +119,8 @@ pub(super) enum PendingHTLCRouting {
                payment_preimage: PaymentPreimage,
                payment_metadata: Option<Vec<u8>>,
                incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed
+               /// See [`RecipientOnionFields::custom_tlvs`] for more info.
+               custom_tlvs: Vec<(u64, Vec<u8>)>,
        },
 }
 
@@ -131,6 +135,9 @@ pub(super) struct PendingHTLCInfo {
        /// may overshoot this in either case)
        pub(super) outgoing_amt_msat: u64,
        pub(super) outgoing_cltv_value: u32,
+       /// The fee being skimmed off the top of this HTLC. If this is a forward, it'll be the fee we are
+       /// skimming. If we're receiving this HTLC, it's the fee that our counterparty skimmed.
+       pub(super) skimmed_fee_msat: Option<u64>,
 }
 
 #[derive(Clone)] // See Channel::revoke_and_ack for why, tl;dr: Rust bug
@@ -210,6 +217,8 @@ struct ClaimableHTLC {
        total_value_received: Option<u64>,
        /// The sender intended sum total of all MPP parts specified in the onion
        total_msat: u64,
+       /// The extra fee our counterparty skimmed off the top of this HTLC.
+       counterparty_skimmed_fee_msat: Option<u64>,
 }
 
 /// A payment identifier used to uniquely identify a payment to LDK.
@@ -312,7 +321,7 @@ impl core::hash::Hash for HTLCSource {
        }
 }
 impl HTLCSource {
-       #[cfg(not(feature = "grind_signatures"))]
+       #[cfg(all(feature = "_test_vectors", not(feature = "grind_signatures")))]
        #[cfg(test)]
        pub fn dummy() -> Self {
                HTLCSource::OutboundRoute {
@@ -336,7 +345,7 @@ impl HTLCSource {
        }
 }
 
-struct ReceiveError {
+struct InboundOnionErr {
        err_code: u16,
        err_data: Vec<u8>,
        msg: &'static str,
@@ -350,15 +359,32 @@ struct ReceiveError {
 pub enum FailureCode {
        /// We had a temporary error processing the payment. Useful if no other error codes fit
        /// and you want to indicate that the payer may want to retry.
-       TemporaryNodeFailure             = 0x2000 | 2,
+       TemporaryNodeFailure,
        /// We have a required feature which was not in this onion. For example, you may require
        /// some additional metadata that was not provided with this payment.
-       RequiredNodeFeatureMissing       = 0x4000 | 0x2000 | 3,
+       RequiredNodeFeatureMissing,
        /// You may wish to use this when a `payment_preimage` is unknown, or the CLTV expiry of
        /// the HTLC is too close to the current block height for safe handling.
        /// Using this failure code in [`ChannelManager::fail_htlc_backwards_with_reason`] is
        /// equivalent to calling [`ChannelManager::fail_htlc_backwards`].
-       IncorrectOrUnknownPaymentDetails = 0x4000 | 15,
+       IncorrectOrUnknownPaymentDetails,
+       /// We failed to process the payload after the onion was decrypted. You may wish to
+       /// use this when receiving custom HTLC TLVs with even type numbers that you don't recognize.
+       ///
+       /// If available, the tuple data may include the type number and byte offset in the
+       /// decrypted byte stream where the failure occurred.
+       InvalidOnionPayload(Option<(u64, u16)>),
+}
+
+impl Into<u16> for FailureCode {
+    fn into(self) -> u16 {
+               match self {
+                       FailureCode::TemporaryNodeFailure => 0x2000 | 2,
+                       FailureCode::RequiredNodeFeatureMissing => 0x4000 | 0x2000 | 3,
+                       FailureCode::IncorrectOrUnknownPaymentDetails => 0x4000 | 15,
+                       FailureCode::InvalidOnionPayload(_) => 0x4000 | 22,
+               }
+       }
 }
 
 /// Error type returned across the peer_state mutex boundary. When an Err is generated for a
@@ -371,6 +397,7 @@ struct MsgHandleErrInternal {
        err: msgs::LightningError,
        chan_id: Option<([u8; 32], u128)>, // If Some a channel of ours has been closed
        shutdown_finish: Option<(ShutdownResult, Option<msgs::ChannelUpdate>)>,
+       channel_capacity: Option<u64>,
 }
 impl MsgHandleErrInternal {
        #[inline]
@@ -387,14 +414,15 @@ impl MsgHandleErrInternal {
                        },
                        chan_id: None,
                        shutdown_finish: None,
+                       channel_capacity: None,
                }
        }
        #[inline]
        fn from_no_close(err: msgs::LightningError) -> Self {
-               Self { err, chan_id: None, shutdown_finish: None }
+               Self { err, chan_id: None, shutdown_finish: None, channel_capacity: None }
        }
        #[inline]
-       fn from_finish_shutdown(err: String, channel_id: [u8; 32], user_channel_id: u128, shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
+       fn from_finish_shutdown(err: String, channel_id: [u8; 32], user_channel_id: u128, shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>, channel_capacity: u64) -> Self {
                Self {
                        err: LightningError {
                                err: err.clone(),
@@ -407,6 +435,7 @@ impl MsgHandleErrInternal {
                        },
                        chan_id: Some((channel_id, user_channel_id)),
                        shutdown_finish: Some((shutdown_res, channel_update)),
+                       channel_capacity: Some(channel_capacity)
                }
        }
        #[inline]
@@ -439,6 +468,7 @@ impl MsgHandleErrInternal {
                        },
                        chan_id: None,
                        shutdown_finish: None,
+                       channel_capacity: None,
                }
        }
 }
@@ -502,19 +532,19 @@ struct ClaimablePayments {
 /// running normally, and specifically must be processed before any other non-background
 /// [`ChannelMonitorUpdate`]s are applied.
 enum BackgroundEvent {
-       /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from
-       /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public
-       /// key to handle channel resumption, whereas if the channel has been force-closed we do not
-       /// need the counterparty node_id.
+       /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
+       /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
+       /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the
+       /// channel has been force-closed we do not need the counterparty node_id.
        ///
        /// Note that any such events are lost on shutdown, so in general they must be updates which
        /// are regenerated on startup.
-       ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
+       ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
        /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the
        /// channel to continue normal operation.
        ///
        /// In general this should be used rather than
-       /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the
+       /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the
        /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`]
        /// error the other variant is acceptable.
        ///
@@ -525,6 +555,13 @@ enum BackgroundEvent {
                funding_txo: OutPoint,
                update: ChannelMonitorUpdate
        },
+       /// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have
+       /// them marked pending, thus we need to run any [`MonitorUpdateCompletionAction`] (s) pending
+       /// on a channel.
+       MonitorUpdatesComplete {
+               counterparty_node_id: PublicKey,
+               channel_id: [u8; 32],
+       },
 }
 
 #[derive(Debug)]
@@ -628,6 +665,13 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
        /// Messages to send to the peer - pushed to in the same lock that they are generated in (except
        /// for broadcast messages, where ordering isn't as strict).
        pub(super) pending_msg_events: Vec<MessageSendEvent>,
+       /// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the
+       /// user but which have not yet completed.
+       ///
+       /// Note that the channel may no longer exist. For example if the channel was closed but we
+       /// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
+       /// for a missing channel.
+       in_flight_monitor_updates: BTreeMap<OutPoint, Vec<ChannelMonitorUpdate>>,
        /// Map from a specific channel to some action(s) that should be taken when all pending
        /// [`ChannelMonitorUpdate`]s for the channel complete updating.
        ///
@@ -663,9 +707,10 @@ impl <Signer: ChannelSigner> PeerState<Signer> {
                        return false
                }
                self.channel_by_id.is_empty() && self.monitor_update_blocked_actions.is_empty()
+                       && self.in_flight_monitor_updates.is_empty()
        }
 
-       // Returns a count of all channels we have with this peer, including pending channels.
+       // Returns a count of all channels we have with this peer, including unfunded channels.
        fn total_channel_count(&self) -> usize {
                self.channel_by_id.len() +
                        self.outbound_v1_channel_by_id.len() +
@@ -739,7 +784,23 @@ pub type SimpleArcChannelManager<M, T, F, L> = ChannelManager<
 /// of [`KeysManager`] and [`DefaultRouter`].
 ///
 /// This is not exported to bindings users as Arcs don't make sense in bindings
-pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>;
+pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> =
+       ChannelManager<
+               &'a M,
+               &'b T,
+               &'c KeysManager,
+               &'c KeysManager,
+               &'c KeysManager,
+               &'d F,
+               &'e DefaultRouter<
+                       &'f NetworkGraph<&'g L>,
+                       &'g L,
+                       &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>,
+                       ProbabilisticScoringFeeParameters,
+                       ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>
+               >,
+               &'g L
+       >;
 
 macro_rules! define_test_pub_trait { ($vis: vis) => {
 /// A trivial trait which describes any [`ChannelManager`] used in testing.
@@ -1085,7 +1146,6 @@ where
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
 
-       #[cfg(debug_assertions)]
        background_events_processed_since_startup: AtomicBool,
 
        persistence_notifier: Notifier,
@@ -1451,6 +1511,9 @@ pub struct ChannelDetails {
        ///
        /// [`confirmations_required`]: ChannelDetails::confirmations_required
        pub is_channel_ready: bool,
+       /// The stage of the channel's shutdown.
+       /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116.
+       pub channel_shutdown_state: Option<ChannelShutdownState>,
        /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b)
        /// the peer is connected, and (c) the channel is not currently negotiating a shutdown.
        ///
@@ -1490,10 +1553,13 @@ impl ChannelDetails {
                self.short_channel_id.or(self.outbound_scid_alias)
        }
 
-       fn from_channel_context<Signer: WriteableEcdsaChannelSigner>(context: &ChannelContext<Signer>,
-               best_block_height: u32, latest_features: InitFeatures) -> Self {
-
-               let balance = context.get_available_balances();
+       fn from_channel_context<Signer: WriteableEcdsaChannelSigner, F: Deref>(
+               context: &ChannelContext<Signer>, best_block_height: u32, latest_features: InitFeatures,
+               fee_estimator: &LowerBoundedFeeEstimator<F>
+       ) -> Self
+       where F::Target: FeeEstimator
+       {
+               let balance = context.get_available_balances(fee_estimator);
                let (to_remote_reserve_satoshis, to_self_reserve_satoshis) =
                        context.get_holder_counterparty_selected_channel_reserve_satoshis();
                ChannelDetails {
@@ -1538,10 +1604,33 @@ impl ChannelDetails {
                        inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()),
                        inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(),
                        config: Some(context.config()),
+                       channel_shutdown_state: Some(context.shutdown_state()),
                }
        }
 }
 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+/// Further information on the details of the channel shutdown.
+/// Upon channels being forced closed (i.e. commitment transaction confirmation detected
+/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or
+/// the channel will be removed shortly.
+/// Also note, that in normal operation, peers could disconnect at any of these states
+/// and require peer re-connection before making progress onto other states
+pub enum ChannelShutdownState {
+       /// Channel has not sent or received a shutdown message.
+       NotShuttingDown,
+       /// Local node has sent a shutdown message for this channel.
+       ShutdownInitiated,
+       /// Shutdown message exchanges have concluded and the channels are in the midst of
+       /// resolving all existing open HTLCs before closing can continue.
+       ResolvingHTLCs,
+       /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates.
+       NegotiatingClosingFee,
+       /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about
+       /// to drop the channel.
+       ShutdownComplete,
+}
+
 /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments.
 /// These include payments that have yet to find a successful path, or have unresolved HTLCs.
 #[derive(Debug, PartialEq)]
@@ -1595,7 +1684,7 @@ macro_rules! handle_error {
 
                match $internal {
                        Ok(msg) => Ok(msg),
-                       Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => {
+                       Err(MsgHandleErrInternal { err, chan_id, shutdown_finish, channel_capacity }) => {
                                let mut msg_events = Vec::with_capacity(2);
 
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
@@ -1608,7 +1697,9 @@ macro_rules! handle_error {
                                        if let Some((channel_id, user_channel_id)) = chan_id {
                                                $self.pending_events.lock().unwrap().push_back((events::Event::ChannelClosed {
                                                        channel_id, user_channel_id,
-                                                       reason: ClosureReason::ProcessingError { err: err.err.clone() }
+                                                       reason: ClosureReason::ProcessingError { err: err.err.clone() },
+                                                       counterparty_node_id: Some($counterparty_node_id),
+                                                       channel_capacity_sats: channel_capacity,
                                                }, None));
                                        }
                                }
@@ -1681,20 +1772,20 @@ macro_rules! convert_chan_err {
                                update_maps_on_chan_removal!($self, &$channel.context);
                                let shutdown_res = $channel.context.force_shutdown(true);
                                (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel.context.get_user_id(),
-                                       shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok()))
+                                       shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok(), $channel.context.get_value_satoshis()))
                        },
                }
        };
-       ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, PREFUNDED) => {
+       ($self: ident, $err: expr, $channel_context: expr, $channel_id: expr, UNFUNDED) => {
                match $err {
-                       // We should only ever have `ChannelError::Close` when prefunded channels error.
+                       // We should only ever have `ChannelError::Close` when unfunded channels error.
                        // In any case, just close the channel.
                        ChannelError::Warn(msg) | ChannelError::Ignore(msg) | ChannelError::Close(msg) => {
-                               log_error!($self.logger, "Closing prefunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg);
+                               log_error!($self.logger, "Closing unfunded channel {} due to an error: {}", log_bytes!($channel_id[..]), msg);
                                update_maps_on_chan_removal!($self, &$channel_context);
                                let shutdown_res = $channel_context.force_shutdown(false);
                                (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel_context.get_user_id(),
-                                       shutdown_res, None))
+                                       shutdown_res, None, $channel_context.get_value_satoshis()))
                        },
                }
        }
@@ -1720,7 +1811,7 @@ macro_rules! try_v1_outbound_chan_entry {
                match $res {
                        Ok(res) => res,
                        Err(e) => {
-                               let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), PREFUNDED);
+                               let (drop, res) = convert_chan_err!($self, e, $entry.get_mut().context, $entry.key(), UNFUNDED);
                                if drop {
                                        $entry.remove_entry();
                                }
@@ -1806,7 +1897,7 @@ macro_rules! emit_channel_ready_event {
 }
 
 macro_rules! handle_monitor_update_completion {
-       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
+       ($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
                let mut updates = $chan.monitor_updating_restored(&$self.logger,
                        &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
                        $self.best_block.read().unwrap().height());
@@ -1855,41 +1946,65 @@ macro_rules! handle_monitor_update_completion {
 }
 
 macro_rules! handle_new_monitor_update {
-       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+       ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { {
                // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
                // any case so that it won't deadlock.
                debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
-               #[cfg(debug_assertions)] {
-                       debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
-               }
+               debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
                match $update_res {
                        ChannelMonitorUpdateStatus::InProgress => {
                                log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
                                        log_bytes!($chan.context.channel_id()[..]));
-                               Ok(())
+                               Ok(false)
                        },
                        ChannelMonitorUpdateStatus::PermanentFailure => {
                                log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
                                        log_bytes!($chan.context.channel_id()[..]));
                                update_maps_on_chan_removal!($self, &$chan.context);
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown(
+                               let res = Err(MsgHandleErrInternal::from_finish_shutdown(
                                        "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(),
                                        $chan.context.get_user_id(), $chan.context.force_shutdown(false),
-                                       $self.get_channel_update_for_broadcast(&$chan).ok()));
+                                       $self.get_channel_update_for_broadcast(&$chan).ok(), $chan.context.get_value_satoshis()));
                                $remove;
                                res
                        },
                        ChannelMonitorUpdateStatus::Completed => {
-                               $chan.complete_one_mon_update($update_id);
-                               if $chan.no_monitor_updates_pending() {
-                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
-                               }
-                               Ok(())
+                               $completed;
+                               Ok(true)
                        },
                }
        } };
-       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
-               handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
+       ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => {
+               handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
+                       $per_peer_state_lock, $chan, _internal, $remove,
+                       handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan))
+       };
+       ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => {
+               handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING_INITIAL_MONITOR, $chan_entry.remove_entry())
+       };
+       ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+               let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
+                       .or_insert_with(Vec::new);
+               // During startup, we push monitor updates as background events through to here in
+               // order to replay updates that were in-flight when we shut down. Thus, we have to
+               // filter for uniqueness here.
+               let idx = in_flight_updates.iter().position(|upd| upd == &$update)
+                       .unwrap_or_else(|| {
+                               in_flight_updates.push($update);
+                               in_flight_updates.len() - 1
+                       });
+               let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]);
+               handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state,
+                       $per_peer_state_lock, $chan, _internal, $remove,
+                       {
+                               let _ = in_flight_updates.remove(idx);
+                               if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
+                                       handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
+                               }
+                       })
+       } };
+       ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
+               handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
        }
 }
 
@@ -1939,6 +2054,8 @@ macro_rules! process_events_body {
                                let mut pending_events = $self.pending_events.lock().unwrap();
                                pending_events.drain(..num_events);
                                processed_all_events = pending_events.is_empty();
+                               // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
+                               // updated here with the `pending_events` lock acquired.
                                $self.pending_events_processor.store(false, Ordering::Release);
                        }
 
@@ -1968,6 +2085,8 @@ where
 {
        /// Constructs a new `ChannelManager` to hold several channels and route between them.
        ///
+       /// The current time or latest block header time can be provided as the `current_timestamp`.
+       ///
        /// This is the main "logic hub" for all channel-related actions, and implements
        /// [`ChannelMessageHandler`].
        ///
@@ -1981,7 +2100,11 @@ where
        /// [`block_connected`]: chain::Listen::block_connected
        /// [`block_disconnected`]: chain::Listen::block_disconnected
        /// [`params.best_block.block_hash`]: chain::BestBlock::block_hash
-       pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters) -> Self {
+       pub fn new(
+               fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES,
+               node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters,
+               current_timestamp: u32,
+       ) -> Self {
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
                let inbound_pmt_key_material = node_signer.get_inbound_payment_key_material();
@@ -2013,7 +2136,7 @@ where
 
                        probing_cookie_secret: entropy_source.get_secure_random_bytes(),
 
-                       highest_seen_timestamp: AtomicUsize::new(0),
+                       highest_seen_timestamp: AtomicUsize::new(current_timestamp as usize),
 
                        per_peer_state: FairRwLock::new(HashMap::new()),
 
@@ -2021,7 +2144,6 @@ where
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
-                       #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
 
@@ -2151,9 +2273,10 @@ where
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
+                               // Only `Channels` in the channel_by_id map can be considered funded.
                                for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
@@ -2179,17 +2302,17 @@ where
                                let peer_state = &mut *peer_state_lock;
                                for (_channel_id, channel) in peer_state.channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
@@ -2219,10 +2342,15 @@ where
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        let features = &peer_state.latest_features;
+                       let chan_context_to_details = |context| {
+                               ChannelDetails::from_channel_context(context, best_block_height, features.clone(), &self.fee_estimator)
+                       };
                        return peer_state.channel_by_id
                                .iter()
-                               .map(|(_, channel)|
-                                       ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone()))
+                               .map(|(_, channel)| &channel.context)
+                               .chain(peer_state.outbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context))
+                               .chain(peer_state.inbound_v1_channel_by_id.iter().map(|(_, channel)| &channel.context))
+                               .map(chan_context_to_details)
                                .collect();
                }
                vec![]
@@ -2270,7 +2398,9 @@ where
                pending_events_lock.push_back((events::Event::ChannelClosed {
                        channel_id: context.channel_id(),
                        user_channel_id: context.get_user_id(),
-                       reason: closure_reason
+                       reason: closure_reason,
+                       counterparty_node_id: Some(context.get_counterparty_node_id()),
+                       channel_capacity_sats: Some(context.get_value_satoshis()),
                }, None));
        }
 
@@ -2279,49 +2409,58 @@ where
 
                let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
                let result: Result<(), _> = loop {
-                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       {
+                               let per_peer_state = self.per_peer_state.read().unwrap();
 
-                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
-                               .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
+                               let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                                       .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
 
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                       let peer_state = &mut *peer_state_lock;
-                       match peer_state.channel_by_id.entry(channel_id.clone()) {
-                               hash_map::Entry::Occupied(mut chan_entry) => {
-                                       let funding_txo_opt = chan_entry.get().context.get_funding_txo();
-                                       let their_features = &peer_state.latest_features;
-                                       let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
-                                               .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
-                                       failed_htlcs = htlcs;
+                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                               let peer_state = &mut *peer_state_lock;
 
-                                       // We can send the `shutdown` message before updating the `ChannelMonitor`
-                                       // here as we don't need the monitor update to complete until we send a
-                                       // `shutdown_signed`, which we'll delay if we're pending a monitor update.
-                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                               node_id: *counterparty_node_id,
-                                               msg: shutdown_msg,
-                                       });
+                               match peer_state.channel_by_id.entry(channel_id.clone()) {
+                                       hash_map::Entry::Occupied(mut chan_entry) => {
+                                               let funding_txo_opt = chan_entry.get().context.get_funding_txo();
+                                               let their_features = &peer_state.latest_features;
+                                               let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+                                                       .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
+                                               failed_htlcs = htlcs;
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update_opt.take() {
-                                               let update_id = monitor_update.update_id;
-                                               let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
-                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
-                                       }
+                                               // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                               // here as we don't need the monitor update to complete until we send a
+                                               // `shutdown_signed`, which we'll delay if we're pending a monitor update.
+                                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                                       node_id: *counterparty_node_id,
+                                                       msg: shutdown_msg,
+                                               });
 
-                                       if chan_entry.get().is_shutdown() {
-                                               let channel = remove_channel!(self, chan_entry);
-                                               if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
-                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                               msg: channel_update
-                                                       });
+                                               // Update the monitor with the shutdown script if necessary.
+                                               if let Some(monitor_update) = monitor_update_opt.take() {
+                                                       break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+                                                               peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ());
                                                }
-                                               self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed);
-                                       }
-                                       break Ok(());
-                               },
-                               hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), counterparty_node_id) })
+
+                                               if chan_entry.get().is_shutdown() {
+                                                       let channel = remove_channel!(self, chan_entry);
+                                                       if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
+                                                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       msg: channel_update
+                                                               });
+                                                       }
+                                                       self.issue_channel_close_events(&channel.context, ClosureReason::HolderForceClosed);
+                                               }
+                                               break Ok(());
+                                       },
+                                       hash_map::Entry::Vacant(_) => (),
+                               }
                        }
+                       // If we reach this point, it means that the channel_id either refers to an unfunded channel or
+                       // it does not exist for this peer. Either way, we can attempt to force-close it.
+                       //
+                       // An appropriate error will be returned for non-existence of the channel if that's the case.
+                       return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
+                       // TODO(dunxen): This is still not ideal as we're doing some extra lookups.
+                       // Fix this with https://github.com/lightningdevkit/rust-lightning/issues/2422
                };
 
                for htlc_source in failed_htlcs.drain(..) {
@@ -2440,14 +2579,14 @@ where
                                self.issue_channel_close_events(&chan.get().context, closure_reason);
                                let mut chan = remove_channel!(self, chan);
                                self.finish_force_close_channel(chan.context.force_shutdown(false));
-                               // Prefunded channel has no update
+                               // Unfunded channel has no update
                                (None, chan.context.get_counterparty_node_id())
                        } else if let hash_map::Entry::Occupied(chan) = peer_state.inbound_v1_channel_by_id.entry(channel_id.clone()) {
                                log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..]));
                                self.issue_channel_close_events(&chan.get().context, closure_reason);
                                let mut chan = remove_channel!(self, chan);
                                self.finish_force_close_channel(chan.context.force_shutdown(false));
-                               // Prefunded channel has no update
+                               // Unfunded channel has no update
                                (None, chan.context.get_counterparty_node_id())
                        } else {
                                return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) });
@@ -2521,12 +2660,64 @@ where
                }
        }
 
-       fn construct_recv_pending_htlc_info(&self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32],
-               payment_hash: PaymentHash, amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>) -> Result<PendingHTLCInfo, ReceiveError>
-       {
+       fn construct_fwd_pending_htlc_info(
+               &self, msg: &msgs::UpdateAddHTLC, hop_data: msgs::InboundOnionPayload, hop_hmac: [u8; 32],
+               new_packet_bytes: [u8; onion_utils::ONION_DATA_LEN], shared_secret: [u8; 32],
+               next_packet_pubkey_opt: Option<Result<PublicKey, secp256k1::Error>>
+       ) -> Result<PendingHTLCInfo, InboundOnionErr> {
+               debug_assert!(next_packet_pubkey_opt.is_some());
+               let outgoing_packet = msgs::OnionPacket {
+                       version: 0,
+                       public_key: next_packet_pubkey_opt.unwrap_or(Err(secp256k1::Error::InvalidPublicKey)),
+                       hop_data: new_packet_bytes,
+                       hmac: hop_hmac,
+               };
+
+               let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data {
+                       msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } =>
+                               (short_channel_id, amt_to_forward, outgoing_cltv_value),
+                       msgs::InboundOnionPayload::Receive { .. } =>
+                               return Err(InboundOnionErr {
+                                       msg: "Final Node OnionHopData provided for us as an intermediary node",
+                                       err_code: 0x4000 | 22,
+                                       err_data: Vec::new(),
+                               }),
+               };
+
+               Ok(PendingHTLCInfo {
+                       routing: PendingHTLCRouting::Forward {
+                               onion_packet: outgoing_packet,
+                               short_channel_id,
+                       },
+                       payment_hash: msg.payment_hash,
+                       incoming_shared_secret: shared_secret,
+                       incoming_amt_msat: Some(msg.amount_msat),
+                       outgoing_amt_msat: amt_to_forward,
+                       outgoing_cltv_value,
+                       skimmed_fee_msat: None,
+               })
+       }
+
+       fn construct_recv_pending_htlc_info(
+               &self, hop_data: msgs::InboundOnionPayload, shared_secret: [u8; 32], payment_hash: PaymentHash,
+               amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>, allow_underpay: bool,
+               counterparty_skimmed_fee_msat: Option<u64>,
+       ) -> Result<PendingHTLCInfo, InboundOnionErr> {
+               let (payment_data, keysend_preimage, custom_tlvs, onion_amt_msat, outgoing_cltv_value, payment_metadata) = match hop_data {
+                       msgs::InboundOnionPayload::Receive {
+                               payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, ..
+                       } =>
+                               (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata),
+                       _ =>
+                               return Err(InboundOnionErr {
+                                       err_code: 0x4000|22,
+                                       err_data: Vec::new(),
+                                       msg: "Got non final data with an HMAC of 0",
+                               }),
+               };
                // final_incorrect_cltv_expiry
-               if hop_data.outgoing_cltv_value > cltv_expiry {
-                       return Err(ReceiveError {
+               if outgoing_cltv_value > cltv_expiry {
+                       return Err(InboundOnionErr {
                                msg: "Upstream node set CLTV to less than the CLTV set by the sender",
                                err_code: 18,
                                err_data: cltv_expiry.to_be_bytes().to_vec()
@@ -2540,91 +2731,88 @@ where
                // payment logic has enough time to fail the HTLC backward before our onchain logic triggers a
                // channel closure (see HTLC_FAIL_BACK_BUFFER rationale).
                let current_height: u32 = self.best_block.read().unwrap().height();
-               if (hop_data.outgoing_cltv_value as u64) <= current_height as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 {
+               if (outgoing_cltv_value as u64) <= current_height as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 {
                        let mut err_data = Vec::with_capacity(12);
                        err_data.extend_from_slice(&amt_msat.to_be_bytes());
                        err_data.extend_from_slice(&current_height.to_be_bytes());
-                       return Err(ReceiveError {
+                       return Err(InboundOnionErr {
                                err_code: 0x4000 | 15, err_data,
                                msg: "The final CLTV expiry is too soon to handle",
                        });
                }
-               if hop_data.amt_to_forward > amt_msat {
-                       return Err(ReceiveError {
+               if (!allow_underpay && onion_amt_msat > amt_msat) ||
+                       (allow_underpay && onion_amt_msat >
+                        amt_msat.saturating_add(counterparty_skimmed_fee_msat.unwrap_or(0)))
+               {
+                       return Err(InboundOnionErr {
                                err_code: 19,
                                err_data: amt_msat.to_be_bytes().to_vec(),
                                msg: "Upstream node sent less than we were supposed to receive in payment",
                        });
                }
 
-               let routing = match hop_data.format {
-                       msgs::OnionHopDataFormat::NonFinalNode { .. } => {
-                               return Err(ReceiveError {
+               let routing = if let Some(payment_preimage) = keysend_preimage {
+                       // We need to check that the sender knows the keysend preimage before processing this
+                       // payment further. Otherwise, an intermediary routing hop forwarding non-keysend-HTLC X
+                       // could discover the final destination of X, by probing the adjacent nodes on the route
+                       // with a keysend payment of identical payment hash to X and observing the processing
+                       // time discrepancies due to a hash collision with X.
+                       let hashed_preimage = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
+                       if hashed_preimage != payment_hash {
+                               return Err(InboundOnionErr {
                                        err_code: 0x4000|22,
                                        err_data: Vec::new(),
-                                       msg: "Got non final data with an HMAC of 0",
+                                       msg: "Payment preimage didn't match payment hash",
                                });
-                       },
-                       msgs::OnionHopDataFormat::FinalNode { payment_data, keysend_preimage, payment_metadata } => {
-                               if let Some(payment_preimage) = keysend_preimage {
-                                       // We need to check that the sender knows the keysend preimage before processing this
-                                       // payment further. Otherwise, an intermediary routing hop forwarding non-keysend-HTLC X
-                                       // could discover the final destination of X, by probing the adjacent nodes on the route
-                                       // with a keysend payment of identical payment hash to X and observing the processing
-                                       // time discrepancies due to a hash collision with X.
-                                       let hashed_preimage = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
-                                       if hashed_preimage != payment_hash {
-                                               return Err(ReceiveError {
-                                                       err_code: 0x4000|22,
-                                                       err_data: Vec::new(),
-                                                       msg: "Payment preimage didn't match payment hash",
-                                               });
-                                       }
-                                       if !self.default_configuration.accept_mpp_keysend && payment_data.is_some() {
-                                               return Err(ReceiveError {
-                                                       err_code: 0x4000|22,
-                                                       err_data: Vec::new(),
-                                                       msg: "We don't support MPP keysend payments",
-                                               });
-                                       }
-                                       PendingHTLCRouting::ReceiveKeysend {
-                                               payment_data,
-                                               payment_preimage,
-                                               payment_metadata,
-                                               incoming_cltv_expiry: hop_data.outgoing_cltv_value,
-                                       }
-                               } else if let Some(data) = payment_data {
-                                       PendingHTLCRouting::Receive {
-                                               payment_data: data,
-                                               payment_metadata,
-                                               incoming_cltv_expiry: hop_data.outgoing_cltv_value,
-                                               phantom_shared_secret,
-                                       }
-                               } else {
-                                       return Err(ReceiveError {
-                                               err_code: 0x4000|0x2000|3,
-                                               err_data: Vec::new(),
-                                               msg: "We require payment_secrets",
-                                       });
-                               }
-                       },
+                       }
+                       if !self.default_configuration.accept_mpp_keysend && payment_data.is_some() {
+                               return Err(InboundOnionErr {
+                                       err_code: 0x4000|22,
+                                       err_data: Vec::new(),
+                                       msg: "We don't support MPP keysend payments",
+                               });
+                       }
+                       PendingHTLCRouting::ReceiveKeysend {
+                               payment_data,
+                               payment_preimage,
+                               payment_metadata,
+                               incoming_cltv_expiry: outgoing_cltv_value,
+                               custom_tlvs,
+                       }
+               } else if let Some(data) = payment_data {
+                       PendingHTLCRouting::Receive {
+                               payment_data: data,
+                               payment_metadata,
+                               incoming_cltv_expiry: outgoing_cltv_value,
+                               phantom_shared_secret,
+                               custom_tlvs,
+                       }
+               } else {
+                       return Err(InboundOnionErr {
+                               err_code: 0x4000|0x2000|3,
+                               err_data: Vec::new(),
+                               msg: "We require payment_secrets",
+                       });
                };
                Ok(PendingHTLCInfo {
                        routing,
                        payment_hash,
                        incoming_shared_secret: shared_secret,
                        incoming_amt_msat: Some(amt_msat),
-                       outgoing_amt_msat: hop_data.amt_to_forward,
-                       outgoing_cltv_value: hop_data.outgoing_cltv_value,
+                       outgoing_amt_msat: onion_amt_msat,
+                       outgoing_cltv_value,
+                       skimmed_fee_msat: counterparty_skimmed_fee_msat,
                })
        }
 
-       fn decode_update_add_htlc_onion(&self, msg: &msgs::UpdateAddHTLC) -> PendingHTLCStatus {
+       fn decode_update_add_htlc_onion(
+               &self, msg: &msgs::UpdateAddHTLC
+       ) -> Result<(onion_utils::Hop, [u8; 32], Option<Result<PublicKey, secp256k1::Error>>), HTLCFailureMsg> {
                macro_rules! return_malformed_err {
                        ($msg: expr, $err_code: expr) => {
                                {
                                        log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg);
-                                       return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
+                                       return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
                                                channel_id: msg.channel_id,
                                                htlc_id: msg.htlc_id,
                                                sha256_of_onion: Sha256::hash(&msg.onion_routing_packet.hop_data).into_inner(),
@@ -2655,7 +2843,7 @@ where
                        ($msg: expr, $err_code: expr, $data: expr) => {
                                {
                                        log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg);
-                                       return PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
+                                       return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
                                                channel_id: msg.channel_id,
                                                htlc_id: msg.htlc_id,
                                                reason: HTLCFailReason::reason($err_code, $data.to_vec())
@@ -2674,11 +2862,183 @@ where
                                return_err!(err_msg, err_code, &[0; 0]);
                        },
                };
+               let (outgoing_scid, outgoing_amt_msat, outgoing_cltv_value, next_packet_pk_opt) = match next_hop {
+                       onion_utils::Hop::Forward {
+                               next_hop_data: msgs::InboundOnionPayload::Forward {
+                                       short_channel_id, amt_to_forward, outgoing_cltv_value
+                               }, ..
+                       } => {
+                               let next_pk = onion_utils::next_hop_packet_pubkey(&self.secp_ctx,
+                                       msg.onion_routing_packet.public_key.unwrap(), &shared_secret);
+                               (short_channel_id, amt_to_forward, outgoing_cltv_value, Some(next_pk))
+                       },
+                       // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the
+                       // inbound channel's state.
+                       onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)),
+                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => {
+                               return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]);
+                       }
+               };
+
+               // Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we
+               // can't hold the outbound peer state lock at the same time as the inbound peer state lock.
+               if let Some((err, mut code, chan_update)) = loop {
+                       let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned();
+                       let forwarding_chan_info_opt = match id_option {
+                               None => { // unknown_next_peer
+                                       // Note that this is likely a timing oracle for detecting whether an scid is a
+                                       // phantom or an intercept.
+                                       if (self.default_configuration.accept_intercept_htlcs &&
+                                               fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.genesis_hash)) ||
+                                               fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.genesis_hash)
+                                       {
+                                               None
+                                       } else {
+                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
+                                       }
+                               },
+                               Some((cp_id, id)) => Some((cp_id.clone(), id.clone())),
+                       };
+                       let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt {
+                               let per_peer_state = self.per_peer_state.read().unwrap();
+                               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+                               if peer_state_mutex_opt.is_none() {
+                                       break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
+                               }
+                               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+                               let peer_state = &mut *peer_state_lock;
+                               let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) {
+                                       None => {
+                                               // Channel was removed. The short_to_chan_info and channel_by_id maps
+                                               // have no consistency guarantees.
+                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
+                                       },
+                                       Some(chan) => chan
+                               };
+                               if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
+                                       // Note that the behavior here should be identical to the above block - we
+                                       // should NOT reveal the existence or non-existence of a private channel if
+                                       // we don't allow forwards outbound over them.
+                                       break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
+                               }
+                               if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() {
+                                       // `option_scid_alias` (referred to in LDK as `scid_privacy`) means
+                                       // "refuse to forward unless the SCID alias was used", so we pretend
+                                       // we don't have the channel here.
+                                       break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
+                               }
+                               let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok();
+
+                               // Note that we could technically not return an error yet here and just hope
+                               // that the connection is reestablished or monitor updated by the time we get
+                               // around to doing the actual forward, but better to fail early if we can and
+                               // hopefully an attacker trying to path-trace payments cannot make this occur
+                               // on a small/per-node/per-channel scale.
+                               if !chan.context.is_live() { // channel_disabled
+                                       // If the channel_update we're going to return is disabled (i.e. the
+                                       // peer has been disabled for some time), return `channel_disabled`,
+                                       // otherwise return `temporary_channel_failure`.
+                                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
+                                               break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
+                                       } else {
+                                               break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
+                                       }
+                               }
+                               if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
+                                       break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
+                               }
+                               if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) {
+                                       break Some((err, code, chan_update_opt));
+                               }
+                               chan_update_opt
+                       } else {
+                               if (msg.cltv_expiry as u64) < (outgoing_cltv_value) as u64 + MIN_CLTV_EXPIRY_DELTA as u64 {
+                                       // We really should set `incorrect_cltv_expiry` here but as we're not
+                                       // forwarding over a real channel we can't generate a channel_update
+                                       // for it. Instead we just return a generic temporary_node_failure.
+                                       break Some((
+                                                       "Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta",
+                                                       0x2000 | 2, None,
+                                       ));
+                               }
+                               None
+                       };
+
+                       let cur_height = self.best_block.read().unwrap().height() + 1;
+                       // Theoretically, channel counterparty shouldn't send us a HTLC expiring now,
+                       // but we want to be robust wrt to counterparty packet sanitization (see
+                       // HTLC_FAIL_BACK_BUFFER rationale).
+                       if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon
+                               break Some(("CLTV expiry is too close", 0x1000 | 14, chan_update_opt));
+                       }
+                       if msg.cltv_expiry > cur_height + CLTV_FAR_FAR_AWAY as u32 { // expiry_too_far
+                               break Some(("CLTV expiry is too far in the future", 21, None));
+                       }
+                       // If the HTLC expires ~now, don't bother trying to forward it to our
+                       // counterparty. They should fail it anyway, but we don't want to bother with
+                       // the round-trips or risk them deciding they definitely want the HTLC and
+                       // force-closing to ensure they get it if we're offline.
+                       // We previously had a much more aggressive check here which tried to ensure
+                       // our counterparty receives an HTLC which has *our* risk threshold met on it,
+                       // but there is no need to do that, and since we're a bit conservative with our
+                       // risk threshold it just results in failing to forward payments.
+                       if (outgoing_cltv_value) as u64 <= (cur_height + LATENCY_GRACE_PERIOD_BLOCKS) as u64 {
+                               break Some(("Outgoing CLTV value is too soon", 0x1000 | 14, chan_update_opt));
+                       }
+
+                       break None;
+               }
+               {
+                       let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
+                       if let Some(chan_update) = chan_update {
+                               if code == 0x1000 | 11 || code == 0x1000 | 12 {
+                                       msg.amount_msat.write(&mut res).expect("Writes cannot fail");
+                               }
+                               else if code == 0x1000 | 13 {
+                                       msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
+                               }
+                               else if code == 0x1000 | 20 {
+                                       // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
+                                       0u16.write(&mut res).expect("Writes cannot fail");
+                               }
+                               (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
+                               msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
+                               chan_update.write(&mut res).expect("Writes cannot fail");
+                       } else if code & 0x1000 == 0x1000 {
+                               // If we're trying to return an error that requires a `channel_update` but
+                               // we're forwarding to a phantom or intercept "channel" (i.e. cannot
+                               // generate an update), just use the generic "temporary_node_failure"
+                               // instead.
+                               code = 0x2000 | 2;
+                       }
+                       return_err!(err, code, &res.0[..]);
+               }
+               Ok((next_hop, shared_secret, next_packet_pk_opt))
+       }
 
-               let pending_forward_info = match next_hop {
+       fn construct_pending_htlc_status<'a>(
+               &self, msg: &msgs::UpdateAddHTLC, shared_secret: [u8; 32], decoded_hop: onion_utils::Hop,
+               allow_underpay: bool, next_packet_pubkey_opt: Option<Result<PublicKey, secp256k1::Error>>
+       ) -> PendingHTLCStatus {
+               macro_rules! return_err {
+                       ($msg: expr, $err_code: expr, $data: expr) => {
+                               {
+                                       log_info!(self.logger, "Failed to accept/forward incoming HTLC: {}", $msg);
+                                       return PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
+                                               channel_id: msg.channel_id,
+                                               htlc_id: msg.htlc_id,
+                                               reason: HTLCFailReason::reason($err_code, $data.to_vec())
+                                                       .get_encrypted_failure_packet(&shared_secret, &None),
+                                       }));
+                               }
+                       }
+               }
+               match decoded_hop {
                        onion_utils::Hop::Receive(next_hop_data) => {
                                // OUR PAYMENT!
-                               match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, msg.amount_msat, msg.cltv_expiry, None) {
+                               match self.construct_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash,
+                                       msg.amount_msat, msg.cltv_expiry, None, allow_underpay, msg.skimmed_fee_msat)
+                               {
                                        Ok(info) => {
                                                // Note that we could obviously respond immediately with an update_fulfill_htlc
                                                // message, however that would leak that we are the recipient of this payment, so
@@ -2686,179 +3046,17 @@ where
                                                // delay) once they've send us a commitment_signed!
                                                PendingHTLCStatus::Forward(info)
                                        },
-                                       Err(ReceiveError { err_code, err_data, msg }) => return_err!(msg, err_code, &err_data)
+                                       Err(InboundOnionErr { err_code, err_data, msg }) => return_err!(msg, err_code, &err_data)
                                }
                        },
                        onion_utils::Hop::Forward { next_hop_data, next_hop_hmac, new_packet_bytes } => {
-                               let new_pubkey = msg.onion_routing_packet.public_key.unwrap();
-                               let outgoing_packet = msgs::OnionPacket {
-                                       version: 0,
-                                       public_key: onion_utils::next_hop_packet_pubkey(&self.secp_ctx, new_pubkey, &shared_secret),
-                                       hop_data: new_packet_bytes,
-                                       hmac: next_hop_hmac.clone(),
-                               };
-
-                               let short_channel_id = match next_hop_data.format {
-                                       msgs::OnionHopDataFormat::NonFinalNode { short_channel_id } => short_channel_id,
-                                       msgs::OnionHopDataFormat::FinalNode { .. } => {
-                                               return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0;0]);
-                                       },
-                               };
-
-                               PendingHTLCStatus::Forward(PendingHTLCInfo {
-                                       routing: PendingHTLCRouting::Forward {
-                                               onion_packet: outgoing_packet,
-                                               short_channel_id,
-                                       },
-                                       payment_hash: msg.payment_hash.clone(),
-                                       incoming_shared_secret: shared_secret,
-                                       incoming_amt_msat: Some(msg.amount_msat),
-                                       outgoing_amt_msat: next_hop_data.amt_to_forward,
-                                       outgoing_cltv_value: next_hop_data.outgoing_cltv_value,
-                               })
-                       }
-               };
-
-               if let &PendingHTLCStatus::Forward(PendingHTLCInfo { ref routing, ref outgoing_amt_msat, ref outgoing_cltv_value, .. }) = &pending_forward_info {
-                       // If short_channel_id is 0 here, we'll reject the HTLC as there cannot be a channel
-                       // with a short_channel_id of 0. This is important as various things later assume
-                       // short_channel_id is non-0 in any ::Forward.
-                       if let &PendingHTLCRouting::Forward { ref short_channel_id, .. } = routing {
-                               if let Some((err, mut code, chan_update)) = loop {
-                                       let id_option = self.short_to_chan_info.read().unwrap().get(short_channel_id).cloned();
-                                       let forwarding_chan_info_opt = match id_option {
-                                               None => { // unknown_next_peer
-                                                       // Note that this is likely a timing oracle for detecting whether an scid is a
-                                                       // phantom or an intercept.
-                                                       if (self.default_configuration.accept_intercept_htlcs &&
-                                                          fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash)) ||
-                                                          fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, *short_channel_id, &self.genesis_hash)
-                                                       {
-                                                               None
-                                                       } else {
-                                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                                                       }
-                                               },
-                                               Some((cp_id, id)) => Some((cp_id.clone(), id.clone())),
-                                       };
-                                       let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt {
-                                               let per_peer_state = self.per_peer_state.read().unwrap();
-                                               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                                               if peer_state_mutex_opt.is_none() {
-                                                       break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                                               }
-                                               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                                               let peer_state = &mut *peer_state_lock;
-                                               let chan = match peer_state.channel_by_id.get_mut(&forwarding_id) {
-                                                       None => {
-                                                               // Channel was removed. The short_to_chan_info and channel_by_id maps
-                                                               // have no consistency guarantees.
-                                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                                                       },
-                                                       Some(chan) => chan
-                                               };
-                                               if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
-                                                       // Note that the behavior here should be identical to the above block - we
-                                                       // should NOT reveal the existence or non-existence of a private channel if
-                                                       // we don't allow forwards outbound over them.
-                                                       break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
-                                               }
-                                               if chan.context.get_channel_type().supports_scid_privacy() && *short_channel_id != chan.context.outbound_scid_alias() {
-                                                       // `option_scid_alias` (referred to in LDK as `scid_privacy`) means
-                                                       // "refuse to forward unless the SCID alias was used", so we pretend
-                                                       // we don't have the channel here.
-                                                       break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
-                                               }
-                                               let chan_update_opt = self.get_channel_update_for_onion(*short_channel_id, chan).ok();
-
-                                               // Note that we could technically not return an error yet here and just hope
-                                               // that the connection is reestablished or monitor updated by the time we get
-                                               // around to doing the actual forward, but better to fail early if we can and
-                                               // hopefully an attacker trying to path-trace payments cannot make this occur
-                                               // on a small/per-node/per-channel scale.
-                                               if !chan.context.is_live() { // channel_disabled
-                                                       // If the channel_update we're going to return is disabled (i.e. the
-                                                       // peer has been disabled for some time), return `channel_disabled`,
-                                                       // otherwise return `temporary_channel_failure`.
-                                                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
-                                                               break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
-                                                       } else {
-                                                               break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
-                                                       }
-                                               }
-                                               if *outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
-                                                       break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
-                                               }
-                                               if let Err((err, code)) = chan.htlc_satisfies_config(&msg, *outgoing_amt_msat, *outgoing_cltv_value) {
-                                                       break Some((err, code, chan_update_opt));
-                                               }
-                                               chan_update_opt
-                                       } else {
-                                               if (msg.cltv_expiry as u64) < (*outgoing_cltv_value) as u64 + MIN_CLTV_EXPIRY_DELTA as u64 {
-                                                       // We really should set `incorrect_cltv_expiry` here but as we're not
-                                                       // forwarding over a real channel we can't generate a channel_update
-                                                       // for it. Instead we just return a generic temporary_node_failure.
-                                                       break Some((
-                                                               "Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta",
-                                                               0x2000 | 2, None,
-                                                       ));
-                                               }
-                                               None
-                                       };
-
-                                       let cur_height = self.best_block.read().unwrap().height() + 1;
-                                       // Theoretically, channel counterparty shouldn't send us a HTLC expiring now,
-                                       // but we want to be robust wrt to counterparty packet sanitization (see
-                                       // HTLC_FAIL_BACK_BUFFER rationale).
-                                       if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon
-                                               break Some(("CLTV expiry is too close", 0x1000 | 14, chan_update_opt));
-                                       }
-                                       if msg.cltv_expiry > cur_height + CLTV_FAR_FAR_AWAY as u32 { // expiry_too_far
-                                               break Some(("CLTV expiry is too far in the future", 21, None));
-                                       }
-                                       // If the HTLC expires ~now, don't bother trying to forward it to our
-                                       // counterparty. They should fail it anyway, but we don't want to bother with
-                                       // the round-trips or risk them deciding they definitely want the HTLC and
-                                       // force-closing to ensure they get it if we're offline.
-                                       // We previously had a much more aggressive check here which tried to ensure
-                                       // our counterparty receives an HTLC which has *our* risk threshold met on it,
-                                       // but there is no need to do that, and since we're a bit conservative with our
-                                       // risk threshold it just results in failing to forward payments.
-                                       if (*outgoing_cltv_value) as u64 <= (cur_height + LATENCY_GRACE_PERIOD_BLOCKS) as u64 {
-                                               break Some(("Outgoing CLTV value is too soon", 0x1000 | 14, chan_update_opt));
-                                       }
-
-                                       break None;
-                               }
-                               {
-                                       let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
-                                       if let Some(chan_update) = chan_update {
-                                               if code == 0x1000 | 11 || code == 0x1000 | 12 {
-                                                       msg.amount_msat.write(&mut res).expect("Writes cannot fail");
-                                               }
-                                               else if code == 0x1000 | 13 {
-                                                       msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
-                                               }
-                                               else if code == 0x1000 | 20 {
-                                                       // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
-                                                       0u16.write(&mut res).expect("Writes cannot fail");
-                                               }
-                                               (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
-                                               msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
-                                               chan_update.write(&mut res).expect("Writes cannot fail");
-                                       } else if code & 0x1000 == 0x1000 {
-                                               // If we're trying to return an error that requires a `channel_update` but
-                                               // we're forwarding to a phantom or intercept "channel" (i.e. cannot
-                                               // generate an update), just use the generic "temporary_node_failure"
-                                               // instead.
-                                               code = 0x2000 | 2;
-                                       }
-                                       return_err!(err, code, &res.0[..]);
+                               match self.construct_fwd_pending_htlc_info(msg, next_hop_data, next_hop_hmac,
+                                       new_packet_bytes, shared_secret, next_packet_pubkey_opt) {
+                                       Ok(info) => PendingHTLCStatus::Forward(info),
+                                       Err(InboundOnionErr { err_code, err_data, msg }) => return_err!(msg, err_code, &err_data)
                                }
                        }
                }
-
-               pending_forward_info
        }
 
        /// Gets the current [`channel_update`] for the given channel. This first checks if the channel is
@@ -2944,10 +3142,17 @@ where
        #[cfg(test)]
        pub(crate) fn test_send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
                let _lck = self.total_consistency_lock.read().unwrap();
-               self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes)
+               self.send_payment_along_path(SendAlongPathArgs {
+                       path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage,
+                       session_priv_bytes
+               })
        }
 
-       fn send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+       fn send_payment_along_path(&self, args: SendAlongPathArgs) -> Result<(), APIError> {
+               let SendAlongPathArgs {
+                       path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage,
+                       session_priv_bytes
+               } = args;
                // The top-level caller should hold the total_consistency_lock read lock.
                debug_assert!(self.total_consistency_lock.try_write().is_err());
 
@@ -2984,22 +3189,21 @@ where
                                                session_priv: session_priv.clone(),
                                                first_hop_htlc_msat: htlc_msat,
                                                payment_id,
-                                       }, onion_packet, &self.logger);
+                                       }, onion_packet, None, &self.fee_estimator, &self.logger);
                                match break_chan_entry!(self, send_res, chan) {
                                        Some(monitor_update) => {
-                                               let update_id = monitor_update.update_id;
-                                               let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update);
-                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) {
-                                                       break Err(e);
-                                               }
-                                               if update_res == ChannelMonitorUpdateStatus::InProgress {
-                                                       // Note that MonitorUpdateInProgress here indicates (per function
-                                                       // docs) that we will resend the commitment update once monitor
-                                                       // updating completes. Therefore, we must return an error
-                                                       // indicating that it is unsafe to retry the payment wholesale,
-                                                       // which we do in the send_payment check for
-                                                       // MonitorUpdateInProgress, below.
-                                                       return Err(APIError::MonitorUpdateInProgress);
+                                               match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) {
+                                                       Err(e) => break Err(e),
+                                                       Ok(false) => {
+                                                               // Note that MonitorUpdateInProgress here indicates (per function
+                                                               // docs) that we will resend the commitment update once monitor
+                                                               // updating completes. Therefore, we must return an error
+                                                               // indicating that it is unsafe to retry the payment wholesale,
+                                                               // which we do in the send_payment check for
+                                                               // MonitorUpdateInProgress, below.
+                                                               return Err(APIError::MonitorUpdateInProgress);
+                                                       },
+                                                       Ok(true) => {},
                                                }
                                        },
                                        None => { },
@@ -3068,6 +3272,7 @@ where
        /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a
        /// different route unless you intend to pay twice!
        ///
+       /// [`RouteHop`]: crate::routing::router::RouteHop
        /// [`Event::PaymentSent`]: events::Event::PaymentSent
        /// [`Event::PaymentFailed`]: events::Event::PaymentFailed
        /// [`UpdateHTLCs`]: events::MessageSendEvent::UpdateHTLCs
@@ -3077,9 +3282,9 @@ where
                let best_block_height = self.best_block.read().unwrap().height();
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments
-                       .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height,
-                               |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                               self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       .send_payment_with_route(route, payment_hash, recipient_onion, payment_id,
+                               &self.entropy_source, &self.node_signer, best_block_height,
+                               |args| self.send_payment_along_path(args))
        }
 
        /// Similar to [`ChannelManager::send_payment_with_route`], but will automatically find a route based on
@@ -3091,18 +3296,16 @@ where
                        .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params,
                                &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(),
                                &self.entropy_source, &self.node_signer, best_block_height, &self.logger,
-                               &self.pending_events,
-                               |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                               self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                               &self.pending_events, |args| self.send_payment_along_path(args))
        }
 
        #[cfg(test)]
        pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option<PaymentPreimage>, payment_id: PaymentId, recv_value_msat: Option<u64>, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+               self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion,
+                       keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer,
+                       best_block_height, |args| self.send_payment_along_path(args))
        }
 
        #[cfg(test)]
@@ -3156,9 +3359,7 @@ where
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments.send_spontaneous_payment_with_route(
                        route, payment_preimage, recipient_onion, payment_id, &self.entropy_source,
-                       &self.node_signer, best_block_height,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       &self.node_signer, best_block_height, |args| self.send_payment_along_path(args))
        }
 
        /// Similar to [`ChannelManager::send_spontaneous_payment`], but will automatically find a route
@@ -3174,9 +3375,7 @@ where
                self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion,
                        payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(),
                        || self.compute_inflight_htlcs(),  &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.logger, &self.pending_events,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       &self.logger, &self.pending_events, |args| self.send_payment_along_path(args))
        }
 
        /// Send a payment that is probing the given route for liquidity. We calculate the
@@ -3185,9 +3384,9 @@ where
        pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-               self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+               self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret,
+                       &self.entropy_source, &self.node_signer, best_block_height,
+                       |args| self.send_payment_along_path(args))
        }
 
        /// Returns whether a payment with the given [`PaymentHash`] and [`PaymentId`] is, in fact, a
@@ -3212,12 +3411,13 @@ where
                        Some(chan) => {
                                let funding_txo = find_funding_output(&chan, &funding_transaction)?;
 
-                               let funding_res = chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger)
+                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
                                        .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
                                                let channel_id = chan.context.channel_id();
                                                let user_id = chan.context.get_user_id();
                                                let shutdown_res = chan.context.force_shutdown(false);
-                                               (chan, MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, user_id, shutdown_res, None))
+                                               let channel_capacity = chan.context.get_value_satoshis();
+                                               (chan, MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, user_id, shutdown_res, None, channel_capacity))
                                        } else { unreachable!(); });
                                match funding_res {
                                        Ok((chan, funding_msg)) => (chan, funding_msg),
@@ -3385,27 +3585,48 @@ where
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                for channel_id in channel_ids {
-                       if !peer_state.channel_by_id.contains_key(channel_id) {
+                       if !peer_state.has_channel(channel_id) {
                                return Err(APIError::ChannelUnavailable {
                                        err: format!("Channel with ID {} was not found for the passed counterparty_node_id {}", log_bytes!(*channel_id), counterparty_node_id),
                                });
-                       }
+                       };
                }
                for channel_id in channel_ids {
-                       let channel = peer_state.channel_by_id.get_mut(channel_id).unwrap();
-                       let mut config = channel.context.config();
-                       config.apply(config_update);
-                       if !channel.context.update_config(&config) {
+                       if let Some(channel) = peer_state.channel_by_id.get_mut(channel_id) {
+                               let mut config = channel.context.config();
+                               config.apply(config_update);
+                               if !channel.context.update_config(&config) {
+                                       continue;
+                               }
+                               if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
+                                       peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+                               } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
+                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
+                                               node_id: channel.context.get_counterparty_node_id(),
+                                               msg,
+                                       });
+                               }
                                continue;
                        }
-                       if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
-                       } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
-                                       node_id: channel.context.get_counterparty_node_id(),
-                                       msg,
+
+                       let context = if let Some(channel) = peer_state.inbound_v1_channel_by_id.get_mut(channel_id) {
+                               &mut channel.context
+                       } else if let Some(channel) = peer_state.outbound_v1_channel_by_id.get_mut(channel_id) {
+                               &mut channel.context
+                       } else {
+                               // This should not be reachable as we've already checked for non-existence in the previous channel_id loop.
+                               debug_assert!(false);
+                               return Err(APIError::ChannelUnavailable {
+                                       err: format!(
+                                               "Channel with ID {} for passed counterparty_node_id {} disappeared after we confirmed its existence - this should not be reachable!",
+                                               log_bytes!(*channel_id), counterparty_node_id),
                                });
-                       }
+                       };
+                       let mut config = context.config();
+                       config.apply(config_update);
+                       // We update the config, but we MUST NOT broadcast a `channel_update` before `channel_ready`
+                       // which would be the case for pending inbound/outbound channels.
+                       context.update_config(&config);
                }
                Ok(())
        }
@@ -3451,13 +3672,16 @@ where
        /// [`ChannelManager::fail_intercepted_htlc`] MUST be called in response to the event.
        ///
        /// Note that LDK does not enforce fee requirements in `amt_to_forward_msat`, and will not stop
-       /// you from forwarding more than you received.
+       /// you from forwarding more than you received. See
+       /// [`HTLCIntercepted::expected_outbound_amount_msat`] for more on forwarding a different amount
+       /// than expected.
        ///
        /// Errors if the event was not handled in time, in which case the HTLC was automatically failed
        /// backwards.
        ///
        /// [`UserConfig::accept_intercept_htlcs`]: crate::util::config::UserConfig::accept_intercept_htlcs
        /// [`HTLCIntercepted`]: events::Event::HTLCIntercepted
+       /// [`HTLCIntercepted::expected_outbound_amount_msat`]: events::Event::HTLCIntercepted::expected_outbound_amount_msat
        // TODO: when we move to deciding the best outbound channel at forward time, only take
        // `next_node_id` and not `next_hop_channel_id`
        pub fn forward_intercepted_htlc(&self, intercept_id: InterceptId, next_hop_channel_id: &[u8; 32], next_node_id: PublicKey, amt_to_forward_msat: u64) -> Result<(), APIError> {
@@ -3496,7 +3720,10 @@ where
                        },
                        _ => unreachable!() // Only `PendingHTLCRouting::Forward`s are intercepted
                };
+               let skimmed_fee_msat =
+                       payment.forward_info.outgoing_amt_msat.saturating_sub(amt_to_forward_msat);
                let pending_htlc_info = PendingHTLCInfo {
+                       skimmed_fee_msat: if skimmed_fee_msat == 0 { None } else { Some(skimmed_fee_msat) },
                        outgoing_amt_msat: amt_to_forward_msat, routing, ..payment.forward_info
                };
 
@@ -3566,7 +3793,7 @@ where
                                                                                prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id,
                                                                                forward_info: PendingHTLCInfo {
                                                                                        routing, incoming_shared_secret, payment_hash, outgoing_amt_msat,
-                                                                                       outgoing_cltv_value, incoming_amt_msat: _
+                                                                                       outgoing_cltv_value, ..
                                                                                }
                                                                        }) => {
                                                                                macro_rules! failure_handler {
@@ -3628,9 +3855,12 @@ where
                                                                                                };
                                                                                                match next_hop {
                                                                                                        onion_utils::Hop::Receive(hop_data) => {
-                                                                                                               match self.construct_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, Some(phantom_shared_secret)) {
+                                                                                                               match self.construct_recv_pending_htlc_info(hop_data,
+                                                                                                                       incoming_shared_secret, payment_hash, outgoing_amt_msat,
+                                                                                                                       outgoing_cltv_value, Some(phantom_shared_secret), false, None)
+                                                                                                               {
                                                                                                                        Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, vec![(info, prev_htlc_id)])),
-                                                                                                                       Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
+                                                                                                                       Err(InboundOnionErr { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
                                                                                                                }
                                                                                                        },
                                                                                                        _ => panic!(),
@@ -3679,7 +3909,7 @@ where
                                                                                prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id: _,
                                                                                forward_info: PendingHTLCInfo {
                                                                                        incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
-                                                                                       routing: PendingHTLCRouting::Forward { onion_packet, .. }, incoming_amt_msat: _,
+                                                                                       routing: PendingHTLCRouting::Forward { onion_packet, .. }, skimmed_fee_msat, ..
                                                                                },
                                                                        }) => {
                                                                                log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id);
@@ -3693,7 +3923,8 @@ where
                                                                                });
                                                                                if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat,
                                                                                        payment_hash, outgoing_cltv_value, htlc_source.clone(),
-                                                                                       onion_packet, &self.logger)
+                                                                                       onion_packet, skimmed_fee_msat, &self.fee_estimator,
+                                                                                       &self.logger)
                                                                                {
                                                                                        if let ChannelError::Ignore(msg) = e {
                                                                                                log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
@@ -3737,21 +3968,23 @@ where
                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id,
                                                                forward_info: PendingHTLCInfo {
-                                                                       routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, ..
+                                                                       routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat,
+                                                                       skimmed_fee_msat, ..
                                                                }
                                                        }) => {
                                                                let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret, mut onion_fields) = match routing {
-                                                                       PendingHTLCRouting::Receive { payment_data, payment_metadata, incoming_cltv_expiry, phantom_shared_secret } => {
+                                                                       PendingHTLCRouting::Receive { payment_data, payment_metadata, incoming_cltv_expiry, phantom_shared_secret, custom_tlvs } => {
                                                                                let _legacy_hop_data = Some(payment_data.clone());
-                                                                               let onion_fields =
-                                                                                       RecipientOnionFields { payment_secret: Some(payment_data.payment_secret), payment_metadata };
+                                                                               let onion_fields = RecipientOnionFields { payment_secret: Some(payment_data.payment_secret),
+                                                                                               payment_metadata, custom_tlvs };
                                                                                (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data },
                                                                                        Some(payment_data), phantom_shared_secret, onion_fields)
                                                                        },
-                                                                       PendingHTLCRouting::ReceiveKeysend { payment_data, payment_preimage, payment_metadata, incoming_cltv_expiry } => {
+                                                                       PendingHTLCRouting::ReceiveKeysend { payment_data, payment_preimage, payment_metadata, incoming_cltv_expiry, custom_tlvs } => {
                                                                                let onion_fields = RecipientOnionFields {
                                                                                        payment_secret: payment_data.as_ref().map(|data| data.payment_secret),
-                                                                                       payment_metadata
+                                                                                       payment_metadata,
+                                                                                       custom_tlvs,
                                                                                };
                                                                                (incoming_cltv_expiry, OnionPayload::Spontaneous(payment_preimage),
                                                                                        payment_data, None, onion_fields)
@@ -3778,6 +4011,7 @@ where
                                                                        total_msat: if let Some(data) = &payment_data { data.total_msat } else { outgoing_amt_msat },
                                                                        cltv_expiry,
                                                                        onion_payload,
+                                                                       counterparty_skimmed_fee_msat: skimmed_fee_msat,
                                                                };
 
                                                                let mut committed_to_claimable = false;
@@ -3874,11 +4108,16 @@ where
                                                                                        htlcs.push(claimable_htlc);
                                                                                        let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum();
                                                                                        htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat));
+                                                                                       let counterparty_skimmed_fee_msat = htlcs.iter()
+                                                                                               .map(|htlc| htlc.counterparty_skimmed_fee_msat.unwrap_or(0)).sum();
+                                                                                       debug_assert!(total_value.saturating_sub(amount_msat) <=
+                                                                                               counterparty_skimmed_fee_msat);
                                                                                        new_events.push_back((events::Event::PaymentClaimable {
                                                                                                receiver_node_id: Some(receiver_node_id),
                                                                                                payment_hash,
                                                                                                purpose: $purpose,
                                                                                                amount_msat,
+                                                                                               counterparty_skimmed_fee_msat,
                                                                                                via_channel_id: Some(prev_channel_id),
                                                                                                via_user_channel_id: Some(prev_user_channel_id),
                                                                                                claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER),
@@ -3975,9 +4214,7 @@ where
                let best_block_height = self.best_block.read().unwrap().height();
                self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(),
                        || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.pending_events, &self.logger,
-                       |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv));
+                       &self.pending_events, &self.logger, |args| self.send_payment_along_path(args));
 
                for (htlc_source, payment_hash, failure_reason, destination) in failed_forwards.drain(..) {
                        self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination);
@@ -4001,7 +4238,6 @@ where
        fn process_background_events(&self) -> NotifyOption {
                debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);
 
-               #[cfg(debug_assertions)]
                self.background_events_processed_since_startup.store(true, Ordering::Release);
 
                let mut background_events = Vec::new();
@@ -4012,14 +4248,13 @@ where
 
                for event in background_events.drain(..) {
                        match event {
-                               BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
+                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
                                        // The channel has already been closed, so no use bothering to care about the
                                        // monitor updating completing.
                                        let _ = self.chain_monitor.update_channel(funding_txo, &update);
                                },
                                BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => {
-                                       let update_res = self.chain_monitor.update_channel(funding_txo, &update);
-
+                                       let mut updated_chan = false;
                                        let res = {
                                                let per_peer_state = self.per_peer_state.read().unwrap();
                                                if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
@@ -4027,12 +4262,18 @@ where
                                                        let peer_state = &mut *peer_state_lock;
                                                        match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) {
                                                                hash_map::Entry::Occupied(mut chan) => {
-                                                                       handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan)
+                                                                       updated_chan = true;
+                                                                       handle_new_monitor_update!(self, funding_txo, update.clone(),
+                                                                               peer_state_lock, peer_state, per_peer_state, chan).map(|_| ())
                                                                },
                                                                hash_map::Entry::Vacant(_) => Ok(()),
                                                        }
                                                } else { Ok(()) }
                                        };
+                                       if !updated_chan {
+                                               // TODO: Track this as in-flight even though the channel is closed.
+                                               let _ = self.chain_monitor.update_channel(funding_txo, &update);
+                                       }
                                        // TODO: If this channel has since closed, we're likely providing a payment
                                        // preimage update, which we must ensure is durable! We currently don't,
                                        // however, ensure that.
@@ -4042,6 +4283,22 @@ where
                                        }
                                        let _ = handle_error!(self, res, counterparty_node_id);
                                },
+                               BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
+                                       let per_peer_state = self.per_peer_state.read().unwrap();
+                                       if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                                               let peer_state = &mut *peer_state_lock;
+                                               if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) {
+                                                       handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan);
+                                               } else {
+                                                       let update_actions = peer_state.monitor_update_blocked_actions
+                                                               .remove(&channel_id).unwrap_or(Vec::new());
+                                                       mem::drop(peer_state_lock);
+                                                       mem::drop(per_peer_state);
+                                                       self.handle_monitor_update_completion_actions(update_actions);
+                                               }
+                                       }
+                               },
                        }
                }
                NotifyOption::DoPersist
@@ -4070,7 +4327,7 @@ where
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 
-               chan.queue_update_fee(new_feerate, &self.logger);
+               chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger);
                NotifyOption::DoPersist
        }
 
@@ -4083,13 +4340,19 @@ where
                PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
                        let mut should_persist = self.process_background_events();
 
-                       let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
 
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                for (chan_id, chan) in peer_state.channel_by_id.iter_mut() {
+                                       let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
+                                               min_mempool_feerate
+                                       } else {
+                                               normal_feerate
+                                       };
                                        let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
                                        if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
                                }
@@ -4109,6 +4372,7 @@ where
        ///  * Expiring a channel's previous [`ChannelConfig`] if necessary to only allow forwarding HTLCs
        ///    with the current [`ChannelConfig`].
        ///  * Removing peers which have disconnected but and no longer have any channels.
+       ///  * Force-closing and removing channels which have not completed establishment in a timely manner.
        ///
        /// Note that this may cause reentrancy through [`chain::Watch::update_channel`] calls or feerate
        /// estimate fetches.
@@ -4119,7 +4383,8 @@ where
                PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
                        let mut should_persist = self.process_background_events();
 
-                       let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
+                       let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
 
                        let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
                        let mut timed_out_mpp_htlcs = Vec::new();
@@ -4132,6 +4397,11 @@ where
                                        let pending_msg_events = &mut peer_state.pending_msg_events;
                                        let counterparty_node_id = *counterparty_node_id;
                                        peer_state.channel_by_id.retain(|chan_id, chan| {
+                                               let new_feerate = if chan.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
+                                                       min_mempool_feerate
+                                               } else {
+                                                       normal_feerate
+                                               };
                                                let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
                                                if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
 
@@ -4197,6 +4467,26 @@ where
 
                                                true
                                        });
+
+                                       let process_unfunded_channel_tick = |
+                                               chan_id: &[u8; 32],
+                                               chan_context: &mut ChannelContext<<SP::Target as SignerProvider>::Signer>,
+                                               unfunded_chan_context: &mut UnfundedChannelContext,
+                                       | {
+                                               chan_context.maybe_expire_prev_config();
+                                               if unfunded_chan_context.should_expire_unfunded_channel() {
+                                                       log_error!(self.logger, "Force-closing pending outbound channel {} for not establishing in a timely manner", log_bytes!(&chan_id[..]));
+                                                       update_maps_on_chan_removal!(self, &chan_context);
+                                                       self.issue_channel_close_events(&chan_context, ClosureReason::HolderForceClosed);
+                                                       self.finish_force_close_channel(chan_context.force_shutdown(false));
+                                                       false
+                                               } else {
+                                                       true
+                                               }
+                                       };
+                                       peer_state.outbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context));
+                                       peer_state.inbound_v1_channel_by_id.retain(|chan_id, chan| process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context));
+
                                        if peer_state.ok_to_remove(true) {
                                                pending_peers_awaiting_removal.push(counterparty_node_id);
                                        }
@@ -4319,12 +4609,19 @@ where
        /// Gets error data to form an [`HTLCFailReason`] given a [`FailureCode`] and [`ClaimableHTLC`].
        fn get_htlc_fail_reason_from_failure_code(&self, failure_code: FailureCode, htlc: &ClaimableHTLC) -> HTLCFailReason {
                match failure_code {
-                       FailureCode::TemporaryNodeFailure => HTLCFailReason::from_failure_code(failure_code as u16),
-                       FailureCode::RequiredNodeFeatureMissing => HTLCFailReason::from_failure_code(failure_code as u16),
+                       FailureCode::TemporaryNodeFailure => HTLCFailReason::from_failure_code(failure_code.into()),
+                       FailureCode::RequiredNodeFeatureMissing => HTLCFailReason::from_failure_code(failure_code.into()),
                        FailureCode::IncorrectOrUnknownPaymentDetails => {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
                                htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height().to_be_bytes());
-                               HTLCFailReason::reason(failure_code as u16, htlc_msat_height_data)
+                               HTLCFailReason::reason(failure_code.into(), htlc_msat_height_data)
+                       },
+                       FailureCode::InvalidOnionPayload(data) => {
+                               let fail_data = match data {
+                                       Some((typ, offset)) => [BigSize(typ).encode(), offset.encode()].concat(),
+                                       None => Vec::new(),
+                               };
+                               HTLCFailReason::reason(failure_code.into(), fail_data)
                        }
                }
        }
@@ -4471,13 +4768,35 @@ where
        /// event matches your expectation. If you fail to do so and call this method, you may provide
        /// the sender "proof-of-payment" when they did not fulfill the full expected payment.
        ///
+       /// This function will fail the payment if it has custom TLVs with even type numbers, as we
+       /// will assume they are unknown. If you intend to accept even custom TLVs, you should use
+       /// [`claim_funds_with_known_custom_tlvs`].
+       ///
        /// [`Event::PaymentClaimable`]: crate::events::Event::PaymentClaimable
        /// [`Event::PaymentClaimable::claim_deadline`]: crate::events::Event::PaymentClaimable::claim_deadline
        /// [`Event::PaymentClaimed`]: crate::events::Event::PaymentClaimed
        /// [`process_pending_events`]: EventsProvider::process_pending_events
        /// [`create_inbound_payment`]: Self::create_inbound_payment
        /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
+       /// [`claim_funds_with_known_custom_tlvs`]: Self::claim_funds_with_known_custom_tlvs
        pub fn claim_funds(&self, payment_preimage: PaymentPreimage) {
+               self.claim_payment_internal(payment_preimage, false);
+       }
+
+       /// This is a variant of [`claim_funds`] that allows accepting a payment with custom TLVs with
+       /// even type numbers.
+       ///
+       /// # Note
+       ///
+       /// You MUST check you've understood all even TLVs before using this to
+       /// claim, otherwise you may unintentionally agree to some protocol you do not understand.
+       ///
+       /// [`claim_funds`]: Self::claim_funds
+       pub fn claim_funds_with_known_custom_tlvs(&self, payment_preimage: PaymentPreimage) {
+               self.claim_payment_internal(payment_preimage, true);
+       }
+
+       fn claim_payment_internal(&self, payment_preimage: PaymentPreimage, custom_tlvs_known: bool) {
                let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
@@ -4504,6 +4823,23 @@ where
                                        log_error!(self.logger, "Got a duplicate pending claimable event on payment hash {}! Please report this bug",
                                                log_bytes!(payment_hash.0));
                                }
+
+                               if let Some(RecipientOnionFields { ref custom_tlvs, .. }) = payment.onion_fields {
+                                       if !custom_tlvs_known && custom_tlvs.iter().any(|(typ, _)| typ % 2 == 0) {
+                                               log_info!(self.logger, "Rejecting payment with payment hash {} as we cannot accept payment with unknown even TLVs: {}",
+                                                       log_bytes!(payment_hash.0), log_iter!(custom_tlvs.iter().map(|(typ, _)| typ).filter(|typ| *typ % 2 == 0)));
+                                               claimable_payments.pending_claiming_payments.remove(&payment_hash);
+                                               mem::drop(claimable_payments);
+                                               for htlc in payment.htlcs {
+                                                       let reason = self.get_htlc_fail_reason_from_failure_code(FailureCode::InvalidOnionPayload(None), &htlc);
+                                                       let source = HTLCSource::PreviousHopData(htlc.prev_hop);
+                                                       let receiver = HTLCDestination::FailedPayment { payment_hash };
+                                                       self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
+                                               }
+                                               return;
+                                       }
+                               }
+
                                payment.htlcs
                        } else { return; }
                };
@@ -4587,6 +4923,11 @@ where
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
+               // If we haven't yet run background events assume we're still deserializing and shouldn't
+               // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as
+               // `BackgroundEvent`s.
+               let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
+
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let chan_id = prev_hop.outpoint.to_channel_id();
@@ -4613,16 +4954,26 @@ where
                                                                log_bytes!(chan_id), action);
                                                        peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                }
-                                               let update_id = monitor_update.update_id;
-                                               let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
-                                               let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                                       peer_state, per_peer_state, chan);
-                                               if let Err(e) = res {
-                                                       // TODO: This is a *critical* error - we probably updated the outbound edge
-                                                       // of the HTLC's monitor with a preimage. We should retry this monitor
-                                                       // update over and over again until morale improves.
-                                                       log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
-                                                       return Err((counterparty_node_id, e));
+                                               if !during_init {
+                                                       let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+                                                               peer_state, per_peer_state, chan);
+                                                       if let Err(e) = res {
+                                                               // TODO: This is a *critical* error - we probably updated the outbound edge
+                                                               // of the HTLC's monitor with a preimage. We should retry this monitor
+                                                               // update over and over again until morale improves.
+                                                               log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
+                                                               return Err((counterparty_node_id, e));
+                                                       }
+                                               } else {
+                                                       // If we're running during init we cannot update a monitor directly -
+                                                       // they probably haven't actually been loaded yet. Instead, push the
+                                                       // monitor update as a background event.
+                                                       self.pending_background_events.lock().unwrap().push(
+                                                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                                                       counterparty_node_id,
+                                                                       funding_txo: prev_hop.outpoint,
+                                                                       update: monitor_update.clone(),
+                                                               });
                                                }
                                        }
                                        return Ok(());
@@ -4635,16 +4986,34 @@ where
                                payment_preimage,
                        }],
                };
-               // We update the ChannelMonitor on the backward link, after
-               // receiving an `update_fulfill_htlc` from the forward link.
-               let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
-               if update_res != ChannelMonitorUpdateStatus::Completed {
-                       // TODO: This needs to be handled somehow - if we receive a monitor update
-                       // with a preimage we *must* somehow manage to propagate it to the upstream
-                       // channel, or we must have an ability to receive the same event and try
-                       // again on restart.
-                       log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
-                               payment_preimage, update_res);
+
+               if !during_init {
+                       // We update the ChannelMonitor on the backward link, after
+                       // receiving an `update_fulfill_htlc` from the forward link.
+                       let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+                       if update_res != ChannelMonitorUpdateStatus::Completed {
+                               // TODO: This needs to be handled somehow - if we receive a monitor update
+                               // with a preimage we *must* somehow manage to propagate it to the upstream
+                               // channel, or we must have an ability to receive the same event and try
+                               // again on restart.
+                               log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+                                       payment_preimage, update_res);
+                       }
+               } else {
+                       // If we're running during init we cannot update a monitor directly - they probably
+                       // haven't actually been loaded yet. Instead, push the monitor update as a background
+                       // event.
+                       // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the
+                       // channel is already closed) we need to ultimately handle the monitor update
+                       // completion action only after we've completed the monitor update. This is the only
+                       // way to guarantee this update *will* be regenerated on startup (otherwise if this was
+                       // from a forwarded HTLC the downstream preimage may be deleted before we claim
+                       // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will
+                       // complete the monitor update completion action from `completion_action`.
+                       self.pending_background_events.lock().unwrap().push(
+                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
+                                       prev_hop.outpoint, preimage_update,
+                               )));
                }
                // Note that we do process the completion action here. This totally could be a
                // duplicate claim, but we have no way of knowing without interrogating the
@@ -4662,6 +5031,8 @@ where
        fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
+                               debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
+                                       "We don't support claim_htlc claims during startup - monitors may not be available yet");
                                self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
@@ -4817,18 +5188,29 @@ where
                if peer_state_mutex_opt.is_none() { return }
                peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                let peer_state = &mut *peer_state_lock;
-               let mut channel = {
-                       match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
-                               hash_map::Entry::Occupied(chan) => chan,
-                               hash_map::Entry::Vacant(_) => return,
-                       }
-               };
-               log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}",
-                       highest_applied_update_id, channel.get().context.get_latest_monitor_update_id());
-               if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id {
+               let channel =
+                       if let Some(chan) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) {
+                               chan
+                       } else {
+                               let update_actions = peer_state.monitor_update_blocked_actions
+                                       .remove(&funding_txo.to_channel_id()).unwrap_or(Vec::new());
+                               mem::drop(peer_state_lock);
+                               mem::drop(per_peer_state);
+                               self.handle_monitor_update_completion_actions(update_actions);
+                               return;
+                       };
+               let remaining_in_flight =
+                       if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) {
+                               pending.retain(|upd| upd.update_id > highest_applied_update_id);
+                               pending.len()
+                       } else { 0 };
+               log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.",
+                       highest_applied_update_id, channel.context.get_latest_monitor_update_id(),
+                       remaining_in_flight);
+               if !channel.is_awaiting_monitor_update() || channel.context.get_latest_monitor_update_id() != highest_applied_update_id {
                        return;
                }
-               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut());
+               handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel);
        }
 
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
@@ -5039,9 +5421,13 @@ where
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone()))
                } else {
                        if !self.default_configuration.manually_accept_inbound_channels {
-                               if channel.context.get_channel_type().requires_zero_conf() {
+                               let channel_type = channel.context.get_channel_type();
+                               if channel_type.requires_zero_conf() {
                                        return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone()));
                                }
+                               if channel_type.requires_anchors_zero_fee_htlc_tx() {
+                                       return Err(MsgHandleErrInternal::send_err_msg_no_close("No channels with anchor outputs accepted".to_owned(), msg.temporary_channel_id.clone()));
+                               }
                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
                                        node_id: counterparty_node_id.clone(),
                                        msg: channel.accept_inbound_channel(user_channel_id),
@@ -5115,7 +5501,7 @@ where
                                                        let user_id = inbound_chan.context.get_user_id();
                                                        let shutdown_res = inbound_chan.context.force_shutdown(false);
                                                        return Err(MsgHandleErrInternal::from_finish_shutdown(format!("{}", err),
-                                                               msg.temporary_channel_id, user_id, shutdown_res, None));
+                                                               msg.temporary_channel_id, user_id, shutdown_res, None, inbound_chan.context.get_value_satoshis()));
                                                },
                                        }
                                },
@@ -5151,8 +5537,9 @@ where
                                let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
 
                                let chan = e.insert(chan);
-                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state,
-                                       per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
+                               let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state,
+                                       per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR,
+                                       { peer_state.channel_by_id.remove(&new_channel_id) });
 
                                // Note that we reply with the new channel_id in error messages if we gave up on the
                                // channel, not the temporary_channel_id. This is compatible with ourselves, but the
@@ -5164,7 +5551,7 @@ where
                                if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
                                        res.0 = None;
                                }
-                               res
+                               res.map(|_| ())
                        }
                }
        }
@@ -5185,7 +5572,7 @@ where
                                let monitor = try_chan_entry!(self,
                                        chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan);
                                let update_res = self.chain_monitor.watch_channel(chan.get().context.get_funding_txo().unwrap(), monitor);
-                               let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan);
+                               let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR);
                                if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
                                        // We weren't able to watch the channel to begin with, so no updates should be made on
                                        // it. Previously, full_stack_target found an (unreachable) panic when the
@@ -5194,7 +5581,7 @@ where
                                                shutdown_finish.0.take();
                                        }
                                }
-                               res
+                               res.map(|_| ())
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                }
@@ -5256,39 +5643,50 @@ where
                                })?;
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
-                       match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
-                               hash_map::Entry::Occupied(mut chan_entry) => {
-
-                                       if !chan_entry.get().received_shutdown() {
-                                               log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.",
-                                                       log_bytes!(msg.channel_id),
-                                                       if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
-                                       }
+                       // TODO(dunxen): Fix this duplication when we switch to a single map with enums as per
+                       // https://github.com/lightningdevkit/rust-lightning/issues/2422
+                       if let hash_map::Entry::Occupied(chan_entry) = peer_state.outbound_v1_channel_by_id.entry(msg.channel_id.clone()) {
+                               log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..]));
+                               self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
+                               let mut chan = remove_channel!(self, chan_entry);
+                               self.finish_force_close_channel(chan.context.force_shutdown(false));
+                               return Ok(());
+                       } else if let hash_map::Entry::Occupied(chan_entry) = peer_state.inbound_v1_channel_by_id.entry(msg.channel_id.clone()) {
+                               log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", log_bytes!(&msg.channel_id[..]));
+                               self.issue_channel_close_events(&chan_entry.get().context, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
+                               let mut chan = remove_channel!(self, chan_entry);
+                               self.finish_force_close_channel(chan.context.force_shutdown(false));
+                               return Ok(());
+                       } else if let hash_map::Entry::Occupied(mut chan_entry) = peer_state.channel_by_id.entry(msg.channel_id.clone()) {
+                               if !chan_entry.get().received_shutdown() {
+                                       log_info!(self.logger, "Received a shutdown message from our counterparty for channel {}{}.",
+                                               log_bytes!(msg.channel_id),
+                                               if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
+                               }
 
-                                       let funding_txo_opt = chan_entry.get().context.get_funding_txo();
-                                       let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
-                                               chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
-                                       dropped_htlcs = htlcs;
+                               let funding_txo_opt = chan_entry.get().context.get_funding_txo();
+                               let (shutdown, monitor_update_opt, htlcs) = try_chan_entry!(self,
+                                       chan_entry.get_mut().shutdown(&self.signer_provider, &peer_state.latest_features, &msg), chan_entry);
+                               dropped_htlcs = htlcs;
 
-                                       if let Some(msg) = shutdown {
-                                               // We can send the `shutdown` message before updating the `ChannelMonitor`
-                                               // here as we don't need the monitor update to complete until we send a
-                                               // `shutdown_signed`, which we'll delay if we're pending a monitor update.
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                                       node_id: *counterparty_node_id,
-                                                       msg,
-                                               });
-                                       }
+                               if let Some(msg) = shutdown {
+                                       // We can send the `shutdown` message before updating the `ChannelMonitor`
+                                       // here as we don't need the monitor update to complete until we send a
+                                       // `shutdown_signed`, which we'll delay if we're pending a monitor update.
+                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                               node_id: *counterparty_node_id,
+                                               msg,
+                                       });
+                               }
 
-                                       // Update the monitor with the shutdown script if necessary.
-                                       if let Some(monitor_update) = monitor_update_opt {
-                                               let update_id = monitor_update.update_id;
-                                               let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
-                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
-                                       }
-                                       break Ok(());
-                               },
-                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
+                               // Update the monitor with the shutdown script if necessary.
+                               if let Some(monitor_update) = monitor_update_opt {
+                                       break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+                                               peer_state_lock, peer_state, per_peer_state, chan_entry).map(|_| ());
+                               }
+                               break Ok(());
+                       } else {
+                               return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
                for htlc_source in dropped_htlcs.drain(..) {
@@ -5358,7 +5756,7 @@ where
                //encrypted with the same key. It's not immediately obvious how to usefully exploit that,
                //but we should prevent it anyway.
 
-               let pending_forward_info = self.decode_update_add_htlc_onion(msg);
+               let decoded_hop_res = self.decode_update_add_htlc_onion(msg);
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
@@ -5370,6 +5768,12 @@ where
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan) => {
 
+                               let pending_forward_info = match decoded_hop_res {
+                                       Ok((next_hop, shared_secret, next_packet_pk_opt)) =>
+                                               self.construct_pending_htlc_status(msg, shared_secret, next_hop,
+                                                       chan.get().context.config().accept_underpaying_htlcs, next_packet_pk_opt),
+                                       Err(e) => PendingHTLCStatus::Fail(e)
+                               };
                                let create_pending_htlc_status = |chan: &Channel<<SP::Target as SignerProvider>::Signer>, pending_forward_info: PendingHTLCStatus, error_code: u16| {
                                        // If the update_add is completely bogus, the call will Err and we will close,
                                        // but if we've sent a shutdown and they haven't acknowledged it yet, we just
@@ -5392,7 +5796,7 @@ where
                                                _ => pending_forward_info
                                        }
                                };
-                               try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan);
+                               try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan);
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
@@ -5474,10 +5878,8 @@ where
                                let funding_txo = chan.get().context.get_funding_txo();
                                let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
                                if let Some(monitor_update) = monitor_update_opt {
-                                       let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
-                                       let update_id = monitor_update.update_id;
-                                       handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                               peer_state, per_peer_state, chan)
+                                       handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock,
+                                               peer_state, per_peer_state, chan).map(|_| ())
                                } else { Ok(()) }
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -5567,22 +5969,27 @@ where
                }
        }
 
-       // We only want to push a PendingHTLCsForwardable event if no others are queued.
        fn push_pending_forwards_ev(&self) {
                let mut pending_events = self.pending_events.lock().unwrap();
-               let forward_ev_exists = pending_events.iter()
-                       .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
-                       .is_some();
-               if !forward_ev_exists {
-                       pending_events.push_back((events::Event::PendingHTLCsForwardable {
-                               time_forwardable:
-                                       Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
+               let is_processing_events = self.pending_events_processor.load(Ordering::Acquire);
+               let num_forward_events = pending_events.iter().filter(|(ev, _)|
+                       if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }
+               ).count();
+               // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing
+               // events is done in batches and they are not removed until we're done processing each
+               // batch. Since handling a `PendingHTLCsForwardable` event will call back into the
+               // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom
+               // payments will need an additional forwarding event before being claimed to make them look
+               // real by taking more time.
+               if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 {
+                       pending_events.push_back((Event::PendingHTLCsForwardable {
+                               time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
                        }, None));
                }
        }
 
        /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
-       /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
+       /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action
        /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
        /// the [`ChannelMonitorUpdate`] in question.
        fn raa_monitor_updates_held(&self,
@@ -5611,12 +6018,10 @@ where
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        let funding_txo = chan.get().context.get_funding_txo();
-                                       let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
+                                       let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan);
                                        let res = if let Some(monitor_update) = monitor_update_opt {
-                                               let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
-                                               let update_id = monitor_update.update_id;
-                                               handle_new_monitor_update!(self, update_res, update_id,
-                                                       peer_state_lock, peer_state, per_peer_state, chan)
+                                               handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
+                                                       peer_state_lock, peer_state, per_peer_state, chan).map(|_| ())
                                        } else { Ok(()) };
                                        (htlcs_to_fail, res)
                                },
@@ -5884,18 +6289,15 @@ where
                                                let counterparty_node_id = chan.context.get_counterparty_node_id();
                                                let funding_txo = chan.context.get_funding_txo();
                                                let (monitor_opt, holding_cell_failed_htlcs) =
-                                                       chan.maybe_free_holding_cell_htlcs(&self.logger);
+                                                       chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger);
                                                if !holding_cell_failed_htlcs.is_empty() {
                                                        failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
                                                }
                                                if let Some(monitor_update) = monitor_opt {
                                                        has_monitor_update = true;
 
-                                                       let update_res = self.chain_monitor.update_channel(
-                                                               funding_txo.expect("channel is live"), monitor_update);
-                                                       let update_id = monitor_update.update_id;
                                                        let channel_id: [u8; 32] = *channel_id;
-                                                       let res = handle_new_monitor_update!(self, update_res, update_id,
+                                                       let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
                                                                peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING,
                                                                peer_state.channel_by_id.remove(&channel_id));
                                                        if res.is_err() {
@@ -6204,7 +6606,7 @@ where
        /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
        /// [`Event`] being handled) completes, this should be called to restore the channel to normal
        /// operation. It will double-check that nothing *else* is also blocking the same channel from
-       /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
+       /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
        fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
                let mut errors = Vec::new();
                loop {
@@ -6237,9 +6639,7 @@ where
                                        if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
                                                log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
                                                        log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
-                                               let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update);
-                                               let update_id = monitor_update.update_id;
-                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id,
+                                               if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
                                                        peer_state_lck, peer_state, per_peer_state, chan)
                                                {
                                                        errors.push((e, counterparty_node_id));
@@ -6701,13 +7101,13 @@ where
                provided_node_features(&self.default_configuration)
        }
 
-       /// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by
+       /// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by
        /// [`ChannelManager`].
        ///
        /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice"
        /// or not. Thus, this method is not public.
        #[cfg(any(feature = "_test_utils", test))]
-       pub fn invoice_features(&self) -> InvoiceFeatures {
+       pub fn invoice_features(&self) -> Bolt11InvoiceFeatures {
                provided_invoice_features(&self.default_configuration)
        }
 
@@ -6960,6 +7360,7 @@ where
                                                inbound_v1_channel_by_id: HashMap::new(),
                                                latest_features: init_msg.features.clone(),
                                                pending_msg_events: Vec::new(),
+                                               in_flight_monitor_updates: BTreeMap::new(),
                                                monitor_update_blocked_actions: BTreeMap::new(),
                                                actions_blocking_raa_monitor_updates: BTreeMap::new(),
                                                is_connected: true,
@@ -6986,37 +7387,20 @@ where
                log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 
                let per_peer_state = self.per_peer_state.read().unwrap();
-               for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+               if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        let pending_msg_events = &mut peer_state.pending_msg_events;
-                       peer_state.channel_by_id.retain(|_, chan| {
-                               let retain = if chan.context.get_counterparty_node_id() == *counterparty_node_id {
-                                       if !chan.context.have_received_message() {
-                                               // If we created this (outbound) channel while we were disconnected from the
-                                               // peer we probably failed to send the open_channel message, which is now
-                                               // lost. We can't have had anything pending related to this channel, so we just
-                                               // drop it.
-                                               false
-                                       } else {
-                                               pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
-                                                       node_id: chan.context.get_counterparty_node_id(),
-                                                       msg: chan.get_channel_reestablish(&self.logger),
-                                               });
-                                               true
-                                       }
-                               } else { true };
-                               if retain && chan.context.get_counterparty_node_id() != *counterparty_node_id {
-                                       if let Some(msg) = chan.get_signed_channel_announcement(&self.node_signer, self.genesis_hash.clone(), self.best_block.read().unwrap().height(), &self.default_configuration) {
-                                               if let Ok(update_msg) = self.get_channel_update_for_broadcast(chan) {
-                                                       pending_msg_events.push(events::MessageSendEvent::SendChannelAnnouncement {
-                                                               node_id: *counterparty_node_id,
-                                                               msg, update_msg,
-                                                       });
-                                               }
-                                       }
-                               }
-                               retain
+
+                       // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
+                       // (so won't be recovered after a crash) we don't need to bother closing unfunded channels and
+                       // clearing their maps here. Instead we can just send queue channel_reestablish messages for
+                       // channels in the channel_by_id map.
+                       peer_state.channel_by_id.iter_mut().for_each(|(_, chan)| {
+                               pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
+                                       node_id: chan.context.get_counterparty_node_id(),
+                                       msg: chan.get_channel_reestablish(&self.logger),
+                               });
                        });
                }
                //TODO: Also re-broadcast announcement_signatures
@@ -7050,7 +7434,7 @@ where
                                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                if let Some(chan) = peer_state.outbound_v1_channel_by_id.get_mut(&msg.channel_id) {
-                                       if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash) {
+                                       if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash, &self.fee_estimator) {
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
                                                        node_id: *counterparty_node_id,
                                                        msg,
@@ -7135,16 +7519,18 @@ where
 /// Fetches the set of [`NodeFeatures`] flags which are provided by or required by
 /// [`ChannelManager`].
 pub(crate) fn provided_node_features(config: &UserConfig) -> NodeFeatures {
-       provided_init_features(config).to_context()
+       let mut node_features = provided_init_features(config).to_context();
+       node_features.set_keysend_optional();
+       node_features
 }
 
-/// Fetches the set of [`InvoiceFeatures`] flags which are provided by or required by
+/// Fetches the set of [`Bolt11InvoiceFeatures`] flags which are provided by or required by
 /// [`ChannelManager`].
 ///
 /// Note that the invoice feature flags can vary depending on if the invoice is a "phantom invoice"
 /// or not. Thus, this method is not public.
 #[cfg(any(feature = "_test_utils", test))]
-pub(crate) fn provided_invoice_features(config: &UserConfig) -> InvoiceFeatures {
+pub(crate) fn provided_invoice_features(config: &UserConfig) -> Bolt11InvoiceFeatures {
        provided_init_features(config).to_context()
 }
 
@@ -7162,7 +7548,7 @@ pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelType
 
 /// Fetches the set of [`InitFeatures`] flags which are provided by or required by
 /// [`ChannelManager`].
-pub fn provided_init_features(_config: &UserConfig) -> InitFeatures {
+pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
        // Note that if new features are added here which other peers may (eventually) require, we
        // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for
        // [`ErroringMessageHandler`].
@@ -7178,11 +7564,8 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures {
        features.set_channel_type_optional();
        features.set_scid_privacy_optional();
        features.set_zero_conf_optional();
-       #[cfg(anchors)]
-       { // Attributes are not allowed on if expressions on our current MSRV of 1.41.
-               if _config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
-                       features.set_anchors_zero_fee_htlc_tx_optional();
-               }
+       if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx {
+               features.set_anchors_zero_fee_htlc_tx_optional();
        }
        features
 }
@@ -7239,6 +7622,7 @@ impl Writeable for ChannelDetails {
                        (35, self.inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, self.feerate_sat_per_1000_weight, option),
+                       (41, self.channel_shutdown_state, option),
                });
                Ok(())
        }
@@ -7276,6 +7660,7 @@ impl Readable for ChannelDetails {
                        (35, inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, feerate_sat_per_1000_weight, option),
+                       (41, channel_shutdown_state, option),
                });
 
                // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
@@ -7311,12 +7696,13 @@ impl Readable for ChannelDetails {
                        inbound_htlc_minimum_msat,
                        inbound_htlc_maximum_msat,
                        feerate_sat_per_1000_weight,
+                       channel_shutdown_state,
                })
        }
 }
 
 impl_writeable_tlv_based!(PhantomRouteHints, {
-       (2, channels, vec_type),
+       (2, channels, required_vec),
        (4, phantom_scid, required),
        (6, real_node_pubkey, required),
 });
@@ -7331,12 +7717,14 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting,
                (1, phantom_shared_secret, option),
                (2, incoming_cltv_expiry, required),
                (3, payment_metadata, option),
+               (5, custom_tlvs, optional_vec),
        },
        (2, ReceiveKeysend) => {
                (0, payment_preimage, required),
                (2, incoming_cltv_expiry, required),
                (3, payment_metadata, option),
                (4, payment_data, option), // Added in 0.0.116
+               (5, custom_tlvs, optional_vec),
        },
 ;);
 
@@ -7347,6 +7735,7 @@ impl_writeable_tlv_based!(PendingHTLCInfo, {
        (6, outgoing_amt_msat, required),
        (8, outgoing_cltv_value, required),
        (9, incoming_amt_msat, option),
+       (10, skimmed_fee_msat, option),
 });
 
 
@@ -7445,6 +7834,7 @@ impl Writeable for ClaimableHTLC {
                        (5, self.total_value_received, option),
                        (6, self.cltv_expiry, required),
                        (8, keysend_preimage, option),
+                       (10, self.counterparty_skimmed_fee_msat, option),
                });
                Ok(())
        }
@@ -7452,24 +7842,19 @@ impl Writeable for ClaimableHTLC {
 
 impl Readable for ClaimableHTLC {
        fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
-               let mut prev_hop = crate::util::ser::RequiredWrapper(None);
-               let mut value = 0;
-               let mut sender_intended_value = None;
-               let mut payment_data: Option<msgs::FinalOnionHopData> = None;
-               let mut cltv_expiry = 0;
-               let mut total_value_received = None;
-               let mut total_msat = None;
-               let mut keysend_preimage: Option<PaymentPreimage> = None;
-               read_tlv_fields!(reader, {
+               _init_and_read_tlv_fields!(reader, {
                        (0, prev_hop, required),
                        (1, total_msat, option),
-                       (2, value, required),
+                       (2, value_ser, required),
                        (3, sender_intended_value, option),
-                       (4, payment_data, option),
+                       (4, payment_data_opt, option),
                        (5, total_value_received, option),
                        (6, cltv_expiry, required),
-                       (8, keysend_preimage, option)
+                       (8, keysend_preimage, option),
+                       (10, counterparty_skimmed_fee_msat, option),
                });
+               let payment_data: Option<msgs::FinalOnionHopData> = payment_data_opt;
+               let value = value_ser.0.unwrap();
                let onion_payload = match keysend_preimage {
                        Some(p) => {
                                if payment_data.is_some() {
@@ -7498,7 +7883,8 @@ impl Readable for ClaimableHTLC {
                        total_value_received,
                        total_msat: total_msat.unwrap(),
                        onion_payload,
-                       cltv_expiry,
+                       cltv_expiry: cltv_expiry.0.unwrap(),
+                       counterparty_skimmed_fee_msat,
                })
        }
 }
@@ -7510,7 +7896,7 @@ impl Readable for HTLCSource {
                        0 => {
                                let mut session_priv: crate::util::ser::RequiredWrapper<SecretKey> = crate::util::ser::RequiredWrapper(None);
                                let mut first_hop_htlc_msat: u64 = 0;
-                               let mut path_hops: Option<Vec<RouteHop>> = Some(Vec::new());
+                               let mut path_hops = Vec::new();
                                let mut payment_id = None;
                                let mut payment_params: Option<PaymentParameters> = None;
                                let mut blinded_tail: Option<BlindedTail> = None;
@@ -7518,7 +7904,7 @@ impl Readable for HTLCSource {
                                        (0, session_priv, required),
                                        (1, payment_id, option),
                                        (2, first_hop_htlc_msat, required),
-                                       (4, path_hops, vec_type),
+                                       (4, path_hops, required_vec),
                                        (5, payment_params, (option: ReadableArgs, 0)),
                                        (6, blinded_tail, option),
                                });
@@ -7527,7 +7913,7 @@ impl Readable for HTLCSource {
                                        // instead.
                                        payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
                                }
-                               let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail };
+                               let path = Path { hops: path_hops, blinded_tail };
                                if path.hops.len() == 0 {
                                        return Err(DecodeError::InvalidValue);
                                }
@@ -7562,7 +7948,7 @@ impl Writeable for HTLCSource {
                                        (1, payment_id_opt, option),
                                        (2, first_hop_htlc_msat, required),
                                        // 3 was previously used to write a PaymentSecret for the payment.
-                                       (4, path.hops, vec_type),
+                                       (4, path.hops, required_vec),
                                        (5, None::<PaymentParameters>, option), // payment_params in LDK versions prior to 0.0.115
                                        (6, path.blinded_tail, option),
                                 });
@@ -7793,6 +8179,16 @@ where
                        pending_claiming_payments = None;
                }
 
+               let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
+               for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
+                       for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() {
+                               if !updates.is_empty() {
+                                       if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); }
+                                       in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates);
+                               }
+                       }
+               }
+
                write_tlv_fields!(writer, {
                        (1, pending_outbound_payments_no_retry, required),
                        (2, pending_intercepted_htlcs, option),
@@ -7802,7 +8198,8 @@ where
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
                        (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
-                       (9, htlc_purposes, vec_type),
+                       (9, htlc_purposes, required_vec),
+                       (10, in_flight_monitor_updates, option),
                        (11, self.probing_cookie_secret, required),
                        (13, htlc_onion_fields, optional_vec),
                });
@@ -7852,6 +8249,14 @@ impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
        }
 }
 
+impl_writeable_tlv_based_enum!(ChannelShutdownState,
+       (0, NotShuttingDown) => {},
+       (2, ShutdownInitiated) => {},
+       (4, ResolvingHTLCs) => {},
+       (6, NegotiatingClosingFee) => {},
+       (8, ShutdownComplete) => {}, ;
+);
+
 /// Arguments for the creation of a ChannelManager that are not deserialized.
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
@@ -8019,7 +8424,7 @@ where
                let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut channel_closures = VecDeque::new();
-               let mut pending_background_events = Vec::new();
+               let mut close_background_events = Vec::new();
                for _ in 0..channel_count {
                        let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
                                &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
@@ -8027,17 +8432,7 @@ where
                        let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
                        funding_txo_set.insert(funding_txo.clone());
                        if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
-                               if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() {
-                                       // If the channel is ahead of the monitor, return InvalidValue:
-                                       log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
-                                       log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
-                                               log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id());
-                                       log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
-                                       log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
-                                       log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
-                                       log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
-                                       return Err(DecodeError::InvalidValue);
-                               } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() ||
+                               if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() ||
                                                channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() ||
                                                channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
                                                channel.context.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
@@ -8048,7 +8443,7 @@ where
                                                log_bytes!(channel.context.channel_id()), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id());
                                        let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true);
                                        if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
-                                               pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                               close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                        counterparty_node_id, funding_txo, update
                                                });
                                        }
@@ -8056,7 +8451,9 @@ where
                                        channel_closures.push_back((events::Event::ChannelClosed {
                                                channel_id: channel.context.channel_id(),
                                                user_channel_id: channel.context.get_user_id(),
-                                               reason: ClosureReason::OutdatedChannelManager
+                                               reason: ClosureReason::OutdatedChannelManager,
+                                               counterparty_node_id: Some(channel.context.get_counterparty_node_id()),
+                                               channel_capacity_sats: Some(channel.context.get_value_satoshis()),
                                        }, None));
                                        for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
                                                let mut found_htlc = false;
@@ -8081,7 +8478,6 @@ where
                                        log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}",
                                                log_bytes!(channel.context.channel_id()), channel.context.get_latest_monitor_update_id(),
                                                monitor.get_latest_update_id());
-                                       channel.complete_all_mon_updates_through(monitor.get_latest_update_id());
                                        if let Some(short_channel_id) = channel.context.get_short_channel_id() {
                                                short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
                                        }
@@ -8109,6 +8505,8 @@ where
                                        channel_id: channel.context.channel_id(),
                                        user_channel_id: channel.context.get_user_id(),
                                        reason: ClosureReason::DisconnectedPeer,
+                                       counterparty_node_id: Some(channel.context.get_counterparty_node_id()),
+                                       channel_capacity_sats: Some(channel.context.get_value_satoshis()),
                                }, None));
                        } else {
                                log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.context.channel_id()));
@@ -8128,7 +8526,7 @@ where
                                        update_id: CLOSED_CHANNEL_UPDATE_ID,
                                        updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
                                };
-                               pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
+                               close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
                        }
                }
 
@@ -8157,20 +8555,27 @@ where
                        claimable_htlcs_list.push((payment_hash, previous_hops));
                }
 
-               let peer_count: u64 = Readable::read(reader)?;
-               let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>)>()));
-               for _ in 0..peer_count {
-                       let peer_pubkey = Readable::read(reader)?;
-                       let peer_state = PeerState {
-                               channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()),
+               let peer_state_from_chans = |channel_by_id| {
+                       PeerState {
+                               channel_by_id,
                                outbound_v1_channel_by_id: HashMap::new(),
                                inbound_v1_channel_by_id: HashMap::new(),
-                               latest_features: Readable::read(reader)?,
+                               latest_features: InitFeatures::empty(),
                                pending_msg_events: Vec::new(),
+                               in_flight_monitor_updates: BTreeMap::new(),
                                monitor_update_blocked_actions: BTreeMap::new(),
                                actions_blocking_raa_monitor_updates: BTreeMap::new(),
                                is_connected: false,
-                       };
+                       }
+               };
+
+               let peer_count: u64 = Readable::read(reader)?;
+               let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>)>()));
+               for _ in 0..peer_count {
+                       let peer_pubkey = Readable::read(reader)?;
+                       let peer_chans = peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new());
+                       let mut peer_state = peer_state_from_chans(peer_chans);
+                       peer_state.latest_features = Readable::read(reader)?;
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
 
@@ -8198,24 +8603,6 @@ where
                        }
                }
 
-               for (node_id, peer_mtx) in per_peer_state.iter() {
-                       let peer_state = peer_mtx.lock().unwrap();
-                       for (_, chan) in peer_state.channel_by_id.iter() {
-                               for update in chan.uncompleted_unblocked_mon_updates() {
-                                       if let Some(funding_txo) = chan.context.get_funding_txo() {
-                                               log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}",
-                                                       update.update_id, log_bytes!(funding_txo.to_channel_id()));
-                                               pending_background_events.push(
-                                                       BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-                                                               counterparty_node_id: *node_id, funding_txo, update: update.clone(),
-                                                       });
-                                       } else {
-                                               return Err(DecodeError::InvalidValue);
-                                       }
-                               }
-                       }
-               }
-
                let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111
                let highest_seen_timestamp: u32 = Readable::read(reader)?;
 
@@ -8252,6 +8639,7 @@ where
                let mut pending_claiming_payments = Some(HashMap::new());
                let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
                let mut events_override = None;
+               let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
@@ -8261,7 +8649,8 @@ where
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
                        (8, events_override, option),
-                       (9, claimable_htlc_purposes, vec_type),
+                       (9, claimable_htlc_purposes, optional_vec),
+                       (10, in_flight_monitor_updates, option),
                        (11, probing_cookie_secret, option),
                        (13, claimable_htlc_onion_fields, optional_vec),
                });
@@ -8295,6 +8684,118 @@ where
                        retry_lock: Mutex::new(())
                };
 
+               // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`)
+               // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to
+               // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we
+               // replayed, and for each monitor update we have to replay we have to ensure there's a
+               // `ChannelMonitor` for it.
+               //
+               // In order to do so we first walk all of our live channels (so that we can check their
+               // state immediately after doing the update replays, when we have the `update_id`s
+               // available) and then walk any remaining in-flight updates.
+               //
+               // Because the actual handling of the in-flight updates is the same, it's macro'ized here:
+               let mut pending_background_events = Vec::new();
+               macro_rules! handle_in_flight_updates {
+                       ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr,
+                        $monitor: expr, $peer_state: expr, $channel_info_log: expr
+                       ) => { {
+                               let mut max_in_flight_update_id = 0;
+                               $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
+                               for update in $chan_in_flight_upds.iter() {
+                                       log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
+                                               update.update_id, $channel_info_log, log_bytes!($funding_txo.to_channel_id()));
+                                       max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
+                                       pending_background_events.push(
+                                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                                       counterparty_node_id: $counterparty_node_id,
+                                                       funding_txo: $funding_txo,
+                                                       update: update.clone(),
+                                               });
+                               }
+                               if $chan_in_flight_upds.is_empty() {
+                                       // We had some updates to apply, but it turns out they had completed before we
+                                       // were serialized, we just weren't notified of that. Thus, we may have to run
+                                       // the completion actions for any monitor updates, but otherwise are done.
+                                       pending_background_events.push(
+                                               BackgroundEvent::MonitorUpdatesComplete {
+                                                       counterparty_node_id: $counterparty_node_id,
+                                                       channel_id: $funding_txo.to_channel_id(),
+                                               });
+                               }
+                               if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() {
+                                       log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!");
+                                       return Err(DecodeError::InvalidValue);
+                               }
+                               max_in_flight_update_id
+                       } }
+               }
+
+               for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() {
+                       let mut peer_state_lock = peer_state_mtx.lock().unwrap();
+                       let peer_state = &mut *peer_state_lock;
+                       for (_, chan) in peer_state.channel_by_id.iter() {
+                               // Channels that were persisted have to be funded, otherwise they should have been
+                               // discarded.
+                               let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
+                               let monitor = args.channel_monitors.get(&funding_txo)
+                                       .expect("We already checked for monitor presence when loading channels");
+                               let mut max_in_flight_update_id = monitor.get_latest_update_id();
+                               if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
+                                       if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) {
+                                               max_in_flight_update_id = cmp::max(max_in_flight_update_id,
+                                                       handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds,
+                                                               funding_txo, monitor, peer_state, ""));
+                                       }
+                               }
+                               if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
+                                       // If the channel is ahead of the monitor, return InvalidValue:
+                                       log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
+                                       log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
+                                               log_bytes!(chan.context.channel_id()), monitor.get_latest_update_id(), max_in_flight_update_id);
+                                       log_error!(args.logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id());
+                                       log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
+                                       log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
+                                       log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
+                                       log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
+                                       return Err(DecodeError::InvalidValue);
+                               }
+                       }
+               }
+
+               if let Some(in_flight_upds) = in_flight_monitor_updates {
+                       for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds {
+                               if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
+                                       // Now that we've removed all the in-flight monitor updates for channels that are
+                                       // still open, we need to replay any monitor updates that are for closed channels,
+                                       // creating the neccessary peer_state entries as we go.
+                                       let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| {
+                                               Mutex::new(peer_state_from_chans(HashMap::new()))
+                                       });
+                                       let mut peer_state = peer_state_mutex.lock().unwrap();
+                                       handle_in_flight_updates!(counterparty_id, chan_in_flight_updates,
+                                               funding_txo, monitor, peer_state, "closed ");
+                               } else {
+                                       log_error!(args.logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
+                                       log_error!(args.logger, " The ChannelMonitor for channel {} is missing.",
+                                               log_bytes!(funding_txo.to_channel_id()));
+                                       log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
+                                       log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
+                                       log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
+                                       log_error!(args.logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
+                                       return Err(DecodeError::InvalidValue);
+                               }
+                       }
+               }
+
+               // Note that we have to do the above replays before we push new monitor updates.
+               pending_background_events.append(&mut close_background_events);
+
+               // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we
+               // should ensure we try them again on the inbound edge. We put them here and do so after we
+               // have a fully-constructed `ChannelManager` at the end.
+               let mut pending_claims_to_replay = Vec::new();
+
                {
                        // If we're tracking pending payments, ensure we haven't lost any by looking at the
                        // ChannelMonitor data for any channels for which we do not have authorative state
@@ -8305,7 +8806,8 @@ where
                        // We only rebuild the pending payments map if we were most recently serialized by
                        // 0.0.102+
                        for (_, monitor) in args.channel_monitors.iter() {
-                               if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
+                               let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id());
+                               if counterparty_opt.is_none() {
                                        for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
                                                if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source {
                                                        if path.hops.is_empty() {
@@ -8333,6 +8835,7 @@ where
                                                                                payment_secret: None, // only used for retries, and we'll never retry on startup
                                                                                payment_metadata: None, // only used for retries, and we'll never retry on startup
                                                                                keysend_preimage: None, // only used for retries, and we'll never retry on startup
+                                                                               custom_tlvs: Vec::new(), // only used for retries, and we'll never retry on startup
                                                                                pending_amt_msat: path_amt,
                                                                                pending_fee_msat: Some(path_fee),
                                                                                total_msat: path_amt,
@@ -8399,6 +8902,33 @@ where
                                                }
                                        }
                                }
+
+                               // Whether the downstream channel was closed or not, try to re-apply any payment
+                               // preimages from it which may be needed in upstream channels for forwarded
+                               // payments.
+                               let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs()
+                                       .into_iter()
+                                       .filter_map(|(htlc_source, (htlc, preimage_opt))| {
+                                               if let HTLCSource::PreviousHopData(_) = htlc_source {
+                                                       if let Some(payment_preimage) = preimage_opt {
+                                                               Some((htlc_source, payment_preimage, htlc.amount_msat,
+                                                                       // Check if `counterparty_opt.is_none()` to see if the
+                                                                       // downstream chan is closed (because we don't have a
+                                                                       // channel_id -> peer map entry).
+                                                                       counterparty_opt.is_none(),
+                                                                       monitor.get_funding_txo().0.to_channel_id()))
+                                                       } else { None }
+                                               } else {
+                                                       // If it was an outbound payment, we've handled it above - if a preimage
+                                                       // came in and we persisted the `ChannelManager` we either handled it and
+                                                       // are good to go or the channel force-closed - we don't have to handle the
+                                                       // channel still live case here.
+                                                       None
+                                               }
+                                       });
+                               for tuple in outbound_claimed_htlcs_iter {
+                                       pending_claims_to_replay.push(tuple);
+                               }
                        }
                }
 
@@ -8586,6 +9116,12 @@ where
                                                                blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
                                                                        .entry(blocked_channel_outpoint.to_channel_id())
                                                                        .or_insert_with(Vec::new).push(blocking_action.clone());
+                                                       } else {
+                                                               // If the channel we were blocking has closed, we don't need to
+                                                               // worry about it - the blocked monitor update should never have
+                                                               // been released from the `Channel` object so it can't have
+                                                               // completed, and if the channel closed there's no reason to bother
+                                                               // anymore.
                                                        }
                                                }
                                        }
@@ -8631,7 +9167,6 @@ where
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
-                       #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
 
@@ -8650,6 +9185,14 @@ where
                        channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
 
+               for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
+                       // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
+                       // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
+                       // channel is closed we just assume that it probably came from an on-chain claim.
+                       channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
+                               downstream_closed, downstream_chan_id);
+               }
+
                //TODO: Broadcast channel update for closed channels, but only after we've made a
                //connection or two.
 
@@ -8667,7 +9210,7 @@ mod tests {
        use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};
        use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, RecipientOnionFields, InterceptId};
        use crate::ln::functional_test_utils::*;
-       use crate::ln::msgs;
+       use crate::ln::msgs::{self, ErrorAction};
        use crate::ln::msgs::ChannelMessageHandler;
        use crate::routing::router::{PaymentParameters, RouteParameters, find_route};
        use crate::util::errors::APIError;
@@ -9180,7 +9723,7 @@ mod tests {
                nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap();
                check_closed_broadcast!(nodes[0], true);
                check_added_monitors!(nodes[0], 1);
-               check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
+               check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
 
                {
                        // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
@@ -9343,8 +9886,8 @@ mod tests {
                }
                let (_nodes_1_update, _none) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());
 
-               check_closed_event!(nodes[0], 1, ClosureReason::CooperativeClosure);
-               check_closed_event!(nodes[1], 1, ClosureReason::CooperativeClosure);
+               check_closed_event!(nodes[0], 1, ClosureReason::CooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000);
+               check_closed_event!(nodes[1], 1, ClosureReason::CooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000);
        }
 
        fn check_not_connected_to_peer_error<T>(res_err: Result<T, APIError>, expected_public_key: PublicKey) {
@@ -9619,7 +10162,92 @@ mod tests {
                get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, last_random_pk);
        }
 
-       #[cfg(anchors)]
+       #[test]
+       fn reject_excessively_underpaying_htlcs() {
+               let chanmon_cfg = create_chanmon_cfgs(1);
+               let node_cfg = create_node_cfgs(1, &chanmon_cfg);
+               let node_chanmgr = create_node_chanmgrs(1, &node_cfg, &[None]);
+               let node = create_network(1, &node_cfg, &node_chanmgr);
+               let sender_intended_amt_msat = 100;
+               let extra_fee_msat = 10;
+               let hop_data = msgs::InboundOnionPayload::Receive {
+                       amt_msat: 100,
+                       outgoing_cltv_value: 42,
+                       payment_metadata: None,
+                       keysend_preimage: None,
+                       payment_data: Some(msgs::FinalOnionHopData {
+                               payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat,
+                       }),
+                       custom_tlvs: Vec::new(),
+               };
+               // Check that if the amount we received + the penultimate hop extra fee is less than the sender
+               // intended amount, we fail the payment.
+               if let Err(crate::ln::channelmanager::InboundOnionErr { err_code, .. }) =
+                       node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]),
+                               sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat))
+               {
+                       assert_eq!(err_code, 19);
+               } else { panic!(); }
+
+               // If amt_received + extra_fee is equal to the sender intended amount, we're fine.
+               let hop_data = msgs::InboundOnionPayload::Receive { // This is the same payload as above, InboundOnionPayload doesn't implement Clone
+                       amt_msat: 100,
+                       outgoing_cltv_value: 42,
+                       payment_metadata: None,
+                       keysend_preimage: None,
+                       payment_data: Some(msgs::FinalOnionHopData {
+                               payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat,
+                       }),
+                       custom_tlvs: Vec::new(),
+               };
+               assert!(node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]),
+                       sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat)).is_ok());
+       }
+
+       #[test]
+       fn test_inbound_anchors_manual_acceptance() {
+               // Tests that we properly limit inbound channels when we have the manual-channel-acceptance
+               // flag set and (sometimes) accept channels as 0conf.
+               let mut anchors_cfg = test_default_channel_config();
+               anchors_cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
+
+               let mut anchors_manual_accept_cfg = anchors_cfg.clone();
+               anchors_manual_accept_cfg.manually_accept_inbound_channels = true;
+
+               let chanmon_cfgs = create_chanmon_cfgs(3);
+               let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs,
+                       &[Some(anchors_cfg.clone()), Some(anchors_cfg.clone()), Some(anchors_manual_accept_cfg.clone())]);
+               let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+               nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+               let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+
+               nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
+               let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
+               match &msg_events[0] {
+                       MessageSendEvent::HandleError { node_id, action } => {
+                               assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+                               match action {
+                                       ErrorAction::SendErrorMessage { msg } =>
+                                               assert_eq!(msg.data, "No channels with anchor outputs accepted".to_owned()),
+                                       _ => panic!("Unexpected error action"),
+                               }
+                       }
+                       _ => panic!("Unexpected event"),
+               }
+
+               nodes[2].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
+               let events = nodes[2].node.get_and_clear_pending_events();
+               match events[0] {
+                       Event::OpenChannelRequest { temporary_channel_id, .. } =>
+                               nodes[2].node.accept_inbound_channel(&temporary_channel_id, &nodes[0].node.get_our_node_id(), 23).unwrap(),
+                       _ => panic!("Unexpected event"),
+               }
+               get_event_msg!(nodes[2], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+       }
+
        #[test]
        fn test_anchors_zero_fee_htlc_tx_fallback() {
                // Tests that if both nodes support anchors, but the remote node does not want to accept
@@ -9652,7 +10280,7 @@ mod tests {
                let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
                assert!(!open_channel_msg.channel_type.unwrap().supports_anchors_zero_fee_htlc_tx());
 
-               check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed);
+               check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
        }
 
        #[test]
@@ -9709,6 +10337,25 @@ mod tests {
                        MessageSendEvent::BroadcastChannelUpdate { .. } => {},
                        _ => panic!("expected BroadcastChannelUpdate event"),
                }
+
+               // If we provide a channel_id not associated with the peer, we should get an error and no updates
+               // should be applied to ensure update atomicity as specified in the API docs.
+               let bad_channel_id = [10; 32];
+               let current_fee = nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths;
+               let new_fee = current_fee + 100;
+               assert!(
+                       matches!(
+                               nodes[0].node.update_partial_channel_config(&channel.counterparty.node_id, &[channel.channel_id, bad_channel_id], &ChannelConfigUpdate {
+                                       forwarding_fee_proportional_millionths: Some(new_fee),
+                                       ..Default::default()
+                               }),
+                               Err(APIError::ChannelUnavailable { err: _ }),
+                       )
+               );
+               // Check that the fee hasn't changed for the channel that exists.
+               assert_eq!(nodes[0].node.list_channels()[0].config.unwrap().forwarding_fee_proportional_millionths, current_fee);
+               let events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(events.len(), 0);
        }
 }
 
@@ -9724,7 +10371,7 @@ pub mod bench {
        use crate::routing::gossip::NetworkGraph;
        use crate::routing::router::{PaymentParameters, RouteParameters};
        use crate::util::test_utils;
-       use crate::util::config::UserConfig;
+       use crate::util::config::{UserConfig, MaxDustHTLCExposure};
 
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
@@ -9762,6 +10409,7 @@ pub mod bench {
                // Note that this is unrealistic as each payment send will require at least two fsync
                // calls per node.
                let network = bitcoin::Network::Testnet;
+               let genesis_block = bitcoin::blockdata::constants::genesis_block(network);
 
                let tx_broadcaster = test_utils::TestBroadcaster::new(network);
                let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
@@ -9770,6 +10418,7 @@ pub mod bench {
                let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer);
 
                let mut config: UserConfig = Default::default();
+               config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253);
                config.channel_handshake_config.minimum_depth = 1;
 
                let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a);
@@ -9778,7 +10427,7 @@ pub mod bench {
                let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters {
                        network,
                        best_block: BestBlock::from_network(network),
-               });
+               }, genesis_block.header.time);
                let node_a_holder = ANodeHolder { node: &node_a };
 
                let logger_b = test_utils::TestLogger::with_id("node a".to_owned());
@@ -9788,7 +10437,7 @@ pub mod bench {
                let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters {
                        network,
                        best_block: BestBlock::from_network(network),
-               });
+               }, genesis_block.header.time);
                let node_b_holder = ANodeHolder { node: &node_b };
 
                node_a.peer_connected(&node_b.get_our_node_id(), &Init {