Merge pull request #2136 from marctyndel/2023-03-paymentforwarded-expose-amount-forwarded
[rust-lightning] / lightning / src / ln / channelmanager.rs
index d938e37102094679915841afb0c59c0a317a1a50..d591788f64a68209349b2d7b86c269997a63fbab 100644 (file)
@@ -9,15 +9,13 @@
 
 //! The top-level channel management and payment tracking stuff lives here.
 //!
-//! The ChannelManager is the main chunk of logic implementing the lightning protocol and is
+//! The [`ChannelManager`] is the main chunk of logic implementing the lightning protocol and is
 //! responsible for tracking which channels are open, HTLCs are in flight and reestablishing those
 //! upon reconnect to the relevant peer(s).
 //!
-//! It does not manage routing logic (see [`find_route`] for that) nor does it manage constructing
+//! It does not manage routing logic (see [`Router`] for that) nor does it manage constructing
 //! on-chain transactions (it only monitors the chain to watch for any force-closes that might
 //! imply it needs to fail HTLCs/payments/channels it manages).
-//!
-//! [`find_route`]: crate::routing::router::find_route
 
 use bitcoin::blockdata::block::BlockHeader;
 use bitcoin::blockdata::transaction::Transaction;
@@ -37,6 +35,8 @@ use crate::chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock};
 use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator};
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use crate::chain::transaction::{OutPoint, TransactionData};
+use crate::events;
+use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret};
@@ -55,12 +55,11 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA
 use crate::ln::outbound_payment;
 use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment};
 use crate::ln::wire::Encode;
-use crate::chain::keysinterface::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner};
+use crate::chain::keysinterface::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner};
 use crate::util::config::{UserConfig, ChannelConfig};
-use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
-use crate::util::events;
 use crate::util::wakers::{Future, Notifier};
 use crate::util::scid_utils::fake_scid;
+use crate::util::string::UntrustedString;
 use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
 use crate::util::logger::{Level, Logger};
 use crate::util::errors::APIError;
@@ -78,7 +77,7 @@ use core::time::Duration;
 use core::ops::Deref;
 
 // Re-export this for use in the public API.
-pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry};
+pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure};
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
 //
@@ -121,7 +120,10 @@ pub(super) struct PendingHTLCInfo {
        pub(super) routing: PendingHTLCRouting,
        pub(super) incoming_shared_secret: [u8; 32],
        payment_hash: PaymentHash,
+       /// Amount received
        pub(super) incoming_amt_msat: Option<u64>, // Added in 0.0.113
+       /// Sender intended amount to forward or receive (actual amount received
+       /// may overshoot this in either case)
        pub(super) outgoing_amt_msat: u64,
        pub(super) outgoing_cltv_value: u32,
 }
@@ -193,14 +195,21 @@ struct ClaimableHTLC {
        cltv_expiry: u32,
        /// The amount (in msats) of this MPP part
        value: u64,
+       /// The amount (in msats) that the sender intended to be sent in this MPP
+       /// part (used for validating total MPP amount)
+       sender_intended_value: u64,
        onion_payload: OnionPayload,
        timer_ticks: u8,
-       /// The sum total of all MPP parts
+       /// The total value received for a payment (sum of all MPP parts if the payment is a MPP).
+       /// Gets set to the amount reported when pushing [`Event::PaymentClaimable`].
+       total_value_received: Option<u64>,
+       /// The sender intended sum total of all MPP parts specified in the onion
        total_msat: u64,
 }
 
 /// A payment identifier used to uniquely identify a payment to LDK.
-/// (C-not exported) as we just use [u8; 32] directly
+///
+/// This is not exported to bindings users as we just use [u8; 32] directly
 #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
 pub struct PaymentId(pub [u8; 32]);
 
@@ -218,7 +227,8 @@ impl Readable for PaymentId {
 }
 
 /// An identifier used to uniquely identify an intercepted HTLC to LDK.
-/// (C-not exported) as we just use [u8; 32] directly
+///
+/// This is not exported to bindings users as we just use [u8; 32] directly
 #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
 pub struct InterceptId(pub [u8; 32]);
 
@@ -234,6 +244,36 @@ impl Readable for InterceptId {
                Ok(InterceptId(buf))
        }
 }
+
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+/// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`].
+pub(crate) enum SentHTLCId {
+       PreviousHopData { short_channel_id: u64, htlc_id: u64 },
+       OutboundRoute { session_priv: SecretKey },
+}
+impl SentHTLCId {
+       pub(crate) fn from_source(source: &HTLCSource) -> Self {
+               match source {
+                       HTLCSource::PreviousHopData(hop_data) => Self::PreviousHopData {
+                               short_channel_id: hop_data.short_channel_id,
+                               htlc_id: hop_data.htlc_id,
+                       },
+                       HTLCSource::OutboundRoute { session_priv, .. } =>
+                               Self::OutboundRoute { session_priv: *session_priv },
+               }
+       }
+}
+impl_writeable_tlv_based_enum!(SentHTLCId,
+       (0, PreviousHopData) => {
+               (0, short_channel_id, required),
+               (2, htlc_id, required),
+       },
+       (2, OutboundRoute) => {
+               (0, session_priv, required),
+       };
+);
+
+
 /// Tracks the inbound corresponding to an outbound HTLC
 #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
 #[derive(Clone, PartialEq, Eq)]
@@ -247,11 +287,6 @@ pub(crate) enum HTLCSource {
                first_hop_htlc_msat: u64,
                payment_id: PaymentId,
                payment_secret: Option<PaymentSecret>,
-               /// Note that this is now "deprecated" - we write it for forwards (and read it for
-               /// backwards) compatibility reasons, but prefer to use the data in the
-               /// [`super::outbound_payment`] module, which stores per-payment data once instead of in
-               /// each HTLC.
-               payment_params: Option<PaymentParameters>,
        },
 }
 #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
@@ -262,14 +297,13 @@ impl core::hash::Hash for HTLCSource {
                                0u8.hash(hasher);
                                prev_hop_data.hash(hasher);
                        },
-                       HTLCSource::OutboundRoute { path, session_priv, payment_id, payment_secret, first_hop_htlc_msat, payment_params } => {
+                       HTLCSource::OutboundRoute { path, session_priv, payment_id, payment_secret, first_hop_htlc_msat } => {
                                1u8.hash(hasher);
                                path.hash(hasher);
                                session_priv[..].hash(hasher);
                                payment_id.hash(hasher);
                                payment_secret.hash(hasher);
                                first_hop_htlc_msat.hash(hasher);
-                               payment_params.hash(hasher);
                        },
                }
        }
@@ -284,7 +318,6 @@ impl HTLCSource {
                        first_hop_htlc_msat: 0,
                        payment_id: PaymentId([2; 32]),
                        payment_secret: None,
-                       payment_params: None,
                }
        }
 }
@@ -468,7 +501,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
        (0, PaymentClaimed) => { (0, payment_hash, required) },
-       (2, EmitEvent) => { (0, event, ignorable) },
+       (2, EmitEvent) => { (0, event, upgradable_required) },
 );
 
 /// State we hold per-peer.
@@ -538,15 +571,16 @@ struct PendingInboundPayment {
        min_value_msat: Option<u64>,
 }
 
-/// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g.
-/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
+/// [`SimpleArcChannelManager`] is useful when you need a [`ChannelManager`] with a static lifetime, e.g.
+/// when you're using `lightning-net-tokio` (since `tokio::spawn` requires parameters with static
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
-/// SimpleRefChannelManager is the more appropriate type. Defining these type aliases prevents
-/// issues such as overly long function definitions. Note that the ChannelManager can take any type
-/// that implements KeysInterface or Router for its keys manager and router, respectively, but this
-/// type alias chooses the concrete types of KeysManager and DefaultRouter.
+/// [`SimpleRefChannelManager`] is the more appropriate type. Defining these type aliases prevents
+/// issues such as overly long function definitions. Note that the `ChannelManager` can take any type
+/// that implements [`NodeSigner`], [`EntropySource`], and [`SignerProvider`] for its keys manager,
+/// or, respectively, [`Router`] for its router, but this type alias chooses the concrete types
+/// of [`KeysManager`] and [`DefaultRouter`].
 ///
-/// (C-not exported) as Arcs don't make sense in bindings
+/// This is not exported to bindings users as Arcs don't make sense in bindings
 pub type SimpleArcChannelManager<M, T, F, L> = ChannelManager<
        Arc<M>,
        Arc<T>,
@@ -562,50 +596,49 @@ pub type SimpleArcChannelManager<M, T, F, L> = ChannelManager<
        Arc<L>
 >;
 
-/// SimpleRefChannelManager is a type alias for a ChannelManager reference, and is the reference
-/// counterpart to the SimpleArcChannelManager type alias. Use this type by default when you don't
+/// [`SimpleRefChannelManager`] is a type alias for a ChannelManager reference, and is the reference
+/// counterpart to the [`SimpleArcChannelManager`] type alias. Use this type by default when you don't
 /// need a ChannelManager with a static lifetime. You'll need a static lifetime in cases such as
-/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
+/// usage of lightning-net-tokio (since `tokio::spawn` requires parameters with static lifetimes).
 /// But if this is not necessary, using a reference is more efficient. Defining these type aliases
 /// issues such as overly long function definitions. Note that the ChannelManager can take any type
-/// that implements KeysInterface or Router for its keys manager and router, respectively, but this
-/// type alias chooses the concrete types of KeysManager and DefaultRouter.
+/// that implements [`NodeSigner`], [`EntropySource`], and [`SignerProvider`] for its keys manager,
+/// or, respectively, [`Router`]  for its router, but this type alias chooses the concrete types
+/// of [`KeysManager`] and [`DefaultRouter`].
 ///
-/// (C-not exported) as Arcs don't make sense in bindings
+/// This is not exported to bindings users as Arcs don't make sense in bindings
 pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>>, &'g L>;
 
 /// Manager which keeps track of a number of channels and sends messages to the appropriate
 /// channel, also tracking HTLC preimages and forwarding onion packets appropriately.
 ///
-/// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through
+/// Implements [`ChannelMessageHandler`], handling the multi-channel parts and passing things through
 /// to individual Channels.
 ///
-/// Implements Writeable to write out all channel state to disk. Implies peer_disconnected() for
+/// Implements [`Writeable`] to write out all channel state to disk. Implies [`peer_disconnected`] for
 /// all peers during write/read (though does not modify this instance, only the instance being
-/// serialized). This will result in any channels which have not yet exchanged funding_created (ie
-/// called funding_transaction_generated for outbound channels).
+/// serialized). This will result in any channels which have not yet exchanged [`funding_created`] (i.e.,
+/// called [`funding_transaction_generated`] for outbound channels) being closed.
 ///
-/// Note that you can be a bit lazier about writing out ChannelManager than you can be with
-/// ChannelMonitors. With ChannelMonitors you MUST write each monitor update out to disk before
-/// returning from chain::Watch::watch_/update_channel, with ChannelManagers, writing updates
-/// happens out-of-band (and will prevent any other ChannelManager operations from occurring during
+/// Note that you can be a bit lazier about writing out `ChannelManager` than you can be with
+/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST write each monitor update out to disk before
+/// returning from [`chain::Watch::watch_channel`]/[`update_channel`], with ChannelManagers, writing updates
+/// happens out-of-band (and will prevent any other `ChannelManager` operations from occurring during
 /// the serialization process). If the deserialized version is out-of-date compared to the
-/// ChannelMonitors passed by reference to read(), those channels will be force-closed based on the
-/// ChannelMonitor state and no funds will be lost (mod on-chain transaction fees).
+/// [`ChannelMonitor`] passed by reference to [`read`], those channels will be force-closed based on the
+/// `ChannelMonitor` state and no funds will be lost (mod on-chain transaction fees).
 ///
-/// Note that the deserializer is only implemented for (BlockHash, ChannelManager), which
-/// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
-/// the "reorg path" (ie call block_disconnected() until you get to a common block and then call
-/// block_connected() to step towards your best block) upon deserialization before using the
-/// object!
+/// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which
+/// tells you the last block hash which was connected. You should get the best block tip before using the manager.
+/// See [`chain::Listen`] and [`chain::Confirm`] for more details.
 ///
-/// Note that ChannelManager is responsible for tracking liveness of its channels and generating
-/// ChannelUpdate messages informing peers that the channel is temporarily disabled. To avoid
+/// Note that `ChannelManager` is responsible for tracking liveness of its channels and generating
+/// [`ChannelUpdate`] messages informing peers that the channel is temporarily disabled. To avoid
 /// spam due to quick disconnection/reconnection, updates are not sent until the channel has been
 /// offline for a full minute. In order to track this, you must call
-/// timer_tick_occurred roughly once per minute, though it doesn't have to be perfect.
+/// [`timer_tick_occurred`] roughly once per minute, though it doesn't have to be perfect.
 ///
-/// To avoid trivial DoS issues, ChannelManager limits the number of inbound connections and
+/// To avoid trivial DoS issues, `ChannelManager` limits the number of inbound connections and
 /// inbound channels without confirmed funding transactions. This may result in nodes which we do
 /// not have a channel with being unable to connect to us or open new channels with us if we have
 /// many peers with unfunded channels.
@@ -614,11 +647,20 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
 /// exempted from the count of unfunded channels. Similarly, outbound channels and connections are
 /// never limited. Please ensure you limit the count of such channels yourself.
 ///
-/// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager
-/// a SimpleRefChannelManager, for conciseness. See their documentation for more details, but
-/// essentially you should default to using a SimpleRefChannelManager, and use a
-/// SimpleArcChannelManager when you require a ChannelManager with a static lifetime, such as when
+/// Rather than using a plain `ChannelManager`, it is preferable to use either a [`SimpleArcChannelManager`]
+/// a [`SimpleRefChannelManager`], for conciseness. See their documentation for more details, but
+/// essentially you should default to using a [`SimpleRefChannelManager`], and use a
+/// [`SimpleArcChannelManager`] when you require a `ChannelManager` with a static lifetime, such as when
 /// you're using lightning-net-tokio.
+///
+/// [`peer_disconnected`]: msgs::ChannelMessageHandler::peer_disconnected
+/// [`funding_created`]: msgs::FundingCreated
+/// [`funding_transaction_generated`]: Self::funding_transaction_generated
+/// [`BlockHash`]: bitcoin::hash_types::BlockHash
+/// [`update_channel`]: chain::Watch::update_channel
+/// [`ChannelUpdate`]: msgs::ChannelUpdate
+/// [`timer_tick_occurred`]: Self::timer_tick_occurred
+/// [`read`]: ReadableArgs::read
 //
 // Lock order:
 // The tree structure below illustrates the lock order requirements for the different locks of the
@@ -1019,7 +1061,7 @@ pub struct ChannelCounterparty {
        pub outbound_htlc_maximum_msat: Option<u64>,
 }
 
-/// Details of a channel, as returned by ChannelManager::list_channels and ChannelManager::list_usable_channels
+/// Details of a channel, as returned by [`ChannelManager::list_channels`] and [`ChannelManager::list_usable_channels`]
 #[derive(Clone, Debug, PartialEq)]
 pub struct ChannelDetails {
        /// The channel's ID (prior to funding transaction generation, this is a random 32 bytes,
@@ -1090,6 +1132,11 @@ pub struct ChannelDetails {
        /// inbound. This may be zero for inbound channels serialized with LDK versions prior to
        /// 0.0.113.
        pub user_channel_id: u128,
+       /// The currently negotiated fee rate denominated in satoshi per 1000 weight units,
+       /// which is applied to commitment and HTLC transactions.
+       ///
+       /// This value will be `None` for objects serialized with LDK versions prior to 0.0.115.
+       pub feerate_sat_per_1000_weight: Option<u32>,
        /// Our total balance.  This is the amount we would get if we close the channel.
        /// This value is not exact. Due to various in-flight changes and feerate changes, exactly this
        /// amount is not likely to be recoverable on close.
@@ -1201,6 +1248,56 @@ impl ChannelDetails {
        pub fn get_outbound_payment_scid(&self) -> Option<u64> {
                self.short_channel_id.or(self.outbound_scid_alias)
        }
+
+       fn from_channel<Signer: WriteableEcdsaChannelSigner>(channel: &Channel<Signer>,
+               best_block_height: u32, latest_features: InitFeatures) -> Self {
+
+               let balance = channel.get_available_balances();
+               let (to_remote_reserve_satoshis, to_self_reserve_satoshis) =
+                       channel.get_holder_counterparty_selected_channel_reserve_satoshis();
+               ChannelDetails {
+                       channel_id: channel.channel_id(),
+                       counterparty: ChannelCounterparty {
+                               node_id: channel.get_counterparty_node_id(),
+                               features: latest_features,
+                               unspendable_punishment_reserve: to_remote_reserve_satoshis,
+                               forwarding_info: channel.counterparty_forwarding_info(),
+                               // Ensures that we have actually received the `htlc_minimum_msat` value
+                               // from the counterparty through the `OpenChannel` or `AcceptChannel`
+                               // message (as they are always the first message from the counterparty).
+                               // Else `Channel::get_counterparty_htlc_minimum_msat` could return the
+                               // default `0` value set by `Channel::new_outbound`.
+                               outbound_htlc_minimum_msat: if channel.have_received_message() {
+                                       Some(channel.get_counterparty_htlc_minimum_msat()) } else { None },
+                               outbound_htlc_maximum_msat: channel.get_counterparty_htlc_maximum_msat(),
+                       },
+                       funding_txo: channel.get_funding_txo(),
+                       // Note that accept_channel (or open_channel) is always the first message, so
+                       // `have_received_message` indicates that type negotiation has completed.
+                       channel_type: if channel.have_received_message() { Some(channel.get_channel_type().clone()) } else { None },
+                       short_channel_id: channel.get_short_channel_id(),
+                       outbound_scid_alias: if channel.is_usable() { Some(channel.outbound_scid_alias()) } else { None },
+                       inbound_scid_alias: channel.latest_inbound_scid_alias(),
+                       channel_value_satoshis: channel.get_value_satoshis(),
+                       feerate_sat_per_1000_weight: Some(channel.get_feerate_sat_per_1000_weight()),
+                       unspendable_punishment_reserve: to_self_reserve_satoshis,
+                       balance_msat: balance.balance_msat,
+                       inbound_capacity_msat: balance.inbound_capacity_msat,
+                       outbound_capacity_msat: balance.outbound_capacity_msat,
+                       next_outbound_htlc_limit_msat: balance.next_outbound_htlc_limit_msat,
+                       user_channel_id: channel.get_user_id(),
+                       confirmations_required: channel.minimum_depth(),
+                       confirmations: Some(channel.get_funding_tx_confirmations(best_block_height)),
+                       force_close_spend_delay: channel.get_counterparty_selected_contest_delay(),
+                       is_outbound: channel.is_outbound(),
+                       is_channel_ready: channel.is_usable(),
+                       is_usable: channel.is_live(),
+                       is_public: channel.should_announce(),
+                       inbound_htlc_minimum_msat: Some(channel.get_holder_htlc_minimum_msat()),
+                       inbound_htlc_maximum_msat: channel.get_holder_htlc_maximum_msat(),
+                       config: Some(channel.config()),
+               }
+       }
 }
 
 /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments.
@@ -1417,7 +1514,7 @@ macro_rules! emit_channel_ready_event {
 }
 
 macro_rules! handle_monitor_update_completion {
-       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { {
+       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
                let mut updates = $chan.monitor_updating_restored(&$self.logger,
                        &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
                        $self.best_block.read().unwrap().height());
@@ -1450,6 +1547,7 @@ macro_rules! handle_monitor_update_completion {
 
                let channel_id = $chan.channel_id();
                core::mem::drop($peer_state_lock);
+               core::mem::drop($per_peer_state_lock);
 
                $self.handle_monitor_update_completion_actions(update_actions);
 
@@ -1465,7 +1563,7 @@ macro_rules! handle_monitor_update_completion {
 }
 
 macro_rules! handle_new_monitor_update {
-       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
                // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
                // any case so that it won't deadlock.
                debug_assert!($self.id_to_peer.try_lock().is_ok());
@@ -1492,14 +1590,14 @@ macro_rules! handle_new_monitor_update {
                                        .update_id == $update_id) &&
                                        $chan.get_latest_monitor_update_id() == $update_id
                                {
-                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan);
+                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
                                }
                                Ok(())
                        },
                }
        } };
-       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan_entry: expr) => {
-               handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
+               handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
        }
 }
 
@@ -1514,16 +1612,21 @@ where
        R::Target: Router,
        L::Target: Logger,
 {
-       /// Constructs a new ChannelManager to hold several channels and route between them.
+       /// Constructs a new `ChannelManager` to hold several channels and route between them.
        ///
        /// This is the main "logic hub" for all channel-related actions, and implements
-       /// ChannelMessageHandler.
+       /// [`ChannelMessageHandler`].
        ///
        /// Non-proportional fees are fixed according to our risk using the provided fee estimator.
        ///
-       /// Users need to notify the new ChannelManager when a new block is connected or
-       /// disconnected using its `block_connected` and `block_disconnected` methods, starting
-       /// from after `params.latest_hash`.
+       /// Users need to notify the new `ChannelManager` when a new block is connected or
+       /// disconnected using its [`block_connected`] and [`block_disconnected`] methods, starting
+       /// from after [`params.best_block.block_hash`]. See [`chain::Listen`] and [`chain::Confirm`] for
+       /// more details.
+       ///
+       /// [`block_connected`]: chain::Listen::block_connected
+       /// [`block_disconnected`]: chain::Listen::block_disconnected
+       /// [`params.best_block.block_hash`]: chain::BestBlock::block_hash
        pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, router: R, logger: L, entropy_source: ES, node_signer: NS, signer_provider: SP, config: UserConfig, params: ChainParameters) -> Self {
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
@@ -1687,71 +1790,28 @@ where
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
-                               for (channel_id, channel) in peer_state.channel_by_id.iter().filter(f) {
-                                       let balance = channel.get_available_balances();
-                                       let (to_remote_reserve_satoshis, to_self_reserve_satoshis) =
-                                               channel.get_holder_counterparty_selected_channel_reserve_satoshis();
-                                       res.push(ChannelDetails {
-                                               channel_id: (*channel_id).clone(),
-                                               counterparty: ChannelCounterparty {
-                                                       node_id: channel.get_counterparty_node_id(),
-                                                       features: peer_state.latest_features.clone(),
-                                                       unspendable_punishment_reserve: to_remote_reserve_satoshis,
-                                                       forwarding_info: channel.counterparty_forwarding_info(),
-                                                       // Ensures that we have actually received the `htlc_minimum_msat` value
-                                                       // from the counterparty through the `OpenChannel` or `AcceptChannel`
-                                                       // message (as they are always the first message from the counterparty).
-                                                       // Else `Channel::get_counterparty_htlc_minimum_msat` could return the
-                                                       // default `0` value set by `Channel::new_outbound`.
-                                                       outbound_htlc_minimum_msat: if channel.have_received_message() {
-                                                               Some(channel.get_counterparty_htlc_minimum_msat()) } else { None },
-                                                       outbound_htlc_maximum_msat: channel.get_counterparty_htlc_maximum_msat(),
-                                               },
-                                               funding_txo: channel.get_funding_txo(),
-                                               // Note that accept_channel (or open_channel) is always the first message, so
-                                               // `have_received_message` indicates that type negotiation has completed.
-                                               channel_type: if channel.have_received_message() { Some(channel.get_channel_type().clone()) } else { None },
-                                               short_channel_id: channel.get_short_channel_id(),
-                                               outbound_scid_alias: if channel.is_usable() { Some(channel.outbound_scid_alias()) } else { None },
-                                               inbound_scid_alias: channel.latest_inbound_scid_alias(),
-                                               channel_value_satoshis: channel.get_value_satoshis(),
-                                               unspendable_punishment_reserve: to_self_reserve_satoshis,
-                                               balance_msat: balance.balance_msat,
-                                               inbound_capacity_msat: balance.inbound_capacity_msat,
-                                               outbound_capacity_msat: balance.outbound_capacity_msat,
-                                               next_outbound_htlc_limit_msat: balance.next_outbound_htlc_limit_msat,
-                                               user_channel_id: channel.get_user_id(),
-                                               confirmations_required: channel.minimum_depth(),
-                                               confirmations: Some(channel.get_funding_tx_confirmations(best_block_height)),
-                                               force_close_spend_delay: channel.get_counterparty_selected_contest_delay(),
-                                               is_outbound: channel.is_outbound(),
-                                               is_channel_ready: channel.is_usable(),
-                                               is_usable: channel.is_live(),
-                                               is_public: channel.should_announce(),
-                                               inbound_htlc_minimum_msat: Some(channel.get_holder_htlc_minimum_msat()),
-                                               inbound_htlc_maximum_msat: channel.get_holder_htlc_maximum_msat(),
-                                               config: Some(channel.config()),
-                                       });
+                               for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) {
+                                       let details = ChannelDetails::from_channel(channel, best_block_height,
+                                               peer_state.latest_features.clone());
+                                       res.push(details);
                                }
                        }
                }
                res
        }
 
-       /// Gets the list of open channels, in random order. See ChannelDetail field documentation for
+       /// Gets the list of open channels, in random order. See [`ChannelDetails`] field documentation for
        /// more information.
        pub fn list_channels(&self) -> Vec<ChannelDetails> {
                self.list_channels_with_filter(|_| true)
        }
 
-       /// Gets the list of usable channels, in random order. Useful as an argument to [`find_route`]
-       /// to ensure non-announced channels are used.
+       /// Gets the list of usable channels, in random order. Useful as an argument to
+       /// [`Router::find_route`] to ensure non-announced channels are used.
        ///
        /// These are guaranteed to have their [`ChannelDetails::is_usable`] value set to true, see the
        /// documentation for [`ChannelDetails::is_usable`] for more info on exactly what the criteria
        /// are.
-       ///
-       /// [`find_route`]: crate::routing::router::find_route
        pub fn list_usable_channels(&self) -> Vec<ChannelDetails> {
                // Note we use is_live here instead of usable which leads to somewhat confused
                // internal/external nomenclature, but that's ok cause that's probably what the user
@@ -1759,6 +1819,24 @@ where
                self.list_channels_with_filter(|&(_, ref channel)| channel.is_live())
        }
 
+       /// Gets the list of channels we have with a given counterparty, in random order.
+       pub fn list_channels_with_counterparty(&self, counterparty_node_id: &PublicKey) -> Vec<ChannelDetails> {
+               let best_block_height = self.best_block.read().unwrap().height();
+               let per_peer_state = self.per_peer_state.read().unwrap();
+
+               if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
+                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                       let peer_state = &mut *peer_state_lock;
+                       let features = &peer_state.latest_features;
+                       return peer_state.channel_by_id
+                               .iter()
+                               .map(|(_, channel)|
+                                       ChannelDetails::from_channel(channel, best_block_height, features.clone()))
+                               .collect();
+               }
+               vec![]
+       }
+
        /// Returns in an undefined order recent payments that -- if not fulfilled -- have yet to find a
        /// successful path, or have unresolved HTLCs.
        ///
@@ -1835,7 +1913,7 @@ where
                                        if let Some(monitor_update) = monitor_update_opt.take() {
                                                let update_id = monitor_update.update_id;
                                                let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
-                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
                                        }
 
                                        if chan_entry.get().is_shutdown() {
@@ -1875,11 +1953,12 @@ where
        ///    would appear on a force-closure transaction, whichever is lower. We will allow our
        ///    counterparty to pay as much fee as they'd like, however.
        ///
-       /// May generate a SendShutdown message event on success, which should be relayed.
+       /// May generate a [`SendShutdown`] message event on success, which should be relayed.
        ///
        /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis
        /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background
        /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal
+       /// [`SendShutdown`]: crate::events::MessageSendEvent::SendShutdown
        pub fn close_channel(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey) -> Result<(), APIError> {
                self.close_channel_internal(channel_id, counterparty_node_id, None)
        }
@@ -1898,11 +1977,12 @@ where
        ///    transaction feerate below `target_feerate_sat_per_1000_weight` (or the feerate which
        ///    will appear on a force-closure transaction, whichever is lower).
        ///
-       /// May generate a SendShutdown message event on success, which should be relayed.
+       /// May generate a [`SendShutdown`] message event on success, which should be relayed.
        ///
        /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis
        /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background
        /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal
+       /// [`SendShutdown`]: crate::events::MessageSendEvent::SendShutdown
        pub fn close_channel_with_target_feerate(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: u32) -> Result<(), APIError> {
                self.close_channel_internal(channel_id, counterparty_node_id, Some(target_feerate_sats_per_1000_weight))
        }
@@ -1938,7 +2018,7 @@ where
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) {
                                if let Some(peer_msg) = peer_msg {
-                                       self.issue_channel_close_events(chan.get(),ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() });
+                                       self.issue_channel_close_events(chan.get(),ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) });
                                } else {
                                        self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
                                }
@@ -2021,9 +2101,9 @@ where
                payment_hash: PaymentHash, amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>) -> Result<PendingHTLCInfo, ReceiveError>
        {
                // final_incorrect_cltv_expiry
-               if hop_data.outgoing_cltv_value != cltv_expiry {
+               if hop_data.outgoing_cltv_value > cltv_expiry {
                        return Err(ReceiveError {
-                               msg: "Upstream node set CLTV to the wrong value",
+                               msg: "Upstream node set CLTV to less than the CLTV set by the sender",
                                err_code: 18,
                                err_data: cltv_expiry.to_be_bytes().to_vec()
                        })
@@ -2107,7 +2187,7 @@ where
                        payment_hash,
                        incoming_shared_secret: shared_secret,
                        incoming_amt_msat: Some(amt_msat),
-                       outgoing_amt_msat: amt_msat,
+                       outgoing_amt_msat: hop_data.amt_to_forward,
                        outgoing_cltv_value: hop_data.outgoing_cltv_value,
                })
        }
@@ -2347,13 +2427,16 @@ where
                pending_forward_info
        }
 
-       /// Gets the current channel_update for the given channel. This first checks if the channel is
+       /// Gets the current [`channel_update`] for the given channel. This first checks if the channel is
        /// public, and thus should be called whenever the result is going to be passed out in a
        /// [`MessageSendEvent::BroadcastChannelUpdate`] event.
        ///
-       /// Note that in `internal_closing_signed`, this function is called without the `peer_state`
+       /// Note that in [`internal_closing_signed`], this function is called without the `peer_state`
        /// corresponding to the channel's counterparty locked, as the channel been removed from the
        /// storage and the `peer_state` lock has been dropped.
+       ///
+       /// [`channel_update`]: msgs::ChannelUpdate
+       /// [`internal_closing_signed`]: Self::internal_closing_signed
        fn get_channel_update_for_broadcast(&self, chan: &Channel<<SP::Target as SignerProvider>::Signer>) -> Result<msgs::ChannelUpdate, LightningError> {
                if !chan.should_announce() {
                        return Err(LightningError {
@@ -2368,14 +2451,17 @@ where
                self.get_channel_update_for_unicast(chan)
        }
 
-       /// Gets the current channel_update for the given channel. This does not check if the channel
-       /// is public (only returning an Err if the channel does not yet have an assigned short_id),
+       /// Gets the current [`channel_update`] for the given channel. This does not check if the channel
+       /// is public (only returning an `Err` if the channel does not yet have an assigned SCID),
        /// and thus MUST NOT be called unless the recipient of the resulting message has already
        /// provided evidence that they know about the existence of the channel.
        ///
-       /// Note that through `internal_closing_signed`, this function is called without the
+       /// Note that through [`internal_closing_signed`], this function is called without the
        /// `peer_state`  corresponding to the channel's counterparty locked, as the channel been
        /// removed from the storage and the `peer_state` lock has been dropped.
+       ///
+       /// [`channel_update`]: msgs::ChannelUpdate
+       /// [`internal_closing_signed`]: Self::internal_closing_signed
        fn get_channel_update_for_unicast(&self, chan: &Channel<<SP::Target as SignerProvider>::Signer>) -> Result<msgs::ChannelUpdate, LightningError> {
                log_trace!(self.logger, "Attempting to generate channel update for channel {}", log_bytes!(chan.channel_id()));
                let short_channel_id = match chan.get_short_channel_id().or(chan.latest_inbound_scid_alias()) {
@@ -2413,22 +2499,28 @@ where
                })
        }
 
-       // Only public for testing, this should otherwise never be called direcly
-       pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_params: &Option<PaymentParameters>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+       #[cfg(test)]
+       pub(crate) fn test_send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+               let _lck = self.total_consistency_lock.read().unwrap();
+               self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes)
+       }
+
+       fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>, session_priv_bytes: [u8; 32]) -> Result<(), APIError> {
+               // The top-level caller should hold the total_consistency_lock read lock.
+               debug_assert!(self.total_consistency_lock.try_write().is_err());
+
                log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id);
                let prng_seed = self.entropy_source.get_secure_random_bytes();
                let session_priv = SecretKey::from_slice(&session_priv_bytes[..]).expect("RNG is busted");
 
                let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv)
-                       .map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected"})?;
+                       .map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected".to_owned()})?;
                let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(path, total_value, payment_secret, cur_height, keysend_preimage)?;
                if onion_utils::route_size_insane(&onion_payloads) {
-                       return Err(APIError::InvalidRoute{err: "Route size too large considering onion data"});
+                       return Err(APIError::InvalidRoute{err: "Route size too large considering onion data".to_owned()});
                }
                let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
 
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-
                let err: Result<(), _> = loop {
                        let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.first().unwrap().short_channel_id) {
                                None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}),
@@ -2437,7 +2529,7 @@ where
 
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
-                               .ok_or_else(|| APIError::InvalidRoute{err: "No peer matching the path's first hop found!" })?;
+                               .ok_or_else(|| APIError::ChannelUnavailable{err: "No peer matching the path's first hop found!".to_owned() })?;
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(id) {
@@ -2452,13 +2544,12 @@ where
                                                first_hop_htlc_msat: htlc_msat,
                                                payment_id,
                                                payment_secret: payment_secret.clone(),
-                                               payment_params: payment_params.clone(),
                                        }, onion_packet, &self.logger);
                                match break_chan_entry!(self, send_res, chan) {
                                        Some(monitor_update) => {
                                                let update_id = monitor_update.update_id;
                                                let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update);
-                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan) {
+                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) {
                                                        break Err(e);
                                                }
                                                if update_res == ChannelMonitorUpdateStatus::InProgress {
@@ -2496,7 +2587,7 @@ where
        /// Value parameters are provided via the last hop in route, see documentation for [`RouteHop`]
        /// fields for more info.
        ///
-       /// May generate SendHTLCs message(s) event on success, which should be relayed (e.g. via
+       /// May generate [`UpdateHTLCs`] message(s) event on success, which should be relayed (e.g. via
        /// [`PeerManager::process_events`]).
        ///
        /// # Avoiding Duplicate Payments
@@ -2520,7 +2611,7 @@ where
        ///
        /// # Possible Error States on [`PaymentSendFailure`]
        ///
-       /// Each path may have a different return value, and PaymentSendValue may return a Vec with
+       /// Each path may have a different return value, and [`PaymentSendFailure`] may return a `Vec` with
        /// each entry matching the corresponding-index entry in the route paths, see
        /// [`PaymentSendFailure`] for more info.
        ///
@@ -2533,7 +2624,7 @@ where
        ///  * [`APIError::MonitorUpdateInProgress`] if a new monitor update failure prevented sending the
        ///    relevant updates.
        ///
-       /// Note that depending on the type of the PaymentSendFailure the HTLC may have been
+       /// Note that depending on the type of the [`PaymentSendFailure`] the HTLC may have been
        /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a
        /// different route unless you intend to pay twice!
        ///
@@ -2551,34 +2642,39 @@ where
        ///
        /// [`Event::PaymentSent`]: events::Event::PaymentSent
        /// [`Event::PaymentFailed`]: events::Event::PaymentFailed
+       /// [`UpdateHTLCs`]: events::MessageSendEvent::UpdateHTLCs
        /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
        /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
        pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId) -> Result<(), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments
                        .send_payment_with_route(route, payment_hash, payment_secret, payment_id, &self.entropy_source, &self.node_signer, best_block_height,
-                               |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                               self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                               |path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
+                               self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
 
        /// Similar to [`ChannelManager::send_payment`], but will automatically find a route based on
        /// `route_params` and retry failed payment paths based on `retry_strategy`.
-       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), PaymentSendFailure> {
+       pub fn send_payment_with_retry(&self, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments
                        .send_payment(payment_hash, payment_secret, payment_id, retry_strategy, route_params,
                                &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(),
                                &self.entropy_source, &self.node_signer, best_block_height, &self.logger,
-                               |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                               self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                               &self.pending_events,
+                               |path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
+                               self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
 
        #[cfg(test)]
-       fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, keysend_preimage: Option<PaymentPreimage>, payment_id: PaymentId, recv_value_msat: Option<u64>, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> {
+       pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, keysend_preimage: Option<PaymentPreimage>, payment_id: PaymentId, recv_value_msat: Option<u64>, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, payment_secret, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height,
-                       |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       |path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
+                       self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
 
        #[cfg(test)]
@@ -2626,11 +2722,12 @@ where
        /// [`send_payment`]: Self::send_payment
        pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId) -> Result<PaymentHash, PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.send_spontaneous_payment_with_route(
                        route, payment_preimage, payment_id, &self.entropy_source, &self.node_signer,
                        best_block_height,
-                       |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       |path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
+                       self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
 
        /// Similar to [`ChannelManager::send_spontaneous_payment`], but will automatically find a route
@@ -2640,14 +2737,15 @@ where
        /// payments.
        ///
        /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend
-       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, PaymentSendFailure> {
+       pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, RetryableSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, payment_id,
                        retry_strategy, route_params, &self.router, self.list_usable_channels(),
                        || self.compute_inflight_htlcs(),  &self.entropy_source, &self.node_signer, best_block_height,
-                       &self.logger,
-                       |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       &self.logger, &self.pending_events,
+                       |path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
+                       self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
 
        /// Send a payment that is probing the given route for liquidity. We calculate the
@@ -2655,9 +2753,10 @@ where
        /// us to easily discern them from real payments.
        pub fn send_probe(&self, hops: Vec<RouteHop>) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> {
                let best_block_height = self.best_block.read().unwrap().height();
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                self.pending_outbound_payments.send_probe(hops, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height,
-                       |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
+                       |path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
+                       self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv))
        }
 
        /// Returns whether a payment with the given [`PaymentHash`] and [`PaymentId`] is, in fact, a
@@ -2757,8 +2856,8 @@ where
        /// implemented by Bitcoin Core wallet. See <https://bitcoinops.org/en/topics/fee-sniping/>
        /// for more details.
        ///
-       /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
-       /// [`Event::ChannelClosed`]: crate::util::events::Event::ChannelClosed
+       /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
+       /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
        pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
@@ -3168,7 +3267,7 @@ where
                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id,
                                                                forward_info: PendingHTLCInfo {
-                                                                       routing, incoming_shared_secret, payment_hash, outgoing_amt_msat, ..
+                                                                       routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, ..
                                                                }
                                                        }) => {
                                                                let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret) = match routing {
@@ -3182,7 +3281,7 @@ where
                                                                                panic!("short_channel_id == 0 should imply any pending_forward entries are of type Receive");
                                                                        }
                                                                };
-                                                               let claimable_htlc = ClaimableHTLC {
+                                                               let mut claimable_htlc = ClaimableHTLC {
                                                                        prev_hop: HTLCPreviousHopData {
                                                                                short_channel_id: prev_short_channel_id,
                                                                                outpoint: prev_funding_outpoint,
@@ -3190,8 +3289,13 @@ where
                                                                                incoming_packet_shared_secret: incoming_shared_secret,
                                                                                phantom_shared_secret,
                                                                        },
-                                                                       value: outgoing_amt_msat,
+                                                                       // We differentiate the received value from the sender intended value
+                                                                       // if possible so that we don't prematurely mark MPP payments complete
+                                                                       // if routing nodes overpay
+                                                                       value: incoming_amt_msat.unwrap_or(outgoing_amt_msat),
+                                                                       sender_intended_value: outgoing_amt_msat,
                                                                        timer_ticks: 0,
+                                                                       total_value_received: None,
                                                                        total_msat: if let Some(data) = &payment_data { data.total_msat } else { outgoing_amt_msat },
                                                                        cltv_expiry,
                                                                        onion_payload,
@@ -3236,7 +3340,7 @@ where
                                                                                        fail_htlc!(claimable_htlc, payment_hash);
                                                                                        continue
                                                                                }
-                                                                               let (_, htlcs) = claimable_payments.claimable_htlcs.entry(payment_hash)
+                                                                               let (_, ref mut htlcs) = claimable_payments.claimable_htlcs.entry(payment_hash)
                                                                                        .or_insert_with(|| (purpose(), Vec::new()));
                                                                                if htlcs.len() == 1 {
                                                                                        if let OnionPayload::Spontaneous(_) = htlcs[0].onion_payload {
@@ -3245,9 +3349,9 @@ where
                                                                                                continue
                                                                                        }
                                                                                }
-                                                                               let mut total_value = claimable_htlc.value;
+                                                                               let mut total_value = claimable_htlc.sender_intended_value;
                                                                                for htlc in htlcs.iter() {
-                                                                                       total_value += htlc.value;
+                                                                                       total_value += htlc.sender_intended_value;
                                                                                        match &htlc.onion_payload {
                                                                                                OnionPayload::Invoice { .. } => {
                                                                                                        if htlc.total_msat != $payment_data.total_msat {
@@ -3260,18 +3364,24 @@ where
                                                                                                _ => unreachable!(),
                                                                                        }
                                                                                }
-                                                                               if total_value >= msgs::MAX_VALUE_MSAT || total_value > $payment_data.total_msat {
-                                                                                       log_trace!(self.logger, "Failing HTLCs with payment_hash {} as the total value {} ran over expected value {} (or HTLCs were inconsistent)",
-                                                                                               log_bytes!(payment_hash.0), total_value, $payment_data.total_msat);
+                                                                               // The condition determining whether an MPP is complete must
+                                                                               // match exactly the condition used in `timer_tick_occurred`
+                                                                               if total_value >= msgs::MAX_VALUE_MSAT {
+                                                                                       fail_htlc!(claimable_htlc, payment_hash);
+                                                                               } else if total_value - claimable_htlc.sender_intended_value >= $payment_data.total_msat {
+                                                                                       log_trace!(self.logger, "Failing HTLC with payment_hash {} as payment is already claimable",
+                                                                                               log_bytes!(payment_hash.0));
                                                                                        fail_htlc!(claimable_htlc, payment_hash);
-                                                                               } else if total_value == $payment_data.total_msat {
+                                                                               } else if total_value >= $payment_data.total_msat {
                                                                                        let prev_channel_id = prev_funding_outpoint.to_channel_id();
                                                                                        htlcs.push(claimable_htlc);
+                                                                                       let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum();
+                                                                                       htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat));
                                                                                        new_events.push(events::Event::PaymentClaimable {
                                                                                                receiver_node_id: Some(receiver_node_id),
                                                                                                payment_hash,
                                                                                                purpose: purpose(),
-                                                                                               amount_msat: total_value,
+                                                                                               amount_msat,
                                                                                                via_channel_id: Some(prev_channel_id),
                                                                                                via_user_channel_id: Some(prev_user_channel_id),
                                                                                        });
@@ -3325,13 +3435,15 @@ where
                                                                                                }
                                                                                                match claimable_payments.claimable_htlcs.entry(payment_hash) {
                                                                                                        hash_map::Entry::Vacant(e) => {
+                                                                                                               let amount_msat = claimable_htlc.value;
+                                                                                                               claimable_htlc.total_value_received = Some(amount_msat);
                                                                                                                let purpose = events::PaymentPurpose::SpontaneousPayment(preimage);
                                                                                                                e.insert((purpose.clone(), vec![claimable_htlc]));
                                                                                                                let prev_channel_id = prev_funding_outpoint.to_channel_id();
                                                                                                                new_events.push(events::Event::PaymentClaimable {
                                                                                                                        receiver_node_id: Some(receiver_node_id),
                                                                                                                        payment_hash,
-                                                                                                                       amount_msat: outgoing_amt_msat,
+                                                                                                                       amount_msat,
                                                                                                                        purpose,
                                                                                                                        via_channel_id: Some(prev_channel_id),
                                                                                                                        via_user_channel_id: Some(prev_user_channel_id),
@@ -3381,8 +3493,8 @@ where
                self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(),
                        || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height,
                        &self.pending_events, &self.logger,
-                       |path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
-                       self.send_payment_along_path(path, payment_params, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv));
+                       |path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv|
+                       self.send_payment_along_path(path, payment_hash, payment_secret, total_value, cur_height, payment_id, keysend_preimage, session_priv));
 
                for (htlc_source, payment_hash, failure_reason, destination) in failed_forwards.drain(..) {
                        self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination);
@@ -3434,18 +3546,18 @@ where
        fn update_channel_fee(&self, chan_id: &[u8; 32], chan: &mut Channel<<SP::Target as SignerProvider>::Signer>, new_feerate: u32) -> NotifyOption {
                if !chan.is_outbound() { return NotifyOption::SkipPersist; }
                // If the feerate has decreased by less than half, don't bother
-               if new_feerate <= chan.get_feerate() && new_feerate * 2 > chan.get_feerate() {
+               if new_feerate <= chan.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.get_feerate_sat_per_1000_weight() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
-                               log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
+                               log_bytes!(chan_id[..]), chan.get_feerate_sat_per_1000_weight(), new_feerate);
                        return NotifyOption::SkipPersist;
                }
                if !chan.is_live() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
-                               log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
+                               log_bytes!(chan_id[..]), chan.get_feerate_sat_per_1000_weight(), new_feerate);
                        return NotifyOption::SkipPersist;
                }
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
-                       log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
+                       log_bytes!(chan_id[..]), chan.get_feerate_sat_per_1000_weight(), new_feerate);
 
                chan.queue_update_fee(new_feerate, &self.logger);
                NotifyOption::DoPersist
@@ -3480,15 +3592,18 @@ where
        ///
        /// This currently includes:
        ///  * Increasing or decreasing the on-chain feerate estimates for our outbound channels,
-       ///  * Broadcasting `ChannelUpdate` messages if we've been disconnected from our peer for more
+       ///  * Broadcasting [`ChannelUpdate`] messages if we've been disconnected from our peer for more
        ///    than a minute, informing the network that they should no longer attempt to route over
        ///    the channel.
-       ///  * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs
-       ///    with the current `ChannelConfig`.
+       ///  * Expiring a channel's previous [`ChannelConfig`] if necessary to only allow forwarding HTLCs
+       ///    with the current [`ChannelConfig`].
        ///  * Removing peers which have disconnected but and no longer have any channels.
        ///
-       /// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate
+       /// Note that this may cause reentrancy through [`chain::Watch::update_channel`] calls or feerate
        /// estimate fetches.
+       ///
+       /// [`ChannelUpdate`]: msgs::ChannelUpdate
+       /// [`ChannelConfig`]: crate::util::config::ChannelConfig
        pub fn timer_tick_occurred(&self) {
                PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
                        let mut should_persist = NotifyOption::SkipPersist;
@@ -3588,7 +3703,9 @@ where
                                if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload {
                                        // Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat).
                                        // In this case we're not going to handle any timeouts of the parts here.
-                                       if htlcs[0].total_msat == htlcs.iter().fold(0, |total, htlc| total + htlc.value) {
+                                       // This condition determining whether the MPP is complete here must match
+                                       // exactly the condition used in `process_pending_htlc_forwards`.
+                                       if htlcs[0].total_msat <= htlcs.iter().fold(0, |total, htlc| total + htlc.sender_intended_value) {
                                                return true;
                                        } else if htlcs.into_iter().any(|htlc| {
                                                htlc.timer_ticks += 1;
@@ -3639,14 +3756,14 @@ where
        /// [`events::Event::PaymentClaimed`] events even for payments you intend to fail, especially on
        /// startup during which time claims that were in-progress at shutdown may be replayed.
        pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash) {
-               self.fail_htlc_backwards_with_reason(payment_hash, &FailureCode::IncorrectOrUnknownPaymentDetails);
+               self.fail_htlc_backwards_with_reason(payment_hash, FailureCode::IncorrectOrUnknownPaymentDetails);
        }
 
        /// This is a variant of [`ChannelManager::fail_htlc_backwards`] that allows you to specify the
        /// reason for the failure.
        ///
        /// See [`FailureCode`] for valid failure codes.
-       pub fn fail_htlc_backwards_with_reason(&self, payment_hash: &PaymentHash, failure_code: &FailureCode) {
+       pub fn fail_htlc_backwards_with_reason(&self, payment_hash: &PaymentHash, failure_code: FailureCode) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let removed_source = self.claimable_payments.lock().unwrap().claimable_htlcs.remove(payment_hash);
@@ -3661,14 +3778,14 @@ where
        }
 
        /// Gets error data to form an [`HTLCFailReason`] given a [`FailureCode`] and [`ClaimableHTLC`].
-       fn get_htlc_fail_reason_from_failure_code(&self, failure_code: &FailureCode, htlc: &ClaimableHTLC) -> HTLCFailReason {
+       fn get_htlc_fail_reason_from_failure_code(&self, failure_code: FailureCode, htlc: &ClaimableHTLC) -> HTLCFailReason {
                match failure_code {
-                       FailureCode::TemporaryNodeFailure => HTLCFailReason::from_failure_code(*failure_code as u16),
-                       FailureCode::RequiredNodeFeatureMissing => HTLCFailReason::from_failure_code(*failure_code as u16),
+                       FailureCode::TemporaryNodeFailure => HTLCFailReason::from_failure_code(failure_code as u16),
+                       FailureCode::RequiredNodeFeatureMissing => HTLCFailReason::from_failure_code(failure_code as u16),
                        FailureCode::IncorrectOrUnknownPaymentDetails => {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
                                htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height().to_be_bytes());
-                               HTLCFailReason::reason(*failure_code as u16, htlc_msat_height_data)
+                               HTLCFailReason::reason(failure_code as u16, htlc_msat_height_data)
                        }
                }
        }
@@ -3768,9 +3885,9 @@ where
                // from block_connected which may run during initialization prior to the chain_monitor
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
                match source {
-                       HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, ref payment_params, .. } => {
+                       HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, .. } => {
                                if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
-                                       session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx,
+                                       session_priv, payment_id, self.probing_cookie_secret, &self.secp_ctx,
                                        &self.pending_events, &self.logger)
                                { self.push_pending_forwards_ev(); }
                        },
@@ -3814,8 +3931,8 @@ where
        /// event matches your expectation. If you fail to do so and call this method, you may provide
        /// the sender "proof-of-payment" when they did not fulfill the full expected payment.
        ///
-       /// [`Event::PaymentClaimable`]: crate::util::events::Event::PaymentClaimable
-       /// [`Event::PaymentClaimed`]: crate::util::events::Event::PaymentClaimed
+       /// [`Event::PaymentClaimable`]: crate::events::Event::PaymentClaimable
+       /// [`Event::PaymentClaimed`]: crate::events::Event::PaymentClaimed
        /// [`process_pending_events`]: EventsProvider::process_pending_events
        /// [`create_inbound_payment`]: Self::create_inbound_payment
        /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
@@ -3867,6 +3984,7 @@ where
                // provide the preimage, so worrying too much about the optimal handling isn't worth
                // it.
                let mut claimable_amt_msat = 0;
+               let mut prev_total_msat = None;
                let mut expected_amt_msat = None;
                let mut valid_mpp = true;
                let mut errs = Vec::new();
@@ -3894,14 +4012,22 @@ where
                                break;
                        }
 
-                       if expected_amt_msat.is_some() && expected_amt_msat != Some(htlc.total_msat) {
-                               log_error!(self.logger, "Somehow ended up with an MPP payment with different total amounts - this should not be reachable!");
+                       if prev_total_msat.is_some() && prev_total_msat != Some(htlc.total_msat) {
+                               log_error!(self.logger, "Somehow ended up with an MPP payment with different expected total amounts - this should not be reachable!");
                                debug_assert!(false);
                                valid_mpp = false;
                                break;
                        }
+                       prev_total_msat = Some(htlc.total_msat);
+
+                       if expected_amt_msat.is_some() && expected_amt_msat != htlc.total_value_received {
+                               log_error!(self.logger, "Somehow ended up with an MPP payment with different received total amounts - this should not be reachable!");
+                               debug_assert!(false);
+                               valid_mpp = false;
+                               break;
+                       }
+                       expected_amt_msat = htlc.total_value_received;
 
-                       expected_amt_msat = Some(htlc.total_msat);
                        if let OnionPayload::Spontaneous(_) = &htlc.onion_payload {
                                // We don't currently support MPP for spontaneous payments, so just check
                                // that there's one payment here and move on.
@@ -3972,13 +4098,14 @@ where
                        None => None
                };
 
-               let mut peer_state_opt = counterparty_node_id_opt.as_ref().map(
+               let peer_state_opt = counterparty_node_id_opt.as_ref().map(
                        |counterparty_node_id| per_peer_state.get(counterparty_node_id).map(
                                |peer_mutex| peer_mutex.lock().unwrap()
                        )
                ).unwrap_or(None);
 
-               if let Some(mut peer_state_lock) = peer_state_opt.take() {
+               if peer_state_opt.is_some() {
+                       let mut peer_state_lock = peer_state_opt.unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
                                let counterparty_node_id = chan.get().get_counterparty_node_id();
@@ -3993,7 +4120,7 @@ where
                                        let update_id = monitor_update.update_id;
                                        let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
                                        let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                               peer_state, chan);
+                                               peer_state, per_peer_state, chan);
                                        if let Err(e) = res {
                                                // TODO: This is a *critical* error - we probably updated the outbound edge
                                                // of the HTLC's monitor with a preimage. We should retry this monitor
@@ -4057,6 +4184,7 @@ where
                                                                claim_from_onchain_tx: from_onchain,
                                                                prev_channel_id,
                                                                next_channel_id,
+                                                               outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                        }})
                                                } else { None }
                                        });
@@ -4163,7 +4291,7 @@ where
        }
 
        fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
+               debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
 
                let counterparty_node_id = match counterparty_node_id {
                        Some(cp_id) => cp_id.clone(),
@@ -4194,7 +4322,7 @@ where
                if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
                        return;
                }
-               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut());
+               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut());
        }
 
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
@@ -4500,7 +4628,8 @@ where
                                let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
 
                                let chan = e.insert(chan);
-                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
+                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state,
+                                       per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
 
                                // Note that we reply with the new channel_id in error messages if we gave up on the
                                // channel, not the temporary_channel_id. This is compatible with ourselves, but the
@@ -4533,7 +4662,7 @@ where
                                let monitor = try_chan_entry!(self,
                                        chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan);
                                let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor);
-                               let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, chan);
+                               let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan);
                                if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
                                        // We weren't able to watch the channel to begin with, so no updates should be made on
                                        // it. Previously, full_stack_target found an (unreachable) panic when the
@@ -4629,7 +4758,7 @@ where
                                        if let Some(monitor_update) = monitor_update_opt {
                                                let update_id = monitor_update.update_id;
                                                let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
-                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
                                        }
                                        break Ok(());
                                },
@@ -4821,7 +4950,7 @@ where
                                let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
                                let update_id = monitor_update.update_id;
                                handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                       peer_state, chan)
+                                       peer_state, per_peer_state, chan)
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
@@ -4927,12 +5056,11 @@ where
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
                let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       let mut peer_state_lock = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
                                        debug_assert!(false);
                                        MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
-                               })?;
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                               }).map(|mtx| mtx.lock().unwrap())?;
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
@@ -4940,8 +5068,8 @@ where
                                        let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
                                        let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
                                        let update_id = monitor_update.update_id;
-                                       let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                               peer_state, chan);
+                                       let res = handle_new_monitor_update!(self, update_res, update_id,
+                                               peer_state_lock, peer_state, per_peer_state, chan);
                                        (htlcs_to_fail, res)
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -5101,8 +5229,10 @@ where
                Ok(())
        }
 
-       /// Process pending events from the `chain::Watch`, returning whether any events were processed.
+       /// Process pending events from the [`chain::Watch`], returning whether any events were processed.
        fn process_pending_monitor_events(&self) -> bool {
+               debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
+
                let mut failed_channels = Vec::new();
                let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
                let has_pending_monitor_events = !pending_monitor_events.is_empty();
@@ -5180,7 +5310,13 @@ where
        /// update events as a separate process method here.
        #[cfg(fuzzing)]
        pub fn process_monitor_events(&self) {
-               self.process_pending_monitor_events();
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       if self.process_pending_monitor_events() {
+                               NotifyOption::DoPersist
+                       } else {
+                               NotifyOption::SkipPersist
+                       }
+               });
        }
 
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
@@ -5190,38 +5326,45 @@ where
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();
                let mut handle_errors = Vec::new();
-               let per_peer_state = self.per_peer_state.read().unwrap();
-
-               for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
-                       'chan_loop: loop {
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
-                               for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
-                                       let counterparty_node_id = chan.get_counterparty_node_id();
-                                       let funding_txo = chan.get_funding_txo();
-                                       let (monitor_opt, holding_cell_failed_htlcs) =
-                                               chan.maybe_free_holding_cell_htlcs(&self.logger);
-                                       if !holding_cell_failed_htlcs.is_empty() {
-                                               failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
-                                       }
-                                       if let Some(monitor_update) = monitor_opt {
-                                               has_monitor_update = true;
 
-                                               let update_res = self.chain_monitor.update_channel(
-                                                       funding_txo.expect("channel is live"), monitor_update);
-                                               let update_id = monitor_update.update_id;
-                                               let channel_id: [u8; 32] = *channel_id;
-                                               let res = handle_new_monitor_update!(self, update_res, update_id,
-                                                       peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
-                                                       peer_state.channel_by_id.remove(&channel_id));
-                                               if res.is_err() {
-                                                       handle_errors.push((counterparty_node_id, res));
+               // Walk our list of channels and find any that need to update. Note that when we do find an
+               // update, if it includes actions that must be taken afterwards, we have to drop the
+               // per-peer state lock as well as the top level per_peer_state lock. Thus, we loop until we
+               // manage to go through all our peers without finding a single channel to update.
+               'peer_loop: loop {
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+                               'chan_loop: loop {
+                                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                                       let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+                                       for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+                                               let counterparty_node_id = chan.get_counterparty_node_id();
+                                               let funding_txo = chan.get_funding_txo();
+                                               let (monitor_opt, holding_cell_failed_htlcs) =
+                                                       chan.maybe_free_holding_cell_htlcs(&self.logger);
+                                               if !holding_cell_failed_htlcs.is_empty() {
+                                                       failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+                                               }
+                                               if let Some(monitor_update) = monitor_opt {
+                                                       has_monitor_update = true;
+
+                                                       let update_res = self.chain_monitor.update_channel(
+                                                               funding_txo.expect("channel is live"), monitor_update);
+                                                       let update_id = monitor_update.update_id;
+                                                       let channel_id: [u8; 32] = *channel_id;
+                                                       let res = handle_new_monitor_update!(self, update_res, update_id,
+                                                               peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING,
+                                                               peer_state.channel_by_id.remove(&channel_id));
+                                                       if res.is_err() {
+                                                               handle_errors.push((counterparty_node_id, res));
+                                                       }
+                                                       continue 'peer_loop;
                                                }
-                                               continue 'chan_loop;
                                        }
+                                       break 'chan_loop;
                                }
-                               break 'chan_loop;
                        }
+                       break 'peer_loop;
                }
 
                let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
@@ -6413,8 +6556,8 @@ pub(crate) fn provided_channel_type_features(config: &UserConfig) -> ChannelType
 /// [`ChannelManager`].
 pub fn provided_init_features(_config: &UserConfig) -> InitFeatures {
        // Note that if new features are added here which other peers may (eventually) require, we
-       // should also add the corresponding (optional) bit to the ChannelMessageHandler impl for
-       // ErroringMessageHandler.
+       // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for
+       // [`ErroringMessageHandler`].
        let mut features = InitFeatures::empty();
        features.set_data_loss_protect_optional();
        features.set_upfront_shutdown_script_optional();
@@ -6488,6 +6631,7 @@ impl Writeable for ChannelDetails {
                        (33, self.inbound_htlc_minimum_msat, option),
                        (35, self.inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
+                       (39, self.feerate_sat_per_1000_weight, option),
                });
                Ok(())
        }
@@ -6523,6 +6667,7 @@ impl Readable for ChannelDetails {
                        (33, inbound_htlc_minimum_msat, option),
                        (35, inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
+                       (39, feerate_sat_per_1000_weight, option),
                });
 
                // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
@@ -6556,6 +6701,7 @@ impl Readable for ChannelDetails {
                        is_public: is_public.0.unwrap(),
                        inbound_htlc_minimum_msat,
                        inbound_htlc_maximum_msat,
+                       feerate_sat_per_1000_weight,
                })
        }
 }
@@ -6682,7 +6828,9 @@ impl Writeable for ClaimableHTLC {
                        (0, self.prev_hop, required),
                        (1, self.total_msat, required),
                        (2, self.value, required),
+                       (3, self.sender_intended_value, required),
                        (4, payment_data, option),
+                       (5, self.total_value_received, option),
                        (6, self.cltv_expiry, required),
                        (8, keysend_preimage, option),
                });
@@ -6692,17 +6840,21 @@ impl Writeable for ClaimableHTLC {
 
 impl Readable for ClaimableHTLC {
        fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
-               let mut prev_hop = crate::util::ser::OptionDeserWrapper(None);
+               let mut prev_hop = crate::util::ser::RequiredWrapper(None);
                let mut value = 0;
+               let mut sender_intended_value = None;
                let mut payment_data: Option<msgs::FinalOnionHopData> = None;
                let mut cltv_expiry = 0;
+               let mut total_value_received = None;
                let mut total_msat = None;
                let mut keysend_preimage: Option<PaymentPreimage> = None;
                read_tlv_fields!(reader, {
                        (0, prev_hop, required),
                        (1, total_msat, option),
                        (2, value, required),
+                       (3, sender_intended_value, option),
                        (4, payment_data, option),
+                       (5, total_value_received, option),
                        (6, cltv_expiry, required),
                        (8, keysend_preimage, option)
                });
@@ -6730,6 +6882,8 @@ impl Readable for ClaimableHTLC {
                        prev_hop: prev_hop.0.unwrap(),
                        timer_ticks: 0,
                        value,
+                       sender_intended_value: sender_intended_value.unwrap_or(value),
+                       total_value_received,
                        total_msat: total_msat.unwrap(),
                        onion_payload,
                        cltv_expiry,
@@ -6742,32 +6896,40 @@ impl Readable for HTLCSource {
                let id: u8 = Readable::read(reader)?;
                match id {
                        0 => {
-                               let mut session_priv: crate::util::ser::OptionDeserWrapper<SecretKey> = crate::util::ser::OptionDeserWrapper(None);
+                               let mut session_priv: crate::util::ser::RequiredWrapper<SecretKey> = crate::util::ser::RequiredWrapper(None);
                                let mut first_hop_htlc_msat: u64 = 0;
-                               let mut path = Some(Vec::new());
+                               let mut path: Option<Vec<RouteHop>> = Some(Vec::new());
                                let mut payment_id = None;
                                let mut payment_secret = None;
-                               let mut payment_params = None;
+                               let mut payment_params: Option<PaymentParameters> = None;
                                read_tlv_fields!(reader, {
                                        (0, session_priv, required),
                                        (1, payment_id, option),
                                        (2, first_hop_htlc_msat, required),
                                        (3, payment_secret, option),
                                        (4, path, vec_type),
-                                       (5, payment_params, option),
+                                       (5, payment_params, (option: ReadableArgs, 0)),
                                });
                                if payment_id.is_none() {
                                        // For backwards compat, if there was no payment_id written, use the session_priv bytes
                                        // instead.
                                        payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
                                }
+                               if path.is_none() || path.as_ref().unwrap().is_empty() {
+                                       return Err(DecodeError::InvalidValue);
+                               }
+                               let path = path.unwrap();
+                               if let Some(params) = payment_params.as_mut() {
+                                       if params.final_cltv_expiry_delta == 0 {
+                                               params.final_cltv_expiry_delta = path.last().unwrap().cltv_expiry_delta;
+                                       }
+                               }
                                Ok(HTLCSource::OutboundRoute {
                                        session_priv: session_priv.0.unwrap(),
                                        first_hop_htlc_msat,
-                                       path: path.unwrap(),
+                                       path,
                                        payment_id: payment_id.unwrap(),
                                        payment_secret,
-                                       payment_params,
                                })
                        }
                        1 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
@@ -6779,7 +6941,7 @@ impl Readable for HTLCSource {
 impl Writeable for HTLCSource {
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), crate::io::Error> {
                match self {
-                       HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id, payment_secret, payment_params } => {
+                       HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id, payment_secret } => {
                                0u8.write(writer)?;
                                let payment_id_opt = Some(payment_id);
                                write_tlv_fields!(writer, {
@@ -6788,7 +6950,7 @@ impl Writeable for HTLCSource {
                                        (2, first_hop_htlc_msat, required),
                                        (3, payment_secret, option),
                                        (4, *path, vec_type),
-                                       (5, payment_params, option),
+                                       (5, None::<PaymentParameters>, option), // payment_params in LDK versions prior to 0.0.115
                                 });
                        }
                        HTLCSource::PreviousHopData(ref field) => {
@@ -6911,7 +7073,10 @@ where
                let mut monitor_update_blocked_actions_per_peer = None;
                let mut peer_states = Vec::new();
                for (_, peer_state_mutex) in per_peer_state.iter() {
-                       peer_states.push(peer_state_mutex.lock().unwrap());
+                       // Because we're holding the owning `per_peer_state` write lock here there's no chance
+                       // of a lockorder violation deadlock - no other thread can be holding any
+                       // per_peer_state lock at all.
+                       peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
                }
 
                (serializable_peer_count).write(writer)?;
@@ -7115,7 +7280,7 @@ where
        /// In such cases the latest local transactions will be sent to the tx_broadcaster included in
        /// this struct.
        ///
-       /// (C-not exported) because we have no HashMap bindings
+       /// This is not exported to bindings users because we have no HashMap bindings
        pub channel_monitors: HashMap<OutPoint, &'a mut ChannelMonitor<<SP::Target as SignerProvider>::Signer>>,
 }
 
@@ -7190,6 +7355,7 @@ where
                let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut channel_closures = Vec::new();
+               let mut pending_background_events = Vec::new();
                for _ in 0..channel_count {
                        let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
                                &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
@@ -7219,9 +7385,11 @@ where
                                        log_error!(args.logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast.");
                                        log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
                                                log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id());
-                                       let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
+                                       let (monitor_update, mut new_failed_htlcs) = channel.force_shutdown(true);
+                                       if let Some(monitor_update) = monitor_update {
+                                               pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update));
+                                       }
                                        failed_htlcs.append(&mut new_failed_htlcs);
-                                       monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
                                        channel_closures.push(events::Event::ChannelClosed {
                                                channel_id: channel.channel_id(),
                                                user_channel_id: channel.get_user_id(),
@@ -7286,10 +7454,13 @@ where
                        }
                }
 
-               for (funding_txo, monitor) in args.channel_monitors.iter_mut() {
+               for (funding_txo, _) in args.channel_monitors.iter() {
                        if !funding_txo_set.contains(funding_txo) {
-                               log_info!(args.logger, "Broadcasting latest holder commitment transaction for closed channel {}", log_bytes!(funding_txo.to_channel_id()));
-                               monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
+                               let monitor_update = ChannelMonitorUpdate {
+                                       update_id: CLOSED_CHANNEL_UPDATE_ID,
+                                       updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
+                               };
+                               pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((*funding_txo, monitor_update)));
                        }
                }
 
@@ -7342,10 +7513,17 @@ where
                }
 
                let background_event_count: u64 = Readable::read(reader)?;
-               let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
                for _ in 0..background_event_count {
                        match <u8 as Readable>::read(reader)? {
-                               0 => pending_background_events_read.push(BackgroundEvent::ClosingMonitorUpdate((Readable::read(reader)?, Readable::read(reader)?))),
+                               0 => {
+                                       let (funding_txo, monitor_update): (OutPoint, ChannelMonitorUpdate) = (Readable::read(reader)?, Readable::read(reader)?);
+                                       if pending_background_events.iter().find(|e| {
+                                               let BackgroundEvent::ClosingMonitorUpdate((pending_funding_txo, pending_monitor_update)) = e;
+                                               *pending_funding_txo == funding_txo && *pending_monitor_update == monitor_update
+                                       }).is_none() {
+                                               pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)));
+                                       }
+                               }
                                _ => return Err(DecodeError::InvalidValue),
                        }
                }
@@ -7403,6 +7581,10 @@ where
                        probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes());
                }
 
+               if !channel_closures.is_empty() {
+                       pending_events_read.append(&mut channel_closures);
+               }
+
                if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() {
                        pending_outbound_payments = Some(pending_outbound_payments_compat);
                } else if pending_outbound_payments.is_none() {
@@ -7411,7 +7593,13 @@ where
                                outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
                        }
                        pending_outbound_payments = Some(outbounds);
-               } else {
+               }
+               let pending_outbounds = OutboundPayments {
+                       pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
+                       retry_lock: Mutex::new(())
+               };
+
+               {
                        // If we're tracking pending payments, ensure we haven't lost any by looking at the
                        // ChannelMonitor data for any channels for which we do not have authorative state
                        // (i.e. those for which we just force-closed above or we otherwise don't have a
@@ -7422,16 +7610,17 @@ where
                        // 0.0.102+
                        for (_, monitor) in args.channel_monitors.iter() {
                                if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
-                                       for (htlc_source, htlc) in monitor.get_pending_outbound_htlcs() {
+                                       for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
                                                if let HTLCSource::OutboundRoute { payment_id, session_priv, path, payment_secret, .. } = htlc_source {
                                                        if path.is_empty() {
                                                                log_error!(args.logger, "Got an empty path for a pending payment");
                                                                return Err(DecodeError::InvalidValue);
                                                        }
+
                                                        let path_amt = path.last().unwrap().fee_msat;
                                                        let mut session_priv_bytes = [0; 32];
                                                        session_priv_bytes[..].copy_from_slice(&session_priv[..]);
-                                                       match pending_outbound_payments.as_mut().unwrap().entry(payment_id) {
+                                                       match pending_outbounds.pending_outbound_payments.lock().unwrap().entry(payment_id) {
                                                                hash_map::Entry::Occupied(mut entry) => {
                                                                        let newly_added = entry.get_mut().insert(session_priv_bytes, &path);
                                                                        log_info!(args.logger, "{} a pending payment path for {} msat for session priv {} on an existing pending payment with payment hash {}",
@@ -7458,48 +7647,64 @@ where
                                                        }
                                                }
                                        }
-                                       for (htlc_source, htlc) in monitor.get_all_current_outbound_htlcs() {
-                                               if let HTLCSource::PreviousHopData(prev_hop_data) = htlc_source {
-                                                       let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| {
-                                                               info.prev_funding_outpoint == prev_hop_data.outpoint &&
-                                                                       info.prev_htlc_id == prev_hop_data.htlc_id
-                                                       };
-                                                       // The ChannelMonitor is now responsible for this HTLC's
-                                                       // failure/success and will let us know what its outcome is. If we
-                                                       // still have an entry for this HTLC in `forward_htlcs` or
-                                                       // `pending_intercepted_htlcs`, we were apparently not persisted after
-                                                       // the monitor was when forwarding the payment.
-                                                       forward_htlcs.retain(|_, forwards| {
-                                                               forwards.retain(|forward| {
-                                                                       if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
-                                                                               if pending_forward_matches_htlc(&htlc_info) {
-                                                                                       log_info!(args.logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}",
-                                                                                               log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
-                                                                                       false
+                                       for (htlc_source, (htlc, preimage_opt)) in monitor.get_all_current_outbound_htlcs() {
+                                               match htlc_source {
+                                                       HTLCSource::PreviousHopData(prev_hop_data) => {
+                                                               let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| {
+                                                                       info.prev_funding_outpoint == prev_hop_data.outpoint &&
+                                                                               info.prev_htlc_id == prev_hop_data.htlc_id
+                                                               };
+                                                               // The ChannelMonitor is now responsible for this HTLC's
+                                                               // failure/success and will let us know what its outcome is. If we
+                                                               // still have an entry for this HTLC in `forward_htlcs` or
+                                                               // `pending_intercepted_htlcs`, we were apparently not persisted after
+                                                               // the monitor was when forwarding the payment.
+                                                               forward_htlcs.retain(|_, forwards| {
+                                                                       forwards.retain(|forward| {
+                                                                               if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
+                                                                                       if pending_forward_matches_htlc(&htlc_info) {
+                                                                                               log_info!(args.logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}",
+                                                                                                       log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
+                                                                                               false
+                                                                                       } else { true }
                                                                                } else { true }
+                                                                       });
+                                                                       !forwards.is_empty()
+                                                               });
+                                                               pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| {
+                                                                       if pending_forward_matches_htlc(&htlc_info) {
+                                                                               log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
+                                                                                       log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
+                                                                               pending_events_read.retain(|event| {
+                                                                                       if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
+                                                                                               intercepted_id != ev_id
+                                                                                       } else { true }
+                                                                               });
+                                                                               false
                                                                        } else { true }
                                                                });
-                                                               !forwards.is_empty()
-                                                       });
-                                                       pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| {
-                                                               if pending_forward_matches_htlc(&htlc_info) {
-                                                                       log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
-                                                                               log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
-                                                                       pending_events_read.retain(|event| {
-                                                                               if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
-                                                                                       intercepted_id != ev_id
-                                                                               } else { true }
-                                                                       });
-                                                                       false
-                                                               } else { true }
-                                                       });
+                                                       },
+                                                       HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } => {
+                                                               if let Some(preimage) = preimage_opt {
+                                                                       let pending_events = Mutex::new(pending_events_read);
+                                                                       // Note that we set `from_onchain` to "false" here,
+                                                                       // deliberately keeping the pending payment around forever.
+                                                                       // Given it should only occur when we have a channel we're
+                                                                       // force-closing for being stale that's okay.
+                                                                       // The alternative would be to wipe the state when claiming,
+                                                                       // generating a `PaymentPathSuccessful` event but regenerating
+                                                                       // it and the `PaymentSent` on every restart until the
+                                                                       // `ChannelMonitor` is removed.
+                                                                       pending_outbounds.claim_htlc(payment_id, preimage, session_priv, path, false, &pending_events, &args.logger);
+                                                                       pending_events_read = pending_events.into_inner().unwrap();
+                                                               }
+                                                       },
                                                }
                                        }
                                }
                        }
                }
 
-               let pending_outbounds = OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), retry_lock: Mutex::new(()) };
                if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
                        // If we have pending HTLCs to forward, assume we either dropped a
                        // `PendingHTLCsForwardable` or the user received it but never processed it as they
@@ -7557,10 +7762,6 @@ where
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&args.entropy_source.get_secure_random_bytes());
 
-               if !channel_closures.is_empty() {
-                       pending_events_read.append(&mut channel_closures);
-               }
-
                let our_network_pubkey = match args.node_signer.get_node_id(Recipient::Node) {
                        Ok(key) => key,
                        Err(()) => return Err(DecodeError::InvalidValue)
@@ -7697,7 +7898,7 @@ where
                        per_peer_state: FairRwLock::new(per_peer_state),
 
                        pending_events: Mutex::new(pending_events_read),
-                       pending_background_events: Mutex::new(pending_background_events_read),
+                       pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
                        persistence_notifier: Notifier::new(),
 
@@ -7730,6 +7931,7 @@ mod tests {
        use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
        use core::time::Duration;
        use core::sync::atomic::Ordering;
+       use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
        use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};
        use crate::ln::channelmanager::{inbound_payment, PaymentId, PaymentSendFailure, InterceptId};
        use crate::ln::functional_test_utils::*;
@@ -7737,7 +7939,6 @@ mod tests {
        use crate::ln::msgs::ChannelMessageHandler;
        use crate::routing::router::{PaymentParameters, RouteParameters, find_route};
        use crate::util::errors::APIError;
-       use crate::util::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
        use crate::util::test_utils;
        use crate::util::config::ChannelConfig;
        use crate::chain::keysinterface::EntropySource;
@@ -7763,8 +7964,10 @@ mod tests {
                // to connect messages with new values
                chan.0.contents.fee_base_msat *= 2;
                chan.1.contents.fee_base_msat *= 2;
-               let node_a_chan_info = nodes[0].node.list_channels()[0].clone();
-               let node_b_chan_info = nodes[1].node.list_channels()[0].clone();
+               let node_a_chan_info = nodes[0].node.list_channels_with_counterparty(
+                       &nodes[1].node.get_our_node_id()).pop().unwrap();
+               let node_b_chan_info = nodes[1].node.list_channels_with_counterparty(
+                       &nodes[0].node.get_our_node_id()).pop().unwrap();
 
                // The first two nodes (which opened a channel) should now require fresh persistence
                assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
@@ -7840,7 +8043,7 @@ mod tests {
                // indicates there are more HTLCs coming.
                let cur_height = CHAN_CONFIRM_DEPTH + 1; // route_payment calls send_payment, which adds 1 to the current height. So we do the same here to match.
                let session_privs = nodes[0].node.test_add_new_pending_payment(our_payment_hash, Some(payment_secret), payment_id, &mpp_route).unwrap();
-               nodes[0].node.send_payment_along_path(&mpp_route.paths[0], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[0]).unwrap();
+               nodes[0].node.test_send_payment_along_path(&mpp_route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[0]).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -7870,7 +8073,7 @@ mod tests {
                expect_payment_failed!(nodes[0], our_payment_hash, true);
 
                // Send the second half of the original MPP payment.
-               nodes[0].node.send_payment_along_path(&mpp_route.paths[1], &route.payment_params, &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[1]).unwrap();
+               nodes[0].node.test_send_payment_along_path(&mpp_route.paths[1], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[1]).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -7962,7 +8165,6 @@ mod tests {
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(expected_route.last().unwrap().node.get_our_node_id(), TEST_FINAL_CLTV),
                        final_value_msat: 100_000,
-                       final_cltv_expiry_delta: TEST_FINAL_CLTV,
                };
                let route = find_route(
                        &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
@@ -8053,7 +8255,6 @@ mod tests {
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(payee_pubkey, 40),
                        final_value_msat: 10_000,
-                       final_cltv_expiry_delta: 40,
                };
                let network_graph = nodes[0].network_graph.clone();
                let first_hops = nodes[0].node.list_usable_channels();
@@ -8078,7 +8279,7 @@ mod tests {
                assert!(updates.update_fee.is_none());
                nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
 
-               nodes[1].logger.assert_log_contains("lightning::ln::channelmanager".to_string(), "Payment preimage didn't match payment hash".to_string(), 1);
+               nodes[1].logger.assert_log_contains("lightning::ln::channelmanager", "Payment preimage didn't match payment hash", 1);
        }
 
        #[test]
@@ -8096,7 +8297,6 @@ mod tests {
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(payee_pubkey, 40),
                        final_value_msat: 10_000,
-                       final_cltv_expiry_delta: 40,
                };
                let network_graph = nodes[0].network_graph.clone();
                let first_hops = nodes[0].node.list_usable_channels();
@@ -8122,7 +8322,7 @@ mod tests {
                assert!(updates.update_fee.is_none());
                nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
 
-               nodes[1].logger.assert_log_contains("lightning::ln::channelmanager".to_string(), "We don't support MPP keysend payments".to_string(), 1);
+               nodes[1].logger.assert_log_contains("lightning::ln::channelmanager", "We don't support MPP keysend payments", 1);
        }
 
        #[test]
@@ -8150,7 +8350,8 @@ mod tests {
 
                match nodes[0].node.send_payment(&route, payment_hash, &None, PaymentId(payment_hash.0)).unwrap_err() {
                        PaymentSendFailure::ParameterError(APIError::APIMisuseError { ref err }) => {
-                               assert!(regex::Regex::new(r"Payment secret is required for multi-path payments").unwrap().is_match(err))                        },
+                               assert!(regex::Regex::new(r"Payment secret is required for multi-path payments").unwrap().is_match(err))
+                       },
                        _ => panic!("unexpected error")
                }
        }
@@ -8210,7 +8411,7 @@ mod tests {
                match inbound_payment::verify(bad_payment_hash, &payment_data, nodes[0].node.highest_seen_timestamp.load(Ordering::Acquire) as u64, &nodes[0].node.inbound_payment_key, &nodes[0].logger) {
                        Ok(_) => panic!("Unexpected ok"),
                        Err(()) => {
-                               nodes[0].logger.assert_log_contains("lightning::ln::inbound_payment".to_string(), "Failing HTLC with user-generated payment_hash".to_string(), 1);
+                               nodes[0].logger.assert_log_contains("lightning::ln::inbound_payment", "Failing HTLC with user-generated payment_hash", 1);
                        }
                }
 
@@ -8250,10 +8451,10 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
-
-                       assert_eq!(nodes[1].node.id_to_peer.lock().unwrap().len(), 0);
                }
 
+               assert_eq!(nodes[1].node.id_to_peer.lock().unwrap().len(), 0);
+
                let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
 
                nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg);
@@ -8261,7 +8462,9 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
+               }
 
+               {
                        // Assert that `nodes[1]`'s `id_to_peer` map is populated with the channel as soon as
                        // as it has the funding transaction.
                        let nodes_1_lock = nodes[1].node.id_to_peer.lock().unwrap();
@@ -8291,7 +8494,9 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
+               }
 
+               {
                        // At this stage, `nodes[1]` has proposed a fee for the closing transaction in the
                        // `handle_closing_signed` call above. As `nodes[1]` has not yet received the signature
                        // from `nodes[0]` for the closing transaction with the proposed fee, the channel is
@@ -8423,7 +8628,7 @@ mod tests {
                // A MAX_UNFUNDED_CHANS_PER_PEER + 1 channel will be summarily rejected
                open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
                nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
-               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+               assert_eq!(get_err_msg(&nodes[1], &nodes[0].node.get_our_node_id()).channel_id,
                        open_channel_msg.temporary_channel_id);
 
                // Further, because all of our channels with nodes[0] are inbound, and none of them funded,
@@ -8470,7 +8675,7 @@ mod tests {
                        open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
                }
                nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
-               assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
+               assert_eq!(get_err_msg(&nodes[1], &last_random_pk).channel_id,
                        open_channel_msg.temporary_channel_id);
 
                // Of course, however, outbound channels are always allowed
@@ -8512,7 +8717,7 @@ mod tests {
                // Once we have MAX_UNFUNDED_CHANS_PER_PEER unfunded channels, new inbound channels will be
                // rejected.
                nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
-               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+               assert_eq!(get_err_msg(&nodes[1], &nodes[0].node.get_our_node_id()).channel_id,
                        open_channel_msg.temporary_channel_id);
 
                // but we can still open an outbound channel.
@@ -8521,7 +8726,7 @@ mod tests {
 
                // but even with such an outbound channel, additional inbound channels will still fail.
                nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
-               assert_eq!(get_err_msg!(nodes[1], nodes[0].node.get_our_node_id()).channel_id,
+               assert_eq!(get_err_msg(&nodes[1], &nodes[0].node.get_our_node_id()).channel_id,
                        open_channel_msg.temporary_channel_id);
        }
 
@@ -8577,7 +8782,7 @@ mod tests {
                        }
                        _ => panic!("Unexpected event"),
                }
-               assert_eq!(get_err_msg!(nodes[1], last_random_pk).channel_id,
+               assert_eq!(get_err_msg(&nodes[1], &last_random_pk).channel_id,
                        open_channel_msg.temporary_channel_id);
 
                // ...however if we accept the same channel 0conf it should work just fine.
@@ -8619,7 +8824,7 @@ mod tests {
                        _ => panic!("Unexpected event"),
                }
 
-               let error_msg = get_err_msg!(nodes[1], nodes[0].node.get_our_node_id());
+               let error_msg = get_err_msg(&nodes[1], &nodes[0].node.get_our_node_id());
                nodes[0].node.handle_error(&nodes[1].node.get_our_node_id(), &error_msg);
 
                let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
@@ -8634,6 +8839,7 @@ pub mod bench {
        use crate::chain::Listen;
        use crate::chain::chainmonitor::{ChainMonitor, Persist};
        use crate::chain::keysinterface::{EntropySource, KeysManager, InMemorySigner};
+       use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider};
        use crate::ln::channelmanager::{self, BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentId};
        use crate::ln::functional_test_utils::*;
        use crate::ln::msgs::{ChannelMessageHandler, Init};
@@ -8641,7 +8847,6 @@ pub mod bench {
        use crate::routing::router::{PaymentParameters, get_route};
        use crate::util::test_utils;
        use crate::util::config::UserConfig;
-       use crate::util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
@@ -8785,7 +8990,7 @@ pub mod bench {
                                let payment_event = SendEvent::from_event($node_a.get_and_clear_pending_msg_events().pop().unwrap());
                                $node_b.handle_update_add_htlc(&$node_a.get_our_node_id(), &payment_event.msgs[0]);
                                $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &payment_event.commitment_msg);
-                               let (raa, cs) = get_revoke_commit_msgs!(NodeHolder { node: &$node_b }, $node_a.get_our_node_id());
+                               let (raa, cs) = do_get_revoke_commit_msgs!(NodeHolder { node: &$node_b }, &$node_a.get_our_node_id());
                                $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &raa);
                                $node_a.handle_commitment_signed(&$node_b.get_our_node_id(), &cs);
                                $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_a }, MessageSendEvent::SendRevokeAndACK, $node_b.get_our_node_id()));
@@ -8804,7 +9009,7 @@ pub mod bench {
                                        _ => panic!("Failed to generate claim event"),
                                }
 
-                               let (raa, cs) = get_revoke_commit_msgs!(NodeHolder { node: &$node_a }, $node_b.get_our_node_id());
+                               let (raa, cs) = do_get_revoke_commit_msgs!(NodeHolder { node: &$node_a }, &$node_b.get_our_node_id());
                                $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &raa);
                                $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &cs);
                                $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_b }, MessageSendEvent::SendRevokeAndACK, $node_a.get_our_node_id()));