Merge pull request #2534 from tnull/2023-08-upstream-preflight-probing
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 18 Sep 2023 16:41:57 +0000 (16:41 +0000)
committerGitHub <noreply@github.com>
Mon, 18 Sep 2023 16:41:57 +0000 (16:41 +0000)
Upstream and fix preflight probing

1  2 
fuzz/src/chanmon_consistency.rs
lightning-background-processor/src/lib.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/onion_utils.rs
lightning/src/ln/outbound_payment.rs
lightning/src/routing/scoring.rs

index 8afc2e15187edb5397474a64b2cf33e34359a667,b68310264280dbded23ca6000714cbee74902d6a..b6d41fb99140cd2cb012ffbaf08c190efdd443fd
@@@ -125,6 -125,7 +125,6 @@@ struct TestChainMonitor 
        // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
        // fully-serialized monitor state here, as well as the corresponding update_id.
        pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
 -      pub should_update_manager: atomic::AtomicBool,
  }
  impl TestChainMonitor {
        pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
                        keys,
                        persister,
                        latest_monitors: Mutex::new(HashMap::new()),
 -                      should_update_manager: atomic::AtomicBool::new(false),
                }
        }
  }
@@@ -144,6 -146,7 +144,6 @@@ impl chain::Watch<TestChannelSigner> fo
                if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
                        panic!("Already had monitor pre-watch_channel");
                }
 -              self.should_update_manager.store(true, atomic::Ordering::Relaxed);
                self.chain_monitor.watch_channel(funding_txo, monitor)
        }
  
                let mut ser = VecWriter(Vec::new());
                deserialized_monitor.write(&mut ser).unwrap();
                map_entry.insert((update.update_id, ser.0));
 -              self.should_update_manager.store(true, atomic::Ordering::Relaxed);
                self.chain_monitor.update_channel(funding_txo, update)
        }
  
@@@ -371,6 -375,7 +371,7 @@@ fn send_payment(source: &ChanMan, dest
                        channel_features: dest.channel_features(),
                        fee_msat: amt,
                        cltv_expiry_delta: 200,
+                       maybe_announced_channel: true,
                }], blinded_tail: None }],
                route_params: None,
        }, payment_hash, RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_id)) {
@@@ -405,6 -410,7 +406,7 @@@ fn send_hop_payment(source: &ChanMan, m
                        channel_features: middle.channel_features(),
                        fee_msat: first_hop_fee,
                        cltv_expiry_delta: 100,
+                       maybe_announced_channel: true,
                }, RouteHop {
                        pubkey: dest.get_our_node_id(),
                        node_features: dest.node_features(),
                        channel_features: dest.channel_features(),
                        fee_msat: amt,
                        cltv_expiry_delta: 200,
+                       maybe_announced_channel: true,
                }], blinded_tail: None }],
                route_params: None,
        }, payment_hash, RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_id)) {
@@@ -1097,9 -1104,11 +1100,9 @@@ pub fn do_test<Out: Output>(data: &[u8]
                                if !chan_a_disconnected {
                                        nodes[1].peer_disconnected(&nodes[0].get_our_node_id());
                                        chan_a_disconnected = true;
 -                                      drain_msg_events_on_disconnect!(0);
 -                              }
 -                              if monitor_a.should_update_manager.load(atomic::Ordering::Relaxed) {
 -                                      node_a_ser.0.clear();
 -                                      nodes[0].write(&mut node_a_ser).unwrap();
 +                                      push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(0));
 +                                      ab_events.clear();
 +                                      ba_events.clear();
                                }
                                let (new_node_a, new_monitor_a) = reload_node!(node_a_ser, 0, monitor_a, keys_manager_a, fee_est_a);
                                nodes[0] = new_node_a;
                                if !chan_b_disconnected {
                                        nodes[1].peer_disconnected(&nodes[2].get_our_node_id());
                                        chan_b_disconnected = true;
 -                                      drain_msg_events_on_disconnect!(2);
 -                              }
 -                              if monitor_c.should_update_manager.load(atomic::Ordering::Relaxed) {
 -                                      node_c_ser.0.clear();
 -                                      nodes[2].write(&mut node_c_ser).unwrap();
 +                                      push_excess_b_events!(nodes[1].get_and_clear_pending_msg_events().drain(..), Some(2));
 +                                      bc_events.clear();
 +                                      cb_events.clear();
                                }
                                let (new_node_c, new_monitor_c) = reload_node!(node_c_ser, 2, monitor_c, keys_manager_c, fee_est_c);
                                nodes[2] = new_node_c;
                        _ => test_return!(),
                }
  
 -              node_a_ser.0.clear();
 -              nodes[0].write(&mut node_a_ser).unwrap();
 -              monitor_a.should_update_manager.store(false, atomic::Ordering::Relaxed);
 -              node_b_ser.0.clear();
 -              nodes[1].write(&mut node_b_ser).unwrap();
 -              monitor_b.should_update_manager.store(false, atomic::Ordering::Relaxed);
 -              node_c_ser.0.clear();
 -              nodes[2].write(&mut node_c_ser).unwrap();
 -              monitor_c.should_update_manager.store(false, atomic::Ordering::Relaxed);
 +              if nodes[0].get_and_clear_needs_persistence() == true {
 +                      node_a_ser.0.clear();
 +                      nodes[0].write(&mut node_a_ser).unwrap();
 +              }
 +              if nodes[1].get_and_clear_needs_persistence() == true {
 +                      node_b_ser.0.clear();
 +                      nodes[1].write(&mut node_b_ser).unwrap();
 +              }
 +              if nodes[2].get_and_clear_needs_persistence() == true {
 +                      node_c_ser.0.clear();
 +                      nodes[2].write(&mut node_c_ser).unwrap();
 +              }
        }
  }
  
index 6a36874a384bf9ca23a91d2cef6557853943aa2f,b3d1f3755eb0f4b06a9696cee95fe6857a65506f..7ae14b4b4aabd5e92c3a610b5cfd622c2314264f
@@@ -315,7 -315,7 +315,7 @@@ macro_rules! define_run_body 
                        // see `await_start`'s use below.
                        let mut await_start = None;
                        if $check_slow_await { await_start = Some($get_timer(1)); }
 -                      let updates_available = $await;
 +                      $await;
                        let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
  
                        // Exit the loop if the background processor was requested to stop.
                                break;
                        }
  
 -                      if updates_available {
 +                      if $channel_manager.get_and_clear_needs_persistence() {
                                log_trace!($logger, "Persisting ChannelManager...");
                                $persister.persist_manager(&*$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
@@@ -655,14 -655,16 +655,14 @@@ wher
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
                gossip_sync, peer_manager, logger, scorer, should_break, {
                        let fut = Selector {
 -                              a: channel_manager.get_persistable_update_future(),
 +                              a: channel_manager.get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
                                c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
                        };
                        match fut.await {
 -                              SelectorOutput::A => true,
 -                              SelectorOutput::B => false,
 +                              SelectorOutput::A|SelectorOutput::B => {},
                                SelectorOutput::C(exit) => {
                                        should_break = exit;
 -                                      false
                                }
                        }
                }, |t| sleeper(Duration::from_secs(t)),
@@@ -785,10 -787,10 +785,10 @@@ impl BackgroundProcessor 
                        define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
                                gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
 -                              Sleeper::from_two_futures(
 -                                      channel_manager.get_persistable_update_future(),
 +                              Sleeper::from_two_futures(
 +                                      channel_manager.get_event_or_persistence_needed_future(),
                                        chain_monitor.get_update_future()
 -                              ).wait_timeout(Duration::from_millis(100)),
 +                              ).wait_timeout(Duration::from_millis(100)); },
                                |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@@ -1324,7 -1326,7 +1324,7 @@@ mod tests 
                check_persisted_data!(nodes[0].node, filepath.clone());
  
                loop {
 -                      if !nodes[0].node.get_persistence_condvar_value() { break }
 +                      if !nodes[0].node.get_event_or_persist_condvar_value() { break }
                }
  
                // Force-close the channel.
                // Check that the force-close updates are persisted.
                check_persisted_data!(nodes[0].node, filepath.clone());
                loop {
 -                      if !nodes[0].node.get_persistence_condvar_value() { break }
 +                      if !nodes[0].node.get_event_or_persist_condvar_value() { break }
                }
  
                // Check network graph is persisted
                                channel_features: ChannelFeatures::empty(),
                                fee_msat: 0,
                                cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
+                               maybe_announced_channel: true,
                        }], blinded_tail: None };
  
                        $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
index 90451d59d66d635821ffbe3ce32790d471290c6c,785595a89ef2f82b1fba5f50484171fe5b4d4086..0cadbd41a29d21ccb99de1e0d7bf84319e33971c
@@@ -77,7 -77,7 +77,7 @@@ use core::time::Duration
  use core::ops::Deref;
  
  // Re-export this for use in the public API.
- pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
+ pub use crate::ln::outbound_payment::{PaymentSendFailure, ProbeSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
  use crate::ln::script::ShutdownScript;
  
  // We hold various information about HTLC relay in the HTLC objects in Channel itself:
@@@ -495,10 -495,6 +495,10 @@@ impl MsgHandleErrInternal 
                        channel_capacity: None,
                }
        }
 +
 +      fn closes_channel(&self) -> bool {
 +              self.chan_id.is_some()
 +      }
  }
  
  /// We hold back HTLCs we intend to relay for a random interval greater than this (see
@@@ -839,33 -835,46 +839,46 @@@ pub type SimpleRefChannelManager<'a, 'b
                &'g L
        >;
  
- macro_rules! define_test_pub_trait { ($vis: vis) => {
- /// A trivial trait which describes any [`ChannelManager`] used in testing.
- $vis trait AChannelManager {
+ /// A trivial trait which describes any [`ChannelManager`].
+ pub trait AChannelManager {
+       /// A type implementing [`chain::Watch`].
        type Watch: chain::Watch<Self::Signer> + ?Sized;
+       /// A type that may be dereferenced to [`Self::Watch`].
        type M: Deref<Target = Self::Watch>;
+       /// A type implementing [`BroadcasterInterface`].
        type Broadcaster: BroadcasterInterface + ?Sized;
+       /// A type that may be dereferenced to [`Self::Broadcaster`].
        type T: Deref<Target = Self::Broadcaster>;
+       /// A type implementing [`EntropySource`].
        type EntropySource: EntropySource + ?Sized;
+       /// A type that may be dereferenced to [`Self::EntropySource`].
        type ES: Deref<Target = Self::EntropySource>;
+       /// A type implementing [`NodeSigner`].
        type NodeSigner: NodeSigner + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeSigner`].
        type NS: Deref<Target = Self::NodeSigner>;
+       /// A type implementing [`WriteableEcdsaChannelSigner`].
        type Signer: WriteableEcdsaChannelSigner + Sized;
+       /// A type implementing [`SignerProvider`] for [`Self::Signer`].
        type SignerProvider: SignerProvider<Signer = Self::Signer> + ?Sized;
+       /// A type that may be dereferenced to [`Self::SignerProvider`].
        type SP: Deref<Target = Self::SignerProvider>;
+       /// A type implementing [`FeeEstimator`].
        type FeeEstimator: FeeEstimator + ?Sized;
+       /// A type that may be dereferenced to [`Self::FeeEstimator`].
        type F: Deref<Target = Self::FeeEstimator>;
+       /// A type implementing [`Router`].
        type Router: Router + ?Sized;
+       /// A type that may be dereferenced to [`Self::Router`].
        type R: Deref<Target = Self::Router>;
+       /// A type implementing [`Logger`].
        type Logger: Logger + ?Sized;
+       /// A type that may be dereferenced to [`Self::Logger`].
        type L: Deref<Target = Self::Logger>;
+       /// Returns a reference to the actual [`ChannelManager`] object.
        fn get_cm(&self) -> &ChannelManager<Self::M, Self::T, Self::ES, Self::NS, Self::SP, Self::F, Self::R, Self::L>;
  }
- } }
- #[cfg(any(test, feature = "_test_utils"))]
- define_test_pub_trait!(pub);
- #[cfg(not(any(test, feature = "_test_utils")))]
- define_test_pub_trait!(pub(crate));
  impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> AChannelManager
  for ChannelManager<M, T, ES, NS, SP, F, R, L>
  where
@@@ -1189,8 -1198,7 +1202,8 @@@ wher
  
        background_events_processed_since_startup: AtomicBool,
  
 -      persistence_notifier: Notifier,
 +      event_persist_notifier: Notifier,
 +      needs_persist_flag: AtomicBool,
  
        entropy_source: ES,
        node_signer: NS,
@@@ -1219,8 -1227,7 +1232,8 @@@ pub struct ChainParameters 
  #[must_use]
  enum NotifyOption {
        DoPersist,
 -      SkipPersist,
 +      SkipPersistHandleEvents,
 +      SkipPersistNoEvents,
  }
  
  /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is
  /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
  /// notify or not based on whether relevant changes have been made, providing a closure to
  /// `optionally_notify` which returns a `NotifyOption`.
 -struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
 -      persistence_notifier: &'a Notifier,
 +struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> {
 +      event_persist_notifier: &'a Notifier,
 +      needs_persist_flag: &'a AtomicBool,
        should_persist: F,
        // We hold onto this result so the lock doesn't get released immediately.
        _read_guard: RwLockReadGuard<'a, ()>,
  }
  
  impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
 -      fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
 +      /// Notifies any waiters and indicates that we need to persist, in addition to possibly having
 +      /// events to handle.
 +      ///
 +      /// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
 +      /// other cases where losing the changes on restart may result in a force-close or otherwise
 +      /// isn't ideal.
 +      fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
 +              Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
 +      }
 +
 +      fn optionally_notify<F: FnMut() -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
 +      -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
                let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
 -              let _ = cm.get_cm().process_background_events(); // We always persist
 +              let force_notify = cm.get_cm().process_background_events();
  
                PersistenceNotifierGuard {
 -                      persistence_notifier: &cm.get_cm().persistence_notifier,
 -                      should_persist: || -> NotifyOption { NotifyOption::DoPersist },
 +                      event_persist_notifier: &cm.get_cm().event_persist_notifier,
 +                      needs_persist_flag: &cm.get_cm().needs_persist_flag,
 +                      should_persist: move || {
 +                              // Pick the "most" action between `persist_check` and the background events
 +                              // processing and return that.
 +                              let notify = persist_check();
 +                              match (notify, force_notify) {
 +                                      (NotifyOption::DoPersist, _) => NotifyOption::DoPersist,
 +                                      (_, NotifyOption::DoPersist) => NotifyOption::DoPersist,
 +                                      (NotifyOption::SkipPersistHandleEvents, _) => NotifyOption::SkipPersistHandleEvents,
 +                                      (_, NotifyOption::SkipPersistHandleEvents) => NotifyOption::SkipPersistHandleEvents,
 +                                      _ => NotifyOption::SkipPersistNoEvents,
 +                              }
 +                      },
                        _read_guard: read_guard,
                }
 -
        }
  
        /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated,
 -      /// [`ChannelManager::process_background_events`] MUST be called first.
 -      fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
 -              let read_guard = lock.read().unwrap();
 +      /// [`ChannelManager::process_background_events`] MUST be called first (or
 +      /// [`Self::optionally_notify`] used).
 +      fn optionally_notify_skipping_background_events<F: Fn() -> NotifyOption, C: AChannelManager>
 +      (cm: &'a C, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
 +              let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
  
                PersistenceNotifierGuard {
 -                      persistence_notifier: notifier,
 +                      event_persist_notifier: &cm.get_cm().event_persist_notifier,
 +                      needs_persist_flag: &cm.get_cm().needs_persist_flag,
                        should_persist: persist_check,
                        _read_guard: read_guard,
                }
        }
  }
  
 -impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
 +impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
        fn drop(&mut self) {
 -              if (self.should_persist)() == NotifyOption::DoPersist {
 -                      self.persistence_notifier.notify();
 +              match (self.should_persist)() {
 +                      NotifyOption::DoPersist => {
 +                              self.needs_persist_flag.store(true, Ordering::Release);
 +                              self.event_persist_notifier.notify()
 +                      },
 +                      NotifyOption::SkipPersistHandleEvents =>
 +                              self.event_persist_notifier.notify(),
 +                      NotifyOption::SkipPersistNoEvents => {},
                }
        }
  }
@@@ -2123,7 -2098,7 +2136,7 @@@ macro_rules! process_events_body 
                                return;
                        }
  
 -                      let mut result = NotifyOption::SkipPersist;
 +                      let mut result;
  
                        {
                                // We'll acquire our total consistency lock so that we can be sure no other
  
                                // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must
                                // ensure any startup-generated background events are handled first.
 -                              if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; }
 +                              result = $self.process_background_events();
  
                                // TODO: This behavior should be documented. It's unintuitive that we query
                                // ChannelMonitors when clearing other events.
                                processed_all_events = false;
                        }
  
 -                      if result == NotifyOption::DoPersist {
 -                              $self.persistence_notifier.notify();
 +                      match result {
 +                              NotifyOption::DoPersist => {
 +                                      $self.needs_persist_flag.store(true, Ordering::Release);
 +                                      $self.event_persist_notifier.notify();
 +                              },
 +                              NotifyOption::SkipPersistHandleEvents =>
 +                                      $self.event_persist_notifier.notify(),
 +                              NotifyOption::SkipPersistNoEvents => {},
                        }
                }
        }
@@@ -2258,9 -2227,7 +2271,9 @@@ wher
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
 -                      persistence_notifier: Notifier::new(),
 +
 +                      event_persist_notifier: Notifier::new(),
 +                      needs_persist_flag: AtomicBool::new(false),
  
                        entropy_source,
                        node_signer,
                let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data {
                        msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } =>
                                (short_channel_id, amt_to_forward, outgoing_cltv_value),
 -                      msgs::InboundOnionPayload::Receive { .. } =>
 +                      msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } =>
                                return Err(InboundOnionErr {
                                        msg: "Final Node OnionHopData provided for us as an intermediary node",
                                        err_code: 0x4000 | 22,
                                payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, ..
                        } =>
                                (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata),
 -                      _ =>
 +                      msgs::InboundOnionPayload::BlindedReceive {
 +                              amt_msat, total_msat, outgoing_cltv_value, payment_secret, ..
 +                      } => {
 +                              let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat };
 +                              (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None)
 +                      }
 +                      msgs::InboundOnionPayload::Forward { .. } => {
                                return Err(InboundOnionErr {
                                        err_code: 0x4000|22,
                                        err_data: Vec::new(),
                                        msg: "Got non final data with an HMAC of 0",
 -                              }),
 +                              })
 +                      },
                };
                // final_incorrect_cltv_expiry
                if outgoing_cltv_value > cltv_expiry {
                        }
                }
  
 -              let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) {
 +              let next_hop = match onion_utils::decode_next_payment_hop(
 +                      shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac,
 +                      msg.payment_hash, &self.node_signer
 +              ) {
                        Ok(res) => res,
                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                return_malformed_err!(err_msg, err_code);
                        // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the
                        // inbound channel's state.
                        onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)),
 -                      onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => {
 +                      onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } |
 +                              onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } =>
 +                      {
                                return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]);
                        }
                };
                outbound_payment::payment_is_probe(payment_hash, payment_id, self.probing_cookie_secret)
        }
  
+       /// Sends payment probes over all paths of a route that would be used to pay the given
+       /// amount to the given `node_id`.
+       ///
+       /// See [`ChannelManager::send_preflight_probes`] for more information.
+       pub fn send_spontaneous_preflight_probes(
+               &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32, 
+               liquidity_limit_multiplier: Option<u64>,
+       ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+               let payment_params =
+                       PaymentParameters::from_node_id(node_id, final_cltv_expiry_delta);
+               let route_params = RouteParameters { payment_params, final_value_msat: amount_msat };
+               self.send_preflight_probes(route_params, liquidity_limit_multiplier)
+       }
+       /// Sends payment probes over all paths of a route that would be used to pay a route found
+       /// according to the given [`RouteParameters`].
+       ///
+       /// This may be used to send "pre-flight" probes, i.e., to train our scorer before conducting
+       /// the actual payment. Note this is only useful if there likely is sufficient time for the
+       /// probe to settle before sending out the actual payment, e.g., when waiting for user
+       /// confirmation in a wallet UI.
+       ///
+       /// Otherwise, there is a chance the probe could take up some liquidity needed to complete the
+       /// actual payment. Users should therefore be cautious and might avoid sending probes if
+       /// liquidity is scarce and/or they don't expect the probe to return before they send the
+       /// payment. To mitigate this issue, channels with available liquidity less than the required
+       /// amount times the given `liquidity_limit_multiplier` won't be used to send pre-flight
+       /// probes. If `None` is given as `liquidity_limit_multiplier`, it defaults to `3`.
+       pub fn send_preflight_probes(
+               &self, route_params: RouteParameters, liquidity_limit_multiplier: Option<u64>,
+       ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+               let liquidity_limit_multiplier = liquidity_limit_multiplier.unwrap_or(3);
+               let payer = self.get_our_node_id();
+               let usable_channels = self.list_usable_channels();
+               let first_hops = usable_channels.iter().collect::<Vec<_>>();
+               let inflight_htlcs = self.compute_inflight_htlcs();
+               let route = self
+                       .router
+                       .find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs)
+                       .map_err(|e| {
+                               log_error!(self.logger, "Failed to find path for payment probe: {:?}", e);
+                               ProbeSendFailure::RouteNotFound
+                       })?;
+               let mut used_liquidity_map = HashMap::with_capacity(first_hops.len());
+               let mut res = Vec::new();
+               for mut path in route.paths {
+                       // If the last hop is probably an unannounced channel we refrain from probing all the
+                       // way through to the end and instead probe up to the second-to-last channel.
+                       while let Some(last_path_hop) = path.hops.last() {
+                               if last_path_hop.maybe_announced_channel {
+                                       // We found a potentially announced last hop.
+                                       break;
+                               } else {
+                                       // Drop the last hop, as it's likely unannounced.
+                                       log_debug!(
+                                               self.logger,
+                                               "Avoided sending payment probe all the way to last hop {} as it is likely unannounced.",
+                                               last_path_hop.short_channel_id
+                                       );
+                                       let final_value_msat = path.final_value_msat();
+                                       path.hops.pop();
+                                       if let Some(new_last) = path.hops.last_mut() {
+                                               new_last.fee_msat += final_value_msat;
+                                       }
+                               }
+                       }
+                       if path.hops.len() < 2 {
+                               log_debug!(
+                                       self.logger,
+                                       "Skipped sending payment probe over path with less than two hops."
+                               );
+                               continue;
+                       }
+                       if let Some(first_path_hop) = path.hops.first() {
+                               if let Some(first_hop) = first_hops.iter().find(|h| {
+                                       h.get_outbound_payment_scid() == Some(first_path_hop.short_channel_id)
+                               }) {
+                                       let path_value = path.final_value_msat() + path.fee_msat();
+                                       let used_liquidity =
+                                               used_liquidity_map.entry(first_path_hop.short_channel_id).or_insert(0);
+                                       if first_hop.next_outbound_htlc_limit_msat
+                                               < (*used_liquidity + path_value) * liquidity_limit_multiplier
+                                       {
+                                               log_debug!(self.logger, "Skipped sending payment probe to avoid putting channel {} under the liquidity limit.", first_path_hop.short_channel_id);
+                                               continue;
+                                       } else {
+                                               *used_liquidity += path_value;
+                                       }
+                               }
+                       }
+                       res.push(self.send_probe(path).map_err(|e| {
+                               log_error!(self.logger, "Failed to send pre-flight probe: {:?}", e);
+                               ProbeSendFailure::SendingFailed(e)
+                       })?);
+               }
+               Ok(res)
+       }
        /// Handles the generation of a funding transaction, optionally (for tests) with a function
        /// which checks the correctness of the funding transaction given the associated channel.
        fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
                                                                                        let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode);
                                                                                        if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) {
                                                                                                let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes();
 -                                                                                              let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) {
 +                                                                                              let next_hop = match onion_utils::decode_next_payment_hop(
 +                                                                                                      phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac,
 +                                                                                                      payment_hash, &self.node_signer
 +                                                                                              ) {
                                                                                                        Ok(res) => res,
                                                                                                        Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => {
                                                                                                                let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner();
                let mut background_events = Vec::new();
                mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
                if background_events.is_empty() {
 -                      return NotifyOption::SkipPersist;
 +                      return NotifyOption::SkipPersistNoEvents;
                }
  
                for event in background_events.drain(..) {
        }
  
        fn update_channel_fee(&self, chan_id: &ChannelId, chan: &mut Channel<SP>, new_feerate: u32) -> NotifyOption {
 -              if !chan.context.is_outbound() { return NotifyOption::SkipPersist; }
 +              if !chan.context.is_outbound() { return NotifyOption::SkipPersistNoEvents; }
                // If the feerate has decreased by less than half, don't bother
                if new_feerate <= chan.context.get_feerate_sat_per_1000_weight() && new_feerate * 2 > chan.context.get_feerate_sat_per_1000_weight() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
 -                              &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 -                      return NotifyOption::SkipPersist;
 +                              chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 +                      return NotifyOption::SkipPersistNoEvents;
                }
                if !chan.context.is_live() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
 -                              &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 -                      return NotifyOption::SkipPersist;
 +                              chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
 +                      return NotifyOption::SkipPersistNoEvents;
                }
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        &chan_id, chan.context.get_feerate_sat_per_1000_weight(), new_feerate);
        /// these a fuzz failure (as they usually indicate a channel force-close, which is exactly what
        /// it wants to detect). Thus, we have a variant exposed here for its benefit.
        pub fn maybe_update_chan_fees(&self) {
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let mut should_persist = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let mut should_persist = NotifyOption::SkipPersistNoEvents;
  
                        let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
                        let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
        /// [`ChannelUpdate`]: msgs::ChannelUpdate
        /// [`ChannelConfig`]: crate::util::config::ChannelConfig
        pub fn timer_tick_occurred(&self) {
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let mut should_persist = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let mut should_persist = NotifyOption::SkipPersistNoEvents;
  
                        let normal_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
                        let min_mempool_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::MempoolMinimum);
        }
  
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
 +              // likely to be lost on restart!
                if msg.chain_hash != self.genesis_hash {
                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone()));
                }
        }
  
        fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are
 +              // likely to be lost on restart!
                let (value, output_script, user_id) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        let peer_state_mutex = per_peer_state.get(counterparty_node_id)
        }
  
        fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                //encrypted with the same key. It's not immediately obvious how to usefully exploit that,
                //but we should prevent it anyway.
  
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
 +
                let decoded_hop_res = self.decode_update_add_htlc_onion(msg);
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
        }
  
        fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
        }
  
        fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> {
 +              // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error
 +              // closing a channel), so any changes are likely to be lost on restart!
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                Ok(())
        }
  
 -      /// Returns ShouldPersist if anything changed, otherwise either SkipPersist or an Err.
 +      /// Returns DoPersist if anything changed, otherwise either SkipPersistNoEvents or an Err.
        fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result<NotifyOption, MsgHandleErrInternal> {
                let (chan_counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&msg.contents.short_channel_id) {
                        Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
                        None => {
                                // It's not a local channel
 -                              return Ok(NotifyOption::SkipPersist)
 +                              return Ok(NotifyOption::SkipPersistNoEvents)
                        }
                };
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex_opt = per_peer_state.get(&chan_counterparty_node_id);
                if peer_state_mutex_opt.is_none() {
 -                      return Ok(NotifyOption::SkipPersist)
 +                      return Ok(NotifyOption::SkipPersistNoEvents)
                }
                let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                let peer_state = &mut *peer_state_lock;
                                                        // If the announcement is about a channel of ours which is public, some
                                                        // other peer may simply be forwarding all its gossip to us. Don't provide
                                                        // a scary-looking error message and return Ok instead.
 -                                                      return Ok(NotifyOption::SkipPersist);
 +                                                      return Ok(NotifyOption::SkipPersistNoEvents);
                                                }
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
                                        }
                                        let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..];
                                        let msg_from_node_one = msg.contents.flags & 1 == 0;
                                        if were_node_one == msg_from_node_one {
 -                                              return Ok(NotifyOption::SkipPersist);
 +                                              return Ok(NotifyOption::SkipPersistNoEvents);
                                        } else {
                                                log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
                                                try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
                                                "Got a channel_update for an unfunded channel!".into())), chan_phase_entry);
                                }
                        },
 -                      hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersist)
 +                      hash_map::Entry::Vacant(_) => return Ok(NotifyOption::SkipPersistNoEvents)
                }
                Ok(NotifyOption::DoPersist)
        }
  
 -      fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
 +      fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
                let htlc_forwards;
                let need_lnd_workaround = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        }
                };
  
 +              let mut persist = NotifyOption::SkipPersistHandleEvents;
                if let Some(forwards) = htlc_forwards {
                        self.forward_htlcs(&mut [forwards][..]);
 +                      persist = NotifyOption::DoPersist;
                }
  
                if let Some(channel_ready_msg) = need_lnd_workaround {
                        self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
                }
 -              Ok(())
 +              Ok(persist)
        }
  
        /// Process pending events from the [`chain::Watch`], returning whether any events were processed.
@@@ -7132,8 -7179,8 +7255,8 @@@ wher
        /// the `MessageSendEvent`s to the specific peer they were generated under.
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let events = RefCell::new(Vec::new());
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let mut result = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let mut result = NotifyOption::SkipPersistNoEvents;
  
                        // TODO: This behavior should be documented. It's unintuitive that we query
                        // ChannelMonitors when clearing other events.
@@@ -7214,9 -7261,8 +7337,9 @@@ wher
        }
  
        fn block_disconnected(&self, header: &BlockHeader, height: u32) {
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                let new_height = height - 1;
                {
                        let mut best_block = self.best_block.write().unwrap();
@@@ -7250,9 -7296,8 +7373,9 @@@ wher
                let block_hash = header.block_hash();
                log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height);
  
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)
                        .map(|(a, b)| (a, Vec::new(), b)));
  
                let block_hash = header.block_hash();
                log_trace!(self.logger, "New best block: {} at height {}", block_hash, height);
  
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                *self.best_block.write().unwrap() = BestBlock::new(block_hash, height);
  
                self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger));
        }
  
        fn transaction_unconfirmed(&self, txid: &Txid) {
 -              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock,
 -                      &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist });
 +              let _persistence_guard =
 +                      PersistenceNotifierGuard::optionally_notify_skipping_background_events(
 +                              self, || -> NotifyOption { NotifyOption::DoPersist });
                self.do_chain_event(None, |channel| {
                        if let Some(funding_txo) = channel.context.get_funding_txo() {
                                if funding_txo.txid == *txid {
@@@ -7501,26 -7544,18 +7624,26 @@@ wher
                }
        }
  
 -      /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
 +      /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or
 +      /// may have events that need processing.
 +      ///
 +      /// In order to check if this [`ChannelManager`] needs persisting, call
 +      /// [`Self::get_and_clear_needs_persistence`].
        ///
        /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
        /// [`ChannelManager`] and should instead register actions to be taken later.
 -      ///
 -      pub fn get_persistable_update_future(&self) -> Future {
 -              self.persistence_notifier.get_future()
 +      pub fn get_event_or_persistence_needed_future(&self) -> Future {
 +              self.event_persist_notifier.get_future()
 +      }
 +
 +      /// Returns true if this [`ChannelManager`] needs to be persisted.
 +      pub fn get_and_clear_needs_persistence(&self) -> bool {
 +              self.needs_persist_flag.swap(false, Ordering::AcqRel)
        }
  
        #[cfg(any(test, feature = "_test_utils"))]
 -      pub fn get_persistence_condvar_value(&self) -> bool {
 -              self.persistence_notifier.notify_pending()
 +      pub fn get_event_or_persist_condvar_value(&self) -> bool {
 +              self.event_persist_notifier.notify_pending()
        }
  
        /// Gets the latest best block which was connected either via the [`chain::Listen`] or
@@@ -7577,21 -7612,8 +7700,21 @@@ wher
        L::Target: Logger,
  {
        fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // open_channel message - pre-funded channels are never written so there should be no
 +              // change to the contents.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_open_channel(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => {
 +                                      debug_assert!(false, "We shouldn't close a new channel");
 +                                      NotifyOption::DoPersist
 +                              },
 +                              _ => NotifyOption::SkipPersistHandleEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) {
        }
  
        fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // accept_channel message - pre-funded channels are never written so there should be no
 +              // change to the contents.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id);
 +                      NotifyOption::SkipPersistHandleEvents
 +              });
        }
  
        fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) {
        }
  
        fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // channel_ready message - while the channel's state will change, any channel_ready message
 +              // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we
 +              // will not force-close the channel on startup.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_channel_ready(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              _ => NotifyOption::SkipPersistHandleEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) {
        }
  
        fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_add_htlc message - the message itself doesn't change our channel state only the
 +              // `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_add_htlc(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
        }
  
        fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_fail_htlc message - the message itself doesn't change our channel state only the
 +              // `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_fail_htlc(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_fail_malformed_htlc message - the message itself doesn't change our channel state
 +              // only the `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
        }
  
        fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id);
 +              // Note that we never need to persist the updated ChannelManager for an inbound
 +              // update_fee message - the message itself doesn't change our channel state only the
 +              // `commitment_signed` message afterwards will.
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_update_fee(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(()) => NotifyOption::SkipPersistNoEvents,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
        }
  
        fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) {
 -              PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
 -                      let force_persist = self.process_background_events();
 +              PersistenceNotifierGuard::optionally_notify(self, || {
                        if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) {
 -                              if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist }
 +                              persist
                        } else {
 -                              NotifyOption::SkipPersist
 +                              NotifyOption::DoPersist
                        }
                });
        }
  
        fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 -              let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id);
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
 +                      let res = self.internal_channel_reestablish(counterparty_node_id, msg);
 +                      let persist = match &res {
 +                              Err(e) if e.closes_channel() => NotifyOption::DoPersist,
 +                              Err(_) => NotifyOption::SkipPersistHandleEvents,
 +                              Ok(persist) => *persist,
 +                      };
 +                      let _ = handle_error!(self, res, *counterparty_node_id);
 +                      persist
 +              });
        }
  
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 +              let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
 +                      self, || NotifyOption::SkipPersistHandleEvents);
 +
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
                        return Err(());
                }
  
 -              let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 +              let mut res = Ok(());
  
 -              // If we have too many peers connected which don't have funded channels, disconnect the
 -              // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
 -              // unfunded channels taking up space in memory for disconnected peers, we still let new
 -              // peers connect, but we'll reject new channels from them.
 -              let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
 -              let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
 +              PersistenceNotifierGuard::optionally_notify(self, || {
 +                      // If we have too many peers connected which don't have funded channels, disconnect the
 +                      // peer immediately (as long as it doesn't have funded channels). If we have a bunch of
 +                      // unfunded channels taking up space in memory for disconnected peers, we still let new
 +                      // peers connect, but we'll reject new channels from them.
 +                      let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected);
 +                      let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS;
  
 -              {
 -                      let mut peer_state_lock = self.per_peer_state.write().unwrap();
 -                      match peer_state_lock.entry(counterparty_node_id.clone()) {
 -                              hash_map::Entry::Vacant(e) => {
 -                                      if inbound_peer_limited {
 -                                              return Err(());
 -                                      }
 -                                      e.insert(Mutex::new(PeerState {
 -                                              channel_by_id: HashMap::new(),
 -                                              inbound_channel_request_by_id: HashMap::new(),
 -                                              latest_features: init_msg.features.clone(),
 -                                              pending_msg_events: Vec::new(),
 -                                              in_flight_monitor_updates: BTreeMap::new(),
 -                                              monitor_update_blocked_actions: BTreeMap::new(),
 -                                              actions_blocking_raa_monitor_updates: BTreeMap::new(),
 -                                              is_connected: true,
 -                                      }));
 -                              },
 -                              hash_map::Entry::Occupied(e) => {
 -                                      let mut peer_state = e.get().lock().unwrap();
 -                                      peer_state.latest_features = init_msg.features.clone();
 -
 -                                      let best_block_height = self.best_block.read().unwrap().height();
 -                                      if inbound_peer_limited &&
 -                                              Self::unfunded_channel_count(&*peer_state, best_block_height) ==
 -                                              peer_state.channel_by_id.len()
 -                                      {
 -                                              return Err(());
 -                                      }
 +                      {
 +                              let mut peer_state_lock = self.per_peer_state.write().unwrap();
 +                              match peer_state_lock.entry(counterparty_node_id.clone()) {
 +                                      hash_map::Entry::Vacant(e) => {
 +                                              if inbound_peer_limited {
 +                                                      res = Err(());
 +                                                      return NotifyOption::SkipPersistNoEvents;
 +                                              }
 +                                              e.insert(Mutex::new(PeerState {
 +                                                      channel_by_id: HashMap::new(),
 +                                                      inbound_channel_request_by_id: HashMap::new(),
 +                                                      latest_features: init_msg.features.clone(),
 +                                                      pending_msg_events: Vec::new(),
 +                                                      in_flight_monitor_updates: BTreeMap::new(),
 +                                                      monitor_update_blocked_actions: BTreeMap::new(),
 +                                                      actions_blocking_raa_monitor_updates: BTreeMap::new(),
 +                                                      is_connected: true,
 +                                              }));
 +                                      },
 +                                      hash_map::Entry::Occupied(e) => {
 +                                              let mut peer_state = e.get().lock().unwrap();
 +                                              peer_state.latest_features = init_msg.features.clone();
 +
 +                                              let best_block_height = self.best_block.read().unwrap().height();
 +                                              if inbound_peer_limited &&
 +                                                      Self::unfunded_channel_count(&*peer_state, best_block_height) ==
 +                                                      peer_state.channel_by_id.len()
 +                                              {
 +                                                      res = Err(());
 +                                                      return NotifyOption::SkipPersistNoEvents;
 +                                              }
  
 -                                      debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
 -                                      peer_state.is_connected = true;
 -                              },
 +                                              debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
 +                                              peer_state.is_connected = true;
 +                                      },
 +                              }
                        }
 -              }
  
 -              log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
 +                      log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id));
  
 -              let per_peer_state = self.per_peer_state.read().unwrap();
 -              if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
 -                      let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 -                      let peer_state = &mut *peer_state_lock;
 -                      let pending_msg_events = &mut peer_state.pending_msg_events;
 +                      let per_peer_state = self.per_peer_state.read().unwrap();
 +                      if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) {
 +                              let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 +                              let peer_state = &mut *peer_state_lock;
 +                              let pending_msg_events = &mut peer_state.pending_msg_events;
  
 -                      peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
 -                              if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
 -                                      // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
 -                                      // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
 -                                      // worry about closing and removing them.
 -                                      debug_assert!(false);
 -                                      None
 -                              }
 -                      ).for_each(|chan| {
 -                              pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
 -                                      node_id: chan.context.get_counterparty_node_id(),
 -                                      msg: chan.get_channel_reestablish(&self.logger),
 +                              peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
 +                                      if let ChannelPhase::Funded(chan) = phase { Some(chan) } else {
 +                                              // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted
 +                                              // (so won't be recovered after a crash), they shouldn't exist here and we would never need to
 +                                              // worry about closing and removing them.
 +                                              debug_assert!(false);
 +                                              None
 +                                      }
 +                              ).for_each(|chan| {
 +                                      pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
 +                                              node_id: chan.context.get_counterparty_node_id(),
 +                                              msg: chan.get_channel_reestablish(&self.logger),
 +                                      });
                                });
 -                      });
 -              }
 -              //TODO: Also re-broadcast announcement_signatures
 -              Ok(())
 +                      }
 +
 +                      return NotifyOption::SkipPersistHandleEvents;
 +                      //TODO: Also re-broadcast announcement_signatures
 +              });
 +              res
        }
  
        fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@@ -9780,9 -9727,7 +9903,9 @@@ wher
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
 -                      persistence_notifier: Notifier::new(),
 +
 +                      event_persist_notifier: Notifier::new(),
 +                      needs_persist_flag: AtomicBool::new(false),
  
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
@@@ -9844,9 -9789,9 +9967,9 @@@ mod tests 
  
                // All nodes start with a persistable update pending as `create_network` connects each node
                // with all other nodes to make most tests simpler.
 -              assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[2].node.get_persistable_update_future().poll_is_complete());
 +              assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1);
  
                        &nodes[0].node.get_our_node_id()).pop().unwrap();
  
                // The first two nodes (which opened a channel) should now require fresh persistence
 -              assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                // ... but the last node should not.
 -              assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
                // After persisting the first two nodes they should no longer need fresh persistence.
 -              assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update
                // about the channel.
                nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0);
                nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1);
 -              assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[2].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                // The nodes which are a party to the channel should also ignore messages from unrelated
                // parties.
                nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
                nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0);
                nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
 -              assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
  
                // At this point the channel info given by peers should still be the same.
                assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
                // persisted and that its channel info remains the same.
                nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update);
                nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update);
 -              assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(!nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(!nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
                assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info);
  
                // the channel info has updated.
                nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
                nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
 -              assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
 -              assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
 +              assert!(nodes[0].node.get_event_or_persistence_needed_future().poll_is_complete());
 +              assert!(nodes[1].node.get_event_or_persistence_needed_future().poll_is_complete());
                assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info);
                assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info);
        }
index 666221b2dd47fbe400b506d7602435905f9a2010,9b6babe345b9f18d05479c735d3e218a309a2f30..52cd3ca96d94760f71fcc1d354dc60e79360f427
@@@ -12,8 -12,7 +12,8 @@@ use crate::ln::channelmanager::{HTLCSou
  use crate::ln::msgs;
  use crate::ln::wire::Encode;
  use crate::routing::gossip::NetworkUpdate;
 -use crate::routing::router::{Path, RouteHop};
 +use crate::routing::router::{BlindedTail, Path, RouteHop};
 +use crate::sign::NodeSigner;
  use crate::util::chacha20::{ChaCha20, ChaChaReader};
  use crate::util::errors::{self, APIError};
  use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer, LengthCalculatingWriter};
@@@ -170,9 -169,7 +170,9 @@@ pub(super) fn build_onion_payloads(path
        let mut cur_value_msat = 0u64;
        let mut cur_cltv = starting_htlc_offset;
        let mut last_short_channel_id = 0;
 -      let mut res: Vec<msgs::OutboundOnionPayload> = Vec::with_capacity(path.hops.len());
 +      let mut res: Vec<msgs::OutboundOnionPayload> = Vec::with_capacity(
 +              path.hops.len() + path.blinded_tail.as_ref().map_or(0, |t| t.hops.len())
 +      );
  
        for (idx, hop) in path.hops.iter().rev().enumerate() {
                // First hop gets special values so that it can check, on receipt, that everything is
                // the intended recipient).
                let value_msat = if cur_value_msat == 0 { hop.fee_msat } else { cur_value_msat };
                let cltv = if cur_cltv == starting_htlc_offset { hop.cltv_expiry_delta + starting_htlc_offset } else { cur_cltv };
 -              res.insert(0, if idx == 0 {
 -                      msgs::OutboundOnionPayload::Receive {
 -                              payment_data: if let Some(secret) = recipient_onion.payment_secret.take() {
 -                                      Some(msgs::FinalOnionHopData {
 -                                              payment_secret: secret,
 -                                              total_msat,
 -                                      })
 -                              } else { None },
 -                              payment_metadata: recipient_onion.payment_metadata.take(),
 -                              keysend_preimage: *keysend_preimage,
 -                              custom_tlvs: recipient_onion.custom_tlvs.clone(),
 -                              amt_msat: value_msat,
 -                              outgoing_cltv_value: cltv,
 +              if idx == 0 {
 +                      if let Some(BlindedTail {
 +                              blinding_point, hops, final_value_msat, excess_final_cltv_expiry_delta, ..
 +                      }) = &path.blinded_tail {
 +                              let mut blinding_point = Some(*blinding_point);
 +                              for (i, blinded_hop) in hops.iter().enumerate() {
 +                                      if i == hops.len() - 1 {
 +                                              cur_value_msat += final_value_msat;
 +                                              cur_cltv += excess_final_cltv_expiry_delta;
 +                                              res.push(msgs::OutboundOnionPayload::BlindedReceive {
 +                                                      amt_msat: *final_value_msat,
 +                                                      total_msat,
 +                                                      outgoing_cltv_value: cltv,
 +                                                      encrypted_tlvs: blinded_hop.encrypted_payload.clone(),
 +                                                      intro_node_blinding_point: blinding_point.take(),
 +                                              });
 +                                      } else {
 +                                              res.push(msgs::OutboundOnionPayload::BlindedForward {
 +                                                      encrypted_tlvs: blinded_hop.encrypted_payload.clone(),
 +                                                      intro_node_blinding_point: blinding_point.take(),
 +                                              });
 +                                      }
 +                              }
 +                      } else {
 +                              res.push(msgs::OutboundOnionPayload::Receive {
 +                                      payment_data: if let Some(secret) = recipient_onion.payment_secret.take() {
 +                                              Some(msgs::FinalOnionHopData {
 +                                                      payment_secret: secret,
 +                                                      total_msat,
 +                                              })
 +                                      } else { None },
 +                                      payment_metadata: recipient_onion.payment_metadata.take(),
 +                                      keysend_preimage: *keysend_preimage,
 +                                      custom_tlvs: recipient_onion.custom_tlvs.clone(),
 +                                      amt_msat: value_msat,
 +                                      outgoing_cltv_value: cltv,
 +                              });
                        }
                } else {
 -                      msgs::OutboundOnionPayload::Forward {
 +                      res.insert(0, msgs::OutboundOnionPayload::Forward {
                                short_channel_id: last_short_channel_id,
                                amt_to_forward: value_msat,
                                outgoing_cltv_value: cltv,
 -                      }
 -              });
 +                      });
 +              }
                cur_value_msat += hop.fee_msat;
                if cur_value_msat >= 21000000 * 100000000 * 1000 {
                        return Err(APIError::InvalidRoute{err: "Channel fees overflowed?".to_owned()});
@@@ -886,11 -859,8 +886,11 @@@ pub(crate) enum OnionDecodeErr 
        },
  }
  
 -pub(crate) fn decode_next_payment_hop(shared_secret: [u8; 32], hop_data: &[u8], hmac_bytes: [u8; 32], payment_hash: PaymentHash) -> Result<Hop, OnionDecodeErr> {
 -      match decode_next_hop(shared_secret, hop_data, hmac_bytes, Some(payment_hash), ()) {
 +pub(crate) fn decode_next_payment_hop<NS: Deref>(
 +      shared_secret: [u8; 32], hop_data: &[u8], hmac_bytes: [u8; 32], payment_hash: PaymentHash,
 +      node_signer: &NS,
 +) -> Result<Hop, OnionDecodeErr> where NS::Target: NodeSigner {
 +      match decode_next_hop(shared_secret, hop_data, hmac_bytes, Some(payment_hash), node_signer) {
                Ok((next_hop_data, None)) => Ok(Hop::Receive(next_hop_data)),
                Ok((next_hop_data, Some((next_hop_hmac, FixedSizeOnionPacket(new_packet_bytes))))) => {
                        Ok(Hop::Forward {
@@@ -1014,27 -984,27 +1014,27 @@@ mod tests 
                                        RouteHop {
                                                pubkey: PublicKey::from_slice(&hex::decode("02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619").unwrap()[..]).unwrap(),
                                                channel_features: ChannelFeatures::empty(), node_features: NodeFeatures::empty(),
-                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0 // We fill in the payloads manually instead of generating them from RouteHops.
+                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0, maybe_announced_channel: true, // We fill in the payloads manually instead of generating them from RouteHops.
                                        },
                                        RouteHop {
                                                pubkey: PublicKey::from_slice(&hex::decode("0324653eac434488002cc06bbfb7f10fe18991e35f9fe4302dbea6d2353dc0ab1c").unwrap()[..]).unwrap(),
                                                channel_features: ChannelFeatures::empty(), node_features: NodeFeatures::empty(),
-                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0 // We fill in the payloads manually instead of generating them from RouteHops.
+                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0, maybe_announced_channel: true, // We fill in the payloads manually instead of generating them from RouteHops.
                                        },
                                        RouteHop {
                                                pubkey: PublicKey::from_slice(&hex::decode("027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007").unwrap()[..]).unwrap(),
                                                channel_features: ChannelFeatures::empty(), node_features: NodeFeatures::empty(),
-                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0 // We fill in the payloads manually instead of generating them from RouteHops.
+                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0, maybe_announced_channel: true, // We fill in the payloads manually instead of generating them from RouteHops.
                                        },
                                        RouteHop {
                                                pubkey: PublicKey::from_slice(&hex::decode("032c0b7cf95324a07d05398b240174dc0c2be444d96b159aa6c7f7b1e668680991").unwrap()[..]).unwrap(),
                                                channel_features: ChannelFeatures::empty(), node_features: NodeFeatures::empty(),
-                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0 // We fill in the payloads manually instead of generating them from RouteHops.
+                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0, maybe_announced_channel: true, // We fill in the payloads manually instead of generating them from RouteHops.
                                        },
                                        RouteHop {
                                                pubkey: PublicKey::from_slice(&hex::decode("02edabbd16b41c8371b92ef2f04c1185b4f03b6dcd52ba9b78d9d7c89c8f221145").unwrap()[..]).unwrap(),
                                                channel_features: ChannelFeatures::empty(), node_features: NodeFeatures::empty(),
-                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0 // We fill in the payloads manually instead of generating them from RouteHops.
+                                               short_channel_id: 0, fee_msat: 0, cltv_expiry_delta: 0, maybe_announced_channel: true, // We fill in the payloads manually instead of generating them from RouteHops.
                                        },
                        ], blinded_tail: None }],
                        route_params: None,
index 5ea772e5d4ffbfb0b82c9fc19e014781e3219aa0,583c54056fbe3f547bb05f91783ccab3ee8c39d4..023412e1afb56cdcdd0a2d9195064e5744ccab6c
@@@ -391,7 -391,7 +391,7 @@@ pub enum RetryableSendFailure 
  /// is in, see the description of individual enum states for more.
  ///
  /// [`ChannelManager::send_payment_with_route`]: crate::ln::channelmanager::ChannelManager::send_payment_with_route
- #[derive(Clone, Debug)]
+ #[derive(Clone, Debug, PartialEq, Eq)]
  pub enum PaymentSendFailure {
        /// A parameter which was passed to send_payment was invalid, preventing us from attempting to
        /// send the payment at all.
@@@ -465,6 -465,18 +465,18 @@@ pub(super) enum Bolt12PaymentError 
        DuplicateInvoice,
  }
  
+ /// Indicates that we failed to send a payment probe. Further errors may be surfaced later via
+ /// [`Event::ProbeFailed`].
+ ///
+ /// [`Event::ProbeFailed`]: crate::events::Event::ProbeFailed
+ #[derive(Clone, Debug, PartialEq, Eq)]
+ pub enum ProbeSendFailure {
+       /// We were unable to find a route to the destination.
+       RouteNotFound,
+       /// We failed to send the payment probes.
+       SendingFailed(PaymentSendFailure),
+ }
  /// Information which is provided, encrypted, to the payment recipient when sending HTLCs.
  ///
  /// This should generally be constructed with data communicated to us from the recipient (via a
@@@ -1103,6 -1115,7 +1115,7 @@@ impl OutboundPayments 
                F: Fn(SendAlongPathArgs) -> Result<(), APIError>,
        {
                let payment_id = PaymentId(entropy_source.get_secure_random_bytes());
+               let payment_secret = PaymentSecret(entropy_source.get_secure_random_bytes());
  
                let payment_hash = probing_cookie_from_id(&payment_id, probing_cookie_secret);
  
  
                let route = Route { paths: vec![path], route_params: None };
                let onion_session_privs = self.add_new_pending_payment(payment_hash,
-                       RecipientOnionFields::spontaneous_empty(), payment_id, None, &route, None, None,
+                       RecipientOnionFields::secret_only(payment_secret), payment_id, None, &route, None, None,
                        entropy_source, best_block_height)?;
  
                match self.pay_route_internal(&route, payment_hash, RecipientOnionFields::spontaneous_empty(),
                if route.paths.len() < 1 {
                        return Err(PaymentSendFailure::ParameterError(APIError::InvalidRoute{err: "There must be at least one path to send over".to_owned()}));
                }
 -              if recipient_onion.payment_secret.is_none() && route.paths.len() > 1 {
 +              if recipient_onion.payment_secret.is_none() && route.paths.len() > 1
 +                      && !route.paths.iter().any(|p| p.blinded_tail.is_some())
 +              {
                        return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError{err: "Payment secret is required for multi-path payments".to_owned()}));
                }
                let mut total_value = 0;
                                path_errs.push(Err(APIError::InvalidRoute{err: "Path didn't go anywhere/had bogus size".to_owned()}));
                                continue 'path_check;
                        }
 -                      if path.blinded_tail.is_some() {
 -                              path_errs.push(Err(APIError::InvalidRoute{err: "Sending to blinded paths isn't supported yet".to_owned()}));
 -                              continue 'path_check;
 -                      }
                        let dest_hop_idx = if path.blinded_tail.is_some() && path.blinded_tail.as_ref().unwrap().hops.len() > 1 {
                                usize::max_value() } else { path.hops.len() - 1 };
                        for (idx, hop) in path.hops.iter().enumerate() {
@@@ -1850,6 -1865,7 +1863,7 @@@ mod tests 
                                channel_features: ChannelFeatures::empty(),
                                fee_msat: 0,
                                cltv_expiry_delta: 0,
+                               maybe_announced_channel: true,
                        }], blinded_tail: None }],
                        route_params: Some(route_params.clone()),
                };
                                                                channel_features: ChannelFeatures::empty(),
                                                                fee_msat: invoice.amount_msats(),
                                                                cltv_expiry_delta: 0,
+                                                               maybe_announced_channel: true,
                                                        }
                                                ],
                                                blinded_tail: None,
index 26d555819d35c15e2f8aff22b0667d5e6ba49500,c4f31db9471039581d028888725de76c999c7ac4..748edd31eaed679af2e6c5d767b9abd8139c25ec
@@@ -773,36 -773,21 +773,36 @@@ impl<G: Deref<Target = NetworkGraph<L>>
                                                let amt = directed_info.effective_capacity().as_msat();
                                                let dir_liq = liq.as_directed(source, target, 0, amt, self.decay_params);
  
 -                                              let (min_buckets, max_buckets, _) = dir_liq.liquidity_history
 +                                              let (min_buckets, max_buckets) = dir_liq.liquidity_history
                                                        .get_decayed_buckets(now, *dir_liq.last_updated,
 -                                                              self.decay_params.historical_no_updates_half_life);
 +                                                              self.decay_params.historical_no_updates_half_life)
 +                                                      .unwrap_or(([0; 32], [0; 32]));
  
                                                log_debug!(self.logger, core::concat!(
                                                        "Liquidity from {} to {} via {} is in the range ({}, {}).\n",
 -                                                      "\tHistorical min liquidity octile relative probabilities: {} {} {} {} {} {} {} {}\n",
 -                                                      "\tHistorical max liquidity octile relative probabilities: {} {} {} {} {} {} {} {}"),
 +                                                      "\tHistorical min liquidity bucket relative probabilities:\n",
 +                                                      "\t\t{} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {}\n",
 +                                                      "\tHistorical max liquidity bucket relative probabilities:\n",
 +                                                      "\t\t{} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {} {}"),
                                                        source, target, scid, dir_liq.min_liquidity_msat(), dir_liq.max_liquidity_msat(),
 -                                                      min_buckets[0], min_buckets[1], min_buckets[2], min_buckets[3],
 -                                                      min_buckets[4], min_buckets[5], min_buckets[6], min_buckets[7],
 +                                                      min_buckets[ 0], min_buckets[ 1], min_buckets[ 2], min_buckets[ 3],
 +                                                      min_buckets[ 4], min_buckets[ 5], min_buckets[ 6], min_buckets[ 7],
 +                                                      min_buckets[ 8], min_buckets[ 9], min_buckets[10], min_buckets[11],
 +                                                      min_buckets[12], min_buckets[13], min_buckets[14], min_buckets[15],
 +                                                      min_buckets[16], min_buckets[17], min_buckets[18], min_buckets[19],
 +                                                      min_buckets[20], min_buckets[21], min_buckets[22], min_buckets[23],
 +                                                      min_buckets[24], min_buckets[25], min_buckets[26], min_buckets[27],
 +                                                      min_buckets[28], min_buckets[29], min_buckets[30], min_buckets[31],
                                                        // Note that the liquidity buckets are an offset from the edge, so we
                                                        // inverse the max order to get the probabilities from zero.
 -                                                      max_buckets[7], max_buckets[6], max_buckets[5], max_buckets[4],
 -                                                      max_buckets[3], max_buckets[2], max_buckets[1], max_buckets[0]);
 +                                                      max_buckets[31], max_buckets[30], max_buckets[29], max_buckets[28],
 +                                                      max_buckets[27], max_buckets[26], max_buckets[25], max_buckets[24],
 +                                                      max_buckets[23], max_buckets[22], max_buckets[21], max_buckets[20],
 +                                                      max_buckets[19], max_buckets[18], max_buckets[17], max_buckets[16],
 +                                                      max_buckets[15], max_buckets[14], max_buckets[13], max_buckets[12],
 +                                                      max_buckets[11], max_buckets[10], max_buckets[ 9], max_buckets[ 8],
 +                                                      max_buckets[ 7], max_buckets[ 6], max_buckets[ 5], max_buckets[ 4],
 +                                                      max_buckets[ 3], max_buckets[ 2], max_buckets[ 1], max_buckets[ 0]);
                                        } else {
                                                log_debug!(self.logger, "No amount known for SCID {} from {:?} to {:?}", scid, source, target);
                                        }
        /// Query the historical estimated minimum and maximum liquidity available for sending a
        /// payment over the channel with `scid` towards the given `target` node.
        ///
 -      /// Returns two sets of 8 buckets. The first set describes the octiles for lower-bound
 -      /// liquidity estimates, the second set describes the octiles for upper-bound liquidity
 -      /// estimates. Each bucket describes the relative frequency at which we've seen a liquidity
 -      /// bound in the octile relative to the channel's total capacity, on an arbitrary scale.
 -      /// Because the values are slowly decayed, more recent data points are weighted more heavily
 -      /// than older datapoints.
 +      /// Returns two sets of 32 buckets. The first set describes the lower-bound liquidity history,
 +      /// the second set describes the upper-bound liquidity history. Each bucket describes the
 +      /// relative frequency at which we've seen a liquidity bound in the bucket's range relative to
 +      /// the channel's total capacity, on an arbitrary scale. Because the values are slowly decayed,
 +      /// more recent data points are weighted more heavily than older datapoints.
        ///
 -      /// When scoring, the estimated probability that an upper-/lower-bound lies in a given octile
 -      /// relative to the channel's total capacity is calculated by dividing that bucket's value with
 -      /// the total of all buckets for the given bound.
 +      /// Note that the range of each bucket varies by its location to provide more granular results
 +      /// at the edges of a channel's capacity, where it is more likely to sit.
        ///
 -      /// For example, a value of `[0, 0, 0, 0, 0, 0, 32]` indicates that we believe the probability
 -      /// of a bound being in the top octile to be 100%, and have never (recently) seen it in any
 -      /// other octiles. A value of `[31, 0, 0, 0, 0, 0, 0, 32]` indicates we've seen the bound being
 -      /// both in the top and bottom octile, and roughly with similar (recent) frequency.
 +      /// When scoring, the estimated probability that an upper-/lower-bound lies in a given bucket
 +      /// is calculated by dividing that bucket's value with the total value of all buckets.
 +      ///
 +      /// For example, using a lower bucket count for illustrative purposes, a value of
 +      /// `[0, 0, 0, ..., 0, 32]` indicates that we believe the probability of a bound being very
 +      /// close to the channel's capacity to be 100%, and have never (recently) seen it in any other
 +      /// bucket. A value of `[31, 0, 0, ..., 0, 0, 32]` indicates we've seen the bound being both
 +      /// in the top and bottom bucket, and roughly with similar (recent) frequency.
        ///
        /// Because the datapoints are decayed slowly over time, values will eventually return to
 -      /// `Some(([0; 8], [0; 8]))`.
 +      /// `Some(([1; 32], [1; 32]))` and then to `None` once no datapoints remain.
        ///
        /// In order to fetch a single success probability from the buckets provided here, as used in
        /// the scoring model, see [`Self::historical_estimated_payment_success_probability`].
        pub fn historical_estimated_channel_liquidity_probabilities(&self, scid: u64, target: &NodeId)
 -      -> Option<([u16; 8], [u16; 8])> {
 +      -> Option<([u16; 32], [u16; 32])> {
                let graph = self.network_graph.read_only();
  
                if let Some(chan) = graph.channels().get(&scid) {
                                        let amt = directed_info.effective_capacity().as_msat();
                                        let dir_liq = liq.as_directed(source, target, 0, amt, self.decay_params);
  
 -                                      let (min_buckets, mut max_buckets, _) = dir_liq.liquidity_history
 -                                              .get_decayed_buckets(dir_liq.now, *dir_liq.last_updated,
 -                                                      self.decay_params.historical_no_updates_half_life);
 +                                      let (min_buckets, mut max_buckets) =
 +                                              dir_liq.liquidity_history.get_decayed_buckets(
 +                                                      dir_liq.now, *dir_liq.last_updated,
 +                                                      self.decay_params.historical_no_updates_half_life
 +                                              )?;
 +
                                        // Note that the liquidity buckets are an offset from the edge, so we inverse
                                        // the max order to get the probabilities from zero.
                                        max_buckets.reverse();
@@@ -1141,7 -1121,7 +1141,7 @@@ impl<L: DerefMut<Target = u64>, BRT: De
                        log_trace!(logger, "Max liquidity of {} is {} (already less than or equal to {})",
                                chan_descr, existing_max_msat, amount_msat);
                }
 -              self.update_history_buckets();
 +              self.update_history_buckets(0);
        }
  
        /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat` downstream.
                        log_trace!(logger, "Min liquidity of {} is {} (already greater than or equal to {})",
                                chan_descr, existing_min_msat, amount_msat);
                }
 -              self.update_history_buckets();
 +              self.update_history_buckets(0);
        }
  
        /// Adjusts the channel liquidity balance bounds when successfully routing `amount_msat`.
                let max_liquidity_msat = self.max_liquidity_msat().checked_sub(amount_msat).unwrap_or(0);
                log_debug!(logger, "Subtracting {} from max liquidity of {} (setting it to {})", amount_msat, chan_descr, max_liquidity_msat);
                self.set_max_liquidity_msat(max_liquidity_msat);
 -              self.update_history_buckets();
 +              self.update_history_buckets(amount_msat);
        }
  
 -      fn update_history_buckets(&mut self) {
 +      /// Updates the history buckets for this channel. Because the history buckets track what we now
 +      /// know about the channel's state *prior to our payment* (i.e. what we assume is "steady
 +      /// state"), we allow the caller to set an offset applied to our liquidity bounds which
 +      /// represents the amount of the successful payment we just made.
 +      fn update_history_buckets(&mut self, bucket_offset_msat: u64) {
                let half_lives = self.now.duration_since(*self.last_updated).as_secs()
                        .checked_div(self.decay_params.historical_no_updates_half_life.as_secs())
                        .map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
  
                let min_liquidity_offset_msat = self.decayed_offset_msat(*self.min_liquidity_offset_msat);
                self.liquidity_history.min_liquidity_offset_history.track_datapoint(
 -                      min_liquidity_offset_msat, self.capacity_msat
 +                      min_liquidity_offset_msat + bucket_offset_msat, self.capacity_msat
                );
                let max_liquidity_offset_msat = self.decayed_offset_msat(*self.max_liquidity_offset_msat);
                self.liquidity_history.max_liquidity_offset_history.track_datapoint(
 -                      max_liquidity_offset_msat, self.capacity_msat
 +                      max_liquidity_offset_msat.saturating_sub(bucket_offset_msat), self.capacity_msat
                );
        }
  
@@@ -1642,125 -1618,17 +1642,125 @@@ mod approx 
  mod bucketed_history {
        use super::*;
  
 +      // Because liquidity is often skewed heavily in one direction, we store historical state
 +      // distribution in buckets of different size. For backwards compatibility, buckets of size 1/8th
 +      // must fit evenly into the buckets here.
 +      //
 +      // The smallest bucket is 2^-14th of the channel, for each of our 32 buckets here we define the
 +      // width of the bucket in 2^14'ths of the channel. This increases exponentially until we reach
 +      // a full 16th of the channel's capacity, which is reapeated a few times for backwards
 +      // compatibility. The four middle buckets represent full octiles of the channel's capacity.
 +      //
 +      // For a 1 BTC channel, this let's us differentiate between failures in the bottom 6k sats, or
 +      // between the 12,000th sat and 24,000th sat, while only needing to store and operate on 32
 +      // buckets in total.
 +
 +      const BUCKET_START_POS: [u16; 33] = [
 +              0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 3072, 4096, 6144, 8192, 10240, 12288,
 +              13312, 14336, 15360, 15872, 16128, 16256, 16320, 16352, 16368, 16376, 16380, 16382, 16383, 16384,
 +      ];
 +
 +      const LEGACY_TO_BUCKET_RANGE: [(u8, u8); 8] = [
 +              (0, 12), (12, 14), (14, 15), (15, 16), (16, 17), (17, 18), (18, 20), (20, 32)
 +      ];
 +
 +      const POSITION_TICKS: u16 = 1 << 14;
 +
 +      fn pos_to_bucket(pos: u16) -> usize {
 +              for bucket in 0..32 {
 +                      if pos < BUCKET_START_POS[bucket + 1] {
 +                              return bucket;
 +                      }
 +              }
 +              debug_assert!(false);
 +              return 32;
 +      }
 +
 +      #[cfg(test)]
 +      #[test]
 +      fn check_bucket_maps() {
 +              const BUCKET_WIDTH_IN_16384S: [u16; 32] = [
 +                      1, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024, 1024, 2048, 2048,
 +                      2048, 2048, 1024, 1024, 1024, 512, 256, 128, 64, 32, 16, 8, 4, 2, 1, 1];
 +
 +              let mut min_size_iter = 0;
 +              let mut legacy_bucket_iter = 0;
 +              for (bucket, width) in BUCKET_WIDTH_IN_16384S.iter().enumerate() {
 +                      assert_eq!(BUCKET_START_POS[bucket], min_size_iter);
 +                      for i in 0..*width {
 +                              assert_eq!(pos_to_bucket(min_size_iter + i) as usize, bucket);
 +                      }
 +                      min_size_iter += *width;
 +                      if min_size_iter % (POSITION_TICKS / 8) == 0 {
 +                              assert_eq!(LEGACY_TO_BUCKET_RANGE[legacy_bucket_iter].1 as usize, bucket + 1);
 +                              if legacy_bucket_iter + 1 < 8 {
 +                                      assert_eq!(LEGACY_TO_BUCKET_RANGE[legacy_bucket_iter + 1].0 as usize, bucket + 1);
 +                              }
 +                              legacy_bucket_iter += 1;
 +                      }
 +              }
 +              assert_eq!(BUCKET_START_POS[32], POSITION_TICKS);
 +              assert_eq!(min_size_iter, POSITION_TICKS);
 +      }
 +
 +      #[inline]
 +      fn amount_to_pos(amount_msat: u64, capacity_msat: u64) -> u16 {
 +              let pos = if amount_msat < u64::max_value() / (POSITION_TICKS as u64) {
 +                      (amount_msat * (POSITION_TICKS as u64) / capacity_msat.saturating_add(1))
 +                              .try_into().unwrap_or(POSITION_TICKS)
 +              } else {
 +                      // Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
 +                      // division. This branch should only be hit in fuzz testing since the amount would
 +                      // need to be over 2.88 million BTC in practice.
 +                      ((amount_msat as u128) * (POSITION_TICKS as u128)
 +                                      / (capacity_msat as u128).saturating_add(1))
 +                              .try_into().unwrap_or(POSITION_TICKS)
 +              };
 +              // If we are running in a client that doesn't validate gossip, its possible for a channel's
 +              // capacity to change due to a `channel_update` message which, if received while a payment
 +              // is in-flight, could cause this to fail. Thus, we only assert in test.
 +              #[cfg(test)]
 +              debug_assert!(pos < POSITION_TICKS);
 +              pos
 +      }
 +
 +      /// Prior to LDK 0.0.117 we used eight buckets which were split evenly across the either
 +      /// octiles. This was changed to use 32 buckets for accuracy reasons in 0.0.117, however we
 +      /// support reading the legacy values here for backwards compatibility.
 +      pub(super) struct LegacyHistoricalBucketRangeTracker {
 +              buckets: [u16; 8],
 +      }
 +
 +      impl LegacyHistoricalBucketRangeTracker {
 +              pub(crate) fn into_current(&self) -> HistoricalBucketRangeTracker {
 +                      let mut buckets = [0; 32];
 +                      for (idx, legacy_bucket) in self.buckets.iter().enumerate() {
 +                              let mut new_val = *legacy_bucket;
 +                              let (start, end) = LEGACY_TO_BUCKET_RANGE[idx];
 +                              new_val /= (end - start) as u16;
 +                              for i in start..end {
 +                                      buckets[i as usize] = new_val;
 +                              }
 +                      }
 +                      HistoricalBucketRangeTracker { buckets }
 +              }
 +      }
 +
        /// Tracks the historical state of a distribution as a weighted average of how much time was spent
 -      /// in each of 8 buckets.
 +      /// in each of 32 buckets.
        #[derive(Clone, Copy)]
        pub(super) struct HistoricalBucketRangeTracker {
 -              buckets: [u16; 8],
 +              buckets: [u16; 32],
        }
  
 +      /// Buckets are stored in fixed point numbers with a 5 bit fractional part. Thus, the value
 +      /// "one" is 32, or this constant.
 +      pub const BUCKET_FIXED_POINT_ONE: u16 = 32;
 +
        impl HistoricalBucketRangeTracker {
 -              pub(super) fn new() -> Self { Self { buckets: [0; 8] } }
 +              pub(super) fn new() -> Self { Self { buckets: [0; 32] } }
                pub(super) fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
 -                      // We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
 +                      // We have 32 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
                        // we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
                        //
                        // Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
                        // The constants were picked experimentally, selecting a decay amount that restricts us
                        // from overflowing buckets without having to cap them manually.
  
 -                      // Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
 -                      // the channel's capacity, though the second should generally never happen.
 -                      debug_assert!(liquidity_offset_msat <= capacity_msat);
 -                      let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
 -                              .try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
 -                      debug_assert!(bucket_idx < 8);
 -                      if bucket_idx < 8 {
 +                      let pos: u16 = amount_to_pos(liquidity_offset_msat, capacity_msat);
 +                      if pos < POSITION_TICKS {
                                for e in self.buckets.iter_mut() {
                                        *e = ((*e as u32) * 2047 / 2048) as u16;
                                }
 -                              self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
 +                              let bucket = pos_to_bucket(pos);
 +                              self.buckets[bucket] = self.buckets[bucket].saturating_add(BUCKET_FIXED_POINT_ONE);
                        }
                }
                /// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
                /// datapoints as we receive newer information.
 +              #[inline]
                pub(super) fn time_decay_data(&mut self, half_lives: u32) {
                        for e in self.buckets.iter_mut() {
                                *e = e.checked_shr(half_lives).unwrap_or(0);
        }
  
        impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
 +      impl_writeable_tlv_based!(LegacyHistoricalBucketRangeTracker, { (0, buckets, required) });
  
 +      /// A set of buckets representing the history of where we've seen the minimum- and maximum-
 +      /// liquidity bounds for a given channel.
        pub(super) struct HistoricalMinMaxBuckets<D: Deref<Target = HistoricalBucketRangeTracker>> {
 +              /// Buckets tracking where and how often we've seen the minimum liquidity bound for a
 +              /// channel.
                pub(super) min_liquidity_offset_history: D,
 +              /// Buckets tracking where and how often we've seen the maximum liquidity bound for a
 +              /// channel.
                pub(super) max_liquidity_offset_history: D,
        }
  
        impl<D: Deref<Target = HistoricalBucketRangeTracker>> HistoricalMinMaxBuckets<D> {
 -              #[inline]
                pub(super) fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
 -              -> ([u16; 8], [u16; 8], u32) {
 -                      let required_decays = now.duration_since(last_updated).as_secs()
 -                              .checked_div(half_life.as_secs())
 -                              .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
 +              -> Option<([u16; 32], [u16; 32])> {
 +                      let (_, required_decays) = self.get_total_valid_points(now, last_updated, half_life)?;
 +
                        let mut min_buckets = *self.min_liquidity_offset_history;
                        min_buckets.time_decay_data(required_decays);
                        let mut max_buckets = *self.max_liquidity_offset_history;
                        max_buckets.time_decay_data(required_decays);
 -                      (min_buckets.buckets, max_buckets.buckets, required_decays)
 +                      Some((min_buckets.buckets, max_buckets.buckets))
 +              }
 +              #[inline]
 +              pub(super) fn get_total_valid_points<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
 +              -> Option<(u64, u32)> {
 +                      let required_decays = now.duration_since(last_updated).as_secs()
 +                              .checked_div(half_life.as_secs())
 +                              .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
 +
 +                      let mut total_valid_points_tracked = 0;
 +                      for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
 +                              for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(32 - min_idx) {
 +                                      total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
 +                              }
 +                      }
 +
 +                      // If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme),
 +                      // treat it as if we were fully decayed.
 +                      const FULLY_DECAYED: u16 = BUCKET_FIXED_POINT_ONE * BUCKET_FIXED_POINT_ONE;
 +                      if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < FULLY_DECAYED.into() {
 +                              return None;
 +                      }
 +
 +                      Some((total_valid_points_tracked, required_decays))
                }
  
                #[inline]
                pub(super) fn calculate_success_probability_times_billion<T: Time>(
                        &self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
                -> Option<u64> {
 -                      // If historical penalties are enabled, calculate the penalty by walking the set of
 -                      // historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
 -                      // each, calculate the probability of success given our payment amount, then total the
 -                      // weighted average probability of success.
 -                      //
 -                      // We use a sliding scale to decide which point within a given bucket will be compared to
 -                      // the amount being sent - for lower-bounds, the amount being sent is compared to the lower
 -                      // edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
 -                      // bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
 -                      // comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
 -                      // of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
 -                      // penalties to channels at the edges.
 -                      //
 -                      // If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
 -                      // such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
 -                      // for a 1 BTC channel!).
 -                      //
 -                      // If we used the middle of each bucket we'd never assign any penalty at all when sending
 -                      // less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
 -                      let mut total_valid_points_tracked = 0;
 -
 -                      let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
 -                              (amount_msat * 64 / capacity_msat.saturating_add(1))
 -                                      .try_into().unwrap_or(65)
 -                      } else {
 -                              // Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
 -                              // division. This branch should only be hit in fuzz testing since the amount would
 -                              // need to be over 2.88 million BTC in practice.
 -                              ((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
 -                                      .try_into().unwrap_or(65)
 -                      };
 -                      #[cfg(not(fuzzing))]
 -                      debug_assert!(payment_amt_64th_bucket <= 64);
 -                      if payment_amt_64th_bucket >= 64 { return None; }
 +                      // If historical penalties are enabled, we try to calculate a probability of success
 +                      // given our historical distribution of min- and max-liquidity bounds in a channel.
 +                      // To do so, we walk the set of historical liquidity bucket (min, max) combinations
 +                      // (where min_idx < max_idx, as having a minimum above our maximum is an invalid
 +                      // state). For each pair, we calculate the probability as if the bucket's corresponding
 +                      // min- and max- liquidity bounds were our current liquidity bounds and then multiply
 +                      // that probability by the weight of the selected buckets.
 +                      let payment_pos = amount_to_pos(amount_msat, capacity_msat);
 +                      if payment_pos >= POSITION_TICKS { return None; }
  
                        // Check if all our buckets are zero, once decayed and treat it as if we had no data. We
                        // don't actually use the decayed buckets, though, as that would lose precision.
 -                      let (decayed_min_buckets, decayed_max_buckets, required_decays) =
 -                              self.get_decayed_buckets(now, last_updated, half_life);
 -                      if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
 -                              return None;
 -                      }
 +                      let (total_valid_points_tracked, _)
 +                              = self.get_total_valid_points(now, last_updated, half_life)?;
  
 -                      for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
 -                              for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
 -                                      total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
 +                      let mut cumulative_success_prob_times_billion = 0;
 +                      // Special-case the 0th min bucket - it generally means we failed a payment, so only
 +                      // consider the highest (i.e. largest-offset-from-max-capacity) max bucket for all
 +                      // points against the 0th min bucket. This avoids the case where we fail to route
 +                      // increasingly lower values over a channel, but treat each failure as a separate
 +                      // datapoint, many of which may have relatively high maximum-available-liquidity
 +                      // values, which will result in us thinking we have some nontrivial probability of
 +                      // routing up to that amount.
 +                      if self.min_liquidity_offset_history.buckets[0] != 0 {
 +                              let mut highest_max_bucket_with_points = 0; // The highest max-bucket with any data
 +                              let mut total_max_points = 0; // Total points in max-buckets to consider
 +                              for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate() {
 +                                      if *max_bucket >= BUCKET_FIXED_POINT_ONE {
 +                                              highest_max_bucket_with_points = cmp::max(highest_max_bucket_with_points, max_idx);
 +                                      }
 +                                      total_max_points += *max_bucket as u64;
 +                              }
 +                              let max_bucket_end_pos = BUCKET_START_POS[32 - highest_max_bucket_with_points] - 1;
 +                              if payment_pos < max_bucket_end_pos {
 +                                      let bucket_prob_times_billion =
 +                                              (self.min_liquidity_offset_history.buckets[0] as u64) * total_max_points
 +                                                      * 1024 * 1024 * 1024 / total_valid_points_tracked;
 +                                      cumulative_success_prob_times_billion += bucket_prob_times_billion *
 +                                              ((max_bucket_end_pos - payment_pos) as u64) /
 +                                              // Add an additional one in the divisor as the payment bucket has been
 +                                              // rounded down.
 +                                              (max_bucket_end_pos + 1) as u64;
                                }
 -                      }
 -                      // If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
 -                      // it as if we were fully decayed.
 -                      if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
 -                              return None;
                        }
  
 -                      let mut cumulative_success_prob_times_billion = 0;
 -                      for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
 -                              for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
 -                                      let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
 -                                              * 1024 * 1024 / total_valid_points_tracked;
 -                                      let min_64th_bucket = min_idx as u8 * 9;
 -                                      let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
 -                                      if payment_amt_64th_bucket > max_64th_bucket {
 -                                              // Success probability 0, the payment amount is above the max liquidity
 -                                      } else if payment_amt_64th_bucket <= min_64th_bucket {
 -                                              cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
 +                      for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate().skip(1) {
 +                              let min_bucket_start_pos = BUCKET_START_POS[min_idx];
 +                              for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(32 - min_idx) {
 +                                      let max_bucket_end_pos = BUCKET_START_POS[32 - max_idx] - 1;
 +                                      // Note that this multiply can only barely not overflow - two 16 bit ints plus
 +                                      // 30 bits is 62 bits.
 +                                      let bucket_prob_times_billion = (*min_bucket as u64) * (*max_bucket as u64)
 +                                              * 1024 * 1024 * 1024 / total_valid_points_tracked;
 +                                      if payment_pos >= max_bucket_end_pos {
 +                                              // Success probability 0, the payment amount may be above the max liquidity
 +                                              break;
 +                                      } else if payment_pos < min_bucket_start_pos {
 +                                              cumulative_success_prob_times_billion += bucket_prob_times_billion;
                                        } else {
 -                                              cumulative_success_prob_times_billion += bucket_prob_times_million *
 -                                                      ((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
 -                                                      ((max_64th_bucket - min_64th_bucket) as u64);
 +                                              cumulative_success_prob_times_billion += bucket_prob_times_billion *
 +                                                      ((max_bucket_end_pos - payment_pos) as u64) /
 +                                                      // Add an additional one in the divisor as the payment bucket has been
 +                                                      // rounded down.
 +                                                      ((max_bucket_end_pos - min_bucket_start_pos + 1) as u64);
                                        }
                                }
                        }
                }
        }
  }
 -use bucketed_history::{HistoricalBucketRangeTracker, HistoricalMinMaxBuckets};
 +use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, HistoricalMinMaxBuckets};
  
  impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> Writeable for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {
        #[inline]
@@@ -1963,12 -1811,10 +1963,12 @@@ impl<T: Time> Writeable for ChannelLiqu
                let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed();
                write_tlv_fields!(w, {
                        (0, self.min_liquidity_offset_msat, required),
 -                      (1, Some(self.min_liquidity_offset_history), option),
 +                      // 1 was the min_liquidity_offset_history in octile form
                        (2, self.max_liquidity_offset_msat, required),
 -                      (3, Some(self.max_liquidity_offset_history), option),
 +                      // 3 was the max_liquidity_offset_history in octile form
                        (4, duration_since_epoch, required),
 +                      (5, Some(self.min_liquidity_offset_history), option),
 +                      (7, Some(self.max_liquidity_offset_history), option),
                });
                Ok(())
        }
@@@ -1979,19 -1825,15 +1979,19 @@@ impl<T: Time> Readable for ChannelLiqui
        fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
                let mut min_liquidity_offset_msat = 0;
                let mut max_liquidity_offset_msat = 0;
 -              let mut min_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
 -              let mut max_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
 +              let mut legacy_min_liq_offset_history: Option<LegacyHistoricalBucketRangeTracker> = None;
 +              let mut legacy_max_liq_offset_history: Option<LegacyHistoricalBucketRangeTracker> = None;
 +              let mut min_liquidity_offset_history: Option<HistoricalBucketRangeTracker> = None;
 +              let mut max_liquidity_offset_history: Option<HistoricalBucketRangeTracker> = None;
                let mut duration_since_epoch = Duration::from_secs(0);
                read_tlv_fields!(r, {
                        (0, min_liquidity_offset_msat, required),
 -                      (1, min_liquidity_offset_history, option),
 +                      (1, legacy_min_liq_offset_history, option),
                        (2, max_liquidity_offset_msat, required),
 -                      (3, max_liquidity_offset_history, option),
 +                      (3, legacy_max_liq_offset_history, option),
                        (4, duration_since_epoch, required),
 +                      (5, min_liquidity_offset_history, option),
 +                      (7, max_liquidity_offset_history, option),
                });
                // On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards.
                // We write `last_updated` as wallclock time even though its ultimately an `Instant` (which
                let last_updated = if wall_clock_now > duration_since_epoch {
                        now - (wall_clock_now - duration_since_epoch)
                } else { now };
 +              if min_liquidity_offset_history.is_none() {
 +                      if let Some(legacy_buckets) = legacy_min_liq_offset_history {
 +                              min_liquidity_offset_history = Some(legacy_buckets.into_current());
 +                      } else {
 +                              min_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
 +                      }
 +              }
 +              if max_liquidity_offset_history.is_none() {
 +                      if let Some(legacy_buckets) = legacy_max_liq_offset_history {
 +                              max_liquidity_offset_history = Some(legacy_buckets.into_current());
 +                      } else {
 +                              max_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
 +                      }
 +              }
                Ok(Self {
                        min_liquidity_offset_msat,
                        max_liquidity_offset_msat,
@@@ -2147,20 -1975,20 +2147,20 @@@ mod tests 
                let chain_source: Option<&crate::util::test_utils::TestChainSource> = None;
                network_graph.update_channel_from_announcement(
                        &signed_announcement, &chain_source).unwrap();
 -              update_channel(network_graph, short_channel_id, node_1_key, 0, 1_000);
 -              update_channel(network_graph, short_channel_id, node_2_key, 1, 0);
 +              update_channel(network_graph, short_channel_id, node_1_key, 0, 1_000, 100);
 +              update_channel(network_graph, short_channel_id, node_2_key, 1, 0, 100);
        }
  
        fn update_channel(
                network_graph: &mut NetworkGraph<&TestLogger>, short_channel_id: u64, node_key: SecretKey,
 -              flags: u8, htlc_maximum_msat: u64
 +              flags: u8, htlc_maximum_msat: u64, timestamp: u32,
        ) {
                let genesis_hash = genesis_block(Network::Testnet).header.block_hash();
                let secp_ctx = Secp256k1::new();
                let unsigned_update = UnsignedChannelUpdate {
                        chain_hash: genesis_hash,
                        short_channel_id,
 -                      timestamp: 100,
 +                      timestamp,
                        flags,
                        cltv_expiry_delta: 18,
                        htlc_minimum_msat: 0,
                        channel_features: channelmanager::provided_channel_features(&config),
                        fee_msat,
                        cltv_expiry_delta: 18,
+                       maybe_announced_channel: true,
                }
        }
  
                        inflight_htlc_msat: 0,
                        effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_024, htlc_maximum_msat: 1_024 },
                };
 +              let usage_1 = ChannelUsage {
 +                      amount_msat: 1,
 +                      inflight_htlc_msat: 0,
 +                      effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_024, htlc_maximum_msat: 1_024 },
 +              };
 +
                // With no historical data the normal liquidity penalty calculation is used.
                assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 47);
                assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
  
                scorer.payment_path_failed(&payment_path_for_amount(1), 42);
                assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 2048);
 -              // The "it failed" increment is 32, where the probability should lie fully in the first
 -              // octile.
 +              assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage_1, &params), 128);
 +              // The "it failed" increment is 32, where the probability should lie several buckets into
 +              // the first octile.
                assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 -                      Some(([32, 0, 0, 0, 0, 0, 0, 0], [32, 0, 0, 0, 0, 0, 0, 0])));
 -              assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 1),
 -                      Some(1.0));
 +                      Some(([32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 +                              [0, 0, 0, 0, 0, 0, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])));
 +              assert!(scorer.historical_estimated_payment_success_probability(42, &target, 1)
 +                      .unwrap() > 0.35);
                assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 500),
                        Some(0.0));
  
                // Even after we tell the scorer we definitely have enough available liquidity, it will
                // still remember that there was some failure in the past, and assign a non-0 penalty.
                scorer.payment_path_failed(&payment_path_for_amount(1000), 43);
 -              assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 198);
 -              // The first octile should be decayed just slightly and the last octile has a new point.
 +              assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 32);
 +              // The first points should be decayed just slightly and the last bucket has a new point.
                assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 -                      Some(([31, 0, 0, 0, 0, 0, 0, 32], [31, 0, 0, 0, 0, 0, 0, 32])));
 +                      Some(([31, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 0, 0, 0, 0, 0],
 +                              [0, 0, 0, 0, 0, 0, 31, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32])));
  
                // The exact success probability is a bit complicated and involves integer rounding, so we
                // simply check bounds here.
                let five_hundred_prob =
                        scorer.historical_estimated_payment_success_probability(42, &target, 500).unwrap();
 -              assert!(five_hundred_prob > 0.5);
 -              assert!(five_hundred_prob < 0.52);
 +              assert!(five_hundred_prob > 0.66);
 +              assert!(five_hundred_prob < 0.68);
                let one_prob =
                        scorer.historical_estimated_payment_success_probability(42, &target, 1).unwrap();
                assert!(one_prob < 1.0);
 -              assert!(one_prob > 0.99);
 +              assert!(one_prob > 0.95);
  
                // Advance the time forward 16 half-lives (which the docs claim will ensure all data is
                // gone), and check that we're back to where we started.
                // Once fully decayed we still have data, but its all-0s. In the future we may remove the
                // data entirely instead.
                assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 -                      Some(([0; 8], [0; 8])));
 +                      None);
                assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 1), None);
  
                let mut usage = ChannelUsage {
                scorer.payment_path_failed(&payment_path_for_amount(1), 42);
                assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 2048);
                usage.inflight_htlc_msat = 0;
 -              assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 409);
 +              assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 866);
  
                let usage = ChannelUsage {
                        amount_msat: 1,
                assert_eq!(liquidity.min_liquidity_msat(), 256);
                assert_eq!(liquidity.max_liquidity_msat(), 768);
        }
 +
 +      #[test]
 +      fn realistic_historical_failures() {
 +              // The motivation for the unequal sized buckets came largely from attempting to pay 10k
 +              // sats over a one bitcoin channel. This tests that case explicitly, ensuring that we score
 +              // properly.
 +              let logger = TestLogger::new();
 +              let mut network_graph = network_graph(&logger);
 +              let params = ProbabilisticScoringFeeParameters {
 +                      historical_liquidity_penalty_multiplier_msat: 1024,
 +                      historical_liquidity_penalty_amount_multiplier_msat: 1024,
 +                      ..ProbabilisticScoringFeeParameters::zero_penalty()
 +              };
 +              let decay_params = ProbabilisticScoringDecayParameters {
 +                      liquidity_offset_half_life: Duration::from_secs(60 * 60),
 +                      historical_no_updates_half_life: Duration::from_secs(10),
 +                      ..ProbabilisticScoringDecayParameters::default()
 +              };
 +
 +              let capacity_msat = 100_000_000_000;
 +              update_channel(&mut network_graph, 42, source_privkey(), 0, capacity_msat, 200);
 +              update_channel(&mut network_graph, 42, target_privkey(), 1, capacity_msat, 200);
 +
 +              let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger);
 +              let source = source_node_id();
 +              let target = target_node_id();
 +
 +              let mut amount_msat = 10_000_000;
 +              let usage = ChannelUsage {
 +                      amount_msat,
 +                      inflight_htlc_msat: 0,
 +                      effective_capacity: EffectiveCapacity::Total { capacity_msat, htlc_maximum_msat: capacity_msat },
 +              };
 +              // With no historical data the normal liquidity penalty calculation is used, which in this
 +              // case is diminuitively low.
 +              assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params), 0);
 +              assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 +                      None);
 +              assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 42),
 +                      None);
 +
 +              // Fail to pay once, and then check the buckets and penalty.
 +              scorer.payment_path_failed(&payment_path_for_amount(amount_msat), 42);
 +              // The penalty should be the maximum penalty, as the payment we're scoring is now in the
 +              // same bucket which is the only maximum datapoint.
 +              assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage, &params),
 +                      2048 + 2048 * amount_msat / super::AMOUNT_PENALTY_DIVISOR);
 +              // The "it failed" increment is 32, which we should apply to the first upper-bound (between
 +              // 6k sats and 12k sats).
 +              assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 +                      Some(([32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 +                              [0, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])));
 +              // The success probability estimate itself should be zero.
 +              assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, amount_msat),
 +                      Some(0.0));
 +
 +              // Now test again with the amount in the bottom bucket.
 +              amount_msat /= 2;
 +              // The new amount is entirely within the only minimum bucket with score, so the probability
 +              // we assign is 1/2.
 +              assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, amount_msat),
 +                      Some(0.5));
 +
 +              // ...but once we see a failure, we consider the payment to be substantially less likely,
 +              // even though not a probability of zero as we still look at the second max bucket which
 +              // now shows 31.
 +              scorer.payment_path_failed(&payment_path_for_amount(amount_msat), 42);
 +              assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
 +                      Some(([63, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 +                              [32, 31, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])));
 +              assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, amount_msat),
 +                      Some(0.0));
 +      }
  }