Merge pull request #3160 from TheBlueMatt/2024-07-better-enum-upgradable-ser
[rust-lightning] / lightning / src / ln / channelmanager.rs
index f5e83dcc38252868807e1f0a5165769d2c6c7592..b14b6e608777529c55f44b7feef6184757b0e18e 100644 (file)
@@ -31,6 +31,7 @@ use bitcoin::secp256k1::{SecretKey,PublicKey};
 use bitcoin::secp256k1::Secp256k1;
 use bitcoin::{secp256k1, Sequence};
 
+use crate::blinded_path::message::{MessageContext, OffersContext};
 use crate::blinded_path::{BlindedPath, NodeIdLookUp};
 use crate::blinded_path::message::ForwardNode;
 use crate::blinded_path::payment::{Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints, PaymentContext, ReceiveTlvs};
@@ -58,7 +59,7 @@ use crate::ln::onion_utils::{HTLCFailReason, INVALID_ONION_BLINDING};
 use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
 #[cfg(test)]
 use crate::ln::outbound_payment;
-use crate::ln::outbound_payment::{Bolt12PaymentError, OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs, StaleExpiration};
+use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs, StaleExpiration};
 use crate::ln::wire::Encode;
 use crate::offers::invoice::{BlindedPayInfo, Bolt12Invoice, DEFAULT_RELATIVE_EXPIRY, DerivedSigningPubkey, ExplicitSigningPubkey, InvoiceBuilder, UnsignedBolt12Invoice};
 use crate::offers::invoice_error::InvoiceError;
@@ -66,6 +67,7 @@ use crate::offers::invoice_request::{DerivedPayerId, InvoiceRequestBuilder};
 use crate::offers::offer::{Offer, OfferBuilder};
 use crate::offers::parse::Bolt12SemanticError;
 use crate::offers::refund::{Refund, RefundBuilder};
+use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
 use crate::onion_message::messenger::{new_pending_onion_message, Destination, MessageRouter, PendingOnionMessage, Responder, ResponseInstruction};
 use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
 use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
@@ -105,7 +107,7 @@ use core::time::Duration;
 use core::ops::Deref;
 
 // Re-export this for use in the public API.
-pub use crate::ln::outbound_payment::{PaymentSendFailure, ProbeSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
+pub use crate::ln::outbound_payment::{Bolt12PaymentError, PaymentSendFailure, ProbeSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
 use crate::ln::script::ShutdownScript;
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
@@ -306,6 +308,7 @@ pub(super) struct PendingAddHTLCInfo {
        // Note that this may be an outbound SCID alias for the associated channel.
        prev_short_channel_id: u64,
        prev_htlc_id: u64,
+       prev_counterparty_node_id: Option<PublicKey>,
        prev_channel_id: ChannelId,
        prev_funding_outpoint: OutPoint,
        prev_user_channel_id: u128,
@@ -349,9 +352,10 @@ pub(crate) struct HTLCPreviousHopData {
        blinded_failure: Option<BlindedFailure>,
        channel_id: ChannelId,
 
-       // This field is consumed by `claim_funds_from_hop()` when updating a force-closed backwards
+       // These fields are consumed by `claim_funds_from_hop()` when updating a force-closed backwards
        // channel with a preimage provided by the forward channel.
        outpoint: OutPoint,
+       counterparty_node_id: Option<PublicKey>,
 }
 
 enum OnionPayload {
@@ -472,7 +476,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId,
        },
        (2, OutboundRoute) => {
                (0, session_priv, required),
-       };
+       },
 );
 
 
@@ -666,7 +670,7 @@ pub(super) const MIN_HTLC_RELAY_HOLDING_CELL_MILLIS: u64 = 100;
 /// be sent in the order they appear in the return value, however sometimes the order needs to be
 /// variable at runtime (eg Channel::channel_reestablish needs to re-send messages in the order
 /// they were originally sent). In those cases, this enum is also returned.
-#[derive(Clone, PartialEq)]
+#[derive(Clone, PartialEq, Debug)]
 pub(super) enum RAACommitmentOrder {
        /// Send the CommitmentUpdate messages first
        CommitmentFirst,
@@ -755,13 +759,55 @@ enum BackgroundEvent {
        },
 }
 
+/// A pointer to a channel that is unblocked when an event is surfaced
+#[derive(Debug)]
+pub(crate) struct EventUnblockedChannel {
+       counterparty_node_id: PublicKey,
+       funding_txo: OutPoint,
+       channel_id: ChannelId,
+       blocking_action: RAAMonitorUpdateBlockingAction,
+}
+
+impl Writeable for EventUnblockedChannel {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
+               self.counterparty_node_id.write(writer)?;
+               self.funding_txo.write(writer)?;
+               self.channel_id.write(writer)?;
+               self.blocking_action.write(writer)
+       }
+}
+
+impl MaybeReadable for EventUnblockedChannel {
+       fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
+               let counterparty_node_id = Readable::read(reader)?;
+               let funding_txo = Readable::read(reader)?;
+               let channel_id = Readable::read(reader)?;
+               let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? {
+                       Some(blocking_action) => blocking_action,
+                       None => return Ok(None),
+               };
+               Ok(Some(EventUnblockedChannel {
+                       counterparty_node_id,
+                       funding_txo,
+                       channel_id,
+                       blocking_action,
+               }))
+       }
+}
+
 #[derive(Debug)]
 pub(crate) enum MonitorUpdateCompletionAction {
        /// Indicates that a payment ultimately destined for us was claimed and we should emit an
        /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
        /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
        /// event can be generated.
-       PaymentClaimed { payment_hash: PaymentHash },
+       PaymentClaimed {
+               payment_hash: PaymentHash,
+               /// A pending MPP claim which hasn't yet completed.
+               ///
+               /// Not written to disk.
+               pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+       },
        /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
        /// operation of another channel.
        ///
@@ -772,7 +818,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
        /// outbound edge.
        EmitEventAndFreeOtherChannel {
                event: events::Event,
-               downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>,
+               downstream_counterparty_and_funding_outpoint: Option<EventUnblockedChannel>,
        },
        /// Indicates we should immediately resume the operation of another channel, unless there is
        /// some other reason why the channel is blocked. In practice this simply means immediately
@@ -795,13 +841,16 @@ pub(crate) enum MonitorUpdateCompletionAction {
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
-       (0, PaymentClaimed) => { (0, payment_hash, required) },
+       (0, PaymentClaimed) => {
+               (0, payment_hash, required),
+               (9999999999, pending_mpp_claim, (static_value, None)),
+       },
        // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
        // *immediately*. However, for simplicity we implement read/write here.
        (1, FreeOtherChannelImmediately) => {
                (0, downstream_counterparty_node_id, required),
                (2, downstream_funding_outpoint, required),
-               (4, blocking_action, required),
+               (4, blocking_action, upgradable_required),
                // Note that by the time we get past the required read above, downstream_funding_outpoint will be
                // filled in, so we can safely unwrap it here.
                (5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
@@ -813,7 +862,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
                // monitor updates which aren't properly blocked or resumed, however that's fine - we don't
                // support async monitor updates even in LDK 0.0.116 and once we do we'll require no
                // downgrades to prior versions.
-               (1, downstream_counterparty_and_funding_outpoint, option),
+               (1, downstream_counterparty_and_funding_outpoint, upgradable_option),
        },
 );
 
@@ -832,9 +881,29 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
                // Note that by the time we get past the required read above, channel_funding_outpoint will be
                // filled in, so we can safely unwrap it here.
                (3, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(channel_funding_outpoint.0.unwrap()))),
-       };
+       }
 );
 
+#[derive(Debug)]
+pub(crate) struct PendingMPPClaim {
+       channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
+       channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
+}
+
+#[derive(Clone)]
+pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
+
+impl PartialEq for PendingMPPClaimPointer {
+       fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
+}
+impl Eq for PendingMPPClaimPointer {}
+
+impl core::fmt::Debug for PendingMPPClaimPointer {
+       fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
+               self.0.lock().unwrap().fmt(f)
+       }
+}
+
 #[derive(Clone, PartialEq, Eq, Debug)]
 /// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
 /// the blocked action here. See enum variants for more info.
@@ -848,6 +917,16 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
                /// The HTLC ID on the inbound edge.
                htlc_id: u64,
        },
+       /// We claimed an MPP payment across multiple channels. We have to block removing the payment
+       /// preimage from any monitor until the last monitor is updated to contain the payment
+       /// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) that
+       /// weren't updated on startup.
+       ///
+       /// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
+       /// state.
+       ClaimedMPPPayment {
+               pending_claim: PendingMPPClaimPointer,
+       }
 }
 
 impl RAAMonitorUpdateBlockingAction {
@@ -859,10 +938,16 @@ impl RAAMonitorUpdateBlockingAction {
        }
 }
 
-impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
-       (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
-;);
+impl_writeable_tlv_based_enum_upgradable!(RAAMonitorUpdateBlockingAction,
+       (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) },
+       unread_variants: ClaimedMPPPayment
+);
 
+impl Readable for Option<RAAMonitorUpdateBlockingAction> {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
+       }
+}
 
 /// State we hold per-peer.
 pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
@@ -2989,7 +3074,7 @@ where
                        let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
                        match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key,
                                their_features, channel_value_satoshis, push_msat, user_channel_id, config,
-                               self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id)
+                               self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger)
                        {
                                Ok(res) => res,
                                Err(e) => {
@@ -3505,7 +3590,7 @@ where
                        // peer has been disabled for some time), return `channel_disabled`,
                        // otherwise return `temporary_channel_failure`.
                        let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
-                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
+                       if chan_update_opt.as_ref().map(|u| u.contents.channel_flags & 2 == 2).unwrap_or(false) {
                                return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
                        } else {
                                return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
@@ -3784,7 +3869,8 @@ where
                        chain_hash: self.chain_hash,
                        short_channel_id,
                        timestamp: chan.context.get_update_time_counter(),
-                       flags: (!were_node_one) as u8 | ((!enabled as u8) << 1),
+                       message_flags: 1, // Only must_be_one
+                       channel_flags: (!were_node_one) as u8 | ((!enabled as u8) << 1),
                        cltv_expiry_delta: chan.context.get_cltv_expiry_delta(),
                        htlc_minimum_msat: chan.context.get_counterparty_htlc_minimum_msat(),
                        htlc_maximum_msat: chan.context.get_announced_htlc_max_msat(),
@@ -3996,14 +4082,43 @@ where
                self.pending_outbound_payments.test_set_payment_metadata(payment_id, new_payment_metadata);
        }
 
-       pub(super) fn send_payment_for_bolt12_invoice(&self, invoice: &Bolt12Invoice, payment_id: PaymentId) -> Result<(), Bolt12PaymentError> {
+       /// Pays the [`Bolt12Invoice`] associated with the `payment_id` encoded in its `payer_metadata`.
+       ///
+       /// The invoice's `payer_metadata` is used to authenticate that the invoice was indeed requested
+       /// before attempting a payment. [`Bolt12PaymentError::UnexpectedInvoice`] is returned if this
+       /// fails or if the encoded `payment_id` is not recognized. The latter may happen once the
+       /// payment is no longer tracked because the payment was attempted after:
+       /// - an invoice for the `payment_id` was already paid,
+       /// - one full [timer tick] has elapsed since initially requesting the invoice when paying an
+       ///   offer, or
+       /// - the refund corresponding to the invoice has already expired.
+       ///
+       /// To retry the payment, request another invoice using a new `payment_id`.
+       ///
+       /// Attempting to pay the same invoice twice while the first payment is still pending will
+       /// result in a [`Bolt12PaymentError::DuplicateInvoice`].
+       ///
+       /// Otherwise, either [`Event::PaymentSent`] or [`Event::PaymentFailed`] are used to indicate
+       /// whether or not the payment was successful.
+       ///
+       /// [timer tick]: Self::timer_tick_occurred
+       pub fn send_payment_for_bolt12_invoice(&self, invoice: &Bolt12Invoice) -> Result<(), Bolt12PaymentError> {
+               let secp_ctx = &self.secp_ctx;
+               let expanded_key = &self.inbound_payment_key;
+               match invoice.verify(expanded_key, secp_ctx) {
+                       Ok(payment_id) => self.send_payment_for_verified_bolt12_invoice(invoice, payment_id),
+                       Err(()) => Err(Bolt12PaymentError::UnexpectedInvoice),
+               }
+       }
+
+       fn send_payment_for_verified_bolt12_invoice(&self, invoice: &Bolt12Invoice, payment_id: PaymentId) -> Result<(), Bolt12PaymentError> {
                let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments
                        .send_payment_for_bolt12_invoice(
                                invoice, payment_id, &self.router, self.list_usable_channels(),
-                               || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer,
-                               best_block_height, &self.logger, &self.pending_events,
+                               || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, &self,
+                               &self.secp_ctx, best_block_height, &self.logger, &self.pending_events,
                                |args| self.send_payment_along_path(args)
                        )
        }
@@ -4663,6 +4778,7 @@ where
 
                let mut per_source_pending_forward = [(
                        payment.prev_short_channel_id,
+                       payment.prev_counterparty_node_id,
                        payment.prev_funding_outpoint,
                        payment.prev_channel_id,
                        payment.prev_user_channel_id,
@@ -4693,6 +4809,7 @@ where
                                user_channel_id: Some(payment.prev_user_channel_id),
                                outpoint: payment.prev_funding_outpoint,
                                channel_id: payment.prev_channel_id,
+                               counterparty_node_id: payment.prev_counterparty_node_id,
                                htlc_id: payment.prev_htlc_id,
                                incoming_packet_shared_secret: payment.forward_info.incoming_shared_secret,
                                phantom_shared_secret: None,
@@ -4822,8 +4939,10 @@ where
 
                        // Process all of the forwards and failures for the channel in which the HTLCs were
                        // proposed to as a batch.
-                       let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
-                               incoming_user_channel_id, htlc_forwards.drain(..).collect());
+                       let pending_forwards = (
+                               incoming_scid, Some(incoming_counterparty_node_id), incoming_funding_txo,
+                               incoming_channel_id, incoming_user_channel_id, htlc_forwards.drain(..).collect()
+                       );
                        self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
                        for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
                                let failure = match htlc_fail {
@@ -4857,7 +4976,7 @@ where
 
                let mut new_events = VecDeque::new();
                let mut failed_forwards = Vec::new();
-               let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
+               let mut phantom_receives: Vec<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
                {
                        let mut forward_htlcs = new_hash_map();
                        mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -4866,12 +4985,12 @@ where
                                if short_chan_id != 0 {
                                        let mut forwarding_counterparty = None;
                                        macro_rules! forwarding_channel_not_found {
-                                               () => {
-                                                       for forward_info in pending_forwards.drain(..) {
+                                               ($forward_infos: expr) => {
+                                                       for forward_info in $forward_infos {
                                                                match forward_info {
                                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                                prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
-                                                                               prev_user_channel_id, forward_info: PendingHTLCInfo {
+                                                                               prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
                                                                                        routing, incoming_shared_secret, payment_hash, outgoing_amt_msat,
                                                                                        outgoing_cltv_value, ..
                                                                                }
@@ -4886,6 +5005,7 @@ where
                                                                                                        user_channel_id: Some(prev_user_channel_id),
                                                                                                        channel_id: prev_channel_id,
                                                                                                        outpoint: prev_funding_outpoint,
+                                                                                                       counterparty_node_id: prev_counterparty_node_id,
                                                                                                        htlc_id: prev_htlc_id,
                                                                                                        incoming_packet_shared_secret: incoming_shared_secret,
                                                                                                        phantom_shared_secret: $phantom_ss,
@@ -4948,7 +5068,10 @@ where
                                                                                                                        outgoing_cltv_value, Some(phantom_shared_secret), false, None,
                                                                                                                        current_height, self.default_configuration.accept_mpp_keysend)
                                                                                                                {
-                                                                                                                       Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)])),
+                                                                                                                       Ok(info) => phantom_receives.push((
+                                                                                                                               prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                                                                                               prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)]
+                                                                                                                       )),
                                                                                                                        Err(InboundHTLCErr { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
                                                                                                                }
                                                                                                        },
@@ -4975,7 +5098,7 @@ where
                                        let (counterparty_node_id, forward_chan_id) = match chan_info_opt {
                                                Some((cp_id, chan_id)) => (cp_id, chan_id),
                                                None => {
-                                                       forwarding_channel_not_found!();
+                                                       forwarding_channel_not_found!(pending_forwards.drain(..));
                                                        continue;
                                                }
                                        };
@@ -4983,103 +5106,156 @@ where
                                        let per_peer_state = self.per_peer_state.read().unwrap();
                                        let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
                                        if peer_state_mutex_opt.is_none() {
-                                               forwarding_channel_not_found!();
+                                               forwarding_channel_not_found!(pending_forwards.drain(..));
                                                continue;
                                        }
                                        let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                        let peer_state = &mut *peer_state_lock;
-                                       if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
-                                               let logger = WithChannelContext::from(&self.logger, &chan.context, None);
-                                               for forward_info in pending_forwards.drain(..) {
-                                                       let queue_fail_htlc_res = match forward_info {
-                                                               HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                                       prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
-                                                                       prev_user_channel_id, forward_info: PendingHTLCInfo {
-                                                                               incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
-                                                                               routing: PendingHTLCRouting::Forward {
-                                                                                       onion_packet, blinded, ..
-                                                                               }, skimmed_fee_msat, ..
+                                       let mut draining_pending_forwards = pending_forwards.drain(..);
+                                       while let Some(forward_info) = draining_pending_forwards.next() {
+                                               let queue_fail_htlc_res = match forward_info {
+                                                       HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+                                                               prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
+                                                               prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
+                                                                       incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
+                                                                       routing: PendingHTLCRouting::Forward {
+                                                                               ref onion_packet, blinded, ..
+                                                                       }, skimmed_fee_msat, ..
+                                                               },
+                                                       }) => {
+                                                               let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
+                                                                       short_channel_id: prev_short_channel_id,
+                                                                       user_channel_id: Some(prev_user_channel_id),
+                                                                       counterparty_node_id: prev_counterparty_node_id,
+                                                                       channel_id: prev_channel_id,
+                                                                       outpoint: prev_funding_outpoint,
+                                                                       htlc_id: prev_htlc_id,
+                                                                       incoming_packet_shared_secret: incoming_shared_secret,
+                                                                       // Phantom payments are only PendingHTLCRouting::Receive.
+                                                                       phantom_shared_secret: None,
+                                                                       blinded_failure: blinded.map(|b| b.failure),
+                                                               });
+                                                               let next_blinding_point = blinded.and_then(|b| {
+                                                                       let encrypted_tlvs_ss = self.node_signer.ecdh(
+                                                                               Recipient::Node, &b.inbound_blinding_point, None
+                                                                       ).unwrap().secret_bytes();
+                                                                       onion_utils::next_hop_pubkey(
+                                                                               &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
+                                                                       ).ok()
+                                                               });
+
+                                                               // Forward the HTLC over the most appropriate channel with the corresponding peer,
+                                                               // applying non-strict forwarding.
+                                                               // The channel with the least amount of outbound liquidity will be used to maximize the
+                                                               // probability of being able to successfully forward a subsequent HTLC.
+                                                               let maybe_optimal_channel = peer_state.channel_by_id.values_mut().filter_map(|phase| match phase {
+                                                                       ChannelPhase::Funded(chan) => {
+                                                                               let balances = chan.context.get_available_balances(&self.fee_estimator);
+                                                                               if outgoing_amt_msat <= balances.next_outbound_htlc_limit_msat &&
+                                                                                       outgoing_amt_msat >= balances.next_outbound_htlc_minimum_msat &&
+                                                                                       chan.context.is_usable() {
+                                                                                       Some((chan, balances))
+                                                                               } else {
+                                                                                       None
+                                                                               }
                                                                        },
-                                                               }) => {
-                                                                       let logger = WithChannelContext::from(&self.logger, &chan.context, Some(payment_hash));
-                                                                       log_trace!(logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, &payment_hash, short_chan_id);
-                                                                       let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
-                                                                               short_channel_id: prev_short_channel_id,
-                                                                               user_channel_id: Some(prev_user_channel_id),
-                                                                               channel_id: prev_channel_id,
-                                                                               outpoint: prev_funding_outpoint,
-                                                                               htlc_id: prev_htlc_id,
-                                                                               incoming_packet_shared_secret: incoming_shared_secret,
-                                                                               // Phantom payments are only PendingHTLCRouting::Receive.
-                                                                               phantom_shared_secret: None,
-                                                                               blinded_failure: blinded.map(|b| b.failure),
-                                                                       });
-                                                                       let next_blinding_point = blinded.and_then(|b| {
-                                                                               let encrypted_tlvs_ss = self.node_signer.ecdh(
-                                                                                       Recipient::Node, &b.inbound_blinding_point, None
-                                                                               ).unwrap().secret_bytes();
-                                                                               onion_utils::next_hop_pubkey(
-                                                                                       &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
-                                                                               ).ok()
-                                                                       });
-                                                                       if let Err(e) = chan.queue_add_htlc(outgoing_amt_msat,
-                                                                               payment_hash, outgoing_cltv_value, htlc_source.clone(),
-                                                                               onion_packet, skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
-                                                                               &&logger)
-                                                                       {
-                                                                               if let ChannelError::Ignore(msg) = e {
-                                                                                       log_trace!(logger, "Failed to forward HTLC with payment_hash {}: {}", &payment_hash, msg);
+                                                                       _ => None,
+                                                               }).min_by_key(|(_, balances)| balances.next_outbound_htlc_limit_msat).map(|(c, _)| c);
+                                                               let optimal_channel = match maybe_optimal_channel {
+                                                                       Some(chan) => chan,
+                                                                       None => {
+                                                                               // Fall back to the specified channel to return an appropriate error.
+                                                                               if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                                       chan
                                                                                } else {
-                                                                                       panic!("Stated return value requirements in send_htlc() were not met");
+                                                                                       forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                                       break;
                                                                                }
+                                                                       }
+                                                               };
+
+                                                               let logger = WithChannelContext::from(&self.logger, &optimal_channel.context, Some(payment_hash));
+                                                               let channel_description = if optimal_channel.context.get_short_channel_id() == Some(short_chan_id) {
+                                                                       "specified"
+                                                               } else {
+                                                                       "alternate"
+                                                               };
+                                                               log_trace!(logger, "Forwarding HTLC from SCID {} with payment_hash {} and next hop SCID {} over {} channel {} with corresponding peer {}",
+                                                                       prev_short_channel_id, &payment_hash, short_chan_id, channel_description, optimal_channel.context.channel_id(), &counterparty_node_id);
+                                                               if let Err(e) = optimal_channel.queue_add_htlc(outgoing_amt_msat,
+                                                                               payment_hash, outgoing_cltv_value, htlc_source.clone(),
+                                                                               onion_packet.clone(), skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
+                                                                               &&logger)
+                                                               {
+                                                                       if let ChannelError::Ignore(msg) = e {
+                                                                               log_trace!(logger, "Failed to forward HTLC with payment_hash {} to peer {}: {}", &payment_hash, &counterparty_node_id, msg);
+                                                                       } else {
+                                                                               panic!("Stated return value requirements in send_htlc() were not met");
+                                                                       }
+
+                                                                       if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
                                                                                let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan);
                                                                                failed_forwards.push((htlc_source, payment_hash,
                                                                                        HTLCFailReason::reason(failure_code, data),
                                                                                        HTLCDestination::NextHopChannel { node_id: Some(chan.context.get_counterparty_node_id()), channel_id: forward_chan_id }
                                                                                ));
-                                                                               continue;
+                                                                       } else {
+                                                                               forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                               break;
                                                                        }
-                                                                       None
-                                                               },
-                                                               HTLCForwardInfo::AddHTLC { .. } => {
-                                                                       panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
-                                                               },
-                                                               HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
+                                                               }
+                                                               None
+                                                       },
+                                                       HTLCForwardInfo::AddHTLC { .. } => {
+                                                               panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
+                                                       },
+                                                       HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => {
+                                                               if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                       let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                        log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
-                                                                       Some((chan.queue_fail_htlc(htlc_id, err_packet, &&logger), htlc_id))
-                                                               },
-                                                               HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
+                                                                       Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id))
+                                                               } else {
+                                                                       forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                       break;
+                                                               }
+                                                       },
+                                                       HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
+                                                               if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                       let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                        log_trace!(logger, "Failing malformed HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
                                                                        let res = chan.queue_fail_malformed_htlc(
                                                                                htlc_id, failure_code, sha256_of_onion, &&logger
                                                                        );
                                                                        Some((res, htlc_id))
-                                                               },
-                                                       };
-                                                       if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
-                                                               if let Err(e) = queue_fail_htlc_res {
-                                                                       if let ChannelError::Ignore(msg) = e {
+                                                               } else {
+                                                                       forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                       break;
+                                                               }
+                                                       },
+                                               };
+                                               if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
+                                                       if let Err(e) = queue_fail_htlc_res {
+                                                               if let ChannelError::Ignore(msg) = e {
+                                                                       if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                               let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                                log_trace!(logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
-                                                                       } else {
-                                                                               panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
                                                                        }
-                                                                       // fail-backs are best-effort, we probably already have one
-                                                                       // pending, and if not that's OK, if not, the channel is on
-                                                                       // the chain and sending the HTLC-Timeout is their problem.
-                                                                       continue;
+                                                               } else {
+                                                                       panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
                                                                }
+                                                               // fail-backs are best-effort, we probably already have one
+                                                               // pending, and if not that's OK, if not, the channel is on
+                                                               // the chain and sending the HTLC-Timeout is their problem.
+                                                               continue;
                                                        }
                                                }
-                                       } else {
-                                               forwarding_channel_not_found!();
-                                               continue;
                                        }
                                } else {
                                        'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) {
                                                match forward_info {
                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
-                                                               prev_user_channel_id, forward_info: PendingHTLCInfo {
+                                                               prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
                                                                        routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat,
                                                                        skimmed_fee_msat, ..
                                                                }
@@ -5117,6 +5293,7 @@ where
                                                                        prev_hop: HTLCPreviousHopData {
                                                                                short_channel_id: prev_short_channel_id,
                                                                                user_channel_id: Some(prev_user_channel_id),
+                                                                               counterparty_node_id: prev_counterparty_node_id,
                                                                                channel_id: prev_channel_id,
                                                                                outpoint: prev_funding_outpoint,
                                                                                htlc_id: prev_htlc_id,
@@ -5149,6 +5326,7 @@ where
                                                                                failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                                short_channel_id: $htlc.prev_hop.short_channel_id,
                                                                                                user_channel_id: $htlc.prev_hop.user_channel_id,
+                                                                                               counterparty_node_id: $htlc.prev_hop.counterparty_node_id,
                                                                                                channel_id: prev_channel_id,
                                                                                                outpoint: prev_funding_outpoint,
                                                                                                htlc_id: $htlc.prev_hop.htlc_id,
@@ -6034,7 +6212,7 @@ where
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
-               let mut sources = {
+               let sources = {
                        let mut claimable_payments = self.claimable_payments.lock().unwrap();
                        if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) {
                                let mut receiver_node_id = self.our_network_pubkey;
@@ -6129,26 +6307,46 @@ where
                        return;
                }
                if valid_mpp {
-                       for htlc in sources.drain(..) {
-                               let prev_hop_chan_id = htlc.prev_hop.channel_id;
-                               if let Err((pk, err)) = self.claim_funds_from_hop(
+                       let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
+                               let channels_without_preimage = sources.iter().filter_map(|htlc| {
+                                       if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                                               let prev_hop = &htlc.prev_hop;
+                                               Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id))
+                                       } else {
+                                               None
+                                       }
+                               }).collect();
+                               Some(Arc::new(Mutex::new(PendingMPPClaim {
+                                       channels_without_preimage,
+                                       channels_with_preimage: Vec::new(),
+                               })))
+                       } else {
+                               None
+                       };
+                       for htlc in sources {
+                               let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
+                                       if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                                               let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
+                                               Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
+                                       } else {
+                                               None
+                                       }
+                               );
+                               let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| {
+                                       RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+                                               pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+                                       }
+                               });
+                               self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
                                        |_, definitely_duplicate| {
                                                debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
-                                               Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+                                               (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker)
                                        }
-                               ) {
-                                       if let msgs::ErrorAction::IgnoreError = err.err.action {
-                                               // We got a temporary failure updating monitor, but will claim the
-                                               // HTLC when the monitor updating is restored (or on chain).
-                                               let logger = WithContext::from(&self.logger, None, Some(prev_hop_chan_id), Some(payment_hash));
-                                               log_error!(logger, "Temporary failure claiming HTLC, treating as success: {}", err.err.err);
-                                       } else { errs.push((pk, err)); }
-                               }
+                               );
                        }
-               }
-               if !valid_mpp {
-                       for htlc in sources.drain(..) {
+               } else {
+                       for htlc in sources {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
                                htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
                                let source = HTLCSource::PreviousHopData(htlc.prev_hop);
@@ -6166,9 +6364,12 @@ where
                }
        }
 
-       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
-               prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
-       -> Result<(), (PublicKey, MsgHandleErrInternal)> {
+       fn claim_funds_from_hop<
+               ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+       >(
+               &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
+               completion_action: ComplFunc,
+       ) {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
                // If we haven't yet run background events assume we're still deserializing and shouldn't
@@ -6205,11 +6406,15 @@ where
 
                                                match fulfill_res {
                                                        UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
-                                                               if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+                                                               let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+                                                               if let Some(action) = action_opt {
                                                                        log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
                                                                                chan_id, action);
                                                                        peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                                }
+                                                               if let Some(raa_blocker) = raa_blocker_opt {
+                                                                       peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+                                                               }
                                                                if !during_init {
                                                                        handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
                                                                                peer_state, per_peer_state, chan);
@@ -6227,11 +6432,16 @@ where
                                                                }
                                                        }
                                                        UpdateFulfillCommitFetch::DuplicateClaim {} => {
-                                                               let action = if let Some(action) = completion_action(None, true) {
+                                                               let (action_opt, raa_blocker_opt) = completion_action(None, true);
+                                                               if let Some(raa_blocker) = raa_blocker_opt {
+                                                                       debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+                                                               }
+                                                               let action = if let Some(action) = action_opt {
                                                                        action
                                                                } else {
-                                                                       return Ok(());
+                                                                       return;
                                                                };
+
                                                                mem::drop(peer_state_lock);
 
                                                                log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6246,7 +6456,7 @@ where
                                                                } else {
                                                                        debug_assert!(false,
                                                                                "Duplicate claims should always free another channel immediately");
-                                                                       return Ok(());
+                                                                       return;
                                                                };
                                                                if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
                                                                        let mut peer_state = peer_state_mtx.lock().unwrap();
@@ -6271,7 +6481,7 @@ where
                                                        }
                                                }
                                        }
-                                       return Ok(());
+                                       return;
                                }
                        }
                }
@@ -6318,8 +6528,46 @@ where
                // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
                // generally always allowed to be duplicative (and it's specifically noted in
                // `PaymentForwarded`).
-               self.handle_monitor_update_completion_actions(completion_action(None, false));
-               Ok(())
+               let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+               if let Some(raa_blocker) = raa_blocker_opt {
+                       let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+                               // prev_hop.counterparty_node_id is always available for payments received after
+                               // LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+                               // look up the counterparty in the `action_opt`, if possible.
+                               action_opt.as_ref().and_then(|action|
+                                       if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+                                               pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
+                                       } else { None }
+                               )
+                       );
+                       if let Some(counterparty_node_id) = counterparty_node_id {
+                               // TODO: Avoid always blocking the world for the write lock here.
+                               let mut per_peer_state = self.per_peer_state.write().unwrap();
+                               let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+                                       Mutex::new(PeerState {
+                                               channel_by_id: new_hash_map(),
+                                               inbound_channel_request_by_id: new_hash_map(),
+                                               latest_features: InitFeatures::empty(),
+                                               pending_msg_events: Vec::new(),
+                                               in_flight_monitor_updates: BTreeMap::new(),
+                                               monitor_update_blocked_actions: BTreeMap::new(),
+                                               actions_blocking_raa_monitor_updates: BTreeMap::new(),
+                                               is_connected: false,
+                                       }));
+                               let mut peer_state = peer_state_mutex.lock().unwrap();
+
+                               peer_state.actions_blocking_raa_monitor_updates
+                                       .entry(prev_hop.channel_id)
+                                       .or_insert_with(Vec::new)
+                                       .push(raa_blocker);
+                       } else {
+                               debug_assert!(false,
+                                       "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+                       }
+               }
+
+               self.handle_monitor_update_completion_actions(action_opt);
        }
 
        fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -6352,11 +6600,16 @@ where
                                let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
                                #[cfg(debug_assertions)]
                                let claiming_chan_funding_outpoint = hop_data.outpoint;
-                               let res = self.claim_funds_from_hop(hop_data, payment_preimage,
+                               self.claim_funds_from_hop(hop_data, payment_preimage,
                                        |htlc_claim_value_msat, definitely_duplicate| {
                                                let chan_to_release =
                                                        if let Some(node_id) = next_channel_counterparty_node_id {
-                                                               Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
+                                                               Some(EventUnblockedChannel {
+                                                                       counterparty_node_id: node_id,
+                                                                       funding_txo: next_channel_outpoint,
+                                                                       channel_id: next_channel_id,
+                                                                       blocking_action: completed_blocker
+                                                               })
                                                        } else {
                                                                // We can only get `None` here if we are processing a
                                                                // `ChannelMonitor`-originated event, in which case we
@@ -6413,16 +6666,16 @@ where
                                                                        }
                                                                }), "{:?}", *background_events);
                                                        }
-                                                       None
+                                                       (None, None)
                                                } else if definitely_duplicate {
                                                        if let Some(other_chan) = chan_to_release {
-                                                               Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
-                                                                       downstream_counterparty_node_id: other_chan.0,
-                                                                       downstream_funding_outpoint: other_chan.1,
-                                                                       downstream_channel_id: other_chan.2,
-                                                                       blocking_action: other_chan.3,
-                                                               })
-                                                       } else { None }
+                                                               (Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                                                       downstream_counterparty_node_id: other_chan.counterparty_node_id,
+                                                                       downstream_funding_outpoint: other_chan.funding_txo,
+                                                                       downstream_channel_id: other_chan.channel_id,
+                                                                       blocking_action: other_chan.blocking_action,
+                                                               }), None)
+                                                       } else { (None, None) }
                                                } else {
                                                        let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                                                                if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6431,7 +6684,7 @@ where
                                                        } else { None };
                                                        debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
                                                                "skimmed_fee_msat must always be included in total_fee_earned_msat");
-                                                       Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+                                                       (Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                                event: events::Event::PaymentForwarded {
                                                                        prev_channel_id: Some(prev_channel_id),
                                                                        next_channel_id: Some(next_channel_id),
@@ -6443,13 +6696,9 @@ where
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
                                                                downstream_counterparty_and_funding_outpoint: chan_to_release,
-                                                       })
+                                                       }), None)
                                                }
                                        });
-                               if let Err((pk, err)) = res {
-                                       let result: Result<(), _> = Err(err);
-                                       let _ = handle_error!(self, result, pk);
-                               }
                        },
                }
        }
@@ -6464,9 +6713,44 @@ where
                debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
                debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 
+               let mut freed_channels = Vec::new();
+
                for action in actions.into_iter() {
                        match action {
-                               MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+                               MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+                                       if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+                                               let per_peer_state = self.per_peer_state.read().unwrap();
+                                               per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+                                                       let mut peer_state = peer_state_mutex.lock().unwrap();
+                                                       let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+                                                       if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+                                                               blockers.get_mut().retain(|blocker|
+                                                                       if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+                                                                               if *pending_claim == claim_ptr {
+                                                                                       let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+                                                                                       let pending_claim_state = &mut *pending_claim_state_lock;
+                                                                                       pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+                                                                                               if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+                                                                                                       pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+                                                                                                       false
+                                                                                               } else { true }
+                                                                                       });
+                                                                                       if pending_claim_state.channels_without_preimage.is_empty() {
+                                                                                               for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+                                                                                                       freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+                                                                                               }
+                                                                                       }
+                                                                                       !pending_claim_state.channels_without_preimage.is_empty()
+                                                                               } else { true }
+                                                                       } else { true }
+                                                               );
+                                                               if blockers.get().is_empty() {
+                                                                       blockers.remove();
+                                                               }
+                                                       }
+                                               });
+                                       }
+
                                        let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                                        if let Some(ClaimingPayment {
                                                amount_msat,
@@ -6491,8 +6775,11 @@ where
                                        event, downstream_counterparty_and_funding_outpoint
                                } => {
                                        self.pending_events.lock().unwrap().push_back((event, None));
-                                       if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
-                                               self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+                                       if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
+                                               self.handle_monitor_update_release(
+                                                       unblocked.counterparty_node_id, unblocked.funding_txo,
+                                                       unblocked.channel_id, Some(unblocked.blocking_action),
+                                               );
                                        }
                                },
                                MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
@@ -6507,6 +6794,10 @@ where
                                },
                        }
                }
+
+               for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+                       self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+               }
        }
 
        /// Handles a channel reentering a functional state, either due to reconnect or a monitor
@@ -6517,7 +6808,7 @@ where
                pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
                funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
-       -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
+       -> (Option<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
                let logger = WithChannelContext::from(&self.logger, &channel.context, None);
                log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
                        &channel.context.channel_id(),
@@ -6533,8 +6824,11 @@ where
 
                let mut htlc_forwards = None;
                if !pending_forwards.is_empty() {
-                       htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
-                               channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
+                       htlc_forwards = Some((
+                               short_channel_id, Some(channel.context.get_counterparty_node_id()),
+                               channel.context.get_funding_txo().unwrap(), channel.context.channel_id(),
+                               channel.context.get_user_id(), pending_forwards
+                       ));
                }
                let mut decode_update_add_htlcs = None;
                if !pending_update_adds.is_empty() {
@@ -6633,7 +6927,7 @@ where
                log_trace!(logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.",
                        highest_applied_update_id, channel.context.get_latest_monitor_update_id(),
                        remaining_in_flight);
-               if !channel.is_awaiting_monitor_update() || channel.context.get_latest_monitor_update_id() != highest_applied_update_id {
+               if !channel.is_awaiting_monitor_update() || remaining_in_flight != 0 {
                        return;
                }
                handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel);
@@ -7577,15 +7871,15 @@ where
        }
 
        #[inline]
-       fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
+       fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
                let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
                if push_forward_event { self.push_pending_forwards_ev() }
        }
 
        #[inline]
-       fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
+       fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
                let mut push_forward_event = false;
-               for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
+               for &mut (prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
                        let mut new_intercept_events = VecDeque::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
@@ -7604,7 +7898,9 @@ where
                                        match forward_htlcs.entry(scid) {
                                                hash_map::Entry::Occupied(mut entry) => {
                                                        entry.get_mut().push(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                               prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info }));
+                                                               prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                               prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+                                                       }));
                                                },
                                                hash_map::Entry::Vacant(entry) => {
                                                        if !is_our_scid && forward_info.incoming_amt_msat.is_some() &&
@@ -7622,7 +7918,9 @@ where
                                                                                        intercept_id
                                                                                }, None));
                                                                                entry.insert(PendingAddHTLCInfo {
-                                                                                       prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info });
+                                                                                       prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                                                       prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+                                                                               });
                                                                        },
                                                                        hash_map::Entry::Occupied(_) => {
                                                                                let logger = WithContext::from(&self.logger, None, Some(prev_channel_id), Some(forward_info.payment_hash));
@@ -7630,6 +7928,7 @@ where
                                                                                let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                        short_channel_id: prev_short_channel_id,
                                                                                        user_channel_id: Some(prev_user_channel_id),
+                                                                                       counterparty_node_id: prev_counterparty_node_id,
                                                                                        outpoint: prev_funding_outpoint,
                                                                                        channel_id: prev_channel_id,
                                                                                        htlc_id: prev_htlc_id,
@@ -7649,7 +7948,9 @@ where
                                                                // payments are being processed.
                                                                push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
                                                                entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                                       prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
+                                                                       prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                                       prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+                                                               })));
                                                        }
                                                }
                                        }
@@ -7851,7 +8152,7 @@ where
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
                                        }
                                        let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..];
-                                       let msg_from_node_one = msg.contents.flags & 1 == 0;
+                                       let msg_from_node_one = msg.contents.channel_flags & 1 == 0;
                                        if were_node_one == msg_from_node_one {
                                                return Ok(NotifyOption::SkipPersistNoEvents);
                                        } else {
@@ -8126,11 +8427,26 @@ where
                        match phase {
                                ChannelPhase::Funded(chan) => {
                                        let msgs = chan.signer_maybe_unblocked(&self.logger);
-                                       if let Some(updates) = msgs.commitment_update {
-                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id,
-                                                       updates,
-                                               });
+                                       let cu_msg = msgs.commitment_update.map(|updates| events::MessageSendEvent::UpdateHTLCs {
+                                               node_id,
+                                               updates,
+                                       });
+                                       let raa_msg = msgs.revoke_and_ack.map(|msg| events::MessageSendEvent::SendRevokeAndACK {
+                                               node_id,
+                                               msg,
+                                       });
+                                       match (cu_msg, raa_msg) {
+                                               (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::CommitmentFirst => {
+                                                       pending_msg_events.push(cu);
+                                                       pending_msg_events.push(raa);
+                                               },
+                                               (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::RevokeAndACKFirst => {
+                                                       pending_msg_events.push(raa);
+                                                       pending_msg_events.push(cu);
+                                               },
+                                               (Some(cu), _) => pending_msg_events.push(cu),
+                                               (_, Some(raa)) => pending_msg_events.push(raa),
+                                               (_, _) => {},
                                        }
                                        if let Some(msg) = msgs.funding_signed {
                                                pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
@@ -8305,8 +8621,10 @@ macro_rules! create_offer_builder { ($self: ident, $builder: ty) => {
                let entropy = &*$self.entropy_source;
                let secp_ctx = &$self.secp_ctx;
 
-               let path = $self.create_blinded_path_using_absolute_expiry(absolute_expiry)
+               let path = $self.create_blinded_paths_using_absolute_expiry(OffersContext::Unknown {}, absolute_expiry)
+                       .and_then(|paths| paths.into_iter().next().ok_or(()))
                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
                let builder = OfferBuilder::deriving_signing_pubkey(
                        node_id, expanded_key, entropy, secp_ctx
                )
@@ -8377,8 +8695,11 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
                let entropy = &*$self.entropy_source;
                let secp_ctx = &$self.secp_ctx;
 
-               let path = $self.create_blinded_path_using_absolute_expiry(Some(absolute_expiry))
+               let context = OffersContext::OutboundPayment { payment_id };
+               let path = $self.create_blinded_paths_using_absolute_expiry(context, Some(absolute_expiry))
+                       .and_then(|paths| paths.into_iter().next().ok_or(()))
                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
                let builder = RefundBuilder::deriving_payer_id(
                        node_id, expanded_key, entropy, secp_ctx, amount_msats, payment_id
                )?
@@ -8399,6 +8720,13 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
        }
 } }
 
+/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
+/// along different paths.
+/// Sending multiple requests increases the chances of successful delivery in case some
+/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
+/// even if multiple invoices are received.
+const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
        M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
@@ -8500,7 +8828,9 @@ where
                        Some(payer_note) => builder.payer_note(payer_note),
                };
                let invoice_request = builder.build_and_sign()?;
-               let reply_path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+               let context = OffersContext::OutboundPayment { payment_id };
+               let reply_paths = self.create_blinded_paths(context).map_err(|_| Bolt12SemanticError::MissingPaths)?;
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
@@ -8513,25 +8843,27 @@ where
 
                let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
                if !offer.paths().is_empty() {
-                       // Send as many invoice requests as there are paths in the offer (with an upper bound).
-                       // Using only one path could result in a failure if the path no longer exists. But only
-                       // one invoice for a given payment id will be paid, even if more than one is received.
-                       const REQUEST_LIMIT: usize = 10;
-                       for path in offer.paths().into_iter().take(REQUEST_LIMIT) {
+                       reply_paths
+                               .iter()
+                               .flat_map(|reply_path| offer.paths().iter().map(move |path| (path, reply_path)))
+                               .take(OFFERS_MESSAGE_REQUEST_LIMIT)
+                               .for_each(|(path, reply_path)| {
+                                       let message = new_pending_onion_message(
+                                               OffersMessage::InvoiceRequest(invoice_request.clone()),
+                                               Destination::BlindedPath(path.clone()),
+                                               Some(reply_path.clone()),
+                                       );
+                                       pending_offers_messages.push(message);
+                               });
+               } else if let Some(signing_pubkey) = offer.signing_pubkey() {
+                       for reply_path in reply_paths {
                                let message = new_pending_onion_message(
                                        OffersMessage::InvoiceRequest(invoice_request.clone()),
-                                       Destination::BlindedPath(path.clone()),
-                                       Some(reply_path.clone()),
+                                       Destination::Node(signing_pubkey),
+                                       Some(reply_path),
                                );
                                pending_offers_messages.push(message);
                        }
-               } else if let Some(signing_pubkey) = offer.signing_pubkey() {
-                       let message = new_pending_onion_message(
-                               OffersMessage::InvoiceRequest(invoice_request),
-                               Destination::Node(signing_pubkey),
-                               Some(reply_path),
-                       );
-                       pending_offers_messages.push(message);
                } else {
                        debug_assert!(false);
                        return Err(Bolt12SemanticError::MissingSigningPubkey);
@@ -8600,26 +8932,32 @@ where
                                )?;
                                let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
                                let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?;
-                               let reply_path = self.create_blinded_path()
+                               let reply_paths = self.create_blinded_paths(OffersContext::Unknown {})
                                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
 
                                let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
                                if refund.paths().is_empty() {
-                                       let message = new_pending_onion_message(
-                                               OffersMessage::Invoice(invoice.clone()),
-                                               Destination::Node(refund.payer_id()),
-                                               Some(reply_path),
-                                       );
-                                       pending_offers_messages.push(message);
-                               } else {
-                                       for path in refund.paths() {
+                                       for reply_path in reply_paths {
                                                let message = new_pending_onion_message(
                                                        OffersMessage::Invoice(invoice.clone()),
-                                                       Destination::BlindedPath(path.clone()),
-                                                       Some(reply_path.clone()),
+                                                       Destination::Node(refund.payer_id()),
+                                                       Some(reply_path),
                                                );
                                                pending_offers_messages.push(message);
                                        }
+                               } else {
+                                       reply_paths
+                                               .iter()
+                                               .flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path)))
+                                               .take(OFFERS_MESSAGE_REQUEST_LIMIT)
+                                               .for_each(|(path, reply_path)| {
+                                                       let message = new_pending_onion_message(
+                                                               OffersMessage::Invoice(invoice.clone()),
+                                                               Destination::BlindedPath(path.clone()),
+                                                               Some(reply_path.clone()),
+                                                       );
+                                                       pending_offers_messages.push(message);
+                                               });
                                }
 
                                Ok(invoice)
@@ -8726,22 +9064,22 @@ where
                inbound_payment::get_payment_preimage(payment_hash, payment_secret, &self.inbound_payment_key)
        }
 
-       /// Creates a blinded path by delegating to [`MessageRouter`] based on the path's intended
-       /// lifetime.
+       /// Creates a collection of blinded paths by delegating to [`MessageRouter`] based on
+       /// the path's intended lifetime.
        ///
        /// Whether or not the path is compact depends on whether the path is short-lived or long-lived,
        /// respectively, based on the given `absolute_expiry` as seconds since the Unix epoch. See
        /// [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`].
-       fn create_blinded_path_using_absolute_expiry(
-               &self, absolute_expiry: Option<Duration>
-       ) -> Result<BlindedPath, ()> {
+       fn create_blinded_paths_using_absolute_expiry(
+               &self, context: OffersContext, absolute_expiry: Option<Duration>,
+       ) -> Result<Vec<BlindedPath>, ()> {
                let now = self.duration_since_epoch();
                let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
 
                if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
-                       self.create_compact_blinded_path()
+                       self.create_compact_blinded_paths(context)
                } else {
-                       self.create_blinded_path()
+                       self.create_blinded_paths(context)
                }
        }
 
@@ -8758,10 +9096,11 @@ where
                now
        }
 
-       /// Creates a blinded path by delegating to [`MessageRouter::create_blinded_paths`].
+       /// Creates a collection of blinded paths by delegating to
+       /// [`MessageRouter::create_blinded_paths`].
        ///
-       /// Errors if the `MessageRouter` errors or returns an empty `Vec`.
-       fn create_blinded_path(&self) -> Result<BlindedPath, ()> {
+       /// Errors if the `MessageRouter` errors.
+       fn create_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> {
                let recipient = self.get_our_node_id();
                let secp_ctx = &self.secp_ctx;
 
@@ -8774,14 +9113,15 @@ where
                        .collect::<Vec<_>>();
 
                self.router
-                       .create_blinded_paths(recipient, peers, secp_ctx)
-                       .and_then(|paths| paths.into_iter().next().ok_or(()))
+                       .create_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
+                       .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
        }
 
-       /// Creates a blinded path by delegating to [`MessageRouter::create_compact_blinded_paths`].
+       /// Creates a collection of blinded paths by delegating to
+       /// [`MessageRouter::create_compact_blinded_paths`].
        ///
-       /// Errors if the `MessageRouter` errors or returns an empty `Vec`.
-       fn create_compact_blinded_path(&self) -> Result<BlindedPath, ()> {
+       /// Errors if the `MessageRouter` errors.
+       fn create_compact_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> {
                let recipient = self.get_our_node_id();
                let secp_ctx = &self.secp_ctx;
 
@@ -8801,8 +9141,8 @@ where
                        .collect::<Vec<_>>();
 
                self.router
-                       .create_compact_blinded_paths(recipient, peers, secp_ctx)
-                       .and_then(|paths| paths.into_iter().next().ok_or(()))
+                       .create_compact_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
+                       .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
        }
 
        /// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
@@ -9438,6 +9778,7 @@ where
                                                htlc_id: htlc.prev_htlc_id,
                                                incoming_packet_shared_secret: htlc.forward_info.incoming_shared_secret,
                                                phantom_shared_secret: None,
+                                               counterparty_node_id: htlc.prev_counterparty_node_id,
                                                outpoint: htlc.prev_funding_outpoint,
                                                channel_id: htlc.prev_channel_id,
                                                blinded_failure: htlc.forward_info.routing.blinded_failure(),
@@ -9623,7 +9964,7 @@ where
        }
 
        #[cfg(splicing)]
-       fn handle_splice(&self, counterparty_node_id: &PublicKey, msg: &msgs::Splice) {
+       fn handle_splice_init(&self, counterparty_node_id: &PublicKey, msg: &msgs::SpliceInit) {
                let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close(
                        "Splicing not supported".to_owned(),
                         msg.channel_id.clone())), *counterparty_node_id);
@@ -9830,7 +10171,7 @@ where
                                                // Quiescence
                                                &events::MessageSendEvent::SendStfu { .. } => false,
                                                // Splicing
-                                               &events::MessageSendEvent::SendSplice { .. } => false,
+                                               &events::MessageSendEvent::SendSpliceInit { .. } => false,
                                                &events::MessageSendEvent::SendSpliceAck { .. } => false,
                                                &events::MessageSendEvent::SendSpliceLocked { .. } => false,
                                                // Interactive Transaction Construction
@@ -10182,10 +10523,17 @@ where
        R::Target: Router,
        L::Target: Logger,
 {
-       fn handle_message(&self, message: OffersMessage, responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
+       fn handle_message(&self, message: OffersMessage, context: OffersContext, responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
                let secp_ctx = &self.secp_ctx;
                let expanded_key = &self.inbound_payment_key;
 
+               let abandon_if_payment = |context| {
+                       match context {
+                               OffersContext::OutboundPayment { payment_id } => self.abandon_payment(payment_id),
+                               _ => {},
+                       }
+               };
+
                match message {
                        OffersMessage::InvoiceRequest(invoice_request) => {
                                let responder = match responder {
@@ -10272,42 +10620,66 @@ where
                                };
 
                                match response {
-                                       Ok(invoice) => return responder.respond(OffersMessage::Invoice(invoice)),
-                                       Err(error) => return responder.respond(OffersMessage::InvoiceError(error.into())),
+                                       Ok(invoice) => responder.respond(OffersMessage::Invoice(invoice)),
+                                       Err(error) => responder.respond(OffersMessage::InvoiceError(error.into())),
                                }
                        },
                        OffersMessage::Invoice(invoice) => {
-                               let response = invoice
-                                       .verify(expanded_key, secp_ctx)
-                                       .map_err(|()| InvoiceError::from_string("Unrecognized invoice".to_owned()))
-                                       .and_then(|payment_id| {
+                               let result = match invoice.verify(expanded_key, secp_ctx) {
+                                       Ok(payment_id) => {
                                                let features = self.bolt12_invoice_features();
                                                if invoice.invoice_features().requires_unknown_bits_from(&features) {
                                                        Err(InvoiceError::from(Bolt12SemanticError::UnknownRequiredFeatures))
+                                               } else if self.default_configuration.manually_handle_bolt12_invoices {
+                                                       let event = Event::InvoiceReceived { payment_id, invoice, responder };
+                                                       self.pending_events.lock().unwrap().push_back((event, None));
+                                                       return ResponseInstruction::NoResponse;
                                                } else {
-                                                       self.send_payment_for_bolt12_invoice(&invoice, payment_id)
+                                                       self.send_payment_for_verified_bolt12_invoice(&invoice, payment_id)
                                                                .map_err(|e| {
                                                                        log_trace!(self.logger, "Failed paying invoice: {:?}", e);
                                                                        InvoiceError::from_string(format!("{:?}", e))
                                                                })
                                                }
-                                       });
+                                       },
+                                       Err(()) => Err(InvoiceError::from_string("Unrecognized invoice".to_owned())),
+                               };
 
-                               match (responder, response) {
-                                       (Some(responder), Err(e)) => responder.respond(OffersMessage::InvoiceError(e)),
-                                       (None, Err(_)) => {
-                                               log_trace!(
-                                                       self.logger,
-                                                       "A response was generated, but there is no reply_path specified for sending the response."
-                                               );
-                                               return ResponseInstruction::NoResponse;
-                                       }
-                                       _ => return ResponseInstruction::NoResponse,
+                               match result {
+                                       Ok(_) => ResponseInstruction::NoResponse,
+                                       Err(err) => match responder {
+                                               Some(responder) => {
+                                                       abandon_if_payment(context);
+                                                       responder.respond(OffersMessage::InvoiceError(err))
+                                               },
+                                               None => {
+                                                       abandon_if_payment(context);
+                                                       log_trace!(
+                                                               self.logger,
+                                                               "An error response was generated, but there is no reply_path specified \
+                                                               for sending the response. Error: {}",
+                                                               err
+                                                       );
+                                                       return ResponseInstruction::NoResponse;
+                                               },
+                                       },
+                               }
+                       },
+                       #[cfg(async_payments)]
+                       OffersMessage::StaticInvoice(_invoice) => {
+                               match responder {
+                                       Some(responder) => {
+                                               responder.respond(OffersMessage::InvoiceError(
+                                                               InvoiceError::from_string("Static invoices not yet supported".to_string())
+                                               ))
+                                       },
+                                       None => return ResponseInstruction::NoResponse,
                                }
                        },
                        OffersMessage::InvoiceError(invoice_error) => {
+                               abandon_if_payment(context);
                                log_trace!(self.logger, "Received invoice_error: {}", invoice_error);
-                               return ResponseInstruction::NoResponse;
+                               ResponseInstruction::NoResponse
                        },
                }
        }
@@ -10317,6 +10689,31 @@ where
        }
 }
 
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
+AsyncPaymentsMessageHandler for ChannelManager<M, T, ES, NS, SP, F, R, L>
+where
+       M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+       T::Target: BroadcasterInterface,
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       SP::Target: SignerProvider,
+       F::Target: FeeEstimator,
+       R::Target: Router,
+       L::Target: Logger,
+{
+       fn held_htlc_available(
+               &self, _message: HeldHtlcAvailable, _responder: Option<Responder>
+       ) -> ResponseInstruction<ReleaseHeldHtlc> {
+               ResponseInstruction::NoResponse
+       }
+
+       fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {}
+
+       fn release_pending_messages(&self) -> Vec<PendingOnionMessage<AsyncPaymentsMessage>> {
+               Vec::new()
+       }
+}
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
 NodeIdLookUp for ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
@@ -10432,7 +10829,7 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting,
                (4, payment_data, option), // Added in 0.0.116
                (5, custom_tlvs, optional_vec),
        },
-;);
+);
 
 impl_writeable_tlv_based!(PendingHTLCInfo, {
        (0, routing, required),
@@ -10512,14 +10909,14 @@ impl Readable for HTLCFailureMsg {
        }
 }
 
-impl_writeable_tlv_based_enum!(PendingHTLCStatus, ;
+impl_writeable_tlv_based_enum_legacy!(PendingHTLCStatus, ;
        (0, Forward),
        (1, Fail),
 );
 
 impl_writeable_tlv_based_enum!(BlindedFailure,
        (0, FromIntroductionNode) => {},
-       (2, FromBlindedNode) => {}, ;
+       (2, FromBlindedNode) => {},
 );
 
 impl_writeable_tlv_based!(HTLCPreviousHopData, {
@@ -10533,6 +10930,7 @@ impl_writeable_tlv_based!(HTLCPreviousHopData, {
        // Note that by the time we get past the required read for type 2 above, outpoint will be
        // filled in, so we can safely unwrap it here.
        (9, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(outpoint.0.unwrap()))),
+       (11, counterparty_node_id, option),
 });
 
 impl Writeable for ClaimableHTLC {
@@ -10689,6 +11087,7 @@ impl_writeable_tlv_based!(PendingAddHTLCInfo, {
        // Note that by the time we get past the required read for type 6 above, prev_funding_outpoint will be
        // filled in, so we can safely unwrap it here.
        (7, prev_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(prev_funding_outpoint.0.unwrap()))),
+       (9, prev_counterparty_node_id, option),
 });
 
 impl Writeable for HTLCForwardInfo {
@@ -11966,7 +12365,12 @@ where
                                        for action in actions.iter() {
                                                if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                        downstream_counterparty_and_funding_outpoint:
-                                                               Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
+                                                               Some(EventUnblockedChannel {
+                                                                       counterparty_node_id: blocked_node_id,
+                                                                       funding_txo: _,
+                                                                       channel_id: blocked_channel_id,
+                                                                       blocking_action,
+                                                               }), ..
                                                } = action {
                                                        if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
                                                                log_trace!(logger,
@@ -12153,8 +12557,8 @@ mod tests {
                // update message and would always update the local fee info, even if our peer was
                // (spuriously) forwarding us our own channel_update.
                let as_node_one = nodes[0].node.get_our_node_id().serialize()[..] < nodes[1].node.get_our_node_id().serialize()[..];
-               let as_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 };
-               let bs_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 };
+               let as_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 };
+               let bs_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 };
 
                // First deliver each peers' own message, checking that the node doesn't need to be
                // persisted and that its channel info remains the same.