Look up node id from scid in OnionMessenger
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 685caba75c2024469178b04e367db625fd4fbfce..49ced4db33530b07622ccc32fe06fcfe90308f50 100644 (file)
@@ -31,7 +31,7 @@ use bitcoin::secp256k1::{SecretKey,PublicKey};
 use bitcoin::secp256k1::Secp256k1;
 use bitcoin::{secp256k1, Sequence};
 
-use crate::blinded_path::BlindedPath;
+use crate::blinded_path::{BlindedPath, NodeIdLookUp};
 use crate::blinded_path::payment::{PaymentConstraints, ReceiveTlvs};
 use crate::chain;
 use crate::chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock};
@@ -903,7 +903,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
        /// The peer is currently connected (i.e. we've seen a
        /// [`ChannelMessageHandler::peer_connected`] and no corresponding
        /// [`ChannelMessageHandler::peer_disconnected`].
-       is_connected: bool,
+       pub is_connected: bool,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -1109,11 +1109,620 @@ where
        fn get_cm(&self) -> &ChannelManager<M, T, ES, NS, SP, F, R, L> { self }
 }
 
-/// Manager which keeps track of a number of channels and sends messages to the appropriate
-/// channel, also tracking HTLC preimages and forwarding onion packets appropriately.
+/// A lightning node's channel state machine and payment management logic, which facilitates
+/// sending, forwarding, and receiving payments through lightning channels.
 ///
-/// Implements [`ChannelMessageHandler`], handling the multi-channel parts and passing things through
-/// to individual Channels.
+/// [`ChannelManager`] is parameterized by a number of components to achieve this.
+/// - [`chain::Watch`] (typically [`ChainMonitor`]) for on-chain monitoring and enforcement of each
+///   channel
+/// - [`BroadcasterInterface`] for broadcasting transactions related to opening, funding, and
+///   closing channels
+/// - [`EntropySource`] for providing random data needed for cryptographic operations
+/// - [`NodeSigner`] for cryptographic operations scoped to the node
+/// - [`SignerProvider`] for providing signers whose operations are scoped to individual channels
+/// - [`FeeEstimator`] to determine transaction fee rates needed to have a transaction mined in a
+///   timely manner
+/// - [`Router`] for finding payment paths when initiating and retrying payments
+/// - [`Logger`] for logging operational information of varying degrees
+///
+/// Additionally, it implements the following traits:
+/// - [`ChannelMessageHandler`] to handle off-chain channel activity from peers
+/// - [`MessageSendEventsProvider`] to similarly send such messages to peers
+/// - [`OffersMessageHandler`] for BOLT 12 message handling and sending
+/// - [`EventsProvider`] to generate user-actionable [`Event`]s
+/// - [`chain::Listen`] and [`chain::Confirm`] for notification of on-chain activity
+///
+/// Thus, [`ChannelManager`] is typically used to parameterize a [`MessageHandler`] and an
+/// [`OnionMessenger`]. The latter is required to support BOLT 12 functionality.
+///
+/// # `ChannelManager` vs `ChannelMonitor`
+///
+/// It's important to distinguish between the *off-chain* management and *on-chain* enforcement of
+/// lightning channels. [`ChannelManager`] exchanges messages with peers to manage the off-chain
+/// state of each channel. During this process, it generates a [`ChannelMonitor`] for each channel
+/// and a [`ChannelMonitorUpdate`] for each relevant change, notifying its parameterized
+/// [`chain::Watch`] of them.
+///
+/// An implementation of [`chain::Watch`], such as [`ChainMonitor`], is responsible for aggregating
+/// these [`ChannelMonitor`]s and applying any [`ChannelMonitorUpdate`]s to them. It then monitors
+/// for any pertinent on-chain activity, enforcing claims as needed.
+///
+/// This division of off-chain management and on-chain enforcement allows for interesting node
+/// setups. For instance, on-chain enforcement could be moved to a separate host or have added
+/// redundancy, possibly as a watchtower. See [`chain::Watch`] for the relevant interface.
+///
+/// # Initialization
+///
+/// Use [`ChannelManager::new`] with the most recent [`BlockHash`] when creating a fresh instance.
+/// Otherwise, if restarting, construct [`ChannelManagerReadArgs`] with the necessary parameters and
+/// references to any deserialized [`ChannelMonitor`]s that were previously persisted. Use this to
+/// deserialize the [`ChannelManager`] and feed it any new chain data since it was last online, as
+/// detailed in the [`ChannelManagerReadArgs`] documentation.
+///
+/// ```
+/// use bitcoin::BlockHash;
+/// use bitcoin::network::constants::Network;
+/// use lightning::chain::BestBlock;
+/// # use lightning::chain::channelmonitor::ChannelMonitor;
+/// use lightning::ln::channelmanager::{ChainParameters, ChannelManager, ChannelManagerReadArgs};
+/// # use lightning::routing::gossip::NetworkGraph;
+/// use lightning::util::config::UserConfig;
+/// use lightning::util::ser::ReadableArgs;
+///
+/// # fn read_channel_monitors() -> Vec<ChannelMonitor<lightning::sign::InMemorySigner>> { vec![] }
+/// # fn example<
+/// #     'a,
+/// #     L: lightning::util::logger::Logger,
+/// #     ES: lightning::sign::EntropySource,
+/// #     S: for <'b> lightning::routing::scoring::LockableScore<'b, ScoreLookUp = SL>,
+/// #     SL: lightning::routing::scoring::ScoreLookUp<ScoreParams = SP>,
+/// #     SP: Sized,
+/// #     R: lightning::io::Read,
+/// # >(
+/// #     fee_estimator: &dyn lightning::chain::chaininterface::FeeEstimator,
+/// #     chain_monitor: &dyn lightning::chain::Watch<lightning::sign::InMemorySigner>,
+/// #     tx_broadcaster: &dyn lightning::chain::chaininterface::BroadcasterInterface,
+/// #     router: &lightning::routing::router::DefaultRouter<&NetworkGraph<&'a L>, &'a L, &ES, &S, SP, SL>,
+/// #     logger: &L,
+/// #     entropy_source: &ES,
+/// #     node_signer: &dyn lightning::sign::NodeSigner,
+/// #     signer_provider: &lightning::sign::DynSignerProvider,
+/// #     best_block: lightning::chain::BestBlock,
+/// #     current_timestamp: u32,
+/// #     mut reader: R,
+/// # ) -> Result<(), lightning::ln::msgs::DecodeError> {
+/// // Fresh start with no channels
+/// let params = ChainParameters {
+///     network: Network::Bitcoin,
+///     best_block,
+/// };
+/// let default_config = UserConfig::default();
+/// let channel_manager = ChannelManager::new(
+///     fee_estimator, chain_monitor, tx_broadcaster, router, logger, entropy_source, node_signer,
+///     signer_provider, default_config, params, current_timestamp
+/// );
+///
+/// // Restart from deserialized data
+/// let mut channel_monitors = read_channel_monitors();
+/// let args = ChannelManagerReadArgs::new(
+///     entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster,
+///     router, logger, default_config, channel_monitors.iter_mut().collect()
+/// );
+/// let (block_hash, channel_manager) =
+///     <(BlockHash, ChannelManager<_, _, _, _, _, _, _, _>)>::read(&mut reader, args)?;
+///
+/// // Update the ChannelManager and ChannelMonitors with the latest chain data
+/// // ...
+///
+/// // Move the monitors to the ChannelManager's chain::Watch parameter
+/// for monitor in channel_monitors {
+///     chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+/// }
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # Operation
+///
+/// The following is required for [`ChannelManager`] to function properly:
+/// - Handle messages from peers using its [`ChannelMessageHandler`] implementation (typically
+///   called by [`PeerManager::read_event`] when processing network I/O)
+/// - Send messages to peers obtained via its [`MessageSendEventsProvider`] implementation
+///   (typically initiated when [`PeerManager::process_events`] is called)
+/// - Feed on-chain activity using either its [`chain::Listen`] or [`chain::Confirm`] implementation
+///   as documented by those traits
+/// - Perform any periodic channel and payment checks by calling [`timer_tick_occurred`] roughly
+///   every minute
+/// - Persist to disk whenever [`get_and_clear_needs_persistence`] returns `true` using a
+///   [`Persister`] such as a [`KVStore`] implementation
+/// - Handle [`Event`]s obtained via its [`EventsProvider`] implementation
+///
+/// The [`Future`] returned by [`get_event_or_persistence_needed_future`] is useful in determining
+/// when the last two requirements need to be checked.
+///
+/// The [`lightning-block-sync`] and [`lightning-transaction-sync`] crates provide utilities that
+/// simplify feeding in on-chain activity using the [`chain::Listen`] and [`chain::Confirm`] traits,
+/// respectively. The remaining requirements can be met using the [`lightning-background-processor`]
+/// crate. For languages other than Rust, the availability of similar utilities may vary.
+///
+/// # Channels
+///
+/// [`ChannelManager`]'s primary function involves managing a channel state. Without channels,
+/// payments can't be sent. Use [`list_channels`] or [`list_usable_channels`] for a snapshot of the
+/// currently open channels.
+///
+/// ```
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) {
+/// # let channel_manager = channel_manager.get_cm();
+/// let channels = channel_manager.list_usable_channels();
+/// for details in channels {
+///     println!("{:?}", details);
+/// }
+/// # }
+/// ```
+///
+/// Each channel is identified using a [`ChannelId`], which will change throughout the channel's
+/// life cycle. Additionally, channels are assigned a `user_channel_id`, which is given in
+/// [`Event`]s associated with the channel and serves as a fixed identifier but is otherwise unused
+/// by [`ChannelManager`].
+///
+/// ## Opening Channels
+///
+/// To an open a channel with a peer, call [`create_channel`]. This will initiate the process of
+/// opening an outbound channel, which requires self-funding when handling
+/// [`Event::FundingGenerationReady`].
+///
+/// ```
+/// # use bitcoin::{ScriptBuf, Transaction};
+/// # use bitcoin::secp256k1::PublicKey;
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::events::{Event, EventsProvider};
+/// #
+/// # trait Wallet {
+/// #     fn create_funding_transaction(
+/// #         &self, _amount_sats: u64, _output_script: ScriptBuf
+/// #     ) -> Transaction;
+/// # }
+/// #
+/// # fn example<T: AChannelManager, W: Wallet>(channel_manager: T, wallet: W, peer_id: PublicKey) {
+/// # let channel_manager = channel_manager.get_cm();
+/// let value_sats = 1_000_000;
+/// let push_msats = 10_000_000;
+/// match channel_manager.create_channel(peer_id, value_sats, push_msats, 42, None, None) {
+///     Ok(channel_id) => println!("Opening channel {}", channel_id),
+///     Err(e) => println!("Error opening channel: {:?}", e),
+/// }
+///
+/// // On the event processing thread once the peer has responded
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::FundingGenerationReady {
+///         temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
+///         user_channel_id, ..
+///     } => {
+///         assert_eq!(user_channel_id, 42);
+///         let funding_transaction = wallet.create_funding_transaction(
+///             channel_value_satoshis, output_script
+///         );
+///         match channel_manager.funding_transaction_generated(
+///             &temporary_channel_id, &counterparty_node_id, funding_transaction
+///         ) {
+///             Ok(()) => println!("Funding channel {}", temporary_channel_id),
+///             Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
+///         }
+///     },
+///     Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
+///         assert_eq!(user_channel_id, 42);
+///         println!(
+///             "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
+///             former_temporary_channel_id.unwrap()
+///         );
+///     },
+///     Event::ChannelReady { channel_id, user_channel_id, .. } => {
+///         assert_eq!(user_channel_id, 42);
+///         println!("Channel {} ready", channel_id);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// ## Accepting Channels
+///
+/// Inbound channels are initiated by peers and are automatically accepted unless [`ChannelManager`]
+/// has [`UserConfig::manually_accept_inbound_channels`] set. In that case, the channel may be
+/// either accepted or rejected when handling [`Event::OpenChannelRequest`].
+///
+/// ```
+/// # use bitcoin::secp256k1::PublicKey;
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::events::{Event, EventsProvider};
+/// #
+/// # fn is_trusted(counterparty_node_id: PublicKey) -> bool {
+/// #     // ...
+/// #     unimplemented!()
+/// # }
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) {
+/// # let channel_manager = channel_manager.get_cm();
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, ..  } => {
+///         if !is_trusted(counterparty_node_id) {
+///             match channel_manager.force_close_without_broadcasting_txn(
+///                 &temporary_channel_id, &counterparty_node_id
+///             ) {
+///                 Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+///             }
+///             return;
+///         }
+///
+///         let user_channel_id = 43;
+///         match channel_manager.accept_inbound_channel(
+///             &temporary_channel_id, &counterparty_node_id, user_channel_id
+///         ) {
+///             Ok(()) => println!("Accepting channel {}", temporary_channel_id),
+///             Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
+///         }
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// ## Closing Channels
+///
+/// There are two ways to close a channel: either cooperatively using [`close_channel`] or
+/// unilaterally using [`force_close_broadcasting_latest_txn`]. The former is ideal as it makes for
+/// lower fees and immediate access to funds. However, the latter may be necessary if the
+/// counterparty isn't behaving properly or has gone offline. [`Event::ChannelClosed`] is generated
+/// once the channel has been closed successfully.
+///
+/// ```
+/// # use bitcoin::secp256k1::PublicKey;
+/// # use lightning::ln::ChannelId;
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::events::{Event, EventsProvider};
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, channel_id: ChannelId, counterparty_node_id: PublicKey
+/// # ) {
+/// # let channel_manager = channel_manager.get_cm();
+/// match channel_manager.close_channel(&channel_id, &counterparty_node_id) {
+///     Ok(()) => println!("Closing channel {}", channel_id),
+///     Err(e) => println!("Error closing channel {}: {:?}", channel_id, e),
+/// }
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::ChannelClosed { channel_id, user_channel_id, ..  } => {
+///         assert_eq!(user_channel_id, 42);
+///         println!("Channel {} closed", channel_id);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// # Payments
+///
+/// [`ChannelManager`] is responsible for sending, forwarding, and receiving payments through its
+/// channels. A payment is typically initiated from a [BOLT 11] invoice or a [BOLT 12] offer, though
+/// spontaneous (i.e., keysend) payments are also possible. Incoming payments don't require
+/// maintaining any additional state as [`ChannelManager`] can reconstruct the [`PaymentPreimage`]
+/// from the [`PaymentSecret`]. Sending payments, however, require tracking in order to retry failed
+/// HTLCs.
+///
+/// After a payment is initiated, it will appear in [`list_recent_payments`] until a short time
+/// after either an [`Event::PaymentSent`] or [`Event::PaymentFailed`] is handled. Failed HTLCs
+/// for a payment will be retried according to the payment's [`Retry`] strategy or until
+/// [`abandon_payment`] is called.
+///
+/// ## BOLT 11 Invoices
+///
+/// The [`lightning-invoice`] crate is useful for creating BOLT 11 invoices. Specifically, use the
+/// functions in its `utils` module for constructing invoices that are compatible with
+/// [`ChannelManager`]. These functions serve as a convenience for building invoices with the
+/// [`PaymentHash`] and [`PaymentSecret`] returned from [`create_inbound_payment`]. To provide your
+/// own [`PaymentHash`], use [`create_inbound_payment_for_hash`] or the corresponding functions in
+/// the [`lightning-invoice`] `utils` module.
+///
+/// [`ChannelManager`] generates an [`Event::PaymentClaimable`] once the full payment has been
+/// received. Call [`claim_funds`] to release the [`PaymentPreimage`], which in turn will result in
+/// an [`Event::PaymentClaimed`].
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider, PaymentPurpose};
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) {
+/// # let channel_manager = channel_manager.get_cm();
+/// // Or use utils::create_invoice_from_channelmanager
+/// let known_payment_hash = match channel_manager.create_inbound_payment(
+///     Some(10_000_000), 3600, None
+/// ) {
+///     Ok((payment_hash, _payment_secret)) => {
+///         println!("Creating inbound payment {}", payment_hash);
+///         payment_hash
+///     },
+///     Err(()) => panic!("Error creating inbound payment"),
+/// };
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///         PaymentPurpose::InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
+///             println!("Claiming payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///         PaymentPurpose::InvoicePayment { payment_preimage: None, .. } => {
+///             println!("Unknown payment hash: {}", payment_hash);
+///         },
+///         PaymentPurpose::SpontaneousPayment(payment_preimage) => {
+///             assert_ne!(payment_hash, known_payment_hash);
+///             println!("Claiming spontaneous payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///     },
+///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///         assert_eq!(payment_hash, known_payment_hash);
+///         println!("Claimed {} msats", amount_msat);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// For paying an invoice, [`lightning-invoice`] provides a `payment` module with convenience
+/// functions for use with [`send_payment`].
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider};
+/// # use lightning::ln::PaymentHash;
+/// # use lightning::ln::channelmanager::{AChannelManager, PaymentId, RecentPaymentDetails, RecipientOnionFields, Retry};
+/// # use lightning::routing::router::RouteParameters;
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields,
+/// #     route_params: RouteParameters, retry: Retry
+/// # ) {
+/// # let channel_manager = channel_manager.get_cm();
+/// // let (payment_hash, recipient_onion, route_params) =
+/// //     payment::payment_parameters_from_invoice(&invoice);
+/// let payment_id = PaymentId([42; 32]);
+/// match channel_manager.send_payment(
+///     payment_hash, recipient_onion, payment_id, route_params, retry
+/// ) {
+///     Ok(()) => println!("Sending payment with hash {}", payment_hash),
+///     Err(e) => println!("Failed sending payment with hash {}: {:?}", payment_hash, e),
+/// }
+///
+/// let expected_payment_id = payment_id;
+/// let expected_payment_hash = payment_hash;
+/// assert!(
+///     channel_manager.list_recent_payments().iter().find(|details| matches!(
+///         details,
+///         RecentPaymentDetails::Pending {
+///             payment_id: expected_payment_id,
+///             payment_hash: expected_payment_hash,
+///             ..
+///         }
+///     )).is_some()
+/// );
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
+///     Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// ## BOLT 12 Offers
+///
+/// The [`offers`] module is useful for creating BOLT 12 offers. An [`Offer`] is a precursor to a
+/// [`Bolt12Invoice`], which must first be requested by the payer. The interchange of these messages
+/// as defined in the specification is handled by [`ChannelManager`] and its implementation of
+/// [`OffersMessageHandler`]. However, this only works with an [`Offer`] created using a builder
+/// returned by [`create_offer_builder`]. With this approach, BOLT 12 offers and invoices are
+/// stateless just as BOLT 11 invoices are.
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider, PaymentPurpose};
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::offers::parse::Bolt12SemanticError;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T) -> Result<(), Bolt12SemanticError> {
+/// # let channel_manager = channel_manager.get_cm();
+/// let offer = channel_manager
+///     .create_offer_builder("coffee".to_string())?
+/// # ;
+/// # // Needed for compiling for c_bindings
+/// # let builder: lightning::offers::offer::OfferBuilder<_, _> = offer.into();
+/// # let offer = builder
+///     .amount_msats(10_000_000)
+///     .build()?;
+/// let bech32_offer = offer.to_string();
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///         PaymentPurpose::InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///             println!("Claiming payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///         PaymentPurpose::InvoicePayment { payment_preimage: None, .. } => {
+///             println!("Unknown payment hash: {}", payment_hash);
+///         },
+///         // ...
+/// #         _ => {},
+///     },
+///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///         println!("Claimed {} msats", amount_msat);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Use [`pay_for_offer`] to initiated payment, which sends an [`InvoiceRequest`] for an [`Offer`]
+/// and pays the [`Bolt12Invoice`] response. In addition to success and failure events,
+/// [`ChannelManager`] may also generate an [`Event::InvoiceRequestFailed`].
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider};
+/// # use lightning::ln::channelmanager::{AChannelManager, PaymentId, RecentPaymentDetails, Retry};
+/// # use lightning::offers::offer::Offer;
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, offer: &Offer, quantity: Option<u64>, amount_msats: Option<u64>,
+/// #     payer_note: Option<String>, retry: Retry, max_total_routing_fee_msat: Option<u64>
+/// # ) {
+/// # let channel_manager = channel_manager.get_cm();
+/// let payment_id = PaymentId([42; 32]);
+/// match channel_manager.pay_for_offer(
+///     offer, quantity, amount_msats, payer_note, payment_id, retry, max_total_routing_fee_msat
+/// ) {
+///     Ok(()) => println!("Requesting invoice for offer"),
+///     Err(e) => println!("Unable to request invoice for offer: {:?}", e),
+/// }
+///
+/// // First the payment will be waiting on an invoice
+/// let expected_payment_id = payment_id;
+/// assert!(
+///     channel_manager.list_recent_payments().iter().find(|details| matches!(
+///         details,
+///         RecentPaymentDetails::AwaitingInvoice { payment_id: expected_payment_id }
+///     )).is_some()
+/// );
+///
+/// // Once the invoice is received, a payment will be sent
+/// assert!(
+///     channel_manager.list_recent_payments().iter().find(|details| matches!(
+///         details,
+///         RecentPaymentDetails::Pending { payment_id: expected_payment_id, ..  }
+///     )).is_some()
+/// );
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///     Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// ## BOLT 12 Refunds
+///
+/// A [`Refund`] is a request for an invoice to be paid. Like *paying* for an [`Offer`], *creating*
+/// a [`Refund`] involves maintaining state since it represents a future outbound payment.
+/// Therefore, use [`create_refund_builder`] when creating one, otherwise [`ChannelManager`] will
+/// refuse to pay any corresponding [`Bolt12Invoice`] that it receives.
+///
+/// ```
+/// # use core::time::Duration;
+/// # use lightning::events::{Event, EventsProvider};
+/// # use lightning::ln::channelmanager::{AChannelManager, PaymentId, RecentPaymentDetails, Retry};
+/// # use lightning::offers::parse::Bolt12SemanticError;
+/// #
+/// # fn example<T: AChannelManager>(
+/// #     channel_manager: T, amount_msats: u64, absolute_expiry: Duration, retry: Retry,
+/// #     max_total_routing_fee_msat: Option<u64>
+/// # ) -> Result<(), Bolt12SemanticError> {
+/// # let channel_manager = channel_manager.get_cm();
+/// let payment_id = PaymentId([42; 32]);
+/// let refund = channel_manager
+///     .create_refund_builder(
+///         "coffee".to_string(), amount_msats, absolute_expiry, payment_id, retry,
+///         max_total_routing_fee_msat
+///     )?
+/// # ;
+/// # // Needed for compiling for c_bindings
+/// # let builder: lightning::offers::refund::RefundBuilder<_> = refund.into();
+/// # let refund = builder
+///     .payer_note("refund for order 1234".to_string())
+///     .build()?;
+/// let bech32_refund = refund.to_string();
+///
+/// // First the payment will be waiting on an invoice
+/// let expected_payment_id = payment_id;
+/// assert!(
+///     channel_manager.list_recent_payments().iter().find(|details| matches!(
+///         details,
+///         RecentPaymentDetails::AwaitingInvoice { payment_id: expected_payment_id }
+///     )).is_some()
+/// );
+///
+/// // Once the invoice is received, a payment will be sent
+/// assert!(
+///     channel_manager.list_recent_payments().iter().find(|details| matches!(
+///         details,
+///         RecentPaymentDetails::Pending { payment_id: expected_payment_id, ..  }
+///     )).is_some()
+/// );
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///     // ...
+/// #     _ => {},
+/// });
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Use [`request_refund_payment`] to send a [`Bolt12Invoice`] for receiving the refund. Similar to
+/// *creating* an [`Offer`], this is stateless as it represents an inbound payment.
+///
+/// ```
+/// # use lightning::events::{Event, EventsProvider, PaymentPurpose};
+/// # use lightning::ln::channelmanager::AChannelManager;
+/// # use lightning::offers::refund::Refund;
+/// #
+/// # fn example<T: AChannelManager>(channel_manager: T, refund: &Refund) {
+/// # let channel_manager = channel_manager.get_cm();
+/// match channel_manager.request_refund_payment(refund) {
+///     Ok(()) => println!("Requesting payment for refund"),
+///     Err(e) => println!("Unable to request payment for refund: {:?}", e),
+/// }
+///
+/// // On the event processing thread
+/// channel_manager.process_pending_events(&|event| match event {
+///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///            PaymentPurpose::InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///             println!("Claiming payment {}", payment_hash);
+///             channel_manager.claim_funds(payment_preimage);
+///         },
+///            PaymentPurpose::InvoicePayment { payment_preimage: None, .. } => {
+///             println!("Unknown payment hash: {}", payment_hash);
+///            },
+///         // ...
+/// #         _ => {},
+///     },
+///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///         println!("Claimed {} msats", amount_msat);
+///     },
+///     // ...
+/// #     _ => {},
+/// });
+/// # }
+/// ```
+///
+/// # Persistence
 ///
 /// Implements [`Writeable`] to write out all channel state to disk. Implies [`peer_disconnected`] for
 /// all peers during write/read (though does not modify this instance, only the instance being
@@ -1134,12 +1743,16 @@ where
 /// tells you the last block hash which was connected. You should get the best block tip before using the manager.
 /// See [`chain::Listen`] and [`chain::Confirm`] for more details.
 ///
+/// # `ChannelUpdate` Messages
+///
 /// Note that `ChannelManager` is responsible for tracking liveness of its channels and generating
 /// [`ChannelUpdate`] messages informing peers that the channel is temporarily disabled. To avoid
 /// spam due to quick disconnection/reconnection, updates are not sent until the channel has been
 /// offline for a full minute. In order to track this, you must call
 /// [`timer_tick_occurred`] roughly once per minute, though it doesn't have to be perfect.
 ///
+/// # DoS Mitigation
+///
 /// To avoid trivial DoS issues, `ChannelManager` limits the number of inbound connections and
 /// inbound channels without confirmed funding transactions. This may result in nodes which we do
 /// not have a channel with being unable to connect to us or open new channels with us if we have
@@ -1149,19 +1762,53 @@ where
 /// exempted from the count of unfunded channels. Similarly, outbound channels and connections are
 /// never limited. Please ensure you limit the count of such channels yourself.
 ///
+/// # Type Aliases
+///
 /// Rather than using a plain `ChannelManager`, it is preferable to use either a [`SimpleArcChannelManager`]
 /// a [`SimpleRefChannelManager`], for conciseness. See their documentation for more details, but
 /// essentially you should default to using a [`SimpleRefChannelManager`], and use a
 /// [`SimpleArcChannelManager`] when you require a `ChannelManager` with a static lifetime, such as when
 /// you're using lightning-net-tokio.
 ///
+/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
+/// [`MessageHandler`]: crate::ln::peer_handler::MessageHandler
+/// [`OnionMessenger`]: crate::onion_message::messenger::OnionMessenger
+/// [`PeerManager::read_event`]: crate::ln::peer_handler::PeerManager::read_event
+/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
+/// [`timer_tick_occurred`]: Self::timer_tick_occurred
+/// [`get_and_clear_needs_persistence`]: Self::get_and_clear_needs_persistence
+/// [`Persister`]: crate::util::persist::Persister
+/// [`KVStore`]: crate::util::persist::KVStore
+/// [`get_event_or_persistence_needed_future`]: Self::get_event_or_persistence_needed_future
+/// [`lightning-block-sync`]: https://docs.rs/lightning_block_sync/latest/lightning_block_sync
+/// [`lightning-transaction-sync`]: https://docs.rs/lightning_transaction_sync/latest/lightning_transaction_sync
+/// [`lightning-background-processor`]: https://docs.rs/lightning_background_processor/lightning_background_processor
+/// [`list_channels`]: Self::list_channels
+/// [`list_usable_channels`]: Self::list_usable_channels
+/// [`create_channel`]: Self::create_channel
+/// [`close_channel`]: Self::force_close_broadcasting_latest_txn
+/// [`force_close_broadcasting_latest_txn`]: Self::force_close_broadcasting_latest_txn
+/// [BOLT 11]: https://github.com/lightning/bolts/blob/master/11-payment-encoding.md
+/// [BOLT 12]: https://github.com/rustyrussell/lightning-rfc/blob/guilt/offers/12-offer-encoding.md
+/// [`list_recent_payments`]: Self::list_recent_payments
+/// [`abandon_payment`]: Self::abandon_payment
+/// [`lightning-invoice`]: https://docs.rs/lightning_invoice/latest/lightning_invoice
+/// [`create_inbound_payment`]: Self::create_inbound_payment
+/// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
+/// [`claim_funds`]: Self::claim_funds
+/// [`send_payment`]: Self::send_payment
+/// [`offers`]: crate::offers
+/// [`create_offer_builder`]: Self::create_offer_builder
+/// [`pay_for_offer`]: Self::pay_for_offer
+/// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
+/// [`create_refund_builder`]: Self::create_refund_builder
+/// [`request_refund_payment`]: Self::request_refund_payment
 /// [`peer_disconnected`]: msgs::ChannelMessageHandler::peer_disconnected
 /// [`funding_created`]: msgs::FundingCreated
 /// [`funding_transaction_generated`]: Self::funding_transaction_generated
 /// [`BlockHash`]: bitcoin::hash_types::BlockHash
 /// [`update_channel`]: chain::Watch::update_channel
 /// [`ChannelUpdate`]: msgs::ChannelUpdate
-/// [`timer_tick_occurred`]: Self::timer_tick_occurred
 /// [`read`]: ReadableArgs::read
 //
 // Lock order:
@@ -1428,6 +2075,9 @@ where
 
        pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
 
+       /// Tracks the message events that are to be broadcasted when we are connected to some peer.
+       pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
+
        entropy_source: ES,
        node_signer: NS,
        signer_provider: SP,
@@ -2019,7 +2669,7 @@ macro_rules! handle_error {
                match $internal {
                        Ok(msg) => Ok(msg),
                        Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
-                               let mut msg_events = Vec::with_capacity(2);
+                               let mut msg_event = None;
 
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
                                        let counterparty_node_id = shutdown_res.counterparty_node_id;
@@ -2031,7 +2681,8 @@ macro_rules! handle_error {
 
                                        $self.finish_close_channel(shutdown_res);
                                        if let Some(update) = update_option {
-                                               msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                               let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
+                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
@@ -2041,17 +2692,17 @@ macro_rules! handle_error {
 
                                if let msgs::ErrorAction::IgnoreError = err.action {
                                } else {
-                                       msg_events.push(events::MessageSendEvent::HandleError {
+                                       msg_event = Some(events::MessageSendEvent::HandleError {
                                                node_id: $counterparty_node_id,
                                                action: err.action.clone()
                                        });
                                }
 
-                               if !msg_events.is_empty() {
+                               if let Some(msg_event) = msg_event {
                                        let per_peer_state = $self.per_peer_state.read().unwrap();
                                        if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
                                                let mut peer_state = peer_state_mutex.lock().unwrap();
-                                               peer_state.pending_msg_events.append(&mut msg_events);
+                                               peer_state.pending_msg_events.push(msg_event);
                                        }
                                }
 
@@ -2522,6 +3173,7 @@ where
                        funding_batch_states: Mutex::new(BTreeMap::new()),
 
                        pending_offers_messages: Mutex::new(Vec::new()),
+                       pending_broadcast_messages: Mutex::new(Vec::new()),
 
                        entropy_source,
                        node_signer,
@@ -3020,17 +3672,11 @@ where
                        }
                };
                if let Some(update) = update_opt {
-                       // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
-                       // not try to broadcast it via whatever peer we have.
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let a_peer_state_opt = per_peer_state.get(peer_node_id)
-                               .ok_or(per_peer_state.values().next());
-                       if let Ok(a_peer_state_mutex) = a_peer_state_opt {
-                               let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
-                               a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                       msg: update
-                               });
-                       }
+                       // If we have some Channel Update to broadcast, we cache it and broadcast it later.
+                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                               msg: update
+                       });
                }
 
                Ok(counterparty_node_id)
@@ -3094,6 +3740,163 @@ where
                }
        }
 
+       fn can_forward_htlc_to_outgoing_channel(
+               &self, chan: &mut Channel<SP>, msg: &msgs::UpdateAddHTLC, next_packet: &NextPacketDetails
+       ) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
+               if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
+                       // Note that the behavior here should be identical to the above block - we
+                       // should NOT reveal the existence or non-existence of a private channel if
+                       // we don't allow forwards outbound over them.
+                       return Err(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
+               }
+               if chan.context.get_channel_type().supports_scid_privacy() && next_packet.outgoing_scid != chan.context.outbound_scid_alias() {
+                       // `option_scid_alias` (referred to in LDK as `scid_privacy`) means
+                       // "refuse to forward unless the SCID alias was used", so we pretend
+                       // we don't have the channel here.
+                       return Err(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
+               }
+
+               // Note that we could technically not return an error yet here and just hope
+               // that the connection is reestablished or monitor updated by the time we get
+               // around to doing the actual forward, but better to fail early if we can and
+               // hopefully an attacker trying to path-trace payments cannot make this occur
+               // on a small/per-node/per-channel scale.
+               if !chan.context.is_live() { // channel_disabled
+                       // If the channel_update we're going to return is disabled (i.e. the
+                       // peer has been disabled for some time), return `channel_disabled`,
+                       // otherwise return `temporary_channel_failure`.
+                       let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
+                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
+                               return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
+                       } else {
+                               return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
+                       }
+               }
+               if next_packet.outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
+                       let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
+                       return Err(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
+               }
+               if let Err((err, code)) = chan.htlc_satisfies_config(msg, next_packet.outgoing_amt_msat, next_packet.outgoing_cltv_value) {
+                       let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
+                       return Err((err, code, chan_update_opt));
+               }
+
+               Ok(())
+       }
+
+       /// Executes a callback `C` that returns some value `X` on the channel found with the given
+       /// `scid`. `None` is returned when the channel is not found.
+       fn do_funded_channel_callback<X, C: Fn(&mut Channel<SP>) -> X>(
+               &self, scid: u64, callback: C,
+       ) -> Option<X> {
+               let (counterparty_node_id, channel_id) = match self.short_to_chan_info.read().unwrap().get(&scid).cloned() {
+                       None => return None,
+                       Some((cp_id, id)) => (cp_id, id),
+               };
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+               if peer_state_mutex_opt.is_none() {
+                       return None;
+               }
+               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+               let peer_state = &mut *peer_state_lock;
+               match peer_state.channel_by_id.get_mut(&channel_id).and_then(
+                       |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
+               ) {
+                       None => None,
+                       Some(chan) => Some(callback(chan)),
+               }
+       }
+
+       fn can_forward_htlc(
+               &self, msg: &msgs::UpdateAddHTLC, next_packet_details: &NextPacketDetails
+       ) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
+               match self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
+                       self.can_forward_htlc_to_outgoing_channel(chan, msg, next_packet_details)
+               }) {
+                       Some(Ok(())) => {},
+                       Some(Err(e)) => return Err(e),
+                       None => {
+                               // If we couldn't find the channel info for the scid, it may be a phantom or
+                               // intercept forward.
+                               if (self.default_configuration.accept_intercept_htlcs &&
+                                       fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)) ||
+                                       fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)
+                               {} else {
+                                       return Err(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
+                               }
+                       }
+               }
+
+               let cur_height = self.best_block.read().unwrap().height + 1;
+               if let Err((err_msg, err_code)) = check_incoming_htlc_cltv(
+                       cur_height, next_packet_details.outgoing_cltv_value, msg.cltv_expiry
+               ) {
+                       let chan_update_opt = self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
+                               self.get_channel_update_for_onion(next_packet_details.outgoing_scid, chan).ok()
+                       }).flatten();
+                       return Err((err_msg, err_code, chan_update_opt));
+               }
+
+               Ok(())
+       }
+
+       fn htlc_failure_from_update_add_err(
+               &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey, err_msg: &'static str,
+               mut err_code: u16, chan_update: Option<msgs::ChannelUpdate>, is_intro_node_blinded_forward: bool,
+               shared_secret: &[u8; 32]
+       ) -> HTLCFailureMsg {
+               let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
+               if chan_update.is_some() && err_code & 0x1000 == 0x1000 {
+                       let chan_update = chan_update.unwrap();
+                       if err_code == 0x1000 | 11 || err_code == 0x1000 | 12 {
+                               msg.amount_msat.write(&mut res).expect("Writes cannot fail");
+                       }
+                       else if err_code == 0x1000 | 13 {
+                               msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
+                       }
+                       else if err_code == 0x1000 | 20 {
+                               // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
+                               0u16.write(&mut res).expect("Writes cannot fail");
+                       }
+                       (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
+                       msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
+                       chan_update.write(&mut res).expect("Writes cannot fail");
+               } else if err_code & 0x1000 == 0x1000 {
+                       // If we're trying to return an error that requires a `channel_update` but
+                       // we're forwarding to a phantom or intercept "channel" (i.e. cannot
+                       // generate an update), just use the generic "temporary_node_failure"
+                       // instead.
+                       err_code = 0x2000 | 2;
+               }
+
+               log_info!(
+                       WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)),
+                       "Failed to accept/forward incoming HTLC: {}", err_msg
+               );
+               // If `msg.blinding_point` is set, we must always fail with malformed.
+               if msg.blinding_point.is_some() {
+                       return HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
+                               channel_id: msg.channel_id,
+                               htlc_id: msg.htlc_id,
+                               sha256_of_onion: [0; 32],
+                               failure_code: INVALID_ONION_BLINDING,
+                       });
+               }
+
+               let (err_code, err_data) = if is_intro_node_blinded_forward {
+                       (INVALID_ONION_BLINDING, &[0; 32][..])
+               } else {
+                       (err_code, &res.0[..])
+               };
+               HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
+                       channel_id: msg.channel_id,
+                       htlc_id: msg.htlc_id,
+                       reason: HTLCFailReason::reason(err_code, err_data.to_vec())
+                               .get_encrypted_failure_packet(shared_secret, &None),
+               })
+       }
+
        fn decode_update_add_htlc_onion(
                &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey,
        ) -> Result<
@@ -3103,39 +3906,7 @@ where
                        msg, &self.node_signer, &self.logger, &self.secp_ctx
                )?;
 
-               macro_rules! return_err {
-                       ($msg: expr, $err_code: expr, $data: expr) => {
-                               {
-                                       log_info!(
-                                               WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)),
-                                               "Failed to accept/forward incoming HTLC: {}", $msg
-                                       );
-                                       // If `msg.blinding_point` is set, we must always fail with malformed.
-                                       if msg.blinding_point.is_some() {
-                                               return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
-                                                       channel_id: msg.channel_id,
-                                                       htlc_id: msg.htlc_id,
-                                                       sha256_of_onion: [0; 32],
-                                                       failure_code: INVALID_ONION_BLINDING,
-                                               }));
-                                       }
-
-                                       let (err_code, err_data) = if next_hop.is_intro_node_blinded_forward() {
-                                               (INVALID_ONION_BLINDING, &[0; 32][..])
-                                       } else { ($err_code, $data) };
-                                       return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
-                                               channel_id: msg.channel_id,
-                                               htlc_id: msg.htlc_id,
-                                               reason: HTLCFailReason::reason(err_code, err_data.to_vec())
-                                                       .get_encrypted_failure_packet(&shared_secret, &None),
-                                       }));
-                               }
-                       }
-               }
-
-               let NextPacketDetails {
-                       next_packet_pubkey, outgoing_amt_msat, outgoing_scid, outgoing_cltv_value
-               } = match next_packet_details_opt {
+               let next_packet_details = match next_packet_details_opt {
                        Some(next_packet_details) => next_packet_details,
                        // it is a receive, so no need for outbound checks
                        None => return Ok((next_hop, shared_secret, None)),
@@ -3143,124 +3914,15 @@ where
 
                // Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we
                // can't hold the outbound peer state lock at the same time as the inbound peer state lock.
-               if let Some((err, mut code, chan_update)) = loop {
-                       let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned();
-                       let forwarding_chan_info_opt = match id_option {
-                               None => { // unknown_next_peer
-                                       // Note that this is likely a timing oracle for detecting whether an scid is a
-                                       // phantom or an intercept.
-                                       if (self.default_configuration.accept_intercept_htlcs &&
-                                               fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)) ||
-                                               fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)
-                                       {
-                                               None
-                                       } else {
-                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                                       }
-                               },
-                               Some((cp_id, id)) => Some((cp_id.clone(), id.clone())),
-                       };
-                       let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt {
-                               let per_peer_state = self.per_peer_state.read().unwrap();
-                               let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
-                               if peer_state_mutex_opt.is_none() {
-                                       break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                               }
-                               let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               let chan = match peer_state.channel_by_id.get_mut(&forwarding_id).map(
-                                       |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
-                               ).flatten() {
-                                       None => {
-                                               // Channel was removed. The short_to_chan_info and channel_by_id maps
-                                               // have no consistency guarantees.
-                                               break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
-                                       },
-                                       Some(chan) => chan
-                               };
-                               if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
-                                       // Note that the behavior here should be identical to the above block - we
-                                       // should NOT reveal the existence or non-existence of a private channel if
-                                       // we don't allow forwards outbound over them.
-                                       break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None));
-                               }
-                               if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() {
-                                       // `option_scid_alias` (referred to in LDK as `scid_privacy`) means
-                                       // "refuse to forward unless the SCID alias was used", so we pretend
-                                       // we don't have the channel here.
-                                       break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
-                               }
-                               let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok();
-
-                               // Note that we could technically not return an error yet here and just hope
-                               // that the connection is reestablished or monitor updated by the time we get
-                               // around to doing the actual forward, but better to fail early if we can and
-                               // hopefully an attacker trying to path-trace payments cannot make this occur
-                               // on a small/per-node/per-channel scale.
-                               if !chan.context.is_live() { // channel_disabled
-                                       // If the channel_update we're going to return is disabled (i.e. the
-                                       // peer has been disabled for some time), return `channel_disabled`,
-                                       // otherwise return `temporary_channel_failure`.
-                                       if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
-                                               break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
-                                       } else {
-                                               break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
-                                       }
-                               }
-                               if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
-                                       break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
-                               }
-                               if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) {
-                                       break Some((err, code, chan_update_opt));
-                               }
-                               chan_update_opt
-                       } else {
-                               None
-                       };
-
-                       let cur_height = self.best_block.read().unwrap().height + 1;
-
-                       if let Err((err_msg, code)) = check_incoming_htlc_cltv(
-                               cur_height, outgoing_cltv_value, msg.cltv_expiry
-                       ) {
-                               if code & 0x1000 != 0 && chan_update_opt.is_none() {
-                                       // We really should set `incorrect_cltv_expiry` here but as we're not
-                                       // forwarding over a real channel we can't generate a channel_update
-                                       // for it. Instead we just return a generic temporary_node_failure.
-                                       break Some((err_msg, 0x2000 | 2, None))
-                               }
-                               let chan_update_opt = if code & 0x1000 != 0 { chan_update_opt } else { None };
-                               break Some((err_msg, code, chan_update_opt));
-                       }
+               self.can_forward_htlc(&msg, &next_packet_details).map_err(|e| {
+                       let (err_msg, err_code, chan_update_opt) = e;
+                       self.htlc_failure_from_update_add_err(
+                               msg, counterparty_node_id, err_msg, err_code, chan_update_opt,
+                               next_hop.is_intro_node_blinded_forward(), &shared_secret
+                       )
+               })?;
 
-                       break None;
-               }
-               {
-                       let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
-                       if let Some(chan_update) = chan_update {
-                               if code == 0x1000 | 11 || code == 0x1000 | 12 {
-                                       msg.amount_msat.write(&mut res).expect("Writes cannot fail");
-                               }
-                               else if code == 0x1000 | 13 {
-                                       msg.cltv_expiry.write(&mut res).expect("Writes cannot fail");
-                               }
-                               else if code == 0x1000 | 20 {
-                                       // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791
-                                       0u16.write(&mut res).expect("Writes cannot fail");
-                               }
-                               (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail");
-                               msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail");
-                               chan_update.write(&mut res).expect("Writes cannot fail");
-                       } else if code & 0x1000 == 0x1000 {
-                               // If we're trying to return an error that requires a `channel_update` but
-                               // we're forwarding to a phantom or intercept "channel" (i.e. cannot
-                               // generate an update), just use the generic "temporary_node_failure"
-                               // instead.
-                               code = 0x2000 | 2;
-                       }
-                       return_err!(err, code, &res.0[..]);
-               }
-               Ok((next_hop, shared_secret, Some(next_packet_pubkey)))
+               Ok((next_hop, shared_secret, Some(next_packet_details.next_packet_pubkey)))
        }
 
        fn construct_pending_htlc_status<'a>(
@@ -4097,6 +4759,7 @@ where
                        .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
+
                for channel_id in channel_ids {
                        if !peer_state.has_channel(channel_id) {
                                return Err(APIError::ChannelUnavailable {
@@ -4113,7 +4776,8 @@ where
                                }
                                if let ChannelPhase::Funded(channel) = channel_phase {
                                        if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
                                        } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
                                                        node_id: channel.context.get_counterparty_node_id(),
@@ -4288,6 +4952,145 @@ where
                Ok(())
        }
 
+       fn process_pending_update_add_htlcs(&self) {
+               let mut decode_update_add_htlcs = new_hash_map();
+               mem::swap(&mut decode_update_add_htlcs, &mut self.decode_update_add_htlcs.lock().unwrap());
+
+               let get_failed_htlc_destination = |outgoing_scid_opt: Option<u64>, payment_hash: PaymentHash| {
+                       if let Some(outgoing_scid) = outgoing_scid_opt {
+                               match self.short_to_chan_info.read().unwrap().get(&outgoing_scid) {
+                                       Some((outgoing_counterparty_node_id, outgoing_channel_id)) =>
+                                               HTLCDestination::NextHopChannel {
+                                                       node_id: Some(*outgoing_counterparty_node_id),
+                                                       channel_id: *outgoing_channel_id,
+                                               },
+                                       None => HTLCDestination::UnknownNextHop {
+                                               requested_forward_scid: outgoing_scid,
+                                       },
+                               }
+                       } else {
+                               HTLCDestination::FailedPayment { payment_hash }
+                       }
+               };
+
+               'outer_loop: for (incoming_scid, update_add_htlcs) in decode_update_add_htlcs {
+                       let incoming_channel_details_opt = self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+                               let counterparty_node_id = chan.context.get_counterparty_node_id();
+                               let channel_id = chan.context.channel_id();
+                               let funding_txo = chan.context.get_funding_txo().unwrap();
+                               let user_channel_id = chan.context.get_user_id();
+                               let accept_underpaying_htlcs = chan.context.config().accept_underpaying_htlcs;
+                               (counterparty_node_id, channel_id, funding_txo, user_channel_id, accept_underpaying_htlcs)
+                       });
+                       let (
+                               incoming_counterparty_node_id, incoming_channel_id, incoming_funding_txo,
+                               incoming_user_channel_id, incoming_accept_underpaying_htlcs
+                        ) = if let Some(incoming_channel_details) = incoming_channel_details_opt {
+                               incoming_channel_details
+                       } else {
+                               // The incoming channel no longer exists, HTLCs should be resolved onchain instead.
+                               continue;
+                       };
+
+                       let mut htlc_forwards = Vec::new();
+                       let mut htlc_fails = Vec::new();
+                       for update_add_htlc in &update_add_htlcs {
+                               let (next_hop, shared_secret, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion(
+                                       &update_add_htlc, &self.node_signer, &self.logger, &self.secp_ctx
+                               ) {
+                                       Ok(decoded_onion) => decoded_onion,
+                                       Err(htlc_fail) => {
+                                               htlc_fails.push((htlc_fail, HTLCDestination::InvalidOnion));
+                                               continue;
+                                       },
+                               };
+
+                               let is_intro_node_blinded_forward = next_hop.is_intro_node_blinded_forward();
+                               let outgoing_scid_opt = next_packet_details_opt.as_ref().map(|d| d.outgoing_scid);
+
+                               // Process the HTLC on the incoming channel.
+                               match self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
+                                       chan.can_accept_incoming_htlc(
+                                               update_add_htlc, &self.fee_estimator, &logger,
+                                       )
+                               }) {
+                                       Some(Ok(_)) => {},
+                                       Some(Err((err, code))) => {
+                                               let outgoing_chan_update_opt = if let Some(outgoing_scid) = outgoing_scid_opt.as_ref() {
+                                                       self.do_funded_channel_callback(*outgoing_scid, |chan: &mut Channel<SP>| {
+                                                               self.get_channel_update_for_onion(*outgoing_scid, chan).ok()
+                                                       }).flatten()
+                                               } else {
+                                                       None
+                                               };
+                                               let htlc_fail = self.htlc_failure_from_update_add_err(
+                                                       &update_add_htlc, &incoming_counterparty_node_id, err, code,
+                                                       outgoing_chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+                                               );
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                               continue;
+                                       },
+                                       // The incoming channel no longer exists, HTLCs should be resolved onchain instead.
+                                       None => continue 'outer_loop,
+                               }
+
+                               // Now process the HTLC on the outgoing channel if it's a forward.
+                               if let Some(next_packet_details) = next_packet_details_opt.as_ref() {
+                                       if let Err((err, code, chan_update_opt)) = self.can_forward_htlc(
+                                               &update_add_htlc, next_packet_details
+                                       ) {
+                                               let htlc_fail = self.htlc_failure_from_update_add_err(
+                                                       &update_add_htlc, &incoming_counterparty_node_id, err, code,
+                                                       chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+                                               );
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                               continue;
+                                       }
+                               }
+
+                               match self.construct_pending_htlc_status(
+                                       &update_add_htlc, &incoming_counterparty_node_id, shared_secret, next_hop,
+                                       incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey),
+                               ) {
+                                       PendingHTLCStatus::Forward(htlc_forward) => {
+                                               htlc_forwards.push((htlc_forward, update_add_htlc.htlc_id));
+                                       },
+                                       PendingHTLCStatus::Fail(htlc_fail) => {
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                       },
+                               }
+                       }
+
+                       // Process all of the forwards and failures for the channel in which the HTLCs were
+                       // proposed to as a batch.
+                       let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
+                               incoming_user_channel_id, htlc_forwards.drain(..).collect());
+                       self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
+                       for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
+                               let failure = match htlc_fail {
+                                       HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC {
+                                               htlc_id: fail_htlc.htlc_id,
+                                               err_packet: fail_htlc.reason,
+                                       },
+                                       HTLCFailureMsg::Malformed(fail_malformed_htlc) => HTLCForwardInfo::FailMalformedHTLC {
+                                               htlc_id: fail_malformed_htlc.htlc_id,
+                                               sha256_of_onion: fail_malformed_htlc.sha256_of_onion,
+                                               failure_code: fail_malformed_htlc.failure_code,
+                                       },
+                               };
+                               self.forward_htlcs.lock().unwrap().entry(incoming_scid).or_insert(vec![]).push(failure);
+                               self.pending_events.lock().unwrap().push_back((events::Event::HTLCHandlingFailed {
+                                       prev_channel_id: incoming_channel_id,
+                                       failed_next_destination: htlc_destination,
+                               }, None));
+                       }
+               }
+       }
+
        /// Processes HTLCs which are pending waiting on random forward delay.
        ///
        /// Should only really ever be called in response to a PendingHTLCsForwardable event.
@@ -4295,6 +5098,8 @@ where
        pub fn process_pending_htlc_forwards(&self) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
+               self.process_pending_update_add_htlcs();
+
                let mut new_events = VecDeque::new();
                let mut failed_forwards = Vec::new();
                let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
@@ -5022,7 +5827,8 @@ where
                                                                                if n >= DISABLE_GOSSIP_TICKS {
                                                                                        chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
                                                                                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                        msg: update
                                                                                                });
                                                                                        }
@@ -5036,7 +5842,8 @@ where
                                                                                if n >= ENABLE_GOSSIP_TICKS {
                                                                                        chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
                                                                                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                        msg: update
                                                                                                });
                                                                                        }
@@ -5339,9 +6146,14 @@ where
                }
        }
 
+       fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
+               let push_forward_event = self.fail_htlc_backwards_internal_without_forward_event(source, payment_hash, onion_error, destination);
+               if push_forward_event { self.push_pending_forwards_ev(); }
+       }
+
        /// Fails an HTLC backwards to the sender of it to us.
        /// Note that we do not assume that channels corresponding to failed HTLCs are still available.
-       fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
+       fn fail_htlc_backwards_internal_without_forward_event(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) -> bool {
                // Ensure that no peer state channel storage lock is held when calling this function.
                // This ensures that future code doesn't introduce a lock-order requirement for
                // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
@@ -5359,12 +6171,12 @@ where
                // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
                // from block_connected which may run during initialization prior to the chain_monitor
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
+               let mut push_forward_event;
                match source {
                        HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, .. } => {
-                               if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
+                               push_forward_event = self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
                                        session_priv, payment_id, self.probing_cookie_secret, &self.secp_ctx,
-                                       &self.pending_events, &self.logger)
-                               { self.push_pending_forwards_ev(); }
+                                       &self.pending_events, &self.logger);
                        },
                        HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret,
@@ -5398,11 +6210,9 @@ where
                                        }
                                };
 
-                               let mut push_forward_ev = false;
+                               push_forward_event = self.decode_update_add_htlcs.lock().unwrap().is_empty();
                                let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
-                               if forward_htlcs.is_empty() {
-                                       push_forward_ev = true;
-                               }
+                               push_forward_event &= forward_htlcs.is_empty();
                                match forward_htlcs.entry(*short_channel_id) {
                                        hash_map::Entry::Occupied(mut entry) => {
                                                entry.get_mut().push(failure);
@@ -5412,7 +6222,6 @@ where
                                        }
                                }
                                mem::drop(forward_htlcs);
-                               if push_forward_ev { self.push_pending_forwards_ev(); }
                                let mut pending_events = self.pending_events.lock().unwrap();
                                pending_events.push_back((events::Event::HTLCHandlingFailed {
                                        prev_channel_id: *channel_id,
@@ -5420,6 +6229,7 @@ where
                                }, None));
                        },
                }
+               push_forward_event
        }
 
        /// Provides a payment preimage in response to [`Event::PaymentClaimable`], generating any
@@ -6759,9 +7569,8 @@ where
                }
                if let Some(ChannelPhase::Funded(chan)) = chan_option {
                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
                                });
                        }
@@ -6798,7 +7607,7 @@ where
                match peer_state.channel_by_id.entry(msg.channel_id) {
                        hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
-                                       let pending_forward_info = match decoded_hop_res {
+                                       let mut pending_forward_info = match decoded_hop_res {
                                                Ok((next_hop, shared_secret, next_packet_pk_opt)) =>
                                                        self.construct_pending_htlc_status(
                                                                msg, counterparty_node_id, shared_secret, next_hop,
@@ -6806,44 +7615,45 @@ where
                                                        ),
                                                Err(e) => PendingHTLCStatus::Fail(e)
                                        };
-                                       let create_pending_htlc_status = |chan: &Channel<SP>, pending_forward_info: PendingHTLCStatus, error_code: u16| {
+                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
+                                       // If the update_add is completely bogus, the call will Err and we will close,
+                                       // but if we've sent a shutdown and they haven't acknowledged it yet, we just
+                                       // want to reject the new HTLC and fail it backwards instead of forwarding.
+                                       if let Err((_, error_code)) = chan.can_accept_incoming_htlc(&msg, &self.fee_estimator, &logger) {
                                                if msg.blinding_point.is_some() {
-                                                       return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
-                                                                       msgs::UpdateFailMalformedHTLC {
-                                                                               channel_id: msg.channel_id,
-                                                                               htlc_id: msg.htlc_id,
-                                                                               sha256_of_onion: [0; 32],
-                                                                               failure_code: INVALID_ONION_BLINDING,
-                                                                       }
-                                                       ))
-                                               }
-                                               // If the update_add is completely bogus, the call will Err and we will close,
-                                               // but if we've sent a shutdown and they haven't acknowledged it yet, we just
-                                               // want to reject the new HTLC and fail it backwards instead of forwarding.
-                                               match pending_forward_info {
-                                                       PendingHTLCStatus::Forward(PendingHTLCInfo {
-                                                               ref incoming_shared_secret, ref routing, ..
-                                                       }) => {
-                                                               let reason = if routing.blinded_failure().is_some() {
-                                                                       HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
-                                                               } else if (error_code & 0x1000) != 0 {
-                                                                       let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
-                                                                       HTLCFailReason::reason(real_code, error_data)
-                                                               } else {
-                                                                       HTLCFailReason::from_failure_code(error_code)
-                                                               }.get_encrypted_failure_packet(incoming_shared_secret, &None);
-                                                               let msg = msgs::UpdateFailHTLC {
+                                                       pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
+                                                               msgs::UpdateFailMalformedHTLC {
                                                                        channel_id: msg.channel_id,
                                                                        htlc_id: msg.htlc_id,
-                                                                       reason
-                                                               };
-                                                               PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg))
-                                                       },
-                                                       _ => pending_forward_info
+                                                                       sha256_of_onion: [0; 32],
+                                                                       failure_code: INVALID_ONION_BLINDING,
+                                                               }
+                                                       ))
+                                               } else {
+                                                       match pending_forward_info {
+                                                               PendingHTLCStatus::Forward(PendingHTLCInfo {
+                                                                       ref incoming_shared_secret, ref routing, ..
+                                                               }) => {
+                                                                       let reason = if routing.blinded_failure().is_some() {
+                                                                               HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
+                                                                       } else if (error_code & 0x1000) != 0 {
+                                                                               let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
+                                                                               HTLCFailReason::reason(real_code, error_data)
+                                                                       } else {
+                                                                               HTLCFailReason::from_failure_code(error_code)
+                                                                       }.get_encrypted_failure_packet(incoming_shared_secret, &None);
+                                                                       let msg = msgs::UpdateFailHTLC {
+                                                                               channel_id: msg.channel_id,
+                                                                               htlc_id: msg.htlc_id,
+                                                                               reason
+                                                                       };
+                                                                       pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg));
+                                                               },
+                                                               _ => {},
+                                                       }
                                                }
-                                       };
-                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
-                                       try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &&logger), chan_phase_entry);
+                                       }
+                                       try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info), chan_phase_entry);
                                } else {
                                        return try_chan_phase_entry!(self, Err(ChannelError::Close(
                                                "Got an update_add_htlc message for an unfunded channel!".into())), chan_phase_entry);
@@ -6988,18 +7798,27 @@ where
        }
 
        fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec<msgs::UpdateAddHTLC>)) {
+               let mut push_forward_event = self.forward_htlcs.lock().unwrap().is_empty();
                let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+               push_forward_event &= decode_update_add_htlcs.is_empty();
                let scid = update_add_htlcs.0;
                match decode_update_add_htlcs.entry(scid) {
                        hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); },
                        hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); },
                }
+               if push_forward_event { self.push_pending_forwards_ev(); }
        }
 
        #[inline]
        fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
+               let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
+               if push_forward_event { self.push_pending_forwards_ev() }
+       }
+
+       #[inline]
+       fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
+               let mut push_forward_event = false;
                for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
-                       let mut push_forward_event = false;
                        let mut new_intercept_events = VecDeque::new();
                        let mut failed_intercept_forwards = Vec::new();
                        if !pending_forwards.is_empty() {
@@ -7012,6 +7831,7 @@ where
                                        // Pull this now to avoid introducing a lock order with `forward_htlcs`.
                                        let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid);
 
+                                       let decode_update_add_htlcs_empty = self.decode_update_add_htlcs.lock().unwrap().is_empty();
                                        let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
                                        let forward_htlcs_empty = forward_htlcs.is_empty();
                                        match forward_htlcs.entry(scid) {
@@ -7060,9 +7880,7 @@ where
                                                        } else {
                                                                // We don't want to generate a PendingHTLCsForwardable event if only intercepted
                                                                // payments are being processed.
-                                                               if forward_htlcs_empty {
-                                                                       push_forward_event = true;
-                                                               }
+                                                               push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
                                                                entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                        prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
                                                        }
@@ -7072,15 +7890,15 @@ where
                        }
 
                        for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards.drain(..) {
-                               self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination);
+                               push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(&htlc_source, &payment_hash, &failure_reason, destination);
                        }
 
                        if !new_intercept_events.is_empty() {
                                let mut events = self.pending_events.lock().unwrap();
                                events.append(&mut new_intercept_events);
                        }
-                       if push_forward_event { self.push_pending_forwards_ev() }
                }
+               push_forward_event
        }
 
        fn push_pending_forwards_ev(&self) {
@@ -7437,7 +8255,8 @@ where
                                                                                };
                                                                                failed_channels.push(chan.context.force_shutdown(false, reason.clone()));
                                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                msg: update
                                                                                        });
                                                                                }
@@ -7622,7 +8441,8 @@ where
                                                                                // We're done with this channel. We got a closing_signed and sent back
                                                                                // a closing_signed with a closing transaction to broadcast.
                                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                msg: update
                                                                                        });
                                                                                }
@@ -8382,7 +9202,7 @@ where
        /// will randomly be placed first or last in the returned array.
        ///
        /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
-       /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
+       /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
        /// the `MessageSendEvent`s to the specific peer they were generated under.
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let events = RefCell::new(Vec::new());
@@ -8402,6 +9222,7 @@ where
                                result = NotifyOption::DoPersist;
                        }
 
+                       let mut is_any_peer_connected = false;
                        let mut pending_events = Vec::new();
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
@@ -8410,6 +9231,15 @@ where
                                if peer_state.pending_msg_events.len() > 0 {
                                        pending_events.append(&mut peer_state.pending_msg_events);
                                }
+                               if peer_state.is_connected {
+                                       is_any_peer_connected = true
+                               }
+                       }
+
+                       // Ensure that we are connected to some peers before getting broadcast messages.
+                       if is_any_peer_connected {
+                               let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
+                               pending_events.append(&mut broadcast_msgs);
                        }
 
                        if !pending_events.is_empty() {
@@ -8614,6 +9444,7 @@ where
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                let pending_msg_events = &mut peer_state.pending_msg_events;
+
                                peer_state.channel_by_id.retain(|_, phase| {
                                        match phase {
                                                // Retain unfunded channels.
@@ -8689,7 +9520,8 @@ where
                                                                let reason_message = format!("{}", reason);
                                                                failed_channels.push(channel.context.force_shutdown(true, reason));
                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
-                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
                                                                        });
                                                                }
@@ -8781,6 +9613,9 @@ where
        }
 
        /// Returns true if this [`ChannelManager`] needs to be persisted.
+       ///
+       /// See [`Self::get_event_or_persistence_needed_future`] for retrieving a [`Future`] that
+       /// indicates this should be checked.
        pub fn get_and_clear_needs_persistence(&self) -> bool {
                self.needs_persist_flag.swap(false, Ordering::AcqRel)
        }
@@ -9146,7 +9981,12 @@ where
                                                // Gossip
                                                &events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
                                                &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
-                                               &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
+                                               // [`ChannelManager::pending_broadcast_events`] holds the [`BroadcastChannelUpdate`]
+                                               // This check here is to ensure exhaustivity.
+                                               &events::MessageSendEvent::BroadcastChannelUpdate { .. } => {
+                                                       debug_assert!(false, "This event shouldn't have been here");
+                                                       false
+                                               },
                                                &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
                                                &events::MessageSendEvent::SendChannelUpdate { .. } => false,
                                                &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
@@ -9593,6 +10433,23 @@ where
        }
 }
 
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
+NodeIdLookUp for ChannelManager<M, T, ES, NS, SP, F, R, L>
+where
+       M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+       T::Target: BroadcasterInterface,
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       SP::Target: SignerProvider,
+       F::Target: FeeEstimator,
+       R::Target: Router,
+       L::Target: Logger,
+{
+       fn next_node_id(&self, short_channel_id: u64) -> Option<PublicKey> {
+               self.short_to_chan_info.read().unwrap().get(&short_channel_id).map(|(pubkey, _)| *pubkey)
+       }
+}
+
 /// Fetches the set of [`NodeFeatures`] flags that are provided by or required by
 /// [`ChannelManager`].
 pub(crate) fn provided_node_features(config: &UserConfig) -> NodeFeatures {
@@ -10857,7 +11714,7 @@ where
                        (13, claimable_htlc_onion_fields, optional_vec),
                        (14, decode_update_add_htlcs, option),
                });
-               let decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+               let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
                if fake_scid_rand_bytes.is_none() {
                        fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
                }
@@ -10957,7 +11814,7 @@ where
                                                }
                                        }
                                        if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
-                                               // If the channel is ahead of the monitor, return InvalidValue:
+                                               // If the channel is ahead of the monitor, return DangerousValue:
                                                log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
                                                log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
                                                        chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
@@ -10966,7 +11823,7 @@ where
                                                log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
                                                log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
                                                log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
-                                               return Err(DecodeError::InvalidValue);
+                                               return Err(DecodeError::DangerousValue);
                                        }
                                } else {
                                        // We shouldn't have persisted (or read) any unfunded channel types so none should have been
@@ -11077,6 +11934,18 @@ where
                                                                // still have an entry for this HTLC in `forward_htlcs` or
                                                                // `pending_intercepted_htlcs`, we were apparently not persisted after
                                                                // the monitor was when forwarding the payment.
+                                                               decode_update_add_htlcs.retain(|scid, update_add_htlcs| {
+                                                                       update_add_htlcs.retain(|update_add_htlc| {
+                                                                               let matches = *scid == prev_hop_data.short_channel_id &&
+                                                                                       update_add_htlc.htlc_id == prev_hop_data.htlc_id;
+                                                                               if matches {
+                                                                                       log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}",
+                                                                                               &htlc.payment_hash, &monitor.channel_id());
+                                                                               }
+                                                                               !matches
+                                                                       });
+                                                                       !update_add_htlcs.is_empty()
+                                                               });
                                                                forward_htlcs.retain(|_, forwards| {
                                                                        forwards.retain(|forward| {
                                                                                if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
@@ -11158,7 +12027,7 @@ where
                        }
                }
 
-               if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
+               if !forward_htlcs.is_empty() || !decode_update_add_htlcs.is_empty() || pending_outbounds.needs_abandon() {
                        // If we have pending HTLCs to forward, assume we either dropped a
                        // `PendingHTLCsForwardable` or the user received it but never processed it as they
                        // shut down before the timer hit. Either way, set the time_forwardable to a small
@@ -11421,6 +12290,8 @@ where
 
                        pending_offers_messages: Mutex::new(Vec::new()),
 
+                       pending_broadcast_messages: Mutex::new(Vec::new()),
+
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        signer_provider: args.signer_provider,
@@ -11952,6 +12823,61 @@ mod tests {
                }
        }
 
+       #[test]
+       fn test_channel_update_cached() {
+               let chanmon_cfgs = create_chanmon_cfgs(3);
+               let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+               let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+               let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+               check_added_monitors!(nodes[0], 1);
+               check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+
+               // Confirm that the channel_update was not sent immediately to node[1] but was cached.
+               let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_1_events.len(), 0);
+
+               {
+                       // Assert that ChannelUpdate message has been added to node[0] pending broadcast messages
+                       let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
+                       assert_eq!(pending_broadcast_messages.len(), 1);
+               }
+
+               // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+               nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_0_events.len(), 0);
+
+               // Now we reconnect to a peer
+               nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+               }, true).unwrap();
+               nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+               }, false).unwrap();
+
+               // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
+               let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_0_events.len(), 1);
+               match &node_0_events[0] {
+                       MessageSendEvent::BroadcastChannelUpdate { .. } => (),
+                       _ => panic!("Unexpected event"),
+               }
+               {
+                       // Assert that ChannelUpdate message has been cleared from nodes[0] pending broadcast messages
+                       let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
+                       assert_eq!(pending_broadcast_messages.len(), 0);
+               }
+       }
+
        #[test]
        fn test_drop_disconnected_peers_when_removing_channels() {
                let chanmon_cfgs = create_chanmon_cfgs(2);