Add PaymentPurpose::is_keysend utility method
[rust-lightning] / lightning / src / ln / channelmanager.rs
index d41909beb3a08682b6fd4d0761f807a28b2771f0..0693f3c642ea735048d4c119d01754769c89514e 100644 (file)
@@ -31,8 +31,8 @@ use bitcoin::secp256k1::{SecretKey,PublicKey};
 use bitcoin::secp256k1::Secp256k1;
 use bitcoin::{secp256k1, Sequence};
 
-use crate::blinded_path::BlindedPath;
-use crate::blinded_path::payment::{PaymentConstraints, ReceiveTlvs};
+use crate::blinded_path::{BlindedPath, NodeIdLookUp};
+use crate::blinded_path::payment::{Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints, PaymentContext, ReceiveTlvs};
 use crate::chain;
 use crate::chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock};
 use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator};
@@ -44,6 +44,7 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa
 // construct one themselves.
 use crate::ln::{inbound_payment, ChannelId, PaymentHash, PaymentPreimage, PaymentSecret};
 use crate::ln::channel::{self, Channel, ChannelPhase, ChannelContext, ChannelError, ChannelUpdateStatus, ShutdownResult, UnfundedChannelContext, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel, WithChannelContext};
+pub use crate::ln::channel::{InboundHTLCDetails, InboundHTLCStateDetails, OutboundHTLCDetails, OutboundHTLCStateDetails};
 use crate::ln::features::{Bolt12InvoiceFeatures, ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures};
 #[cfg(any(feature = "_test_utils", test))]
 use crate::ln::features::Bolt11InvoiceFeatures;
@@ -57,9 +58,9 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
 use crate::ln::outbound_payment;
 use crate::ln::outbound_payment::{Bolt12PaymentError, OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs, StaleExpiration};
 use crate::ln::wire::Encode;
-use crate::offers::invoice::{BlindedPayInfo, Bolt12Invoice, DEFAULT_RELATIVE_EXPIRY, DerivedSigningPubkey, InvoiceBuilder};
+use crate::offers::invoice::{BlindedPayInfo, Bolt12Invoice, DEFAULT_RELATIVE_EXPIRY, DerivedSigningPubkey, ExplicitSigningPubkey, InvoiceBuilder, UnsignedBolt12Invoice};
 use crate::offers::invoice_error::InvoiceError;
-use crate::offers::merkle::SignError;
+use crate::offers::invoice_request::{DerivedPayerId, InvoiceRequestBuilder};
 use crate::offers::offer::{Offer, OfferBuilder};
 use crate::offers::parse::Bolt12SemanticError;
 use crate::offers::refund::{Refund, RefundBuilder};
@@ -85,6 +86,7 @@ use {
 #[cfg(c_bindings)]
 use {
        crate::offers::offer::OfferWithDerivedMetadataBuilder,
+       crate::offers::refund::RefundMaybeWithDerivedMetadataBuilder,
 };
 
 use alloc::collections::{btree_map, BTreeMap};
@@ -153,6 +155,11 @@ pub enum PendingHTLCRouting {
                /// [`Event::PaymentClaimable::onion_fields`] as
                /// [`RecipientOnionFields::payment_metadata`].
                payment_metadata: Option<Vec<u8>>,
+               /// The context of the payment included by the recipient in a blinded path, or `None` if a
+               /// blinded path was not used.
+               ///
+               /// Used in part to determine the [`events::PaymentPurpose`].
+               payment_context: Option<PaymentContext>,
                /// CLTV expiry of the received HTLC.
                ///
                /// Used to track when we should expire pending HTLCs that go unclaimed.
@@ -197,6 +204,8 @@ pub enum PendingHTLCRouting {
                /// For HTLCs received by LDK, these will ultimately bubble back up as
                /// [`RecipientOnionFields::custom_tlvs`].
                custom_tlvs: Vec<(u64, Vec<u8>)>,
+               /// Set if this HTLC is the final hop in a multi-hop blinded path.
+               requires_blinded_error: bool,
        },
 }
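The new `payment_context` feeds into the `events::PaymentPurpose` reported on claimable events, which is where this PR's titular `PaymentPurpose::is_keysend` helper comes in. A minimal sketch of using it in an event handler, assuming the method has the obvious `fn is_keysend(&self) -> bool` signature implied by the PR title:

```rust
use lightning::events::Event;

// Illustrative handler only; `is_keysend` is assumed (from the PR
// title) to return true exactly for PaymentPurpose::SpontaneousPayment.
fn log_claimable(event: &Event) {
    if let Event::PaymentClaimable { payment_hash, purpose, .. } = event {
        if purpose.is_keysend() {
            // Spontaneous payments carry the preimage directly.
            println!("Keysend payment {}", payment_hash);
        } else {
            println!("Invoice-based payment {}", payment_hash);
        }
    }
}
```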
 
@@ -218,6 +227,7 @@ impl PendingHTLCRouting {
                match self {
                        Self::Forward { blinded: Some(BlindedForward { failure, .. }), .. } => Some(*failure),
                        Self::Receive { requires_blinded_error: true, .. } => Some(BlindedFailure::FromBlindedNode),
+                       Self::ReceiveKeysend { requires_blinded_error: true, .. } => Some(BlindedFailure::FromBlindedNode),
                        _ => None,
                }
        }
@@ -293,6 +303,7 @@ pub(super) struct PendingAddHTLCInfo {
        // Note that this may be an outbound SCID alias for the associated channel.
        prev_short_channel_id: u64,
        prev_htlc_id: u64,
+       prev_channel_id: ChannelId,
        prev_funding_outpoint: OutPoint,
        prev_user_channel_id: u128,
 }
@@ -333,6 +344,7 @@ pub(crate) struct HTLCPreviousHopData {
        incoming_packet_shared_secret: [u8; 32],
        phantom_shared_secret: Option<[u8; 32]>,
        blinded_failure: Option<BlindedFailure>,
+       channel_id: ChannelId,
 
        // This field is consumed by `claim_funds_from_hop()` when updating a force-closed backwards
        // channel with a preimage provided by the forward channel.
@@ -345,6 +357,11 @@ enum OnionPayload {
                /// This is only here for backwards-compatibility in serialization, in the future it can be
                /// removed, breaking clients running 0.0.106 and earlier.
                _legacy_hop_data: Option<msgs::FinalOnionHopData>,
+               /// The context of the payment included by the recipient in a blinded path, or `None` if a
+               /// blinded path was not used.
+               ///
+               /// Used in part to determine the [`events::PaymentPurpose`].
+               payment_context: Option<PaymentContext>,
        },
        /// Contains the payer-provided preimage.
        Spontaneous(PaymentPreimage),
@@ -373,7 +390,7 @@ struct ClaimableHTLC {
 impl From<&ClaimableHTLC> for events::ClaimedHTLC {
        fn from(val: &ClaimableHTLC) -> Self {
                events::ClaimedHTLC {
-                       channel_id: val.prev_hop.outpoint.to_channel_id(),
+                       channel_id: val.prev_hop.channel_id,
                        user_channel_id: val.prev_hop.user_channel_id.unwrap_or(0),
                        cltv_expiry: val.cltv_expiry,
                        value_msat: val.value,
@@ -712,7 +729,7 @@ enum BackgroundEvent {
        ///
        /// Note that any such events are lost on shutdown, so in general they must be updates which
        /// are regenerated on startup.
-       ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
+       ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelId, ChannelMonitorUpdate)),
        /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the
        /// channel to continue normal operation.
        ///
@@ -726,6 +743,7 @@ enum BackgroundEvent {
        MonitorUpdateRegeneratedOnStartup {
                counterparty_node_id: PublicKey,
                funding_txo: OutPoint,
+               channel_id: ChannelId,
                update: ChannelMonitorUpdate
        },
        /// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have
@@ -754,7 +772,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
        /// outbound edge.
        EmitEventAndFreeOtherChannel {
                event: events::Event,
-               downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
+               downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>,
        },
        /// Indicates we should immediately resume the operation of another channel, unless there is
        /// some other reason why the channel is blocked. In practice this simply means immediately
@@ -772,6 +790,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
                downstream_counterparty_node_id: PublicKey,
                downstream_funding_outpoint: OutPoint,
                blocking_action: RAAMonitorUpdateBlockingAction,
+               downstream_channel_id: ChannelId,
        },
 }
 
@@ -783,6 +802,9 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
                (0, downstream_counterparty_node_id, required),
                (2, downstream_funding_outpoint, required),
                (4, blocking_action, required),
+               // Note that by the time we get past the required read above, downstream_funding_outpoint will be
+               // filled in, so we can safely unwrap it here.
+               (5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
        },
        (2, EmitEventAndFreeOtherChannel) => {
                (0, event, upgradable_required),
@@ -800,12 +822,16 @@ pub(crate) enum EventCompletionAction {
        ReleaseRAAChannelMonitorUpdate {
                counterparty_node_id: PublicKey,
                channel_funding_outpoint: OutPoint,
+               channel_id: ChannelId,
        },
 }
 impl_writeable_tlv_based_enum!(EventCompletionAction,
        (0, ReleaseRAAChannelMonitorUpdate) => {
                (0, channel_funding_outpoint, required),
                (2, counterparty_node_id, required),
+               // Note that by the time we get past the required read above, channel_funding_outpoint will be
+               // filled in, so we can safely unwrap it here.
+               (3, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(channel_funding_outpoint.0.unwrap()))),
        };
 );
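Both `default_value` fallbacks in the hunks above rely on a v1 channel ID being fully determined by its funding outpoint. A rough sketch of that derivation per BOLT 2 (the funding txid XORed with the big-endian funding output index in its last two bytes); `ChannelId::v1_from_funding_outpoint` is the authoritative implementation, and txid byte-order subtleties are glossed over here:

```rust
// Sketch of the BOLT 2 v1 channel_id derivation. `txid_bytes` is
// assumed to be in the byte order LDK stores internally.
fn v1_channel_id(txid_bytes: [u8; 32], output_index: u16) -> [u8; 32] {
    let mut id = txid_bytes;
    // XOR the 16-bit output index into the last two bytes (big endian).
    id[30] ^= (output_index >> 8) as u8;
    id[31] ^= (output_index & 0xff) as u8;
    id
}
```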
 
@@ -827,7 +853,7 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
 impl RAAMonitorUpdateBlockingAction {
        fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
                Self::ForwardedPaymentInboundClaim {
-                       channel_id: prev_hop.outpoint.to_channel_id(),
+                       channel_id: prev_hop.channel_id,
                        htlc_id: prev_hop.htlc_id,
                }
        }
@@ -886,7 +912,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
        /// The peer is currently connected (i.e. we've seen a
        /// [`ChannelMessageHandler::peer_connected`] and no corresponding
        /// [`ChannelMessageHandler::peer_disconnected`]).
-       is_connected: bool,
+       pub is_connected: bool,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -897,7 +923,16 @@ impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
                if require_disconnected && self.is_connected {
                        return false
                }
-               self.channel_by_id.iter().filter(|(_, phase)| matches!(phase, ChannelPhase::Funded(_))).count() == 0
+               !self.channel_by_id.iter().any(|(_, phase)|
+                       match phase {
+                               ChannelPhase::Funded(_) | ChannelPhase::UnfundedOutboundV1(_) => true,
+                               ChannelPhase::UnfundedInboundV1(_) => false,
+                               #[cfg(dual_funding)]
+                               ChannelPhase::UnfundedOutboundV2(_) => true,
+                               #[cfg(dual_funding)]
+                               ChannelPhase::UnfundedInboundV2(_) => false,
+                       }
+               )
                        && self.monitor_update_blocked_actions.is_empty()
                        && self.in_flight_monitor_updates.is_empty()
        }
@@ -969,6 +1004,7 @@ pub type SimpleArcChannelManager<M, T, F, L> = ChannelManager<
        Arc<DefaultRouter<
                Arc<NetworkGraph<Arc<L>>>,
                Arc<L>,
+               Arc<KeysManager>,
                Arc<RwLock<ProbabilisticScorer<Arc<NetworkGraph<Arc<L>>>, Arc<L>>>>,
                ProbabilisticScoringFeeParameters,
                ProbabilisticScorer<Arc<NetworkGraph<Arc<L>>>, Arc<L>>,
@@ -999,6 +1035,7 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> =
                &'e DefaultRouter<
                        &'f NetworkGraph<&'g L>,
                        &'g L,
+                       &'c KeysManager,
                        &'h RwLock<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>,
                        ProbabilisticScoringFeeParameters,
                        ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>
@@ -1081,11 +1118,626 @@ where
        fn get_cm(&self) -> &ChannelManager<M, T, ES, NS, SP, F, R, L> { self }
 }
 
-/// Manager which keeps track of a number of channels and sends messages to the appropriate
-/// channel, also tracking HTLC preimages and forwarding onion packets appropriately.
+/// A lightning node's channel state machine and payment management logic, which facilitates
+/// sending, forwarding, and receiving payments through lightning channels.
+///
+/// [`ChannelManager`] is parameterized by a number of components to achieve this.
+/// - [`chain::Watch`] (typically [`ChainMonitor`]) for on-chain monitoring and enforcement of each
+///   channel
+/// - [`BroadcasterInterface`] for broadcasting transactions related to opening, funding, and
+///   closing channels
+/// - [`EntropySource`] for providing random data needed for cryptographic operations
+/// - [`NodeSigner`] for cryptographic operations scoped to the node
+/// - [`SignerProvider`] for providing signers whose operations are scoped to individual channels
+/// - [`FeeEstimator`] to determine transaction fee rates needed to have a transaction mined in a
+///   timely manner
+/// - [`Router`] for finding payment paths when initiating and retrying payments
+/// - [`Logger`] for logging operational information of varying degrees
+///
+/// Additionally, it implements the following traits:
+/// - [`ChannelMessageHandler`] to handle off-chain channel activity from peers
+/// - [`MessageSendEventsProvider`] to similarly send such messages to peers
+/// - [`OffersMessageHandler`] for BOLT 12 message handling and sending
+/// - [`EventsProvider`] to generate user-actionable [`Event`]s
+/// - [`chain::Listen`] and [`chain::Confirm`] for notification of on-chain activity
+///
+/// Thus, [`ChannelManager`] is typically used to parameterize a [`MessageHandler`] and an
+/// [`OnionMessenger`]. The latter is required to support BOLT 12 functionality.
+///
+/// # `ChannelManager` vs `ChannelMonitor`
+///
+/// It's important to distinguish between the *off-chain* management and *on-chain* enforcement of
+/// lightning channels. [`ChannelManager`] exchanges messages with peers to manage the off-chain
+/// state of each channel. During this process, it generates a [`ChannelMonitor`] for each channel
+/// and a [`ChannelMonitorUpdate`] for each relevant change, notifying its parameterized
+/// [`chain::Watch`] of them.
+///
+/// An implementation of [`chain::Watch`], such as [`ChainMonitor`], is responsible for aggregating
+/// these [`ChannelMonitor`]s and applying any [`ChannelMonitorUpdate`]s to them. It then monitors
+/// for any pertinent on-chain activity, enforcing claims as needed.
+///
+/// This division of off-chain management and on-chain enforcement allows for interesting node
+/// setups. For instance, on-chain enforcement could be moved to a separate host or have added
+/// redundancy, possibly as a watchtower. See [`chain::Watch`] for the relevant interface.
+///
+/// # Initialization
+///
+/// Use [`ChannelManager::new`] with the most recent [`BlockHash`] when creating a fresh instance.
+/// Otherwise, if restarting, construct [`ChannelManagerReadArgs`] with the necessary parameters and
+/// references to any deserialized [`ChannelMonitor`]s that were previously persisted. Use this to
+/// deserialize the [`ChannelManager`] and feed it any new chain data since it was last online, as
+/// detailed in the [`ChannelManagerReadArgs`] documentation.
+///
+/// ```
+/// use bitcoin::BlockHash;
+/// use bitcoin::network::constants::Network;
+/// use lightning::chain::BestBlock;
+/// # use lightning::chain::channelmonitor::ChannelMonitor;
+/// use lightning::ln::channelmanager::{ChainParameters, ChannelManager, ChannelManagerReadArgs};
+/// # use lightning::routing::gossip::NetworkGraph;
+/// use lightning::util::config::UserConfig;
+/// use lightning::util::ser::ReadableArgs;
+///
+/// # fn read_channel_monitors() -> Vec<ChannelMonitor<lightning::sign::InMemorySigner>> { vec![] }
+/// # fn example<
+/// #     'a,
+/// #     L: lightning::util::logger::Logger,
+/// #     ES: lightning::sign::EntropySource,
+/// #     S: for <'b> lightning::routing::scoring::LockableScore<'b, ScoreLookUp = SL>,
+/// #     SL: lightning::routing::scoring::ScoreLookUp<ScoreParams = SP>,
+/// #     SP: Sized,
+/// #     R: lightning::io::Read,
+/// # >(
+/// #     fee_estimator: &dyn lightning::chain::chaininterface::FeeEstimator,
+/// #     chain_monitor: &dyn lightning::chain::Watch<lightning::sign::InMemorySigner>,
+/// #     tx_broadcaster: &dyn lightning::chain::chaininterface::BroadcasterInterface,
+/// #     router: &lightning::routing::router::DefaultRouter<&NetworkGraph<&'a L>, &'a L, &ES, &S, SP, SL>,
+/// #     logger: &L,
+/// #     entropy_source: &ES,
+/// #     node_signer: &dyn lightning::sign::NodeSigner,
+/// #     signer_provider: &lightning::sign::DynSignerProvider,
+/// #     best_block: lightning::chain::BestBlock,
+/// #     current_timestamp: u32,
+/// #     mut reader: R,
+/// # ) -> Result<(), lightning::ln::msgs::DecodeError> {
+/// // Fresh start with no channels
+/// let params = ChainParameters {
+///     network: Network::Bitcoin,
+///     best_block,
+/// };
+/// let default_config = UserConfig::default();
+/// let channel_manager = ChannelManager::new(
+///     fee_estimator, chain_monitor, tx_broadcaster, router, logger, entropy_source, node_signer,
+///     signer_provider, default_config, params, current_timestamp
+/// );
+///
+/// // Restart from deserialized data
+/// let mut channel_monitors = read_channel_monitors();
+/// let args = ChannelManagerReadArgs::new(
+///     entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster,
+///     router, logger, default_config, channel_monitors.iter_mut().collect()
+/// );
+/// let (block_hash, channel_manager) =
+///     <(BlockHash, ChannelManager<_, _, _, _, _, _, _, _>)>::read(&mut reader, args)?;
+///
+/// // Update the ChannelManager and ChannelMonitors with the latest chain data
+/// // ...
+///
+/// // Move the monitors to the ChannelManager's chain::Watch parameter
+/// for monitor in channel_monitors {
+///     chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+/// }
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # Operation
+///
+/// The following is required for [`ChannelManager`] to function properly:
+/// - Handle messages from peers using its [`ChannelMessageHandler`] implementation (typically
+///   called by [`PeerManager::read_event`] when processing network I/O)
+/// - Send messages to peers obtained via its [`MessageSendEventsProvider`] implementation
+///   (typically initiated when [`PeerManager::process_events`] is called)
+/// - Feed on-chain activity using either its [`chain::Listen`] or [`chain::Confirm`] implementation
+///   as documented by those traits
+/// - Perform any periodic channel and payment checks by calling [`timer_tick_occurred`] roughly
+///   every minute
+/// - Persist to disk whenever [`get_and_clear_needs_persistence`] returns `true` using a
+///   [`Persister`] such as a [`KVStore`] implementation
+/// - Handle [`Event`]s obtained via its [`EventsProvider`] implementation
+///
+/// The [`Future`] returned by [`get_event_or_persistence_needed_future`] is useful in determining
+/// when the last two requirements need to be checked.
+///
+/// The [`lightning-block-sync`] and [`lightning-transaction-sync`] crates provide utilities that
+/// simplify feeding in on-chain activity using the [`chain::Listen`] and [`chain::Confirm`] traits,
+/// respectively. The remaining requirements can be met using the [`lightning-background-processor`]
+/// crate. For languages other than Rust, the availability of similar utilities may vary.
+///
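For applications not using `lightning-background-processor`, the last two requirements above can be met with a simple loop. A sketch only, assuming an async runtime and application-provided `handle_event` and `persist` closures:

```rust
use lightning::events::{Event, EventsProvider};
use lightning::ln::channelmanager::AChannelManager;

async fn run<T: AChannelManager>(
    channel_manager: T, handle_event: impl Fn(Event), persist: impl Fn(),
) {
    let channel_manager = channel_manager.get_cm();
    loop {
        // Wakes when there are events to process or persistence is needed.
        channel_manager.get_event_or_persistence_needed_future().await;
        channel_manager.process_pending_events(&handle_event);
        if channel_manager.get_and_clear_needs_persistence() {
            // Application-defined: write the ChannelManager to disk.
            persist();
        }
    }
}
```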
+/// # Channels
+///
+/// [`ChannelManager`]'s primary function involves managing channel state. Without channels,
+/// payments can't be sent. Use [`list_channels`] or [`list_usable_channels`] for a snapshot of the
+/// currently open channels.
+///
+/// ```
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) {
+/// # let channel_manager = channel_manager.get_cm();
+/// let channels = channel_manager.list_usable_channels();
+/// for details in channels {
+///     println!("{:?}", details);
+/// }
+/// # }
+/// ```
+///
+/// Each channel is identified using a [`ChannelId`], which will change throughout the channel's
+/// life cycle. Additionally, channels are assigned a `user_channel_id`, which is given in
+/// [`Event`]s associated with the channel and serves as a fixed identifier but is otherwise unused
+/// by [`ChannelManager`].
+///
+/// ## Opening Channels
+///
+/// To open a channel with a peer, call [`create_channel`]. This will initiate the process of
+/// opening an outbound channel, which requires self-funding when handling
+/// [`Event::FundingGenerationReady`].
+///
+/// ```
+/// # use bitcoin::{ScriptBuf, Transaction};
+/// # use bitcoin::secp256k1::PublicKey;
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::events::{Event, EventsProvider};
+/// #
+/// # trait Wallet {
+/// #     fn create_funding_transaction(
+/// #         &self, _amount_sats: u64, _output_script: ScriptBuf
+/// #     ) -> Transaction;
+/// # }
+/// #
+/// # fn example<T: AChannelManager, W: Wallet>(channel_manager: T, wallet: W, peer_id: PublicKey) {
+/// # let channel_manager = channel_manager.get_cm();
+/// let value_sats = 1_000_000;
+/// let push_msats = 10_000_000;
+/// match channel_manager.create_channel(peer_id, value_sats, push_msats, 42, None, None) {
+///     Ok(channel_id) => println!("Opening channel {}", channel_id),
+///     Err(e) => println!("Error opening channel: {:?}", e),
+/// }
+///
+/// // On the event processing thread once the peer has responded
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::FundingGenerationReady {
+///         temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
+///         user_channel_id, ..
+///     } => {
+///         assert_eq!(user_channel_id, 42);
+///         let funding_transaction = wallet.create_funding_transaction(
+///             channel_value_satoshis, output_script
+///         );
+///         match channel_manager.funding_transaction_generated(
+///             &temporary_channel_id, &counterparty_node_id, funding_transaction
+///         ) {
+///             Ok(()) => println!("Funding channel {}", temporary_channel_id),
+///             Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
+///         }
+///     },
+///     Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
+///         assert_eq!(user_channel_id, 42);
+///         println!(
+///             "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
+///             former_temporary_channel_id.unwrap()
+///         );
+///     },
+///     Event::ChannelReady { channel_id, user_channel_id, .. } => {
+///         assert_eq!(user_channel_id, 42);
+///         println!("Channel {} ready", channel_id);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// ## Accepting Channels
+///
+/// Inbound channels are initiated by peers and are automatically accepted unless [`ChannelManager`]
+/// has [`UserConfig::manually_accept_inbound_channels`] set. In that case, the channel may be
+/// either accepted or rejected when handling [`Event::OpenChannelRequest`].
+///
+/// ```
+/// # use bitcoin::secp256k1::PublicKey;
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::events::{Event, EventsProvider};
+/// #
+/// # fn is_trusted(counterparty_node_id: PublicKey) -> bool {
+/// #     // ...
+/// #     unimplemented!()
+/// # }
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) {
+/// # let channel_manager = channel_manager.get_cm();
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, ..  } => {
+///         if !is_trusted(counterparty_node_id) {
+///             match channel_manager.force_close_without_broadcasting_txn(
+///                 &temporary_channel_id, &counterparty_node_id
+///             ) {
+///                 Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+///             }
+///             return;
+///         }
+///
+///         let user_channel_id = 43;
+///         match channel_manager.accept_inbound_channel(
+///             &temporary_channel_id, &counterparty_node_id, user_channel_id
+///         ) {
+///             Ok(()) => println!("Accepting channel {}", temporary_channel_id),
+///             Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
+///         }
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
 ///
-/// Implements [`ChannelMessageHandler`], handling the multi-channel parts and passing things through
-/// to individual Channels.
+/// ## Closing Channels
+///
+/// There are two ways to close a channel: either cooperatively using [`close_channel`] or
+/// unilaterally using [`force_close_broadcasting_latest_txn`]. The former is ideal as it makes for
+/// lower fees and immediate access to funds. However, the latter may be necessary if the
+/// counterparty isn't behaving properly or has gone offline. [`Event::ChannelClosed`] is generated
+/// once the channel has been closed successfully.
+///
+/// ```
+/// # use bitcoin::secp256k1::PublicKey;
+/// # use lightning::ln::ChannelId;
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::events::{Event, EventsProvider};
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, channel_id: ChannelId, counterparty_node_id: PublicKey
+/// # ) {
+/// # let channel_manager = channel_manager.get_cm();
+/// match channel_manager.close_channel(&channel_id, &counterparty_node_id) {
+///     Ok(()) => println!("Closing channel {}", channel_id),
+///     Err(e) => println!("Error closing channel {}: {:?}", channel_id, e),
+/// }
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::ChannelClosed { channel_id, user_channel_id, ..  } => {
+///         assert_eq!(user_channel_id, 42);
+///         println!("Channel {} closed", channel_id);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// # Payments
+///
+/// [`ChannelManager`] is responsible for sending, forwarding, and receiving payments through its
+/// channels. A payment is typically initiated from a [BOLT 11] invoice or a [BOLT 12] offer, though
+/// spontaneous (i.e., keysend) payments are also possible. Incoming payments don't require
+/// maintaining any additional state as [`ChannelManager`] can reconstruct the [`PaymentPreimage`]
+/// from the [`PaymentSecret`]. Sending payments, however, requires tracking in order to retry failed
+/// HTLCs.
+///
+/// After a payment is initiated, it will appear in [`list_recent_payments`] until a short time
+/// after either an [`Event::PaymentSent`] or [`Event::PaymentFailed`] is handled. Failed HTLCs
+/// for a payment will be retried according to the payment's [`Retry`] strategy or until
+/// [`abandon_payment`] is called.
+///
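As a concrete sketch of the retry/abandon interaction just described, pending payments can be looked up via `list_recent_payments` and abandoned by `PaymentId`, using only APIs referenced in this section:

```rust
use lightning::ln::channelmanager::{AChannelManager, RecentPaymentDetails};

fn abandon_all_pending<T: AChannelManager>(channel_manager: T) {
    let channel_manager = channel_manager.get_cm();
    // Stop retrying each in-flight payment; an Event::PaymentFailed is
    // generated once its remaining HTLCs have been resolved.
    for details in channel_manager.list_recent_payments() {
        if let RecentPaymentDetails::Pending { payment_id, .. } = details {
            channel_manager.abandon_payment(payment_id);
        }
    }
}
```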
+/// ## BOLT 11 Invoices
+///
+/// The [`lightning-invoice`] crate is useful for creating BOLT 11 invoices. Specifically, use the
+/// functions in its `utils` module for constructing invoices that are compatible with
+/// [`ChannelManager`]. These functions serve as a convenience for building invoices with the
+/// [`PaymentHash`] and [`PaymentSecret`] returned from [`create_inbound_payment`]. To provide your
+/// own [`PaymentHash`], use [`create_inbound_payment_for_hash`] or the corresponding functions in
+/// the [`lightning-invoice`] `utils` module.
+///
+/// [`ChannelManager`] generates an [`Event::PaymentClaimable`] once the full payment has been
+/// received. Call [`claim_funds`] to release the [`PaymentPreimage`], which in turn will result in
+/// an [`Event::PaymentClaimed`].
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider, PaymentPurpose};
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) {
+/// # let channel_manager = channel_manager.get_cm();
+/// // Or use utils::create_invoice_from_channelmanager
+/// let known_payment_hash = match channel_manager.create_inbound_payment(
+///     Some(10_000_000), 3600, None
+/// ) {
+///     Ok((payment_hash, _payment_secret)) => {
+///         println!("Creating inbound payment {}", payment_hash);
+///         payment_hash
+///     },
+///     Err(()) => panic!("Error creating inbound payment"),
+/// };
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
+///             println!("Claiming payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+///             println!("Unknown payment hash: {}", payment_hash);
+///         },
+///         PaymentPurpose::SpontaneousPayment(payment_preimage) => {
+///             assert_ne!(payment_hash, known_payment_hash);
+///             println!("Claiming spontaneous payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///     },
+///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///         assert_eq!(payment_hash, known_payment_hash);
+///         println!("Claimed {} msats", amount_msat);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// For paying an invoice, [`lightning-invoice`] provides a `payment` module with convenience
+/// functions for use with [`send_payment`].
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider};
+/// # use lightning::ln::PaymentHash;
+/// # use lightning::ln::channelmanager::{AChannelManager, PaymentId, RecentPaymentDetails, RecipientOnionFields, Retry};
+/// # use lightning::routing::router::RouteParameters;
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields,
+/// #     route_params: RouteParameters, retry: Retry
+/// # ) {
+/// # let channel_manager = channel_manager.get_cm();
+/// // let (payment_hash, recipient_onion, route_params) =
+/// //     payment::payment_parameters_from_invoice(&invoice);
+/// let payment_id = PaymentId([42; 32]);
+/// match channel_manager.send_payment(
+///     payment_hash, recipient_onion, payment_id, route_params, retry
+/// ) {
+///     Ok(()) => println!("Sending payment with hash {}", payment_hash),
+///     Err(e) => println!("Failed sending payment with hash {}: {:?}", payment_hash, e),
+/// }
+///
+/// let expected_payment_id = payment_id;
+/// let expected_payment_hash = payment_hash;
+/// assert!(
+///     channel_manager.list_recent_payments().iter().any(|details| matches!(
+///         details,
+///         RecentPaymentDetails::Pending { payment_id, payment_hash, .. }
+///             if *payment_id == expected_payment_id && *payment_hash == expected_payment_hash
+///     ))
+/// );
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
+///     Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
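Spontaneous (keysend) payments, mentioned at the top of this section, skip the invoice entirely. A hedged sketch, assuming `send_spontaneous_payment_with_retry` with the argument order shown below, and with `route_params` and `retry` built as in the previous example; passing `None` lets LDK generate the preimage:

```rust
use lightning::ln::channelmanager::{AChannelManager, PaymentId, RecipientOnionFields, Retry};
use lightning::routing::router::RouteParameters;

fn send_keysend<T: AChannelManager>(
    channel_manager: T, route_params: RouteParameters, retry: Retry,
) {
    let channel_manager = channel_manager.get_cm();
    let payment_id = PaymentId([43; 32]);
    // Passing None for the preimage lets LDK generate one internally.
    match channel_manager.send_spontaneous_payment_with_retry(
        None, RecipientOnionFields::spontaneous_empty(), payment_id, route_params, retry,
    ) {
        // On success, the payment hash derived from the preimage is returned.
        Ok(payment_hash) => println!("Sending keysend payment {}", payment_hash),
        Err(e) => println!("Failed sending keysend payment: {:?}", e),
    }
}
```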
+/// ## BOLT 12 Offers
+///
+/// The [`offers`] module is useful for creating BOLT 12 offers. An [`Offer`] is a precursor to a
+/// [`Bolt12Invoice`], which must first be requested by the payer. The interchange of these messages
+/// as defined in the specification is handled by [`ChannelManager`] and its implementation of
+/// [`OffersMessageHandler`]. However, this only works with an [`Offer`] created using a builder
+/// returned by [`create_offer_builder`]. With this approach, BOLT 12 offers and invoices are
+/// stateless just as BOLT 11 invoices are.
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider, PaymentPurpose};
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::offers::parse::Bolt12SemanticError;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) -> Result<(), Bolt12SemanticError> {
+/// # let channel_manager = channel_manager.get_cm();
+/// let offer = channel_manager
+///     .create_offer_builder("coffee".to_string())?
+/// # ;
+/// # // Needed for compiling for c_bindings
+/// # let builder: lightning::offers::offer::OfferBuilder<_, _> = offer.into();
+/// # let offer = builder
+///     .amount_msats(10_000_000)
+///     .build()?;
+/// let bech32_offer = offer.to_string();
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///             println!("Claiming payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+///             println!("Unknown payment hash: {}", payment_hash);
+///         },
+///         // ...
+/// #         _ => {},
+///     },
+///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///         println!("Claimed {} msats", amount_msat);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Use [`pay_for_offer`] to initiate a payment, which sends an [`InvoiceRequest`] for an [`Offer`]
+/// and pays the [`Bolt12Invoice`] response. In addition to success and failure events,
+/// [`ChannelManager`] may also generate an [`Event::InvoiceRequestFailed`].
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider};
+/// # use lightning::ln::channelmanager::{AChannelManager, PaymentId, RecentPaymentDetails, Retry};
+/// # use lightning::offers::offer::Offer;
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, offer: &Offer, quantity: Option<u64>, amount_msats: Option<u64>,
+/// #     payer_note: Option<String>, retry: Retry, max_total_routing_fee_msat: Option<u64>
+/// # ) {
+/// # let channel_manager = channel_manager.get_cm();
+/// let payment_id = PaymentId([42; 32]);
+/// match channel_manager.pay_for_offer(
+///     offer, quantity, amount_msats, payer_note, payment_id, retry, max_total_routing_fee_msat
+/// ) {
+///     Ok(()) => println!("Requesting invoice for offer"),
+///     Err(e) => println!("Unable to request invoice for offer: {:?}", e),
+/// }
+///
+/// // First the payment will be waiting on an invoice
+/// let expected_payment_id = payment_id;
+/// assert!(
+///     channel_manager.list_recent_payments().iter().any(|details| matches!(
+///         details,
+///         RecentPaymentDetails::AwaitingInvoice { payment_id } if *payment_id == expected_payment_id
+///     ))
+/// );
+///
+/// // Once the invoice is received, a payment will be sent
+/// assert!(
+///     channel_manager.list_recent_payments().iter().any(|details| matches!(
+///         details,
+///         RecentPaymentDetails::Pending { payment_id, .. } if *payment_id == expected_payment_id
+///     ))
+/// );
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///     Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// ## BOLT 12 Refunds
+///
+/// A [`Refund`] is a request for an invoice to be paid. Like *paying* for an [`Offer`], *creating*
+/// a [`Refund`] involves maintaining state since it represents a future outbound payment.
+/// Therefore, use [`create_refund_builder`] when creating one, otherwise [`ChannelManager`] will
+/// refuse to pay any corresponding [`Bolt12Invoice`] that it receives.
+///
+/// ```
+/// # use core::time::Duration;
+/// # use lightning::events::{Event, EventsProvider};
+/// # use lightning::ln::channelmanager::{AChannelManager, PaymentId, RecentPaymentDetails, Retry};
+/// # use lightning::offers::parse::Bolt12SemanticError;
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, amount_msats: u64, absolute_expiry: Duration, retry: Retry,
+/// #     max_total_routing_fee_msat: Option<u64>
+/// # ) -> Result<(), Bolt12SemanticError> {
+/// # let channel_manager = channel_manager.get_cm();
+/// let payment_id = PaymentId([42; 32]);
+/// let refund = channel_manager
+///     .create_refund_builder(
+///         "coffee".to_string(), amount_msats, absolute_expiry, payment_id, retry,
+///         max_total_routing_fee_msat
+///     )?
+/// # ;
+/// # // Needed for compiling for c_bindings
+/// # let builder: lightning::offers::refund::RefundBuilder<_> = refund.into();
+/// # let refund = builder
+///     .payer_note("refund for order 1234".to_string())
+///     .build()?;
+/// let bech32_refund = refund.to_string();
+///
+/// // First the payment will be waiting on an invoice
+/// let expected_payment_id = payment_id;
+/// assert!(
+///     channel_manager.list_recent_payments().iter().any(|details| matches!(
+///         details,
+///         RecentPaymentDetails::AwaitingInvoice { payment_id } if *payment_id == expected_payment_id
+///     ))
+/// );
+///
+/// // Once the invoice is received, a payment will be sent
+/// assert!(
+///     channel_manager.list_recent_payments().iter().any(|details| matches!(
+///         details,
+///         RecentPaymentDetails::Pending { payment_id, .. } if *payment_id == expected_payment_id
+///     ))
+/// );
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///     // ...
+/// #     _ => {},
+/// });
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Use [`request_refund_payment`] to send a [`Bolt12Invoice`] for receiving the refund. Similar to
+/// *creating* an [`Offer`], this is stateless as it represents an inbound payment.
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider, PaymentPurpose};
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::offers::refund::Refund;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T, refund: &Refund) {
+/// # let channel_manager = channel_manager.get_cm();
+/// let known_payment_hash = match channel_manager.request_refund_payment(refund) {
+///     Ok(invoice) => {
+///         let payment_hash = invoice.payment_hash();
+///         println!("Requesting refund payment {}", payment_hash);
+///         payment_hash
+///     },
+///     Err(e) => panic!("Unable to request payment for refund: {:?}", e),
+/// };
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
+///             println!("Claiming payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+///             println!("Unknown payment hash: {}", payment_hash);
+///         },
+///         // ...
+/// #         _ => {},
+///     },
+///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///         assert_eq!(payment_hash, known_payment_hash);
+///         println!("Claimed {} msats", amount_msat);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// # Persistence
 ///
 /// Implements [`Writeable`] to write out all channel state to disk. Implies [`peer_disconnected`] for
 /// all peers during write/read (though does not modify this instance, only the instance being
@@ -1106,12 +1758,16 @@ where
 /// tells you the last block hash which was connected. You should get the best block tip before using the manager.
 /// See [`chain::Listen`] and [`chain::Confirm`] for more details.
 ///
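A minimal sketch of the persistence described in this section, using `Writeable::encode` to obtain the serialized bytes; how and where they are written is application-defined:

```rust
use lightning::ln::channelmanager::AChannelManager;
use lightning::util::ser::Writeable;

fn serialize_manager<T: AChannelManager>(channel_manager: T) -> Vec<u8> {
    // `encode` is the Writeable convenience that serializes all channel
    // state; the caller should write these bytes atomically to disk.
    channel_manager.get_cm().encode()
}
```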
+/// # `ChannelUpdate` Messages
+///
 /// Note that `ChannelManager` is responsible for tracking liveness of its channels and generating
 /// [`ChannelUpdate`] messages informing peers that the channel is temporarily disabled. To avoid
 /// spam due to quick disconnection/reconnection, updates are not sent until the channel has been
 /// offline for a full minute. In order to track this, you must call
 /// [`timer_tick_occurred`] roughly once per minute, though it doesn't have to be perfect.
 ///
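A trivial sketch of driving `timer_tick_occurred` from a dedicated thread, assuming an `Arc`-wrapped manager (in practice `lightning-background-processor` handles this):

```rust
use lightning::ln::channelmanager::AChannelManager;
use std::sync::Arc;
use std::time::Duration;

fn spawn_timer<T: AChannelManager + Send + Sync + 'static>(channel_manager: Arc<T>) {
    std::thread::spawn(move || loop {
        std::thread::sleep(Duration::from_secs(60));
        // Drives liveness tracking and the delayed ChannelUpdate
        // broadcasts described above.
        channel_manager.get_cm().timer_tick_occurred();
    });
}
```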
+/// # DoS Mitigation
+///
 /// To avoid trivial DoS issues, `ChannelManager` limits the number of inbound connections and
 /// inbound channels without confirmed funding transactions. This may result in nodes which we do
 /// not have a channel with being unable to connect to us or open new channels with us if we have
@@ -1121,19 +1777,53 @@ where
 /// exempted from the count of unfunded channels. Similarly, outbound channels and connections are
 /// never limited. Please ensure you limit the count of such channels yourself.
 ///
+/// # Type Aliases
+///
 /// Rather than using a plain `ChannelManager`, it is preferable to use either a [`SimpleArcChannelManager`]
 /// or a [`SimpleRefChannelManager`], for conciseness. See their documentation for more details, but
 /// essentially you should default to using a [`SimpleRefChannelManager`], and use a
 /// [`SimpleArcChannelManager`] when you require a `ChannelManager` with a static lifetime, such as when
 /// you're using lightning-net-tokio.
 ///
+/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
+/// [`MessageHandler`]: crate::ln::peer_handler::MessageHandler
+/// [`OnionMessenger`]: crate::onion_message::messenger::OnionMessenger
+/// [`PeerManager::read_event`]: crate::ln::peer_handler::PeerManager::read_event
+/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
+/// [`timer_tick_occurred`]: Self::timer_tick_occurred
+/// [`get_and_clear_needs_persistence`]: Self::get_and_clear_needs_persistence
+/// [`Persister`]: crate::util::persist::Persister
+/// [`KVStore`]: crate::util::persist::KVStore
+/// [`get_event_or_persistence_needed_future`]: Self::get_event_or_persistence_needed_future
+/// [`lightning-block-sync`]: https://docs.rs/lightning_block_sync/latest/lightning_block_sync
+/// [`lightning-transaction-sync`]: https://docs.rs/lightning_transaction_sync/latest/lightning_transaction_sync
+/// [`lightning-background-processor`]: https://docs.rs/lightning_background_processor/latest/lightning_background_processor
+/// [`list_channels`]: Self::list_channels
+/// [`list_usable_channels`]: Self::list_usable_channels
+/// [`create_channel`]: Self::create_channel
+/// [`close_channel`]: Self::close_channel
+/// [`force_close_broadcasting_latest_txn`]: Self::force_close_broadcasting_latest_txn
+/// [BOLT 11]: https://github.com/lightning/bolts/blob/master/11-payment-encoding.md
+/// [BOLT 12]: https://github.com/rustyrussell/lightning-rfc/blob/guilt/offers/12-offer-encoding.md
+/// [`list_recent_payments`]: Self::list_recent_payments
+/// [`abandon_payment`]: Self::abandon_payment
+/// [`lightning-invoice`]: https://docs.rs/lightning_invoice/latest/lightning_invoice
+/// [`create_inbound_payment`]: Self::create_inbound_payment
+/// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
+/// [`claim_funds`]: Self::claim_funds
+/// [`send_payment`]: Self::send_payment
+/// [`offers`]: crate::offers
+/// [`create_offer_builder`]: Self::create_offer_builder
+/// [`pay_for_offer`]: Self::pay_for_offer
+/// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
+/// [`create_refund_builder`]: Self::create_refund_builder
+/// [`request_refund_payment`]: Self::request_refund_payment
 /// [`peer_disconnected`]: msgs::ChannelMessageHandler::peer_disconnected
 /// [`funding_created`]: msgs::FundingCreated
 /// [`funding_transaction_generated`]: Self::funding_transaction_generated
 /// [`BlockHash`]: bitcoin::hash_types::BlockHash
 /// [`update_channel`]: chain::Watch::update_channel
 /// [`ChannelUpdate`]: msgs::ChannelUpdate
-/// [`timer_tick_occurred`]: Self::timer_tick_occurred
 /// [`read`]: ReadableArgs::read
 //
 // Lock order:
@@ -1153,6 +1843,8 @@ where
 //  |   |
 //  |   |__`pending_intercepted_htlcs`
 //  |
+//  |__`decode_update_add_htlcs`
+//  |
 //  |__`per_peer_state`
 //      |
 //      |__`pending_inbound_payments`
@@ -1243,6 +1935,18 @@ where
        /// See `ChannelManager` struct-level documentation for lock order requirements.
        pending_intercepted_htlcs: Mutex<HashMap<InterceptId, PendingAddHTLCInfo>>,
 
+       /// SCID/SCID Alias -> pending `update_add_htlc`s to decode.
+       ///
+       /// Note that because we may have an SCID Alias as the key we can have two entries per channel,
+       /// though in practice we probably won't be receiving HTLCs for a channel both via the alias
+       /// and via the classic SCID.
+       ///
+       /// Note that no consistency guarantees are made about the existence of a channel with the
+       /// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`!
+       ///
+       /// See `ChannelManager` struct-level documentation for lock order requirements.
+       decode_update_add_htlcs: Mutex<HashMap<u64, Vec<msgs::UpdateAddHTLC>>>,
+
        /// The sets of payments which are claimable or currently being claimed. See
        /// [`ClaimablePayments`]' individual field docs for more info.
        ///
@@ -1386,6 +2090,9 @@ where
 
        pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
 
+       /// Tracks the message events that are to be broadcast when we are connected to some peer.
+       pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
+
        entropy_source: ES,
        node_signer: NS,
        signer_provider: SP,
@@ -1630,9 +2337,6 @@ pub struct ChannelDetails {
        pub counterparty: ChannelCounterparty,
        /// The Channel's funding transaction output, if we've negotiated the funding transaction with
        /// our counterparty already.
-       ///
-       /// Note that, if this has been set, `channel_id` will be equivalent to
-       /// `funding_txo.unwrap().to_channel_id()`.
        pub funding_txo: Option<OutPoint>,
        /// The features which this channel operates with. See individual features for more info.
        ///
@@ -1798,6 +2502,14 @@ pub struct ChannelDetails {
        ///
        /// This field is only `None` for `ChannelDetails` objects serialized prior to LDK 0.0.109.
        pub config: Option<ChannelConfig>,
+       /// Pending inbound HTLCs.
+       ///
+       /// This field is empty for objects serialized with LDK versions prior to 0.0.122.
+       pub pending_inbound_htlcs: Vec<InboundHTLCDetails>,
+       /// Pending outbound HTLCs.
+       ///
+       /// This field is empty for objects serialized with LDK versions prior to 0.0.122.
+       pub pending_outbound_htlcs: Vec<OutboundHTLCDetails>,
 }
 
 impl ChannelDetails {
@@ -1875,6 +2587,8 @@ impl ChannelDetails {
                        inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(),
                        config: Some(context.config()),
                        channel_shutdown_state: Some(context.shutdown_state()),
+                       pending_inbound_htlcs: context.get_pending_inbound_htlc_details(),
+                       pending_outbound_htlcs: context.get_pending_outbound_htlc_details(),
                }
        }
 }
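A short sketch of reading the new per-channel HTLC lists through `list_channels`, relying only on the fields introduced in this diff:

```rust
use lightning::ln::channelmanager::AChannelManager;

fn log_pending_htlcs<T: AChannelManager>(channel_manager: T) {
    // Count pending HTLCs per channel; each InboundHTLCDetails and
    // OutboundHTLCDetails entry carries further per-HTLC state.
    for details in channel_manager.get_cm().list_channels() {
        println!(
            "{}: {} inbound / {} outbound HTLCs pending",
            details.channel_id,
            details.pending_inbound_htlcs.len(),
            details.pending_outbound_htlcs.len(),
        );
    }
}
```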
@@ -1970,7 +2684,7 @@ macro_rules! handle_error {
                match $internal {
                        Ok(msg) => Ok(msg),
                        Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
-                               let mut msg_events = Vec::with_capacity(2);
+                               let mut msg_event = None;
 
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
                                        let counterparty_node_id = shutdown_res.counterparty_node_id;
@@ -1982,7 +2696,8 @@ macro_rules! handle_error {
 
                                        $self.finish_close_channel(shutdown_res);
                                        if let Some(update) = update_option {
-                                               msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                               let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
+                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
@@ -1992,17 +2707,17 @@ macro_rules! handle_error {
 
                                if let msgs::ErrorAction::IgnoreError = err.action {
                                } else {
-                                       msg_events.push(events::MessageSendEvent::HandleError {
+                                       msg_event = Some(events::MessageSendEvent::HandleError {
                                                node_id: $counterparty_node_id,
                                                action: err.action.clone()
                                        });
                                }
 
-                               if !msg_events.is_empty() {
+                               if let Some(msg_event) = msg_event {
                                        let per_peer_state = $self.per_peer_state.read().unwrap();
                                        if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
                                                let mut peer_state = peer_state_mutex.lock().unwrap();
-                                               peer_state.pending_msg_events.append(&mut msg_events);
+                                               peer_state.pending_msg_events.push(msg_event);
                                        }
                                }
 
@@ -2074,6 +2789,14 @@ macro_rules! convert_chan_phase_err {
                        ChannelPhase::UnfundedInboundV1(channel) => {
                                convert_chan_phase_err!($self, $err, channel, $channel_id, UNFUNDED_CHANNEL)
                        },
+                       #[cfg(dual_funding)]
+                       ChannelPhase::UnfundedOutboundV2(channel) => {
+                               convert_chan_phase_err!($self, $err, channel, $channel_id, UNFUNDED_CHANNEL)
+                       },
+                       #[cfg(dual_funding)]
+                       ChannelPhase::UnfundedInboundV2(channel) => {
+                               convert_chan_phase_err!($self, $err, channel, $channel_id, UNFUNDED_CHANNEL)
+                       },
                }
        };
 }
@@ -2149,6 +2872,7 @@ macro_rules! emit_channel_pending_event {
                                counterparty_node_id: $channel.context.get_counterparty_node_id(),
                                user_channel_id: $channel.context.get_user_id(),
                                funding_txo: $channel.context.get_funding_txo().unwrap().into_bitcoin_outpoint(),
+                               channel_type: Some($channel.context.get_channel_type().clone()),
                        }, None));
                        $channel.context.set_channel_pending_event_emitted();
                }
@@ -2175,7 +2899,7 @@ macro_rules! handle_monitor_update_completion {
                let logger = WithChannelContext::from(&$self.logger, &$chan.context);
                let mut updates = $chan.monitor_updating_restored(&&logger,
                        &$self.node_signer, $self.chain_hash, &$self.default_configuration,
-                       $self.best_block.read().unwrap().height());
+                       $self.best_block.read().unwrap().height);
                let counterparty_node_id = $chan.context.get_counterparty_node_id();
                let channel_update = if updates.channel_ready.is_some() && $chan.context.is_usable() {
                        // We only send a channel_update in the case where we are just now sending a
@@ -2194,9 +2918,9 @@ macro_rules! handle_monitor_update_completion {
                let update_actions = $peer_state.monitor_update_blocked_actions
                        .remove(&$chan.context.channel_id()).unwrap_or(Vec::new());
 
-               let htlc_forwards = $self.handle_channel_resumption(
+               let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption(
                        &mut $peer_state.pending_msg_events, $chan, updates.raa,
-                       updates.commitment_update, updates.order, updates.accepted_htlcs,
+                       updates.commitment_update, updates.order, updates.accepted_htlcs, updates.pending_update_adds,
                        updates.funding_broadcastable, updates.channel_ready,
                        updates.announcement_sigs);
                if let Some(upd) = channel_update {
@@ -2257,6 +2981,9 @@ macro_rules! handle_monitor_update_completion {
                if let Some(forwards) = htlc_forwards {
                        $self.forward_htlcs(&mut [forwards][..]);
                }
+               if let Some(decode) = decode_update_add_htlcs {
+                       $self.push_decode_update_add_htlcs(decode);
+               }
                $self.finalize_claims(updates.finalized_claimed_htlcs);
                for failure in updates.failed_htlcs.drain(..) {
                        let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
@@ -2429,14 +3156,15 @@ where
 
                        best_block: RwLock::new(params.best_block),
 
-                       outbound_scid_aliases: Mutex::new(HashSet::new()),
-                       pending_inbound_payments: Mutex::new(HashMap::new()),
+                       outbound_scid_aliases: Mutex::new(new_hash_set()),
+                       pending_inbound_payments: Mutex::new(new_hash_map()),
                        pending_outbound_payments: OutboundPayments::new(),
-                       forward_htlcs: Mutex::new(HashMap::new()),
-                       claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: HashMap::new(), pending_claiming_payments: HashMap::new() }),
-                       pending_intercepted_htlcs: Mutex::new(HashMap::new()),
-                       outpoint_to_peer: Mutex::new(HashMap::new()),
-                       short_to_chan_info: FairRwLock::new(HashMap::new()),
+                       forward_htlcs: Mutex::new(new_hash_map()),
+                       decode_update_add_htlcs: Mutex::new(new_hash_map()),
+                       claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }),
+                       pending_intercepted_htlcs: Mutex::new(new_hash_map()),
+                       outpoint_to_peer: Mutex::new(new_hash_map()),
+                       short_to_chan_info: FairRwLock::new(new_hash_map()),
 
                        our_network_pubkey: node_signer.get_node_id(Recipient::Node).unwrap(),
                        secp_ctx,
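An aside on the `HashMap::new()` to `new_hash_map()` swaps above: routing every map construction through one helper lets the hasher be chosen in a single place (for example, randomized in production builds, deterministic under fuzzing). A minimal sketch of that pattern, assuming a std environment; the helper names match the diff, but the bodies here are illustrative rather than LDK's exact implementation:

    use std::collections::HashMap;
    use std::collections::hash_map::RandomState;

    // Centralized constructors: swap `RandomState` for a fixed-seed hasher in
    // test/fuzz builds without touching any call site.
    pub fn new_hash_map<K, V>() -> HashMap<K, V, RandomState> {
        HashMap::with_hasher(RandomState::new())
    }

    pub fn hash_map_with_capacity<K, V>(capacity: usize) -> HashMap<K, V, RandomState> {
        HashMap::with_capacity_and_hasher(capacity, RandomState::new())
    }

    fn main() {
        let mut m: HashMap<&str, u32, RandomState> = new_hash_map();
        m.insert("a", 1);
        assert!(hash_map_with_capacity::<u64, u64>(8).capacity() >= 8);
    }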
@@ -2448,7 +3176,7 @@ where
 
                        highest_seen_timestamp: AtomicUsize::new(current_timestamp as usize),
 
-                       per_peer_state: FairRwLock::new(HashMap::new()),
+                       per_peer_state: FairRwLock::new(new_hash_map()),
 
                        pending_events: Mutex::new(VecDeque::new()),
                        pending_events_processor: AtomicBool::new(false),
@@ -2460,6 +3188,7 @@ where
                        funding_batch_states: Mutex::new(BTreeMap::new()),
 
                        pending_offers_messages: Mutex::new(Vec::new()),
+                       pending_broadcast_messages: Mutex::new(Vec::new()),
 
                        entropy_source,
                        node_signer,
@@ -2475,7 +3204,7 @@ where
        }
 
        fn create_and_insert_outbound_scid_alias(&self) -> u64 {
-               let height = self.best_block.read().unwrap().height();
+               let height = self.best_block.read().unwrap().height;
                let mut outbound_scid_alias = 0;
                let mut i = 0;
                loop {
@@ -2553,7 +3282,7 @@ where
                        let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
                        match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key,
                                their_features, channel_value_satoshis, push_msat, user_channel_id, config,
-                               self.best_block.read().unwrap().height(), outbound_scid_alias, temporary_channel_id)
+                               self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id)
                        {
                                Ok(res) => res,
                                Err(e) => {
@@ -2592,7 +3321,7 @@ where
                // the same channel.
                let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len());
                {
-                       let best_block_height = self.best_block.read().unwrap().height();
+                       let best_block_height = self.best_block.read().unwrap().height;
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
@@ -2625,7 +3354,7 @@ where
                // the same channel.
                let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len());
                {
-                       let best_block_height = self.best_block.read().unwrap().height();
+                       let best_block_height = self.best_block.read().unwrap().height;
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
@@ -2655,7 +3384,7 @@ where
 
        /// Gets the list of channels we have with a given counterparty, in random order.
        pub fn list_channels_with_counterparty(&self, counterparty_node_id: &PublicKey) -> Vec<ChannelDetails> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let per_peer_state = self.per_peer_state.read().unwrap();
 
                if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
@@ -2858,7 +3587,7 @@ where
                        let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
                        self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
-               if let Some((_, funding_txo, monitor_update)) = shutdown_res.monitor_update {
+               if let Some((_, funding_txo, _channel_id, monitor_update)) = shutdown_res.monitor_update {
                        // There isn't anything we can do if we get an update failure - we're already
                        // force-closing. The monitor update on the required in-memory copy should broadcast
                        // the latest local state, which is the best we can do anyway. Thus, it is safe to
@@ -2939,6 +3668,13 @@ where
                                                // Unfunded channel has no update
                                                (None, chan_phase.context().get_counterparty_node_id())
                                        },
+                                       // TODO(dual_funding): Combine this match arm with above once #[cfg(dual_funding)] is removed.
+                                       #[cfg(dual_funding)]
+                                       ChannelPhase::UnfundedOutboundV2(_) | ChannelPhase::UnfundedInboundV2(_) => {
+                                               self.finish_close_channel(chan_phase.context_mut().force_shutdown(false, closure_reason));
+                                               // Unfunded channel has no update
+                                               (None, chan_phase.context().get_counterparty_node_id())
+                                       },
                                }
                        } else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() {
                                log_error!(logger, "Force-closing channel {}", &channel_id);
@@ -2951,17 +3687,11 @@ where
                        }
                };
                if let Some(update) = update_opt {
-                       // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
-                       // not try to broadcast it via whatever peer we have.
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let a_peer_state_opt = per_peer_state.get(peer_node_id)
-                               .ok_or(per_peer_state.values().next());
-                       if let Ok(a_peer_state_mutex) = a_peer_state_opt {
-                               let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
-                               a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                       msg: update
-                               });
-                       }
+                       // If we have a `ChannelUpdate` to broadcast, queue it up; it will be sent to
+                       // peers as part of the next batch of broadcast message events.
+                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                               msg: update
+                       });
                }
 
                Ok(counterparty_node_id)
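With this change the force-close path no longer picks an arbitrary connected peer to carry the `BroadcastChannelUpdate`; it queues the message and lets the normal message-event flush hand it to whichever peers are connected at that time. A self-contained sketch of the queue-and-drain shape, with stand-in types (`ChannelUpdateMsg` and `Manager` are illustrative, not the diff's types):

    use std::sync::Mutex;

    struct ChannelUpdateMsg; // stand-in for msgs::ChannelUpdate

    struct Manager {
        pending_broadcast_messages: Mutex<Vec<ChannelUpdateMsg>>,
    }

    impl Manager {
        // Producer side: callable from paths (like force-close) that may not
        // hold, or even have, any per-peer state.
        fn queue_broadcast(&self, msg: ChannelUpdateMsg) {
            self.pending_broadcast_messages.lock().unwrap().push(msg);
        }

        // Consumer side: drained when message events are collected, so the
        // broadcast reaches whoever is connected at that point.
        fn drain_broadcasts(&self) -> Vec<ChannelUpdateMsg> {
            std::mem::take(&mut *self.pending_broadcast_messages.lock().unwrap())
        }
    }

    fn main() {
        let mgr = Manager { pending_broadcast_messages: Mutex::new(Vec::new()) };
        mgr.queue_broadcast(ChannelUpdateMsg);
        assert_eq!(mgr.drain_broadcasts().len(), 1);
    }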
@@ -3002,8 +3732,8 @@ where
        /// the latest local transaction(s). Fails if `channel_id` is unknown to the manager, or if the
        /// `counterparty_node_id` isn't the counterparty of the corresponding channel.
        ///
-       /// You can always get the latest local transaction(s) to broadcast from
-       /// [`ChannelMonitor::get_latest_holder_commitment_txn`].
+       /// You can always broadcast the latest local transaction(s) via
+       /// [`ChannelMonitor::broadcast_latest_holder_commitment_txn`].
        pub fn force_close_without_broadcasting_txn(&self, channel_id: &ChannelId, counterparty_node_id: &PublicKey)
        -> Result<(), APIError> {
                self.force_close_sending_error(channel_id, counterparty_node_id, false)
@@ -3025,6 +3755,163 @@ where
                }
        }
 
+       fn can_forward_htlc_to_outgoing_channel(
+               &self, chan: &mut Channel<SP>, msg: &msgs::UpdateAddHTLC, next_packet: &NextPacketDetails
+       ) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
+               if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
+                       // Note that this error must look identical, from the sender's perspective, to the
+                       // unknown-channel error in `can_forward_htlc` - we should NOT reveal the existence
+                       // or non-existence of a private channel if we don't allow forwards outbound over them.
+                       return Err(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
+               }
+               if chan.context.get_channel_type().supports_scid_privacy() && next_packet.outgoing_scid != chan.context.outbound_scid_alias() {
+                       // `option_scid_alias` (referred to in LDK as `scid_privacy`) means
+                       // "refuse to forward unless the SCID alias was used", so we pretend
+                       // we don't have the channel here.
+                       return Err(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
+               }
+
+               // Note that we could technically not return an error yet here and just hope
+               // that the connection is reestablished or monitor updated by the time we get
+               // around to doing the actual forward, but better to fail early if we can and
+               // hopefully an attacker trying to path-trace payments cannot make this occur
+               // on a small/per-node/per-channel scale.
+               if !chan.context.is_live() { // channel_disabled
+                       // If the channel_update we're going to return is disabled (i.e. the
+                       // peer has been disabled for some time), return `channel_disabled`,
+                       // otherwise return `temporary_channel_failure`.
+                       let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
+                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
+                               return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
+                       } else {
+                               return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
+                       }
+               }
+               if next_packet.outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
+                       let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
+                       return Err(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
+               }
+               if let Err((err, code)) = chan.htlc_satisfies_config(msg, next_packet.outgoing_amt_msat, next_packet.outgoing_cltv_value) {
+                       let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
+                       return Err((err, code, chan_update_opt));
+               }
+
+               Ok(())
+       }
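The numeric constants above are BOLT 4 failure codes: a flag half ORed with a failure number, where `0x4000` is PERM, `0x2000` is NODE, and `0x1000` is UPDATE (and the `u.contents.flags & 2` test reads the `disable` bit of the `channel_update`). Spelled out as a runnable sketch:

    // BOLT 4 failure-code composition, as used in the checks above.
    const PERM: u16 = 0x4000;   // permanent; the sender should not retry via this hop
    const NODE: u16 = 0x2000;   // a node-level rather than channel-level failure
    const UPDATE: u16 = 0x1000; // failure data must carry a channel_update

    const UNKNOWN_NEXT_PEER: u16 = PERM | 10;          // 0x4000 | 10
    const TEMPORARY_CHANNEL_FAILURE: u16 = UPDATE | 7; // 0x1000 | 7
    const AMOUNT_BELOW_MINIMUM: u16 = UPDATE | 11;     // 0x1000 | 11
    const CHANNEL_DISABLED: u16 = UPDATE | 20;         // 0x1000 | 20
    const TEMPORARY_NODE_FAILURE: u16 = NODE | 2;      // 0x2000 | 2

    fn main() {
        // `err_code & 0x1000 == 0x1000` in the failure path below is exactly
        // the "does this code require a channel_update?" test:
        assert_eq!(CHANNEL_DISABLED & UPDATE, UPDATE);
        assert_eq!(UNKNOWN_NEXT_PEER & UPDATE, 0);
        let _ = (TEMPORARY_CHANNEL_FAILURE, AMOUNT_BELOW_MINIMUM, TEMPORARY_NODE_FAILURE);
    }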
+
+       /// Executes a callback `C` that returns some value `X` on the funded channel matching the
+       /// given `scid`. Returns `None` when no such channel is found.
+       fn do_funded_channel_callback<X, C: Fn(&mut Channel<SP>) -> X>(
+               &self, scid: u64, callback: C,
+       ) -> Option<X> {
+               let (counterparty_node_id, channel_id) = match self.short_to_chan_info.read().unwrap().get(&scid).cloned() {
+                       None => return None,
+                       Some((cp_id, id)) => (cp_id, id),
+               };
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+               if peer_state_mutex_opt.is_none() {
+                       return None;
+               }
+               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               match peer_state.channel_by_id.get_mut(&channel_id).and_then(
+                       |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
+               ) {
+                       None => None,
+                       Some(chan) => Some(callback(chan)),
+               }
+       }
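This helper bundles the two-step lookup (scid to counterparty/channel id under one lock, then the per-peer state under another) so callers never hold either lock across their own code. The same shape in a self-contained sketch; `Registry` and its fields are generic stand-ins for the diff's maps:

    use std::collections::HashMap;
    use std::sync::{Mutex, RwLock};

    struct Registry<T> {
        // id -> key, guarded separately from the entries themselves,
        // mirroring short_to_chan_info vs. per-peer channel maps.
        index: RwLock<HashMap<u64, String>>,
        entries: Mutex<HashMap<String, T>>,
    }

    impl<T> Registry<T> {
        // Run `callback` on the entry behind `id`; both locks are released
        // before this returns, as in do_funded_channel_callback.
        fn with_entry<X, C: Fn(&mut T) -> X>(&self, id: u64, callback: C) -> Option<X> {
            let key = self.index.read().unwrap().get(&id).cloned()?;
            let mut entries = self.entries.lock().unwrap();
            entries.get_mut(&key).map(|entry| callback(entry))
        }
    }

    fn main() {
        let reg = Registry { index: RwLock::new(HashMap::new()), entries: Mutex::new(HashMap::new()) };
        reg.index.write().unwrap().insert(7, "chan".into());
        reg.entries.lock().unwrap().insert("chan".into(), 0u32);
        assert_eq!(reg.with_entry(7, |v| { *v += 1; *v }), Some(1));
    }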
+
+       fn can_forward_htlc(
+               &self, msg: &msgs::UpdateAddHTLC, next_packet_details: &NextPacketDetails
+       ) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
+               match self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
+                       self.can_forward_htlc_to_outgoing_channel(chan, msg, next_packet_details)
+               }) {
+                       Some(Ok(())) => {},
+                       Some(Err(e)) => return Err(e),
+                       None => {
+                               // If we couldn't find the channel info for the scid, it may be a phantom or
+                               // intercept forward.
+                               if (self.default_configuration.accept_intercept_htlcs &&
+                                       fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)) ||
+                                       fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)
+                               {} else {
+                                       return Err(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
+                               }
+                       }
+               }
+
+               let cur_height = self.best_block.read().unwrap().height + 1;
+               if let Err((err_msg, err_code)) = check_incoming_htlc_cltv(
+                       cur_height, next_packet_details.outgoing_cltv_value, msg.cltv_expiry
+               ) {
+                       let chan_update_opt = self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
+                               self.get_channel_update_for_onion(next_packet_details.outgoing_scid, chan).ok()
+                       }).flatten();
+                       return Err((err_msg, err_code, chan_update_opt));
+               }
+
+               Ok(())
+       }
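`fake_scid::is_valid_intercept` and `is_valid_phantom` recognize synthetic SCIDs that LDK hands out for intercept forwards and phantom-node receives; those never name a real channel, so a failed lookup is expected for them. For orientation, an SCID is just three packed fields, sketched below; LDK's real namespace check additionally mixes `fake_scid_rand_bytes` into the candidate via a keyed hash, which is not reproduced here:

    // A short channel id packs block height, tx index and output index.
    fn scid(block: u32, tx_index: u32, vout: u16) -> u64 {
        ((block as u64) << 40) | ((tx_index as u64) << 16) | vout as u64
    }

    fn scid_parts(scid: u64) -> (u32, u32, u16) {
        (
            ((scid >> 40) & 0x00FF_FFFF) as u32, // block height (3 bytes)
            ((scid >> 16) & 0x00FF_FFFF) as u32, // tx index (3 bytes)
            (scid & 0xFFFF) as u16,              // output index (2 bytes)
        )
    }

    fn main() {
        let s = scid(840_000, 1_234, 1);
        assert_eq!(scid_parts(s), (840_000, 1_234, 1));
    }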
+
+       fn htlc_failure_from_update_add_err(
+               &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey, err_msg: &'static str,
+               mut err_code: u16, chan_update: Option<msgs::ChannelUpdate>, is_intro_node_blinded_forward: bool,
+               shared_secret: &[u8; 32]
+       ) -> HTLCFailureMsg {
+               let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
+               if chan_update.is_some() && err_code & 0x1000 == 0x1000 {
+                       let chan_update = chan_update.unwrap();
+                       if err_code == 0x1000 | 11 || err_code == 0x1000 | 12 {
+                               msg.amount_msat.write(&mut res).expect("Writes cannot fail");
+                       }
+                       else if err_code == 0x1000 | 13 {
+                               msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
+                       }
+                       else if err_code == 0x1000 | 20 {
+                               // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
+                               0u16.write(&mut res).expect("Writes cannot fail");
+                       }
+                       (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
+                       msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
+                       chan_update.write(&mut res).expect("Writes cannot fail");
+               } else if err_code & 0x1000 == 0x1000 {
+                       // If we're trying to return an error that requires a `channel_update` but
+                       // we're forwarding to a phantom or intercept "channel" (i.e. cannot
+                       // generate an update), just use the generic "temporary_node_failure"
+                       // instead.
+                       err_code = 0x2000 | 2;
+               }
+
+               log_info!(
+                       WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)),
+                       "Failed to accept/forward incoming HTLC: {}", err_msg
+               );
+               // If `msg.blinding_point` is set, we must always fail with malformed.
+               if msg.blinding_point.is_some() {
+                       return HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
+                               channel_id: msg.channel_id,
+                               htlc_id: msg.htlc_id,
+                               sha256_of_onion: [0; 32],
+                               failure_code: INVALID_ONION_BLINDING,
+                       });
+               }
+
+               let (err_code, err_data) = if is_intro_node_blinded_forward {
+                       (INVALID_ONION_BLINDING, &[0; 32][..])
+               } else {
+                       (err_code, &res.0[..])
+               };
+               HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
+                       channel_id: msg.channel_id,
+                       htlc_id: msg.htlc_id,
+                       reason: HTLCFailReason::reason(err_code, err_data.to_vec())
+                               .get_encrypted_failure_packet(shared_secret, &None),
+               })
+       }
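For UPDATE-class codes, BOLT 4 has the failure data carry the offending value followed by a length-prefixed `channel_update`, including its two-byte message type (258, i.e. `msgs::ChannelUpdate::TYPE`). That is the byte layout the `write` calls above build; the same framing as a standalone sketch for the `amount_below_minimum` case:

    // Failure data for amount_below_minimum (UPDATE | 11): the HTLC amount,
    // then `len(channel_update) + 2`, the message type (258), and the update.
    fn amount_below_minimum_data(amount_msat: u64, channel_update: &[u8]) -> Vec<u8> {
        let mut res = Vec::with_capacity(8 + 2 + 2 + channel_update.len());
        res.extend_from_slice(&amount_msat.to_be_bytes());
        res.extend_from_slice(&((channel_update.len() as u16 + 2).to_be_bytes()));
        res.extend_from_slice(&258u16.to_be_bytes()); // msgs::ChannelUpdate::TYPE
        res.extend_from_slice(channel_update);
        res
    }

    fn main() {
        let data = amount_below_minimum_data(1_000, &[0u8; 40]);
        assert_eq!(data.len(), 8 + 2 + 2 + 40);
    }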
+
        fn decode_update_add_htlc_onion(
                &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey,
        ) -> Result<
@@ -3034,48 +3921,7 @@ where
                        msg, &self.node_signer, &self.logger, &self.secp_ctx
                )?;
 
-               let is_intro_node_forward = match next_hop {
-                       onion_utils::Hop::Forward {
-                               next_hop_data: msgs::InboundOnionPayload::BlindedForward {
-                                       intro_node_blinding_point: Some(_), ..
-                               }, ..
-                       } => true,
-                       _ => false,
-               };
-
-               macro_rules! return_err {
-                       ($msg: expr, $err_code: expr, $data: expr) => {
-                               {
-                                       log_info!(
-                                               WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)),
-                                               "Failed to accept/forward incoming HTLC: {}", $msg
-                                       );
-                                       // If `msg.blinding_point` is set, we must always fail with malformed.
-                                       if msg.blinding_point.is_some() {
-                                               return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
-                                                       channel_id: msg.channel_id,
-                                                       htlc_id: msg.htlc_id,
-                                                       sha256_of_onion: [0; 32],
-                                                       failure_code: INVALID_ONION_BLINDING,
-                                               }));
-                                       }
-
-                                       let (err_code, err_data) = if is_intro_node_forward {
-                                               (INVALID_ONION_BLINDING, &[0; 32][..])
-                                       } else { ($err_code, $data) };
-                                       return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
-                                               channel_id: msg.channel_id,
-                                               htlc_id: msg.htlc_id,
-                                               reason: HTLCFailReason::reason(err_code, err_data.to_vec())
-                                                       .get_encrypted_failure_packet(&shared_secret, &None),
-                                       }));
-                               }
-                       }
-               }
-
-               let NextPacketDetails {
-                       next_packet_pubkey, outgoing_amt_msat, outgoing_scid, outgoing_cltv_value
-               } = match next_packet_details_opt {
+               let next_packet_details = match next_packet_details_opt {
                        Some(next_packet_details) => next_packet_details,
                        // it is a receive, so no need for outbound checks
                        None => return Ok((next_hop, shared_secret, None)),
@@ -3083,124 +3929,15 @@ where
 
                // Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we
                // can't hold the outbound peer state lock at the same time as the inbound peer state lock.
-               if let Some((err, mut code, chan_update)) = loop {
-                       let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned();
-                       let forwarding_chan_info_opt = match id_option {
-                               None => { // unknown_next_peer
-                                       // Note that this is likely a timing oracle for detecting whether an scid is a
-                                       // phantom or an intercept.
-                                       if (self.default_configuration.accept_intercept_htlcs &&
-                                               fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)) ||
-                                               fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)
-                                       {
-                                               None
-                                       } else {
-                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                                       }
-                               },
-                               Some((cp_id, id)) => Some((cp_id.clone(), id.clone())),
-                       };
-                       let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt {
-                               let per_peer_state = self.per_peer_state.read().unwrap();
-                               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                               if peer_state_mutex_opt.is_none() {
-                                       break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                               }
-                               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               let chan = match peer_state.channel_by_id.get_mut(&forwarding_id).map(
-                                       |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
-                               ).flatten() {
-                                       None => {
-                                               // Channel was removed. The short_to_chan_info and channel_by_id maps
-                                               // have no consistency guarantees.
-                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                                       },
-                                       Some(chan) => chan
-                               };
-                               if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
-                                       // Note that the behavior here should be identical to the above block - we
-                                       // should NOT reveal the existence or non-existence of a private channel if
-                                       // we don't allow forwards outbound over them.
-                                       break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
-                               }
-                               if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() {
-                                       // `option_scid_alias` (referred to in LDK as `scid_privacy`) means
-                                       // "refuse to forward unless the SCID alias was used", so we pretend
-                                       // we don't have the channel here.
-                                       break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
-                               }
-                               let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok();
-
-                               // Note that we could technically not return an error yet here and just hope
-                               // that the connection is reestablished or monitor updated by the time we get
-                               // around to doing the actual forward, but better to fail early if we can and
-                               // hopefully an attacker trying to path-trace payments cannot make this occur
-                               // on a small/per-node/per-channel scale.
-                               if !chan.context.is_live() { // channel_disabled
-                                       // If the channel_update we're going to return is disabled (i.e. the
-                                       // peer has been disabled for some time), return `channel_disabled`,
-                                       // otherwise return `temporary_channel_failure`.
-                                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
-                                               break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
-                                       } else {
-                                               break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
-                                       }
-                               }
-                               if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
-                                       break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
-                               }
-                               if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) {
-                                       break Some((err, code, chan_update_opt));
-                               }
-                               chan_update_opt
-                       } else {
-                               None
-                       };
-
-                       let cur_height = self.best_block.read().unwrap().height() + 1;
-
-                       if let Err((err_msg, code)) = check_incoming_htlc_cltv(
-                               cur_height, outgoing_cltv_value, msg.cltv_expiry
-                       ) {
-                               if code & 0x1000 != 0 && chan_update_opt.is_none() {
-                                       // We really should set `incorrect_cltv_expiry` here but as we're not
-                                       // forwarding over a real channel we can't generate a channel_update
-                                       // for it. Instead we just return a generic temporary_node_failure.
-                                       break Some((err_msg, 0x2000 | 2, None))
-                               }
-                               let chan_update_opt = if code & 0x1000 != 0 { chan_update_opt } else { None };
-                               break Some((err_msg, code, chan_update_opt));
-                       }
+               self.can_forward_htlc(&msg, &next_packet_details).map_err(|e| {
+                       let (err_msg, err_code, chan_update_opt) = e;
+                       self.htlc_failure_from_update_add_err(
+                               msg, counterparty_node_id, err_msg, err_code, chan_update_opt,
+                               next_hop.is_intro_node_blinded_forward(), &shared_secret
+                       )
+               })?;
 
-                       break None;
-               }
-               {
-                       let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
-                       if let Some(chan_update) = chan_update {
-                               if code == 0x1000 | 11 || code == 0x1000 | 12 {
-                                       msg.amount_msat.write(&mut res).expect("Writes cannot fail");
-                               }
-                               else if code == 0x1000 | 13 {
-                                       msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
-                               }
-                               else if code == 0x1000 | 20 {
-                                       // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
-                                       0u16.write(&mut res).expect("Writes cannot fail");
-                               }
-                               (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
-                               msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
-                               chan_update.write(&mut res).expect("Writes cannot fail");
-                       } else if code & 0x1000 == 0x1000 {
-                               // If we're trying to return an error that requires a `channel_update` but
-                               // we're forwarding to a phantom or intercept "channel" (i.e. cannot
-                               // generate an update), just use the generic "temporary_node_failure"
-                               // instead.
-                               code = 0x2000 | 2;
-                       }
-                       return_err!(err, code, &res.0[..]);
-               }
-               Ok((next_hop, shared_secret, Some(next_packet_pubkey)))
+               Ok((next_hop, shared_secret, Some(next_packet_details.next_packet_pubkey)))
        }
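Structurally, this hunk swaps the old error funnel, a `loop { ... break Some((msg, code, update)) }` block feeding a `return_err!` macro, for helpers returning `Result` that are converted in one `map_err` into `htlc_failure_from_update_add_err`. The control-flow change in miniature (toy checks, same shapes):

    fn check_old(x: u32) -> Option<(&'static str, u16)> {
        // Old shape: a `loop` used as a breakable block.
        loop {
            if x == 0 { break Some(("zero not allowed", 0x4000 | 10)); }
            if x > 100 { break Some(("too large", 0x1000 | 11)); }
            break None;
        }
    }

    fn check_new(x: u32) -> Result<(), (&'static str, u16)> {
        // New shape: early returns from a dedicated helper.
        if x == 0 { return Err(("zero not allowed", 0x4000 | 10)); }
        if x > 100 { return Err(("too large", 0x1000 | 11)); }
        Ok(())
    }

    fn main() {
        assert_eq!(check_old(50), None);
        assert!(check_new(50).is_ok());
        assert_eq!(check_old(0).map(|e| e.1), check_new(0).err().map(|e| e.1));
    }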
 
        fn construct_pending_htlc_status<'a>(
@@ -3235,7 +3972,7 @@ where
                match decoded_hop {
                        onion_utils::Hop::Receive(next_hop_data) => {
                                // OUR PAYMENT!
-                               let current_height: u32 = self.best_block.read().unwrap().height();
+                               let current_height: u32 = self.best_block.read().unwrap().height;
                                match create_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash,
                                        msg.amount_msat, msg.cltv_expiry, None, allow_underpay, msg.skimmed_fee_msat,
                                        current_height, self.default_configuration.accept_mpp_keysend)
@@ -3495,7 +4232,7 @@ where
        /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
        /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
        pub fn send_payment_with_route(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result<(), PaymentSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments
                        .send_payment_with_route(route, payment_hash, recipient_onion, payment_id,
@@ -3506,7 +4243,7 @@ where
        /// Similar to [`ChannelManager::send_payment_with_route`], but will automatically find a route based on
        /// `route_params` and retry failed payment paths based on `retry_strategy`.
        pub fn send_payment(&self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments
                        .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params,
@@ -3517,7 +4254,7 @@ where
 
        #[cfg(test)]
        pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option<PaymentPreimage>, payment_id: PaymentId, recv_value_msat: Option<u64>, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion,
                        keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer,
@@ -3526,7 +4263,7 @@ where
 
        #[cfg(test)]
        pub(crate) fn test_add_new_pending_payment(&self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route: &Route) -> Result<Vec<[u8; 32]>, PaymentSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                self.pending_outbound_payments.test_add_new_pending_payment(payment_hash, recipient_onion, payment_id, route, None, &self.entropy_source, best_block_height)
        }
 
@@ -3536,7 +4273,7 @@ where
        }
 
        pub(super) fn send_payment_for_bolt12_invoice(&self, invoice: &Bolt12Invoice, payment_id: PaymentId) -> Result<(), Bolt12PaymentError> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments
                        .send_payment_for_bolt12_invoice(
@@ -3593,7 +4330,7 @@ where
        ///
        /// [`send_payment`]: Self::send_payment
        pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option<PaymentPreimage>, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result<PaymentHash, PaymentSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments.send_spontaneous_payment_with_route(
                        route, payment_preimage, recipient_onion, payment_id, &self.entropy_source,
@@ -3608,7 +4345,7 @@ where
        ///
        /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend
        pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option<PaymentPreimage>, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<PaymentHash, RetryableSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion,
                        payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(),
@@ -3620,7 +4357,7 @@ where
        /// [`PaymentHash`] of probes based on a static secret and a random [`PaymentId`], which allows
        /// us to easily discern them from real payments.
        pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
                self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret,
                        &self.entropy_source, &self.node_signer, best_block_height,
@@ -3682,7 +4419,7 @@ where
                                ProbeSendFailure::RouteNotFound
                        })?;
 
-               let mut used_liquidity_map = HashMap::with_capacity(first_hops.len());
+               let mut used_liquidity_map = hash_map_with_capacity(first_hops.len());
 
                let mut res = Vec::new();
 
@@ -3898,7 +4635,7 @@ where
                        }));
                }
                {
-                       let height = self.best_block.read().unwrap().height();
+                       let height = self.best_block.read().unwrap().height;
                        // Transactions are evaluated as final by network mempools if their locktime is strictly
                        // lower than the next block height. However, the modules constituting our Lightning
                        // node might not have perfect sync about their blockchain views. Thus, if the wallet
@@ -3957,7 +4694,10 @@ where
                                        }
                                        let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
                                        if let Some(funding_batch_state) = funding_batch_state.as_mut() {
-                                               funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false));
+                                               // TODO(dual_funding): We only do batch funding for V1 channels at the moment, but if we
+                                               // want to support V2 batching here as well we'll need to stop relying on the funding
+                                               // outpoint to derive the channel ID.
+                                               funding_batch_state.push((ChannelId::v1_from_funding_outpoint(outpoint), *counterparty_node_id, false));
                                        }
                                        Ok(outpoint)
                                })
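`ChannelId::v1_from_funding_outpoint` computes the BOLT 2 v1 channel id: the 32 funding-txid bytes with the big-endian funding output index XORed into the last two. A byte-level sketch; the txid byte order follows however the caller serializes it, and this helper is illustrative rather than LDK's API:

    // BOLT 2 v1 channel id: funding txid with the output index XORed into
    // the final two bytes (big-endian).
    fn v1_channel_id(txid: [u8; 32], funding_output_index: u16) -> [u8; 32] {
        let mut id = txid;
        id[30] ^= (funding_output_index >> 8) as u8;
        id[31] ^= (funding_output_index & 0xFF) as u8;
        id
    }

    fn main() {
        let id = v1_channel_id([0u8; 32], 3);
        assert_eq!(id[31], 3);
    }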
@@ -4034,6 +4774,7 @@ where
                        .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
+
                for channel_id in channel_ids {
                        if !peer_state.has_channel(channel_id) {
                                return Err(APIError::ChannelUnavailable {
@@ -4050,7 +4791,8 @@ where
                                }
                                if let ChannelPhase::Funded(channel) = channel_phase {
                                        if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
                                        } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
                                                        node_id: channel.context.get_counterparty_node_id(),
@@ -4182,6 +4924,7 @@ where
                let mut per_source_pending_forward = [(
                        payment.prev_short_channel_id,
                        payment.prev_funding_outpoint,
+                       payment.prev_channel_id,
                        payment.prev_user_channel_id,
                        vec![(pending_htlc_info, payment.prev_htlc_id)]
                )];
@@ -4209,6 +4952,7 @@ where
                                short_channel_id: payment.prev_short_channel_id,
                                user_channel_id: Some(payment.prev_user_channel_id),
                                outpoint: payment.prev_funding_outpoint,
+                               channel_id: payment.prev_channel_id,
                                htlc_id: payment.prev_htlc_id,
                                incoming_packet_shared_secret: payment.forward_info.incoming_shared_secret,
                                phantom_shared_secret: None,
@@ -4223,6 +4967,145 @@ where
                Ok(())
        }
 
+       fn process_pending_update_add_htlcs(&self) {
+               let mut decode_update_add_htlcs = new_hash_map();
+               mem::swap(&mut decode_update_add_htlcs, &mut self.decode_update_add_htlcs.lock().unwrap());
+
+               let get_failed_htlc_destination = |outgoing_scid_opt: Option<u64>, payment_hash: PaymentHash| {
+                       if let Some(outgoing_scid) = outgoing_scid_opt {
+                               match self.short_to_chan_info.read().unwrap().get(&outgoing_scid) {
+                                       Some((outgoing_counterparty_node_id, outgoing_channel_id)) =>
+                                               HTLCDestination::NextHopChannel {
+                                                       node_id: Some(*outgoing_counterparty_node_id),
+                                                       channel_id: *outgoing_channel_id,
+                                               },
+                                       None => HTLCDestination::UnknownNextHop {
+                                               requested_forward_scid: outgoing_scid,
+                                       },
+                               }
+                       } else {
+                               HTLCDestination::FailedPayment { payment_hash }
+                       }
+               };
+
+               'outer_loop: for (incoming_scid, update_add_htlcs) in decode_update_add_htlcs {
+                       let incoming_channel_details_opt = self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+                               let counterparty_node_id = chan.context.get_counterparty_node_id();
+                               let channel_id = chan.context.channel_id();
+                               let funding_txo = chan.context.get_funding_txo().unwrap();
+                               let user_channel_id = chan.context.get_user_id();
+                               let accept_underpaying_htlcs = chan.context.config().accept_underpaying_htlcs;
+                               (counterparty_node_id, channel_id, funding_txo, user_channel_id, accept_underpaying_htlcs)
+                       });
+                       let (
+                               incoming_counterparty_node_id, incoming_channel_id, incoming_funding_txo,
+                               incoming_user_channel_id, incoming_accept_underpaying_htlcs
+                        ) = if let Some(incoming_channel_details) = incoming_channel_details_opt {
+                               incoming_channel_details
+                       } else {
+                               // The incoming channel no longer exists; its HTLCs should be resolved on chain instead.
+                               continue;
+                       };
+
+                       let mut htlc_forwards = Vec::new();
+                       let mut htlc_fails = Vec::new();
+                       for update_add_htlc in &update_add_htlcs {
+                               let (next_hop, shared_secret, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion(
+                                       &update_add_htlc, &self.node_signer, &self.logger, &self.secp_ctx
+                               ) {
+                                       Ok(decoded_onion) => decoded_onion,
+                                       Err(htlc_fail) => {
+                                               htlc_fails.push((htlc_fail, HTLCDestination::InvalidOnion));
+                                               continue;
+                                       },
+                               };
+
+                               let is_intro_node_blinded_forward = next_hop.is_intro_node_blinded_forward();
+                               let outgoing_scid_opt = next_packet_details_opt.as_ref().map(|d| d.outgoing_scid);
+
+                               // Process the HTLC on the incoming channel.
+                               match self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
+                                       chan.can_accept_incoming_htlc(
+                                               update_add_htlc, &self.fee_estimator, &logger,
+                                       )
+                               }) {
+                                       Some(Ok(_)) => {},
+                                       Some(Err((err, code))) => {
+                                               let outgoing_chan_update_opt = if let Some(outgoing_scid) = outgoing_scid_opt.as_ref() {
+                                                       self.do_funded_channel_callback(*outgoing_scid, |chan: &mut Channel<SP>| {
+                                                               self.get_channel_update_for_onion(*outgoing_scid, chan).ok()
+                                                       }).flatten()
+                                               } else {
+                                                       None
+                                               };
+                                               let htlc_fail = self.htlc_failure_from_update_add_err(
+                                                       &update_add_htlc, &incoming_counterparty_node_id, err, code,
+                                                       outgoing_chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+                                               );
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                               continue;
+                                       },
+                                       // The incoming channel no longer exists; its HTLCs should be resolved on chain instead.
+                                       None => continue 'outer_loop,
+                               }
+
+                               // Now process the HTLC on the outgoing channel if it's a forward.
+                               if let Some(next_packet_details) = next_packet_details_opt.as_ref() {
+                                       if let Err((err, code, chan_update_opt)) = self.can_forward_htlc(
+                                               &update_add_htlc, next_packet_details
+                                       ) {
+                                               let htlc_fail = self.htlc_failure_from_update_add_err(
+                                                       &update_add_htlc, &incoming_counterparty_node_id, err, code,
+                                                       chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+                                               );
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                               continue;
+                                       }
+                               }
+
+                               match self.construct_pending_htlc_status(
+                                       &update_add_htlc, &incoming_counterparty_node_id, shared_secret, next_hop,
+                                       incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey),
+                               ) {
+                                       PendingHTLCStatus::Forward(htlc_forward) => {
+                                               htlc_forwards.push((htlc_forward, update_add_htlc.htlc_id));
+                                       },
+                                       PendingHTLCStatus::Fail(htlc_fail) => {
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                       },
+                               }
+                       }
+
+                       // Process all of the forwards and failures for the channel on which the HTLCs were
+                       // received, as a single batch.
+                       let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
+                               incoming_user_channel_id, htlc_forwards.drain(..).collect());
+                       self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
+                       for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
+                               let failure = match htlc_fail {
+                                       HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC {
+                                               htlc_id: fail_htlc.htlc_id,
+                                               err_packet: fail_htlc.reason,
+                                       },
+                                       HTLCFailureMsg::Malformed(fail_malformed_htlc) => HTLCForwardInfo::FailMalformedHTLC {
+                                               htlc_id: fail_malformed_htlc.htlc_id,
+                                               sha256_of_onion: fail_malformed_htlc.sha256_of_onion,
+                                               failure_code: fail_malformed_htlc.failure_code,
+                                       },
+                               };
+                               self.forward_htlcs.lock().unwrap().entry(incoming_scid).or_insert(vec![]).push(failure);
+                               self.pending_events.lock().unwrap().push_back((events::Event::HTLCHandlingFailed {
+                                       prev_channel_id: incoming_channel_id,
+                                       failed_next_destination: htlc_destination,
+                               }, None));
+                       }
+               }
+       }
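Note the locking discipline in `process_pending_update_add_htlcs`: the whole pending map is swapped out under the mutex up front (the diff uses `mem::swap` with a fresh `new_hash_map()`; `mem::take` below is the same idea), and all onion decoding and forwarding checks then run without holding the lock. The same pattern is reused for `forward_htlcs` in `process_pending_htlc_forwards` below. A minimal sketch:

    use std::collections::HashMap;
    use std::sync::Mutex;

    // Take the whole pending map in one short critical section, then do the
    // slow per-entry work lock-free while new entries keep accumulating.
    fn process_pending(pending: &Mutex<HashMap<u64, Vec<String>>>) {
        let drained = std::mem::take(&mut *pending.lock().unwrap());
        for (scid, msgs) in drained {
            for msg in msgs {
                // Slow validation / forwarding decisions would happen here.
                let _ = (scid, &msg);
            }
        }
    }

    fn main() {
        let pending = Mutex::new(HashMap::new());
        pending.lock().unwrap().insert(42u64, vec!["htlc".to_string()]);
        process_pending(&pending);
        assert!(pending.lock().unwrap().is_empty());
    }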
+
        /// Processes HTLCs which are pending waiting on random forward delay.
        ///
        /// Should only really ever be called in response to a PendingHTLCsForwardable event.
@@ -4230,11 +5113,13 @@ where
        pub fn process_pending_htlc_forwards(&self) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
+               self.process_pending_update_add_htlcs();
+
                let mut new_events = VecDeque::new();
                let mut failed_forwards = Vec::new();
-               let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
+               let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
                {
-                       let mut forward_htlcs = HashMap::new();
+                       let mut forward_htlcs = new_hash_map();
                        mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
 
                        for (short_chan_id, mut pending_forwards) in forward_htlcs {
@@ -4245,20 +5130,21 @@ where
                                                        for forward_info in pending_forwards.drain(..) {
                                                                match forward_info {
                                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                                               prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id,
-                                                                               forward_info: PendingHTLCInfo {
+                                                                               prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
+                                                                               prev_user_channel_id, forward_info: PendingHTLCInfo {
                                                                                        routing, incoming_shared_secret, payment_hash, outgoing_amt_msat,
                                                                                        outgoing_cltv_value, ..
                                                                                }
                                                                        }) => {
                                                                                macro_rules! failure_handler {
                                                                                        ($msg: expr, $err_code: expr, $err_data: expr, $phantom_ss: expr, $next_hop_unknown: expr) => {
-                                                                                               let logger = WithContext::from(&self.logger, forwarding_counterparty, Some(prev_funding_outpoint.to_channel_id()));
+                                                                                               let logger = WithContext::from(&self.logger, forwarding_counterparty, Some(prev_channel_id));
                                                                                                log_info!(logger, "Failed to accept/forward incoming HTLC: {}", $msg);
 
                                                                                                let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                                        short_channel_id: prev_short_channel_id,
                                                                                                        user_channel_id: Some(prev_user_channel_id),
+                                                                                                       channel_id: prev_channel_id,
                                                                                                        outpoint: prev_funding_outpoint,
                                                                                                        htlc_id: prev_htlc_id,
                                                                                                        incoming_packet_shared_secret: incoming_shared_secret,
@@ -4316,13 +5202,13 @@ where
                                                                                                };
                                                                                                match next_hop {
                                                                                                        onion_utils::Hop::Receive(hop_data) => {
-                                                                                                               let current_height: u32 = self.best_block.read().unwrap().height();
+                                                                                                               let current_height: u32 = self.best_block.read().unwrap().height;
                                                                                                                match create_recv_pending_htlc_info(hop_data,
                                                                                                                        incoming_shared_secret, payment_hash, outgoing_amt_msat,
                                                                                                                        outgoing_cltv_value, Some(phantom_shared_secret), false, None,
                                                                                                                        current_height, self.default_configuration.accept_mpp_keysend)
                                                                                                                {
-                                                                                                                       Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, vec![(info, prev_htlc_id)])),
+                                                                                                                       Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)])),
                                                                                                                        Err(InboundHTLCErr { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
                                                                                                                }
                                                                                                        },
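[Editor's note] The repeated `height()` -> `height` edits in this and later hunks reflect `BestBlock`'s accessor becoming a plain public field; the new call-site shape, as a hedged one-liner:

    // Assumes BestBlock now exposes `height: u32` directly, as the call
    // sites throughout this diff suggest.
    let best_block_height: u32 = self.best_block.read().unwrap().height;
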
@@ -4367,8 +5253,8 @@ where
                                                for forward_info in pending_forwards.drain(..) {
                                                        let queue_fail_htlc_res = match forward_info {
                                                                HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                                       prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id,
-                                                                       forward_info: PendingHTLCInfo {
+                                                                       prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
+                                                                       prev_user_channel_id, forward_info: PendingHTLCInfo {
                                                                                incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
                                                                                routing: PendingHTLCRouting::Forward {
                                                                                        onion_packet, blinded, ..
@@ -4379,6 +5265,7 @@ where
                                                                        let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                short_channel_id: prev_short_channel_id,
                                                                                user_channel_id: Some(prev_user_channel_id),
+                                                                               channel_id: prev_channel_id,
                                                                                outpoint: prev_funding_outpoint,
                                                                                htlc_id: prev_htlc_id,
                                                                                incoming_packet_shared_secret: incoming_shared_secret,
@@ -4450,8 +5337,8 @@ where
                                        'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) {
                                                match forward_info {
                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                               prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id,
-                                                               forward_info: PendingHTLCInfo {
+                                                               prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
+                                                               prev_user_channel_id, forward_info: PendingHTLCInfo {
                                                                        routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat,
                                                                        skimmed_fee_msat, ..
                                                                }
@@ -4459,16 +5346,20 @@ where
                                                                let blinded_failure = routing.blinded_failure();
                                                                let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret, mut onion_fields) = match routing {
                                                                        PendingHTLCRouting::Receive {
-                                                                               payment_data, payment_metadata, incoming_cltv_expiry, phantom_shared_secret,
-                                                                               custom_tlvs, requires_blinded_error: _
+                                                                               payment_data, payment_metadata, payment_context,
+                                                                               incoming_cltv_expiry, phantom_shared_secret, custom_tlvs,
+                                                                               requires_blinded_error: _
                                                                        } => {
                                                                                let _legacy_hop_data = Some(payment_data.clone());
                                                                                let onion_fields = RecipientOnionFields { payment_secret: Some(payment_data.payment_secret),
                                                                                                payment_metadata, custom_tlvs };
-                                                                               (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data },
+                                                                               (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data, payment_context },
                                                                                        Some(payment_data), phantom_shared_secret, onion_fields)
                                                                        },
-                                                                       PendingHTLCRouting::ReceiveKeysend { payment_data, payment_preimage, payment_metadata, incoming_cltv_expiry, custom_tlvs } => {
+                                                                       PendingHTLCRouting::ReceiveKeysend {
+                                                                               payment_data, payment_preimage, payment_metadata,
+                                                                               incoming_cltv_expiry, custom_tlvs, requires_blinded_error: _
+                                                                       } => {
                                                                                let onion_fields = RecipientOnionFields {
                                                                                        payment_secret: payment_data.as_ref().map(|data| data.payment_secret),
                                                                                        payment_metadata,
@@ -4485,6 +5376,7 @@ where
                                                                        prev_hop: HTLCPreviousHopData {
                                                                                short_channel_id: prev_short_channel_id,
                                                                                user_channel_id: Some(prev_user_channel_id),
+                                                                               channel_id: prev_channel_id,
                                                                                outpoint: prev_funding_outpoint,
                                                                                htlc_id: prev_htlc_id,
                                                                                incoming_packet_shared_secret: incoming_shared_secret,
@@ -4511,11 +5403,12 @@ where
                                                                                debug_assert!(!committed_to_claimable);
                                                                                let mut htlc_msat_height_data = $htlc.value.to_be_bytes().to_vec();
                                                                                htlc_msat_height_data.extend_from_slice(
-                                                                                       &self.best_block.read().unwrap().height().to_be_bytes(),
+                                                                                       &self.best_block.read().unwrap().height.to_be_bytes(),
                                                                                );
                                                                                failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                                short_channel_id: $htlc.prev_hop.short_channel_id,
                                                                                                user_channel_id: $htlc.prev_hop.user_channel_id,
+                                                                                               channel_id: prev_channel_id,
                                                                                                outpoint: prev_funding_outpoint,
                                                                                                htlc_id: $htlc.prev_hop.htlc_id,
                                                                                                incoming_packet_shared_secret: $htlc.prev_hop.incoming_packet_shared_secret,
@@ -4538,10 +5431,7 @@ where
                                                                macro_rules! check_total_value {
                                                                        ($purpose: expr) => {{
                                                                                let mut payment_claimable_generated = false;
-                                                                               let is_keysend = match $purpose {
-                                                                                       events::PaymentPurpose::SpontaneousPayment(_) => true,
-                                                                                       events::PaymentPurpose::InvoicePayment { .. } => false,
-                                                                               };
+                                                                               let is_keysend = $purpose.is_keysend();
                                                                                let mut claimable_payments = self.claimable_payments.lock().unwrap();
                                                                                if claimable_payments.pending_claiming_payments.contains_key(&payment_hash) {
                                                                                        fail_htlc!(claimable_htlc, payment_hash);
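[Editor's note] This is the change the commit title names: the open-coded match on `PaymentPurpose` is replaced by the new `is_keysend` helper. A minimal sketch of such a helper (the real method lives in `events.rs`; only `SpontaneousPayment` and `Bolt11InvoicePayment` are visible in this excerpt, so the catch-all arm here is an assumption):

    impl PaymentPurpose {
        /// Returns true for spontaneous ("keysend") payments, which carry
        /// their preimage in the onion rather than in an invoice.
        pub fn is_keysend(&self) -> bool {
            match self {
                PaymentPurpose::SpontaneousPayment(..) => true,
                // Invoice-backed variants (BOLT 11 and BOLT 12) are not keysend.
                _ => false,
            }
        }
    }
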
@@ -4596,7 +5486,6 @@ where
                                                                                        #[allow(unused_assignments)] {
                                                                                                committed_to_claimable = true;
                                                                                        }
-                                                                                       let prev_channel_id = prev_funding_outpoint.to_channel_id();
                                                                                        htlcs.push(claimable_htlc);
                                                                                        let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum();
                                                                                        htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat));
@@ -4649,14 +5538,14 @@ where
                                                                                                        }
                                                                                                };
                                                                                                if let Some(min_final_cltv_expiry_delta) = min_final_cltv_expiry_delta {
-                                                                                                       let expected_min_expiry_height = (self.current_best_block().height() + min_final_cltv_expiry_delta as u32) as u64;
+                                                                                                       let expected_min_expiry_height = (self.current_best_block().height + min_final_cltv_expiry_delta as u32) as u64;
                                                                                                        if (cltv_expiry as u64) < expected_min_expiry_height {
                                                                                                                log_trace!(self.logger, "Failing new HTLC with payment_hash {} as its CLTV expiry was too soon (had {}, earliest expected {})",
                                                                                                                        &payment_hash, cltv_expiry, expected_min_expiry_height);
                                                                                                                fail_htlc!(claimable_htlc, payment_hash);
                                                                                                        }
                                                                                                }
-                                                                                               let purpose = events::PaymentPurpose::InvoicePayment {
+                                                                                               let purpose = events::PaymentPurpose::Bolt11InvoicePayment {
                                                                                                        payment_preimage: payment_preimage.clone(),
                                                                                                        payment_secret: payment_data.payment_secret,
                                                                                                };
@@ -4682,7 +5571,7 @@ where
                                                                                                &payment_hash, payment_data.total_msat, inbound_payment.get().min_value_msat.unwrap());
                                                                                        fail_htlc!(claimable_htlc, payment_hash);
                                                                                } else {
-                                                                                       let purpose = events::PaymentPurpose::InvoicePayment {
+                                                                                       let purpose = events::PaymentPurpose::Bolt11InvoicePayment {
                                                                                                payment_preimage: inbound_payment.get().payment_preimage,
                                                                                                payment_secret: payment_data.payment_secret,
                                                                                        };
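[Editor's note] `PaymentPurpose::InvoicePayment` is renamed to `Bolt11InvoicePayment` here and in the previous hunk, making room for the BOLT 12 payment contexts imported at the top of this commit. A hedged consumer-side sketch combining the rename with the new helper (the event-loop plumbing is illustrative):

    match event {
        Event::PaymentClaimable { purpose, .. } => {
            if purpose.is_keysend() {
                // Preimage arrived in the onion itself.
            }
            if let PaymentPurpose::Bolt11InvoicePayment { payment_preimage, payment_secret } = purpose {
                // Invoice-backed payment; the preimage may still need a lookup.
                let _ = (payment_preimage, payment_secret);
            }
        },
        _ => {},
    }
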
@@ -4703,7 +5592,7 @@ where
                        }
                }
 
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(),
                        || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height,
                        &self.pending_events, &self.logger, |args| self.send_payment_along_path(args));
@@ -4740,19 +5629,19 @@ where
 
                for event in background_events.drain(..) {
                        match event {
-                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
+                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, _channel_id, update)) => {
                                        // The channel has already been closed, so no use bothering to care about the
                                        // monitor updating completing.
                                        let _ = self.chain_monitor.update_channel(funding_txo, &update);
                                },
-                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => {
+                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, channel_id, update } => {
                                        let mut updated_chan = false;
                                        {
                                                let per_peer_state = self.per_peer_state.read().unwrap();
                                                if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
                                                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                                        let peer_state = &mut *peer_state_lock;
-                                                       match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) {
+                                                       match peer_state.channel_by_id.entry(channel_id) {
                                                                hash_map::Entry::Occupied(mut chan_phase) => {
                                                                        if let ChannelPhase::Funded(chan) = chan_phase.get_mut() {
                                                                                updated_chan = true;
@@ -4806,10 +5695,6 @@ where
 
                // If the feerate has decreased by less than half, don't bother
                if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() {
-                       if new_feerate != chan.context.get_feerate_sat_per_1000_weight() {
-                               log_trace!(logger, "Channel {} does not qualify for a feerate change from {} to {}.",
-                               chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
-                       }
                        return NotifyOption::SkipPersistNoEvents;
                }
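[Editor's note] The dropped `log_trace!` only removes per-tick noise; the guard itself is unchanged: a new feerate is ignored unless it rose, or fell to half or less of the current value. Restated as a hedged standalone predicate:

    fn skip_feerate_update(current: u32, new: u32) -> bool {
        // Skip when the feerate neither increased nor at least halved:
        // e.g. 1000 -> 600 is skipped (600 * 2 > 1000); 1000 -> 400 is not.
        new <= current && new * 2 > current
    }
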
                if !chan.context.is_live() {
@@ -4955,7 +5840,8 @@ where
                                                                                if n >= DISABLE_GOSSIP_TICKS {
                                                                                        chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
                                                                                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                        msg: update
                                                                                                });
                                                                                        }
@@ -4969,7 +5855,8 @@ where
                                                                                if n >= ENABLE_GOSSIP_TICKS {
                                                                                        chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
                                                                                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                        msg: update
                                                                                                });
                                                                                        }
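[Editor's note] Broadcast gossip (`BroadcastChannelUpdate`) now goes into a node-wide `pending_broadcast_messages` queue rather than one peer's `pending_msg_events`, presumably so broadcasts are not tied to a single peer's lifecycle; the queue's drain point is not shown in this excerpt. The enqueue path, distilled from the two hunks above:

    let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
    pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update });
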
@@ -5008,6 +5895,16 @@ where
                                                                process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context,
                                                                        pending_msg_events, counterparty_node_id)
                                                        },
+                                                       #[cfg(dual_funding)]
+                                                       ChannelPhase::UnfundedInboundV2(chan) => {
+                                                               process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context,
+                                                                       pending_msg_events, counterparty_node_id)
+                                                       },
+                                                       #[cfg(dual_funding)]
+                                                       ChannelPhase::UnfundedOutboundV2(chan) => {
+                                                               process_unfunded_channel_tick(chan_id, &mut chan.context, &mut chan.unfunded_context,
+                                                                       pending_msg_events, counterparty_node_id)
+                                                       },
                                                }
                                        });
 
@@ -5168,7 +6065,7 @@ where
                        FailureCode::RequiredNodeFeatureMissing => HTLCFailReason::from_failure_code(failure_code.into()),
                        FailureCode::IncorrectOrUnknownPaymentDetails => {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
-                               htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height().to_be_bytes());
+                               htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
                                HTLCFailReason::reason(failure_code.into(), htlc_msat_height_data)
                        },
                        FailureCode::InvalidOnionPayload(data) => {
@@ -5262,9 +6159,14 @@ where
                }
        }
 
+       fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
+               let push_forward_event = self.fail_htlc_backwards_internal_without_forward_event(source, payment_hash, onion_error, destination);
+               if push_forward_event { self.push_pending_forwards_ev(); }
+       }
+
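[Editor's note] `fail_htlc_backwards_internal` becomes a thin wrapper so that callers failing a batch of HTLCs can defer the `PendingHTLCsForwardable` decision. A hedged sketch of the batching pattern this split enables (`failures` and its element shape are illustrative):

    let mut push_forward_event = false;
    for (source, payment_hash, onion_error, destination) in failures {
        push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(
            &source, &payment_hash, &onion_error, destination);
    }
    // Surface at most one forwardable event for the whole batch.
    if push_forward_event { self.push_pending_forwards_ev(); }
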
        /// Fails an HTLC backwards to the sender of it to us.
        /// Note that we do not assume that channels corresponding to failed HTLCs are still available.
-       fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
+       fn fail_htlc_backwards_internal_without_forward_event(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) -> bool {
                // Ensure that no peer state channel storage lock is held when calling this function.
                // This ensures that future code doesn't introduce a lock-order requirement for
                // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
@@ -5282,19 +6184,19 @@ where
                // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
                // from block_connected which may run during initialization prior to the chain_monitor
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
+               let mut push_forward_event;
                match source {
                        HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, .. } => {
-                               if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
+                               push_forward_event = self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
                                        session_priv, payment_id, self.probing_cookie_secret, &self.secp_ctx,
-                                       &self.pending_events, &self.logger)
-                               { self.push_pending_forwards_ev(); }
+                                       &self.pending_events, &self.logger);
                        },
                        HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret,
-                               ref phantom_shared_secret, ref outpoint, ref blinded_failure, ..
+                               ref phantom_shared_secret, outpoint: _, ref blinded_failure, ref channel_id, ..
                        }) => {
                                log_trace!(
-                                       WithContext::from(&self.logger, None, Some(outpoint.to_channel_id())),
+                                       WithContext::from(&self.logger, None, Some(*channel_id)),
                                        "Failing {}HTLC with payment_hash {} backwards from us: {:?}",
                                        if blinded_failure.is_some() { "blinded " } else { "" }, &payment_hash, onion_error
                                );
@@ -5321,11 +6223,9 @@ where
                                        }
                                };
 
-                               let mut push_forward_ev = false;
+                               push_forward_event = self.decode_update_add_htlcs.lock().unwrap().is_empty();
                                let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
-                               if forward_htlcs.is_empty() {
-                                       push_forward_ev = true;
-                               }
+                               push_forward_event &= forward_htlcs.is_empty();
                                match forward_htlcs.entry(*short_channel_id) {
                                        hash_map::Entry::Occupied(mut entry) => {
                                                entry.get_mut().push(failure);
@@ -5335,14 +6235,14 @@ where
                                        }
                                }
                                mem::drop(forward_htlcs);
-                               if push_forward_ev { self.push_pending_forwards_ev(); }
                                let mut pending_events = self.pending_events.lock().unwrap();
                                pending_events.push_back((events::Event::HTLCHandlingFailed {
-                                       prev_channel_id: outpoint.to_channel_id(),
+                                       prev_channel_id: *channel_id,
                                        failed_next_destination: destination,
                                }, None));
                        },
                }
+               push_forward_event
        }
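[Editor's note] The returned flag is true only when this failure is the first pending item across both the new `decode_update_add_htlcs` queue and `forward_htlcs`; once either queue is non-empty, a forwardable event has already been queued and another would be redundant. Distilled (hedged) from the two assignments above:

    // push_forward_event == "both queues were empty before this entry".
    let push_forward_event =
        self.decode_update_add_htlcs.lock().unwrap().is_empty()
        && forward_htlcs.is_empty();
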
 
        /// Provides a payment preimage in response to [`Event::PaymentClaimable`], generating any
@@ -5479,7 +6379,7 @@ where
                }
                if valid_mpp {
                        for htlc in sources.drain(..) {
-                               let prev_hop_chan_id = htlc.prev_hop.outpoint.to_channel_id();
+                               let prev_hop_chan_id = htlc.prev_hop.channel_id;
                                if let Err((pk, err)) = self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
                                        |_, definitely_duplicate| {
@@ -5499,7 +6399,7 @@ where
                if !valid_mpp {
                        for htlc in sources.drain(..) {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
-                               htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height().to_be_bytes());
+                               htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
                                let source = HTLCSource::PreviousHopData(htlc.prev_hop);
                                let reason = HTLCFailReason::reason(0x4000 | 15, htlc_msat_height_data);
                                let receiver = HTLCDestination::FailedPayment { payment_hash };
@@ -5532,7 +6432,7 @@ where
 
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let chan_id = prev_hop.outpoint.to_channel_id();
+                       let chan_id = prev_hop.channel_id;
                        let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
                                Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
                                None => None
@@ -5570,6 +6470,7 @@ where
                                                                                BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                                                        counterparty_node_id,
                                                                                        funding_txo: prev_hop.outpoint,
+                                                                                       channel_id: prev_hop.channel_id,
                                                                                        update: monitor_update.clone(),
                                                                                });
                                                                }
@@ -5584,13 +6485,13 @@ where
 
                                                                log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
                                                                        chan_id, action);
-                                                               let (node_id, funding_outpoint, blocker) =
+                                                               let (node_id, _funding_outpoint, channel_id, blocker) =
                                                                if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
                                                                        downstream_counterparty_node_id: node_id,
                                                                        downstream_funding_outpoint: funding_outpoint,
-                                                                       blocking_action: blocker,
+                                                                       blocking_action: blocker, downstream_channel_id: channel_id,
                                                                } = action {
-                                                                       (node_id, funding_outpoint, blocker)
+                                                                       (node_id, funding_outpoint, channel_id, blocker)
                                                                } else {
                                                                        debug_assert!(false,
                                                                                "Duplicate claims should always free another channel immediately");
@@ -5600,7 +6501,7 @@ where
                                                                        let mut peer_state = peer_state_mtx.lock().unwrap();
                                                                        if let Some(blockers) = peer_state
                                                                                .actions_blocking_raa_monitor_updates
-                                                                               .get_mut(&funding_outpoint.to_channel_id())
+                                                                               .get_mut(&channel_id)
                                                                        {
                                                                                let mut found_blocker = false;
                                                                                blockers.retain(|iter| {
@@ -5629,6 +6530,7 @@ where
                        updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
                                payment_preimage,
                        }],
+                       channel_id: Some(prev_hop.channel_id),
                };
 
                if !during_init {
@@ -5640,7 +6542,8 @@ where
                                // with a preimage we *must* somehow manage to propagate it to the upstream
                                // channel, or we must have an ability to receive the same event and try
                                // again on restart.
-                               log_error!(WithContext::from(&self.logger, None, Some(prev_hop.outpoint.to_channel_id())), "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+                               log_error!(WithContext::from(&self.logger, None, Some(prev_hop.channel_id)),
+                                       "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
                                        payment_preimage, update_res);
                        }
                } else {
@@ -5656,7 +6559,7 @@ where
                        // complete the monitor update completion action from `completion_action`.
                        self.pending_background_events.lock().unwrap().push(
                                BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
-                                       prev_hop.outpoint, preimage_update,
+                                       prev_hop.outpoint, prev_hop.channel_id, preimage_update,
                                )));
                }
                // Note that we do process the completion action here. This totally could be a
@@ -5673,8 +6576,9 @@ where
        }
 
        fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage,
-               forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, startup_replay: bool,
-               next_channel_counterparty_node_id: Option<PublicKey>, next_channel_outpoint: OutPoint
+               forwarded_htlc_value_msat: Option<u64>, skimmed_fee_msat: Option<u64>, from_onchain: bool,
+               startup_replay: bool, next_channel_counterparty_node_id: Option<PublicKey>,
+               next_channel_outpoint: OutPoint, next_channel_id: ChannelId, next_user_channel_id: Option<u128>,
        ) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
@@ -5684,7 +6588,7 @@ where
                                        debug_assert_eq!(pubkey, path.hops[0].pubkey);
                                }
                                let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
-                                       channel_funding_outpoint: next_channel_outpoint,
+                                       channel_funding_outpoint: next_channel_outpoint, channel_id: next_channel_id,
                                        counterparty_node_id: path.hops[0].pubkey,
                                };
                                self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage,
@@ -5692,7 +6596,8 @@ where
                                        &self.logger);
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
-                               let prev_outpoint = hop_data.outpoint;
+                               let prev_channel_id = hop_data.channel_id;
+                               let prev_user_channel_id = hop_data.user_channel_id;
                                let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
                                #[cfg(debug_assertions)]
                                let claiming_chan_funding_outpoint = hop_data.outpoint;
@@ -5700,7 +6605,7 @@ where
                                        |htlc_claim_value_msat, definitely_duplicate| {
                                                let chan_to_release =
                                                        if let Some(node_id) = next_channel_counterparty_node_id {
-                                                               Some((node_id, next_channel_outpoint, completed_blocker))
+                                                               Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
                                                        } else {
                                                                // We can only get `None` here if we are processing a
                                                                // `ChannelMonitor`-originated event, in which case we
@@ -5737,7 +6642,7 @@ where
                                                                                },
                                                                                // or the channel we'd unblock is already closed,
                                                                                BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(
-                                                                                       (funding_txo, monitor_update)
+                                                                                       (funding_txo, _channel_id, monitor_update)
                                                                                ) => {
                                                                                        if *funding_txo == next_channel_outpoint {
                                                                                                assert_eq!(monitor_update.updates.len(), 1);
@@ -5753,7 +6658,7 @@ where
                                                                                BackgroundEvent::MonitorUpdatesComplete {
                                                                                        channel_id, ..
                                                                                } =>
-                                                                                       *channel_id == claiming_chan_funding_outpoint.to_channel_id(),
+                                                                                       *channel_id == prev_channel_id,
                                                                        }
                                                                }), "{:?}", *background_events);
                                                        }
@@ -5763,21 +6668,27 @@ where
                                                                Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
                                                                        downstream_counterparty_node_id: other_chan.0,
                                                                        downstream_funding_outpoint: other_chan.1,
-                                                                       blocking_action: other_chan.2,
+                                                                       downstream_channel_id: other_chan.2,
+                                                                       blocking_action: other_chan.3,
                                                                })
                                                        } else { None }
                                                } else {
-                                                       let fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+                                                       let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                                                                if let Some(claimed_htlc_value) = htlc_claim_value_msat {
                                                                        Some(claimed_htlc_value - forwarded_htlc_value)
                                                                } else { None }
                                                        } else { None };
+                                                       debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
+                                                               "skimmed_fee_msat must always be included in total_fee_earned_msat");
                                                        Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                                event: events::Event::PaymentForwarded {
-                                                                       fee_earned_msat,
+                                                                       prev_channel_id: Some(prev_channel_id),
+                                                                       next_channel_id: Some(next_channel_id),
+                                                                       prev_user_channel_id,
+                                                                       next_user_channel_id,
+                                                                       total_fee_earned_msat,
+                                                                       skimmed_fee_msat,
                                                                        claim_from_onchain_tx: from_onchain,
-                                                                       prev_channel_id: Some(prev_outpoint.to_channel_id()),
-                                                                       next_channel_id: Some(next_channel_outpoint.to_channel_id()),
                                                                        outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
                                                                },
                                                                downstream_counterparty_and_funding_outpoint: chan_to_release,
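[Editor's note] `Event::PaymentForwarded` grows in this hunk: `fee_earned_msat` becomes `total_fee_earned_msat` (now explicitly inclusive of any skimmed fee, per the `debug_assert!` above), `skimmed_fee_msat` is surfaced separately, and both endpoints' `ChannelId` and `user_channel_id` are reported. A hedged consumer sketch; field names follow this diff, everything else is illustrative:

    if let Event::PaymentForwarded {
        total_fee_earned_msat, skimmed_fee_msat, prev_channel_id, next_channel_id, ..
    } = event {
        // Routing fee net of the skimmed portion, when both are known.
        let routing_fee_msat = total_fee_earned_msat
            .zip(skimmed_fee_msat)
            .map(|(total, skimmed)| total.saturating_sub(skimmed));
        let _ = (routing_fee_msat, prev_channel_id, next_channel_id);
    }
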
@@ -5827,16 +6738,17 @@ where
                                        event, downstream_counterparty_and_funding_outpoint
                                } => {
                                        self.pending_events.lock().unwrap().push_back((event, None));
-                                       if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
-                                               self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
+                                       if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
+                                               self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
                                        }
                                },
                                MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
-                                       downstream_counterparty_node_id, downstream_funding_outpoint, blocking_action,
+                                       downstream_counterparty_node_id, downstream_funding_outpoint, downstream_channel_id, blocking_action,
                                } => {
                                        self.handle_monitor_update_release(
                                                downstream_counterparty_node_id,
                                                downstream_funding_outpoint,
+                                               downstream_channel_id,
                                                Some(blocking_action),
                                        );
                                },
@@ -5849,24 +6761,31 @@ where
        fn handle_channel_resumption(&self, pending_msg_events: &mut Vec<MessageSendEvent>,
                channel: &mut Channel<SP>, raa: Option<msgs::RevokeAndACK>,
                commitment_update: Option<msgs::CommitmentUpdate>, order: RAACommitmentOrder,
-               pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
+               pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
+               funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
-       -> Option<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> {
+       -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
                let logger = WithChannelContext::from(&self.logger, &channel.context);
-               log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
+               log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
                        &channel.context.channel_id(),
                        if raa.is_some() { "an" } else { "no" },
-                       if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
+                       if commitment_update.is_some() { "a" } else { "no" },
+                       pending_forwards.len(), pending_update_adds.len(),
                        if funding_broadcastable.is_some() { "" } else { "not " },
                        if channel_ready.is_some() { "sending" } else { "without" },
                        if announcement_sigs.is_some() { "sending" } else { "without" });
 
-               let mut htlc_forwards = None;
-
                let counterparty_node_id = channel.context.get_counterparty_node_id();
+               let short_channel_id = channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias());
+
+               let mut htlc_forwards = None;
                if !pending_forwards.is_empty() {
-                       htlc_forwards = Some((channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()),
-                               channel.context.get_funding_txo().unwrap(), channel.context.get_user_id(), pending_forwards));
+                       htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
+                               channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
+               }
+               let mut decode_update_add_htlcs = None;
+               if !pending_update_adds.is_empty() {
+                       decode_update_add_htlcs = Some((short_channel_id, pending_update_adds));
                }
 
                if let Some(msg) = channel_ready {
@@ -5917,10 +6836,10 @@ where
                        emit_channel_ready_event!(pending_events, channel);
                }
 
-               htlc_forwards
+               (htlc_forwards, decode_update_add_htlcs)
        }
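[Editor's note] Callers of `handle_channel_resumption` now receive a pair: the decoded forwards as before, plus raw `update_add_htlc`s still awaiting onion decoding (matching the new `process_pending_update_add_htlcs` call at the top of this diff). A hedged caller-side sketch; the two handoff helpers named here are assumptions:

    let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption(
        &mut peer_state.pending_msg_events, chan, raa, commitment_update, order,
        pending_forwards, pending_update_adds, funding_broadcastable,
        channel_ready, announcement_sigs);
    if let Some(forwards) = htlc_forwards {
        self.forward_htlcs(&mut [forwards]); // assumed handoff
    }
    if let Some(update_adds) = decode_update_add_htlcs {
        self.push_decode_update_add_htlcs(update_adds); // assumed handoff
    }
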
 
-       fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
+       fn channel_monitor_updated(&self, funding_txo: &OutPoint, channel_id: &ChannelId, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
                debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
 
                let counterparty_node_id = match counterparty_node_id {
@@ -5929,7 +6848,7 @@ where
                                // TODO: Once we can rely on the counterparty_node_id from the
                                // monitor event, this and the outpoint_to_peer map should be removed.
                                let outpoint_to_peer = self.outpoint_to_peer.lock().unwrap();
-                               match outpoint_to_peer.get(&funding_txo) {
+                               match outpoint_to_peer.get(funding_txo) {
                                        Some(cp_id) => cp_id.clone(),
                                        None => return,
                                }
@@ -5942,11 +6861,11 @@ where
                peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                let channel =
-                       if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) {
+                       if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(channel_id) {
                                chan
                        } else {
                                let update_actions = peer_state.monitor_update_blocked_actions
-                                       .remove(&funding_txo.to_channel_id()).unwrap_or(Vec::new());
+                                       .remove(&channel_id).unwrap_or(Vec::new());
                                mem::drop(peer_state_lock);
                                mem::drop(per_peer_state);
                                self.handle_monitor_update_completion_actions(update_actions);
@@ -6032,73 +6951,82 @@ where
                // happening and return an error. N.B. that we create channel with an outbound SCID of zero so
                // that we can delay allocating the SCID until after we're sure that the checks below will
                // succeed.
-               let mut channel = match peer_state.inbound_channel_request_by_id.remove(temporary_channel_id) {
+               let res = match peer_state.inbound_channel_request_by_id.remove(temporary_channel_id) {
                        Some(unaccepted_channel) => {
-                               let best_block_height = self.best_block.read().unwrap().height();
+                               let best_block_height = self.best_block.read().unwrap().height;
                                InboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider,
                                        counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features,
                                        &unaccepted_channel.open_channel_msg, user_channel_id, &self.default_configuration, best_block_height,
-                                       &self.logger, accept_0conf).map_err(|e| {
-                                               let err_str = e.to_string();
-                                               log_error!(logger, "{}", err_str);
-
-                                               APIError::ChannelUnavailable { err: err_str }
-                                       })
-                               }
+                                       &self.logger, accept_0conf).map_err(|err| MsgHandleErrInternal::from_chan_no_close(err, *temporary_channel_id))
+                       },
                        _ => {
                                let err_str = "No such channel awaiting to be accepted.".to_owned();
                                log_error!(logger, "{}", err_str);
 
-                               Err(APIError::APIMisuseError { err: err_str })
+                               return Err(APIError::APIMisuseError { err: err_str });
                        }
-               }?;
+               };
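[Editor's note] The accept-channel path is restructured so that `InboundV1Channel::new` failures funnel through `handle_error!` (which notifies the peer) before surfacing to the API caller as `APIError::ChannelUnavailable`, instead of being string-converted inline. The remainder of the restructure follows below; a compressed, hedged sketch of the funnel (`build_channel()` stands in for the `InboundV1Channel::new` call above):

    match build_channel() {
        Err(err) => {
            // Locks must be dropped before handle_error! takes its own.
            mem::drop(peer_state_lock);
            mem::drop(per_peer_state);
            match handle_error!(self, Result::<(), MsgHandleErrInternal>::Err(err), *counterparty_node_id) {
                Ok(_) => unreachable!(),
                Err(e) => return Err(APIError::ChannelUnavailable { err: e.err }),
            }
        },
        Ok(channel) => { /* zero-conf and unfunded-peer checks follow */ },
    }
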
 
-               if accept_0conf {
-                       // This should have been correctly configured by the call to InboundV1Channel::new.
-                       debug_assert!(channel.context.minimum_depth().unwrap() == 0);
-               } else if channel.context.get_channel_type().requires_zero_conf() {
-                       let send_msg_err_event = events::MessageSendEvent::HandleError {
-                               node_id: channel.context.get_counterparty_node_id(),
-                               action: msgs::ErrorAction::SendErrorMessage{
-                                       msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "No zero confirmation channels accepted".to_owned(), }
+               match res {
+                       Err(err) => {
+                               mem::drop(peer_state_lock);
+                               mem::drop(per_peer_state);
+                               match handle_error!(self, Result::<(), MsgHandleErrInternal>::Err(err), *counterparty_node_id) {
+                                       Ok(_) => unreachable!("`handle_error` only returns Err as we've passed in an Err"),
+                                       Err(e) => {
+                                               return Err(APIError::ChannelUnavailable { err: e.err });
+                                       },
                                }
-                       };
-                       peer_state.pending_msg_events.push(send_msg_err_event);
-                       let err_str = "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned();
-                       log_error!(logger, "{}", err_str);
+                       }
+                       Ok(mut channel) => {
+                               if accept_0conf {
+                                       // This should have been correctly configured by the call to InboundV1Channel::new.
+                                       debug_assert!(channel.context.minimum_depth().unwrap() == 0);
+                               } else if channel.context.get_channel_type().requires_zero_conf() {
+                                       let send_msg_err_event = events::MessageSendEvent::HandleError {
+                                               node_id: channel.context.get_counterparty_node_id(),
+                                               action: msgs::ErrorAction::SendErrorMessage{
+                                                       msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "No zero confirmation channels accepted".to_owned(), }
+                                               }
+                                       };
+                                       peer_state.pending_msg_events.push(send_msg_err_event);
+                                       let err_str = "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned();
+                                       log_error!(logger, "{}", err_str);
 
-                       return Err(APIError::APIMisuseError { err: err_str });
-               } else {
-                       // If this peer already has some channels, a new channel won't increase our number of peers
-                       // with unfunded channels, so as long as we aren't over the maximum number of unfunded
-                       // channels per-peer we can accept channels from a peer with existing ones.
-                       if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS {
-                               let send_msg_err_event = events::MessageSendEvent::HandleError {
-                                       node_id: channel.context.get_counterparty_node_id(),
-                                       action: msgs::ErrorAction::SendErrorMessage{
-                                               msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), }
-                                       }
-                               };
-                               peer_state.pending_msg_events.push(send_msg_err_event);
-                               let err_str = "Too many peers with unfunded channels, refusing to accept new ones".to_owned();
-                               log_error!(logger, "{}", err_str);
+                                       return Err(APIError::APIMisuseError { err: err_str });
+                               } else {
+                                       // If this peer already has some channels, a new channel won't increase our number of peers
+                                       // with unfunded channels, so as long as we aren't over the maximum number of unfunded
+                                       // channels per-peer we can accept channels from a peer with existing ones.
+                                       if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS {
+                                               let send_msg_err_event = events::MessageSendEvent::HandleError {
+                                                       node_id: channel.context.get_counterparty_node_id(),
+                                                       action: msgs::ErrorAction::SendErrorMessage{
+                                                               msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), }
+                                                       }
+                                               };
+                                               peer_state.pending_msg_events.push(send_msg_err_event);
+                                               let err_str = "Too many peers with unfunded channels, refusing to accept new ones".to_owned();
+                                               log_error!(logger, "{}", err_str);
 
-                               return Err(APIError::APIMisuseError { err: err_str });
-                       }
-               }
+                                               return Err(APIError::APIMisuseError { err: err_str });
+                                       }
+                               }
 
-               // Now that we know we have a channel, assign an outbound SCID alias.
-               let outbound_scid_alias = self.create_and_insert_outbound_scid_alias();
-               channel.context.set_outbound_scid_alias(outbound_scid_alias);
+                               // Now that we know we have a channel, assign an outbound SCID alias.
+                               let outbound_scid_alias = self.create_and_insert_outbound_scid_alias();
+                               channel.context.set_outbound_scid_alias(outbound_scid_alias);
 
-               peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
-                       node_id: channel.context.get_counterparty_node_id(),
-                       msg: channel.accept_inbound_channel(),
-               });
+                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
+                                       node_id: channel.context.get_counterparty_node_id(),
+                                       msg: channel.accept_inbound_channel(),
+                               });
 
-               peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel));
+                               peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel));
 
-               Ok(())
+                               Ok(())
+                       },
+               }
        }
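
For reference, a minimal sketch of the caller side of this path: with
`manually_accept_inbound_channels` set, each `Event::OpenChannelRequest` must be
answered via one of the accept calls (the `channel_manager` binding and the
surrounding event loop are assumed here; values are illustrative):

	// In the event handler: accept the gated channel, or log why we could not.
	Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => {
		channel_manager.accept_inbound_channel(
			&temporary_channel_id, &counterparty_node_id, /*user_channel_id*/ 0,
		).unwrap_or_else(|e| println!("Rejected inbound channel: {:?}", e));
	},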
 
        /// Gets the number of peers which match the given filter and do not have any funded, outbound,
@@ -6109,7 +7037,7 @@ where
        fn peers_without_funded_channels<Filter>(&self, maybe_count_peer: Filter) -> usize
        where Filter: Fn(&PeerState<SP>) -> bool {
                let mut peers_without_funded_channels = 0;
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                {
                        let peer_state_lock = self.per_peer_state.read().unwrap();
                        for (_, peer_mtx) in peer_state_lock.iter() {
@@ -6144,9 +7072,25 @@ where
                                                num_unfunded_channels += 1;
                                        }
                                },
+                               // TODO(dual_funding): Combine this match arm with above once #[cfg(dual_funding)] is removed.
+                               #[cfg(dual_funding)]
+                               ChannelPhase::UnfundedInboundV2(chan) => {
+                                       // Only inbound V2 channels that are not 0conf and that we do not contribute to will be
+                                       // included in the unfunded count.
+                                       if chan.context.minimum_depth().unwrap_or(1) != 0 &&
+                                               chan.dual_funding_context.our_funding_satoshis == 0 {
+                                               num_unfunded_channels += 1;
+                                       }
+                               },
                                ChannelPhase::UnfundedOutboundV1(_) => {
                                        // Outbound channels don't contribute to the unfunded count in the DoS context.
                                        continue;
+                               },
+                               // TODO(dual_funding): Combine this match arm with above once #[cfg(dual_funding)] is removed.
+                               #[cfg(dual_funding)]
+                               ChannelPhase::UnfundedOutboundV2(_) => {
+                                       // Outbound channels don't contribute to the unfunded count in the DoS context.
+                                       continue;
                                }
                        }
                }
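
The dual-funding counting rule above can be read as a standalone predicate (a
sketch only; the real checks live on the channel and dual-funding contexts): a
V2 channel counts toward the unfunded DoS cap only if it requires confirmations
and we contribute none of its funding.

	fn counts_as_unfunded_v2(minimum_depth: Option<u32>, our_funding_satoshis: u64) -> bool {
		// 0conf channels never wait on confirmations, and channels we co-fund are
		// not free for a counterparty to open, so neither counts toward the cap.
		minimum_depth.unwrap_or(1) != 0 && our_funding_satoshis == 0
	}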
@@ -6156,12 +7100,14 @@ where
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
                // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
                // likely to be lost on restart!
-               if msg.chain_hash != self.chain_hash {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
+               if msg.common_fields.chain_hash != self.chain_hash {
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(),
+                                msg.common_fields.temporary_channel_id.clone()));
                }
 
                if !self.default_configuration.accept_inbound_channels {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close("No inbound channels accepted".to_owned(), msg.temporary_channel_id.clone()));
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close("No inbound channels accepted".to_owned(),
+                                msg.common_fields.temporary_channel_id.clone()));
                }
 
                // Get the number of peers with channels, but without funded ones. We don't care too much
@@ -6174,7 +7120,9 @@ where
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                    .ok_or_else(|| {
                                debug_assert!(false);
-                               MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id.clone())
+                               MsgHandleErrInternal::send_err_msg_no_close(
+                                       format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id),
+                                       msg.common_fields.temporary_channel_id.clone())
                        })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
@@ -6188,34 +7136,36 @@ where
                {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                "Have too many peers with unfunded channels, not accepting new ones".to_owned(),
-                               msg.temporary_channel_id.clone()));
+                               msg.common_fields.temporary_channel_id.clone()));
                }
 
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                if Self::unfunded_channel_count(peer_state, best_block_height) >= MAX_UNFUNDED_CHANS_PER_PEER {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                format!("Refusing more than {} unfunded channels.", MAX_UNFUNDED_CHANS_PER_PEER),
-                               msg.temporary_channel_id.clone()));
+                               msg.common_fields.temporary_channel_id.clone()));
                }
 
-               let channel_id = msg.temporary_channel_id;
+               let channel_id = msg.common_fields.temporary_channel_id;
                let channel_exists = peer_state.has_channel(&channel_id);
                if channel_exists {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision for the same peer!".to_owned(), msg.temporary_channel_id.clone()));
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                               "temporary_channel_id collision for the same peer!".to_owned(),
+                               msg.common_fields.temporary_channel_id.clone()));
                }
 
                // If we're doing manual acceptance checks on the channel, then defer creation until we're sure we want to accept.
                if self.default_configuration.manually_accept_inbound_channels {
                        let channel_type = channel::channel_type_from_open_channel(
-                                       &msg, &peer_state.latest_features, &self.channel_type_features()
+                                       &msg.common_fields, &peer_state.latest_features, &self.channel_type_features()
                                ).map_err(|e|
-                                       MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id)
+                                       MsgHandleErrInternal::from_chan_no_close(e, msg.common_fields.temporary_channel_id)
                                )?;
                        let mut pending_events = self.pending_events.lock().unwrap();
                        pending_events.push_back((events::Event::OpenChannelRequest {
-                               temporary_channel_id: msg.temporary_channel_id.clone(),
+                               temporary_channel_id: msg.common_fields.temporary_channel_id.clone(),
                                counterparty_node_id: counterparty_node_id.clone(),
-                               funding_satoshis: msg.funding_satoshis,
+                               funding_satoshis: msg.common_fields.funding_satoshis,
                                push_msat: msg.push_msat,
                                channel_type,
                        }, None));
@@ -6235,17 +7185,21 @@ where
                        &self.default_configuration, best_block_height, &self.logger, /*is_0conf=*/false)
                {
                        Err(e) => {
-                               return Err(MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id));
+                               return Err(MsgHandleErrInternal::from_chan_no_close(e, msg.common_fields.temporary_channel_id));
                        },
                        Ok(res) => res
                };
 
                let channel_type = channel.context.get_channel_type();
                if channel_type.requires_zero_conf() {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone()));
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                               "No zero confirmation channels accepted".to_owned(),
+                               msg.common_fields.temporary_channel_id.clone()));
                }
                if channel_type.requires_anchors_zero_fee_htlc_tx() {
-                       return Err(MsgHandleErrInternal::send_err_msg_no_close("No channels with anchor outputs accepted".to_owned(), msg.temporary_channel_id.clone()));
+                       return Err(MsgHandleErrInternal::send_err_msg_no_close(
+                               "No channels with anchor outputs accepted".to_owned(),
+                               msg.common_fields.temporary_channel_id.clone()));
                }
 
                let outbound_scid_alias = self.create_and_insert_outbound_scid_alias();
@@ -6267,11 +7221,11 @@ where
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
                                        debug_assert!(false);
-                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.temporary_channel_id)
+                                       MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.common_fields.temporary_channel_id)
                                })?;
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
-                       match peer_state.channel_by_id.entry(msg.temporary_channel_id) {
+                       match peer_state.channel_by_id.entry(msg.common_fields.temporary_channel_id) {
                                hash_map::Entry::Occupied(mut phase) => {
                                        match phase.get_mut() {
                                                ChannelPhase::UnfundedOutboundV1(chan) => {
@@ -6279,16 +7233,16 @@ where
                                                        (chan.context.get_value_satoshis(), chan.context.get_funding_redeemscript().to_v0_p2wsh(), chan.context.get_user_id())
                                                },
                                                _ => {
-                                                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got an unexpected accept_channel message from peer with counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id));
+                                                       return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got an unexpected accept_channel message from peer with counterparty_node_id {}", counterparty_node_id), msg.common_fields.temporary_channel_id));
                                                }
                                        }
                                },
-                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
+                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.common_fields.temporary_channel_id))
                        }
                };
                let mut pending_events = self.pending_events.lock().unwrap();
                pending_events.push_back((events::Event::FundingGenerationReady {
-                       temporary_channel_id: msg.temporary_channel_id,
+                       temporary_channel_id: msg.common_fields.temporary_channel_id,
                        counterparty_node_id: *counterparty_node_id,
                        channel_value_satoshis: value,
                        output_script,
@@ -6559,6 +7513,14 @@ where
                                                let mut chan = remove_channel_phase!(self, chan_phase_entry);
                                                finish_shutdown = Some(chan.context_mut().force_shutdown(false, ClosureReason::CounterpartyCoopClosedUnfundedChannel));
                                        },
+                                       // TODO(dual_funding): Combine this match arm with above.
+                                       #[cfg(dual_funding)]
+                                       ChannelPhase::UnfundedInboundV2(_) | ChannelPhase::UnfundedOutboundV2(_) => {
+                                               let context = phase.context_mut();
+                                               let logger = WithChannelContext::from(&self.logger, context);
+                                               log_error!(logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id);
+                                               let mut chan = remove_channel_phase!(self, chan_phase_entry);
+                                               finish_shutdown = Some(chan.context_mut().force_shutdown(false, ClosureReason::CounterpartyCoopClosedUnfundedChannel));
+                                       },
                                }
                        } else {
                                return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -6620,9 +7582,8 @@ where
                }
                if let Some(ChannelPhase::Funded(chan)) = chan_option {
                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
                                });
                        }
@@ -6659,7 +7620,7 @@ where
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
-                                       let pending_forward_info = match decoded_hop_res {
+                                       let mut pending_forward_info = match decoded_hop_res {
                                                Ok((next_hop, shared_secret, next_packet_pk_opt)) =>
                                                        self.construct_pending_htlc_status(
                                                                msg, counterparty_node_id, shared_secret, next_hop,
@@ -6667,44 +7628,45 @@ where
                                                        ),
                                                Err(e) => PendingHTLCStatus::Fail(e)
                                        };
-                                       let create_pending_htlc_status = |chan: &Channel<SP>, pending_forward_info: PendingHTLCStatus, error_code: u16| {
+                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
+                                       // If the update_add is completely bogus, the call will Err and we will close,
+                                       // but if we've sent a shutdown and they haven't acknowledged it yet, we just
+                                       // want to reject the new HTLC and fail it backwards instead of forwarding.
+                                       if let Err((_, error_code)) = chan.can_accept_incoming_htlc(&msg, &self.fee_estimator, &logger) {
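+                                               // For HTLCs received over a blinded path, BOLT 4 requires masking the
+                                               // real failure as invalid_onion_blinding with an all-zero
+                                               // sha256_of_onion rather than revealing why the HTLC was rejected.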
                                                if msg.blinding_point.is_some() {
-                                                       return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
-                                                                       msgs::UpdateFailMalformedHTLC {
-                                                                               channel_id: msg.channel_id,
-                                                                               htlc_id: msg.htlc_id,
-                                                                               sha256_of_onion: [0; 32],
-                                                                               failure_code: INVALID_ONION_BLINDING,
-                                                                       }
-                                                       ))
-                                               }
-                                               // If the update_add is completely bogus, the call will Err and we will close,
-                                               // but if we've sent a shutdown and they haven't acknowledged it yet, we just
-                                               // want to reject the new HTLC and fail it backwards instead of forwarding.
-                                               match pending_forward_info {
-                                                       PendingHTLCStatus::Forward(PendingHTLCInfo {
-                                                               ref incoming_shared_secret, ref routing, ..
-                                                       }) => {
-                                                               let reason = if routing.blinded_failure().is_some() {
-                                                                       HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
-                                                               } else if (error_code & 0x1000) != 0 {
-                                                                       let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
-                                                                       HTLCFailReason::reason(real_code, error_data)
-                                                               } else {
-                                                                       HTLCFailReason::from_failure_code(error_code)
-                                                               }.get_encrypted_failure_packet(incoming_shared_secret, &None);
-                                                               let msg = msgs::UpdateFailHTLC {
+                                                       pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
+                                                               msgs::UpdateFailMalformedHTLC {
                                                                        channel_id: msg.channel_id,
                                                                        htlc_id: msg.htlc_id,
-                                                                       reason
-                                                               };
-                                                               PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg))
-                                                       },
-                                                       _ => pending_forward_info
+                                                                       sha256_of_onion: [0; 32],
+                                                                       failure_code: INVALID_ONION_BLINDING,
+                                                               }
+                                                       ))
+                                               } else {
+                                                       match pending_forward_info {
+                                                               PendingHTLCStatus::Forward(PendingHTLCInfo {
+                                                                       ref incoming_shared_secret, ref routing, ..
+                                                               }) => {
+                                                                       let reason = if routing.blinded_failure().is_some() {
+                                                                               HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
+                                                                       } else if (error_code & 0x1000) != 0 {
+                                                                               let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
+                                                                               HTLCFailReason::reason(real_code, error_data)
+                                                                       } else {
+                                                                               HTLCFailReason::from_failure_code(error_code)
+                                                                       }.get_encrypted_failure_packet(incoming_shared_secret, &None);
+                                                                       let msg = msgs::UpdateFailHTLC {
+                                                                               channel_id: msg.channel_id,
+                                                                               htlc_id: msg.htlc_id,
+                                                                               reason
+                                                                       };
+                                                                       pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg));
+                                                               },
+                                                               _ => {},
+                                                       }
                                                }
-                                       };
-                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
-                                       try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &&logger), chan_phase_entry);
+                                       }
+                                       try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info), chan_phase_entry);
                                } else {
                                        return try_chan_phase_entry!(self, Err(ChannelError::Close(
                                                "Got an update_add_htlc message for an unfunded channel!".into())), chan_phase_entry);
@@ -6717,7 +7679,8 @@ where
 
        fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> {
                let funding_txo;
-               let (htlc_source, forwarded_htlc_value) = {
+               let next_user_channel_id;
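+               // `skimmed_fee_msat` is any fee we deliberately withheld from the HTLC beyond
+               // what its onion specified (cf. `accept_underpaying_htlcs`); it is threaded
+               // through so the eventual `Event::PaymentForwarded` can report it.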
+               let (htlc_source, forwarded_htlc_value, skimmed_fee_msat) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
@@ -6746,6 +7709,7 @@ where
                                                // outbound HTLC is claimed. This is guaranteed to all complete before we
                                                // process the RAA as messages are processed from single peers serially.
                                                funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded");
+                                               next_user_channel_id = chan.context.get_user_id();
                                                res
                                        } else {
                                                return try_chan_phase_entry!(self, Err(ChannelError::Close(
@@ -6755,7 +7719,11 @@ where
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                        }
                };
-               self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, false, Some(*counterparty_node_id), funding_txo);
+               self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(),
+                       Some(forwarded_htlc_value), skimmed_fee_msat, false, false, Some(*counterparty_node_id),
+                       funding_txo, msg.channel_id, Some(next_user_channel_id),
+               );
+
                Ok(())
        }
 
@@ -6842,10 +7810,28 @@ where
                }
        }
 
+       fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec<msgs::UpdateAddHTLC>)) {
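+               // Generate a PendingHTLCsForwardable event only if both HTLC queues were
+               // empty before this call; otherwise an event is already pending for the
+               // earlier contents.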
+               let mut push_forward_event = self.forward_htlcs.lock().unwrap().is_empty();
+               let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+               push_forward_event &= decode_update_add_htlcs.is_empty();
+               let scid = update_add_htlcs.0;
+               match decode_update_add_htlcs.entry(scid) {
+                       hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); },
+                       hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); },
+               }
+               if push_forward_event { self.push_pending_forwards_ev(); }
+       }
+
+       #[inline]
+       fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
+               let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
+               if push_forward_event { self.push_pending_forwards_ev() }
+       }
+
        #[inline]
-       fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
-               for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
-                       let mut push_forward_event = false;
+       fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
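+               // Tracks whether the caller should surface a PendingHTLCsForwardable event,
+               // i.e. whether anything below lands in a previously-empty forward queue.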
+               let mut push_forward_event = false;
+               for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
                        let mut new_intercept_events = VecDeque::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
@@ -6858,12 +7844,13 @@ where
                                        // Pull this now to avoid introducing a lock order with `forward_htlcs`.
                                        let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid);
 
+                                       let decode_update_add_htlcs_empty = self.decode_update_add_htlcs.lock().unwrap().is_empty();
                                        let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
                                        let forward_htlcs_empty = forward_htlcs.is_empty();
                                        match forward_htlcs.entry(scid) {
                                                hash_map::Entry::Occupied(mut entry) => {
                                                        entry.get_mut().push(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                               prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info }));
+                                                               prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info }));
                                                },
                                                hash_map::Entry::Vacant(entry) => {
                                                        if !is_our_scid && forward_info.incoming_amt_msat.is_some() &&
@@ -6881,15 +7868,16 @@ where
                                                                                        intercept_id
                                                                                }, None));
                                                                                entry.insert(PendingAddHTLCInfo {
-                                                                                       prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info });
+                                                                                       prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info });
                                                                        },
                                                                        hash_map::Entry::Occupied(_) => {
-                                                                               let logger = WithContext::from(&self.logger, None, Some(prev_funding_outpoint.to_channel_id()));
+                                                                               let logger = WithContext::from(&self.logger, None, Some(prev_channel_id));
                                                                                log_info!(logger, "Failed to forward incoming HTLC: detected duplicate intercepted payment over short channel id {}", scid);
                                                                                let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                        short_channel_id: prev_short_channel_id,
                                                                                        user_channel_id: Some(prev_user_channel_id),
                                                                                        outpoint: prev_funding_outpoint,
+                                                                                       channel_id: prev_channel_id,
                                                                                        htlc_id: prev_htlc_id,
                                                                                        incoming_packet_shared_secret: forward_info.incoming_shared_secret,
                                                                                        phantom_shared_secret: None,
@@ -6905,11 +7893,9 @@ where
                                                        } else {
                                                                // We don't want to generate a PendingHTLCsForwardable event if only intercepted
                                                                // payments are being processed.
-                                                               if forward_htlcs_empty {
-                                                                       push_forward_event = true;
-                                                               }
+                                                               push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
                                                                entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-                                                                       prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info })));
+                                                                       prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
                                                        }
                                                }
                                        }
@@ -6917,15 +7903,15 @@ where
                        }
 
                        for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards.drain(..) {
-                               self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination);
+                               push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(&htlc_source, &payment_hash, &failure_reason, destination);
                        }
 
                        if !new_intercept_events.is_empty() {
                                let mut events = self.pending_events.lock().unwrap();
                                events.append(&mut new_intercept_events);
                        }
-                       if push_forward_event { self.push_pending_forwards_ev() }
                }
+               push_forward_event
        }
 
        fn push_pending_forwards_ev(&self) {
@@ -6953,13 +7939,14 @@ where
        /// the [`ChannelMonitorUpdate`] in question.
        fn raa_monitor_updates_held(&self,
                actions_blocking_raa_monitor_updates: &BTreeMap<ChannelId, Vec<RAAMonitorUpdateBlockingAction>>,
-               channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
+               channel_funding_outpoint: OutPoint, channel_id: ChannelId, counterparty_node_id: PublicKey
        ) -> bool {
                actions_blocking_raa_monitor_updates
-                       .get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
+                       .get(&channel_id).map(|v| !v.is_empty()).unwrap_or(false)
                || self.pending_events.lock().unwrap().iter().any(|(_, action)| {
                        action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
                                channel_funding_outpoint,
+                               channel_id,
                                counterparty_node_id,
                        })
                })
@@ -6976,7 +7963,7 @@ where
 
                        if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
                                return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
-                                       chan.context().get_funding_txo().unwrap(), counterparty_node_id);
+                                       chan.context().get_funding_txo().unwrap(), channel_id, counterparty_node_id);
                        }
                }
                false
@@ -6998,7 +7985,7 @@ where
                                                let funding_txo_opt = chan.context.get_funding_txo();
                                                let mon_update_blocked = if let Some(funding_txo) = funding_txo_opt {
                                                        self.raa_monitor_updates_held(
-                                                               &peer_state.actions_blocking_raa_monitor_updates, funding_txo,
+                                                               &peer_state.actions_blocking_raa_monitor_updates, funding_txo, msg.channel_id,
                                                                *counterparty_node_id)
                                                } else { false };
                                                let (htlcs_to_fail, monitor_update_opt) = try_chan_phase_entry!(self,
@@ -7064,7 +8051,7 @@ where
 
                                        peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement {
                                                msg: try_chan_phase_entry!(self, chan.announcement_signatures(
-                                                       &self.node_signer, self.chain_hash, self.best_block.read().unwrap().height(),
+                                                       &self.node_signer, self.chain_hash, self.best_block.read().unwrap().height,
                                                        msg, &self.default_configuration
                                                ), chan_phase_entry),
                                                // Note that announcement_signatures fails if the channel cannot be announced,
@@ -7134,7 +8121,6 @@ where
        }
 
        fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
-               let htlc_forwards;
                let need_lnd_workaround = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
@@ -7177,9 +8163,11 @@ where
                                                        }
                                                }
                                                let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take();
-                                               htlc_forwards = self.handle_channel_resumption(
+                                               let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption(
                                                        &mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order,
-                                                       Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
+                                                       Vec::new(), Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
+                                               debug_assert!(htlc_forwards.is_none());
+                                               debug_assert!(decode_update_add_htlcs.is_none());
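+                                               // We passed empty forward lists into the resumption call above, so
+                                               // nothing can come back out; the asserts document that invariant.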
                                                if let Some(upd) = channel_update {
                                                        peer_state.pending_msg_events.push(upd);
                                                }
@@ -7225,16 +8213,10 @@ where
                        }
                };
 
-               let mut persist = NotifyOption::SkipPersistHandleEvents;
-               if let Some(forwards) = htlc_forwards {
-                       self.forward_htlcs(&mut [forwards][..]);
-                       persist = NotifyOption::DoPersist;
-               }
-
                if let Some(channel_ready_msg) = need_lnd_workaround {
                        self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
                }
-               Ok(persist)
+               Ok(NotifyOption::SkipPersistHandleEvents)
        }
 
        /// Process pending events from the [`chain::Watch`], returning whether any events were processed.
@@ -7244,22 +8226,24 @@ where
                let mut failed_channels = Vec::new();
                let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
                let has_pending_monitor_events = !pending_monitor_events.is_empty();
-               for (funding_outpoint, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) {
+               for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) {
                        for monitor_event in monitor_events.drain(..) {
                                match monitor_event {
                                        MonitorEvent::HTLCEvent(htlc_update) => {
-                                               let logger = WithContext::from(&self.logger, counterparty_node_id, Some(funding_outpoint.to_channel_id()));
+                                               let logger = WithContext::from(&self.logger, counterparty_node_id, Some(channel_id));
                                                if let Some(preimage) = htlc_update.payment_preimage {
                                                        log_trace!(logger, "Claiming HTLC with preimage {} from our monitor", preimage);
-                                                       self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, false, counterparty_node_id, funding_outpoint);
+                                                       self.claim_funds_internal(htlc_update.source, preimage,
+                                                               htlc_update.htlc_value_satoshis.map(|v| v * 1000), None, true,
+                                                               false, counterparty_node_id, funding_outpoint, channel_id, None);
                                                } else {
                                                        log_trace!(logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
-                                                       let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
+                                                       let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id };
                                                        let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
                                                        self.fail_htlc_backwards_internal(&htlc_update.source, &htlc_update.payment_hash, &reason, receiver);
                                                }
                                        },
-                                       MonitorEvent::HolderForceClosed(funding_outpoint) => {
+                                       MonitorEvent::HolderForceClosed(_) | MonitorEvent::HolderForceClosedWithInfo { .. } => {
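+                                               // `HolderForceClosedWithInfo` carries a `ClosureReason` from the
+                                               // monitor; it is propagated into both the shutdown result and the
+                                               // error message sent to our peer below.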
                                                let counterparty_node_id_opt = match counterparty_node_id {
                                                        Some(cp_id) => Some(cp_id),
                                                        None => {
@@ -7275,18 +8259,24 @@ where
                                                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                                                let peer_state = &mut *peer_state_lock;
                                                                let pending_msg_events = &mut peer_state.pending_msg_events;
-                                                               if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) {
+                                                               if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) {
                                                                        if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) {
-                                                                               failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed));
+                                                                               let reason = if let MonitorEvent::HolderForceClosedWithInfo { reason, .. } = monitor_event {
+                                                                                       reason
+                                                                               } else {
+                                                                                       ClosureReason::HolderForceClosed
+                                                                               };
+                                                                               failed_channels.push(chan.context.force_shutdown(false, reason.clone()));
                                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                msg: update
                                                                                        });
                                                                                }
                                                                                pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                                                        node_id: chan.context.get_counterparty_node_id(),
                                                                                        action: msgs::ErrorAction::DisconnectPeer {
-                                                                                               msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: "Channel force-closed".to_owned() })
+                                                                                               msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: reason.to_string() })
                                                                                        },
                                                                                });
                                                                        }
@@ -7294,8 +8284,8 @@ where
                                                        }
                                                }
                                        },
-                                       MonitorEvent::Completed { funding_txo, monitor_update_id } => {
-                                               self.channel_monitor_updated(&funding_txo, monitor_update_id, counterparty_node_id.as_ref());
+                                       MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id } => {
+                                               self.channel_monitor_updated(&funding_txo, &channel_id, monitor_update_id, counterparty_node_id.as_ref());
                                        },
                                }
                        }
@@ -7464,7 +8454,8 @@ where
                                                                                // We're done with this channel. We got a closing_signed and sent back
                                                                                // a closing_signed with a closing transaction to broadcast.
                                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                msg: update
                                                                                        });
                                                                                }
@@ -7512,14 +8503,14 @@ where
                        // Channel::force_shutdown tries to make us do) as we may still be in initialization,
                        // so we track the update internally and handle it when the user next calls
                        // timer_tick_occurred, guaranteeing we're running normally.
-                       if let Some((counterparty_node_id, funding_txo, update)) = failure.monitor_update.take() {
+                       if let Some((counterparty_node_id, funding_txo, channel_id, update)) = failure.monitor_update.take() {
                                assert_eq!(update.updates.len(), 1);
                                if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
                                        assert!(should_broadcast);
                                } else { unreachable!(); }
                                self.pending_background_events.lock().unwrap().push(
                                        BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-                                               counterparty_node_id, funding_txo, update
+                                               counterparty_node_id, funding_txo, update, channel_id,
                                        });
                        }
                        self.finish_close_channel(failure);
@@ -7574,23 +8565,7 @@ macro_rules! create_offer_builder { ($self: ident, $builder: ty) => {
        }
 } }
 
-impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
-where
-       M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
-       T::Target: BroadcasterInterface,
-       ES::Target: EntropySource,
-       NS::Target: NodeSigner,
-       SP::Target: SignerProvider,
-       F::Target: FeeEstimator,
-       R::Target: Router,
-       L::Target: Logger,
-{
-       #[cfg(not(c_bindings))]
-       create_offer_builder!(self, OfferBuilder<DerivedMetadata, secp256k1::All>);
-
-       #[cfg(c_bindings)]
-       create_offer_builder!(self, OfferWithDerivedMetadataBuilder);
-
+macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
        /// Creates a [`RefundBuilder`] such that the [`Refund`] it builds is recognized by the
        /// [`ChannelManager`] when handling [`Bolt12Invoice`] messages for the refund.
        ///
@@ -7640,31 +8615,55 @@ where
        /// [`Bolt12Invoice::payment_paths`]: crate::offers::invoice::Bolt12Invoice::payment_paths
        /// [Avoiding Duplicate Payments]: #avoiding-duplicate-payments
        pub fn create_refund_builder(
-               &self, description: String, amount_msats: u64, absolute_expiry: Duration,
+               &$self, description: String, amount_msats: u64, absolute_expiry: Duration,
                payment_id: PaymentId, retry_strategy: Retry, max_total_routing_fee_msat: Option<u64>
-       ) -> Result<RefundBuilder<secp256k1::All>, Bolt12SemanticError> {
-               let node_id = self.get_our_node_id();
-               let expanded_key = &self.inbound_payment_key;
-               let entropy = &*self.entropy_source;
-               let secp_ctx = &self.secp_ctx;
+       ) -> Result<$builder, Bolt12SemanticError> {
+               let node_id = $self.get_our_node_id();
+               let expanded_key = &$self.inbound_payment_key;
+               let entropy = &*$self.entropy_source;
+               let secp_ctx = &$self.secp_ctx;
 
-               let path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?;
+               let path = $self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?;
                let builder = RefundBuilder::deriving_payer_id(
                        description, node_id, expanded_key, entropy, secp_ctx, amount_msats, payment_id
                )?
-                       .chain_hash(self.chain_hash)
+                       .chain_hash($self.chain_hash)
                        .absolute_expiry(absolute_expiry)
                        .path(path);
 
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop($self);
+
                let expiration = StaleExpiration::AbsoluteTimeout(absolute_expiry);
-               self.pending_outbound_payments
+               $self.pending_outbound_payments
                        .add_new_awaiting_invoice(
                                payment_id, expiration, retry_strategy, max_total_routing_fee_msat,
                        )
                        .map_err(|_| Bolt12SemanticError::DuplicatePaymentId)?;
 
-               Ok(builder)
+               Ok(builder.into())
        }
+} }
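
For orientation, a caller-side sketch of the method this macro expands to. This is illustrative only: `channel_manager` and `payment_id` are assumed to be in scope, and on non-`c_bindings` builds the returned builder is `RefundBuilder<secp256k1::All>`.

```rust
use core::time::Duration;
use lightning::ln::channelmanager::{PaymentId, Retry};

// Sketch only: the payment is registered as awaiting an invoice before the
// builder is returned, so a duplicate `payment_id` fails with
// Bolt12SemanticError::DuplicatePaymentId.
let refund = channel_manager.create_refund_builder(
	"refund for order 42".to_string(),  // description
	10_000,                             // amount_msats
	Duration::from_secs(1_700_000_000), // absolute_expiry since the UNIX epoch
	payment_id,
	Retry::Attempts(3),                 // retry_strategy
	None,                               // max_total_routing_fee_msat
)?
.build()?;
```
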
+
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
+where
+       M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+       T::Target: BroadcasterInterface,
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       SP::Target: SignerProvider,
+       F::Target: FeeEstimator,
+       R::Target: Router,
+       L::Target: Logger,
+{
+       #[cfg(not(c_bindings))]
+       create_offer_builder!(self, OfferBuilder<DerivedMetadata, secp256k1::All>);
+       #[cfg(not(c_bindings))]
+       create_refund_builder!(self, RefundBuilder<secp256k1::All>);
+
+       #[cfg(c_bindings)]
+       create_offer_builder!(self, OfferWithDerivedMetadataBuilder);
+       #[cfg(c_bindings)]
+       create_refund_builder!(self, RefundMaybeWithDerivedMetadataBuilder);
 
        /// Pays for an [`Offer`] using the given parameters by creating an [`InvoiceRequest`] and
        /// enqueuing it to be sent via an onion message. [`ChannelManager`] will pay the actual
@@ -7709,6 +8708,7 @@ where
        /// Errors if:
        /// - a duplicate `payment_id` is provided given the caveats in the aforementioned link,
        /// - the provided parameters are invalid for the offer,
+       /// - the offer is for an unsupported chain, or
        /// - the parameterized [`Router`] is unable to create a blinded reply path for the invoice
        ///   request.
        ///
@@ -7728,9 +8728,11 @@ where
                let entropy = &*self.entropy_source;
                let secp_ctx = &self.secp_ctx;
 
-               let builder = offer
+               let builder: InvoiceRequestBuilder<DerivedPayerId, secp256k1::All> = offer
                        .request_invoice_deriving_payer_id(expanded_key, entropy, secp_ctx, payment_id)?
-                       .chain_hash(self.chain_hash)?;
+                       .into();
+               let builder = builder.chain_hash(self.chain_hash)?;
+
                let builder = match quantity {
                        None => builder,
                        Some(quantity) => builder.quantity(quantity)?,
@@ -7746,6 +8748,8 @@ where
                let invoice_request = builder.build_and_sign()?;
                let reply_path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?;
 
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+
                let expiration = StaleExpiration::TimerTicks(1);
                self.pending_outbound_payments
                        .add_new_awaiting_invoice(
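
The call-site counterpart to this hunk, hedged since only the method body is visible here: the surrounding method is `ChannelManager::pay_for_offer` in LDK, and this sketch assumes its parameter list; `offer` and `payment_id` are assumed to exist.

```rust
// Sketch: builds an InvoiceRequest for `offer` and enqueues it as an onion
// message; the payment itself goes out once a matching Bolt12Invoice arrives.
channel_manager.pay_for_offer(
	&offer,
	None,               // quantity
	Some(10_000),       // amount_msats (required if the offer names no amount)
	None,               // payer_note
	payment_id,
	Retry::Attempts(3), // retry_strategy
	None,               // max_total_routing_fee_msat
)?;
```
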
@@ -7784,7 +8788,7 @@ where
        ///
        /// The resulting invoice uses a [`PaymentHash`] recognized by the [`ChannelManager`] and a
        /// [`BlindedPath`] containing the [`PaymentSecret`] needed to reconstruct the corresponding
-       /// [`PaymentPreimage`].
+       /// [`PaymentPreimage`]. The invoice is returned purely for informational purposes.
        ///
        /// # Limitations
        ///
@@ -7795,11 +8799,15 @@ where
        ///
        /// # Errors
        ///
-       /// Errors if the parameterized [`Router`] is unable to create a blinded payment path or reply
-       /// path for the invoice.
+       /// Errors if:
+       /// - the refund is for an unsupported chain, or
+       /// - the parameterized [`Router`] is unable to create a blinded payment path or reply path for
+       ///   the invoice.
        ///
        /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice
-       pub fn request_refund_payment(&self, refund: &Refund) -> Result<(), Bolt12SemanticError> {
+       pub fn request_refund_payment(
+               &self, refund: &Refund
+       ) -> Result<Bolt12Invoice, Bolt12SemanticError> {
                let expanded_key = &self.inbound_payment_key;
                let entropy = &*self.entropy_source;
                let secp_ctx = &self.secp_ctx;
@@ -7807,9 +8815,18 @@ where
                let amount_msats = refund.amount_msats();
                let relative_expiry = DEFAULT_RELATIVE_EXPIRY.as_secs() as u32;
 
+               if refund.chain() != self.chain_hash {
+                       return Err(Bolt12SemanticError::UnsupportedChain);
+               }
+
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+
                match self.create_inbound_payment(Some(amount_msats), relative_expiry, None) {
                        Ok((payment_hash, payment_secret)) => {
-                               let payment_paths = self.create_blinded_payment_paths(amount_msats, payment_secret)
+                               let payment_context = PaymentContext::Bolt12Refund(Bolt12RefundContext {});
+                               let payment_paths = self.create_blinded_payment_paths(
+                                       amount_msats, payment_secret, payment_context
+                               )
                                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
 
                                #[cfg(feature = "std")]
@@ -7824,6 +8841,7 @@ where
                                let builder = refund.respond_using_derived_keys_no_std(
                                        payment_paths, payment_hash, created_at, expanded_key, entropy
                                )?;
+                               let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
                                let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?;
                                let reply_path = self.create_blinded_path()
                                        .map_err(|_| Bolt12SemanticError::MissingPaths)?;
@@ -7831,7 +8849,7 @@ where
                                let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
                                if refund.paths().is_empty() {
                                        let message = new_pending_onion_message(
-                                               OffersMessage::Invoice(invoice),
+                                               OffersMessage::Invoice(invoice.clone()),
                                                Destination::Node(refund.payer_id()),
                                                Some(reply_path),
                                        );
@@ -7847,7 +8865,7 @@ where
                                        }
                                }
 
-                               Ok(())
+                               Ok(invoice)
                        },
                        Err(()) => Err(Bolt12SemanticError::InvalidAmount),
                }
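
With the new return type, callers can inspect the invoice they just sent. A minimal sketch, assuming `refund` is a verified `Refund` we received:

```rust
// Sketch: respond to `refund` with a freshly built Bolt12Invoice and keep the
// invoice we sent. Funds still arrive via Event::PaymentClaimable.
let invoice = channel_manager.request_refund_payment(&refund)?;
// The hash can be correlated with the eventual PaymentClaimable event.
let _payment_hash = invoice.payment_hash();
```
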
@@ -7860,9 +8878,9 @@ where
        /// [`PaymentHash`] and [`PaymentPreimage`] for you.
        ///
        /// The [`PaymentPreimage`] will ultimately be returned to you in the [`PaymentClaimable`], which
-       /// will have the [`PaymentClaimable::purpose`] be [`PaymentPurpose::InvoicePayment`] with
-       /// its [`PaymentPurpose::InvoicePayment::payment_preimage`] field filled in. That should then be
-       /// passed directly to [`claim_funds`].
+       /// will have the [`PaymentClaimable::purpose`] be [`PaymentPurpose::Bolt11InvoicePayment`] with
+       /// its [`PaymentPurpose::Bolt11InvoicePayment::payment_preimage`] field filled in. That should
+       /// then be passed directly to [`claim_funds`].
        ///
        /// See [`create_inbound_payment_for_hash`] for detailed documentation on behavior and requirements.
        ///
@@ -7882,8 +8900,8 @@ where
        /// [`claim_funds`]: Self::claim_funds
        /// [`PaymentClaimable`]: events::Event::PaymentClaimable
        /// [`PaymentClaimable::purpose`]: events::Event::PaymentClaimable::purpose
-       /// [`PaymentPurpose::InvoicePayment`]: events::PaymentPurpose::InvoicePayment
-       /// [`PaymentPurpose::InvoicePayment::payment_preimage`]: events::PaymentPurpose::InvoicePayment::payment_preimage
+       /// [`PaymentPurpose::Bolt11InvoicePayment`]: events::PaymentPurpose::Bolt11InvoicePayment
+       /// [`PaymentPurpose::Bolt11InvoicePayment::payment_preimage`]: events::PaymentPurpose::Bolt11InvoicePayment::payment_preimage
        /// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
        pub fn create_inbound_payment(&self, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32,
                min_final_cltv_expiry_delta: Option<u16>) -> Result<(PaymentHash, PaymentSecret), ()> {
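
The event-handling side the docs above describe, as a hedged sketch (`event` and `channel_manager` are assumed; unrelated event fields are elided with `..`):

```rust
use lightning::events::{Event, PaymentPurpose};

// Sketch: payments registered via `create_inbound_payment` surface their
// preimage in PaymentClaimable; it is handed directly to `claim_funds`.
match event {
	Event::PaymentClaimable { purpose, .. } => match purpose {
		PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(preimage), .. } => {
			channel_manager.claim_funds(preimage);
		},
		_ => {},
	},
	_ => {},
}
```
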
@@ -7958,7 +8976,6 @@ where
        /// Errors if the `MessageRouter` errors or returns an empty `Vec`.
        fn create_blinded_path(&self) -> Result<BlindedPath, ()> {
                let recipient = self.get_our_node_id();
-               let entropy_source = self.entropy_source.deref();
                let secp_ctx = &self.secp_ctx;
 
                let peers = self.per_peer_state.read().unwrap()
@@ -7968,21 +8985,20 @@ where
                        .collect::<Vec<_>>();
 
                self.router
-                       .create_blinded_paths(recipient, peers, entropy_source, secp_ctx)
+                       .create_blinded_paths(recipient, peers, secp_ctx)
                        .and_then(|paths| paths.into_iter().next().ok_or(()))
        }
 
        /// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
        /// [`Router::create_blinded_payment_paths`].
        fn create_blinded_payment_paths(
-               &self, amount_msats: u64, payment_secret: PaymentSecret
+               &self, amount_msats: u64, payment_secret: PaymentSecret, payment_context: PaymentContext
        ) -> Result<Vec<(BlindedPayInfo, BlindedPath)>, ()> {
-               let entropy_source = self.entropy_source.deref();
                let secp_ctx = &self.secp_ctx;
 
                let first_hops = self.list_usable_channels();
                let payee_node_id = self.get_our_node_id();
-               let max_cltv_expiry = self.best_block.read().unwrap().height() + CLTV_FAR_FAR_AWAY
+               let max_cltv_expiry = self.best_block.read().unwrap().height + CLTV_FAR_FAR_AWAY
                        + LATENCY_GRACE_PERIOD_BLOCKS;
                let payee_tlvs = ReceiveTlvs {
                        payment_secret,
@@ -7990,9 +9006,10 @@ where
                                max_cltv_expiry,
                                htlc_minimum_msat: 1,
                        },
+                       payment_context,
                };
                self.router.create_blinded_payment_paths(
-                       payee_node_id, first_hops, payee_tlvs, amount_msats, entropy_source, secp_ctx
+                       payee_node_id, first_hops, payee_tlvs, amount_msats, secp_ctx
                )
        }
 
@@ -8001,7 +9018,7 @@ where
        ///
        /// [phantom node payments]: crate::sign::PhantomKeysManager
        pub fn get_phantom_scid(&self) -> u64 {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let short_to_chan_info = self.short_to_chan_info.read().unwrap();
                loop {
                        let scid_candidate = fake_scid::Namespace::Phantom.get_fake_scid(best_block_height, &self.chain_hash, &self.fake_scid_rand_bytes, &self.entropy_source);
@@ -8031,7 +9048,7 @@ where
        /// Note that this method is not guaranteed to return unique values, you may need to call it a few
        /// times to get a unique scid.
        pub fn get_intercept_scid(&self) -> u64 {
-               let best_block_height = self.best_block.read().unwrap().height();
+               let best_block_height = self.best_block.read().unwrap().height;
                let short_to_chan_info = self.short_to_chan_info.read().unwrap();
                loop {
                        let scid_candidate = fake_scid::Namespace::Intercept.get_fake_scid(best_block_height, &self.chain_hash, &self.fake_scid_rand_bytes, &self.entropy_source);
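
Since the method may repeat values, callers who need uniqueness must deduplicate themselves. A hedged sketch, where `issued_scids` is a `HashSet` we maintain ourselves:

```rust
use std::collections::HashSet;

// Sketch: retry until the fake scid is one we haven't already handed out in a
// route hint, matching the "call it a few times" guidance in the docs above.
let mut issued_scids: HashSet<u64> = HashSet::new();
let intercept_scid = loop {
	let candidate = channel_manager.get_intercept_scid();
	if issued_scids.insert(candidate) { break candidate; }
};
```
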
@@ -8098,9 +9115,12 @@ where
        /// [`Event`] being handled) completes, this should be called to restore the channel to normal
        /// operation. It will double-check that nothing *else* is also blocking the same channel from
        /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
-       fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
+       fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey,
+               channel_funding_outpoint: OutPoint, channel_id: ChannelId,
+               mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
+
                let logger = WithContext::from(
-                       &self.logger, Some(counterparty_node_id), Some(channel_funding_outpoint.to_channel_id())
+                       &self.logger, Some(counterparty_node_id), Some(channel_id),
                );
                loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
@@ -8110,28 +9130,29 @@ where
                                if let Some(blocker) = completed_blocker.take() {
                                        // Only do this on the first iteration of the loop.
                                        if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
-                                               .get_mut(&channel_funding_outpoint.to_channel_id())
+                                               .get_mut(&channel_id)
                                        {
                                                blockers.retain(|iter| iter != &blocker);
                                        }
                                }
 
                                if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
-                                       channel_funding_outpoint, counterparty_node_id) {
+                                       channel_funding_outpoint, channel_id, counterparty_node_id) {
                                        // Check that, while holding the peer lock, we don't have anything else
                                        // blocking monitor updates for this channel. If we do, release the monitor
                                        // update(s) when those blockers complete.
                                        log_trace!(logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
-                                               &channel_funding_outpoint.to_channel_id());
+                                               &channel_id);
                                        break;
                                }
 
-                               if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
+                               if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                debug_assert_eq!(chan.context.get_funding_txo().unwrap(), channel_funding_outpoint);
                                                if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() {
                                                        log_debug!(logger, "Unlocking monitor updating for channel {} and updating monitor",
-                                                               channel_funding_outpoint.to_channel_id());
+                                                               channel_id);
                                                        handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
                                                                peer_state_lck, peer_state, per_peer_state, chan);
                                                        if further_update_exists {
@@ -8141,7 +9162,7 @@ where
                                                        }
                                                } else {
                                                        log_trace!(logger, "Unlocked monitor updating for channel {} without monitors to update",
-                                                               channel_funding_outpoint.to_channel_id());
+                                                               channel_id);
                                                }
                                        }
                                }
@@ -8158,9 +9179,9 @@ where
                for action in actions {
                        match action {
                                EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
-                                       channel_funding_outpoint, counterparty_node_id
+                                       channel_funding_outpoint, channel_id, counterparty_node_id
                                } => {
-                                       self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
+                                       self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, channel_id, None);
                                }
                        }
                }
@@ -8200,7 +9221,7 @@ where
        /// will randomly be placed first or last in the returned array.
        ///
        /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
-       /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
+       /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
        /// the `MessageSendEvent`s to the specific peer they were generated under.
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let events = RefCell::new(Vec::new());
@@ -8220,6 +9241,7 @@ where
                                result = NotifyOption::DoPersist;
                        }
 
+                       let mut is_any_peer_connected = false;
                        let mut pending_events = Vec::new();
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
@@ -8228,6 +9250,15 @@ where
                                if peer_state.pending_msg_events.len() > 0 {
                                        pending_events.append(&mut peer_state.pending_msg_events);
                                }
+                               if peer_state.is_connected {
+                                       is_any_peer_connected = true;
+                               }
+                       }
+
+                       // Only drain the queued broadcast messages once at least one peer is
+                       // connected; otherwise keep them queued until a peer can receive them.
+                       if is_any_peer_connected {
+                               let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
+                               pending_events.append(&mut broadcast_msgs);
                        }
 
                        if !pending_events.is_empty() {
@@ -8275,9 +9306,9 @@ where
        fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
                {
                        let best_block = self.best_block.read().unwrap();
-                       assert_eq!(best_block.block_hash(), header.prev_blockhash,
+                       assert_eq!(best_block.block_hash, header.prev_blockhash,
                                "Blocks must be connected in chain-order - the connected header must build on the last connected header");
-                       assert_eq!(best_block.height(), height - 1,
+                       assert_eq!(best_block.height, height - 1,
                                "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
                }
 
@@ -8292,9 +9323,9 @@ where
                let new_height = height - 1;
                {
                        let mut best_block = self.best_block.write().unwrap();
-                       assert_eq!(best_block.block_hash(), header.block_hash(),
+                       assert_eq!(best_block.block_hash, header.block_hash(),
                                "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
-                       assert_eq!(best_block.height(), height,
+                       assert_eq!(best_block.height, height,
                                "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
                        *best_block = BestBlock::new(header.prev_blockhash, new_height)
                }
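
These assertions encode the `Listen` contract: blocks must be connected strictly in chain order and disconnected in reverse. A hedged sketch of a compliant caller, where `block_100`, `block_101`, and `block_101_prime` are assumed `bitcoin::Block`s:

```rust
use lightning::chain::Listen;

// Sketch: connect blocks in ascending height order; on a reorg, disconnect
// the stale tip before connecting its replacement at the same height.
channel_manager.block_connected(&block_100, 100);
channel_manager.block_connected(&block_101, 101);
channel_manager.block_disconnected(&block_101.header, 101);
channel_manager.block_connected(&block_101_prime, 101);
```
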
@@ -8328,7 +9359,7 @@ where
                self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.chain_hash, &self.node_signer, &self.default_configuration, &&WithChannelContext::from(&self.logger, &channel.context))
                        .map(|(a, b)| (a, Vec::new(), b)));
 
-               let last_best_block_height = self.best_block.read().unwrap().height();
+               let last_best_block_height = self.best_block.read().unwrap().height;
                if height < last_best_block_height {
                        let timestamp = self.highest_seen_timestamp.load(Ordering::Acquire);
                        self.do_chain_event(Some(last_best_block_height), |channel| channel.best_block_updated(last_best_block_height, timestamp as u32, self.chain_hash, &self.node_signer, &self.default_configuration, &&WithChannelContext::from(&self.logger, &channel.context)));
@@ -8432,10 +9463,14 @@ where
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                let pending_msg_events = &mut peer_state.pending_msg_events;
+
                                peer_state.channel_by_id.retain(|_, phase| {
                                        match phase {
                                                // Retain unfunded channels.
                                                ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => true,
+                                               // TODO(dual_funding): Combine this match arm with above.
+                                               #[cfg(dual_funding)]
+                                               ChannelPhase::UnfundedOutboundV2(_) | ChannelPhase::UnfundedInboundV2(_) => true,
                                                ChannelPhase::Funded(channel) => {
                                                        let res = f(channel);
                                                        if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res {
@@ -8504,7 +9539,8 @@ where
                                                                let reason_message = format!("{}", reason);
                                                                failed_channels.push(channel.context.force_shutdown(true, reason));
                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
-                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
                                                                        });
                                                                }
@@ -8556,6 +9592,7 @@ where
                                                incoming_packet_shared_secret: htlc.forward_info.incoming_shared_secret,
                                                phantom_shared_secret: None,
                                                outpoint: htlc.prev_funding_outpoint,
+                                               channel_id: htlc.prev_channel_id,
                                                blinded_failure: htlc.forward_info.routing.blinded_failure(),
                                        });
 
@@ -8567,7 +9604,7 @@ where
                                                        HTLCFailReason::from_failure_code(0x2000 | 2),
                                                        HTLCDestination::InvalidForward { requested_forward_scid }));
                                        let logger = WithContext::from(
-                                               &self.logger, None, Some(htlc.prev_funding_outpoint.to_channel_id())
+                                               &self.logger, None, Some(htlc.prev_channel_id)
                                        );
                                        log_trace!(logger, "Timing out intercepted HTLC with requested forward scid {}", requested_forward_scid);
                                        false
@@ -8595,6 +9632,9 @@ where
        }
 
        /// Returns true if this [`ChannelManager`] needs to be persisted.
+       ///
+       ///
+       /// See [`Self::get_event_or_persistence_needed_future`] for retrieving a [`Future`] that
+       /// completes when this should be checked.
        pub fn get_and_clear_needs_persistence(&self) -> bool {
                self.needs_persist_flag.swap(false, Ordering::AcqRel)
        }
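
How the two fit together in a background loop, as a simplified sketch; the real logic lives in `lightning-background-processor`, and `event_handler` and the `persist` closure are our own assumptions:

```rust
use lightning::events::EventsProvider;

// Sketch: the future completes when there are events to process or when a
// persist may be needed; the flag is then checked (and cleared) explicitly.
loop {
	channel_manager.get_event_or_persistence_needed_future().await;
	channel_manager.process_pending_events(&event_handler);
	if channel_manager.get_and_clear_needs_persistence() {
		persist(&channel_manager)?;
	}
}
```
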
@@ -8684,7 +9724,7 @@ where
        fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) {
                let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close(
                        "Dual-funded channels not supported".to_owned(),
-                        msg.temporary_channel_id.clone())), *counterparty_node_id);
+                        msg.common_fields.temporary_channel_id.clone())), *counterparty_node_id);
        }
 
        fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) {
@@ -8700,7 +9740,7 @@ where
        fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) {
                let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close(
                        "Dual-funded channels not supported".to_owned(),
-                        msg.temporary_channel_id.clone())), *counterparty_node_id);
+                        msg.common_fields.temporary_channel_id.clone())), *counterparty_node_id);
        }
 
        fn handle_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) {
@@ -8895,13 +9935,23 @@ where
                                                        }
                                                        &mut chan.context
                                                },
-                                               // Unfunded channels will always be removed.
-                                               ChannelPhase::UnfundedOutboundV1(chan) => {
-                                                       &mut chan.context
+                                               // We retain UnfundedOutboundV1 channels for some time in case the
+                                               // peer unexpectedly disconnects and intends to reconnect.
+                                               ChannelPhase::UnfundedOutboundV1(_) => {
+                                                       return true;
                                                },
+                                               // Unfunded inbound channels will always be removed.
                                                ChannelPhase::UnfundedInboundV1(chan) => {
                                                        &mut chan.context
                                                },
+                                               #[cfg(dual_funding)]
+                                               ChannelPhase::UnfundedOutboundV2(chan) => {
+                                                       &mut chan.context
+                                               },
+                                               #[cfg(dual_funding)]
+                                               ChannelPhase::UnfundedInboundV2(chan) => {
+                                                       &mut chan.context
+                                               },
                                        };
                                        // Clean up for removal.
                                        update_maps_on_chan_removal!(self, &context);
@@ -8950,7 +10000,12 @@ where
                                                // Gossip
                                                &events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
                                                &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
-                                               &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
+                                               // `BroadcastChannelUpdate` events are queued in
+                                               // [`ChannelManager::pending_broadcast_messages`]; this arm only
+                                               // keeps the match exhaustive.
+                                               &events::MessageSendEvent::BroadcastChannelUpdate { .. } => {
+                                                       debug_assert!(false, "BroadcastChannelUpdate events should be in pending_broadcast_messages");
+                                                       false
+                                               },
                                                &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
                                                &events::MessageSendEvent::SendChannelUpdate { .. } => false,
                                                &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
@@ -9000,8 +10055,8 @@ where
                                                        return NotifyOption::SkipPersistNoEvents;
                                                }
                                                e.insert(Mutex::new(PeerState {
-                                                       channel_by_id: HashMap::new(),
-                                                       inbound_channel_request_by_id: HashMap::new(),
+                                                       channel_by_id: new_hash_map(),
+                                                       inbound_channel_request_by_id: new_hash_map(),
                                                        latest_features: init_msg.features.clone(),
                                                        pending_msg_events: Vec::new(),
                                                        in_flight_monitor_updates: BTreeMap::new(),
@@ -9014,7 +10069,7 @@ where
                                                let mut peer_state = e.get().lock().unwrap();
                                                peer_state.latest_features = init_msg.features.clone();
 
-                                               let best_block_height = self.best_block.read().unwrap().height();
+                                               let best_block_height = self.best_block.read().unwrap().height;
                                                if inbound_peer_limited &&
                                                        Self::unfunded_channel_count(&*peer_state, best_block_height) ==
                                                        peer_state.channel_by_id.len()
@@ -9037,15 +10092,49 @@ where
                                let peer_state = &mut *peer_state_lock;
                                let pending_msg_events = &mut peer_state.pending_msg_events;
 
-                               peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
-                                       if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
-                               ).for_each(|chan| {
-                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
-                                       pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
-                                               node_id: chan.context.get_counterparty_node_id(),
-                                               msg: chan.get_channel_reestablish(&&logger),
-                                       });
-                               });
+                               for (_, phase) in peer_state.channel_by_id.iter_mut() {
+                                       match phase {
+                                               ChannelPhase::Funded(chan) => {
+                                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
+                                                       pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
+                                                               node_id: chan.context.get_counterparty_node_id(),
+                                                               msg: chan.get_channel_reestablish(&&logger),
+                                                       });
+                                               }
+
+                                               ChannelPhase::UnfundedOutboundV1(chan) => {
+                                                       pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
+                                                               node_id: chan.context.get_counterparty_node_id(),
+                                                               msg: chan.get_open_channel(self.chain_hash),
+                                                       });
+                                               }
+
+                                               // TODO(dual_funding): Combine this match arm with above once #[cfg(dual_funding)] is removed.
+                                               #[cfg(dual_funding)]
+                                               ChannelPhase::UnfundedOutboundV2(chan) => {
+                                                       pending_msg_events.push(events::MessageSendEvent::SendOpenChannelV2 {
+                                                               node_id: chan.context.get_counterparty_node_id(),
+                                                               msg: chan.get_open_channel_v2(self.chain_hash),
+                                                       });
+                                               },
+
+                                               ChannelPhase::UnfundedInboundV1(_) => {
+                                                       // Since unfunded inbound channel maps are cleared upon disconnecting a peer,
+                                                       // they are not persisted and won't be recovered after a crash.
+                                                       // Therefore, they shouldn't exist at this point.
+                                                       debug_assert!(false);
+                                               }
+
+                                               // TODO(dual_funding): Combine this match arm with above once #[cfg(dual_funding)] is removed.
+                                               #[cfg(dual_funding)]
+                                               ChannelPhase::UnfundedInboundV2(_) => {
+                                                       // Since unfunded inbound channel maps are cleared upon disconnecting a peer,
+                                                       // they are not persisted and won't be recovered after a crash.
+                                                       // Therefore, they shouldn't exist at this point.
+                                                       debug_assert!(false);
+                                               },
+                                       }
+                               }
                        }
 
                        return NotifyOption::SkipPersistHandleEvents;
@@ -9055,8 +10144,6 @@ where
        }
 
        fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
-               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-
                match &msg.data as &str {
                        "cannot co-op close channel w/ active htlcs"|
                        "link failed to shutdown" =>
@@ -9069,34 +10156,45 @@ where
                                // We're not going to bother handling this in a sensible way, instead simply
                                // repeating the Shutdown message on repeat until morale improves.
                                if !msg.channel_id.is_zero() {
-                                       let per_peer_state = self.per_peer_state.read().unwrap();
-                                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
-                                       if peer_state_mutex_opt.is_none() { return; }
-                                       let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap();
-                                       if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get(&msg.channel_id) {
-                                               if let Some(msg) = chan.get_outbound_shutdown() {
-                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                                               node_id: *counterparty_node_id,
-                                                               msg,
-                                                       });
-                                               }
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
-                                                       node_id: *counterparty_node_id,
-                                                       action: msgs::ErrorAction::SendWarningMessage {
-                                                               msg: msgs::WarningMessage {
-                                                                       channel_id: msg.channel_id,
-                                                                       data: "You appear to be exhibiting LND bug 6039, we'll keep sending you shutdown messages until you handle them correctly".to_owned()
-                                                               },
-                                                               log_level: Level::Trace,
+                                       PersistenceNotifierGuard::optionally_notify(
+                                               self,
+                                               || -> NotifyOption {
+                                                       let per_peer_state = self.per_peer_state.read().unwrap();
+                                                       let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id);
+                                                       if peer_state_mutex_opt.is_none() { return NotifyOption::SkipPersistNoEvents; }
+                                                       let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap();
+                                                       if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get(&msg.channel_id) {
+                                                               if let Some(msg) = chan.get_outbound_shutdown() {
+                                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+                                                                               node_id: *counterparty_node_id,
+                                                                               msg,
+                                                                       });
+                                                               }
+                                                               peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+                                                                       node_id: *counterparty_node_id,
+                                                                       action: msgs::ErrorAction::SendWarningMessage {
+                                                                               msg: msgs::WarningMessage {
+                                                                                       channel_id: msg.channel_id,
+                                                                                       data: "You appear to be exhibiting LND bug 6039, we'll keep sending you shutdown messages until you handle them correctly".to_owned()
+                                                                               },
+                                                                               log_level: Level::Trace,
+                                                                       }
+                                                               });
+                                                               // This can happen in a fairly tight loop, so we absolutely cannot trigger
+                                                               // a `ChannelManager` write here.
+                                                               return NotifyOption::SkipPersistHandleEvents;
                                                        }
-                                               });
-                                       }
+                                                       NotifyOption::SkipPersistNoEvents
+                                               }
+                                       );
                                }
                                return;
                        }
                        _ => {}
                }
 
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+
                if msg.channel_id.is_zero() {
                        let channel_ids: Vec<ChannelId> = {
                                let per_peer_state = self.per_peer_state.read().unwrap();
@@ -9121,14 +10219,29 @@ where
                                if peer_state_mutex_opt.is_none() { return; }
                                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
-                               if let Some(ChannelPhase::UnfundedOutboundV1(chan)) = peer_state.channel_by_id.get_mut(&msg.channel_id) {
-                                       if let Ok(msg) = chan.maybe_handle_error_without_close(self.chain_hash, &self.fee_estimator) {
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
-                                                       node_id: *counterparty_node_id,
-                                                       msg,
-                                               });
-                                               return;
-                                       }
+                               match peer_state.channel_by_id.get_mut(&msg.channel_id) {
+                                       Some(ChannelPhase::UnfundedOutboundV1(ref mut chan)) => {
+                                               if let Ok(msg) = chan.maybe_handle_error_without_close(self.chain_hash, &self.fee_estimator) {
+                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
+                                                               node_id: *counterparty_node_id,
+                                                               msg,
+                                                       });
+                                                       return;
+                                               }
+                                       },
+                                       #[cfg(dual_funding)]
+                                       Some(ChannelPhase::UnfundedOutboundV2(ref mut chan)) => {
+                                               if let Ok(msg) = chan.maybe_handle_error_without_close(self.chain_hash, &self.fee_estimator) {
+                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannelV2 {
+                                                               node_id: *counterparty_node_id,
+                                                               msg,
+                                                       });
+                                                       return;
+                                               }
+                                       },
+                                       None | Some(ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::Funded(_)) => (),
+                                       #[cfg(dual_funding)]
+                                       Some(ChannelPhase::UnfundedInboundV2(_)) => (),
                                }
                        }
 
@@ -9247,8 +10360,11 @@ where
                                        },
                                };
 
+                               let payment_context = PaymentContext::Bolt12Offer(Bolt12OfferContext {
+                                       offer_id: invoice_request.offer_id,
+                               });
                                let payment_paths = match self.create_blinded_payment_paths(
-                                       amount_msats, payment_secret
+                                       amount_msats, payment_secret, payment_context
                                ) {
                                        Ok(payment_paths) => payment_paths,
                                        Err(()) => {
@@ -9262,7 +10378,7 @@ where
                                        self.highest_seen_timestamp.load(Ordering::Acquire) as u64
                                );
 
-                               if invoice_request.keys.is_some() {
+                               let response = if invoice_request.keys.is_some() {
                                        #[cfg(feature = "std")]
                                        let builder = invoice_request.respond_using_derived_keys(
                                                payment_paths, payment_hash
@@ -9271,10 +10387,10 @@ where
                                        let builder = invoice_request.respond_using_derived_keys_no_std(
                                                payment_paths, payment_hash, created_at
                                        );
-                                       match builder.and_then(|b| b.allow_mpp().build_and_sign(secp_ctx)) {
-                                               Ok(invoice) => Some(OffersMessage::Invoice(invoice)),
-                                               Err(error) => Some(OffersMessage::InvoiceError(error.into())),
-                                       }
+                                       builder
+                                               .map(InvoiceBuilder::<DerivedSigningPubkey>::from)
+                                               .and_then(|builder| builder.allow_mpp().build_and_sign(secp_ctx))
+                                               .map_err(InvoiceError::from)
                                } else {
                                        #[cfg(feature = "std")]
                                        let builder = invoice_request.respond_with(payment_paths, payment_hash);
@@ -9282,40 +10398,46 @@ where
                                        let builder = invoice_request.respond_with_no_std(
                                                payment_paths, payment_hash, created_at
                                        );
-                                       let response = builder.and_then(|builder| builder.allow_mpp().build())
-                                               .map_err(|e| OffersMessage::InvoiceError(e.into()))
-                                               .and_then(|invoice|
-                                                       match invoice.sign(|invoice| self.node_signer.sign_bolt12_invoice(invoice)) {
-                                                               Ok(invoice) => Ok(OffersMessage::Invoice(invoice)),
-                                                               Err(SignError::Signing(())) => Err(OffersMessage::InvoiceError(
-                                                                               InvoiceError::from_string("Failed signing invoice".to_string())
-                                                               )),
-                                                               Err(SignError::Verification(_)) => Err(OffersMessage::InvoiceError(
-                                                                               InvoiceError::from_string("Failed invoice signature verification".to_string())
-                                                               )),
-                                                       });
-                                       match response {
-                                               Ok(invoice) => Some(invoice),
-                                               Err(error) => Some(error),
-                                       }
+                                       builder
+                                               .map(InvoiceBuilder::<ExplicitSigningPubkey>::from)
+                                               .and_then(|builder| builder.allow_mpp().build())
+                                               .map_err(InvoiceError::from)
+                                               .and_then(|invoice| {
+                                                       #[cfg(c_bindings)]
+                                                       let mut invoice = invoice;
+                                                       invoice
+                                                               .sign(|invoice: &UnsignedBolt12Invoice|
+                                                                       self.node_signer.sign_bolt12_invoice(invoice)
+                                                               )
+                                                               .map_err(InvoiceError::from)
+                                               })
+                               };
+
+                               match response {
+                                       Ok(invoice) => Some(OffersMessage::Invoice(invoice)),
+                                       Err(error) => Some(OffersMessage::InvoiceError(error.into())),
                                }
                        },
                        OffersMessage::Invoice(invoice) => {
-                               match invoice.verify(expanded_key, secp_ctx) {
-                                       Err(()) => {
-                                               Some(OffersMessage::InvoiceError(InvoiceError::from_string("Unrecognized invoice".to_owned())))
-                                       },
-                                       Ok(_) if invoice.invoice_features().requires_unknown_bits_from(&self.bolt12_invoice_features()) => {
-                                               Some(OffersMessage::InvoiceError(Bolt12SemanticError::UnknownRequiredFeatures.into()))
-                                       },
-                                       Ok(payment_id) => {
-                                               if let Err(e) = self.send_payment_for_bolt12_invoice(&invoice, payment_id) {
-                                                       log_trace!(self.logger, "Failed paying invoice: {:?}", e);
-                                                       Some(OffersMessage::InvoiceError(InvoiceError::from_string(format!("{:?}", e))))
+                               let response = invoice
+                                       .verify(expanded_key, secp_ctx)
+                                       .map_err(|()| InvoiceError::from_string("Unrecognized invoice".to_owned()))
+                                       .and_then(|payment_id| {
+                                               let features = self.bolt12_invoice_features();
+                                               if invoice.invoice_features().requires_unknown_bits_from(&features) {
+                                                       Err(InvoiceError::from(Bolt12SemanticError::UnknownRequiredFeatures))
                                                } else {
-                                                       None
+                                                       self.send_payment_for_bolt12_invoice(&invoice, payment_id)
+                                                               .map_err(|e| {
+                                                                       log_trace!(self.logger, "Failed paying invoice: {:?}", e);
+                                                                       InvoiceError::from_string(format!("{:?}", e))
+                                                               })
                                                }
-                                       },
+                                       });
+
+                               match response {
+                                       Ok(()) => None,
+                                       Err(e) => Some(OffersMessage::InvoiceError(e)),
                                }
                        },
                        OffersMessage::InvoiceError(invoice_error) => {
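Both arms above are refactored from nested `match`es into `Result` chains: each fallible step is mapped into an `InvoiceError`, and a single trailing `match` converts the final `Result` into the optional reply message. A minimal, self-contained sketch of that error-funneling pattern, using hypothetical types rather than LDK's API:

    enum Response { Invoice(String), Error(String) }

    fn build(ok: bool) -> Result<String, String> {
        if ok { Ok("invoice".to_owned()) } else { Err("build failed".to_owned()) }
    }

    // Every failure converges on one error type via and_then/map_err, so only
    // the final match decides which response variant is sent.
    fn respond(ok: bool) -> Response {
        let response = build(ok)
            .and_then(|unsigned| Ok(format!("signed {unsigned}")))
            .map_err(|e| format!("error: {e}"));
        match response {
            Ok(invoice) => Response::Invoice(invoice),
            Err(error) => Response::Error(error),
        }
    }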
@@ -9330,6 +10452,23 @@ where
        }
 }
 
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
+NodeIdLookUp for ChannelManager<M, T, ES, NS, SP, F, R, L>
+where
+       M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+       T::Target: BroadcasterInterface,
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       SP::Target: SignerProvider,
+       F::Target: FeeEstimator,
+       R::Target: Router,
+       L::Target: Logger,
+{
+       fn next_node_id(&self, short_channel_id: u64) -> Option<PublicKey> {
+               self.short_to_chan_info.read().unwrap().get(&short_channel_id).map(|(pubkey, _)| *pubkey)
+       }
+}
+
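The new `NodeIdLookUp` impl gives blinded-path machinery (for instance, when advancing a path to the next hop) a way to resolve a short channel id to the counterparty's node id without depending on `ChannelManager` itself; the impl above simply consults `short_to_chan_info`. Since the trait needs only that one method, a map-backed stand-in is easy to sketch for tests (a hedged illustration; only the `next_node_id` signature is taken from the impl above):

    use std::collections::HashMap;
    use bitcoin::secp256k1::PublicKey;
    use lightning::blinded_path::NodeIdLookUp;

    // A trivial SCID -> node id table standing in for short_to_chan_info.
    struct MapNodeIdLookUp(HashMap<u64, PublicKey>);

    impl NodeIdLookUp for MapNodeIdLookUp {
        fn next_node_id(&self, short_channel_id: u64) -> Option<PublicKey> {
            self.0.get(&short_channel_id).copied()
        }
    }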
 /// Fetches the set of [`NodeFeatures`] flags that are provided by or required by
 /// [`ChannelManager`].
 pub(crate) fn provided_node_features(config: &UserConfig) -> NodeFeatures {
@@ -9444,6 +10583,8 @@ impl Writeable for ChannelDetails {
                        (37, user_channel_id_high_opt, option),
                        (39, self.feerate_sat_per_1000_weight, option),
                        (41, self.channel_shutdown_state, option),
+                       (43, self.pending_inbound_htlcs, optional_vec),
+                       (45, self.pending_outbound_htlcs, optional_vec),
                });
                Ok(())
        }
@@ -9482,6 +10623,8 @@ impl Readable for ChannelDetails {
                        (37, user_channel_id_high_opt, option),
                        (39, feerate_sat_per_1000_weight, option),
                        (41, channel_shutdown_state, option),
+                       (43, pending_inbound_htlcs, optional_vec),
+                       (45, pending_outbound_htlcs, optional_vec),
                });
 
                // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
@@ -9518,6 +10661,8 @@ impl Readable for ChannelDetails {
                        inbound_htlc_maximum_msat,
                        feerate_sat_per_1000_weight,
                        channel_shutdown_state,
+                       pending_inbound_htlcs: pending_inbound_htlcs.unwrap_or(Vec::new()),
+                       pending_outbound_htlcs: pending_outbound_htlcs.unwrap_or(Vec::new()),
                })
        }
 }
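The two new `ChannelDetails` fields use odd TLV types (43 and 45) and are read back as `Option`s defaulted to empty `Vec`s above, so serializations written by this version stay readable by older versions, which skip unknown odd types. A hedged sketch of the BOLT-style "it's OK to be odd" rule the TLV macros enforce:

    // Unknown even TLV types are a hard error; unknown odd types are skipped.
    fn handle_unknown_tlv_type(type_num: u64) -> Result<(), &'static str> {
        if type_num % 2 == 0 {
            Err("unknown even TLV type: reject the serialized object")
        } else {
            Ok(()) // odd: optional, skip the record and keep reading
        }
    }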
@@ -9546,9 +10691,11 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting,
                (3, payment_metadata, option),
                (5, custom_tlvs, optional_vec),
                (7, requires_blinded_error, (default_value, false)),
+               (9, payment_context, option),
        },
        (2, ReceiveKeysend) => {
                (0, payment_preimage, required),
+               (1, requires_blinded_error, (default_value, false)),
                (2, incoming_cltv_expiry, required),
                (3, payment_metadata, option),
                (4, payment_data, option), // Added in 0.0.116
@@ -9652,13 +10799,18 @@ impl_writeable_tlv_based!(HTLCPreviousHopData, {
        (4, htlc_id, required),
        (6, incoming_packet_shared_secret, required),
        (7, user_channel_id, option),
+       // Note that by the time we get past the required read for type 2 above, outpoint will be
+       // filled in, so we can safely unwrap it here.
+       (9, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(outpoint.0.unwrap()))),
 });
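The `default_value` arm above backfills `channel_id` when reading data serialized before the field existed, deriving it from the funding outpoint that an earlier required TLV has already populated. For reference, the v1 `channel_id` per BOLT 2 is the funding txid with its last two bytes XORed with the big-endian funding output index; an illustrative sketch, not LDK's actual implementation:

    // v1 channel_id: funding txid bytes with the final two bytes XORed with
    // the big-endian funding output index (BOLT 2).
    fn v1_channel_id(funding_txid: [u8; 32], funding_output_index: u16) -> [u8; 32] {
        let mut id = funding_txid;
        id[30] ^= (funding_output_index >> 8) as u8;
        id[31] ^= (funding_output_index & 0xff) as u8;
        id
    }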
 
 impl Writeable for ClaimableHTLC {
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
-               let (payment_data, keysend_preimage) = match &self.onion_payload {
-                       OnionPayload::Invoice { _legacy_hop_data } => (_legacy_hop_data.as_ref(), None),
-                       OnionPayload::Spontaneous(preimage) => (None, Some(preimage)),
+               let (payment_data, keysend_preimage, payment_context) = match &self.onion_payload {
+                       OnionPayload::Invoice { _legacy_hop_data, payment_context } => {
+                               (_legacy_hop_data.as_ref(), None, payment_context.as_ref())
+                       },
+                       OnionPayload::Spontaneous(preimage) => (None, Some(preimage), None),
                };
                write_tlv_fields!(writer, {
                        (0, self.prev_hop, required),
@@ -9670,6 +10822,7 @@ impl Writeable for ClaimableHTLC {
                        (6, self.cltv_expiry, required),
                        (8, keysend_preimage, option),
                        (10, self.counterparty_skimmed_fee_msat, option),
+                       (11, payment_context, option),
                });
                Ok(())
        }
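The new `payment_context` travels with the claimable HTLC (as odd TLV type 11, so older versions can ignore it) and helps determine the `events::PaymentPurpose` surfaced to the user. A hedged sketch of that selection (the `PaymentContext` variants match this change set; the mapping itself is illustrative, not LDK's exact logic):

    use lightning::blinded_path::payment::PaymentContext;

    // No context means no blinded path was used, i.e. a BOLT 11 payment;
    // BOLT 12 contexts identify the offer or refund being paid.
    fn purpose_name(ctx: Option<&PaymentContext>) -> &'static str {
        match ctx {
            None => "Bolt11InvoicePayment",
            Some(PaymentContext::Bolt12Offer(_)) => "Bolt12OfferPayment",
            Some(PaymentContext::Bolt12Refund(_)) => "Bolt12RefundPayment",
            Some(_) => "unknown blinded-path context",
        }
    }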
@@ -9687,6 +10840,7 @@ impl Readable for ClaimableHTLC {
                        (6, cltv_expiry, required),
                        (8, keysend_preimage, option),
                        (10, counterparty_skimmed_fee_msat, option),
+                       (11, payment_context, option),
                });
                let payment_data: Option<msgs::FinalOnionHopData> = payment_data_opt;
                let value = value_ser.0.unwrap();
@@ -9707,7 +10861,7 @@ impl Readable for ClaimableHTLC {
                                        }
                                        total_msat = Some(payment_data.as_ref().unwrap().total_msat);
                                }
-                               OnionPayload::Invoice { _legacy_hop_data: payment_data }
+                               OnionPayload::Invoice { _legacy_hop_data: payment_data, payment_context }
                        },
                };
                Ok(Self {
@@ -9803,6 +10957,9 @@ impl_writeable_tlv_based!(PendingAddHTLCInfo, {
        (2, prev_short_channel_id, required),
        (4, prev_htlc_id, required),
        (6, prev_funding_outpoint, required),
+       // Note that by the time we get past the required read for type 6 above, prev_funding_outpoint will be
+       // filled in, so we can safely unwrap it here.
+       (7, prev_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(prev_funding_outpoint.0.unwrap()))),
 });
 
 impl Writeable for HTLCForwardInfo {
@@ -9895,8 +11052,8 @@ where
                self.chain_hash.write(writer)?;
                {
                        let best_block = self.best_block.read().unwrap();
-                       best_block.height().write(writer)?;
-                       best_block.block_hash().write(writer)?;
+                       best_block.height.write(writer)?;
+                       best_block.block_hash.write(writer)?;
                }
 
                let mut serializable_peer_count: u64 = 0;
@@ -9942,6 +11099,12 @@ where
                        }
                }
 
+               let mut decode_update_add_htlcs_opt = None;
+               let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+               if !decode_update_add_htlcs.is_empty() {
+                       decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
+               }
+
                let per_peer_state = self.per_peer_state.write().unwrap();
 
                let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
@@ -10046,7 +11209,7 @@ where
                }
 
                // Encode without retry info for 0.0.101 compatibility.
-               let mut pending_outbound_payments_no_retry: HashMap<PaymentId, HashSet<[u8; 32]>> = HashMap::new();
+               let mut pending_outbound_payments_no_retry: HashMap<PaymentId, HashSet<[u8; 32]>> = new_hash_map();
                for (id, outbound) in pending_outbound_payments.iter() {
                        match outbound {
                                PendingOutboundPayment::Legacy { session_privs } |
@@ -10074,7 +11237,7 @@ where
                for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
                        for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() {
                                if !updates.is_empty() {
-                                       if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(HashMap::new()); }
+                                       if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(new_hash_map()); }
                                        in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates);
                                }
                        }
@@ -10093,6 +11256,7 @@ where
                        (10, in_flight_monitor_updates, option),
                        (11, self.probing_cookie_secret, required),
                        (13, htlc_onion_fields, optional_vec),
+                       (14, decode_update_add_htlcs_opt, option),
                });
 
                Ok(())
@@ -10263,7 +11427,9 @@ where
                        mut channel_monitors: Vec<&'a mut ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>) -> Self {
                Self {
                        entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster, router, logger, default_config,
-                       channel_monitors: channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) }).collect()
+                       channel_monitors: hash_map_from_iter(
+                               channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) })
+                       ),
                }
        }
 }
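Throughout this change, direct `HashMap`/`HashSet` construction is routed through crate-local helpers (`new_hash_map`, `hash_map_with_capacity`, `hash_map_from_iter`, `hash_set_from_iter`, `new_hash_set`), presumably so the concrete map type and hasher can be swapped in one place (e.g. for randomized hashing or no-std builds). Their definitions are outside this diff; a hedged sketch of what such thin wrappers might look like:

    use std::collections::{HashMap, HashSet};
    use std::hash::Hash;

    fn new_hash_map<K: Hash + Eq, V>() -> HashMap<K, V> { HashMap::new() }
    fn hash_map_with_capacity<K: Hash + Eq, V>(cap: usize) -> HashMap<K, V> {
        HashMap::with_capacity(cap)
    }
    fn hash_map_from_iter<K: Hash + Eq, V>(i: impl IntoIterator<Item = (K, V)>) -> HashMap<K, V> {
        i.into_iter().collect()
    }
    fn hash_set_from_iter<T: Hash + Eq>(i: impl IntoIterator<Item = T>) -> HashSet<T> {
        i.into_iter().collect()
    }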
@@ -10310,18 +11476,20 @@ where
                let mut failed_htlcs = Vec::new();
 
                let channel_count: u64 = Readable::read(reader)?;
-               let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
-               let mut funded_peer_channels: HashMap<PublicKey, HashMap<ChannelId, ChannelPhase<SP>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
-               let mut outpoint_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
-               let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
+               let mut funding_txo_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
+               let mut funded_peer_channels: HashMap<PublicKey, HashMap<ChannelId, ChannelPhase<SP>>> = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
+               let mut outpoint_to_peer = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
+               let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
                let mut channel_closures = VecDeque::new();
                let mut close_background_events = Vec::new();
+               let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize);
                for _ in 0..channel_count {
                        let mut channel: Channel<SP> = Channel::read(reader, (
                                &args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
                        ))?;
                        let logger = WithChannelContext::from(&args.logger, &channel.context);
                        let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
+                       funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id());
                        funding_txo_set.insert(funding_txo.clone());
                        if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
                                if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() ||
@@ -10351,9 +11519,9 @@ where
                                        if shutdown_result.unbroadcasted_batch_funding_txid.is_some() {
                                                return Err(DecodeError::InvalidValue);
                                        }
-                                       if let Some((counterparty_node_id, funding_txo, update)) = shutdown_result.monitor_update {
+                                       if let Some((counterparty_node_id, funding_txo, channel_id, update)) = shutdown_result.monitor_update {
                                                close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-                                                       counterparty_node_id, funding_txo, update
+                                                       counterparty_node_id, funding_txo, channel_id, update
                                                });
                                        }
                                        failed_htlcs.append(&mut shutdown_result.dropped_outbound_htlcs);
@@ -10400,7 +11568,7 @@ where
                                                        by_id_map.insert(channel.context.channel_id(), ChannelPhase::Funded(channel));
                                                },
                                                hash_map::Entry::Vacant(entry) => {
-                                                       let mut by_id_map = HashMap::new();
+                                                       let mut by_id_map = new_hash_map();
                                                        by_id_map.insert(channel.context.channel_id(), ChannelPhase::Funded(channel));
                                                        entry.insert(by_id_map);
                                                }
@@ -10432,20 +11600,22 @@ where
                for (funding_txo, monitor) in args.channel_monitors.iter() {
                        if !funding_txo_set.contains(funding_txo) {
                                let logger = WithChannelMonitor::from(&args.logger, monitor);
+                               let channel_id = monitor.channel_id();
                                log_info!(logger, "Queueing monitor update to ensure missing channel {} is force closed",
-                                       &funding_txo.to_channel_id());
+                                       &channel_id);
                                let monitor_update = ChannelMonitorUpdate {
                                        update_id: CLOSED_CHANNEL_UPDATE_ID,
                                        counterparty_node_id: None,
                                        updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
+                                       channel_id: Some(monitor.channel_id()),
                                };
-                               close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
+                               close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, channel_id, monitor_update)));
                        }
                }
 
                const MAX_ALLOC_SIZE: usize = 1024 * 64;
                let forward_htlcs_count: u64 = Readable::read(reader)?;
-               let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128));
+               let mut forward_htlcs = hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
                for _ in 0..forward_htlcs_count {
                        let short_channel_id = Readable::read(reader)?;
                        let pending_forwards_count: u64 = Readable::read(reader)?;
@@ -10471,7 +11641,7 @@ where
                let peer_state_from_chans = |channel_by_id| {
                        PeerState {
                                channel_by_id,
-                               inbound_channel_request_by_id: HashMap::new(),
+                               inbound_channel_request_by_id: new_hash_map(),
                                latest_features: InitFeatures::empty(),
                                pending_msg_events: Vec::new(),
                                in_flight_monitor_updates: BTreeMap::new(),
@@ -10482,10 +11652,10 @@ where
                };
 
                let peer_count: u64 = Readable::read(reader)?;
-               let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState<SP>>)>()));
+               let mut per_peer_state = hash_map_with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState<SP>>)>()));
                for _ in 0..peer_count {
                        let peer_pubkey = Readable::read(reader)?;
-                       let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new());
+                       let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(new_hash_map());
                        let mut peer_state = peer_state_from_chans(peer_chans);
                        peer_state.latest_features = Readable::read(reader)?;
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -10519,7 +11689,7 @@ where
                let highest_seen_timestamp: u32 = Readable::read(reader)?;
 
                let pending_inbound_payment_count: u64 = Readable::read(reader)?;
-               let mut pending_inbound_payments: HashMap<PaymentHash, PendingInboundPayment> = HashMap::with_capacity(cmp::min(pending_inbound_payment_count as usize, MAX_ALLOC_SIZE/(3*32)));
+               let mut pending_inbound_payments: HashMap<PaymentHash, PendingInboundPayment> = hash_map_with_capacity(cmp::min(pending_inbound_payment_count as usize, MAX_ALLOC_SIZE/(3*32)));
                for _ in 0..pending_inbound_payment_count {
                        if pending_inbound_payments.insert(Readable::read(reader)?, Readable::read(reader)?).is_some() {
                                return Err(DecodeError::InvalidValue);
@@ -10528,11 +11698,11 @@ where
 
                let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
                let mut pending_outbound_payments_compat: HashMap<PaymentId, PendingOutboundPayment> =
-                       HashMap::with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
+                       hash_map_with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
                for _ in 0..pending_outbound_payments_count_compat {
                        let session_priv = Readable::read(reader)?;
                        let payment = PendingOutboundPayment::Legacy {
-                               session_privs: [session_priv].iter().cloned().collect()
+                               session_privs: hash_set_from_iter([session_priv]),
                        };
                        if pending_outbound_payments_compat.insert(PaymentId(session_priv), payment).is_some() {
                                return Err(DecodeError::InvalidValue)
@@ -10542,16 +11712,17 @@ where
                // pending_outbound_payments_no_retry is for compatibility with 0.0.101 clients.
                let mut pending_outbound_payments_no_retry: Option<HashMap<PaymentId, HashSet<[u8; 32]>>> = None;
                let mut pending_outbound_payments = None;
-               let mut pending_intercepted_htlcs: Option<HashMap<InterceptId, PendingAddHTLCInfo>> = Some(HashMap::new());
+               let mut pending_intercepted_htlcs: Option<HashMap<InterceptId, PendingAddHTLCInfo>> = Some(new_hash_map());
                let mut received_network_pubkey: Option<PublicKey> = None;
                let mut fake_scid_rand_bytes: Option<[u8; 32]> = None;
                let mut probing_cookie_secret: Option<[u8; 32]> = None;
                let mut claimable_htlc_purposes = None;
                let mut claimable_htlc_onion_fields = None;
-               let mut pending_claiming_payments = Some(HashMap::new());
+               let mut pending_claiming_payments = Some(new_hash_map());
                let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
                let mut events_override = None;
                let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
+               let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
@@ -10565,7 +11736,9 @@ where
                        (10, in_flight_monitor_updates, option),
                        (11, probing_cookie_secret, option),
                        (13, claimable_htlc_onion_fields, optional_vec),
+                       (14, decode_update_add_htlcs, option),
                });
+               let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
                if fake_scid_rand_bytes.is_none() {
                        fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
                }
@@ -10585,7 +11758,7 @@ where
                if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() {
                        pending_outbound_payments = Some(pending_outbound_payments_compat);
                } else if pending_outbound_payments.is_none() {
-                       let mut outbounds = HashMap::new();
+                       let mut outbounds = new_hash_map();
                        for (id, session_privs) in pending_outbound_payments_no_retry.unwrap().drain() {
                                outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
                        }
@@ -10616,12 +11789,13 @@ where
                                $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
                                for update in $chan_in_flight_upds.iter() {
                                        log_trace!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
-                                               update.update_id, $channel_info_log, &$funding_txo.to_channel_id());
+                                               update.update_id, $channel_info_log, &$monitor.channel_id());
                                        max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
                                        pending_background_events.push(
                                                BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                        counterparty_node_id: $counterparty_node_id,
                                                        funding_txo: $funding_txo,
+                                                       channel_id: $monitor.channel_id(),
                                                        update: update.clone(),
                                                });
                                }
@@ -10632,7 +11806,7 @@ where
                                        pending_background_events.push(
                                                BackgroundEvent::MonitorUpdatesComplete {
                                                        counterparty_node_id: $counterparty_node_id,
-                                                       channel_id: $funding_txo.to_channel_id(),
+                                                       channel_id: $monitor.channel_id(),
                                                });
                                }
                                if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() {
@@ -10664,7 +11838,7 @@ where
                                                }
                                        }
                                        if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
-                                               // If the channel is ahead of the monitor, return InvalidValue:
+                                               // If the channel is ahead of the monitor, return DangerousValue:
                                                log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
                                                log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
                                                        chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
@@ -10673,7 +11847,7 @@ where
                                                log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
                                                log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
                                                log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
-                                               return Err(DecodeError::InvalidValue);
+                                               return Err(DecodeError::DangerousValue);
                                        }
                                } else {
                                        // We shouldn't have persisted (or read) any unfunded channel types so none should have been
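Returning `DecodeError::DangerousValue` instead of `InvalidValue` above lets callers tell "this serialization is corrupt" apart from "this deserialized fine, but using it risks funds", here a `ChannelMonitor` that is stale relative to the `ChannelManager`. A hedged sketch of caller-side handling (the recovery policy shown is illustrative):

    use lightning::ln::msgs::DecodeError;

    fn on_manager_read_error(e: DecodeError) {
        match e {
            DecodeError::DangerousValue => {
                // Valid data that is dangerous to use: likely a stale
                // ChannelMonitor. Restore the latest monitor state rather
                // than retrying with the same inputs.
            },
            DecodeError::InvalidValue => {
                // The serialized ChannelManager itself is malformed.
            },
            _ => { /* version, feature, or I/O errors */ },
        }
    }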
@@ -10686,21 +11860,22 @@ where
 
                if let Some(in_flight_upds) = in_flight_monitor_updates {
                        for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds {
-                               let logger = WithContext::from(&args.logger, Some(counterparty_id), Some(funding_txo.to_channel_id()));
+                               let channel_id = funding_txo_to_channel_id.get(&funding_txo).copied();
+                               let logger = WithContext::from(&args.logger, Some(counterparty_id), channel_id);
                                if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
                                        // Now that we've removed all the in-flight monitor updates for channels that are
                                        // still open, we need to replay any monitor updates that are for closed channels,
                                        // creating the necessary peer_state entries as we go.
                                        let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| {
-                                               Mutex::new(peer_state_from_chans(HashMap::new()))
+                                               Mutex::new(peer_state_from_chans(new_hash_map()))
                                        });
                                        let mut peer_state = peer_state_mutex.lock().unwrap();
                                        handle_in_flight_updates!(counterparty_id, chan_in_flight_updates,
                                                funding_txo, monitor, peer_state, logger, "closed ");
                                } else {
                                        log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
-                                       log_error!(logger, " The ChannelMonitor for channel {} is missing.",
-                                               &funding_txo.to_channel_id());
+                                       log_error!(logger, " The ChannelMonitor for channel {} is missing.", if let Some(channel_id) =
+                                               channel_id { channel_id.to_string() } else { format!("with outpoint {}", funding_txo) } );
                                        log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
                                        log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
                                        log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
@@ -10753,7 +11928,7 @@ where
                                                                                retry_strategy: None,
                                                                                attempts: PaymentAttempts::new(),
                                                                                payment_params: None,
-                                                                               session_privs: [session_priv_bytes].iter().map(|a| *a).collect(),
+                                                                               session_privs: hash_set_from_iter([session_priv_bytes]),
                                                                                payment_hash: htlc.payment_hash,
                                                                                payment_secret: None, // only used for retries, and we'll never retry on startup
                                                                                payment_metadata: None, // only used for retries, and we'll never retry on startup
@@ -10783,12 +11958,24 @@ where
                                                                // still have an entry for this HTLC in `forward_htlcs` or
                                                                // `pending_intercepted_htlcs`, we were apparently not persisted after
                                                                // the monitor was persisted when forwarding the payment.
+                                                               decode_update_add_htlcs.retain(|scid, update_add_htlcs| {
+                                                                       update_add_htlcs.retain(|update_add_htlc| {
+                                                                               let matches = *scid == prev_hop_data.short_channel_id &&
+                                                                                       update_add_htlc.htlc_id == prev_hop_data.htlc_id;
+                                                                               if matches {
+                                                                                       log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}",
+                                                                                               &htlc.payment_hash, &monitor.channel_id());
+                                                                               }
+                                                                               !matches
+                                                                       });
+                                                                       !update_add_htlcs.is_empty()
+                                                               });
                                                                forward_htlcs.retain(|_, forwards| {
                                                                        forwards.retain(|forward| {
                                                                                if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
                                                                                        if pending_forward_matches_htlc(&htlc_info) {
                                                                                                log_info!(logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}",
-                                                                                                       &htlc.payment_hash, &monitor.get_funding_txo().0.to_channel_id());
+                                                                                                       &htlc.payment_hash, &monitor.channel_id());
                                                                                                false
                                                                                        } else { true }
                                                                                } else { true }
@@ -10798,7 +11985,7 @@ where
                                                                pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| {
                                                                        if pending_forward_matches_htlc(&htlc_info) {
                                                                                log_info!(logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
-                                                                                       &htlc.payment_hash, &monitor.get_funding_txo().0.to_channel_id());
+                                                                                       &htlc.payment_hash, &monitor.channel_id());
                                                                                pending_events_read.retain(|(event, _)| {
                                                                                        if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
                                                                                                intercepted_id != ev_id
@@ -10822,6 +12009,7 @@ where
                                                                        let compl_action =
                                                                                EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
                                                                                        channel_funding_outpoint: monitor.get_funding_txo().0,
+                                                                                       channel_id: monitor.channel_id(),
                                                                                        counterparty_node_id: path.hops[0].pubkey,
                                                                                };
                                                                        pending_outbounds.claim_htlc(payment_id, preimage, session_priv,
@@ -10847,7 +12035,7 @@ where
                                                                        // channel_id -> peer map entry).
                                                                        counterparty_opt.is_none(),
                                                                        counterparty_opt.cloned().or(monitor.get_counterparty_node_id()),
-                                                                       monitor.get_funding_txo().0))
+                                                                       monitor.get_funding_txo().0, monitor.channel_id()))
                                                        } else { None }
                                                } else {
                                                        // If it was an outbound payment, we've handled it above - if a preimage
@@ -10863,7 +12051,7 @@ where
                        }
                }
 
-               if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
+               if !forward_htlcs.is_empty() || !decode_update_add_htlcs.is_empty() || pending_outbounds.needs_abandon() {
                        // If we have pending HTLCs to forward, assume we either dropped a
                        // `PendingHTLCsForwardable` or the user received it but never processed it as they
                        // shut down before the timer hit. Either way, set the time_forwardable to a small
@@ -10877,7 +12065,7 @@ where
                let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material();
                let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material);
 
-               let mut claimable_payments = HashMap::with_capacity(claimable_htlcs_list.len());
+               let mut claimable_payments = hash_map_with_capacity(claimable_htlcs_list.len());
                if let Some(purposes) = claimable_htlc_purposes {
                        if purposes.len() != claimable_htlcs_list.len() {
                                return Err(DecodeError::InvalidValue);
@@ -10910,9 +12098,9 @@ where
                                        return Err(DecodeError::InvalidValue);
                                }
                                let purpose = match &htlcs[0].onion_payload {
-                                       OnionPayload::Invoice { _legacy_hop_data } => {
+                                       OnionPayload::Invoice { _legacy_hop_data, payment_context: _ } => {
                                                if let Some(hop_data) = _legacy_hop_data {
-                                                       events::PaymentPurpose::InvoicePayment {
+                                                       events::PaymentPurpose::Bolt11InvoicePayment {
                                                                payment_preimage: match pending_inbound_payments.get(&payment_hash) {
                                                                        Some(inbound_payment) => inbound_payment.payment_preimage,
                                                                        None => match inbound_payment::verify(payment_hash, &hop_data, 0, &expanded_inbound_key, &args.logger) {
@@ -10950,7 +12138,7 @@ where
                        }
                }
 
-               let mut outbound_scid_aliases = HashSet::new();
+               let mut outbound_scid_aliases = new_hash_set();
                for (_peer_node_id, peer_state_mutex) in per_peer_state.iter_mut() {
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
@@ -11020,7 +12208,7 @@ where
                                                // this channel as well. On the flip side, there's no harm in restarting
                                                // without the new monitor persisted - we'll end up right back here on
                                                // restart.
-                                               let previous_channel_id = claimable_htlc.prev_hop.outpoint.to_channel_id();
+                                               let previous_channel_id = claimable_htlc.prev_hop.channel_id;
                                                if let Some(peer_node_id) = outpoint_to_peer.get(&claimable_htlc.prev_hop.outpoint) {
                                                        let peer_state_mutex = per_peer_state.get(peer_node_id).unwrap();
                                                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
@@ -11053,14 +12241,14 @@ where
                                        for action in actions.iter() {
                                                if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
                                                        downstream_counterparty_and_funding_outpoint:
-                                                               Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
+                                                               Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
                                                } = action {
-                                                       if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+                                                       if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
                                                                log_trace!(logger,
                                                                        "Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
-                                                                       blocked_channel_outpoint.to_channel_id());
+                                                                       blocked_channel_id);
                                                                blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
-                                                                       .entry(blocked_channel_outpoint.to_channel_id())
+                                                                       .entry(*blocked_channel_id)
                                                                        .or_insert_with(Vec::new).push(blocking_action.clone());
                                                        } else {
                                                                // If the channel we were blocking has closed, we don't need to
@@ -11097,6 +12285,7 @@ where
                        pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
 
                        forward_htlcs: Mutex::new(forward_htlcs),
+                       decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
                        claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }),
                        outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
                        outpoint_to_peer: Mutex::new(outpoint_to_peer),
@@ -11125,6 +12314,8 @@ where
 
                        pending_offers_messages: Mutex::new(Vec::new()),
 
+                       pending_broadcast_messages: Mutex::new(Vec::new()),
+
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        signer_provider: args.signer_provider,
@@ -11140,12 +12331,14 @@ where
                        channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
 
-               for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay {
+               for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding, downstream_channel_id) in pending_claims_to_replay {
                        // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
                        // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
                        // channel is closed we just assume that it probably came from an on-chain claim.
-                       channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
-                               downstream_closed, true, downstream_node_id, downstream_funding);
+                       channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), None,
+                               downstream_closed, true, downstream_node_id, downstream_funding,
+                               downstream_channel_id, None
+                       );
                }
 
                //TODO: Broadcast channel update for closed channels, but only after we've made a
@@ -11654,6 +12847,61 @@ mod tests {
                }
        }
 
+       #[test]
+       fn test_channel_update_cached() {
+               let chanmon_cfgs = create_chanmon_cfgs(3);
+               let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+               let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+               let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+               check_added_monitors!(nodes[0], 1);
+               check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+
+               // Confirm that the channel_update was not sent immediately to nodes[1] but was cached.
+               let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_1_events.len(), 0);
+
+               {
+                       // Assert that the ChannelUpdate message has been added to nodes[0]'s pending broadcast messages
+                       let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+                       assert_eq!(pending_broadcast_messages.len(), 1);
+               }
+
+               // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+               nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_0_events.len(), 0);
+
+               // Now we reconnect to a peer
+               nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+               }, true).unwrap();
+               nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+               }, false).unwrap();
+
+               // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
+               let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_0_events.len(), 1);
+               match &node_0_events[0] {
+                       MessageSendEvent::BroadcastChannelUpdate { .. } => (),
+                       _ => panic!("Unexpected event"),
+               }
+               {
+                       // Assert that the ChannelUpdate message has been cleared from nodes[0]'s pending broadcast messages
+                       let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+                       assert_eq!(pending_broadcast_messages.len(), 0);
+               }
+       }
+
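The test above pins down the new `pending_broadcast_messages` behavior: `channel_update`s meant for broadcast while no peers are connected are cached rather than dropped, and are drained into the pending message events only once at least one peer can receive them. A hedged sketch of that gating, with hypothetical types rather than the actual `ChannelManager` internals:

    // Queued broadcasts are released only once someone is listening.
    struct Broadcaster<Msg> {
        pending_broadcast_messages: Vec<Msg>,
        connected_peers: usize,
    }

    impl<Msg> Broadcaster<Msg> {
        fn get_and_clear_pending_msg_events(&mut self) -> Vec<Msg> {
            if self.connected_peers == 0 {
                return Vec::new(); // keep caching until a peer connects
            }
            std::mem::take(&mut self.pending_broadcast_messages)
        }
    }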
        #[test]
        fn test_drop_disconnected_peers_when_removing_channels() {
                let chanmon_cfgs = create_chanmon_cfgs(2);
@@ -11832,8 +13080,8 @@ mod tests {
                }
                let (_nodes_1_update, _none) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());
 
-               check_closed_event!(nodes[0], 1, ClosureReason::CooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000);
-               check_closed_event!(nodes[1], 1, ClosureReason::CooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000);
+               check_closed_event!(nodes[0], 1, ClosureReason::LocallyInitiatedCooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000);
+               check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyInitiatedCooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000);
        }
 
        fn check_not_connected_to_peer_error<T>(res_err: Result<T, APIError>, expected_public_key: PublicKey) {
@@ -11965,14 +13213,15 @@ mod tests {
                                check_added_monitors!(nodes[0], 1);
                                expect_channel_pending_event(&nodes[0], &nodes[1].node.get_our_node_id());
                        }
-                       open_channel_msg.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
+                       open_channel_msg.common_fields.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
                }
 
                // A MAX_UNFUNDED_CHANS_PER_PEER + 1 channel will be summarily rejected
-               open_channel_msg.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
+               open_channel_msg.common_fields.temporary_channel_id = ChannelId::temporary_from_entropy_source(
+                       &nodes[0].keys_manager);
                nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
                assert_eq!(get_err_msg(&nodes[1], &nodes[0].node.get_our_node_id()).channel_id,
-                       open_channel_msg.temporary_channel_id);
+                       open_channel_msg.common_fields.temporary_channel_id);
 
                // Further, because all of our channels with nodes[0] are inbound, and none of them funded,
                // it doesn't count as a "protected" peer, i.e. it counts towards the MAX_NO_CHANNEL_PEERS
@@ -12020,11 +13269,11 @@ mod tests {
                for i in 0..super::MAX_UNFUNDED_CHANNEL_PEERS - 1 {
                        nodes[1].node.handle_open_channel(&peer_pks[i], &open_channel_msg);
                        get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, peer_pks[i]);
-                       open_channel_msg.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
+                       open_channel_msg.common_fields.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
                }
                nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
                assert_eq!(get_err_msg(&nodes[1], &last_random_pk).channel_id,
-                       open_channel_msg.temporary_channel_id);
+                       open_channel_msg.common_fields.temporary_channel_id);
 
                // Of course, however, outbound channels are always allowed
                nodes[1].node.create_channel(last_random_pk, 100_000, 0, 42, None, None).unwrap();
@@ -12060,14 +13309,14 @@ mod tests {
                for _ in 0..super::MAX_UNFUNDED_CHANS_PER_PEER {
                        nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
                        get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
-                       open_channel_msg.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
+                       open_channel_msg.common_fields.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
                }
 
                // Once we have MAX_UNFUNDED_CHANS_PER_PEER unfunded channels, new inbound channels will be
                // rejected.
                nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
                assert_eq!(get_err_msg(&nodes[1], &nodes[0].node.get_our_node_id()).channel_id,
-                       open_channel_msg.temporary_channel_id);
+                       open_channel_msg.common_fields.temporary_channel_id);
 
                // but we can still open an outbound channel.
                nodes[1].node.create_channel(nodes[0].node.get_our_node_id(), 100_000, 0, 42, None, None).unwrap();
@@ -12076,7 +13325,7 @@ mod tests {
                // but even with such an outbound channel, additional inbound channels will still fail.
                nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
                assert_eq!(get_err_msg(&nodes[1], &nodes[0].node.get_our_node_id()).channel_id,
-                       open_channel_msg.temporary_channel_id);
+                       open_channel_msg.common_fields.temporary_channel_id);
        }
 
        #[test]
@@ -12112,7 +13361,7 @@ mod tests {
                                _ => panic!("Unexpected event"),
                        }
                        get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, random_pk);
-                       open_channel_msg.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
+                       open_channel_msg.common_fields.temporary_channel_id = ChannelId::temporary_from_entropy_source(&nodes[0].keys_manager);
                }
 
                // If we try to accept a channel from another peer non-0conf it will fail.
@@ -12134,7 +13383,7 @@ mod tests {
                        _ => panic!("Unexpected event"),
                }
                assert_eq!(get_err_msg(&nodes[1], &last_random_pk).channel_id,
-                       open_channel_msg.temporary_channel_id);
+                       open_channel_msg.common_fields.temporary_channel_id);
 
                // ...however if we accept the same channel 0conf it should work just fine.
                nodes[1].node.handle_open_channel(&last_random_pk, &open_channel_msg);
@@ -12168,7 +13417,7 @@ mod tests {
                };
                // Check that if the amount we received + the penultimate hop extra fee is less than the sender
                // intended amount, we fail the payment.
-               let current_height: u32 = node[0].node.best_block.read().unwrap().height();
+               let current_height: u32 = node[0].node.best_block.read().unwrap().height;
                if let Err(crate::ln::channelmanager::InboundHTLCErr { err_code, .. }) =
                        create_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]),
                                sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat),
@@ -12188,7 +13437,7 @@ mod tests {
                        }),
                        custom_tlvs: Vec::new(),
                };
-               let current_height: u32 = node[0].node.best_block.read().unwrap().height();
+               let current_height: u32 = node[0].node.best_block.read().unwrap().height;
                assert!(create_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]),
                        sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat),
                        current_height, node[0].node.default_configuration.accept_mpp_keysend).is_ok());
@@ -12201,7 +13450,7 @@ mod tests {
                let node_chanmgr = create_node_chanmgrs(1, &node_cfg, &[None]);
                let node = create_network(1, &node_cfg, &node_chanmgr);
 
-               let current_height: u32 = node[0].node.best_block.read().unwrap().height();
+               let current_height: u32 = node[0].node.best_block.read().unwrap().height;
                let result = create_recv_pending_htlc_info(msgs::InboundOnionPayload::Receive {
                        sender_intended_htlc_amt_msat: 100,
                        cltv_expiry_height: 22,
@@ -12279,7 +13528,7 @@ mod tests {
 
                nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 0, None, None).unwrap();
                let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
-               assert!(open_channel_msg.channel_type.as_ref().unwrap().supports_anchors_zero_fee_htlc_tx());
+               assert!(open_channel_msg.common_fields.channel_type.as_ref().unwrap().supports_anchors_zero_fee_htlc_tx());
 
                nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_msg);
                let events = nodes[1].node.get_and_clear_pending_events();
@@ -12294,7 +13543,7 @@ mod tests {
                nodes[0].node.handle_error(&nodes[1].node.get_our_node_id(), &error_msg);
 
                let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
-               assert!(!open_channel_msg.channel_type.unwrap().supports_anchors_zero_fee_htlc_tx());
+               assert!(!open_channel_msg.common_fields.channel_type.unwrap().supports_anchors_zero_fee_htlc_tx());
 
                // Since nodes[1] should not have accepted the channel, it should
                // not have generated any events.
@@ -12483,7 +13732,7 @@ mod tests {
 
 
                let (scid_1, scid_2) = (42, 43);
-               let mut forward_htlcs = HashMap::new();
+               let mut forward_htlcs = new_hash_map();
                forward_htlcs.insert(scid_1, dummy_htlcs_1.clone());
                forward_htlcs.insert(scid_2, dummy_htlcs_2.clone());
 
@@ -12627,7 +13876,7 @@ pub mod bench {
 
                assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]);
 
-               let block = create_dummy_block(BestBlock::from_network(network).block_hash(), 42, vec![tx]);
+               let block = create_dummy_block(BestBlock::from_network(network).block_hash, 42, vec![tx]);
                Listen::block_connected(&node_a, &block, 1);
                Listen::block_connected(&node_b, &block, 1);