Remove unnecessary vecs in channel.rs
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 65f442f1c097ea80018e33f9c56d06f1f61cc072..6bfa4f0b03a7c225a84b556d15a6cc6343b8d180 100644 (file)
@@ -45,7 +45,7 @@ use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, No
 #[cfg(any(feature = "_test_utils", test))]
 use crate::ln::features::InvoiceFeatures;
 use crate::routing::gossip::NetworkGraph;
-use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router};
+use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router};
 use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
 use crate::ln::msgs;
 use crate::ln::onion_utils;
@@ -341,7 +341,7 @@ impl HTLCSource {
        }
 }
 
-struct ReceiveError {
+struct InboundOnionErr {
        err_code: u16,
        err_data: Vec<u8>,
        msg: &'static str,
@@ -507,19 +507,19 @@ struct ClaimablePayments {
 /// running normally, and specifically must be processed before any other non-background
 /// [`ChannelMonitorUpdate`]s are applied.
 enum BackgroundEvent {
-       /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from
-       /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public
-       /// key to handle channel resumption, whereas if the channel has been force-closed we do not
-       /// need the counterparty node_id.
+       /// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
+       /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
+       /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the
+       /// channel has been force-closed we do not need the counterparty node_id.
        ///
        /// Note that any such events are lost on shutdown, so in general they must be updates which
        /// are regenerated on startup.
-       ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
+       ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
        /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the
        /// channel to continue normal operation.
        ///
        /// In general this should be used rather than
-       /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the
+       /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the
        /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`]
        /// error the other variant is acceptable.
        ///
@@ -530,6 +530,13 @@ enum BackgroundEvent {
                funding_txo: OutPoint,
                update: ChannelMonitorUpdate
        },
+       /// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have
+       /// them marked pending, thus we need to run any [`MonitorUpdateCompletionAction`] (s) pending
+       /// on a channel.
+       MonitorUpdatesComplete {
+               counterparty_node_id: PublicKey,
+               channel_id: [u8; 32],
+       },
 }
 
 #[derive(Debug)]
@@ -752,7 +759,23 @@ pub type SimpleArcChannelManager<M, T, F, L> = ChannelManager<
 /// of [`KeysManager`] and [`DefaultRouter`].
 ///
 /// This is not exported to bindings users as Arcs don't make sense in bindings
-pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>;
+pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> =
+       ChannelManager<
+               &'a M,
+               &'b T,
+               &'c KeysManager,
+               &'c KeysManager,
+               &'c KeysManager,
+               &'d F,
+               &'e DefaultRouter<
+                       &'f NetworkGraph<&'g L>,
+                       &'g L,
+                       &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>,
+                       ProbabilisticScoringFeeParameters,
+                       ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>
+               >,
+               &'g L
+       >;
 
 macro_rules! define_test_pub_trait { ($vis: vis) => {
 /// A trivial trait which describes any [`ChannelManager`] used in testing.
@@ -1098,7 +1121,6 @@ where
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
 
-       #[cfg(debug_assertions)]
        background_events_processed_since_startup: AtomicBool,
 
        persistence_notifier: Notifier,
@@ -1464,6 +1486,9 @@ pub struct ChannelDetails {
        ///
        /// [`confirmations_required`]: ChannelDetails::confirmations_required
        pub is_channel_ready: bool,
+       /// The stage of the channel's shutdown.
+       /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116.
+       pub channel_shutdown_state: Option<ChannelShutdownState>,
        /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b)
        /// the peer is connected, and (c) the channel is not currently negotiating a shutdown.
        ///
@@ -1503,10 +1528,13 @@ impl ChannelDetails {
                self.short_channel_id.or(self.outbound_scid_alias)
        }
 
-       fn from_channel_context<Signer: WriteableEcdsaChannelSigner>(context: &ChannelContext<Signer>,
-               best_block_height: u32, latest_features: InitFeatures) -> Self {
-
-               let balance = context.get_available_balances();
+       fn from_channel_context<Signer: WriteableEcdsaChannelSigner, F: Deref>(
+               context: &ChannelContext<Signer>, best_block_height: u32, latest_features: InitFeatures,
+               fee_estimator: &LowerBoundedFeeEstimator<F>
+       ) -> Self
+       where F::Target: FeeEstimator
+       {
+               let balance = context.get_available_balances(fee_estimator);
                let (to_remote_reserve_satoshis, to_self_reserve_satoshis) =
                        context.get_holder_counterparty_selected_channel_reserve_satoshis();
                ChannelDetails {
@@ -1551,10 +1579,33 @@ impl ChannelDetails {
                        inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()),
                        inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(),
                        config: Some(context.config()),
+                       channel_shutdown_state: Some(context.shutdown_state()),
                }
        }
 }
 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+/// Further information on the details of the channel shutdown.
+/// Upon channels being forced closed (i.e. commitment transaction confirmation detected
+/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or
+/// the channel will be removed shortly.
+/// Also note, that in normal operation, peers could disconnect at any of these states
+/// and require peer re-connection before making progress onto other states
+pub enum ChannelShutdownState {
+       /// Channel has not sent or received a shutdown message.
+       NotShuttingDown,
+       /// Local node has sent a shutdown message for this channel.
+       ShutdownInitiated,
+       /// Shutdown message exchanges have concluded and the channels are in the midst of
+       /// resolving all existing open HTLCs before closing can continue.
+       ResolvingHTLCs,
+       /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates.
+       NegotiatingClosingFee,
+       /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about
+       /// to drop the channel.
+       ShutdownComplete,
+}
+
 /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments.
 /// These include payments that have yet to find a successful path, or have unresolved HTLCs.
 #[derive(Debug, PartialEq)]
@@ -1872,9 +1923,7 @@ macro_rules! handle_new_monitor_update {
                // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
                // any case so that it won't deadlock.
                debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
-               #[cfg(debug_assertions)] {
-                       debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
-               }
+               debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
                match $update_res {
                        ChannelMonitorUpdateStatus::InProgress => {
                                log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
@@ -1978,6 +2027,8 @@ macro_rules! process_events_body {
                                let mut pending_events = $self.pending_events.lock().unwrap();
                                pending_events.drain(..num_events);
                                processed_all_events = pending_events.is_empty();
+                               // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
+                               // updated here with the `pending_events` lock acquired.
                                $self.pending_events_processor.store(false, Ordering::Release);
                        }
 
@@ -2066,7 +2117,6 @@ where
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
-                       #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
 
@@ -2198,7 +2248,7 @@ where
                                let peer_state = &mut *peer_state_lock;
                                for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
@@ -2224,17 +2274,17 @@ where
                                let peer_state = &mut *peer_state_lock;
                                for (_channel_id, channel) in peer_state.channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
-                                               peer_state.latest_features.clone());
+                                               peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
@@ -2267,7 +2317,8 @@ where
                        return peer_state.channel_by_id
                                .iter()
                                .map(|(_, channel)|
-                                       ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone()))
+                                       ChannelDetails::from_channel_context(&channel.context, best_block_height,
+                                       features.clone(), &self.fee_estimator))
                                .collect();
                }
                vec![]
@@ -2565,14 +2616,64 @@ where
                }
        }
 
+       fn construct_fwd_pending_htlc_info(
+               &self, msg: &msgs::UpdateAddHTLC, hop_data: msgs::InboundOnionPayload, hop_hmac: [u8; 32],
+               new_packet_bytes: [u8; onion_utils::ONION_DATA_LEN], shared_secret: [u8; 32],
+               next_packet_pubkey_opt: Option<Result<PublicKey, secp256k1::Error>>
+       ) -> Result<PendingHTLCInfo, InboundOnionErr> {
+               debug_assert!(next_packet_pubkey_opt.is_some());
+               let outgoing_packet = msgs::OnionPacket {
+                       version: 0,
+                       public_key: next_packet_pubkey_opt.unwrap_or(Err(secp256k1::Error::InvalidPublicKey)),
+                       hop_data: new_packet_bytes,
+                       hmac: hop_hmac,
+               };
+
+               let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data {
+                       msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } =>
+                               (short_channel_id, amt_to_forward, outgoing_cltv_value),
+                       msgs::InboundOnionPayload::Receive { .. } =>
+                               return Err(InboundOnionErr {
+                                       msg: "Final Node OnionHopData provided for us as an intermediary node",
+                                       err_code: 0x4000 | 22,
+                                       err_data: Vec::new(),
+                               }),
+               };
+
+               Ok(PendingHTLCInfo {
+                       routing: PendingHTLCRouting::Forward {
+                               onion_packet: outgoing_packet,
+                               short_channel_id,
+                       },
+                       payment_hash: msg.payment_hash,
+                       incoming_shared_secret: shared_secret,
+                       incoming_amt_msat: Some(msg.amount_msat),
+                       outgoing_amt_msat: amt_to_forward,
+                       outgoing_cltv_value,
+                       skimmed_fee_msat: None,
+               })
+       }
+
        fn construct_recv_pending_htlc_info(
-               &self, hop_data: msgs::OnionHopData, shared_secret: [u8; 32], payment_hash: PaymentHash,
+               &self, hop_data: msgs::InboundOnionPayload, shared_secret: [u8; 32], payment_hash: PaymentHash,
                amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>, allow_underpay: bool,
                counterparty_skimmed_fee_msat: Option<u64>,
-       ) -> Result<PendingHTLCInfo, ReceiveError> {
+       ) -> Result<PendingHTLCInfo, InboundOnionErr> {
+               let (payment_data, keysend_preimage, onion_amt_msat, outgoing_cltv_value, payment_metadata) = match hop_data {
+                       msgs::InboundOnionPayload::Receive {
+                               payment_data, keysend_preimage, amt_msat, outgoing_cltv_value, payment_metadata, ..
+                       } =>
+                               (payment_data, keysend_preimage, amt_msat, outgoing_cltv_value, payment_metadata),
+                       _ =>
+                               return Err(InboundOnionErr {
+                                       err_code: 0x4000|22,
+                                       err_data: Vec::new(),
+                                       msg: "Got non final data with an HMAC of 0",
+                               }),
+               };
                // final_incorrect_cltv_expiry
-               if hop_data.outgoing_cltv_value > cltv_expiry {
-                       return Err(ReceiveError {
+               if outgoing_cltv_value > cltv_expiry {
+                       return Err(InboundOnionErr {
                                msg: "Upstream node set CLTV to less than the CLTV set by the sender",
                                err_code: 18,
                                err_data: cltv_expiry.to_be_bytes().to_vec()
@@ -2586,85 +2687,74 @@ where
                // payment logic has enough time to fail the HTLC backward before our onchain logic triggers a
                // channel closure (see HTLC_FAIL_BACK_BUFFER rationale).
                let current_height: u32 = self.best_block.read().unwrap().height();
-               if (hop_data.outgoing_cltv_value as u64) <= current_height as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 {
+               if (outgoing_cltv_value as u64) <= current_height as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 {
                        let mut err_data = Vec::with_capacity(12);
                        err_data.extend_from_slice(&amt_msat.to_be_bytes());
                        err_data.extend_from_slice(&current_height.to_be_bytes());
-                       return Err(ReceiveError {
+                       return Err(InboundOnionErr {
                                err_code: 0x4000 | 15, err_data,
                                msg: "The final CLTV expiry is too soon to handle",
                        });
                }
-               if (!allow_underpay && hop_data.amt_to_forward > amt_msat) ||
-                       (allow_underpay && hop_data.amt_to_forward >
+               if (!allow_underpay && onion_amt_msat > amt_msat) ||
+                       (allow_underpay && onion_amt_msat >
                         amt_msat.saturating_add(counterparty_skimmed_fee_msat.unwrap_or(0)))
                {
-                       return Err(ReceiveError {
+                       return Err(InboundOnionErr {
                                err_code: 19,
                                err_data: amt_msat.to_be_bytes().to_vec(),
                                msg: "Upstream node sent less than we were supposed to receive in payment",
                        });
                }
 
-               let routing = match hop_data.format {
-                       msgs::OnionHopDataFormat::NonFinalNode { .. } => {
-                               return Err(ReceiveError {
+               let routing = if let Some(payment_preimage) = keysend_preimage {
+                       // We need to check that the sender knows the keysend preimage before processing this
+                       // payment further. Otherwise, an intermediary routing hop forwarding non-keysend-HTLC X
+                       // could discover the final destination of X, by probing the adjacent nodes on the route
+                       // with a keysend payment of identical payment hash to X and observing the processing
+                       // time discrepancies due to a hash collision with X.
+                       let hashed_preimage = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
+                       if hashed_preimage != payment_hash {
+                               return Err(InboundOnionErr {
                                        err_code: 0x4000|22,
                                        err_data: Vec::new(),
-                                       msg: "Got non final data with an HMAC of 0",
+                                       msg: "Payment preimage didn't match payment hash",
                                });
-                       },
-                       msgs::OnionHopDataFormat::FinalNode { payment_data, keysend_preimage, payment_metadata } => {
-                               if let Some(payment_preimage) = keysend_preimage {
-                                       // We need to check that the sender knows the keysend preimage before processing this
-                                       // payment further. Otherwise, an intermediary routing hop forwarding non-keysend-HTLC X
-                                       // could discover the final destination of X, by probing the adjacent nodes on the route
-                                       // with a keysend payment of identical payment hash to X and observing the processing
-                                       // time discrepancies due to a hash collision with X.
-                                       let hashed_preimage = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
-                                       if hashed_preimage != payment_hash {
-                                               return Err(ReceiveError {
-                                                       err_code: 0x4000|22,
-                                                       err_data: Vec::new(),
-                                                       msg: "Payment preimage didn't match payment hash",
-                                               });
-                                       }
-                                       if !self.default_configuration.accept_mpp_keysend && payment_data.is_some() {
-                                               return Err(ReceiveError {
-                                                       err_code: 0x4000|22,
-                                                       err_data: Vec::new(),
-                                                       msg: "We don't support MPP keysend payments",
-                                               });
-                                       }
-                                       PendingHTLCRouting::ReceiveKeysend {
-                                               payment_data,
-                                               payment_preimage,
-                                               payment_metadata,
-                                               incoming_cltv_expiry: hop_data.outgoing_cltv_value,
-                                       }
-                               } else if let Some(data) = payment_data {
-                                       PendingHTLCRouting::Receive {
-                                               payment_data: data,
-                                               payment_metadata,
-                                               incoming_cltv_expiry: hop_data.outgoing_cltv_value,
-                                               phantom_shared_secret,
-                                       }
-                               } else {
-                                       return Err(ReceiveError {
-                                               err_code: 0x4000|0x2000|3,
-                                               err_data: Vec::new(),
-                                               msg: "We require payment_secrets",
-                                       });
-                               }
-                       },
+                       }
+                       if !self.default_configuration.accept_mpp_keysend && payment_data.is_some() {
+                               return Err(InboundOnionErr {
+                                       err_code: 0x4000|22,
+                                       err_data: Vec::new(),
+                                       msg: "We don't support MPP keysend payments",
+                               });
+                       }
+                       PendingHTLCRouting::ReceiveKeysend {
+                               payment_data,
+                               payment_preimage,
+                               payment_metadata,
+                               incoming_cltv_expiry: outgoing_cltv_value,
+                       }
+               } else if let Some(data) = payment_data {
+                       PendingHTLCRouting::Receive {
+                               payment_data: data,
+                               payment_metadata,
+                               incoming_cltv_expiry: outgoing_cltv_value,
+                               phantom_shared_secret,
+                       }
+               } else {
+                       return Err(InboundOnionErr {
+                               err_code: 0x4000|0x2000|3,
+                               err_data: Vec::new(),
+                               msg: "We require payment_secrets",
+                       });
                };
                Ok(PendingHTLCInfo {
                        routing,
                        payment_hash,
                        incoming_shared_secret: shared_secret,
                        incoming_amt_msat: Some(amt_msat),
-                       outgoing_amt_msat: hop_data.amt_to_forward,
-                       outgoing_cltv_value: hop_data.outgoing_cltv_value,
+                       outgoing_amt_msat: onion_amt_msat,
+                       outgoing_cltv_value,
                        skimmed_fee_msat: counterparty_skimmed_fee_msat,
                })
        }
@@ -2728,9 +2818,8 @@ where
                };
                let (outgoing_scid, outgoing_amt_msat, outgoing_cltv_value, next_packet_pk_opt) = match next_hop {
                        onion_utils::Hop::Forward {
-                               next_hop_data: msgs::OnionHopData {
-                                       format: msgs::OnionHopDataFormat::NonFinalNode { short_channel_id }, amt_to_forward,
-                                       outgoing_cltv_value,
+                               next_hop_data: msgs::InboundOnionPayload::Forward {
+                                       short_channel_id, amt_to_forward, outgoing_cltv_value
                                }, ..
                        } => {
                                let next_pk = onion_utils::next_hop_packet_pubkey(&self.secp_ctx,
@@ -2740,9 +2829,7 @@ where
                        // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the
                        // inbound channel's state.
                        onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)),
-                       onion_utils::Hop::Forward {
-                               next_hop_data: msgs::OnionHopData { format: msgs::OnionHopDataFormat::FinalNode { .. }, .. }, ..
-                       } => {
+                       onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => {
                                return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]);
                        }
                };
@@ -2913,37 +3000,15 @@ where
                                                // delay) once they've send us a commitment_signed!
                                                PendingHTLCStatus::Forward(info)
                                        },
-                                       Err(ReceiveError { err_code, err_data, msg }) => return_err!(msg, err_code, &err_data)
+                                       Err(InboundOnionErr { err_code, err_data, msg }) => return_err!(msg, err_code, &err_data)
                                }
                        },
                        onion_utils::Hop::Forward { next_hop_data, next_hop_hmac, new_packet_bytes } => {
-                               debug_assert!(next_packet_pubkey_opt.is_some());
-                               let outgoing_packet = msgs::OnionPacket {
-                                       version: 0,
-                                       public_key: next_packet_pubkey_opt.unwrap_or(Err(secp256k1::Error::InvalidPublicKey)),
-                                       hop_data: new_packet_bytes,
-                                       hmac: next_hop_hmac.clone(),
-                               };
-
-                               let short_channel_id = match next_hop_data.format {
-                                       msgs::OnionHopDataFormat::NonFinalNode { short_channel_id } => short_channel_id,
-                                       msgs::OnionHopDataFormat::FinalNode { .. } => {
-                                               return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0;0]);
-                                       },
-                               };
-
-                               PendingHTLCStatus::Forward(PendingHTLCInfo {
-                                       routing: PendingHTLCRouting::Forward {
-                                               onion_packet: outgoing_packet,
-                                               short_channel_id,
-                                       },
-                                       payment_hash: msg.payment_hash.clone(),
-                                       incoming_shared_secret: shared_secret,
-                                       incoming_amt_msat: Some(msg.amount_msat),
-                                       outgoing_amt_msat: next_hop_data.amt_to_forward,
-                                       outgoing_cltv_value: next_hop_data.outgoing_cltv_value,
-                                       skimmed_fee_msat: None,
-                               })
+                               match self.construct_fwd_pending_htlc_info(msg, next_hop_data, next_hop_hmac,
+                                       new_packet_bytes, shared_secret, next_packet_pubkey_opt) {
+                                       Ok(info) => PendingHTLCStatus::Forward(info),
+                                       Err(InboundOnionErr { err_code, err_data, msg }) => return_err!(msg, err_code, &err_data)
+                               }
                        }
                }
        }
@@ -3071,7 +3136,7 @@ where
                                                session_priv: session_priv.clone(),
                                                first_hop_htlc_msat: htlc_msat,
                                                payment_id,
-                                       }, onion_packet, None, &self.logger);
+                                       }, onion_packet, None, &self.fee_estimator, &self.logger);
                                match break_chan_entry!(self, send_res, chan) {
                                        Some(monitor_update) => {
                                                match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) {
@@ -3154,6 +3219,7 @@ where
        /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a
        /// different route unless you intend to pay twice!
        ///
+       /// [`RouteHop`]: crate::routing::router::RouteHop
        /// [`Event::PaymentSent`]: events::Event::PaymentSent
        /// [`Event::PaymentFailed`]: events::Event::PaymentFailed
        /// [`UpdateHTLCs`]: events::MessageSendEvent::UpdateHTLCs
@@ -3725,7 +3791,7 @@ where
                                                                                                                        outgoing_cltv_value, Some(phantom_shared_secret), false, None)
                                                                                                                {
                                                                                                                        Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, vec![(info, prev_htlc_id)])),
-                                                                                                                       Err(ReceiveError { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
+                                                                                                                       Err(InboundOnionErr { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
                                                                                                                }
                                                                                                        },
                                                                                                        _ => panic!(),
@@ -3788,7 +3854,8 @@ where
                                                                                });
                                                                                if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat,
                                                                                        payment_hash, outgoing_cltv_value, htlc_source.clone(),
-                                                                                       onion_packet, skimmed_fee_msat, &self.logger)
+                                                                                       onion_packet, skimmed_fee_msat, &self.fee_estimator,
+                                                                                       &self.logger)
                                                                                {
                                                                                        if let ChannelError::Ignore(msg) = e {
                                                                                                log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
@@ -4103,7 +4170,6 @@ where
        fn process_background_events(&self) -> NotifyOption {
                debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);
 
-               #[cfg(debug_assertions)]
                self.background_events_processed_since_startup.store(true, Ordering::Release);
 
                let mut background_events = Vec::new();
@@ -4114,7 +4180,7 @@ where
 
                for event in background_events.drain(..) {
                        match event {
-                               BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
+                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
                                        // The channel has already been closed, so no use bothering to care about the
                                        // monitor updating completing.
                                        let _ = self.chain_monitor.update_channel(funding_txo, &update);
@@ -4149,6 +4215,22 @@ where
                                        }
                                        let _ = handle_error!(self, res, counterparty_node_id);
                                },
+                               BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
+                                       let per_peer_state = self.per_peer_state.read().unwrap();
+                                       if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                                               let peer_state = &mut *peer_state_lock;
+                                               if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) {
+                                                       handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, chan);
+                                               } else {
+                                                       let update_actions = peer_state.monitor_update_blocked_actions
+                                                               .remove(&channel_id).unwrap_or(Vec::new());
+                                                       mem::drop(peer_state_lock);
+                                                       mem::drop(per_peer_state);
+                                                       self.handle_monitor_update_completion_actions(update_actions);
+                                               }
+                                       }
+                               },
                        }
                }
                NotifyOption::DoPersist
@@ -4177,7 +4259,7 @@ where
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 
-               chan.queue_update_fee(new_feerate, &self.logger);
+               chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger);
                NotifyOption::DoPersist
        }
 
@@ -4694,6 +4776,11 @@ where
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
+               // If we haven't yet run background events assume we're still deserializing and shouldn't
+               // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as
+               // `BackgroundEvent`s.
+               let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
+
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let chan_id = prev_hop.outpoint.to_channel_id();
@@ -4720,14 +4807,26 @@ where
                                                                log_bytes!(chan_id), action);
                                                        peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                }
-                                               let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
-                                                       peer_state, per_peer_state, chan);
-                                               if let Err(e) = res {
-                                                       // TODO: This is a *critical* error - we probably updated the outbound edge
-                                                       // of the HTLC's monitor with a preimage. We should retry this monitor
-                                                       // update over and over again until morale improves.
-                                                       log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
-                                                       return Err((counterparty_node_id, e));
+                                               if !during_init {
+                                                       let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+                                                               peer_state, per_peer_state, chan);
+                                                       if let Err(e) = res {
+                                                               // TODO: This is a *critical* error - we probably updated the outbound edge
+                                                               // of the HTLC's monitor with a preimage. We should retry this monitor
+                                                               // update over and over again until morale improves.
+                                                               log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
+                                                               return Err((counterparty_node_id, e));
+                                                       }
+                                               } else {
+                                                       // If we're running during init we cannot update a monitor directly -
+                                                       // they probably haven't actually been loaded yet. Instead, push the
+                                                       // monitor update as a background event.
+                                                       self.pending_background_events.lock().unwrap().push(
+                                                               BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+                                                                       counterparty_node_id,
+                                                                       funding_txo: prev_hop.outpoint,
+                                                                       update: monitor_update.clone(),
+                                                               });
                                                }
                                        }
                                        return Ok(());
@@ -4740,16 +4839,34 @@ where
                                payment_preimage,
                        }],
                };
-               // We update the ChannelMonitor on the backward link, after
-               // receiving an `update_fulfill_htlc` from the forward link.
-               let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
-               if update_res != ChannelMonitorUpdateStatus::Completed {
-                       // TODO: This needs to be handled somehow - if we receive a monitor update
-                       // with a preimage we *must* somehow manage to propagate it to the upstream
-                       // channel, or we must have an ability to receive the same event and try
-                       // again on restart.
-                       log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
-                               payment_preimage, update_res);
+
+               if !during_init {
+                       // We update the ChannelMonitor on the backward link, after
+                       // receiving an `update_fulfill_htlc` from the forward link.
+                       let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+                       if update_res != ChannelMonitorUpdateStatus::Completed {
+                               // TODO: This needs to be handled somehow - if we receive a monitor update
+                               // with a preimage we *must* somehow manage to propagate it to the upstream
+                               // channel, or we must have an ability to receive the same event and try
+                               // again on restart.
+                               log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
+                                       payment_preimage, update_res);
+                       }
+               } else {
+                       // If we're running during init we cannot update a monitor directly - they probably
+                       // haven't actually been loaded yet. Instead, push the monitor update as a background
+                       // event.
+                       // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the
+                       // channel is already closed) we need to ultimately handle the monitor update
+                       // completion action only after we've completed the monitor update. This is the only
+                       // way to guarantee this update *will* be regenerated on startup (otherwise if this was
+                       // from a forwarded HTLC the downstream preimage may be deleted before we claim
+                       // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will
+                       // complete the monitor update completion action from `completion_action`.
+                       self.pending_background_events.lock().unwrap().push(
+                               BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
+                                       prev_hop.outpoint, preimage_update,
+                               )));
                }
                // Note that we do process the completion action here. This totally could be a
                // duplicate claim, but we have no way of knowing without interrogating the
@@ -4767,6 +4884,8 @@ where
        fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
+                               debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
+                                       "We don't support claim_htlc claims during startup - monitors may not be available yet");
                                self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
@@ -4922,24 +5041,29 @@ where
                if peer_state_mutex_opt.is_none() { return }
                peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                let peer_state = &mut *peer_state_lock;
-               let mut channel = {
-                       match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){
-                               hash_map::Entry::Occupied(chan) => chan,
-                               hash_map::Entry::Vacant(_) => return,
-                       }
-               };
+               let channel =
+                       if let Some(chan) = peer_state.channel_by_id.get_mut(&funding_txo.to_channel_id()) {
+                               chan
+                       } else {
+                               let update_actions = peer_state.monitor_update_blocked_actions
+                                       .remove(&funding_txo.to_channel_id()).unwrap_or(Vec::new());
+                               mem::drop(peer_state_lock);
+                               mem::drop(per_peer_state);
+                               self.handle_monitor_update_completion_actions(update_actions);
+                               return;
+                       };
                let remaining_in_flight =
                        if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) {
                                pending.retain(|upd| upd.update_id > highest_applied_update_id);
                                pending.len()
                        } else { 0 };
                log_trace!(self.logger, "ChannelMonitor updated to {}. Current highest is {}. {} pending in-flight updates.",
-                       highest_applied_update_id, channel.get().context.get_latest_monitor_update_id(),
+                       highest_applied_update_id, channel.context.get_latest_monitor_update_id(),
                        remaining_in_flight);
-               if !channel.get().is_awaiting_monitor_update() || channel.get().context.get_latest_monitor_update_id() != highest_applied_update_id {
+               if !channel.is_awaiting_monitor_update() || channel.context.get_latest_monitor_update_id() != highest_applied_update_id {
                        return;
                }
-               handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel.get_mut());
+               handle_monitor_update_completion!(self, peer_state_lock, peer_state, per_peer_state, channel);
        }
 
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
@@ -5513,7 +5637,7 @@ where
                                                _ => pending_forward_info
                                        }
                                };
-                               try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan);
+                               try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan);
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
@@ -5686,22 +5810,27 @@ where
                }
        }
 
-       // We only want to push a PendingHTLCsForwardable event if no others are queued.
        fn push_pending_forwards_ev(&self) {
                let mut pending_events = self.pending_events.lock().unwrap();
-               let forward_ev_exists = pending_events.iter()
-                       .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
-                       .is_some();
-               if !forward_ev_exists {
-                       pending_events.push_back((events::Event::PendingHTLCsForwardable {
-                               time_forwardable:
-                                       Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
+               let is_processing_events = self.pending_events_processor.load(Ordering::Acquire);
+               let num_forward_events = pending_events.iter().filter(|(ev, _)|
+                       if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }
+               ).count();
+               // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing
+               // events is done in batches and they are not removed until we're done processing each
+               // batch. Since handling a `PendingHTLCsForwardable` event will call back into the
+               // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom
+               // payments will need an additional forwarding event before being claimed to make them look
+               // real by taking more time.
+               if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 {
+                       pending_events.push_back((Event::PendingHTLCsForwardable {
+                               time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
                        }, None));
                }
        }
 
        /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
-       /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
+       /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action
        /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
        /// the [`ChannelMonitorUpdate`] in question.
        fn raa_monitor_updates_held(&self,
@@ -5730,7 +5859,7 @@ where
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        let funding_txo = chan.get().context.get_funding_txo();
-                                       let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
+                                       let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan);
                                        let res = if let Some(monitor_update) = monitor_update_opt {
                                                handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
                                                        peer_state_lock, peer_state, per_peer_state, chan).map(|_| ())
@@ -6001,7 +6130,7 @@ where
                                                let counterparty_node_id = chan.context.get_counterparty_node_id();
                                                let funding_txo = chan.context.get_funding_txo();
                                                let (monitor_opt, holding_cell_failed_htlcs) =
-                                                       chan.maybe_free_holding_cell_htlcs(&self.logger);
+                                                       chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger);
                                                if !holding_cell_failed_htlcs.is_empty() {
                                                        failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
                                                }
@@ -6318,7 +6447,7 @@ where
        /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
        /// [`Event`] being handled) completes, this should be called to restore the channel to normal
        /// operation. It will double-check that nothing *else* is also blocking the same channel from
-       /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
+       /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
        fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
                let mut errors = Vec::new();
                loop {
@@ -7349,6 +7478,7 @@ impl Writeable for ChannelDetails {
                        (35, self.inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, self.feerate_sat_per_1000_weight, option),
+                       (41, self.channel_shutdown_state, option),
                });
                Ok(())
        }
@@ -7386,6 +7516,7 @@ impl Readable for ChannelDetails {
                        (35, inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, feerate_sat_per_1000_weight, option),
+                       (41, channel_shutdown_state, option),
                });
 
                // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
@@ -7421,12 +7552,13 @@ impl Readable for ChannelDetails {
                        inbound_htlc_minimum_msat,
                        inbound_htlc_maximum_msat,
                        feerate_sat_per_1000_weight,
+                       channel_shutdown_state,
                })
        }
 }
 
 impl_writeable_tlv_based!(PhantomRouteHints, {
-       (2, channels, vec_type),
+       (2, channels, required_vec),
        (4, phantom_scid, required),
        (6, real_node_pubkey, required),
 });
@@ -7618,7 +7750,7 @@ impl Readable for HTLCSource {
                        0 => {
                                let mut session_priv: crate::util::ser::RequiredWrapper<SecretKey> = crate::util::ser::RequiredWrapper(None);
                                let mut first_hop_htlc_msat: u64 = 0;
-                               let mut path_hops: Option<Vec<RouteHop>> = Some(Vec::new());
+                               let mut path_hops = Vec::new();
                                let mut payment_id = None;
                                let mut payment_params: Option<PaymentParameters> = None;
                                let mut blinded_tail: Option<BlindedTail> = None;
@@ -7626,7 +7758,7 @@ impl Readable for HTLCSource {
                                        (0, session_priv, required),
                                        (1, payment_id, option),
                                        (2, first_hop_htlc_msat, required),
-                                       (4, path_hops, vec_type),
+                                       (4, path_hops, required_vec),
                                        (5, payment_params, (option: ReadableArgs, 0)),
                                        (6, blinded_tail, option),
                                });
@@ -7635,7 +7767,7 @@ impl Readable for HTLCSource {
                                        // instead.
                                        payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
                                }
-                               let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail };
+                               let path = Path { hops: path_hops, blinded_tail };
                                if path.hops.len() == 0 {
                                        return Err(DecodeError::InvalidValue);
                                }
@@ -7670,7 +7802,7 @@ impl Writeable for HTLCSource {
                                        (1, payment_id_opt, option),
                                        (2, first_hop_htlc_msat, required),
                                        // 3 was previously used to write a PaymentSecret for the payment.
-                                       (4, path.hops, vec_type),
+                                       (4, path.hops, required_vec),
                                        (5, None::<PaymentParameters>, option), // payment_params in LDK versions prior to 0.0.115
                                        (6, path.blinded_tail, option),
                                 });
@@ -7920,7 +8052,7 @@ where
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
                        (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
-                       (9, htlc_purposes, vec_type),
+                       (9, htlc_purposes, required_vec),
                        (10, in_flight_monitor_updates, option),
                        (11, self.probing_cookie_secret, required),
                        (13, htlc_onion_fields, optional_vec),
@@ -7971,6 +8103,14 @@ impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
        }
 }
 
+impl_writeable_tlv_based_enum!(ChannelShutdownState,
+       (0, NotShuttingDown) => {},
+       (2, ShutdownInitiated) => {},
+       (4, ResolvingHTLCs) => {},
+       (6, NegotiatingClosingFee) => {},
+       (8, ShutdownComplete) => {}, ;
+);
+
 /// Arguments for the creation of a ChannelManager that are not deserialized.
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
@@ -8236,7 +8376,7 @@ where
                                        update_id: CLOSED_CHANNEL_UPDATE_ID,
                                        updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
                                };
-                               close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
+                               close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
                        }
                }
 
@@ -8359,7 +8499,7 @@ where
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
                        (8, events_override, option),
-                       (9, claimable_htlc_purposes, vec_type),
+                       (9, claimable_htlc_purposes, optional_vec),
                        (10, in_flight_monitor_updates, option),
                        (11, probing_cookie_secret, option),
                        (13, claimable_htlc_onion_fields, optional_vec),
@@ -8423,6 +8563,16 @@ where
                                                        update: update.clone(),
                                                });
                                }
+                               if $chan_in_flight_upds.is_empty() {
+                                       // We had some updates to apply, but it turns out they had completed before we
+                                       // were serialized, we just weren't notified of that. Thus, we may have to run
+                                       // the completion actions for any monitor updates, but otherwise are done.
+                                       pending_background_events.push(
+                                               BackgroundEvent::MonitorUpdatesComplete {
+                                                       counterparty_node_id: $counterparty_node_id,
+                                                       channel_id: $funding_txo.to_channel_id(),
+                                               });
+                               }
                                if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() {
                                        log_error!(args.logger, "Duplicate in-flight monitor update set for the same channel!");
                                        return Err(DecodeError::InvalidValue);
@@ -8491,6 +8641,11 @@ where
                // Note that we have to do the above replays before we push new monitor updates.
                pending_background_events.append(&mut close_background_events);
 
+               // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we
+               // should ensure we try them again on the inbound edge. We put them here and do so after we
+               // have a fully-constructed `ChannelManager` at the end.
+               let mut pending_claims_to_replay = Vec::new();
+
                {
                        // If we're tracking pending payments, ensure we haven't lost any by looking at the
                        // ChannelMonitor data for any channels for which we do not have authorative state
@@ -8501,7 +8656,8 @@ where
                        // We only rebuild the pending payments map if we were most recently serialized by
                        // 0.0.102+
                        for (_, monitor) in args.channel_monitors.iter() {
-                               if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
+                               let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id());
+                               if counterparty_opt.is_none() {
                                        for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
                                                if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source {
                                                        if path.hops.is_empty() {
@@ -8595,6 +8751,33 @@ where
                                                }
                                        }
                                }
+
+                               // Whether the downstream channel was closed or not, try to re-apply any payment
+                               // preimages from it which may be needed in upstream channels for forwarded
+                               // payments.
+                               let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs()
+                                       .into_iter()
+                                       .filter_map(|(htlc_source, (htlc, preimage_opt))| {
+                                               if let HTLCSource::PreviousHopData(_) = htlc_source {
+                                                       if let Some(payment_preimage) = preimage_opt {
+                                                               Some((htlc_source, payment_preimage, htlc.amount_msat,
+                                                                       // Check if `counterparty_opt.is_none()` to see if the
+                                                                       // downstream chan is closed (because we don't have a
+                                                                       // channel_id -> peer map entry).
+                                                                       counterparty_opt.is_none(),
+                                                                       monitor.get_funding_txo().0.to_channel_id()))
+                                                       } else { None }
+                                               } else {
+                                                       // If it was an outbound payment, we've handled it above - if a preimage
+                                                       // came in and we persisted the `ChannelManager` we either handled it and
+                                                       // are good to go or the channel force-closed - we don't have to handle the
+                                                       // channel still live case here.
+                                                       None
+                                               }
+                                       });
+                               for tuple in outbound_claimed_htlcs_iter {
+                                       pending_claims_to_replay.push(tuple);
+                               }
                        }
                }
 
@@ -8782,6 +8965,12 @@ where
                                                                blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
                                                                        .entry(blocked_channel_outpoint.to_channel_id())
                                                                        .or_insert_with(Vec::new).push(blocking_action.clone());
+                                                       } else {
+                                                               // If the channel we were blocking has closed, we don't need to
+                                                               // worry about it - the blocked monitor update should never have
+                                                               // been released from the `Channel` object so it can't have
+                                                               // completed, and if the channel closed there's no reason to bother
+                                                               // anymore.
                                                        }
                                                }
                                        }
@@ -8827,7 +9016,6 @@ where
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
-                       #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
 
@@ -8846,6 +9034,14 @@ where
                        channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
 
+               for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
+                       // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
+                       // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
+                       // channel is closed we just assume that it probably came from an on-chain claim.
+                       channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
+                               downstream_closed, downstream_chan_id);
+               }
+
                //TODO: Broadcast channel update for closed channels, but only after we've made a
                //connection or two.
 
@@ -9823,20 +10019,18 @@ mod tests {
                let node = create_network(1, &node_cfg, &node_chanmgr);
                let sender_intended_amt_msat = 100;
                let extra_fee_msat = 10;
-               let hop_data = msgs::OnionHopData {
-                       amt_to_forward: 100,
+               let hop_data = msgs::InboundOnionPayload::Receive {
+                       amt_msat: 100,
                        outgoing_cltv_value: 42,
-                       format: msgs::OnionHopDataFormat::FinalNode {
-                               keysend_preimage: None,
-                               payment_metadata: None,
-                               payment_data: Some(msgs::FinalOnionHopData {
-                                       payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat,
-                               }),
-                       }
+                       payment_metadata: None,
+                       keysend_preimage: None,
+                       payment_data: Some(msgs::FinalOnionHopData {
+                               payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat,
+                       }),
                };
                // Check that if the amount we received + the penultimate hop extra fee is less than the sender
                // intended amount, we fail the payment.
-               if let Err(crate::ln::channelmanager::ReceiveError { err_code, .. }) =
+               if let Err(crate::ln::channelmanager::InboundOnionErr { err_code, .. }) =
                        node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]),
                                sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat))
                {
@@ -9844,16 +10038,14 @@ mod tests {
                } else { panic!(); }
 
                // If amt_received + extra_fee is equal to the sender intended amount, we're fine.
-               let hop_data = msgs::OnionHopData { // This is the same hop_data as above, OnionHopData doesn't implement Clone
-                       amt_to_forward: 100,
+               let hop_data = msgs::InboundOnionPayload::Receive { // This is the same payload as above, InboundOnionPayload doesn't implement Clone
+                       amt_msat: 100,
                        outgoing_cltv_value: 42,
-                       format: msgs::OnionHopDataFormat::FinalNode {
-                               keysend_preimage: None,
-                               payment_metadata: None,
-                               payment_data: Some(msgs::FinalOnionHopData {
-                                       payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat,
-                               }),
-                       }
+                       payment_metadata: None,
+                       keysend_preimage: None,
+                       payment_data: Some(msgs::FinalOnionHopData {
+                               payment_secret: PaymentSecret([0; 32]), total_msat: sender_intended_amt_msat,
+                       }),
                };
                assert!(node[0].node.construct_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]),
                        sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat)).is_ok());
@@ -10007,7 +10199,7 @@ pub mod bench {
        use crate::routing::gossip::NetworkGraph;
        use crate::routing::router::{PaymentParameters, RouteParameters};
        use crate::util::test_utils;
-       use crate::util::config::UserConfig;
+       use crate::util::config::{UserConfig, MaxDustHTLCExposure};
 
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
@@ -10054,6 +10246,7 @@ pub mod bench {
                let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer);
 
                let mut config: UserConfig = Default::default();
+               config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253);
                config.channel_handshake_config.minimum_depth = 1;
 
                let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a);