Merge pull request #2400 from TheBlueMatt/2023-07-kill-vec_type
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Tue, 11 Jul 2023 19:58:34 +0000 (19:58 +0000)
committer GitHub <noreply@github.com>
Tue, 11 Jul 2023 19:58:34 +0000 (19:58 +0000)
Fix backwards compat for blocked_monitor_updates and finally kill `vec_type`
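
This merge replaces the legacy `vec_type` TLV marker everywhere with two explicit variants, `required_vec` and `optional_vec` (the macro changes live in `lightning/src/util/ser_macros.rs`, listed below). The backwards-compat fix in the title: `blocked_monitor_updates` is serialized under the even TLV type 8, which readers are required to understand, so unconditionally writing it would make channels unreadable by older LDK versions. The following is a hedged sketch of the write-side semantics implied by the call sites in this diff — illustrative only, not the actual macro expansion:

```rust
use std::io::{Result, Write};

// `required_vec`: the TLV record is always written, even for an empty Vec,
// and (per the read sites below) its absence on read is a decode error.
fn write_required_vec<W: Write>(w: &mut W, tlv_type: u64, value: &[u8]) -> Result<()> {
    write_tlv_record(w, tlv_type, value)
}

// `optional_vec`: an empty Vec writes nothing at all. For an even
// ("required-to-understand") type such as blocked_monitor_updates' type 8,
// this keeps serialized channels readable by older versions whenever the
// field is empty -- the backwards-compat fix named in the title.
fn write_optional_vec<W: Write>(w: &mut W, tlv_type: u64, value: &[u8]) -> Result<()> {
    if value.is_empty() { return Ok(()); }
    write_tlv_record(w, tlv_type, value)
}

// Stand-in for LDK's BigSize-prefixed type/length/value encoding.
fn write_tlv_record<W: Write>(w: &mut W, tlv_type: u64, value: &[u8]) -> Result<()> {
    w.write_all(&tlv_type.to_be_bytes())?;
    w.write_all(&(value.len() as u64).to_be_bytes())?;
    w.write_all(value)
}
```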

lightning/src/chain/channelmonitor.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/routing/gossip.rs
lightning/src/routing/router.rs
lightning/src/util/ser_macros.rs

index 36e3bc46bfcc40370519a3d906d06e9e90458bea,71fc1cb6f06845e9ad49a611fce7be552a11fd00..78b5c65e1407170df338b71ca678e79b7738fda0
@@@ -49,7 -49,7 +49,7 @@@ use crate::chain::Filter
  use crate::util::logger::Logger;
  use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
  use crate::util::byte_utils;
 -use crate::events::Event;
 +use crate::events::{Event, EventHandler};
  use crate::events::bump_transaction::{AnchorDescriptor, HTLCDescriptor, BumpTransactionEvent};
  
  use crate::prelude::*;
@@@ -262,7 -262,7 +262,7 @@@ impl_writeable_tlv_based!(HolderSignedT
        (8, delayed_payment_key, required),
        (10, per_commitment_point, required),
        (12, feerate_per_kw, required),
-       (14, htlc_outputs, vec_type)
+       (14, htlc_outputs, required_vec)
  });
  
  impl HolderSignedTx {
@@@ -538,15 -538,15 +538,15 @@@ impl ChannelMonitorUpdateStep 
  impl_writeable_tlv_based_enum_upgradable!(ChannelMonitorUpdateStep,
        (0, LatestHolderCommitmentTXInfo) => {
                (0, commitment_tx, required),
-               (1, claimed_htlcs, vec_type),
-               (2, htlc_outputs, vec_type),
+               (1, claimed_htlcs, optional_vec),
+               (2, htlc_outputs, required_vec),
                (4, nondust_htlc_sources, optional_vec),
        },
        (1, LatestCounterpartyCommitmentTXInfo) => {
                (0, commitment_txid, required),
                (2, commitment_number, required),
                (4, their_per_commitment_point, required),
-               (6, htlc_outputs, vec_type),
+               (6, htlc_outputs, required_vec),
        },
        (2, PaymentPreimage) => {
                (0, payment_preimage, required),
@@@ -738,6 -738,11 +738,6 @@@ impl Readable for IrrevocablyResolvedHT
  /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
  /// information and are actively monitoring the chain.
  ///
 -/// Pending Events or updated HTLCs which have not yet been read out by
 -/// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and
 -/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
 -/// gotten are fully handled before re-serializing the new state.
 -///
  /// Note that the deserializer is only implemented for (BlockHash, ChannelMonitor), which
  /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
  /// the "reorg path" (ie disconnecting blocks until you find a common ancestor from both the
@@@ -747,7 -752,7 +747,7 @@@ pub struct ChannelMonitor<Signer: Write
        #[cfg(test)]
        pub(crate) inner: Mutex<ChannelMonitorImpl<Signer>>,
        #[cfg(not(test))]
 -      inner: Mutex<ChannelMonitorImpl<Signer>>,
 +      pub(super) inner: Mutex<ChannelMonitorImpl<Signer>>,
  }
  
  #[derive(PartialEq)]
@@@ -824,8 -829,7 +824,8 @@@ pub(crate) struct ChannelMonitorImpl<Si
        // we further MUST NOT generate events during block/transaction-disconnection.
        pending_monitor_events: Vec<MonitorEvent>,
  
 -      pending_events: Vec<Event>,
 +      pub(super) pending_events: Vec<Event>,
 +      pub(super) is_processing_pending_events: bool,
  
        // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
        // which to take actions once they reach enough confirmations. Each entry includes the
@@@ -1071,12 -1075,12 +1071,12 @@@ impl<Signer: WriteableEcdsaChannelSigne
  
                write_tlv_fields!(writer, {
                        (1, self.funding_spend_confirmed, option),
-                       (3, self.htlcs_resolved_on_chain, vec_type),
-                       (5, self.pending_monitor_events, vec_type),
+                       (3, self.htlcs_resolved_on_chain, required_vec),
+                       (5, self.pending_monitor_events, required_vec),
                        (7, self.funding_spend_seen, required),
                        (9, self.counterparty_node_id, option),
                        (11, self.confirmed_commitment_tx_counterparty_output, option),
-                       (13, self.spendable_txids_confirmed, vec_type),
+                       (13, self.spendable_txids_confirmed, required_vec),
                        (15, self.counterparty_fulfilled_htlcs, required),
                });
  
        }
  }
  
 +macro_rules! _process_events_body {
 +      ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
 +              loop {
 +                      let (pending_events, repeated_events);
 +                      if let Some(us) = $self_opt {
 +                              let mut inner = us.inner.lock().unwrap();
 +                              if inner.is_processing_pending_events {
 +                                      break;
 +                              }
 +                              inner.is_processing_pending_events = true;
 +
 +                              pending_events = inner.pending_events.clone();
 +                              repeated_events = inner.get_repeated_events();
 +                      } else { break; }
 +                      let num_events = pending_events.len();
 +
 +                      for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
 +                              $event_to_handle = event;
 +                              $handle_event;
 +                      }
 +
 +                      if let Some(us) = $self_opt {
 +                              let mut inner = us.inner.lock().unwrap();
 +                              inner.pending_events.drain(..num_events);
 +                              inner.is_processing_pending_events = false;
 +                              if !inner.pending_events.is_empty() {
 +                                      // If there are more events to process, go ahead and do so.
 +                                      continue;
 +                              }
 +                      }
 +                      break;
 +              }
 +      }
 +}
 +pub(super) use _process_events_body as process_events_body;
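
The macro above makes monitor event processing non-reentrant via the `is_processing_pending_events` flag: events are copied out under the `inner` lock, handled with the lock released (so a handler may safely call back into the `ChannelMonitor`), and only drained afterwards; the final emptiness check loops around to pick up events pushed while handlers ran. A standalone sketch of the same pattern, with hypothetical types (the macro above is the real implementation):

```rust
use std::sync::Mutex;

struct EventQueue<E> { inner: Mutex<Inner<E>> }
struct Inner<E> { pending: Vec<E>, processing: bool }

impl<E: Clone> EventQueue<E> {
    fn process<F: FnMut(E)>(&self, mut handle: F) {
        loop {
            let events = {
                let mut us = self.inner.lock().unwrap();
                // Reentrancy guard: a second (possibly nested) caller returns
                // instead of deadlocking or handling the same events twice.
                if us.processing { return; }
                us.processing = true;
                us.pending.clone()
            };
            let num = events.len();
            // Handle events without the lock held so handlers can call back in.
            for ev in events { handle(ev); }
            let mut us = self.inner.lock().unwrap();
            us.pending.drain(..num);
            us.processing = false;
            // Events may have been pushed while we were unlocked; loop if so.
            if us.pending.is_empty() { return; }
        }
    }
}
```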
 +
  impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
        /// For lockorder enforcement purposes, we need to have a single site which constructs the
        /// `inner` mutex, otherwise cases where we lock two monitors at the same time (eg in our
                        payment_preimages: HashMap::new(),
                        pending_monitor_events: Vec::new(),
                        pending_events: Vec::new(),
 +                      is_processing_pending_events: false,
  
                        onchain_events_awaiting_threshold_conf: Vec::new(),
                        outputs_to_watch,
                self.inner.lock().unwrap().get_and_clear_pending_monitor_events()
        }
  
 -      /// Gets the list of pending events which were generated by previous actions, clearing the list
 -      /// in the process.
 +      /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
 +      ///
 +      /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
 +      /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
 +      /// within each channel. As the confirmation of a commitment transaction may be critical to the
 +      /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an
 +      /// environment with spotty connections, like on mobile.
        ///
 -      /// This is called by the [`EventsProvider::process_pending_events`] implementation for
 -      /// [`ChainMonitor`].
 +      /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
 +      /// order to handle these events.
 +      ///
 +      /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
 +      /// [`BumpTransaction`]: crate::events::Event::BumpTransaction
 +      pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
 +              let mut ev;
 +              process_events_body!(Some(self), ev, handler.handle_event(ev));
 +      }
 +
 +      /// Processes any events asynchronously.
        ///
 -      /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
 -      /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
 +      /// See [`Self::process_pending_events`] for more information.
 +      pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
 +              &self, handler: &H
 +      ) {
 +              let mut ev;
 +              process_events_body!(Some(self), ev, { handler(ev).await });
 +      }
 +
 +      #[cfg(test)]
        pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
 -              self.inner.lock().unwrap().get_and_clear_pending_events()
 +              let mut ret = Vec::new();
 +              let mut lck = self.inner.lock().unwrap();
 +              mem::swap(&mut ret, &mut lck.pending_events);
 +              ret.append(&mut lck.get_repeated_events());
 +              ret
        }
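
LDK implements `EventHandler` for any `Fn(Event)` closure, so the new `process_pending_events` above can be driven directly with one. A hedged usage sketch — the `monitor` binding and the handler bodies are assumptions:

```rust
use lightning::events::Event;

// `monitor` is assumed to be a ChannelMonitor<Signer> obtained elsewhere.
let handler = |event: Event| match event {
    Event::SpendableOutputs { .. } => { /* sweep the matured outputs */ },
    Event::BumpTransaction(_) => { /* fee-bump via a bump-transaction handler */ },
    _ => {},
};
monitor.process_pending_events(&handler);

// Or asynchronously, e.g. from a background task:
// monitor.process_pending_events_async(&|event| async move { /* handle */ }).await;
```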
  
        pub(crate) fn get_min_seen_secret(&self) -> u64 {
@@@ -2589,13 -2531,10 +2589,13 @@@ impl<Signer: WriteableEcdsaChannelSigne
                ret
        }
  
 -      pub fn get_and_clear_pending_events(&mut self) -> Vec<Event> {
 -              let mut ret = Vec::new();
 -              mem::swap(&mut ret, &mut self.pending_events);
 -              for (claim_id, claim_event) in self.onchain_tx_handler.get_and_clear_pending_claim_events().drain(..) {
 +      /// Gets the set of events that are repeated regularly (e.g. those which RBF bump
 +      /// transactions). We're okay if we lose these on restart as they'll be regenerated for us at
 +      /// some regular interval via [`ChannelMonitor::rebroadcast_pending_claims`].
 +      pub(super) fn get_repeated_events(&mut self) -> Vec<Event> {
 +              let pending_claim_events = self.onchain_tx_handler.get_and_clear_pending_claim_events();
 +              let mut ret = Vec::with_capacity(pending_claim_events.len());
 +              for (claim_id, claim_event) in pending_claim_events {
                        match claim_event {
                                ClaimEvent::BumpCommitment {
                                        package_target_feerate_sat_per_1000_weight, commitment_tx, anchor_output_idx,
@@@ -4112,12 -4051,12 +4112,12 @@@ impl<'a, 'b, ES: EntropySource, SP: Sig
                let mut counterparty_fulfilled_htlcs = Some(HashMap::new());
                read_tlv_fields!(reader, {
                        (1, funding_spend_confirmed, option),
-                       (3, htlcs_resolved_on_chain, vec_type),
-                       (5, pending_monitor_events, vec_type),
+                       (3, htlcs_resolved_on_chain, optional_vec),
+                       (5, pending_monitor_events, optional_vec),
                        (7, funding_spend_seen, option),
                        (9, counterparty_node_id, option),
                        (11, confirmed_commitment_tx_counterparty_output, option),
-                       (13, spendable_txids_confirmed, vec_type),
+                       (13, spendable_txids_confirmed, optional_vec),
                        (15, counterparty_fulfilled_htlcs, option),
                });
  
                        payment_preimages,
                        pending_monitor_events: pending_monitor_events.unwrap(),
                        pending_events,
 +                      is_processing_pending_events: false,
  
                        onchain_events_awaiting_threshold_conf,
                        outputs_to_watch,
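
Note the read side above: fields formerly tagged `vec_type` now decode as `optional_vec` into an `Option<Vec<_>>`, and the locals are presumably pre-initialized to `Some(Vec::new())` (as `counterparty_fulfilled_htlcs` is at the top of the hunk), which is why the plain `.unwrap()` in the struct construction cannot panic. A minimal sketch of that init-then-read pattern, shown for one field:

```rust
// Hedged sketch mirroring the read path above (field name from the diff;
// `reader` is assumed in scope).
let mut pending_monitor_events = Some(Vec::new());
read_tlv_fields!(reader, {
    (5, pending_monitor_events, optional_vec),
});
// An absent TLV 5 leaves the pre-initialized Some(Vec::new()) untouched:
let pending_monitor_events = pending_monitor_events.unwrap();
```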
index ff34b05ff0e1e08180781e3fbbab3dcdd58fea1c,95409fa1e5c011147ef82cd5e69e6b6579dbd940..8232c5e1b1611e22b2a91f467f5a11e35a526215
@@@ -27,7 -27,7 +27,7 @@@ use crate::ln::features::{ChannelTypeFe
  use crate::ln::msgs;
  use crate::ln::msgs::DecodeError;
  use crate::ln::script::{self, ShutdownScript};
 -use crate::ln::channelmanager::{self, CounterpartyForwardingInfo, PendingHTLCStatus, HTLCSource, SentHTLCId, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT};
 +use crate::ln::channelmanager::{self, CounterpartyForwardingInfo, PendingHTLCStatus, HTLCSource, SentHTLCId, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT, ChannelShutdownState};
  use crate::ln::chan_utils::{CounterpartyCommitmentSecrets, TxCreationKeys, HTLCOutputInCommitment, htlc_success_tx_weight, htlc_timeout_tx_weight, make_funding_redeemscript, ChannelPublicKeys, CommitmentTransaction, HolderCommitmentTransaction, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, MAX_HTLCS, get_commitment_transaction_number_obscure_factor, ClosingTransaction};
  use crate::ln::chan_utils;
  use crate::ln::onion_utils::HTLCFailReason;
@@@ -41,7 -41,7 +41,7 @@@ use crate::routing::gossip::NodeId
  use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer, VecWriter};
  use crate::util::logger::Logger;
  use crate::util::errors::APIError;
 -use crate::util::config::{UserConfig, ChannelConfig, LegacyChannelConfig, ChannelHandshakeConfig, ChannelHandshakeLimits};
 +use crate::util::config::{UserConfig, ChannelConfig, LegacyChannelConfig, ChannelHandshakeConfig, ChannelHandshakeLimits, MaxDustHTLCExposure};
  use crate::util::scid_utils::scid_from_parts;
  
  use crate::io;
@@@ -527,10 -527,6 +527,10 @@@ pub(super) struct ReestablishResponses 
  }
  
  /// The return type of `force_shutdown`
 +///
 +/// Contains a (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
 +/// followed by a list of HTLCs to fail back, each in the form of a (source, payment hash,
 +/// counterparty_node_id, channel_id) tuple.
  pub(crate) type ShutdownResult = (
        Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
        Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>
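
For illustration, consuming a `ShutdownResult` per the doc comment above — a hedged sketch in which `shutdown_res` and the handling bodies are hypothetical:

```rust
let (monitor_update_opt, dropped_htlcs): ShutdownResult = shutdown_res;
if let Some((counterparty_node_id, funding_txo, monitor_update)) = monitor_update_opt {
    // Hand the final ChannelMonitorUpdate off to the chain::Watch implementation.
}
for (htlc_source, payment_hash, counterparty_node_id, channel_id) in dropped_htlcs {
    // Fail each HTLC backwards towards its source.
}
```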
@@@ -907,34 -903,6 +907,34 @@@ impl<Signer: ChannelSigner> ChannelCont
                (self.channel_state & mask) == (ChannelState::ChannelReady as u32) && !self.monitor_pending_channel_ready
        }
  
 +      /// Returns the state of the channel through its various stages of shutdown.
 +      pub fn shutdown_state(&self) -> ChannelShutdownState {
 +              if self.channel_state & (ChannelState::ShutdownComplete as u32) != 0 {
 +                      return ChannelShutdownState::ShutdownComplete;
 +              }
 +              if self.channel_state & (ChannelState::LocalShutdownSent as u32) != 0 && self.channel_state & (ChannelState::RemoteShutdownSent as u32) == 0 {
 +                      return ChannelShutdownState::ShutdownInitiated;
 +              }
 +              if (self.channel_state & BOTH_SIDES_SHUTDOWN_MASK != 0) && !self.closing_negotiation_ready() {
 +                      return ChannelShutdownState::ResolvingHTLCs;
 +              }
 +              if (self.channel_state & BOTH_SIDES_SHUTDOWN_MASK != 0) && self.closing_negotiation_ready() {
 +                      return ChannelShutdownState::NegotiatingClosingFee;
 +              }
 +              return ChannelShutdownState::NotShuttingDown;
 +      }
 +
 +      fn closing_negotiation_ready(&self) -> bool {
 +              self.pending_inbound_htlcs.is_empty() &&
 +              self.pending_outbound_htlcs.is_empty() &&
 +              self.pending_update_fee.is_none() &&
 +              self.channel_state &
 +              (BOTH_SIDES_SHUTDOWN_MASK |
 +                      ChannelState::AwaitingRemoteRevoke as u32 |
 +                      ChannelState::PeerDisconnected as u32 |
 +                      ChannelState::MonitorUpdateInProgress as u32) == BOTH_SIDES_SHUTDOWN_MASK
 +      }
 +
        /// Returns true if this channel is currently available for use. This is a superset of
        /// is_usable() and considers things like the channel being temporarily disabled.
        /// Allowed in any state (including after shutdown)
                cmp::max(self.config.options.cltv_expiry_delta, MIN_CLTV_EXPIRY_DELTA)
        }
  
 -      pub fn get_max_dust_htlc_exposure_msat(&self) -> u64 {
 -              self.config.options.max_dust_htlc_exposure_msat
 +      pub fn get_max_dust_htlc_exposure_msat<F: Deref>(&self,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>) -> u64
 +      where F::Target: FeeEstimator
 +      {
 +              match self.config.options.max_dust_htlc_exposure {
 +                      MaxDustHTLCExposure::FeeRateMultiplier(multiplier) => {
 +                              let feerate_per_kw = fee_estimator.bounded_sat_per_1000_weight(
 +                                      ConfirmationTarget::HighPriority);
 +                              feerate_per_kw as u64 * multiplier
 +                      },
 +                      MaxDustHTLCExposure::FixedLimitMsat(limit) => limit,
 +              }
        }
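
With `FeeRateMultiplier`, the dust-exposure cap now scales with the live feerate: at, say, 2,500 sat/kW for `ConfirmationTarget::HighPriority`, a multiplier of 5,000 allows 2,500 * 5,000 = 12,500,000 msat of dust in flight. A hedged configuration sketch (the types and field names follow this diff; the chosen values are only examples):

```rust
use lightning::util::config::{MaxDustHTLCExposure, UserConfig};

let mut config = UserConfig::default();
// Scale allowed dust exposure with the prevailing high-priority feerate...
config.channel_config.max_dust_htlc_exposure =
    MaxDustHTLCExposure::FeeRateMultiplier(5_000);
// ...or pin it to a fixed msat value, as the old max_dust_htlc_exposure_msat did:
// config.channel_config.max_dust_htlc_exposure =
//     MaxDustHTLCExposure::FixedLimitMsat(5_000_000);
```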
  
        /// Returns the previous [`ChannelConfig`] applied to this channel, if any.
        /// Doesn't bother handling the
        /// if-we-removed-it-already-but-haven't-fully-resolved-they-can-still-send-an-inbound-HTLC
        /// corner case properly.
 -      pub fn get_available_balances(&self) -> AvailableBalances {
 +      pub fn get_available_balances<F: Deref>(&self, fee_estimator: &LowerBoundedFeeEstimator<F>)
 +      -> AvailableBalances
 +      where F::Target: FeeEstimator
 +      {
                let context = &self;
                // Note that we have to handle overflow due to the above case.
                let inbound_stats = context.get_inbound_pending_htlc_stats(None);
                // send above the dust limit (as the router can always overpay to meet the dust limit).
                let mut remaining_msat_below_dust_exposure_limit = None;
                let mut dust_exposure_dust_limit_msat = 0;
 +              let max_dust_htlc_exposure_msat = context.get_max_dust_htlc_exposure_msat(fee_estimator);
  
                let (htlc_success_dust_limit, htlc_timeout_dust_limit) = if context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
                        (context.counterparty_dust_limit_satoshis, context.holder_dust_limit_satoshis)
                         context.holder_dust_limit_satoshis       + dust_buffer_feerate * htlc_timeout_tx_weight(context.get_channel_type()) / 1000)
                };
                let on_counterparty_dust_htlc_exposure_msat = inbound_stats.on_counterparty_tx_dust_exposure_msat + outbound_stats.on_counterparty_tx_dust_exposure_msat;
 -              if on_counterparty_dust_htlc_exposure_msat as i64 + htlc_success_dust_limit as i64 * 1000 - 1 > context.get_max_dust_htlc_exposure_msat() as i64 {
 +              if on_counterparty_dust_htlc_exposure_msat as i64 + htlc_success_dust_limit as i64 * 1000 - 1 > max_dust_htlc_exposure_msat as i64 {
                        remaining_msat_below_dust_exposure_limit =
 -                              Some(context.get_max_dust_htlc_exposure_msat().saturating_sub(on_counterparty_dust_htlc_exposure_msat));
 +                              Some(max_dust_htlc_exposure_msat.saturating_sub(on_counterparty_dust_htlc_exposure_msat));
                        dust_exposure_dust_limit_msat = cmp::max(dust_exposure_dust_limit_msat, htlc_success_dust_limit * 1000);
                }
  
                let on_holder_dust_htlc_exposure_msat = inbound_stats.on_holder_tx_dust_exposure_msat + outbound_stats.on_holder_tx_dust_exposure_msat;
 -              if on_holder_dust_htlc_exposure_msat as i64 + htlc_timeout_dust_limit as i64 * 1000 - 1 > context.get_max_dust_htlc_exposure_msat() as i64 {
 +              if on_holder_dust_htlc_exposure_msat as i64 + htlc_timeout_dust_limit as i64 * 1000 - 1 > max_dust_htlc_exposure_msat as i64 {
                        remaining_msat_below_dust_exposure_limit = Some(cmp::min(
                                remaining_msat_below_dust_exposure_limit.unwrap_or(u64::max_value()),
 -                              context.get_max_dust_htlc_exposure_msat().saturating_sub(on_holder_dust_htlc_exposure_msat)));
 +                              max_dust_htlc_exposure_msat.saturating_sub(on_holder_dust_htlc_exposure_msat)));
                        dust_exposure_dust_limit_msat = cmp::max(dust_exposure_dust_limit_msat, htlc_timeout_dust_limit * 1000);
                }
  
@@@ -2598,13 -2552,8 +2598,13 @@@ impl<Signer: WriteableEcdsaChannelSigne
                Ok(self.get_announcement_sigs(node_signer, genesis_block_hash, user_config, best_block.height(), logger))
        }
  
 -      pub fn update_add_htlc<F, L: Deref>(&mut self, msg: &msgs::UpdateAddHTLC, mut pending_forward_status: PendingHTLCStatus, create_pending_htlc_status: F, logger: &L) -> Result<(), ChannelError>
 -      where F: for<'a> Fn(&'a Self, PendingHTLCStatus, u16) -> PendingHTLCStatus, L::Target: Logger {
 +      pub fn update_add_htlc<F, FE: Deref, L: Deref>(
 +              &mut self, msg: &msgs::UpdateAddHTLC, mut pending_forward_status: PendingHTLCStatus,
 +              create_pending_htlc_status: F, fee_estimator: &LowerBoundedFeeEstimator<FE>, logger: &L
 +      ) -> Result<(), ChannelError>
 +      where F: for<'a> Fn(&'a Self, PendingHTLCStatus, u16) -> PendingHTLCStatus,
 +              FE::Target: FeeEstimator, L::Target: Logger,
 +      {
                // We can't accept HTLCs sent after we've sent a shutdown.
                let local_sent_shutdown = (self.context.channel_state & (ChannelState::ChannelReady as u32 | ChannelState::LocalShutdownSent as u32)) != (ChannelState::ChannelReady as u32);
                if local_sent_shutdown {
                        }
                }
  
 +              let max_dust_htlc_exposure_msat = self.context.get_max_dust_htlc_exposure_msat(fee_estimator);
                let (htlc_timeout_dust_limit, htlc_success_dust_limit) = if self.context.get_channel_type().supports_anchors_zero_fee_htlc_tx() {
                        (0, 0)
                } else {
                let exposure_dust_limit_timeout_sats = htlc_timeout_dust_limit + self.context.counterparty_dust_limit_satoshis;
                if msg.amount_msat / 1000 < exposure_dust_limit_timeout_sats {
                        let on_counterparty_tx_dust_htlc_exposure_msat = inbound_stats.on_counterparty_tx_dust_exposure_msat + outbound_stats.on_counterparty_tx_dust_exposure_msat + msg.amount_msat;
 -                      if on_counterparty_tx_dust_htlc_exposure_msat > self.context.get_max_dust_htlc_exposure_msat() {
 +                      if on_counterparty_tx_dust_htlc_exposure_msat > max_dust_htlc_exposure_msat {
                                log_info!(logger, "Cannot accept value that would put our exposure to dust HTLCs at {} over the limit {} on counterparty commitment tx",
 -                                      on_counterparty_tx_dust_htlc_exposure_msat, self.context.get_max_dust_htlc_exposure_msat());
 +                                      on_counterparty_tx_dust_htlc_exposure_msat, max_dust_htlc_exposure_msat);
                                pending_forward_status = create_pending_htlc_status(self, pending_forward_status, 0x1000|7);
                        }
                }
                let exposure_dust_limit_success_sats = htlc_success_dust_limit + self.context.holder_dust_limit_satoshis;
                if msg.amount_msat / 1000 < exposure_dust_limit_success_sats {
                        let on_holder_tx_dust_htlc_exposure_msat = inbound_stats.on_holder_tx_dust_exposure_msat + outbound_stats.on_holder_tx_dust_exposure_msat + msg.amount_msat;
 -                      if on_holder_tx_dust_htlc_exposure_msat > self.context.get_max_dust_htlc_exposure_msat() {
 +                      if on_holder_tx_dust_htlc_exposure_msat > max_dust_htlc_exposure_msat {
                                log_info!(logger, "Cannot accept value that would put our exposure to dust HTLCs at {} over the limit {} on holder commitment tx",
 -                                      on_holder_tx_dust_htlc_exposure_msat, self.context.get_max_dust_htlc_exposure_msat());
 +                                      on_holder_tx_dust_htlc_exposure_msat, max_dust_htlc_exposure_msat);
                                pending_forward_status = create_pending_htlc_status(self, pending_forward_status, 0x1000|7);
                        }
                }
        /// Public version of the below, checking relevant preconditions first.
        /// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
        /// returns `(None, Vec::new())`.
 -      pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 +      pub fn maybe_free_holding_cell_htlcs<F: Deref, L: Deref>(
 +              &mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 +      ) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>)
 +      where F::Target: FeeEstimator, L::Target: Logger
 +      {
                if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
                   (self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
 -                      self.free_holding_cell_htlcs(logger)
 +                      self.free_holding_cell_htlcs(fee_estimator, logger)
                } else { (None, Vec::new()) }
        }
  
        /// Frees any pending commitment updates in the holding cell, generating the relevant messages
        /// for our counterparty.
 -      fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 +      fn free_holding_cell_htlcs<F: Deref, L: Deref>(
 +              &mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 +      ) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>)
 +      where F::Target: FeeEstimator, L::Target: Logger
 +      {
                assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
                if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() {
                        log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(),
                                                skimmed_fee_msat, ..
                                        } => {
                                                match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(),
 -                                                      onion_routing_packet.clone(), false, skimmed_fee_msat, logger)
 +                                                      onion_routing_packet.clone(), false, skimmed_fee_msat, fee_estimator, logger)
                                                {
                                                        Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
                                                        Err(e) => {
                                return (None, htlcs_to_fail);
                        }
                        let update_fee = if let Some(feerate) = self.context.holding_cell_update_fee.take() {
 -                              self.send_update_fee(feerate, false, logger)
 +                              self.send_update_fee(feerate, false, fee_estimator, logger)
                        } else {
                                None
                        };
        /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
        /// generating an appropriate error *after* the channel state has been updated based on the
        /// revoke_and_ack message.
 -      pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<ChannelMonitorUpdate>), ChannelError>
 -              where L::Target: Logger,
 +      pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 +      ) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<ChannelMonitorUpdate>), ChannelError>
 +      where F::Target: FeeEstimator, L::Target: Logger,
        {
                if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
                        return Err(ChannelError::Close("Got revoke/ACK message when channel was not in an operational state".to_owned()));
                        return Ok((Vec::new(), self.push_ret_blockable_mon_update(monitor_update)));
                }
  
 -              match self.free_holding_cell_htlcs(logger) {
 +              match self.free_holding_cell_htlcs(fee_estimator, logger) {
                        (Some(mut additional_update), htlcs_to_fail) => {
                                // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
                                // strictly increasing by one, so decrement it here.
        /// Queues up an outbound update fee by placing it in the holding cell. You should call
        /// [`Self::maybe_free_holding_cell_htlcs`] in order to actually generate and send the
        /// commitment update.
 -      pub fn queue_update_fee<L: Deref>(&mut self, feerate_per_kw: u32, logger: &L) where L::Target: Logger {
 -              let msg_opt = self.send_update_fee(feerate_per_kw, true, logger);
 +      pub fn queue_update_fee<F: Deref, L: Deref>(&mut self, feerate_per_kw: u32,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L)
 +      where F::Target: FeeEstimator, L::Target: Logger
 +      {
 +              let msg_opt = self.send_update_fee(feerate_per_kw, true, fee_estimator, logger);
                assert!(msg_opt.is_none(), "We forced holding cell?");
        }
  
        ///
        /// You MUST call [`Self::send_commitment_no_state_update`] prior to any other calls on this
        /// [`Channel`] if `force_holding_cell` is false.
 -      fn send_update_fee<L: Deref>(&mut self, feerate_per_kw: u32, mut force_holding_cell: bool, logger: &L) -> Option<msgs::UpdateFee> where L::Target: Logger {
 +      fn send_update_fee<F: Deref, L: Deref>(
 +              &mut self, feerate_per_kw: u32, mut force_holding_cell: bool,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 +      ) -> Option<msgs::UpdateFee>
 +      where F::Target: FeeEstimator, L::Target: Logger
 +      {
                if !self.context.is_outbound() {
                        panic!("Cannot send fee from inbound channel");
                }
                // Note, we evaluate pending htlc "preemptive" trimmed-to-dust threshold at the proposed `feerate_per_kw`.
                let holder_tx_dust_exposure = inbound_stats.on_holder_tx_dust_exposure_msat + outbound_stats.on_holder_tx_dust_exposure_msat;
                let counterparty_tx_dust_exposure = inbound_stats.on_counterparty_tx_dust_exposure_msat + outbound_stats.on_counterparty_tx_dust_exposure_msat;
 -              if holder_tx_dust_exposure > self.context.get_max_dust_htlc_exposure_msat() {
 +              let max_dust_htlc_exposure_msat = self.context.get_max_dust_htlc_exposure_msat(fee_estimator);
 +              if holder_tx_dust_exposure > max_dust_htlc_exposure_msat {
                        log_debug!(logger, "Cannot afford to send new feerate at {} without infringing max dust htlc exposure", feerate_per_kw);
                        return None;
                }
 -              if counterparty_tx_dust_exposure > self.context.get_max_dust_htlc_exposure_msat() {
 +              if counterparty_tx_dust_exposure > max_dust_htlc_exposure_msat {
                        log_debug!(logger, "Cannot afford to send new feerate at {} without infringing max dust htlc exposure", feerate_per_kw);
                        return None;
                }
                        let outbound_stats = self.context.get_outbound_pending_htlc_stats(None);
                        let holder_tx_dust_exposure = inbound_stats.on_holder_tx_dust_exposure_msat + outbound_stats.on_holder_tx_dust_exposure_msat;
                        let counterparty_tx_dust_exposure = inbound_stats.on_counterparty_tx_dust_exposure_msat + outbound_stats.on_counterparty_tx_dust_exposure_msat;
 -                      if holder_tx_dust_exposure > self.context.get_max_dust_htlc_exposure_msat() {
 +                      let max_dust_htlc_exposure_msat = self.context.get_max_dust_htlc_exposure_msat(fee_estimator);
 +                      if holder_tx_dust_exposure > max_dust_htlc_exposure_msat {
                                return Err(ChannelError::Close(format!("Peer sent update_fee with a feerate ({}) which may over-expose us to dust-in-flight on our own transactions (totaling {} msat)",
                                        msg.feerate_per_kw, holder_tx_dust_exposure)));
                        }
 -                      if counterparty_tx_dust_exposure > self.context.get_max_dust_htlc_exposure_msat() {
 +                      if counterparty_tx_dust_exposure > max_dust_htlc_exposure_msat {
                                return Err(ChannelError::Close(format!("Peer sent update_fee with a feerate ({}) which may over-expose us to dust-in-flight on our counterparty's transactions (totaling {} msat)",
                                        msg.feerate_per_kw, counterparty_tx_dust_exposure)));
                        }
        /// this point if we're the funder we should send the initial closing_signed, and in any case
        /// shutdown should complete within a reasonable timeframe.
        fn closing_negotiation_ready(&self) -> bool {
 -              self.context.pending_inbound_htlcs.is_empty() && self.context.pending_outbound_htlcs.is_empty() &&
 -                      self.context.channel_state &
 -                              (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::AwaitingRemoteRevoke as u32 |
 -                               ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)
 -                              == BOTH_SIDES_SHUTDOWN_MASK &&
 -                      self.context.pending_update_fee.is_none()
 +              self.context.closing_negotiation_ready()
        }
  
        /// Checks if the closing_signed negotiation is making appropriate progress, possibly returning
        /// commitment update.
        ///
        /// `Err`s will only be [`ChannelError::Ignore`].
 -      pub fn queue_add_htlc<L: Deref>(
 +      pub fn queue_add_htlc<F: Deref, L: Deref>(
                &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
 -              onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>, logger: &L
 -      ) -> Result<(), ChannelError> where L::Target: Logger {
 +              onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 +      ) -> Result<(), ChannelError>
 +      where F::Target: FeeEstimator, L::Target: Logger
 +      {
                self
                        .send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true,
 -                              skimmed_fee_msat, logger)
 +                              skimmed_fee_msat, fee_estimator, logger)
                        .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
                        .map_err(|err| {
                                if let ChannelError::Ignore(_) = err { /* fine */ }
        /// on this [`Channel`] if `force_holding_cell` is false.
        ///
        /// `Err`s will only be [`ChannelError::Ignore`].
 -      fn send_htlc<L: Deref>(
 +      fn send_htlc<F: Deref, L: Deref>(
                &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
                onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool,
 -              skimmed_fee_msat: Option<u64>, logger: &L
 -      ) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
 +              skimmed_fee_msat: Option<u64>, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 +      ) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError>
 +      where F::Target: FeeEstimator, L::Target: Logger
 +      {
                if (self.context.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
                        return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
                }
                        return Err(ChannelError::Ignore("Cannot send 0-msat HTLC".to_owned()));
                }
  
 -              let available_balances = self.context.get_available_balances();
 +              let available_balances = self.context.get_available_balances(fee_estimator);
                if amount_msat < available_balances.next_outbound_htlc_minimum_msat {
                        return Err(ChannelError::Ignore(format!("Cannot send less than our next-HTLC minimum - {} msat",
                                available_balances.next_outbound_htlc_minimum_msat)));
        ///
        /// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on
        /// [`Self::send_htlc`] and [`Self::build_commitment_no_state_update`] for more info.
 -      pub fn send_htlc_and_commit<L: Deref>(
 -              &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
 -              onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>, logger: &L
 -      ) -> Result<Option<ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
 +      pub fn send_htlc_and_commit<F: Deref, L: Deref>(
 +              &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32,
 +              source: HTLCSource, onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L
 +      ) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
 +      where F::Target: FeeEstimator, L::Target: Logger
 +      {
                let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source,
 -                      onion_routing_packet, false, skimmed_fee_msat, logger);
 +                      onion_routing_packet, false, skimmed_fee_msat, fee_estimator, logger);
                if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
                match send_res? {
                        Some(_) => {
@@@ -6856,11 -6781,11 +6856,11 @@@ impl<Signer: WriteableEcdsaChannelSigne
                        (5, self.context.config, required),
                        (6, serialized_holder_htlc_max_in_flight, option),
                        (7, self.context.shutdown_scriptpubkey, option),
-                       (8, self.context.blocked_monitor_updates, vec_type),
+                       (8, self.context.blocked_monitor_updates, optional_vec),
                        (9, self.context.target_closing_feerate_sats_per_kw, option),
-                       (11, self.context.monitor_pending_finalized_fulfills, vec_type),
+                       (11, self.context.monitor_pending_finalized_fulfills, required_vec),
                        (13, self.context.channel_creation_height, required),
-                       (15, preimages, vec_type),
+                       (15, preimages, required_vec),
                        (17, self.context.announcement_sigs_state, required),
                        (19, self.context.latest_inbound_scid_alias, option),
                        (21, self.context.outbound_scid_alias, required),
@@@ -7164,11 -7089,11 +7164,11 @@@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> 
                        (5, config, option), // Note that if none is provided we will *not* overwrite the existing one.
                        (6, holder_max_htlc_value_in_flight_msat, option),
                        (7, shutdown_scriptpubkey, option),
-                       (8, blocked_monitor_updates, vec_type),
+                       (8, blocked_monitor_updates, optional_vec),
                        (9, target_closing_feerate_sats_per_kw, option),
-                       (11, monitor_pending_finalized_fulfills, vec_type),
+                       (11, monitor_pending_finalized_fulfills, optional_vec),
                        (13, channel_creation_height, option),
-                       (15, preimages_opt, vec_type),
+                       (15, preimages_opt, optional_vec),
                        (17, announcement_sigs_state, option),
                        (19, latest_inbound_scid_alias, option),
                        (21, outbound_scid_alias, option),
index b87be9cbb75cca9bc838592145ca924eee3d6a7a,b4b1d64acb1b0a26b99e40d382230ef087d74b98..7df125b9323b9c3987cbfa91cf7dbaf1114d3a7d
@@@ -45,7 -45,7 +45,7 @@@ use crate::ln::features::{ChannelFeatur
  #[cfg(any(feature = "_test_utils", test))]
  use crate::ln::features::InvoiceFeatures;
  use crate::routing::gossip::NetworkGraph;
- use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router};
+ use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteParameters, Router};
  use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
  use crate::ln::msgs;
  use crate::ln::onion_utils;
@@@ -507,19 -507,19 +507,19 @@@ struct ClaimablePayments 
  /// running normally, and specifically must be processed before any other non-background
  /// [`ChannelMonitorUpdate`]s are applied.
  enum BackgroundEvent {
 -      /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from
 -      /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public
 -      /// key to handle channel resumption, whereas if the channel has been force-closed we do not
 -      /// need the counterparty node_id.
 +      /// Handle a ChannelMonitorUpdate which closes the channel, or which is for an already-closed channel.
 +      /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
 +      /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the
 +      /// channel has been force-closed we do not need the counterparty node_id.
        ///
        /// Note that any such events are lost on shutdown, so in general they must be updates which
        /// are regenerated on startup.
 -      ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
 +      ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
        /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the
        /// channel to continue normal operation.
        ///
        /// In general this should be used rather than
 -      /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the
 +      /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the
        /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`]
        /// error the other variant is acceptable.
        ///
@@@ -1114,6 -1114,7 +1114,6 @@@ wher
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
  
 -      #[cfg(debug_assertions)]
        background_events_processed_since_startup: AtomicBool,
  
        persistence_notifier: Notifier,
@@@ -1479,9 -1480,6 +1479,9 @@@ pub struct ChannelDetails 
        ///
        /// [`confirmations_required`]: ChannelDetails::confirmations_required
        pub is_channel_ready: bool,
 +      /// The stage of the channel's shutdown.
 +      /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116.
 +      pub channel_shutdown_state: Option<ChannelShutdownState>,
        /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b)
        /// the peer is connected, and (c) the channel is not currently negotiating a shutdown.
        ///
@@@ -1521,13 -1519,10 +1521,13 @@@ impl ChannelDetails 
                self.short_channel_id.or(self.outbound_scid_alias)
        }
  
 -      fn from_channel_context<Signer: WriteableEcdsaChannelSigner>(context: &ChannelContext<Signer>,
 -              best_block_height: u32, latest_features: InitFeatures) -> Self {
 -
 -              let balance = context.get_available_balances();
 +      fn from_channel_context<Signer: WriteableEcdsaChannelSigner, F: Deref>(
 +              context: &ChannelContext<Signer>, best_block_height: u32, latest_features: InitFeatures,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>
 +      ) -> Self
 +      where F::Target: FeeEstimator
 +      {
 +              let balance = context.get_available_balances(fee_estimator);
                let (to_remote_reserve_satoshis, to_self_reserve_satoshis) =
                        context.get_holder_counterparty_selected_channel_reserve_satoshis();
                ChannelDetails {
                        inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()),
                        inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(),
                        config: Some(context.config()),
 +                      channel_shutdown_state: Some(context.shutdown_state()),
                }
        }
  }
  
 +#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 +/// Further information on the details of the channel shutdown.
 +/// Upon a channel being force-closed (i.e. its commitment transaction confirmation is detected
 +/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete`, or
 +/// the channel will be removed shortly.
 +/// Also note that, in normal operation, peers may disconnect at any of these states
 +/// and require re-connection before making progress onto other states.
 +pub enum ChannelShutdownState {
 +      /// Channel has not sent or received a shutdown message.
 +      NotShuttingDown,
 +      /// Local node has sent a shutdown message for this channel.
 +      ShutdownInitiated,
 +      /// Shutdown message exchanges have concluded and the channel is in the midst of
 +      /// resolving all existing open HTLCs before closing can continue.
 +      ResolvingHTLCs,
 +      /// All HTLCs have been resolved and the nodes are now negotiating the on-chain closing fee rate.
 +      NegotiatingClosingFee,
 +      /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about
 +      /// to drop the channel.
 +      ShutdownComplete,
 +}
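
Callers can presumably inspect the new state via `ChannelDetails::channel_shutdown_state` (see above; `None` on objects serialized by older LDK versions). A hedged usage sketch, assuming a `channel_manager` binding:

```rust
use lightning::ln::channelmanager::ChannelShutdownState;

for details in channel_manager.list_channels() {
    match details.channel_shutdown_state {
        Some(ChannelShutdownState::NotShuttingDown) | None => {},
        Some(ChannelShutdownState::ShutdownInitiated) =>
            println!("shutdown sent, awaiting counterparty"),
        Some(ChannelShutdownState::ResolvingHTLCs) =>
            println!("waiting on open HTLCs to resolve"),
        Some(ChannelShutdownState::NegotiatingClosingFee) =>
            println!("negotiating the closing fee"),
        Some(ChannelShutdownState::ShutdownComplete) =>
            println!("closed; the channel will be dropped shortly"),
    }
}
```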
 +
  /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments.
  /// These include payments that have yet to find a successful path, or have unresolved HTLCs.
  #[derive(Debug, PartialEq)]
@@@ -1916,7 -1888,9 +1916,7 @@@ macro_rules! handle_new_monitor_update 
                // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
                // any case so that it won't deadlock.
                debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
 -              #[cfg(debug_assertions)] {
 -                      debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
 -              }
 +              debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
                match $update_res {
                        ChannelMonitorUpdateStatus::InProgress => {
                                log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
@@@ -2020,8 -1994,6 +2020,8 @@@ macro_rules! process_events_body 
                                let mut pending_events = $self.pending_events.lock().unwrap();
                                pending_events.drain(..num_events);
                                processed_all_events = pending_events.is_empty();
 +                              // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
 +                              // updated here with the `pending_events` lock acquired.
                                $self.pending_events_processor.store(false, Ordering::Release);
                        }
  
@@@ -2110,6 -2082,7 +2110,6 @@@ wher
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
 -                      #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
  
                                let peer_state = &mut *peer_state_lock;
                                for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
                                let peer_state = &mut *peer_state_lock;
                                for (_channel_id, channel) in peer_state.channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
                        return peer_state.channel_by_id
                                .iter()
                                .map(|(_, channel)|
 -                                      ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone()))
 +                                      ChannelDetails::from_channel_context(&channel.context, best_block_height,
 +                                      features.clone(), &self.fee_estimator))
                                .collect();
                }
                vec![]
                                                session_priv: session_priv.clone(),
                                                first_hop_htlc_msat: htlc_msat,
                                                payment_id,
 -                                      }, onion_packet, None, &self.logger);
 +                                      }, onion_packet, None, &self.fee_estimator, &self.logger);
                                match break_chan_entry!(self, send_res, chan) {
                                        Some(monitor_update) => {
                                                match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) {
        /// irrevocably committed to on our end. In such a case, do NOT retry the payment with a
        /// different route unless you intend to pay twice!
        ///
+       /// [`RouteHop`]: crate::routing::router::RouteHop
        /// [`Event::PaymentSent`]: events::Event::PaymentSent
        /// [`Event::PaymentFailed`]: events::Event::PaymentFailed
        /// [`UpdateHTLCs`]: events::MessageSendEvent::UpdateHTLCs
                                                                                });
                                                                                if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat,
                                                                                        payment_hash, outgoing_cltv_value, htlc_source.clone(),
 -                                                                                      onion_packet, skimmed_fee_msat, &self.logger)
 +                                                                                      onion_packet, skimmed_fee_msat, &self.fee_estimator,
 +                                                                                      &self.logger)
                                                                                {
                                                                                        if let ChannelError::Ignore(msg) = e {
                                                                                                log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
        fn process_background_events(&self) -> NotifyOption {
                debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);
  
 -              #[cfg(debug_assertions)]
                self.background_events_processed_since_startup.store(true, Ordering::Release);
  
                let mut background_events = Vec::new();
  
                for event in background_events.drain(..) {
                        match event {
 -                              BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
 +                              BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
                                        // The channel has already been closed, so no use bothering to care about the
                                        // monitor updating completing.
                                        let _ = self.chain_monitor.update_channel(funding_txo, &update);
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
  
 -              chan.queue_update_fee(new_feerate, &self.logger);
 +              chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger);
                NotifyOption::DoPersist
        }
  
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
  
 +              // If we haven't yet run background events, assume we're still deserializing and shouldn't
 +              // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as
 +              // `BackgroundEvent`s.
 +              let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
 +
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let chan_id = prev_hop.outpoint.to_channel_id();
                                                                log_bytes!(chan_id), action);
                                                        peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                }
 -                                              let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
 -                                                      peer_state, per_peer_state, chan);
 -                                              if let Err(e) = res {
 -                                                      // TODO: This is a *critical* error - we probably updated the outbound edge
 -                                                      // of the HTLC's monitor with a preimage. We should retry this monitor
 -                                                      // update over and over again until morale improves.
 -                                                      log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
 -                                                      return Err((counterparty_node_id, e));
 +                                              if !during_init {
 +                                                      let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
 +                                                              peer_state, per_peer_state, chan);
 +                                                      if let Err(e) = res {
 +                                                              // TODO: This is a *critical* error - we probably updated the outbound edge
 +                                                              // of the HTLC's monitor with a preimage. We should retry this monitor
 +                                                              // update over and over again until morale improves.
 +                                                              log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
 +                                                              return Err((counterparty_node_id, e));
 +                                                      }
 +                                              } else {
 +                                                      // If we're running during init we cannot update a monitor directly -
 +                                                      // they probably haven't actually been loaded yet. Instead, push the
 +                                                      // monitor update as a background event.
 +                                                      self.pending_background_events.lock().unwrap().push(
 +                                                              BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 +                                                                      counterparty_node_id,
 +                                                                      funding_txo: prev_hop.outpoint,
 +                                                                      update: monitor_update.clone(),
 +                                                              });
                                                }
                                        }
                                        return Ok(());
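
Aside: the `during_init` gating added above boils down to a small pattern. A minimal,
self-contained sketch (hypothetical `Update`/`Manager` types, not the LDK API):

        use std::sync::Mutex;
        use std::sync::atomic::{AtomicBool, Ordering};

        struct Update(u64);

        struct Manager {
            // Flipped to true the first time background events are processed; while
            // false we are still deserializing and monitors may not be loaded.
            started: AtomicBool,
            // Updates queued during init, to be replayed once startup completes.
            queued: Mutex<Vec<Update>>,
        }

        impl Manager {
            fn apply_or_queue(&self, update: Update) {
                if self.started.load(Ordering::Acquire) {
                    // Normal operation: hand the update off immediately.
                    println!("applying update {}", update.0);
                } else {
                    // During init: queue it as a background event instead.
                    self.queued.lock().unwrap().push(update);
                }
            }
        }
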
                                payment_preimage,
                        }],
                };
 -              // We update the ChannelMonitor on the backward link, after
 -              // receiving an `update_fulfill_htlc` from the forward link.
 -              let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
 -              if update_res != ChannelMonitorUpdateStatus::Completed {
 -                      // TODO: This needs to be handled somehow - if we receive a monitor update
 -                      // with a preimage we *must* somehow manage to propagate it to the upstream
 -                      // channel, or we must have an ability to receive the same event and try
 -                      // again on restart.
 -                      log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
 -                              payment_preimage, update_res);
 +
 +              if !during_init {
 +                      // We update the ChannelMonitor on the backward link, after
 +                      // receiving an `update_fulfill_htlc` from the forward link.
 +                      let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
 +                      if update_res != ChannelMonitorUpdateStatus::Completed {
 +                              // TODO: This needs to be handled somehow - if we receive a monitor update
 +                              // with a preimage we *must* somehow manage to propagate it to the upstream
 +                              // channel, or we must have an ability to receive the same event and try
 +                              // again on restart.
 +                              log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
 +                                      payment_preimage, update_res);
 +                      }
 +              } else {
 +                      // If we're running during init we cannot update a monitor directly - they probably
 +                      // haven't actually been loaded yet. Instead, push the monitor update as a background
 +                      // event.
 +                      // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the
 +                      // channel is already closed) we need to ultimately handle the monitor update
 +                      // completion action only after we've completed the monitor update. This is the only
 +                      // way to guarantee this update *will* be regenerated on startup (otherwise if this was
 +                      // from a forwarded HTLC the downstream preimage may be deleted before we claim
 +                      // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will
 +                      // complete the monitor update completion action from `completion_action`.
 +                      self.pending_background_events.lock().unwrap().push(
 +                              BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
 +                                      prev_hop.outpoint, preimage_update,
 +                              )));
                }
                // Note that we do process the completion action here. This totally could be a
                // duplicate claim, but we have no way of knowing without interrogating the
        fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
 +                              debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
 +                                      "We don't support claim_htlc claims during startup - monitors may not be available yet");
                                self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                                _ => pending_forward_info
                                        }
                                };
 -                              try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan);
 +                              try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan);
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
                }
        }
  
 -      // We only want to push a PendingHTLCsForwardable event if no others are queued.
        fn push_pending_forwards_ev(&self) {
                let mut pending_events = self.pending_events.lock().unwrap();
 -              let forward_ev_exists = pending_events.iter()
 -                      .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
 -                      .is_some();
 -              if !forward_ev_exists {
 -                      pending_events.push_back((events::Event::PendingHTLCsForwardable {
 -                              time_forwardable:
 -                                      Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
 +              let is_processing_events = self.pending_events_processor.load(Ordering::Acquire);
 +              let num_forward_events = pending_events.iter().filter(|(ev, _)|
 +                      if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }
 +              ).count();
 +              // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing
 +              // events is done in batches and they are not removed until we're done processing each
 +              // batch. Since handling a `PendingHTLCsForwardable` event will call back into the
 +              // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom
 +              // payments need an additional forwarding event before being claimed so that, by taking
 +              // extra time, they look like real forwarded payments.
 +              if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 {
 +                      pending_events.push_back((Event::PendingHTLCsForwardable {
 +                              time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
                        }, None));
                }
        }
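
The push condition above is easier to read in isolation. A hedged sketch (hypothetical
`Ev` type) of the rule: normally at most one forwardable event may be queued, but while a
batch is being processed the in-flight event has not yet been removed, so one more is
allowed:

        use std::collections::VecDeque;
        use std::sync::atomic::{AtomicBool, Ordering};

        enum Ev { Forwardable, Other }

        fn maybe_push_forwardable(events: &mut VecDeque<Ev>, processing: &AtomicBool) {
            let num_forward = events.iter()
                .filter(|e| matches!(e, Ev::Forwardable)).count();
            // One extra event is tolerated mid-batch; otherwise only push when none exist.
            if (processing.load(Ordering::Acquire) && num_forward <= 1) || num_forward < 1 {
                events.push_back(Ev::Forwardable);
            }
        }
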
  
        /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
 -      /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
 +      /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action
        /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
        /// the [`ChannelMonitorUpdate`] in question.
        fn raa_monitor_updates_held(&self,
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        let funding_txo = chan.get().context.get_funding_txo();
 -                                      let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
 +                                      let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan);
                                        let res = if let Some(monitor_update) = monitor_update_opt {
                                                handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
                                                        peer_state_lock, peer_state, per_peer_state, chan).map(|_| ())
                                                let counterparty_node_id = chan.context.get_counterparty_node_id();
                                                let funding_txo = chan.context.get_funding_txo();
                                                let (monitor_opt, holding_cell_failed_htlcs) =
 -                                                      chan.maybe_free_holding_cell_htlcs(&self.logger);
 +                                                      chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger);
                                                if !holding_cell_failed_htlcs.is_empty() {
                                                        failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
                                                }
        /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
        /// [`Event`] being handled) completes, this should be called to restore the channel to normal
        /// operation. It will double-check that nothing *else* is also blocking the same channel from
 -      /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
 +      /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
        fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
                let mut errors = Vec::new();
                loop {
@@@ -7435,7 -7366,6 +7436,7 @@@ impl Writeable for ChannelDetails 
                        (35, self.inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, self.feerate_sat_per_1000_weight, option),
 +                      (41, self.channel_shutdown_state, option),
                });
                Ok(())
        }
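
A detail this hunk relies on but does not state: 41 is an odd TLV type, and in this
serialization scheme readers skip unknown odd types while unknown even types are a hard
decode error, so appending `channel_shutdown_state` at an odd type keeps older
deserializers working. A sketch of the convention, mirroring the fields above:

        write_tlv_fields!(writer, {
            (39, self.feerate_sat_per_1000_weight, option), // known to older readers
            (41, self.channel_shutdown_state, option),      // new; odd, so old readers skip it
        });
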
@@@ -7473,7 -7403,6 +7474,7 @@@ impl Readable for ChannelDetails 
                        (35, inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, feerate_sat_per_1000_weight, option),
 +                      (41, channel_shutdown_state, option),
                });
  
                // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
                        inbound_htlc_minimum_msat,
                        inbound_htlc_maximum_msat,
                        feerate_sat_per_1000_weight,
 +                      channel_shutdown_state,
                })
        }
  }
  
  impl_writeable_tlv_based!(PhantomRouteHints, {
-       (2, channels, vec_type),
+       (2, channels, required_vec),
        (4, phantom_scid, required),
        (6, real_node_pubkey, required),
  });
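
Note that this rename does not change `PhantomRouteHints` bytes on the wire: as the
`ser_macros.rs` hunks below show, `required_vec` encodes exactly as `vec_type` did (a
`WithoutLength` write of the `Vec`); only the decode-side handling of a missing TLV
changed. A hedged round-trip sketch, assuming a `hints: PhantomRouteHints` value in scope:

        use lightning::util::ser::Writeable;

        let mut bytes = Vec::new();
        hints.write(&mut bytes).unwrap();
        // `bytes` matches what a pre-rename (`vec_type`) writer produced for the
        // same value; since the field is always written, readers of either vintage
        // can decode it.
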
@@@ -7707,7 -7635,7 +7708,7 @@@ impl Readable for HTLCSource 
                        0 => {
                                let mut session_priv: crate::util::ser::RequiredWrapper<SecretKey> = crate::util::ser::RequiredWrapper(None);
                                let mut first_hop_htlc_msat: u64 = 0;
-                               let mut path_hops: Option<Vec<RouteHop>> = Some(Vec::new());
+                               let mut path_hops = Vec::new();
                                let mut payment_id = None;
                                let mut payment_params: Option<PaymentParameters> = None;
                                let mut blinded_tail: Option<BlindedTail> = None;
                                        (0, session_priv, required),
                                        (1, payment_id, option),
                                        (2, first_hop_htlc_msat, required),
-                                       (4, path_hops, vec_type),
+                                       (4, path_hops, required_vec),
                                        (5, payment_params, (option: ReadableArgs, 0)),
                                        (6, blinded_tail, option),
                                });
                                        // instead.
                                        payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
                                }
-                               let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail };
+                               let path = Path { hops: path_hops, blinded_tail };
                                if path.hops.len() == 0 {
                                        return Err(DecodeError::InvalidValue);
                                }
@@@ -7759,7 -7687,7 +7760,7 @@@ impl Writeable for HTLCSource 
                                        (1, payment_id_opt, option),
                                        (2, first_hop_htlc_msat, required),
                                        // 3 was previously used to write a PaymentSecret for the payment.
-                                       (4, path.hops, vec_type),
+                                       (4, path.hops, required_vec),
                                        (5, None::<PaymentParameters>, option), // payment_params in LDK versions prior to 0.0.115
                                        (6, path.blinded_tail, option),
                                 });
@@@ -8009,7 -7937,7 +8010,7 @@@ wher
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, self.fake_scid_rand_bytes, required),
                        (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
-                       (9, htlc_purposes, vec_type),
+                       (9, htlc_purposes, required_vec),
                        (10, in_flight_monitor_updates, option),
                        (11, self.probing_cookie_secret, required),
                        (13, htlc_onion_fields, optional_vec),
@@@ -8060,14 -7988,6 +8061,14 @@@ impl Readable for VecDeque<(Event, Opti
        }
  }
  
 +impl_writeable_tlv_based_enum!(ChannelShutdownState,
 +      (0, NotShuttingDown) => {},
 +      (2, ShutdownInitiated) => {},
 +      (4, ResolvingHTLCs) => {},
 +      (6, NegotiatingClosingFee) => {},
 +      (8, ShutdownComplete) => {}, ;
 +);
 +
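(The trailing `, ;` above is the macro's separator between struct-style variants, all five
here, and tuple variants, of which there are none.) A hedged round-trip sketch of the
generated impls:

        use lightning::ln::channelmanager::ChannelShutdownState;
        use lightning::util::ser::{Readable, Writeable};

        let state = ChannelShutdownState::ShutdownInitiated;
        let mut buf = Vec::new();
        state.write(&mut buf).unwrap();
        let read_back = ChannelShutdownState::read(&mut &buf[..]).unwrap();
        assert!(matches!(read_back, ChannelShutdownState::ShutdownInitiated));
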
  /// Arguments for the creation of a ChannelManager that are not deserialized.
  ///
 /// At a high level, the process for deserializing a ChannelManager and resuming normal operation
@@@ -8333,7 -8253,7 +8334,7 @@@ wher
                                        update_id: CLOSED_CHANNEL_UPDATE_ID,
                                        updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
                                };
 -                              close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
 +                              close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
                        }
                }
  
                        (6, monitor_update_blocked_actions_per_peer, option),
                        (7, fake_scid_rand_bytes, option),
                        (8, events_override, option),
-                       (9, claimable_htlc_purposes, vec_type),
+                       (9, claimable_htlc_purposes, optional_vec),
                        (10, in_flight_monitor_updates, option),
                        (11, probing_cookie_secret, option),
                        (13, claimable_htlc_onion_fields, optional_vec),
                // Note that we have to do the above replays before we push new monitor updates.
                pending_background_events.append(&mut close_background_events);
  
 +              // If there are any preimages for forwarded HTLCs hanging around in ChannelMonitors we
 +              // should ensure we try them again on the inbound edge. We put them here and do so after we
 +              // have a fully-constructed `ChannelManager` at the end.
 +              let mut pending_claims_to_replay = Vec::new();
 +
                {
                        // If we're tracking pending payments, ensure we haven't lost any by looking at the
                        // ChannelMonitor data for any channels for which we do not have authoritative state
                        // We only rebuild the pending payments map if we were most recently serialized by
                        // 0.0.102+
                        for (_, monitor) in args.channel_monitors.iter() {
 -                              if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
 +                              let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id());
 +                              if counterparty_opt.is_none() {
                                        for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
                                                if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source {
                                                        if path.hops.is_empty() {
                                                }
                                        }
                                }
 +
 +                              // Whether the downstream channel was closed or not, try to re-apply any payment
 +                              // preimages from it which may be needed in upstream channels for forwarded
 +                              // payments.
 +                              let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs()
 +                                      .into_iter()
 +                                      .filter_map(|(htlc_source, (htlc, preimage_opt))| {
 +                                              if let HTLCSource::PreviousHopData(_) = htlc_source {
 +                                                      if let Some(payment_preimage) = preimage_opt {
 +                                                              Some((htlc_source, payment_preimage, htlc.amount_msat,
 +                                                                      // Check if `counterparty_opt.is_none()` to see if the
 +                                                                      // downstream chan is closed (because we don't have a
 +                                                                      // channel_id -> peer map entry).
 +                                                                      counterparty_opt.is_none(),
 +                                                                      monitor.get_funding_txo().0.to_channel_id()))
 +                                                      } else { None }
 +                                              } else {
 +                                                      // If it was an outbound payment, we've handled it above: if a preimage
 +                                                      // came in and we persisted the `ChannelManager`, we either handled it
 +                                                      // and are good to go or the channel force-closed. We don't have to
 +                                                      // handle the channel-still-live case here.
 +                                                      None
 +                                              }
 +                                      });
 +                              for tuple in outbound_claimed_htlcs_iter {
 +                                      pending_claims_to_replay.push(tuple);
 +                              }
                        }
                }
  
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
 -                      #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
  
                        channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
  
 +              for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
 +                      // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
 +                      // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
 +                      // channel is closed we just assume that it probably came from an on-chain claim.
 +                      channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
 +                              downstream_closed, downstream_chan_id);
 +              }
 +
                //TODO: Broadcast channel update for closed channels, but only after we've made a
                //connection or two.
  
@@@ -10144,7 -10024,7 +10145,7 @@@ pub mod bench 
        use crate::routing::gossip::NetworkGraph;
        use crate::routing::router::{PaymentParameters, RouteParameters};
        use crate::util::test_utils;
 -      use crate::util::config::UserConfig;
 +      use crate::util::config::{UserConfig, MaxDustHTLCExposure};
  
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
                let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer);
  
                let mut config: UserConfig = Default::default();
 +              config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253);
                config.channel_handshake_config.minimum_depth = 1;
  
                let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a);
index 5ef107e299cfc1d3d1de02dfa043fc0a58b5df3d,644772b81498bdcbc94251f067b4740176531471..b9b70e0a03165e49f71f7f8c9bf185fd0388d7a2
@@@ -10,7 -10,7 +10,7 @@@
  //! The [`NetworkGraph`] stores the network gossip and [`P2PGossipSync`] fetches it from peers
  
  use bitcoin::secp256k1::constants::PUBLIC_KEY_SIZE;
 -use bitcoin::secp256k1::PublicKey;
 +use bitcoin::secp256k1::{PublicKey, Verification};
  use bitcoin::secp256k1::Secp256k1;
  use bitcoin::secp256k1;
  
@@@ -409,29 -409,6 +409,29 @@@ macro_rules! get_pubkey_from_node_id 
        }
  }
  
 +/// Verifies the signature of a [`NodeAnnouncement`].
 +///
 +/// Returns an error if it is invalid.
 +pub fn verify_node_announcement<C: Verification>(msg: &NodeAnnouncement, secp_ctx: &Secp256k1<C>) -> Result<(), LightningError> {
 +      let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
 +      secp_verify_sig!(secp_ctx, &msg_hash, &msg.signature, &get_pubkey_from_node_id!(msg.contents.node_id, "node_announcement"), "node_announcement");
 +
 +      Ok(())
 +}
 +
 +/// Verifies all signatures included in a [`ChannelAnnouncement`].
 +///
 +/// Returns an error if one of the signatures is invalid.
 +pub fn verify_channel_announcement<C: Verification>(msg: &ChannelAnnouncement, secp_ctx: &Secp256k1<C>) -> Result<(), LightningError> {
 +      let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
 +      secp_verify_sig!(secp_ctx, &msg_hash, &msg.node_signature_1, &get_pubkey_from_node_id!(msg.contents.node_id_1, "channel_announcement"), "channel_announcement");
 +      secp_verify_sig!(secp_ctx, &msg_hash, &msg.node_signature_2, &get_pubkey_from_node_id!(msg.contents.node_id_2, "channel_announcement"), "channel_announcement");
 +      secp_verify_sig!(secp_ctx, &msg_hash, &msg.bitcoin_signature_1, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_1, "channel_announcement"), "channel_announcement");
 +      secp_verify_sig!(secp_ctx, &msg_hash, &msg.bitcoin_signature_2, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_2, "channel_announcement"), "channel_announcement");
 +
 +      Ok(())
 +}
 +
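These helpers allow signature verification without a `NetworkGraph`, e.g. when accepting
routing messages over something other than the lightning P2P protocol. A hedged sketch,
assuming already-parsed `node_ann: msgs::NodeAnnouncement` and
`chan_ann: msgs::ChannelAnnouncement` values and a caller that returns
`Result<(), LightningError>`:

        use bitcoin::secp256k1::Secp256k1;
        use lightning::routing::gossip::{verify_node_announcement, verify_channel_announcement};

        let secp_ctx = Secp256k1::verification_only();
        verify_node_announcement(&node_ann, &secp_ctx)?;
        verify_channel_announcement(&chan_ann, &secp_ctx)?;
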
  impl<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref> RoutingMessageHandler for P2PGossipSync<G, U, L>
  where U::Target: UtxoLookup, L::Target: Logger
  {
@@@ -1143,26 -1120,26 +1143,26 @@@ impl Writeable for NodeAnnouncementInf
                        (4, self.rgb, required),
                        (6, self.alias, required),
                        (8, self.announcement_message, option),
-                       (10, empty_addresses, vec_type), // Versions prior to 0.0.115 require this field
+                       (10, empty_addresses, required_vec), // Versions prior to 0.0.115 require this field
                });
                Ok(())
        }
  }
  
  impl Readable for NodeAnnouncementInfo {
-     fn read<R: io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
+       fn read<R: io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
                _init_and_read_tlv_fields!(reader, {
                        (0, features, required),
                        (2, last_update, required),
                        (4, rgb, required),
                        (6, alias, required),
                        (8, announcement_message, option),
-                       (10, _addresses, vec_type), // deprecated, not used anymore
+                       (10, _addresses, optional_vec), // deprecated, not used anymore
                });
                let _: Option<Vec<NetAddress>> = _addresses;
                Ok(Self { features: features.0.unwrap(), last_update: last_update.0.unwrap(), rgb: rgb.0.unwrap(),
                        alias: alias.0.unwrap(), announcement_message })
-     }
+       }
  }
  
  /// A user-defined name for a node, which may be used when displaying the node in a graph.
@@@ -1228,7 -1205,7 +1228,7 @@@ impl Writeable for NodeInfo 
                write_tlv_fields!(writer, {
                        // Note that older versions of LDK wrote the lowest inbound fees here at type 0
                        (2, self.announcement_info, option),
-                       (4, self.channels, vec_type),
+                       (4, self.channels, required_vec),
                });
                Ok(())
        }
@@@ -1259,19 -1236,17 +1259,17 @@@ impl Readable for NodeInfo 
                // with zero inbound fees, causing that heuristic to provide little gain. Worse, because it
                // requires additional complexity and lookups during routing, it ends up being a
                // performance loss. Thus, we simply ignore the old field here and no longer track it.
-               let mut _lowest_inbound_channel_fees: Option<RoutingFees> = None;
-               let mut announcement_info_wrap: Option<NodeAnnouncementInfoDeserWrapper> = None;
-               _init_tlv_field_var!(channels, vec_type);
-               read_tlv_fields!(reader, {
+               _init_and_read_tlv_fields!(reader, {
                        (0, _lowest_inbound_channel_fees, option),
                        (2, announcement_info_wrap, upgradable_option),
-                       (4, channels, vec_type),
+                       (4, channels, required_vec),
                });
+               let _: Option<RoutingFees> = _lowest_inbound_channel_fees;
+               let announcement_info_wrap: Option<NodeAnnouncementInfoDeserWrapper> = announcement_info_wrap;
  
                Ok(NodeInfo {
                        announcement_info: announcement_info_wrap.map(|w| w.0),
-                       channels: _init_tlv_based_struct_field!(channels, vec_type),
+                       channels,
                })
        }
  }
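
For reviewers unfamiliar with the helper: `_init_and_read_tlv_fields!` simply folds the
old two-step pattern (declare per-field locals, then read) into one invocation. Roughly,
as a sketch of the expansion rather than the exact tokens:

        _init_tlv_field_var!(_lowest_inbound_channel_fees, option);
        _init_tlv_field_var!(announcement_info_wrap, upgradable_option);
        _init_tlv_field_var!(channels, required_vec);
        read_tlv_fields!(reader, {
            (0, _lowest_inbound_channel_fees, option),
            (2, announcement_info_wrap, upgradable_option),
            (4, channels, required_vec),
        });
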
@@@ -1426,7 -1401,8 +1424,7 @@@ impl<L: Deref> NetworkGraph<L> where L:
        /// RoutingMessageHandler implementation to call it indirectly. This may be useful to accept
        /// routing messages from a source using a protocol other than the lightning P2P protocol.
        pub fn update_node_from_announcement(&self, msg: &msgs::NodeAnnouncement) -> Result<(), LightningError> {
 -              let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
 -              secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.signature, &get_pubkey_from_node_id!(msg.contents.node_id, "node_announcement"), "node_announcement");
 +              verify_node_announcement(msg, &self.secp_ctx)?;
                self.update_node_from_announcement_intern(&msg.contents, Some(&msg))
        }
  
        where
                U::Target: UtxoLookup,
        {
 -              let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
 -              secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_1, &get_pubkey_from_node_id!(msg.contents.node_id_1, "channel_announcement"), "channel_announcement");
 -              secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_2, &get_pubkey_from_node_id!(msg.contents.node_id_2, "channel_announcement"), "channel_announcement");
 -              secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_1, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_1, "channel_announcement"), "channel_announcement");
 -              secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_2, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_2, "channel_announcement"), "channel_announcement");
 +              verify_channel_announcement(msg, &self.secp_ctx)?;
                self.update_channel_from_unsigned_announcement_intern(&msg.contents, Some(msg), utxo_lookup)
        }
  
index a83af28fcd62f9cae1ae268011ea5218d274c136,8c00df99a7eed96cbb3fb4633b2622c53532dfa6..d3539579c50ae16d8a8d92eb806af6364243327d
@@@ -204,15 -204,6 +204,15 @@@ impl InFlightHtlcs 
                }
        }
  
 +      /// Adds a known HTLC given the public key of the HTLC source, target, and short channel
 +      /// id.
 +      pub fn add_inflight_htlc(&mut self, source: &NodeId, target: &NodeId, channel_scid: u64, used_msat: u64) {
 +              self.0
 +                      .entry((channel_scid, source < target))
 +                      .and_modify(|used_liquidity_msat| *used_liquidity_msat += used_msat)
 +                      .or_insert(used_msat);
 +      }
 +
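A hedged usage sketch of the new method alongside the existing `used_liquidity_msat`
getter (shown just below), assuming two `NodeId`s and a short channel id in scope:

        let mut inflight = InFlightHtlcs::new();
        inflight.add_inflight_htlc(&source, &target, scid, 10_000);
        inflight.add_inflight_htlc(&source, &target, scid, 5_000);
        // Amounts on the same directed channel accumulate:
        assert_eq!(inflight.used_liquidity_msat(&source, &target, scid), Some(15_000));
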
        /// Returns liquidity in msat given the public key of the HTLC source, target, and short channel
        /// id.
        pub fn used_liquidity_msat(&self, source: &NodeId, target: &NodeId, channel_scid: u64) -> Option<u64> {
@@@ -292,7 -283,7 +292,7 @@@ pub struct BlindedTail 
  }
  
  impl_writeable_tlv_based!(BlindedTail, {
-       (0, hops, vec_type),
+       (0, hops, required_vec),
        (2, blinding_point, required),
        (4, excess_final_cltv_expiry_delta, required),
        (6, final_value_msat, required),
@@@ -559,10 -550,10 +559,10 @@@ impl Writeable for PaymentParameters 
                        (1, self.max_total_cltv_expiry_delta, required),
                        (2, self.payee.features(), option),
                        (3, self.max_path_count, required),
-                       (4, *clear_hints, vec_type),
+                       (4, *clear_hints, required_vec),
                        (5, self.max_channel_saturation_power_of_half, required),
                        (6, self.expiry_time, option),
-                       (7, self.previously_failed_channels, vec_type),
+                       (7, self.previously_failed_channels, required_vec),
                        (8, *blinded_hints, optional_vec),
                        (9, self.payee.final_cltv_expiry_delta(), option),
                });
@@@ -577,14 -568,13 +577,13 @@@ impl ReadableArgs<u32> for PaymentParam
                        (1, max_total_cltv_expiry_delta, (default_value, DEFAULT_MAX_TOTAL_CLTV_EXPIRY_DELTA)),
                        (2, features, (option: ReadableArgs, payee_pubkey.is_some())),
                        (3, max_path_count, (default_value, DEFAULT_MAX_PATH_COUNT)),
-                       (4, route_hints, vec_type),
+                       (4, clear_route_hints, required_vec),
                        (5, max_channel_saturation_power_of_half, (default_value, DEFAULT_MAX_CHANNEL_SATURATION_POW_HALF)),
                        (6, expiry_time, option),
-                       (7, previously_failed_channels, vec_type),
+                       (7, previously_failed_channels, optional_vec),
                        (8, blinded_route_hints, optional_vec),
                        (9, final_cltv_expiry_delta, (default_value, default_final_cltv_expiry_delta)),
                });
-               let clear_route_hints = route_hints.unwrap_or(vec![]);
                let blinded_route_hints = blinded_route_hints.unwrap_or(vec![]);
                let payee = if blinded_route_hints.len() != 0 {
                        if clear_route_hints.len() != 0 || payee_pubkey.is_some() { return Err(DecodeError::InvalidValue) }
@@@ -2696,8 -2686,7 +2695,8 @@@ mod tests 
                        inbound_htlc_minimum_msat: None,
                        inbound_htlc_maximum_msat: None,
                        config: None,
 -                      feerate_sat_per_1000_weight: None
 +                      feerate_sat_per_1000_weight: None,
 +                      channel_shutdown_state: Some(channelmanager::ChannelShutdownState::NotShuttingDown),
                }
        }
  
@@@ -6768,7 -6757,6 +6767,7 @@@ pub(crate) mod bench_utils 
                        inbound_htlc_maximum_msat: None,
                        config: None,
                        feerate_sat_per_1000_weight: None,
 +                      channel_shutdown_state: Some(channelmanager::ChannelShutdownState::NotShuttingDown),
                }
        }
  
index 742ea25714dafb4d291f249e30376366cfeebbaa,710085e2bbd5fa91cff4fea09d3065eda919bb50..1744b923d5e92589eee665c5a1ef1e263b6cfd8b
  //! [`Readable`]: crate::util::ser::Readable
  //! [`Writeable`]: crate::util::ser::Writeable
  
+ // There are quite a few TLV serialization "types" which behave differently. We currently only
+ // publicly document the `optional` and `required` types; the others are not publicly supported
+ // and may be changed at will.
+ //
+ // Some of the other types include:
+ //  * (default_value, $default) - reads optionally, reading $default if no TLV is present
+ //  * (static_value, $value) - ignores any TLVs, always using $value
+ //  * required_vec - reads into a Vec without a length prefix, failing if no TLV is present.
+ //  * optional_vec - reads into an Option<Vec> without a length prefix, continuing if no TLV is
+ //                   present. Writes from a Vec directly, only if any elements are present. Note
+ //                   that the struct deserialization macros return a Vec, not an Option.
+ //  * upgradable_option - reads via MaybeReadable.
+ //  * upgradable_required - reads via MaybeReadable, requiring a TLV be present but may return None
+ //                          if MaybeReadable::read() returns None.
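+ //
+ // Since only `optional` and `required` are documented publicly, a hedged sketch
+ // (hypothetical struct) showing how the two vec types behave end to end:
+ //
+ //     struct Example {
+ //         items: Vec<u8>,       // required_vec: always written; decode fails if the TLV is absent
+ //         maybe_items: Vec<u8>, // optional_vec: written only when non-empty; absent reads as empty
+ //     }
+ //     impl_writeable_tlv_based!(Example, {
+ //         (0, items, required_vec),
+ //         (2, maybe_items, optional_vec),
+ //     });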
  /// Implements serialization for a single TLV record.
  /// This is exported for use by other exported macros, do not use directly.
  #[doc(hidden)]
@@@ -29,7 -44,7 +44,7 @@@ macro_rules! _encode_tlv 
                BigSize($field.serialized_length() as u64).write($stream)?;
                $field.write($stream)?;
        };
-       ($stream: expr, $type: expr, $field: expr, vec_type) => {
+       ($stream: expr, $type: expr, $field: expr, required_vec) => {
                $crate::_encode_tlv!($stream, $type, $crate::util::ser::WithoutLength(&$field), required);
        };
        ($stream: expr, $optional_type: expr, $optional_field: expr, option) => {
@@@ -41,7 -56,7 +56,7 @@@
        };
        ($stream: expr, $type: expr, $field: expr, optional_vec) => {
                if !$field.is_empty() {
-                       $crate::_encode_tlv!($stream, $type, $field, vec_type);
+                       $crate::_encode_tlv!($stream, $type, $field, required_vec);
                }
        };
        ($stream: expr, $type: expr, $field: expr, upgradable_required) => {
@@@ -159,7 -174,7 +174,7 @@@ macro_rules! _get_varint_length_prefixe
                BigSize(field_len as u64).write(&mut $len).expect("No in-memory data may fail to serialize");
                $len.0 += field_len;
        };
-       ($len: expr, $type: expr, $field: expr, vec_type) => {
+       ($len: expr, $type: expr, $field: expr, required_vec) => {
                $crate::_get_varint_length_prefixed_tlv_length!($len, $type, $crate::util::ser::WithoutLength(&$field), required);
        };
        ($len: expr, $optional_type: expr, $optional_field: expr, option) => {
        };
        ($len: expr, $type: expr, $field: expr, optional_vec) => {
                if !$field.is_empty() {
-                       $crate::_get_varint_length_prefixed_tlv_length!($len, $type, $field, vec_type);
+                       $crate::_get_varint_length_prefixed_tlv_length!($len, $type, $field, required_vec);
                }
        };
        ($len: expr, $type: expr, $field: expr, (option: $trait: ident $(, $read_arg: expr)?)) => {
@@@ -236,8 -251,8 +251,8 @@@ macro_rules! _check_decoded_tlv_order 
        ($last_seen_type: expr, $typ: expr, $type: expr, $field: ident, option) => {{
                // no-op
        }};
-       ($last_seen_type: expr, $typ: expr, $type: expr, $field: ident, vec_type) => {{
-               // no-op
+       ($last_seen_type: expr, $typ: expr, $type: expr, $field: ident, required_vec) => {{
+               $crate::_check_decoded_tlv_order!($last_seen_type, $typ, $type, $field, required);
        }};
        ($last_seen_type: expr, $typ: expr, $type: expr, $field: ident, optional_vec) => {{
                // no-op
@@@ -281,8 -296,8 +296,8 @@@ macro_rules! _check_missing_tlv 
        ($last_seen_type: expr, $type: expr, $field: ident, (required: $trait: ident $(, $read_arg: expr)?)) => {{
                $crate::_check_missing_tlv!($last_seen_type, $type, $field, required);
        }};
-       ($last_seen_type: expr, $type: expr, $field: ident, vec_type) => {{
-               // no-op
+       ($last_seen_type: expr, $type: expr, $field: ident, required_vec) => {{
+               $crate::_check_missing_tlv!($last_seen_type, $type, $field, required);
        }};
        ($last_seen_type: expr, $type: expr, $field: ident, option) => {{
                // no-op
@@@ -320,15 -335,16 +335,16 @@@ macro_rules! _decode_tlv 
        ($reader: expr, $field: ident, (required: $trait: ident $(, $read_arg: expr)?)) => {{
                $field = $trait::read(&mut $reader $(, $read_arg)*)?;
        }};
-       ($reader: expr, $field: ident, vec_type) => {{
+       ($reader: expr, $field: ident, required_vec) => {{
                let f: $crate::util::ser::WithoutLength<Vec<_>> = $crate::util::ser::Readable::read(&mut $reader)?;
-               $field = Some(f.0);
+               $field = f.0;
        }};
        ($reader: expr, $field: ident, option) => {{
                $field = Some($crate::util::ser::Readable::read(&mut $reader)?);
        }};
        ($reader: expr, $field: ident, optional_vec) => {{
-               $crate::_decode_tlv!($reader, $field, vec_type);
+               let f: $crate::util::ser::WithoutLength<Vec<_>> = $crate::util::ser::Readable::read(&mut $reader)?;
+               $field = Some(f.0);
        }};
        // `upgradable_required` indicates we're reading a required TLV that may have been upgraded
        // without backwards compat. We'll error if the field is missing, and return `Ok(None)` if the
@@@ -694,8 -710,8 +710,8 @@@ macro_rules! _init_tlv_based_struct_fie
        ($field: ident, required) => {
                $field.0.unwrap()
        };
-       ($field: ident, vec_type) => {
-               $field.unwrap()
+       ($field: ident, required_vec) => {
+               $field
        };
        ($field: ident, optional_vec) => {
                $field.unwrap()
@@@ -720,8 -736,8 +736,8 @@@ macro_rules! _init_tlv_field_var 
        ($field: ident, (required: $trait: ident $(, $read_arg: expr)?)) => {
                $crate::_init_tlv_field_var!($field, required);
        };
-       ($field: ident, vec_type) => {
-               let mut $field = Some(Vec::new());
+       ($field: ident, required_vec) => {
+               let mut $field = Vec::new();
        };
        ($field: ident, option) => {
                let mut $field = None;
@@@ -981,7 -997,7 +997,7 @@@ macro_rules! impl_writeable_tlv_based_e
                                                f()
                                        }),*
                                        $($tuple_variant_id => {
 -                                              Ok($st::$tuple_variant_name(Readable::read(reader)?))
 +                                              Ok($st::$tuple_variant_name($crate::util::ser::Readable::read(reader)?))
                                        }),*
                                        _ => {
                                                Err($crate::ln::msgs::DecodeError::UnknownRequiredFeature)