Merge pull request #2395 from wpaulino/phantom-deduped-forward-event
author Elias Rohrer <dev@tnull.de>
Tue, 11 Jul 2023 07:31:37 +0000 (09:31 +0200)
committer GitHub <noreply@github.com>
Tue, 11 Jul 2023 07:31:37 +0000 (09:31 +0200)
Force enqueue second forward event for phantom receives

lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs

index 61895544e8f404e1e0fda5be12b95cb5cbdf1330,2cbda06684915a882ee74475368bbd51c20d59bd..b87be9cbb75cca9bc838592145ca924eee3d6a7a
@@@ -507,19 -507,19 +507,19 @@@ struct ClaimablePayments 
  /// running normally, and specifically must be processed before any other non-background
  /// [`ChannelMonitorUpdate`]s are applied.
  enum BackgroundEvent {
 -      /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from
 -      /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public
 -      /// key to handle channel resumption, whereas if the channel has been force-closed we do not
 -      /// need the counterparty node_id.
 +      /// Handle a ChannelMonitorUpdate which closes the channel or is for an already-closed channel.
 +      /// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
 +      /// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the
 +      /// channel has been force-closed we do not need the counterparty node_id.
        ///
        /// Note that any such events are lost on shutdown, so in general they must be updates which
        /// are regenerated on startup.
 -      ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
 +      ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
        /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the
        /// channel to continue normal operation.
        ///
        /// In general this should be used rather than
 -      /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the
 +      /// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the
        /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`]
        /// error, the other variant is acceptable.
        ///
@@@ -752,23 -752,7 +752,23 @@@ pub type SimpleArcChannelManager<M, T, 
  /// of [`KeysManager`] and [`DefaultRouter`].
  ///
  /// This is not exported to bindings users as Arcs don't make sense in bindings
 -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>;
 +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> =
 +      ChannelManager<
 +              &'a M,
 +              &'b T,
 +              &'c KeysManager,
 +              &'c KeysManager,
 +              &'c KeysManager,
 +              &'d F,
 +              &'e DefaultRouter<
 +                      &'f NetworkGraph<&'g L>,
 +                      &'g L,
 +                      &'h Mutex<ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>,
 +                      ProbabilisticScoringFeeParameters,
 +                      ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>
 +              >,
 +              &'g L
 +      >;
  
  macro_rules! define_test_pub_trait { ($vis: vis) => {
  /// A trivial trait which describes any [`ChannelManager`] used in testing.
@@@ -1114,6 -1098,7 +1114,6 @@@ wher
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
  
 -      #[cfg(debug_assertions)]
        background_events_processed_since_startup: AtomicBool,
  
        persistence_notifier: Notifier,
@@@ -1479,9 -1464,6 +1479,9 @@@ pub struct ChannelDetails 
        ///
        /// [`confirmations_required`]: ChannelDetails::confirmations_required
        pub is_channel_ready: bool,
 +      /// The stage of the channel's shutdown.
 +      /// `None` for `ChannelDetails` serialized on LDK versions prior to 0.0.116.
 +      pub channel_shutdown_state: Option<ChannelShutdownState>,
        /// True if the channel is (a) confirmed and channel_ready messages have been exchanged, (b)
        /// the peer is connected, and (c) the channel is not currently negotiating a shutdown.
        ///
@@@ -1521,13 -1503,10 +1521,13 @@@ impl ChannelDetails 
                self.short_channel_id.or(self.outbound_scid_alias)
        }
  
 -      fn from_channel_context<Signer: WriteableEcdsaChannelSigner>(context: &ChannelContext<Signer>,
 -              best_block_height: u32, latest_features: InitFeatures) -> Self {
 -
 -              let balance = context.get_available_balances();
 +      fn from_channel_context<Signer: WriteableEcdsaChannelSigner, F: Deref>(
 +              context: &ChannelContext<Signer>, best_block_height: u32, latest_features: InitFeatures,
 +              fee_estimator: &LowerBoundedFeeEstimator<F>
 +      ) -> Self
 +      where F::Target: FeeEstimator
 +      {
 +              let balance = context.get_available_balances(fee_estimator);
                let (to_remote_reserve_satoshis, to_self_reserve_satoshis) =
                        context.get_holder_counterparty_selected_channel_reserve_satoshis();
                ChannelDetails {
                        inbound_htlc_minimum_msat: Some(context.get_holder_htlc_minimum_msat()),
                        inbound_htlc_maximum_msat: context.get_holder_htlc_maximum_msat(),
                        config: Some(context.config()),
 +                      channel_shutdown_state: Some(context.shutdown_state()),
                }
        }
  }
  
 +#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 +/// Further information on the details of the channel shutdown.
 +/// Upon channels being force-closed (i.e. commitment transaction confirmation detected
 +/// by `ChainMonitor`), ChannelShutdownState will be set to `ShutdownComplete` or
 +/// the channel will be removed shortly.
 +/// Also note that, in normal operation, peers could disconnect at any of these states
 +/// and require peer re-connection before making progress onto other states.
 +pub enum ChannelShutdownState {
 +      /// Channel has not sent or received a shutdown message.
 +      NotShuttingDown,
 +      /// Local node has sent a shutdown message for this channel.
 +      ShutdownInitiated,
 +      /// Shutdown message exchanges have concluded and the channel is in the midst of
 +      /// resolving all existing open HTLCs before closing can continue.
 +      ResolvingHTLCs,
 +      /// All HTLCs have been resolved and the nodes are currently negotiating the channel close on-chain fee rate.
 +      NegotiatingClosingFee,
 +      /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about
 +      /// to drop the channel.
 +      ShutdownComplete,
 +}
 +
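The new `channel_shutdown_state` field gives API consumers a way to see where a cooperative close stands. A minimal sketch of how a caller might branch on it, assuming `ChannelDetails` and `ChannelShutdownState` are importable from `lightning::ln::channelmanager` as defined above (the helper name here is ours, not LDK API):

use lightning::ln::channelmanager::{ChannelDetails, ChannelShutdownState};

// Hypothetical helper: map the reported shutdown state to a short label for logs or a UI.
fn shutdown_stage_label(details: &ChannelDetails) -> &'static str {
    match details.channel_shutdown_state {
        // `None` only occurs for `ChannelDetails` serialized by LDK prior to 0.0.116.
        None => "unknown (pre-0.0.116 serialization)",
        Some(ChannelShutdownState::NotShuttingDown) => "open",
        Some(ChannelShutdownState::ShutdownInitiated) => "shutdown sent",
        Some(ChannelShutdownState::ResolvingHTLCs) => "resolving HTLCs",
        Some(ChannelShutdownState::NegotiatingClosingFee) => "negotiating closing fee",
        Some(ChannelShutdownState::ShutdownComplete) => "closed",
    }
}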
  /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments.
  /// These include payments that have yet to find a successful path, or have unresolved HTLCs.
  #[derive(Debug, PartialEq)]
@@@ -1916,7 -1872,9 +1916,7 @@@ macro_rules! handle_new_monitor_update 
                // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
                // any case so that it won't deadlock.
                debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
 -              #[cfg(debug_assertions)] {
 -                      debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
 -              }
 +              debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
                match $update_res {
                        ChannelMonitorUpdateStatus::InProgress => {
                                log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
@@@ -2020,6 -1978,8 +2020,8 @@@ macro_rules! process_events_body 
                                let mut pending_events = $self.pending_events.lock().unwrap();
                                pending_events.drain(..num_events);
                                processed_all_events = pending_events.is_empty();
+                               // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
+                               // updated here with the `pending_events` lock acquired.
                                $self.pending_events_processor.store(false, Ordering::Release);
                        }
  
@@@ -2108,6 -2068,7 +2110,6 @@@ wher
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
 -                      #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
  
                                let peer_state = &mut *peer_state_lock;
                                for (_channel_id, channel) in peer_state.channel_by_id.iter().filter(f) {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
                                let peer_state = &mut *peer_state_lock;
                                for (_channel_id, channel) in peer_state.channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.inbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                                for (_channel_id, channel) in peer_state.outbound_v1_channel_by_id.iter() {
                                        let details = ChannelDetails::from_channel_context(&channel.context, best_block_height,
 -                                              peer_state.latest_features.clone());
 +                                              peer_state.latest_features.clone(), &self.fee_estimator);
                                        res.push(details);
                                }
                        }
                        return peer_state.channel_by_id
                                .iter()
                                .map(|(_, channel)|
 -                                      ChannelDetails::from_channel_context(&channel.context, best_block_height, features.clone()))
 +                                      ChannelDetails::from_channel_context(&channel.context, best_block_height,
 +                                      features.clone(), &self.fee_estimator))
                                .collect();
                }
                vec![]
                                                session_priv: session_priv.clone(),
                                                first_hop_htlc_msat: htlc_msat,
                                                payment_id,
 -                                      }, onion_packet, None, &self.logger);
 +                                      }, onion_packet, None, &self.fee_estimator, &self.logger);
                                match break_chan_entry!(self, send_res, chan) {
                                        Some(monitor_update) => {
                                                match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) {
                                                                                });
                                                                                if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat,
                                                                                        payment_hash, outgoing_cltv_value, htlc_source.clone(),
 -                                                                                      onion_packet, skimmed_fee_msat, &self.logger)
 +                                                                                      onion_packet, skimmed_fee_msat, &self.fee_estimator,
 +                                                                                      &self.logger)
                                                                                {
                                                                                        if let ChannelError::Ignore(msg) = e {
                                                                                                log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
        fn process_background_events(&self) -> NotifyOption {
                debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);
  
 -              #[cfg(debug_assertions)]
                self.background_events_processed_since_startup.store(true, Ordering::Release);
  
                let mut background_events = Vec::new();
  
                for event in background_events.drain(..) {
                        match event {
 -                              BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
 +                              BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
                                        // The channel has already been closed, so no use bothering to care about the
                                        // monitor updating completing.
                                        let _ = self.chain_monitor.update_channel(funding_txo, &update);
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        log_bytes!(chan_id[..]), chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
  
 -              chan.queue_update_fee(new_feerate, &self.logger);
 +              chan.queue_update_fee(new_feerate, &self.fee_estimator, &self.logger);
                NotifyOption::DoPersist
        }
  
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
  
 +              // If we haven't yet run background events assume we're still deserializing and shouldn't
 +              // actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as
 +              // `BackgroundEvent`s.
 +              let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
 +
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let chan_id = prev_hop.outpoint.to_channel_id();
                                                                log_bytes!(chan_id), action);
                                                        peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
                                                }
 -                                              let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
 -                                                      peer_state, per_peer_state, chan);
 -                                              if let Err(e) = res {
 -                                                      // TODO: This is a *critical* error - we probably updated the outbound edge
 -                                                      // of the HTLC's monitor with a preimage. We should retry this monitor
 -                                                      // update over and over again until morale improves.
 -                                                      log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
 -                                                      return Err((counterparty_node_id, e));
 +                                              if !during_init {
 +                                                      let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
 +                                                              peer_state, per_peer_state, chan);
 +                                                      if let Err(e) = res {
 +                                                              // TODO: This is a *critical* error - we probably updated the outbound edge
 +                                                              // of the HTLC's monitor with a preimage. We should retry this monitor
 +                                                              // update over and over again until morale improves.
 +                                                              log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
 +                                                              return Err((counterparty_node_id, e));
 +                                                      }
 +                                              } else {
 +                                                      // If we're running during init we cannot update a monitor directly -
 +                                                      // they probably haven't actually been loaded yet. Instead, push the
 +                                                      // monitor update as a background event.
 +                                                      self.pending_background_events.lock().unwrap().push(
 +                                                              BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 +                                                                      counterparty_node_id,
 +                                                                      funding_txo: prev_hop.outpoint,
 +                                                                      update: monitor_update.clone(),
 +                                                              });
                                                }
                                        }
                                        return Ok(());
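The `during_init` gating above (apply the `ChannelMonitorUpdate` immediately in normal operation, but queue it as a `BackgroundEvent` while still deserializing) boils down to the following pattern. This is a self-contained sketch with simplified stand-in types, not the real LDK structures:

use std::sync::{Mutex, atomic::{AtomicBool, Ordering}};

struct MonitorUpdate(u64);
enum BackgroundEvent { MonitorUpdateRegeneratedOnStartup(MonitorUpdate) }

struct Manager {
    // Flipped to true the first time background events are processed after startup.
    background_events_processed_since_startup: AtomicBool,
    pending_background_events: Mutex<Vec<BackgroundEvent>>,
}

impl Manager {
    fn apply_monitor_update(&self, update: MonitorUpdate) {
        let during_init =
            !self.background_events_processed_since_startup.load(Ordering::Acquire);
        if during_init {
            // Monitors may not be loaded yet; queue the update so startup processing replays it.
            self.pending_background_events.lock().unwrap()
                .push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup(update));
        } else {
            // Normal operation: hand the update to the chain monitor right away.
            apply_to_chain_monitor(update);
        }
    }
}

fn apply_to_chain_monitor(_update: MonitorUpdate) { /* elided */ }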
                                payment_preimage,
                        }],
                };
 -              // We update the ChannelMonitor on the backward link, after
 -              // receiving an `update_fulfill_htlc` from the forward link.
 -              let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
 -              if update_res != ChannelMonitorUpdateStatus::Completed {
 -                      // TODO: This needs to be handled somehow - if we receive a monitor update
 -                      // with a preimage we *must* somehow manage to propagate it to the upstream
 -                      // channel, or we must have an ability to receive the same event and try
 -                      // again on restart.
 -                      log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
 -                              payment_preimage, update_res);
 +
 +              if !during_init {
 +                      // We update the ChannelMonitor on the backward link, after
 +                      // receiving an `update_fulfill_htlc` from the forward link.
 +                      let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
 +                      if update_res != ChannelMonitorUpdateStatus::Completed {
 +                              // TODO: This needs to be handled somehow - if we receive a monitor update
 +                              // with a preimage we *must* somehow manage to propagate it to the upstream
 +                              // channel, or we must have an ability to receive the same event and try
 +                              // again on restart.
 +                              log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
 +                                      payment_preimage, update_res);
 +                      }
 +              } else {
 +                      // If we're running during init we cannot update a monitor directly - they probably
 +                      // haven't actually been loaded yet. Instead, push the monitor update as a background
 +                      // event.
 +                      // Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the
 +                      // channel is already closed), we need to ultimately handle the monitor update
 +                      // completion action only after we've completed the monitor update. This is the only
 +                      // way to guarantee this update *will* be regenerated on startup (otherwise if this was
 +                      // from a forwarded HTLC the downstream preimage may be deleted before we claim
 +                      // upstream). Thus, we need to transition to some new `BackgroundEvent` type which will
 +                      // complete the monitor update completion action from `completion_action`.
 +                      self.pending_background_events.lock().unwrap().push(
 +                              BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
 +                                      prev_hop.outpoint, preimage_update,
 +                              )));
                }
                // Note that we do process the completion action here. This totally could be a
                // duplicate claim, but we have no way of knowing without interrogating the
        fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
 +                              debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
 +                                      "We don't support claim_htlc claims during startup - monitors may not be available yet");
                                self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                                _ => pending_forward_info
                                        }
                                };
 -                              try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.logger), chan);
 +                              try_chan_entry!(self, chan.get_mut().update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &self.logger), chan);
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
                }
        }
  
-       // We only want to push a PendingHTLCsForwardable event if no others are queued.
        fn push_pending_forwards_ev(&self) {
                let mut pending_events = self.pending_events.lock().unwrap();
-               let forward_ev_exists = pending_events.iter()
-                       .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
-                       .is_some();
-               if !forward_ev_exists {
-                       pending_events.push_back((events::Event::PendingHTLCsForwardable {
-                               time_forwardable:
-                                       Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
+               let is_processing_events = self.pending_events_processor.load(Ordering::Acquire);
+               let num_forward_events = pending_events.iter().filter(|(ev, _)|
+                       if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }
+               ).count();
+               // We only want to push a PendingHTLCsForwardable event if no others are queued. Processing
+               // events is done in batches and they are not removed until we're done processing each
+               // batch. Since handling a `PendingHTLCsForwardable` event will call back into the
+               // `ChannelManager`, we'll still see the original forwarding event not removed. Phantom
+               // payments will need an additional forwarding event before being claimed to make them look
+               // real by taking more time.
+               if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 {
+                       pending_events.push_back((Event::PendingHTLCsForwardable {
+                               time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
                        }, None));
                }
        }
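The deduplication condition above also leans on the batching behaviour noted earlier in `process_events_body`: the `pending_events_processor` flag is only cleared while the `pending_events` lock is held. A self-contained sketch of just this decision, using standard-library types and stand-in names rather than LDK's:

use std::collections::VecDeque;
use std::sync::{Mutex, atomic::{AtomicBool, Ordering}};

#[allow(dead_code)]
enum Event { PendingHTLCsForwardable, Other }

struct Forwarder {
    pending_events: Mutex<VecDeque<Event>>,
    // Set for the duration of an event-processing batch; cleared under the lock above.
    pending_events_processor: AtomicBool,
}

impl Forwarder {
    fn push_pending_forwards_ev(&self) {
        let mut pending_events = self.pending_events.lock().unwrap();
        let is_processing_events = self.pending_events_processor.load(Ordering::Acquire);
        let num_forward_events = pending_events.iter()
            .filter(|ev| matches!(ev, Event::PendingHTLCsForwardable)).count();
        // Outside of event processing, keep at most one forwardable event queued. During a
        // processing batch the event currently being handled has not been drained yet, so
        // allow one more; that second event is what adds the extra forwarding delay that
        // makes a phantom receive look like a real forward.
        if (is_processing_events && num_forward_events <= 1) || num_forward_events < 1 {
            pending_events.push_back(Event::PendingHTLCsForwardable);
        }
    }
}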
  
        /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
 -      /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
 +      /// [`msgs::RevokeAndACK`] should be held for the given channel until some other action
        /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
        /// the [`ChannelMonitorUpdate`] in question.
        fn raa_monitor_updates_held(&self,
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        let funding_txo = chan.get().context.get_funding_txo();
 -                                      let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
 +                                      let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan);
                                        let res = if let Some(monitor_update) = monitor_update_opt {
                                                handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
                                                        peer_state_lock, peer_state, per_peer_state, chan).map(|_| ())
                                                let counterparty_node_id = chan.context.get_counterparty_node_id();
                                                let funding_txo = chan.context.get_funding_txo();
                                                let (monitor_opt, holding_cell_failed_htlcs) =
 -                                                      chan.maybe_free_holding_cell_htlcs(&self.logger);
 +                                                      chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &self.logger);
                                                if !holding_cell_failed_htlcs.is_empty() {
                                                        failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
                                                }
        /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
        /// [`Event`] being handled) completes, this should be called to restore the channel to normal
        /// operation. It will double-check that nothing *else* is also blocking the same channel from
 -      /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
 +      /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
        fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
                let mut errors = Vec::new();
                loop {
@@@ -7428,7 -7356,6 +7435,7 @@@ impl Writeable for ChannelDetails 
                        (35, self.inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, self.feerate_sat_per_1000_weight, option),
 +                      (41, self.channel_shutdown_state, option),
                });
                Ok(())
        }
@@@ -7466,7 -7393,6 +7473,7 @@@ impl Readable for ChannelDetails 
                        (35, inbound_htlc_maximum_msat, option),
                        (37, user_channel_id_high_opt, option),
                        (39, feerate_sat_per_1000_weight, option),
 +                      (41, channel_shutdown_state, option),
                });
  
                // `user_channel_id` used to be a single u64 value. In order to remain backwards compatible with
                        inbound_htlc_minimum_msat,
                        inbound_htlc_maximum_msat,
                        feerate_sat_per_1000_weight,
 +                      channel_shutdown_state,
                })
        }
  }
@@@ -8053,14 -7978,6 +8060,14 @@@ impl Readable for VecDeque<(Event, Opti
        }
  }
  
 +impl_writeable_tlv_based_enum!(ChannelShutdownState,
 +      (0, NotShuttingDown) => {},
 +      (2, ShutdownInitiated) => {},
 +      (4, ResolvingHTLCs) => {},
 +      (6, NegotiatingClosingFee) => {},
 +      (8, ShutdownComplete) => {}, ;
 +);
 +
  /// Arguments for the creation of a ChannelManager that are not deserialized.
  ///
  /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
@@@ -8326,7 -8243,7 +8333,7 @@@ wher
                                        update_id: CLOSED_CHANNEL_UPDATE_ID,
                                        updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
                                };
 -                              close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
 +                              close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
                        }
                }
  
                // Note that we have to do the above replays before we push new monitor updates.
                pending_background_events.append(&mut close_background_events);
  
 +              // If there are any preimages for forwarded HTLCs hanging around in ChannelMonitors we
 +              // should ensure we try them again on the inbound edge. We put them here and do so after we
 +              // have a fully-constructed `ChannelManager` at the end.
 +              let mut pending_claims_to_replay = Vec::new();
 +
                {
                        // If we're tracking pending payments, ensure we haven't lost any by looking at the
                        // ChannelMonitor data for any channels for which we do not have authoritative state
                        // We only rebuild the pending payments map if we were most recently serialized by
                        // 0.0.102+
                        for (_, monitor) in args.channel_monitors.iter() {
 -                              if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
 +                              let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id());
 +                              if counterparty_opt.is_none() {
                                        for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
                                                if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source {
                                                        if path.hops.is_empty() {
                                                }
                                        }
                                }
 +
 +                              // Whether the downstream channel was closed or not, try to re-apply any payment
 +                              // preimages from it which may be needed in upstream channels for forwarded
 +                              // payments.
 +                              let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs()
 +                                      .into_iter()
 +                                      .filter_map(|(htlc_source, (htlc, preimage_opt))| {
 +                                              if let HTLCSource::PreviousHopData(_) = htlc_source {
 +                                                      if let Some(payment_preimage) = preimage_opt {
 +                                                              Some((htlc_source, payment_preimage, htlc.amount_msat,
 +                                                                      // Check if `counterparty_opt.is_none()` to see if the
 +                                                                      // downstream chan is closed (because we don't have a
 +                                                                      // channel_id -> peer map entry).
 +                                                                      counterparty_opt.is_none(),
 +                                                                      monitor.get_funding_txo().0.to_channel_id()))
 +                                                      } else { None }
 +                                              } else {
 +                                                      // If it was an outbound payment, we've handled it above - if a preimage
 +                                                      // came in and we persisted the `ChannelManager` we either handled it and
 +                                                      // are good to go or the channel force-closed - we don't have to handle the
 +                                                      // channel still live case here.
 +                                                      None
 +                                              }
 +                                      });
 +                              for tuple in outbound_claimed_htlcs_iter {
 +                                      pending_claims_to_replay.push(tuple);
 +                              }
                        }
                }
  
                        pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
 -                      #[cfg(debug_assertions)]
                        background_events_processed_since_startup: AtomicBool::new(false),
                        persistence_notifier: Notifier::new(),
  
                        channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
  
 +              for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
 +                      // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
 +                      // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
 +                      // channel is closed we just assume that it probably came from an on-chain claim.
 +                      channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
 +                              downstream_closed, downstream_chan_id);
 +              }
 +
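The replay set assembled during deserialization only contains HTLCs we forwarded (previous-hop sources) whose preimage the downstream monitor already knows; outbound payments were handled earlier in the read path. A simplified, self-contained restatement of that filter with stand-in types (not LDK's):

struct PaymentPreimage([u8; 32]);
#[allow(dead_code)]
enum HtlcSource { OutboundRoute, PreviousHopData }
struct Htlc { amount_msat: u64 }

fn claims_to_replay(
    monitor_htlcs: Vec<(HtlcSource, Htlc, Option<PaymentPreimage>)>,
    downstream_closed: bool, // guessed from the missing id_to_peer entry, as noted above
    channel_id: [u8; 32],
) -> Vec<(PaymentPreimage, u64, bool, [u8; 32])> {
    monitor_htlcs.into_iter().filter_map(|(source, htlc, preimage_opt)| {
        match (source, preimage_opt) {
            // Forwarded HTLC with a known preimage: replay the claim on the inbound edge.
            (HtlcSource::PreviousHopData, Some(preimage)) =>
                Some((preimage, htlc.amount_msat, downstream_closed, channel_id)),
            // Outbound payments, or HTLCs without a preimage, need no replay here.
            _ => None,
        }
    }).collect()
}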
                //TODO: Broadcast channel update for closed channels, but only after we've made a
                //connection or two.
  
@@@ -10137,7 -10014,7 +10144,7 @@@ pub mod bench 
        use crate::routing::gossip::NetworkGraph;
        use crate::routing::router::{PaymentParameters, RouteParameters};
        use crate::util::test_utils;
 -      use crate::util::config::UserConfig;
 +      use crate::util::config::{UserConfig, MaxDustHTLCExposure};
  
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
                let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer);
  
                let mut config: UserConfig = Default::default();
 +              config.channel_config.max_dust_htlc_exposure = MaxDustHTLCExposure::FeeRateMultiplier(5_000_000 / 253);
                config.channel_handshake_config.minimum_depth = 1;
  
                let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a);
index 694e8d7a7d9c5756c0f02ad7d2a8ea8b255ad1e4,f2c783fa67fc08584f1c0d0540a58c7796329907..024690d00daad73da84072674bcc2094d32df149
@@@ -27,7 -27,7 +27,7 @@@ use crate::util::scid_utils
  use crate::util::test_utils;
  use crate::util::test_utils::{panicking, TestChainMonitor, TestScorer, TestKeysInterface};
  use crate::util::errors::APIError;
 -use crate::util::config::UserConfig;
 +use crate::util::config::{UserConfig, MaxDustHTLCExposure};
  use crate::util::ser::{ReadableArgs, Writeable};
  
  use bitcoin::blockdata::block::{Block, BlockHeader};
@@@ -1806,6 -1806,28 +1806,28 @@@ macro_rules! get_route_and_payment_has
        }}
  }
  
+ pub fn check_payment_claimable(
+       event: &Event, expected_payment_hash: PaymentHash, expected_payment_secret: PaymentSecret,
+       expected_recv_value: u64, expected_payment_preimage: Option<PaymentPreimage>,
+       expected_receiver_node_id: PublicKey,
+ ) {
+       match event {
+               Event::PaymentClaimable { ref payment_hash, ref purpose, amount_msat, receiver_node_id, .. } => {
+                       assert_eq!(expected_payment_hash, *payment_hash);
+                       assert_eq!(expected_recv_value, *amount_msat);
+                       assert_eq!(expected_receiver_node_id, receiver_node_id.unwrap());
+                       match purpose {
+                               PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => {
+                                       assert_eq!(&expected_payment_preimage, payment_preimage);
+                                       assert_eq!(expected_payment_secret, *payment_secret);
+                               },
+                               _ => {},
+                       }
+               },
+               _ => panic!("Unexpected event"),
+       }
+ }
  #[macro_export]
  #[cfg(any(test, ldk_bench, feature = "_test_utils"))]
  macro_rules! expect_payment_claimable {
        ($node: expr, $expected_payment_hash: expr, $expected_payment_secret: expr, $expected_recv_value: expr, $expected_payment_preimage: expr, $expected_receiver_node_id: expr) => {
                let events = $node.node.get_and_clear_pending_events();
                assert_eq!(events.len(), 1);
-               match events[0] {
-                       $crate::events::Event::PaymentClaimable { ref payment_hash, ref purpose, amount_msat, receiver_node_id, .. } => {
-                               assert_eq!($expected_payment_hash, *payment_hash);
-                               assert_eq!($expected_recv_value, amount_msat);
-                               assert_eq!($expected_receiver_node_id, receiver_node_id.unwrap());
-                               match purpose {
-                                       $crate::events::PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => {
-                                               assert_eq!(&$expected_payment_preimage, payment_preimage);
-                                               assert_eq!($expected_payment_secret, *payment_secret);
-                                       },
-                                       _ => {},
-                               }
-                       },
-                       _ => panic!("Unexpected event"),
-               }
-       }
+               $crate::ln::functional_test_utils::check_payment_claimable(&events[0], $expected_payment_hash, $expected_payment_secret, $expected_recv_value, $expected_payment_preimage, $expected_receiver_node_id)
+       };
  }
  
  #[macro_export]
@@@ -1951,16 -1959,6 +1959,16 @@@ macro_rules! expect_payment_forwarded 
        }
  }
  
 +#[cfg(test)]
 +#[macro_export]
 +macro_rules! expect_channel_shutdown_state {
 +      ($node: expr, $chan_id: expr, $state: path) => {
 +              let chan_details = $node.node.list_channels().into_iter().filter(|cd| cd.channel_id == $chan_id).collect::<Vec<ChannelDetails>>();
 +              assert_eq!(chan_details.len(), 1);
 +              assert_eq!(chan_details[0].channel_shutdown_state, Some($state));
 +      }
 +}
 +
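As a usage note for the new macro (hedged: the actual call sites live in the shutdown tests, not in this diff), a functional test would invoke it roughly as follows, where `nodes` and `chan` come from the usual test-harness setup and `chan.2` is the channel id:

// Illustrative only; node and channel setup elided. After nodes[0] has sent `shutdown`:
expect_channel_shutdown_state!(nodes[0], chan.2, ChannelShutdownState::ShutdownInitiated);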
  #[cfg(any(test, ldk_bench, feature = "_test_utils"))]
  pub fn expect_channel_pending_event<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, expected_counterparty_node_id: &PublicKey) {
        let events = node.node.get_and_clear_pending_events();
@@@ -2582,10 -2580,8 +2590,10 @@@ pub fn test_default_channel_config() -
        // It now defaults to 1, so we simply set it to the expected value here.
        default_config.channel_handshake_config.our_htlc_minimum_msat = 1000;
        // When most of our tests were written, we didn't have the notion of a `max_dust_htlc_exposure_msat`,
 -      // It now defaults to 5_000_000 msat; to avoid interfering with tests we bump it to 50_000_000 msat.
 -      default_config.channel_config.max_dust_htlc_exposure_msat = 50_000_000;
 +      // to avoid interfering with tests we bump it to 50_000_000 msat (assuming the default test
 +      // feerate of 253).
 +      default_config.channel_config.max_dust_htlc_exposure =
 +              MaxDustHTLCExposure::FeeRateMultiplier(50_000_000 / 253);
        default_config
  }
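A quick worked check of the multipliers used here and in the bench config above. The relationship exposure_msat = multiplier * feerate_sat_per_kw is inferred from these settings and their comments, so treat the exact formula as an assumption:

fn main() {
    let feerate_sat_per_kw: u64 = 253; // the default test feerate referenced in the comment
    let test_multiplier = 50_000_000 / feerate_sat_per_kw;  // 197_628
    let bench_multiplier = 5_000_000 / feerate_sat_per_kw;  // 19_762
    // At the test feerate these land just under the old fixed limits of 50_000_000 msat
    // (functional tests) and 5_000_000 msat (previous default, reused for the bench config).
    assert_eq!(test_multiplier * feerate_sat_per_kw, 49_999_884);
    assert_eq!(bench_multiplier * feerate_sat_per_kw, 4_999_786);
}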