Merge pull request #3152 from alecchendev/2024-06-async-commit-secret-raa
[rust-lightning] / lightning / src / ln / channelmanager.rs
index f189aee04cf75b2933a5f1508ec065b4b1356104..561053d85b4158b0c5d61546b8a1428ea18cce31 100644 (file)
@@ -31,6 +31,7 @@ use bitcoin::secp256k1::{SecretKey,PublicKey};
 use bitcoin::secp256k1::Secp256k1;
 use bitcoin::{secp256k1, Sequence};
 
+use crate::blinded_path::message::{MessageContext, OffersContext};
 use crate::blinded_path::{BlindedPath, NodeIdLookUp};
 use crate::blinded_path::message::ForwardNode;
 use crate::blinded_path::payment::{Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints, PaymentContext, ReceiveTlvs};
@@ -40,7 +41,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, Fee
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::events;
-use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason};
+use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use crate::ln::inbound_payment;
@@ -66,6 +67,7 @@ use crate::offers::invoice_request::{DerivedPayerId, InvoiceRequestBuilder};
 use crate::offers::offer::{Offer, OfferBuilder};
 use crate::offers::parse::Bolt12SemanticError;
 use crate::offers::refund::{Refund, RefundBuilder};
+use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
 use crate::onion_message::messenger::{new_pending_onion_message, Destination, MessageRouter, PendingOnionMessage, Responder, ResponseInstruction};
 use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
 use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
@@ -306,6 +308,7 @@ pub(super) struct PendingAddHTLCInfo {
        // Note that this may be an outbound SCID alias for the associated channel.
        prev_short_channel_id: u64,
        prev_htlc_id: u64,
+       prev_counterparty_node_id: Option<PublicKey>,
        prev_channel_id: ChannelId,
        prev_funding_outpoint: OutPoint,
        prev_user_channel_id: u128,
@@ -349,9 +352,10 @@ pub(crate) struct HTLCPreviousHopData {
        blinded_failure: Option<BlindedFailure>,
        channel_id: ChannelId,
 
-       // This field is consumed by `claim_funds_from_hop()` when updating a force-closed backwards
+       // These fields are consumed by `claim_funds_from_hop()` when updating a force-closed backwards
        // channel with a preimage provided by the forward channel.
        outpoint: OutPoint,
+       counterparty_node_id: Option<PublicKey>,
 }
 
 enum OnionPayload {
@@ -472,7 +476,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId,
        },
        (2, OutboundRoute) => {
                (0, session_priv, required),
-       };
+       },
 );
 
 
@@ -666,7 +670,7 @@ pub(super) const MIN_HTLC_RELAY_HOLDING_CELL_MILLIS: u64 = 100;
 /// be sent in the order they appear in the return value, however sometimes the order needs to be
 /// variable at runtime (eg Channel::channel_reestablish needs to re-send messages in the order
 /// they were originally sent). In those cases, this enum is also returned.
-#[derive(Clone, PartialEq)]
+#[derive(Clone, PartialEq, Debug)]
 pub(super) enum RAACommitmentOrder {
        /// Send the CommitmentUpdate messages first
        CommitmentFirst,
@@ -755,13 +759,55 @@ enum BackgroundEvent {
        },
 }
 
+/// A pointer to a channel that is unblocked when an event is surfaced
+#[derive(Debug)]
+pub(crate) struct EventUnblockedChannel {
+       counterparty_node_id: PublicKey,
+       funding_txo: OutPoint,
+       channel_id: ChannelId,
+       blocking_action: RAAMonitorUpdateBlockingAction,
+}
+
+impl Writeable for EventUnblockedChannel {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
+               self.counterparty_node_id.write(writer)?;
+               self.funding_txo.write(writer)?;
+               self.channel_id.write(writer)?;
+               self.blocking_action.write(writer)
+       }
+}
+
+impl MaybeReadable for EventUnblockedChannel {
+       fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
+               let counterparty_node_id = Readable::read(reader)?;
+               let funding_txo = Readable::read(reader)?;
+               let channel_id = Readable::read(reader)?;
+               let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? {
+                       Some(blocking_action) => blocking_action,
+                       None => return Ok(None),
+               };
+               Ok(Some(EventUnblockedChannel {
+                       counterparty_node_id,
+                       funding_txo,
+                       channel_id,
+                       blocking_action,
+               }))
+       }
+}
+
 #[derive(Debug)]
 pub(crate) enum MonitorUpdateCompletionAction {
        /// Indicates that a payment ultimately destined for us was claimed and we should emit an
        /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
        /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
        /// event can be generated.
-       PaymentClaimed { payment_hash: PaymentHash },
+       PaymentClaimed {
+               payment_hash: PaymentHash,
+               /// A pending MPP claim which hasn't yet completed.
+               ///
+               /// Not written to disk.
+               pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+       },
        /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
        /// operation of another channel.
        ///
@@ -772,7 +818,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
        /// outbound edge.
        EmitEventAndFreeOtherChannel {
                event: events::Event,
-               downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>,
+               downstream_counterparty_and_funding_outpoint: Option<EventUnblockedChannel>,
        },
        /// Indicates we should immediately resume the operation of another channel, unless there is
        /// some other reason why the channel is blocked. In practice this simply means immediately
@@ -795,13 +841,16 @@ pub(crate) enum MonitorUpdateCompletionAction {
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
-       (0, PaymentClaimed) => { (0, payment_hash, required) },
+       (0, PaymentClaimed) => {
+               (0, payment_hash, required),
+               (9999999999, pending_mpp_claim, (static_value, None)),
+       },
        // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
        // *immediately*. However, for simplicity we implement read/write here.
        (1, FreeOtherChannelImmediately) => {
                (0, downstream_counterparty_node_id, required),
                (2, downstream_funding_outpoint, required),
-               (4, blocking_action, required),
+               (4, blocking_action, upgradable_required),
                // Note that by the time we get past the required read above, downstream_funding_outpoint will be
                // filled in, so we can safely unwrap it here.
                (5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
@@ -813,7 +862,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
                // monitor updates which aren't properly blocked or resumed, however that's fine - we don't
                // support async monitor updates even in LDK 0.0.116 and once we do we'll require no
                // downgrades to prior versions.
-               (1, downstream_counterparty_and_funding_outpoint, option),
+               (1, downstream_counterparty_and_funding_outpoint, upgradable_option),
        },
 );
 
@@ -832,9 +881,29 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
                // Note that by the time we get past the required read above, channel_funding_outpoint will be
                // filled in, so we can safely unwrap it here.
                (3, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(channel_funding_outpoint.0.unwrap()))),
-       };
+       }
 );
 
+#[derive(Debug)]
+pub(crate) struct PendingMPPClaim {
+       channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
+       channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
+}
+
+#[derive(Clone)]
+pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
+
+impl PartialEq for PendingMPPClaimPointer {
+       fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
+}
+impl Eq for PendingMPPClaimPointer {}
+
+impl core::fmt::Debug for PendingMPPClaimPointer {
+       fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
+               self.0.lock().unwrap().fmt(f)
+       }
+}
+
 #[derive(Clone, PartialEq, Eq, Debug)]
 /// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
 /// the blocked action here. See enum variants for more info.
@@ -848,6 +917,16 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
                /// The HTLC ID on the inbound edge.
                htlc_id: u64,
        },
+       /// We claimed an MPP payment across multiple channels. We have to block removing the payment
+       /// preimage from any monitor until the last monitor is updated to contain the payment
+       /// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) that
+       /// weren't updated on startup.
+       ///
+       /// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
+       /// state.
+       ClaimedMPPPayment {
+               pending_claim: PendingMPPClaimPointer,
+       }
 }
 
 impl RAAMonitorUpdateBlockingAction {
@@ -859,10 +938,16 @@ impl RAAMonitorUpdateBlockingAction {
        }
 }
 
-impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
-       (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
-;);
+impl_writeable_tlv_based_enum_upgradable!(RAAMonitorUpdateBlockingAction,
+       (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) },
+       unread_variants: ClaimedMPPPayment
+);
 
+impl Readable for Option<RAAMonitorUpdateBlockingAction> {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
+       }
+}
 
 /// State we hold per-peer.
 pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
@@ -1310,35 +1395,38 @@ where
 /// }
 ///
 /// // On the event processing thread once the peer has responded
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::FundingGenerationReady {
-///         temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
-///         user_channel_id, ..
-///     } => {
-///         assert_eq!(user_channel_id, 42);
-///         let funding_transaction = wallet.create_funding_transaction(
-///             channel_value_satoshis, output_script
-///         );
-///         match channel_manager.funding_transaction_generated(
-///             &temporary_channel_id, &counterparty_node_id, funding_transaction
-///         ) {
-///             Ok(()) => println!("Funding channel {}", temporary_channel_id),
-///             Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!(
-///             "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
-///             former_temporary_channel_id.unwrap()
-///         );
-///     },
-///     Event::ChannelReady { channel_id, user_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} ready", channel_id);
-///     },
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::FundingGenerationReady {
+///             temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
+///             user_channel_id, ..
+///         } => {
+///             assert_eq!(user_channel_id, 42);
+///             let funding_transaction = wallet.create_funding_transaction(
+///                 channel_value_satoshis, output_script
+///             );
+///             match channel_manager.funding_transaction_generated(
+///                 &temporary_channel_id, &counterparty_node_id, funding_transaction
+///             ) {
+///                 Ok(()) => println!("Funding channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!(
+///                 "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
+///                 former_temporary_channel_id.unwrap()
+///             );
+///         },
+///         Event::ChannelReady { channel_id, user_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} ready", channel_id);
+///         },
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1362,28 +1450,31 @@ where
 /// # fn example<T: AChannelManager>(channel_manager: T) {
 /// # let channel_manager = channel_manager.get_cm();
 /// # let error_message = "Channel force-closed";
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, ..  } => {
-///         if !is_trusted(counterparty_node_id) {
-///             match channel_manager.force_close_without_broadcasting_txn(
-///                 &temporary_channel_id, &counterparty_node_id, error_message.to_string()
-///             ) {
-///                 Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
-///                 Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, ..  } => {
+///             if !is_trusted(counterparty_node_id) {
+///                 match channel_manager.force_close_without_broadcasting_txn(
+///                     &temporary_channel_id, &counterparty_node_id, error_message.to_string()
+///                 ) {
+///                     Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
+///                     Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+///                 }
+///                 return Ok(());
 ///             }
-///             return;
-///         }
 ///
-///         let user_channel_id = 43;
-///         match channel_manager.accept_inbound_channel(
-///             &temporary_channel_id, &counterparty_node_id, user_channel_id
-///         ) {
-///             Ok(()) => println!("Accepting channel {}", temporary_channel_id),
-///             Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     // ...
-/// #     _ => {},
+///             let user_channel_id = 43;
+///             match channel_manager.accept_inbound_channel(
+///                 &temporary_channel_id, &counterparty_node_id, user_channel_id
+///             ) {
+///                 Ok(()) => println!("Accepting channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1412,13 +1503,16 @@ where
 /// }
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::ChannelClosed { channel_id, user_channel_id, ..  } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} closed", channel_id);
-///     },
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::ChannelClosed { channel_id, user_channel_id, ..  } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} closed", channel_id);
+///         },
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1468,30 +1562,33 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             PaymentPurpose::SpontaneousPayment(payment_preimage) => {
+///                 assert_ne!(payment_hash, known_payment_hash);
+///                 println!("Claiming spontaneous payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             // ...
+/// #           _ => {},
 ///         },
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
-///         },
-///         PaymentPurpose::SpontaneousPayment(payment_preimage) => {
-///             assert_ne!(payment_hash, known_payment_hash);
-///             println!("Claiming spontaneous payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
+///             println!("Claimed {} msats", amount_msat);
 ///         },
 ///         // ...
-/// #         _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         assert_eq!(payment_hash, known_payment_hash);
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-/// #     _ => {},
+/// #       _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1534,11 +1631,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
-///     Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
+///         Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1572,23 +1672,25 @@ where
 /// let bech32_offer = offer.to_string();
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             }
+/// #           _ => {},
 ///         },
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             println!("Claimed {} msats", amount_msat);
 ///         },
 ///         // ...
-/// #         _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-/// #     _ => {},
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1634,12 +1736,15 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1694,11 +1799,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-/// #     _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+///     #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1724,18 +1832,19 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///            PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///            PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
-///            },
-///         // ...
-/// #         _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             // ...
+/// #           _ => {},
 ///     },
 ///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
 ///         assert_eq!(payment_hash, known_payment_hash);
@@ -1743,6 +1852,8 @@ where
 ///     },
 ///     // ...
 /// #     _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -2746,8 +2857,9 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
        ($self: expr, $event_to_handle: expr, $handle_event: expr) => {
+               let mut handling_failed = false;
                let mut processed_all_events = false;
-               while !processed_all_events {
+               while !handling_failed && !processed_all_events {
                        if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
                                return;
                        }
@@ -2771,24 +2883,34 @@ macro_rules! process_events_body {
                        }
 
                        let pending_events = $self.pending_events.lock().unwrap().clone();
-                       let num_events = pending_events.len();
                        if !pending_events.is_empty() {
                                result = NotifyOption::DoPersist;
                        }
 
                        let mut post_event_actions = Vec::new();
 
+                       let mut num_handled_events = 0;
                        for (event, action_opt) in pending_events {
                                $event_to_handle = event;
-                               $handle_event;
-                               if let Some(action) = action_opt {
-                                       post_event_actions.push(action);
+                               match $handle_event {
+                                       Ok(()) => {
+                                               if let Some(action) = action_opt {
+                                                       post_event_actions.push(action);
+                                               }
+                                               num_handled_events += 1;
+                                       }
+                                       Err(_e) => {
+                                               // If we encounter an error we stop handling events and make sure to replay
+                                               // any unhandled events on the next invocation.
+                                               handling_failed = true;
+                                               break;
+                                       }
                                }
                        }
 
                        {
                                let mut pending_events = $self.pending_events.lock().unwrap();
-                               pending_events.drain(..num_events);
+                               pending_events.drain(..num_handled_events);
                                processed_all_events = pending_events.is_empty();
                                // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
                                // updated here with the `pending_events` lock acquired.
@@ -2989,7 +3111,7 @@ where
                        let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
                        match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key,
                                their_features, channel_value_satoshis, push_msat, user_channel_id, config,
-                               self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id)
+                               self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger)
                        {
                                Ok(res) => res,
                                Err(e) => {
@@ -3505,7 +3627,7 @@ where
                        // peer has been disabled for some time), return `channel_disabled`,
                        // otherwise return `temporary_channel_failure`.
                        let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
-                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
+                       if chan_update_opt.as_ref().map(|u| u.contents.channel_flags & 2 == 2).unwrap_or(false) {
                                return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
                        } else {
                                return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
@@ -3784,7 +3906,8 @@ where
                        chain_hash: self.chain_hash,
                        short_channel_id,
                        timestamp: chan.context.get_update_time_counter(),
-                       flags: (!were_node_one) as u8 | ((!enabled as u8) << 1),
+                       message_flags: 1, // Only must_be_one
+                       channel_flags: (!were_node_one) as u8 | ((!enabled as u8) << 1),
                        cltv_expiry_delta: chan.context.get_cltv_expiry_delta(),
                        htlc_minimum_msat: chan.context.get_counterparty_htlc_minimum_msat(),
                        htlc_maximum_msat: chan.context.get_announced_htlc_max_msat(),
@@ -4031,8 +4154,8 @@ where
                self.pending_outbound_payments
                        .send_payment_for_bolt12_invoice(
                                invoice, payment_id, &self.router, self.list_usable_channels(),
-                               || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer,
-                               best_block_height, &self.logger, &self.pending_events,
+                               || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, &self,
+                               &self.secp_ctx, best_block_height, &self.logger, &self.pending_events,
                                |args| self.send_payment_along_path(args)
                        )
        }
@@ -4692,6 +4815,7 @@ where
 
                let mut per_source_pending_forward = [(
                        payment.prev_short_channel_id,
+                       payment.prev_counterparty_node_id,
                        payment.prev_funding_outpoint,
                        payment.prev_channel_id,
                        payment.prev_user_channel_id,
@@ -4722,6 +4846,7 @@ where
                                user_channel_id: Some(payment.prev_user_channel_id),
                                outpoint: payment.prev_funding_outpoint,
                                channel_id: payment.prev_channel_id,
+                               counterparty_node_id: payment.prev_counterparty_node_id,
                                htlc_id: payment.prev_htlc_id,
                                incoming_packet_shared_secret: payment.forward_info.incoming_shared_secret,
                                phantom_shared_secret: None,
@@ -4851,8 +4976,10 @@ where
 
                        // Process all of the forwards and failures for the channel in which the HTLCs were
                        // proposed to as a batch.
-                       let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
-                               incoming_user_channel_id, htlc_forwards.drain(..).collect());
+                       let pending_forwards = (
+                               incoming_scid, Some(incoming_counterparty_node_id), incoming_funding_txo,
+                               incoming_channel_id, incoming_user_channel_id, htlc_forwards.drain(..).collect()
+                       );
                        self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
                        for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
                                let failure = match htlc_fail {
@@ -4886,7 +5013,7 @@ where
 
                let mut new_events = VecDeque::new();
                let mut failed_forwards = Vec::new();
-               let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
+               let mut phantom_receives: Vec<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
                {
                        let mut forward_htlcs = new_hash_map();
                        mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -4895,12 +5022,12 @@ where
                                if short_chan_id != 0 {
                                        let mut forwarding_counterparty = None;
                                        macro_rules! forwarding_channel_not_found {
-                                               () => {
-                                                       for forward_info in pending_forwards.drain(..) {
+                                               ($forward_infos: expr) => {
+                                                       for forward_info in $forward_infos {
                                                                match forward_info {
                                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                                prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
-                                                                               prev_user_channel_id, forward_info: PendingHTLCInfo {
+                                                                               prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
                                                                                        routing, incoming_shared_secret, payment_hash, outgoing_amt_msat,
                                                                                        outgoing_cltv_value, ..
                                                                                }
@@ -4915,6 +5042,7 @@ where
                                                                                                        user_channel_id: Some(prev_user_channel_id),
                                                                                                        channel_id: prev_channel_id,
                                                                                                        outpoint: prev_funding_outpoint,
+                                                                                                       counterparty_node_id: prev_counterparty_node_id,
                                                                                                        htlc_id: prev_htlc_id,
                                                                                                        incoming_packet_shared_secret: incoming_shared_secret,
                                                                                                        phantom_shared_secret: $phantom_ss,
@@ -4977,7 +5105,10 @@ where
                                                                                                                        outgoing_cltv_value, Some(phantom_shared_secret), false, None,
                                                                                                                        current_height, self.default_configuration.accept_mpp_keysend)
                                                                                                                {
-                                                                                                                       Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)])),
+                                                                                                                       Ok(info) => phantom_receives.push((
+                                                                                                                               prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                                                                                               prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)]
+                                                                                                                       )),
                                                                                                                        Err(InboundHTLCErr { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
                                                                                                                }
                                                                                                        },
@@ -5004,7 +5135,7 @@ where
                                        let (counterparty_node_id, forward_chan_id) = match chan_info_opt {
                                                Some((cp_id, chan_id)) => (cp_id, chan_id),
                                                None => {
-                                                       forwarding_channel_not_found!();
+                                                       forwarding_channel_not_found!(pending_forwards.drain(..));
                                                        continue;
                                                }
                                        };
@@ -5012,103 +5143,156 @@ where
                                        let per_peer_state = self.per_peer_state.read().unwrap();
                                        let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
                                        if peer_state_mutex_opt.is_none() {
-                                               forwarding_channel_not_found!();
+                                               forwarding_channel_not_found!(pending_forwards.drain(..));
                                                continue;
                                        }
                                        let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                        let peer_state = &mut *peer_state_lock;
-                                       if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
-                                               let logger = WithChannelContext::from(&self.logger, &chan.context, None);
-                                               for forward_info in pending_forwards.drain(..) {
-                                                       let queue_fail_htlc_res = match forward_info {
-                                                               HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                                       prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
-                                                                       prev_user_channel_id, forward_info: PendingHTLCInfo {
-                                                                               incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
-                                                                               routing: PendingHTLCRouting::Forward {
-                                                                                       onion_packet, blinded, ..
-                                                                               }, skimmed_fee_msat, ..
+                                       let mut draining_pending_forwards = pending_forwards.drain(..);
+                                       while let Some(forward_info) = draining_pending_forwards.next() {
+                                               let queue_fail_htlc_res = match forward_info {
+                                                       HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+                                                               prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
+                                                               prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
+                                                                       incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
+                                                                       routing: PendingHTLCRouting::Forward {
+                                                                               ref onion_packet, blinded, ..
+                                                                       }, skimmed_fee_msat, ..
+                                                               },
+                                                       }) => {
+                                                               let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
+                                                                       short_channel_id: prev_short_channel_id,
+                                                                       user_channel_id: Some(prev_user_channel_id),
+                                                                       counterparty_node_id: prev_counterparty_node_id,
+                                                                       channel_id: prev_channel_id,
+                                                                       outpoint: prev_funding_outpoint,
+                                                                       htlc_id: prev_htlc_id,
+                                                                       incoming_packet_shared_secret: incoming_shared_secret,
+                                                                       // Phantom payments are only PendingHTLCRouting::Receive.
+                                                                       phantom_shared_secret: None,
+                                                                       blinded_failure: blinded.map(|b| b.failure),
+                                                               });
+                                                               let next_blinding_point = blinded.and_then(|b| {
+                                                                       let encrypted_tlvs_ss = self.node_signer.ecdh(
+                                                                               Recipient::Node, &b.inbound_blinding_point, None
+                                                                       ).unwrap().secret_bytes();
+                                                                       onion_utils::next_hop_pubkey(
+                                                                               &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
+                                                                       ).ok()
+                                                               });
+
+                                                               // Forward the HTLC over the most appropriate channel with the corresponding peer,
+                                                               // applying non-strict forwarding.
+                                                               // The channel with the least amount of outbound liquidity will be used to maximize the
+                                                               // probability of being able to successfully forward a subsequent HTLC.
+                                                               let maybe_optimal_channel = peer_state.channel_by_id.values_mut().filter_map(|phase| match phase {
+                                                                       ChannelPhase::Funded(chan) => {
+                                                                               let balances = chan.context.get_available_balances(&self.fee_estimator);
+                                                                               if outgoing_amt_msat <= balances.next_outbound_htlc_limit_msat &&
+                                                                                       outgoing_amt_msat >= balances.next_outbound_htlc_minimum_msat &&
+                                                                                       chan.context.is_usable() {
+                                                                                       Some((chan, balances))
+                                                                               } else {
+                                                                                       None
+                                                                               }
                                                                        },
-                                                               }) => {
-                                                                       let logger = WithChannelContext::from(&self.logger, &chan.context, Some(payment_hash));
-                                                                       log_trace!(logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, &payment_hash, short_chan_id);
-                                                                       let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
-                                                                               short_channel_id: prev_short_channel_id,
-                                                                               user_channel_id: Some(prev_user_channel_id),
-                                                                               channel_id: prev_channel_id,
-                                                                               outpoint: prev_funding_outpoint,
-                                                                               htlc_id: prev_htlc_id,
-                                                                               incoming_packet_shared_secret: incoming_shared_secret,
-                                                                               // Phantom payments are only PendingHTLCRouting::Receive.
-                                                                               phantom_shared_secret: None,
-                                                                               blinded_failure: blinded.map(|b| b.failure),
-                                                                       });
-                                                                       let next_blinding_point = blinded.and_then(|b| {
-                                                                               let encrypted_tlvs_ss = self.node_signer.ecdh(
-                                                                                       Recipient::Node, &b.inbound_blinding_point, None
-                                                                               ).unwrap().secret_bytes();
-                                                                               onion_utils::next_hop_pubkey(
-                                                                                       &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
-                                                                               ).ok()
-                                                                       });
-                                                                       if let Err(e) = chan.queue_add_htlc(outgoing_amt_msat,
-                                                                               payment_hash, outgoing_cltv_value, htlc_source.clone(),
-                                                                               onion_packet, skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
-                                                                               &&logger)
-                                                                       {
-                                                                               if let ChannelError::Ignore(msg) = e {
-                                                                                       log_trace!(logger, "Failed to forward HTLC with payment_hash {}: {}", &payment_hash, msg);
+                                                                       _ => None,
+                                                               }).min_by_key(|(_, balances)| balances.next_outbound_htlc_limit_msat).map(|(c, _)| c);
+                                                               let optimal_channel = match maybe_optimal_channel {
+                                                                       Some(chan) => chan,
+                                                                       None => {
+                                                                               // Fall back to the specified channel to return an appropriate error.
+                                                                               if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                                       chan
                                                                                } else {
-                                                                                       panic!("Stated return value requirements in send_htlc() were not met");
+                                                                                       forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                                       break;
                                                                                }
+                                                                       }
+                                                               };
+
+                                                               let logger = WithChannelContext::from(&self.logger, &optimal_channel.context, Some(payment_hash));
+                                                               let channel_description = if optimal_channel.context.get_short_channel_id() == Some(short_chan_id) {
+                                                                       "specified"
+                                                               } else {
+                                                                       "alternate"
+                                                               };
+                                                               log_trace!(logger, "Forwarding HTLC from SCID {} with payment_hash {} and next hop SCID {} over {} channel {} with corresponding peer {}",
+                                                                       prev_short_channel_id, &payment_hash, short_chan_id, channel_description, optimal_channel.context.channel_id(), &counterparty_node_id);
+                                                               if let Err(e) = optimal_channel.queue_add_htlc(outgoing_amt_msat,
+                                                                               payment_hash, outgoing_cltv_value, htlc_source.clone(),
+                                                                               onion_packet.clone(), skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
+                                                                               &&logger)
+                                                               {
+                                                                       if let ChannelError::Ignore(msg) = e {
+                                                                               log_trace!(logger, "Failed to forward HTLC with payment_hash {} to peer {}: {}", &payment_hash, &counterparty_node_id, msg);
+                                                                       } else {
+                                                                               panic!("Stated return value requirements in send_htlc() were not met");
+                                                                       }
+
+                                                                       if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
                                                                                let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan);
                                                                                failed_forwards.push((htlc_source, payment_hash,
                                                                                        HTLCFailReason::reason(failure_code, data),
                                                                                        HTLCDestination::NextHopChannel { node_id: Some(chan.context.get_counterparty_node_id()), channel_id: forward_chan_id }
                                                                                ));
-                                                                               continue;
+                                                                       } else {
+                                                                               forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                               break;
                                                                        }
-                                                                       None
-                                                               },
-                                                               HTLCForwardInfo::AddHTLC { .. } => {
-                                                                       panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
-                                                               },
-                                                               HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
+                                                               }
+                                                               None
+                                                       },
+                                                       HTLCForwardInfo::AddHTLC { .. } => {
+                                                               panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
+                                                       },
+                                                       HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => {
+                                                               if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                       let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                        log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
-                                                                       Some((chan.queue_fail_htlc(htlc_id, err_packet, &&logger), htlc_id))
-                                                               },
-                                                               HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
+                                                                       Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id))
+                                                               } else {
+                                                                       forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                       break;
+                                                               }
+                                                       },
+                                                       HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
+                                                               if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                       let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                        log_trace!(logger, "Failing malformed HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
                                                                        let res = chan.queue_fail_malformed_htlc(
                                                                                htlc_id, failure_code, sha256_of_onion, &&logger
                                                                        );
                                                                        Some((res, htlc_id))
-                                                               },
-                                                       };
-                                                       if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
-                                                               if let Err(e) = queue_fail_htlc_res {
-                                                                       if let ChannelError::Ignore(msg) = e {
+                                                               } else {
+                                                                       forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+                                                                       break;
+                                                               }
+                                                       },
+                                               };
+                                               if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
+                                                       if let Err(e) = queue_fail_htlc_res {
+                                                               if let ChannelError::Ignore(msg) = e {
+                                                                       if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+                                                                               let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                                log_trace!(logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
-                                                                       } else {
-                                                                               panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
                                                                        }
-                                                                       // fail-backs are best-effort, we probably already have one
-                                                                       // pending, and if not that's OK, if not, the channel is on
-                                                                       // the chain and sending the HTLC-Timeout is their problem.
-                                                                       continue;
+                                                               } else {
+                                                                       panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
                                                                }
+                                                               // fail-backs are best-effort, we probably already have one
+                                                               // pending, and if not that's OK, if not, the channel is on
+                                                               // the chain and sending the HTLC-Timeout is their problem.
+                                                               continue;
                                                        }
                                                }
-                                       } else {
-                                               forwarding_channel_not_found!();
-                                               continue;
                                        }
                                } else {
                                        'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) {
                                                match forward_info {
                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
-                                                               prev_user_channel_id, forward_info: PendingHTLCInfo {
+                                                               prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
                                                                        routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat,
                                                                        skimmed_fee_msat, ..
                                                                }
@@ -5146,6 +5330,7 @@ where
                                                                        prev_hop: HTLCPreviousHopData {
                                                                                short_channel_id: prev_short_channel_id,
                                                                                user_channel_id: Some(prev_user_channel_id),
+                                                                               counterparty_node_id: prev_counterparty_node_id,
                                                                                channel_id: prev_channel_id,
                                                                                outpoint: prev_funding_outpoint,
                                                                                htlc_id: prev_htlc_id,
@@ -5178,6 +5363,7 @@ where
                                                                                failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                                short_channel_id: $htlc.prev_hop.short_channel_id,
                                                                                                user_channel_id: $htlc.prev_hop.user_channel_id,
+                                                                                               counterparty_node_id: $htlc.prev_hop.counterparty_node_id,
                                                                                                channel_id: prev_channel_id,
                                                                                                outpoint: prev_funding_outpoint,
                                                                                                htlc_id: $htlc.prev_hop.htlc_id,
@@ -6063,7 +6249,7 @@ where
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
-               let mut sources = {
+               let sources = {
                        let mut claimable_payments = self.claimable_payments.lock().unwrap();
                        if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) {
                                let mut receiver_node_id = self.our_network_pubkey;
@@ -6158,26 +6344,46 @@ where
                        return;
                }
                if valid_mpp {
-                       for htlc in sources.drain(..) {
-                               let prev_hop_chan_id = htlc.prev_hop.channel_id;
-                               if let Err((pk, err)) = self.claim_funds_from_hop(
+                       let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
+                               let channels_without_preimage = sources.iter().filter_map(|htlc| {
+                                       if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                                               let prev_hop = &htlc.prev_hop;
+                                               Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id))
+                                       } else {
+                                               None
+                                       }
+                               }).collect();
+                               Some(Arc::new(Mutex::new(PendingMPPClaim {
+                                       channels_without_preimage,
+                                       channels_with_preimage: Vec::new(),
+                               })))
+                       } else {
+                               None
+                       };
+                       for htlc in sources {
+                               let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
+                                       if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+                                               let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
+                                               Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
+                                       } else {
+                                               None
+                                       }
+                               );
+                               let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| {
+                                       RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+                                               pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+                                       }
+                               });
+                               self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
                                        |_, definitely_duplicate| {
                                                debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
-                                               Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+                                               (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker)
                                        }
-                               ) {
-                                       if let msgs::ErrorAction::IgnoreError = err.err.action {
-                                               // We got a temporary failure updating monitor, but will claim the
-                                               // HTLC when the monitor updating is restored (or on chain).
-                                               let logger = WithContext::from(&self.logger, None, Some(prev_hop_chan_id), Some(payment_hash));
-                                               log_error!(logger, "Temporary failure claiming HTLC, treating as success: {}", err.err.err);
-                                       } else { errs.push((pk, err)); }
-                               }
+                               );
                        }
-               }
-               if !valid_mpp {
-                       for htlc in sources.drain(..) {
+               } else {
+                       for htlc in sources {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
                                htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
                                let source = HTLCSource::PreviousHopData(htlc.prev_hop);
@@ -6195,9 +6401,12 @@ where
                }
        }
 
-       fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
-               prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
-       -> Result<(), (PublicKey, MsgHandleErrInternal)> {
+       fn claim_funds_from_hop<
+               ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+       >(
+               &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
+               completion_action: ComplFunc,
+       ) {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
                // If we haven't yet run background events assume we're still deserializing and shouldn't
@@ -6234,11 +6443,15 @@ where
 
                                                match fulfill_res {
                                                        UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
-                                                               if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+                                                               let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+                                                               if let Some(action) = action_opt {
                                                                        log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
                                                                                chan_id, action);
                                                                        peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                                }
+                                                               if let Some(raa_blocker) = raa_blocker_opt {
+                                                                       peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+                                                               }
                                                                if !during_init {
                                                                        handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
                                                                                peer_state, per_peer_state, chan);
@@ -6256,11 +6469,16 @@ where
                                                                }
                                                        }
                                                        UpdateFulfillCommitFetch::DuplicateClaim {} => {
-                                                               let action = if let Some(action) = completion_action(None, true) {
+                                                               let (action_opt, raa_blocker_opt) = completion_action(None, true);
+                                                               if let Some(raa_blocker) = raa_blocker_opt {
+                                                                       debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+                                                               }
+                                                               let action = if let Some(action) = action_opt {
                                                                        action
                                                                } else {
-                                                                       return Ok(());
+                                                                       return;
                                                                };
+
                                                                mem::drop(peer_state_lock);
 
                                                                log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6275,7 +6493,7 @@ where
                                                                } else {
                                                                        debug_assert!(false,
                                                                                "Duplicate claims should always free another channel immediately");
-                                                                       return Ok(());
+                                                                       return;
                                                                };
                                                                if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
                                                                        let mut peer_state = peer_state_mtx.lock().unwrap();
@@ -6300,7 +6518,7 @@ where
                                                        }
                                                }
                                        }
-                                       return Ok(());
+                                       return;
                                }
                        }
                }
@@ -6347,8 +6565,46 @@ where
                // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
                // generally always allowed to be duplicative (and it's specifically noted in
                // `PaymentForwarded`).
-               self.handle_monitor_update_completion_actions(completion_action(None, false));
-               Ok(())
+               let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+               if let Some(raa_blocker) = raa_blocker_opt {
+                       let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+                               // prev_hop.counterparty_node_id is always available for payments received after
+                               // LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+                               // look up the counterparty in the `action_opt`, if possible.
+                               action_opt.as_ref().and_then(|action|
+                                       if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+                                               pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
+                                       } else { None }
+                               )
+                       );
+                       if let Some(counterparty_node_id) = counterparty_node_id {
+                               // TODO: Avoid always blocking the world for the write lock here.
+                               let mut per_peer_state = self.per_peer_state.write().unwrap();
+                               let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+                                       Mutex::new(PeerState {
+                                               channel_by_id: new_hash_map(),
+                                               inbound_channel_request_by_id: new_hash_map(),
+                                               latest_features: InitFeatures::empty(),
+                                               pending_msg_events: Vec::new(),
+                                               in_flight_monitor_updates: BTreeMap::new(),
+                                               monitor_update_blocked_actions: BTreeMap::new(),
+                                               actions_blocking_raa_monitor_updates: BTreeMap::new(),
+                                               is_connected: false,
+                                       }));
+                               let mut peer_state = peer_state_mutex.lock().unwrap();
+
+                               peer_state.actions_blocking_raa_monitor_updates
+                                       .entry(prev_hop.channel_id)
+                                       .or_insert_with(Vec::new)
+                                       .push(raa_blocker);
+                       } else {
+                               debug_assert!(false,
+                                       "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+                       }
+               }
+
+               self.handle_monitor_update_completion_actions(action_opt);
        }
 
        fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -6381,11 +6637,16 @@ where
                                let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
                                #[cfg(debug_assertions)]
                                let claiming_chan_funding_outpoint = hop_data.outpoint;
-                               let res = self.claim_funds_from_hop(hop_data, payment_preimage,
+                               self.claim_funds_from_hop(hop_data, payment_preimage,
                                        |htlc_claim_value_msat, definitely_duplicate| {
                                                let chan_to_release =
                                                        if let Some(node_id) = next_channel_counterparty_node_id {
-                                                               Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
+                                                               Some(EventUnblockedChannel {
+                                                                       counterparty_node_id: node_id,
+                                                                       funding_txo: next_channel_outpoint,
+                                                                       channel_id: next_channel_id,
+                                                                       blocking_action: completed_blocker
+                                                               })
                                                        } else {
                                                                // We can only get `None` here if we are processing a
                                                                // `ChannelMonitor`-originated event, in which case we
@@ -6442,16 +6703,16 @@ where
                                                                        }
                                                                }), "{:?}", *background_events);
                                                        }
-                                                       None
+                                                       (None, None)
                                                } else if definitely_duplicate {
                                                        if let Some(other_chan) = chan_to_release {
-                                                               Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
-                                                                       downstream_counterparty_node_id: other_chan.0,
-                                                                       downstream_funding_outpoint: other_chan.1,
-                                                                       downstream_channel_id: other_chan.2,
-                                                                       blocking_action: other_chan.3,
-                                                               })
-                                                       } else { None }
+                                                               (Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+                                                                       downstream_counterparty_node_id: other_chan.counterparty_node_id,
+                                                                       downstream_funding_outpoint: other_chan.funding_txo,
+                                                                       downstream_channel_id: other_chan.channel_id,
+                                                                       blocking_action: other_chan.blocking_action,
+                                                               }), None)
+                                                       } else { (None, None) }
                                                } else {
                                                        let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                                                                if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6460,7 +6721,7 @@ where
                                                        } else { None };
                                                        debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
                                                                "skimmed_fee_msat must always be included in total_fee_earned_msat");
-                                                       Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+                                                       (Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                                event: events::Event::PaymentForwarded {
                                                                        prev_channel_id: Some(prev_channel_id),
                                                                        next_channel_id: Some(next_channel_id),
@@ -6472,13 +6733,9 @@ where
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
                                                                downstream_counterparty_and_funding_outpoint: chan_to_release,
-                                                       })
+                                                       }), None)
                                                }
                                        });
-                               if let Err((pk, err)) = res {
-                                       let result: Result<(), _> = Err(err);
-                                       let _ = handle_error!(self, result, pk);
-                               }
                        },
                }
        }
@@ -6493,9 +6750,44 @@ where
                debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
                debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 
+               let mut freed_channels = Vec::new();
+
                for action in actions.into_iter() {
                        match action {
-                               MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+                               MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+                                       if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+                                               let per_peer_state = self.per_peer_state.read().unwrap();
+                                               per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+                                                       let mut peer_state = peer_state_mutex.lock().unwrap();
+                                                       let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+                                                       if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+                                                               blockers.get_mut().retain(|blocker|
+                                                                       if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+                                                                               if *pending_claim == claim_ptr {
+                                                                                       let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+                                                                                       let pending_claim_state = &mut *pending_claim_state_lock;
+                                                                                       pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+                                                                                               if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+                                                                                                       pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+                                                                                                       false
+                                                                                               } else { true }
+                                                                                       });
+                                                                                       if pending_claim_state.channels_without_preimage.is_empty() {
+                                                                                               for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+                                                                                                       freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+                                                                                               }
+                                                                                       }
+                                                                                       !pending_claim_state.channels_without_preimage.is_empty()
+                                                                               } else { true }
+                                                                       } else { true }
+                                                               );
+                                                               if blockers.get().is_empty() {
+                                                                       blockers.remove();
+                                                               }
+                                                       }
+                                               });
+                                       }
+
                                        let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                                        if let Some(ClaimingPayment {
                                                amount_msat,
@@ -6520,8 +6812,11 @@ where
                                        event, downstream_counterparty_and_funding_outpoint
                                } => {
                                        self.pending_events.lock().unwrap().push_back((event, None));
-                                       if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
-                                               self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+                                       if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
+                                               self.handle_monitor_update_release(
+                                                       unblocked.counterparty_node_id, unblocked.funding_txo,
+                                                       unblocked.channel_id, Some(unblocked.blocking_action),
+                                               );
                                        }
                                },
                                MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
@@ -6536,6 +6831,10 @@ where
                                },
                        }
                }
+
+               for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+                       self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+               }
        }
 
        /// Handles a channel reentering a functional state, either due to reconnect or a monitor
@@ -6546,7 +6845,7 @@ where
                pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
                funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
-       -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
+       -> (Option<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
                let logger = WithChannelContext::from(&self.logger, &channel.context, None);
                log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
                        &channel.context.channel_id(),
@@ -6562,8 +6861,11 @@ where
 
                let mut htlc_forwards = None;
                if !pending_forwards.is_empty() {
-                       htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
-                               channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
+                       htlc_forwards = Some((
+                               short_channel_id, Some(channel.context.get_counterparty_node_id()),
+                               channel.context.get_funding_txo().unwrap(), channel.context.channel_id(),
+                               channel.context.get_user_id(), pending_forwards
+                       ));
                }
                let mut decode_update_add_htlcs = None;
                if !pending_update_adds.is_empty() {
@@ -7606,15 +7908,15 @@ where
        }
 
        #[inline]
-       fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
+       fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
                let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
                if push_forward_event { self.push_pending_forwards_ev() }
        }
 
        #[inline]
-       fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
+       fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
                let mut push_forward_event = false;
-               for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
+               for &mut (prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
                        let mut new_intercept_events = VecDeque::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
@@ -7633,7 +7935,9 @@ where
                                        match forward_htlcs.entry(scid) {
                                                hash_map::Entry::Occupied(mut entry) => {
                                                        entry.get_mut().push(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                               prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info }));
+                                                               prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                               prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+                                                       }));
                                                },
                                                hash_map::Entry::Vacant(entry) => {
                                                        if !is_our_scid && forward_info.incoming_amt_msat.is_some() &&
@@ -7651,7 +7955,9 @@ where
                                                                                        intercept_id
                                                                                }, None));
                                                                                entry.insert(PendingAddHTLCInfo {
-                                                                                       prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info });
+                                                                                       prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                                                       prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+                                                                               });
                                                                        },
                                                                        hash_map::Entry::Occupied(_) => {
                                                                                let logger = WithContext::from(&self.logger, None, Some(prev_channel_id), Some(forward_info.payment_hash));
@@ -7659,6 +7965,7 @@ where
                                                                                let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                        short_channel_id: prev_short_channel_id,
                                                                                        user_channel_id: Some(prev_user_channel_id),
+                                                                                       counterparty_node_id: prev_counterparty_node_id,
                                                                                        outpoint: prev_funding_outpoint,
                                                                                        channel_id: prev_channel_id,
                                                                                        htlc_id: prev_htlc_id,
@@ -7678,7 +7985,9 @@ where
                                                                // payments are being processed.
                                                                push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
                                                                entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                                       prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
+                                                                       prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+                                                                       prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+                                                               })));
                                                        }
                                                }
                                        }
@@ -7880,7 +8189,7 @@ where
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
                                        }
                                        let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..];
-                                       let msg_from_node_one = msg.contents.flags & 1 == 0;
+                                       let msg_from_node_one = msg.contents.channel_flags & 1 == 0;
                                        if were_node_one == msg_from_node_one {
                                                return Ok(NotifyOption::SkipPersistNoEvents);
                                        } else {
@@ -8155,11 +8464,26 @@ where
                        match phase {
                                ChannelPhase::Funded(chan) => {
                                        let msgs = chan.signer_maybe_unblocked(&self.logger);
-                                       if let Some(updates) = msgs.commitment_update {
-                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id,
-                                                       updates,
-                                               });
+                                       let cu_msg = msgs.commitment_update.map(|updates| events::MessageSendEvent::UpdateHTLCs {
+                                               node_id,
+                                               updates,
+                                       });
+                                       let raa_msg = msgs.revoke_and_ack.map(|msg| events::MessageSendEvent::SendRevokeAndACK {
+                                               node_id,
+                                               msg,
+                                       });
+                                       match (cu_msg, raa_msg) {
+                                               (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::CommitmentFirst => {
+                                                       pending_msg_events.push(cu);
+                                                       pending_msg_events.push(raa);
+                                               },
+                                               (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::RevokeAndACKFirst => {
+                                                       pending_msg_events.push(raa);
+                                                       pending_msg_events.push(cu);
+                                               },
+                                               (Some(cu), _) => pending_msg_events.push(cu),
+                                               (_, Some(raa)) => pending_msg_events.push(raa),
+                                               (_, _) => {},
                                        }
                                        if let Some(msg) = msgs.funding_signed {
                                                pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
@@ -8334,8 +8658,10 @@ macro_rules! create_offer_builder { ($self: ident, $builder: ty) => {
                let entropy = &*$self.entropy_source;
                let secp_ctx = &$self.secp_ctx;
 
-               let path = $self.create_blinded_path_using_absolute_expiry(absolute_expiry)
+               let path = $self.create_blinded_paths_using_absolute_expiry(OffersContext::Unknown {}, absolute_expiry)
+                       .and_then(|paths| paths.into_iter().next().ok_or(()))
                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
                let builder = OfferBuilder::deriving_signing_pubkey(
                        node_id, expanded_key, entropy, secp_ctx
                )
@@ -8406,8 +8732,11 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
                let entropy = &*$self.entropy_source;
                let secp_ctx = &$self.secp_ctx;
 
-               let path = $self.create_blinded_path_using_absolute_expiry(Some(absolute_expiry))
+               let context = OffersContext::OutboundPayment { payment_id };
+               let path = $self.create_blinded_paths_using_absolute_expiry(context, Some(absolute_expiry))
+                       .and_then(|paths| paths.into_iter().next().ok_or(()))
                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
                let builder = RefundBuilder::deriving_payer_id(
                        node_id, expanded_key, entropy, secp_ctx, amount_msats, payment_id
                )?
@@ -8428,6 +8757,13 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
        }
 } }
 
+/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
+/// along different paths.
+/// Sending multiple requests increases the chances of successful delivery in case some
+/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
+/// even if multiple invoices are received.
+const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
        M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
@@ -8529,7 +8865,9 @@ where
                        Some(payer_note) => builder.payer_note(payer_note),
                };
                let invoice_request = builder.build_and_sign()?;
-               let reply_path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+               let context = OffersContext::OutboundPayment { payment_id };
+               let reply_paths = self.create_blinded_paths(context).map_err(|_| Bolt12SemanticError::MissingPaths)?;
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
@@ -8542,25 +8880,27 @@ where
 
                let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
                if !offer.paths().is_empty() {
-                       // Send as many invoice requests as there are paths in the offer (with an upper bound).
-                       // Using only one path could result in a failure if the path no longer exists. But only
-                       // one invoice for a given payment id will be paid, even if more than one is received.
-                       const REQUEST_LIMIT: usize = 10;
-                       for path in offer.paths().into_iter().take(REQUEST_LIMIT) {
+                       reply_paths
+                               .iter()
+                               .flat_map(|reply_path| offer.paths().iter().map(move |path| (path, reply_path)))
+                               .take(OFFERS_MESSAGE_REQUEST_LIMIT)
+                               .for_each(|(path, reply_path)| {
+                                       let message = new_pending_onion_message(
+                                               OffersMessage::InvoiceRequest(invoice_request.clone()),
+                                               Destination::BlindedPath(path.clone()),
+                                               Some(reply_path.clone()),
+                                       );
+                                       pending_offers_messages.push(message);
+                               });
+               } else if let Some(signing_pubkey) = offer.signing_pubkey() {
+                       for reply_path in reply_paths {
                                let message = new_pending_onion_message(
                                        OffersMessage::InvoiceRequest(invoice_request.clone()),
-                                       Destination::BlindedPath(path.clone()),
-                                       Some(reply_path.clone()),
+                                       Destination::Node(signing_pubkey),
+                                       Some(reply_path),
                                );
                                pending_offers_messages.push(message);
                        }
-               } else if let Some(signing_pubkey) = offer.signing_pubkey() {
-                       let message = new_pending_onion_message(
-                               OffersMessage::InvoiceRequest(invoice_request),
-                               Destination::Node(signing_pubkey),
-                               Some(reply_path),
-                       );
-                       pending_offers_messages.push(message);
                } else {
                        debug_assert!(false);
                        return Err(Bolt12SemanticError::MissingSigningPubkey);
@@ -8629,26 +8969,32 @@ where
                                )?;
                                let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
                                let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?;
-                               let reply_path = self.create_blinded_path()
+                               let reply_paths = self.create_blinded_paths(OffersContext::Unknown {})
                                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
 
                                let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
                                if refund.paths().is_empty() {
-                                       let message = new_pending_onion_message(
-                                               OffersMessage::Invoice(invoice.clone()),
-                                               Destination::Node(refund.payer_id()),
-                                               Some(reply_path),
-                                       );
-                                       pending_offers_messages.push(message);
-                               } else {
-                                       for path in refund.paths() {
+                                       for reply_path in reply_paths {
                                                let message = new_pending_onion_message(
                                                        OffersMessage::Invoice(invoice.clone()),
-                                                       Destination::BlindedPath(path.clone()),
-                                                       Some(reply_path.clone()),
+                                                       Destination::Node(refund.payer_id()),
+                                                       Some(reply_path),
                                                );
                                                pending_offers_messages.push(message);
                                        }
+                               } else {
+                                       reply_paths
+                                               .iter()
+                                               .flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path)))
+                                               .take(OFFERS_MESSAGE_REQUEST_LIMIT)
+                                               .for_each(|(path, reply_path)| {
+                                                       let message = new_pending_onion_message(
+                                                               OffersMessage::Invoice(invoice.clone()),
+                                                               Destination::BlindedPath(path.clone()),
+                                                               Some(reply_path.clone()),
+                                                       );
+                                                       pending_offers_messages.push(message);
+                                               });
                                }
 
                                Ok(invoice)
@@ -8755,22 +9101,22 @@ where
                inbound_payment::get_payment_preimage(payment_hash, payment_secret, &self.inbound_payment_key)
        }
 
-       /// Creates a blinded path by delegating to [`MessageRouter`] based on the path's intended
-       /// lifetime.
+       /// Creates a collection of blinded paths by delegating to [`MessageRouter`] based on
+       /// the path's intended lifetime.
        ///
        /// Whether or not the path is compact depends on whether the path is short-lived or long-lived,
        /// respectively, based on the given `absolute_expiry` as seconds since the Unix epoch. See
        /// [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`].
-       fn create_blinded_path_using_absolute_expiry(
-               &self, absolute_expiry: Option<Duration>
-       ) -> Result<BlindedPath, ()> {
+       fn create_blinded_paths_using_absolute_expiry(
+               &self, context: OffersContext, absolute_expiry: Option<Duration>,
+       ) -> Result<Vec<BlindedPath>, ()> {
                let now = self.duration_since_epoch();
                let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
 
                if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
-                       self.create_compact_blinded_path()
+                       self.create_compact_blinded_paths(context)
                } else {
-                       self.create_blinded_path()
+                       self.create_blinded_paths(context)
                }
        }
 
@@ -8787,10 +9133,11 @@ where
                now
        }
 
-       /// Creates a blinded path by delegating to [`MessageRouter::create_blinded_paths`].
+       /// Creates a collection of blinded paths by delegating to
+       /// [`MessageRouter::create_blinded_paths`].
        ///
-       /// Errors if the `MessageRouter` errors or returns an empty `Vec`.
-       fn create_blinded_path(&self) -> Result<BlindedPath, ()> {
+       /// Errors if the `MessageRouter` errors.
+       fn create_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> {
                let recipient = self.get_our_node_id();
                let secp_ctx = &self.secp_ctx;
 
@@ -8803,14 +9150,15 @@ where
                        .collect::<Vec<_>>();
 
                self.router
-                       .create_blinded_paths(recipient, peers, secp_ctx)
-                       .and_then(|paths| paths.into_iter().next().ok_or(()))
+                       .create_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
+                       .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
        }
 
-       /// Creates a blinded path by delegating to [`MessageRouter::create_compact_blinded_paths`].
+       /// Creates a collection of blinded paths by delegating to
+       /// [`MessageRouter::create_compact_blinded_paths`].
        ///
-       /// Errors if the `MessageRouter` errors or returns an empty `Vec`.
-       fn create_compact_blinded_path(&self) -> Result<BlindedPath, ()> {
+       /// Errors if the `MessageRouter` errors.
+       fn create_compact_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> {
                let recipient = self.get_our_node_id();
                let secp_ctx = &self.secp_ctx;
 
@@ -8830,8 +9178,8 @@ where
                        .collect::<Vec<_>>();
 
                self.router
-                       .create_compact_blinded_paths(recipient, peers, secp_ctx)
-                       .and_then(|paths| paths.into_iter().next().ok_or(()))
+                       .create_compact_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
+                       .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
        }
 
        /// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
@@ -8929,7 +9277,7 @@ where
        #[cfg(any(test, feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                let events = core::cell::RefCell::new(Vec::new());
-               let event_handler = |event: events::Event| events.borrow_mut().push(event);
+               let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
@@ -9036,7 +9384,7 @@ where
        /// using the given event handler.
        ///
        /// See the trait-level documentation of [`EventsProvider`] for requirements.
-       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
                &self, handler: H
        ) {
                let mut ev;
@@ -9467,6 +9815,7 @@ where
                                                htlc_id: htlc.prev_htlc_id,
                                                incoming_packet_shared_secret: htlc.forward_info.incoming_shared_secret,
                                                phantom_shared_secret: None,
+                                               counterparty_node_id: htlc.prev_counterparty_node_id,
                                                outpoint: htlc.prev_funding_outpoint,
                                                channel_id: htlc.prev_channel_id,
                                                blinded_failure: htlc.forward_info.routing.blinded_failure(),
@@ -9652,7 +10001,7 @@ where
        }
 
        #[cfg(splicing)]
-       fn handle_splice(&self, counterparty_node_id: &PublicKey, msg: &msgs::Splice) {
+       fn handle_splice_init(&self, counterparty_node_id: &PublicKey, msg: &msgs::SpliceInit) {
                let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close(
                        "Splicing not supported".to_owned(),
                         msg.channel_id.clone())), *counterparty_node_id);
@@ -9859,7 +10208,7 @@ where
                                                // Quiescence
                                                &events::MessageSendEvent::SendStfu { .. } => false,
                                                // Splicing
-                                               &events::MessageSendEvent::SendSplice { .. } => false,
+                                               &events::MessageSendEvent::SendSpliceInit { .. } => false,
                                                &events::MessageSendEvent::SendSpliceAck { .. } => false,
                                                &events::MessageSendEvent::SendSpliceLocked { .. } => false,
                                                // Interactive Transaction Construction
@@ -10211,10 +10560,17 @@ where
        R::Target: Router,
        L::Target: Logger,
 {
-       fn handle_message(&self, message: OffersMessage, responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
+       fn handle_message(&self, message: OffersMessage, context: OffersContext, responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
                let secp_ctx = &self.secp_ctx;
                let expanded_key = &self.inbound_payment_key;
 
+               let abandon_if_payment = |context| {
+                       match context {
+                               OffersContext::OutboundPayment { payment_id } => self.abandon_payment(payment_id),
+                               _ => {},
+                       }
+               };
+
                match message {
                        OffersMessage::InvoiceRequest(invoice_request) => {
                                let responder = match responder {
@@ -10327,16 +10683,26 @@ where
                                };
 
                                match result {
-                                       Ok(()) => ResponseInstruction::NoResponse,
-                                       Err(e) => match responder {
-                                               Some(responder) => responder.respond(OffersMessage::InvoiceError(e)),
+                                       Ok(_) => ResponseInstruction::NoResponse,
+                                       Err(err) => match responder {
+                                               Some(responder) => {
+                                                       abandon_if_payment(context);
+                                                       responder.respond(OffersMessage::InvoiceError(err))
+                                               },
                                                None => {
-                                                       log_trace!(self.logger, "No reply path for sending invoice error: {:?}", e);
-                                                       ResponseInstruction::NoResponse
+                                                       abandon_if_payment(context);
+                                                       log_trace!(
+                                                               self.logger,
+                                                               "An error response was generated, but there is no reply_path specified \
+                                                               for sending the response. Error: {}",
+                                                               err
+                                                       );
+                                                       return ResponseInstruction::NoResponse;
                                                },
                                        },
                                }
                        },
+                       #[cfg(async_payments)]
                        OffersMessage::StaticInvoice(_invoice) => {
                                match responder {
                                        Some(responder) => {
@@ -10348,6 +10714,7 @@ where
                                }
                        },
                        OffersMessage::InvoiceError(invoice_error) => {
+                               abandon_if_payment(context);
                                log_trace!(self.logger, "Received invoice_error: {}", invoice_error);
                                ResponseInstruction::NoResponse
                        },
@@ -10359,6 +10726,31 @@ where
        }
 }
 
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
+AsyncPaymentsMessageHandler for ChannelManager<M, T, ES, NS, SP, F, R, L>
+where
+       M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+       T::Target: BroadcasterInterface,
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       SP::Target: SignerProvider,
+       F::Target: FeeEstimator,
+       R::Target: Router,
+       L::Target: Logger,
+{
+       fn held_htlc_available(
+               &self, _message: HeldHtlcAvailable, _responder: Option<Responder>
+       ) -> ResponseInstruction<ReleaseHeldHtlc> {
+               ResponseInstruction::NoResponse
+       }
+
+       fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {}
+
+       fn release_pending_messages(&self) -> Vec<PendingOnionMessage<AsyncPaymentsMessage>> {
+               Vec::new()
+       }
+}
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
 NodeIdLookUp for ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
@@ -10474,7 +10866,7 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting,
                (4, payment_data, option), // Added in 0.0.116
                (5, custom_tlvs, optional_vec),
        },
-;);
+);
 
 impl_writeable_tlv_based!(PendingHTLCInfo, {
        (0, routing, required),
@@ -10554,14 +10946,14 @@ impl Readable for HTLCFailureMsg {
        }
 }
 
-impl_writeable_tlv_based_enum!(PendingHTLCStatus, ;
+impl_writeable_tlv_based_enum_legacy!(PendingHTLCStatus, ;
        (0, Forward),
        (1, Fail),
 );
 
 impl_writeable_tlv_based_enum!(BlindedFailure,
        (0, FromIntroductionNode) => {},
-       (2, FromBlindedNode) => {}, ;
+       (2, FromBlindedNode) => {},
 );
 
 impl_writeable_tlv_based!(HTLCPreviousHopData, {
@@ -10575,6 +10967,7 @@ impl_writeable_tlv_based!(HTLCPreviousHopData, {
        // Note that by the time we get past the required read for type 2 above, outpoint will be
        // filled in, so we can safely unwrap it here.
        (9, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(outpoint.0.unwrap()))),
+       (11, counterparty_node_id, option),
 });
 
 impl Writeable for ClaimableHTLC {
@@ -10731,6 +11124,7 @@ impl_writeable_tlv_based!(PendingAddHTLCInfo, {
        // Note that by the time we get past the required read for type 6 above, prev_funding_outpoint will be
        // filled in, so we can safely unwrap it here.
        (7, prev_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(prev_funding_outpoint.0.unwrap()))),
+       (9, prev_counterparty_node_id, option),
 });
 
 impl Writeable for HTLCForwardInfo {
@@ -12008,7 +12402,12 @@ where
                                        for action in actions.iter() {
                                                if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                        downstream_counterparty_and_funding_outpoint:
-                                                               Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
+                                                               Some(EventUnblockedChannel {
+                                                                       counterparty_node_id: blocked_node_id,
+                                                                       funding_txo: _,
+                                                                       channel_id: blocked_channel_id,
+                                                                       blocking_action,
+                                                               }), ..
                                                } = action {
                                                        if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
                                                                log_trace!(logger,
@@ -12195,8 +12594,8 @@ mod tests {
                // update message and would always update the local fee info, even if our peer was
                // (spuriously) forwarding us our own channel_update.
                let as_node_one = nodes[0].node.get_our_node_id().serialize()[..] < nodes[1].node.get_our_node_id().serialize()[..];
-               let as_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 };
-               let bs_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 };
+               let as_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 };
+               let bs_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 };
 
                // First deliver each peers' own message, checking that the node doesn't need to be
                // persisted and that its channel info remains the same.