Merge pull request #2822 from TheBlueMatt/2024-01-pm-dyn-ref
[rust-lightning] / lightning-background-processor / src / lib.rs
index 353ed6738d686698a9dd88079587316019f738d2..0f2c67538d65de59acb37da608ca0f48da78f227 100644 (file)
@@ -2,9 +2,8 @@
 //! running properly, and (2) either can or should be run in the background. See docs for
 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
 
-// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
-#![deny(broken_intra_doc_links)]
-#![deny(private_intra_doc_links)]
+#![deny(rustdoc::broken_intra_doc_links)]
+#![deny(rustdoc::private_intra_doc_links)]
 
 #![deny(missing_docs)]
 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
@@ -28,8 +27,12 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
 use lightning::events::{Event, PathFailure};
 #[cfg(feature = "std")]
-use lightning::events::{EventHandler, EventsProvider};
+use lightning::events::EventHandler;
+#[cfg(any(feature = "std", feature = "futures"))]
+use lightning::events::EventsProvider;
+
 use lightning::ln::channelmanager::ChannelManager;
+use lightning::ln::msgs::OnionMessageHandler;
 use lightning::ln::peer_handler::APeerManager;
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::utxo::UtxoLookup;
@@ -104,11 +107,16 @@ const PING_TIMER: u64 = 30;
 #[cfg(test)]
 const PING_TIMER: u64 = 1;
 
+#[cfg(not(test))]
+const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
+#[cfg(test)]
+const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
+
 /// Prune the network graph of stale entries hourly.
 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
 
 #[cfg(not(test))]
-const SCORER_PERSIST_TIMER: u64 = 60 * 60;
+const SCORER_PERSIST_TIMER: u64 = 60 * 5;
 #[cfg(test)]
 const SCORER_PERSIST_TIMER: u64 = 1;
 
@@ -239,30 +247,30 @@ fn handle_network_graph_update<L: Deref>(
 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
 /// to persist.
 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
-       scorer: &'a S, event: &Event
+       scorer: &'a S, event: &Event, duration_since_epoch: Duration,
 ) -> bool {
        match event {
                Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
                        let mut score = scorer.write_lock();
-                       score.payment_path_failed(path, *scid);
+                       score.payment_path_failed(path, *scid, duration_since_epoch);
                },
                Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
                        // Reached if the destination explicitly failed it back. We treat this as a successful probe
                        // because the payment made it all the way to the destination with sufficient liquidity.
                        let mut score = scorer.write_lock();
-                       score.probe_successful(path);
+                       score.probe_successful(path, duration_since_epoch);
                },
                Event::PaymentPathSuccessful { path, .. } => {
                        let mut score = scorer.write_lock();
-                       score.payment_path_successful(path);
+                       score.payment_path_successful(path, duration_since_epoch);
                },
                Event::ProbeSuccessful { path, .. } => {
                        let mut score = scorer.write_lock();
-                       score.probe_successful(path);
+                       score.probe_successful(path, duration_since_epoch);
                },
                Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
                        let mut score = scorer.write_lock();
-                       score.probe_failed(path, *scid);
+                       score.probe_failed(path, *scid, duration_since_epoch);
                },
                _ => return false,
        }
@@ -270,27 +278,31 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
 }
 
 macro_rules! define_run_body {
-       ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
-        $channel_manager: ident, $process_channel_manager_events: expr,
-        $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
-        $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
-        $check_slow_await: expr)
-       => { {
+       (
+               $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
+               $channel_manager: ident, $process_channel_manager_events: expr,
+               $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
+               $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
+               $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
+       ) => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
                log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
                $chain_monitor.rebroadcast_pending_claims();
 
                let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
+               let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
                let mut last_ping_call = $get_timer(PING_TIMER);
                let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
                let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
                let mut have_pruned = false;
+               let mut have_decayed_scorer = false;
 
                loop {
                        $process_channel_manager_events;
                        $process_chain_monitor_events;
+                       $process_onion_message_handler_events;
 
                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -315,7 +327,7 @@ macro_rules! define_run_body {
                        // see `await_start`'s use below.
                        let mut await_start = None;
                        if $check_slow_await { await_start = Some($get_timer(1)); }
-                       let updates_available = $await;
+                       $await;
                        let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
 
                        // Exit the loop if the background processor was requested to stop.
@@ -324,7 +336,7 @@ macro_rules! define_run_body {
                                break;
                        }
 
-                       if updates_available {
+                       if $channel_manager.get_and_clear_needs_persistence() {
                                log_trace!($logger, "Persisting ChannelManager...");
                                $persister.persist_manager(&*$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
@@ -334,6 +346,11 @@ macro_rules! define_run_body {
                                $channel_manager.timer_tick_occurred();
                                last_freshness_call = $get_timer(FRESHNESS_TIMER);
                        }
+                       if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
+                               log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
+                               $peer_manager.onion_message_handler().timer_tick_occurred();
+                               last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
+                       }
                        if await_slow {
                                // On various platforms, we may be starved of CPU cycles for several reasons.
                                // E.g. on iOS, if we've been in the background, we will be entirely paused.
@@ -370,11 +387,10 @@ macro_rules! define_run_body {
                        if should_prune {
                                // The network graph must not be pruned while rapid sync completion is pending
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
-                                       #[cfg(feature = "std")] {
+                                       if let Some(duration_since_epoch) = $time_fetch() {
                                                log_trace!($logger, "Pruning and persisting network graph.");
-                                               network_graph.remove_stale_channels_and_tracking();
-                                       }
-                                       #[cfg(not(feature = "std"))] {
+                                               network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
+                                       } else {
                                                log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
                                                log_trace!($logger, "Persisting network graph.");
                                        }
@@ -389,9 +405,24 @@ macro_rules! define_run_body {
                                last_prune_call = $get_timer(prune_timer);
                        }
 
+                       if !have_decayed_scorer {
+                               if let Some(ref scorer) = $scorer {
+                                       if let Some(duration_since_epoch) = $time_fetch() {
+                                               log_trace!($logger, "Calling time_passed on scorer at startup");
+                                               scorer.write_lock().time_passed(duration_since_epoch);
+                                       }
+                               }
+                               have_decayed_scorer = true;
+                       }
+
                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
                                if let Some(ref scorer) = $scorer {
-                                       log_trace!($logger, "Persisting scorer");
+                                       if let Some(duration_since_epoch) = $time_fetch() {
+                                               log_trace!($logger, "Calling time_passed and persisting scorer");
+                                               scorer.write_lock().time_passed(duration_since_epoch);
+                                       } else {
+                                               log_trace!($logger, "Persisting scorer");
+                                       }
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
@@ -497,19 +528,23 @@ use core::task;
 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
 /// are hundreds or thousands of simultaneous process calls running.
 ///
+/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
+/// no time is available, some features may be disabled, however the node will still operate fine.
+///
 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
-/// # use std::sync::{Arc, Mutex};
+/// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::time::SystemTime;
 /// # use lightning_background_processor::{process_events_async, GossipSync};
 /// # struct MyStore {}
 /// # impl lightning::util::persist::KVStore for MyStore {
-/// #     fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
-/// #     fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
-/// #     fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
-/// #     fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
+/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
+/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
+/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
+/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
 /// # }
 /// # struct MyEventHandler {}
 /// # impl MyEventHandler {
@@ -528,11 +563,11 @@ use core::task;
 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
 /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
-/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
+/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
-/// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
+/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
 ///
 /// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
 ///    let background_persister = Arc::clone(&my_persister);
@@ -571,6 +606,7 @@ use core::task;
 ///                    Some(background_scorer),
 ///                    sleeper,
 ///                    mobile_interruptable_platform,
+///                    || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
 ///                    )
 ///                    .await
 ///                    .expect("Failed to process events");
@@ -599,25 +635,25 @@ pub async fn process_events_async<
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
-       M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
+       M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
-       APM: APeerManager + Send + Sync,
-       PM: 'static + Deref<Target = APM> + Send + Sync,
+       PM: 'static + Deref + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: for<'b> WriteableScore<'b>,
        SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
-       Sleeper: Fn(Duration) -> SleepFuture
+       Sleeper: Fn(Duration) -> SleepFuture,
+       FetchTime: Fn() -> Option<Duration>,
 >(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
-       sleeper: Sleeper, mobile_interruptable_platform: bool,
+       sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
 ) -> Result<(), lightning::io::Error>
 where
        UL::Target: 'static + UtxoLookup,
        CF::Target: 'static + chain::Filter,
-       CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
+       CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
        T::Target: 'static + BroadcasterInterface,
        ES::Target: 'static + EntropySource,
        NS::Target: 'static + NodeSigner,
@@ -625,8 +661,9 @@ where
        F::Target: 'static + FeeEstimator,
        R::Target: 'static + Router,
        L::Target: 'static + Logger,
-       P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
+       P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
        PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+       PM::Target: APeerManager + Send + Sync,
 {
        let mut should_break = false;
        let async_event_handler = |event| {
@@ -635,36 +672,39 @@ where
                let scorer = &scorer;
                let logger = &logger;
                let persister = &persister;
+               let fetch_time = &fetch_time;
                async move {
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
                        if let Some(ref scorer) = scorer {
-                               if update_scorer(scorer, &event) {
-                                       log_trace!(logger, "Persisting scorer after update");
-                                       if let Err(e) = persister.persist_scorer(&scorer) {
-                                               log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                               if let Some(duration_since_epoch) = fetch_time() {
+                                       if update_scorer(scorer, &event, duration_since_epoch) {
+                                               log_trace!(logger, "Persisting scorer after update");
+                                               if let Err(e) = persister.persist_scorer(&scorer) {
+                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                               }
                                        }
                                }
                        }
                        event_handler(event).await;
                }
        };
-       define_run_body!(persister,
-               chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
+       define_run_body!(
+               persister, chain_monitor,
+               chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
-               gossip_sync, peer_manager, logger, scorer, should_break, {
+               peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
+               gossip_sync, logger, scorer, should_break, {
                        let fut = Selector {
-                               a: channel_manager.get_persistable_update_future(),
+                               a: channel_manager.get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
                                c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
                        };
                        match fut.await {
-                               SelectorOutput::A => true,
-                               SelectorOutput::B => false,
+                               SelectorOutput::A|SelectorOutput::B => {},
                                SelectorOutput::C(exit) => {
                                        should_break = exit;
-                                       false
                                }
                        }
                }, |t| sleeper(Duration::from_secs(t)),
@@ -675,7 +715,27 @@ where
                                task::Poll::Ready(exit) => { should_break = exit; true },
                                task::Poll::Pending => false,
                        }
-               }, mobile_interruptable_platform)
+               }, mobile_interruptable_platform, fetch_time,
+       )
+}
+
+#[cfg(feature = "futures")]
+async fn process_onion_message_handler_events_async<
+       EventHandlerFuture: core::future::Future<Output = ()>,
+       EventHandler: Fn(Event) -> EventHandlerFuture,
+       PM: 'static + Deref + Send + Sync,
+>(
+       peer_manager: &PM, handler: EventHandler
+)
+where
+       PM::Target: APeerManager + Send + Sync,
+{
+       let events = core::cell::RefCell::new(Vec::new());
+       peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
+
+       for event in events.into_inner() {
+               handler(event).await
+       }
 }
 
 #[cfg(feature = "std")]
@@ -740,12 +800,11 @@ impl BackgroundProcessor {
                P: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
-               M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
+               M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
-               APM: APeerManager + Send + Sync,
-               PM: 'static + Deref<Target = APM> + Send + Sync,
+               PM: 'static + Deref + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
                SC: for <'b> WriteableScore<'b>,
        >(
@@ -755,7 +814,7 @@ impl BackgroundProcessor {
        where
                UL::Target: 'static + UtxoLookup,
                CF::Target: 'static + chain::Filter,
-               CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
+               CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
                T::Target: 'static + BroadcasterInterface,
                ES::Target: 'static + EntropySource,
                NS::Target: 'static + NodeSigner,
@@ -763,8 +822,9 @@ impl BackgroundProcessor {
                F::Target: 'static + FeeEstimator,
                R::Target: 'static + Router,
                L::Target: 'static + Logger,
-               P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
+               P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
                PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+               PM::Target: APeerManager + Send + Sync,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
@@ -775,7 +835,10 @@ impl BackgroundProcessor {
                                        handle_network_graph_update(network_graph, &event)
                                }
                                if let Some(ref scorer) = scorer {
-                                       if update_scorer(scorer, &event) {
+                                       use std::time::SystemTime;
+                                       let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+                                               .expect("Time should be sometime after 1970");
+                                       if update_scorer(scorer, &event, duration_since_epoch) {
                                                log_trace!(logger, "Persisting scorer after update");
                                                if let Err(e) = persister.persist_scorer(&scorer) {
                                                        log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
@@ -784,14 +847,23 @@ impl BackgroundProcessor {
                                }
                                event_handler.handle_event(event);
                        };
-                       define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
+                       define_run_body!(
+                               persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
-                               gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
-                               Sleeper::from_two_futures(
-                                       channel_manager.get_persistable_update_future(),
+                               peer_manager,
+                               peer_manager.onion_message_handler().process_pending_events(&event_handler),
+                               gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
+                               { Sleeper::from_two_futures(
+                                       channel_manager.get_event_or_persistence_needed_future(),
                                        chain_monitor.get_update_future()
-                               ).wait_timeout(Duration::from_millis(100)),
-                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
+                               ).wait_timeout(Duration::from_millis(100)); },
+                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+                               || {
+                                       use std::time::SystemTime;
+                                       Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+                                               .expect("Time should be sometime after 1970"))
+                               },
+                       )
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
@@ -847,7 +919,7 @@ impl Drop for BackgroundProcessor {
 #[cfg(all(feature = "std", test))]
 mod tests {
        use bitcoin::blockdata::constants::{genesis_block, ChainHash};
-       use bitcoin::blockdata::locktime::PackedLockTime;
+       use bitcoin::blockdata::locktime::absolute::LockTime;
        use bitcoin::blockdata::transaction::{Transaction, TxOut};
        use bitcoin::network::constants::Network;
        use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
@@ -864,13 +936,16 @@ mod tests {
        use lightning::ln::functional_test_utils::*;
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
-       use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
-       use lightning::routing::router::{DefaultRouter, Path, RouteHop};
-       use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp};
+       use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+       use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
+       use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
        use lightning::util::config::UserConfig;
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
-       use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_SUB_NAMESPACE, SCORER_PERSISTENCE_KEY};
+       use lightning::util::persist::{KVStore,
+               CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
+               NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
+               SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
        use lightning_persister::fs_store::FilesystemStore;
        use std::collections::VecDeque;
        use std::{fs, env};
@@ -893,6 +968,11 @@ mod tests {
                fn disconnect_socket(&mut self) {}
        }
 
+       #[cfg(c_bindings)]
+       type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
+       #[cfg(not(c_bindings))]
+       type LockingWrapper<T> = Mutex<T>;
+
        type ChannelManager =
                channelmanager::ChannelManager<
                        Arc<ChainMonitor>,
@@ -904,7 +984,7 @@ mod tests {
                        Arc<DefaultRouter<
                                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                                Arc<test_utils::TestLogger>,
-                               Arc<Mutex<TestScorer>>,
+                               Arc<LockingWrapper<TestScorer>>,
                                (),
                                TestScorer>
                        >,
@@ -926,7 +1006,7 @@ mod tests {
                network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
-               scorer: Arc<Mutex<TestScorer>>,
+               scorer: Arc<LockingWrapper<TestScorer>>,
        }
 
        impl Node {
@@ -985,13 +1065,13 @@ mod tests {
        }
 
        impl KVStore for Persister {
-               fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
-                       self.kv_store.read(namespace, sub_namespace, key)
+               fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
+                       self.kv_store.read(primary_namespace, secondary_namespace, key)
                }
 
-               fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
-                       if namespace == CHANNEL_MANAGER_PERSISTENCE_NAMESPACE &&
-                               sub_namespace == CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE &&
+               fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
+                       if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
+                               secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
                                key == CHANNEL_MANAGER_PERSISTENCE_KEY
                        {
                                if let Some((error, message)) = self.manager_error {
@@ -999,8 +1079,8 @@ mod tests {
                                }
                        }
 
-                       if namespace == NETWORK_GRAPH_PERSISTENCE_NAMESPACE &&
-                               sub_namespace == NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE &&
+                       if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
+                               secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
                                key == NETWORK_GRAPH_PERSISTENCE_KEY
                        {
                                if let Some(sender) = &self.graph_persistence_notifier {
@@ -1015,8 +1095,8 @@ mod tests {
                                }
                        }
 
-                       if namespace == SCORER_PERSISTENCE_NAMESPACE &&
-                               sub_namespace == SCORER_PERSISTENCE_SUB_NAMESPACE &&
+                       if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
+                               secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
                                key == SCORER_PERSISTENCE_KEY
                        {
                                if let Some((error, message)) = self.scorer_error {
@@ -1024,15 +1104,15 @@ mod tests {
                                }
                        }
 
-                       self.kv_store.write(namespace, sub_namespace, key, buf)
+                       self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
                }
 
-               fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
-                       self.kv_store.remove(namespace, sub_namespace, key, lazy)
+               fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
+                       self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
                }
 
-               fn list(&self, namespace: &str, sub_namespace: &str) -> lightning::io::Result<Vec<String>> {
-                       self.kv_store.list(namespace, sub_namespace)
+               fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
+                       self.kv_store.list(primary_namespace, secondary_namespace)
                }
        }
 
@@ -1065,12 +1145,12 @@ mod tests {
        impl ScoreLookUp for TestScorer {
                type ScoreParams = ();
                fn channel_penalty_msat(
-                       &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams
+                       &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
                ) -> u64 { unimplemented!(); }
        }
 
        impl ScoreUpdate for TestScorer {
-               fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
+               fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, short_channel_id } => {
@@ -1090,7 +1170,7 @@ mod tests {
                        }
                }
 
-               fn payment_path_successful(&mut self, actual_path: &Path) {
+               fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1109,7 +1189,7 @@ mod tests {
                        }
                }
 
-               fn probe_failed(&mut self, actual_path: &Path, _: u64) {
+               fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1127,7 +1207,7 @@ mod tests {
                                }
                        }
                }
-               fn probe_successful(&mut self, actual_path: &Path) {
+               fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1145,8 +1225,12 @@ mod tests {
                                }
                        }
                }
+               fn time_passed(&mut self, _: Duration) {}
        }
 
+       #[cfg(c_bindings)]
+       impl lightning::routing::scoring::Score for TestScorer {}
+
        impl Drop for TestScorer {
                fn drop(&mut self) {
                        if std::thread::panicking() {
@@ -1178,9 +1262,9 @@ mod tests {
                        let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
                        let genesis_block = genesis_block(network);
                        let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
-                       let scorer = Arc::new(Mutex::new(TestScorer::new()));
+                       let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
                        let seed = [i as u8; 32];
-                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ()));
+                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
                        let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
                        let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
                        let now = Duration::from_secs(genesis_block.header.time as u64);
@@ -1232,7 +1316,7 @@ mod tests {
 
        macro_rules! begin_open_channel {
                ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
-                       $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
+                       $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap();
                        $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
                        $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
                }}
@@ -1245,7 +1329,7 @@ mod tests {
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);
 
-                                       let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
+                                       let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
                                                value: channel_value_satoshis, script_pubkey: output_script.clone(),
                                        }]};
                                        (temporary_channel_id, tx)
@@ -1326,7 +1410,7 @@ mod tests {
                check_persisted_data!(nodes[0].node, filepath.clone());
 
                loop {
-                       if !nodes[0].node.get_persistence_condvar_value() { break }
+                       if !nodes[0].node.get_event_or_persist_condvar_value() { break }
                }
 
                // Force-close the channel.
@@ -1335,7 +1419,7 @@ mod tests {
                // Check that the force-close updates are persisted.
                check_persisted_data!(nodes[0].node, filepath.clone());
                loop {
-                       if !nodes[0].node.get_persistence_condvar_value() { break }
+                       if !nodes[0].node.get_event_or_persist_condvar_value() { break }
                }
 
                // Check network graph is persisted
@@ -1353,9 +1437,11 @@ mod tests {
 
        #[test]
        fn test_timer_tick_called() {
-               // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
-               // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
-               // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
+               // Test that:
+               // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
+               // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
+               // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
+               // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
                let (_, nodes) = create_nodes(1, "test_timer_tick_called");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
@@ -1366,9 +1452,11 @@ mod tests {
                        let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
                        let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
                        let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
-                       if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
-                               log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
-                               log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
+                       let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
+                       if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
+                               log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
+                               log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
+                               log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
                                break
                        }
                }
@@ -1415,7 +1503,7 @@ mod tests {
                                        tokio::time::sleep(dur).await;
                                        false // Never exit
                                })
-                       }, false,
+                       }, false, || Some(Duration::ZERO),
                );
                match bp_future.await {
                        Ok(_) => panic!("Expected error persisting manager"),
@@ -1546,8 +1634,8 @@ mod tests {
 
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
-                       let expected_log = "Persisting scorer".to_string();
-                       if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
+                       let expected_log = "Calling time_passed and persisting scorer".to_string();
+                       if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
                                break
                        }
                }
@@ -1571,7 +1659,7 @@ mod tests {
                                $sleep;
                                let log_entries = $nodes[0].logger.lines.lock().unwrap();
                                let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
-                               if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
+                               if *log_entries.get(&("lightning_background_processor", loop_counter))
                                        .unwrap_or(&0) > 1
                                {
                                        // Wait until the loop has gone around at least twice.
@@ -1645,7 +1733,7 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false,
+                       }, false, || Some(Duration::from_secs(1696300000)),
                );
 
                let t1 = tokio::spawn(bp_future);
@@ -1685,9 +1773,10 @@ mod tests {
                                channel_features: ChannelFeatures::empty(),
                                fee_msat: 0,
                                cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
+                               maybe_announced_channel: true,
                        }], blinded_tail: None };
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
+                       $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
                        $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
                                payment_id: None,
                                payment_hash: PaymentHash([42; 32]),
@@ -1704,7 +1793,7 @@ mod tests {
 
                        // Ensure we'll score payments that were explicitly failed back by the destination as
                        // ProbeSuccess.
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
                                payment_id: None,
                                payment_hash: PaymentHash([42; 32]),
@@ -1719,7 +1808,7 @@ mod tests {
                                _ => panic!("Unexpected event"),
                        }
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
                                payment_id: PaymentId([42; 32]),
                                payment_hash: None,
@@ -1731,7 +1820,7 @@ mod tests {
                                _ => panic!("Unexpected event"),
                        }
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
                                payment_id: PaymentId([42; 32]),
                                payment_hash: PaymentHash([42; 32]),
@@ -1743,7 +1832,7 @@ mod tests {
                                _ => panic!("Unexpected event"),
                        }
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::ProbeFailed {
                                payment_id: PaymentId([42; 32]),
                                payment_hash: PaymentHash([42; 32]),
@@ -1782,7 +1871,7 @@ mod tests {
 
                let log_entries = nodes[0].logger.lines.lock().unwrap();
                let expected_log = "Persisting scorer after update".to_string();
-               assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5);
+               assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
        }
 
        #[tokio::test]
@@ -1819,7 +1908,7 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false,
+                       }, false, || Some(Duration::ZERO),
                );
                let t1 = tokio::spawn(bp_future);
                let t2 = tokio::spawn(async move {
@@ -1828,7 +1917,7 @@ mod tests {
 
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let expected_log = "Persisting scorer after update".to_string();
-                       assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5);
+                       assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
                });
 
                let (r1, r2) = tokio::join!(t1, t2);