AsyncPaymentsMessageHandler trait for OnionMessenger
[rust-lightning] / lightning-background-processor / src / lib.rs
index 62c03b81dfe89288e726ae1ee89f383322ce68ae..aae64e981bb54271d0279c26db666bdee77f4d10 100644 (file)
@@ -24,16 +24,18 @@ extern crate lightning_rapid_gossip_sync;
 use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
 use lightning::events::{Event, PathFailure};
 #[cfg(feature = "std")]
-use lightning::events::{EventHandler, EventsProvider};
-use lightning::ln::channelmanager::ChannelManager;
+use lightning::events::EventHandler;
+#[cfg(feature = "std")]
+use lightning::events::EventsProvider;
+
+use lightning::ln::channelmanager::AChannelManager;
 use lightning::ln::msgs::OnionMessageHandler;
+use lightning::onion_message::messenger::AOnionMessenger;
 use lightning::ln::peer_handler::APeerManager;
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::utxo::UtxoLookup;
-use lightning::routing::router::Router;
 use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
 use lightning::util::logger::Logger;
 use lightning::util::persist::Persister;
@@ -54,7 +56,7 @@ use std::thread::{self, JoinHandle};
 use std::time::Instant;
 
 #[cfg(not(feature = "std"))]
-use alloc::vec::Vec;
+use alloc::boxed::Box;
 
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -78,6 +80,8 @@ use alloc::vec::Vec;
 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
 /// unilateral chain closure fees are at risk.
 ///
+/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
 /// [`Event`]: lightning::events::Event
 /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
@@ -113,7 +117,7 @@ const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
 
 #[cfg(not(test))]
-const SCORER_PERSIST_TIMER: u64 = 60 * 60;
+const SCORER_PERSIST_TIMER: u64 = 60 * 5;
 #[cfg(test)]
 const SCORER_PERSIST_TIMER: u64 = 1;
 
@@ -244,30 +248,30 @@ fn handle_network_graph_update<L: Deref>(
 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
 /// to persist.
 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
-       scorer: &'a S, event: &Event
+       scorer: &'a S, event: &Event, duration_since_epoch: Duration,
 ) -> bool {
        match event {
                Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
                        let mut score = scorer.write_lock();
-                       score.payment_path_failed(path, *scid);
+                       score.payment_path_failed(path, *scid, duration_since_epoch);
                },
                Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
                        // Reached if the destination explicitly failed it back. We treat this as a successful probe
                        // because the payment made it all the way to the destination with sufficient liquidity.
                        let mut score = scorer.write_lock();
-                       score.probe_successful(path);
+                       score.probe_successful(path, duration_since_epoch);
                },
                Event::PaymentPathSuccessful { path, .. } => {
                        let mut score = scorer.write_lock();
-                       score.payment_path_successful(path);
+                       score.payment_path_successful(path, duration_since_epoch);
                },
                Event::ProbeSuccessful { path, .. } => {
                        let mut score = scorer.write_lock();
-                       score.probe_successful(path);
+                       score.probe_successful(path, duration_since_epoch);
                },
                Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
                        let mut score = scorer.write_lock();
-                       score.probe_failed(path, *scid);
+                       score.probe_failed(path, *scid, duration_since_epoch);
                },
                _ => return false,
        }
@@ -278,12 +282,13 @@ macro_rules! define_run_body {
        (
                $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
                $channel_manager: ident, $process_channel_manager_events: expr,
-               $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
+               $onion_messenger: ident, $process_onion_message_handler_events: expr,
+               $peer_manager: ident, $gossip_sync: ident,
                $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
-               $timer_elapsed: expr, $check_slow_await: expr
+               $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
        ) => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
-               $channel_manager.timer_tick_occurred();
+               $channel_manager.get_cm().timer_tick_occurred();
                log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
                $chain_monitor.rebroadcast_pending_claims();
 
@@ -294,6 +299,7 @@ macro_rules! define_run_body {
                let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
                let mut have_pruned = false;
+               let mut have_decayed_scorer = false;
 
                loop {
                        $process_channel_manager_events;
@@ -332,19 +338,21 @@ macro_rules! define_run_body {
                                break;
                        }
 
-                       if $channel_manager.get_and_clear_needs_persistence() {
+                       if $channel_manager.get_cm().get_and_clear_needs_persistence() {
                                log_trace!($logger, "Persisting ChannelManager...");
-                               $persister.persist_manager(&*$channel_manager)?;
+                               $persister.persist_manager(&$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
                        }
                        if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
                                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
-                               $channel_manager.timer_tick_occurred();
+                               $channel_manager.get_cm().timer_tick_occurred();
                                last_freshness_call = $get_timer(FRESHNESS_TIMER);
                        }
                        if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
-                               log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
-                               $peer_manager.onion_message_handler().timer_tick_occurred();
+                               if let Some(om) = &$onion_messenger {
+                                       log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
+                                       om.get_om().timer_tick_occurred();
+                               }
                                last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
                        }
                        if await_slow {
@@ -383,11 +391,10 @@ macro_rules! define_run_body {
                        if should_prune {
                                // The network graph must not be pruned while rapid sync completion is pending
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
-                                       #[cfg(feature = "std")] {
+                                       if let Some(duration_since_epoch) = $time_fetch() {
                                                log_trace!($logger, "Pruning and persisting network graph.");
-                                               network_graph.remove_stale_channels_and_tracking();
-                                       }
-                                       #[cfg(not(feature = "std"))] {
+                                               network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
+                                       } else {
                                                log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
                                                log_trace!($logger, "Persisting network graph.");
                                        }
@@ -402,9 +409,24 @@ macro_rules! define_run_body {
                                last_prune_call = $get_timer(prune_timer);
                        }
 
+                       if !have_decayed_scorer {
+                               if let Some(ref scorer) = $scorer {
+                                       if let Some(duration_since_epoch) = $time_fetch() {
+                                               log_trace!($logger, "Calling time_passed on scorer at startup");
+                                               scorer.write_lock().time_passed(duration_since_epoch);
+                                       }
+                               }
+                               have_decayed_scorer = true;
+                       }
+
                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
                                if let Some(ref scorer) = $scorer {
-                                       log_trace!($logger, "Persisting scorer");
+                                       if let Some(duration_since_epoch) = $time_fetch() {
+                                               log_trace!($logger, "Calling time_passed and persisting scorer");
+                                               scorer.write_lock().time_passed(duration_since_epoch);
+                                       } else {
+                                               log_trace!($logger, "Persisting scorer");
+                                       }
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
@@ -422,7 +444,7 @@ macro_rules! define_run_body {
                // After we exit, ensure we persist the ChannelManager one final time - this avoids
                // some races where users quit while channel updates were in-flight, with
                // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-               $persister.persist_manager(&*$channel_manager)?;
+               $persister.persist_manager(&$channel_manager)?;
 
                // Persist Scorer on exit
                if let Some(ref scorer) = $scorer {
@@ -510,52 +532,78 @@ use core::task;
 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
 /// are hundreds or thousands of simultaneous process calls running.
 ///
+/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
+/// no time is available, some features may be disabled, however the node will still operate fine.
+///
 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
 /// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::time::SystemTime;
 /// # use lightning_background_processor::{process_events_async, GossipSync};
-/// # struct MyStore {}
-/// # impl lightning::util::persist::KVStore for MyStore {
+/// # struct Logger {}
+/// # impl lightning::util::logger::Logger for Logger {
+/// #     fn log(&self, _record: lightning::util::logger::Record) {}
+/// # }
+/// # struct Store {}
+/// # impl lightning::util::persist::KVStore for Store {
 /// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
 /// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
 /// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
 /// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
 /// # }
-/// # struct MyEventHandler {}
-/// # impl MyEventHandler {
+/// # struct EventHandler {}
+/// # impl EventHandler {
 /// #     async fn handle_event(&self, _: lightning::events::Event) {}
 /// # }
 /// # #[derive(Eq, PartialEq, Clone, Hash)]
-/// # struct MySocketDescriptor {}
-/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
+/// # struct SocketDescriptor {}
+/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
 /// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
 /// #     fn disconnect_socket(&mut self) {}
 /// # }
-/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
-/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
-/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
-/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
-/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
-/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
-/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
-/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
-/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
-/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
-/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
-/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
-///
-/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
-///    let background_persister = Arc::clone(&my_persister);
-///    let background_event_handler = Arc::clone(&my_event_handler);
-///    let background_chain_mon = Arc::clone(&my_chain_monitor);
-///    let background_chan_man = Arc::clone(&my_channel_manager);
-///    let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
-///    let background_peer_man = Arc::clone(&my_peer_manager);
-///    let background_logger = Arc::clone(&my_logger);
-///    let background_scorer = Arc::clone(&my_scorer);
+/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
+/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
+/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
+/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
+/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
+/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
+/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
+/// #
+/// # struct Node<
+/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
+/// #     F: lightning::chain::Filter + Send + Sync + 'static,
+/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
+/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
+/// # > {
+/// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
+/// #     event_handler: Arc<EventHandler>,
+/// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
+/// #     onion_messenger: Arc<OnionMessenger<B, F, FE>>,
+/// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
+/// #     gossip_sync: Arc<P2PGossipSync<UL>>,
+/// #     persister: Arc<Store>,
+/// #     logger: Arc<Logger>,
+/// #     scorer: Arc<Scorer>,
+/// # }
+/// #
+/// # async fn setup_background_processing<
+/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
+/// #     F: lightning::chain::Filter + Send + Sync + 'static,
+/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
+/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
+/// # >(node: Node<B, F, FE, UL>) {
+///    let background_persister = Arc::clone(&node.persister);
+///    let background_event_handler = Arc::clone(&node.event_handler);
+///    let background_chain_mon = Arc::clone(&node.chain_monitor);
+///    let background_chan_man = Arc::clone(&node.channel_manager);
+///    let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
+///    let background_peer_man = Arc::clone(&node.peer_manager);
+///    let background_onion_messenger = Arc::clone(&node.onion_messenger);
+///    let background_logger = Arc::clone(&node.logger);
+///    let background_scorer = Arc::clone(&node.scorer);
 ///
 ///    // Setup the sleeper.
 ///    let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
@@ -578,15 +626,17 @@ use core::task;
 ///                    |e| background_event_handler.handle_event(e),
 ///                    background_chain_mon,
 ///                    background_chan_man,
+///                    Some(background_onion_messenger),
 ///                    background_gossip_sync,
 ///                    background_peer_man,
 ///                    background_logger,
 ///                    Some(background_scorer),
 ///                    sleeper,
 ///                    mobile_interruptable_platform,
-///                    )
-///                    .await
-///                    .expect("Failed to process events");
+///                    || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
+///            )
+///            .await
+///            .expect("Failed to process events");
 ///    });
 ///
 ///    // Stop the background processing.
@@ -599,46 +649,41 @@ pub async fn process_events_async<
        'a,
        UL: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
-       CW: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
-       ES: 'static + Deref + Send + Sync,
-       NS: 'static + Deref + Send + Sync,
-       SP: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
-       R: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
-       M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
-       CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
+       M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+       CM: 'static + Deref + Send + Sync,
+       OM: 'static + Deref + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        PM: 'static + Deref + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: for<'b> WriteableScore<'b>,
        SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
-       Sleeper: Fn(Duration) -> SleepFuture
+       Sleeper: Fn(Duration) -> SleepFuture,
+       FetchTime: Fn() -> Option<Duration>,
 >(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
+       onion_messenger: Option<OM>,
        gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
-       sleeper: Sleeper, mobile_interruptable_platform: bool,
+       sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
 ) -> Result<(), lightning::io::Error>
 where
        UL::Target: 'static + UtxoLookup,
        CF::Target: 'static + chain::Filter,
-       CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
        T::Target: 'static + BroadcasterInterface,
-       ES::Target: 'static + EntropySource,
-       NS::Target: 'static + NodeSigner,
-       SP::Target: 'static + SignerProvider,
        F::Target: 'static + FeeEstimator,
-       R::Target: 'static + Router,
        L::Target: 'static + Logger,
-       P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
-       PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+       P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
+       PS::Target: 'static + Persister<'a, CM, L, SC>,
+       CM::Target: AChannelManager + Send + Sync,
+       OM::Target: AOnionMessenger + Send + Sync,
        PM::Target: APeerManager + Send + Sync,
 {
        let mut should_break = false;
@@ -648,29 +693,32 @@ where
                let scorer = &scorer;
                let logger = &logger;
                let persister = &persister;
-               async move {
+               let fetch_time = &fetch_time;
+               Box::pin(async move { // We should be able to drop the Box once our MSRV is 1.68
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
                        if let Some(ref scorer) = scorer {
-                               if update_scorer(scorer, &event) {
-                                       log_trace!(logger, "Persisting scorer after update");
-                                       if let Err(e) = persister.persist_scorer(&scorer) {
-                                               log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                               if let Some(duration_since_epoch) = fetch_time() {
+                                       if update_scorer(scorer, &event, duration_since_epoch) {
+                                               log_trace!(logger, "Persisting scorer after update");
+                                               if let Err(e) = persister.persist_scorer(&scorer) {
+                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                               }
                                        }
                                }
                        }
                        event_handler(event).await;
-               }
+               })
        };
        define_run_body!(
                persister, chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
-               channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
-               peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
-               gossip_sync, logger, scorer, should_break, {
+               channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
+               onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await },
+               peer_manager, gossip_sync, logger, scorer, should_break, {
                        let fut = Selector {
-                               a: channel_manager.get_event_or_persistence_needed_future(),
+                               a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
                                c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
                        };
@@ -688,31 +736,10 @@ where
                                task::Poll::Ready(exit) => { should_break = exit; true },
                                task::Poll::Pending => false,
                        }
-               }, mobile_interruptable_platform
+               }, mobile_interruptable_platform, fetch_time,
        )
 }
 
-#[cfg(feature = "futures")]
-async fn process_onion_message_handler_events_async<
-       EventHandlerFuture: core::future::Future<Output = ()>,
-       EventHandler: Fn(Event) -> EventHandlerFuture,
-       PM: 'static + Deref + Send + Sync,
->(
-       peer_manager: &PM, handler: EventHandler
-)
-where
-       PM::Target: APeerManager + Send + Sync,
-{
-       use lightning::events::EventsProvider;
-
-       let events = core::cell::RefCell::new(Vec::new());
-       peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
-
-       for event in events.into_inner() {
-               handler(event).await
-       }
-}
-
 #[cfg(feature = "std")]
 impl BackgroundProcessor {
        /// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -763,20 +790,16 @@ impl BackgroundProcessor {
                'a,
                UL: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
-               CW: 'static + Deref + Send + Sync,
                T: 'static + Deref + Send + Sync,
-               ES: 'static + Deref + Send + Sync,
-               NS: 'static + Deref + Send + Sync,
-               SP: 'static + Deref + Send + Sync,
                F: 'static + Deref + Send + Sync,
-               R: 'static + Deref + Send + Sync,
                G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
                L: 'static + Deref + Send + Sync,
                P: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
-               M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
-               CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
+               M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+               CM: 'static + Deref + Send + Sync,
+               OM: 'static + Deref + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                PM: 'static + Deref + Send + Sync,
@@ -784,21 +807,19 @@ impl BackgroundProcessor {
                SC: for <'b> WriteableScore<'b>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+               onion_messenger: Option<OM>,
                gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        ) -> Self
        where
                UL::Target: 'static + UtxoLookup,
                CF::Target: 'static + chain::Filter,
-               CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
                T::Target: 'static + BroadcasterInterface,
-               ES::Target: 'static + EntropySource,
-               NS::Target: 'static + NodeSigner,
-               SP::Target: 'static + SignerProvider,
                F::Target: 'static + FeeEstimator,
-               R::Target: 'static + Router,
                L::Target: 'static + Logger,
-               P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
-               PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+               P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
+               PS::Target: 'static + Persister<'a, CM, L, SC>,
+               CM::Target: AChannelManager + Send + Sync,
+               OM::Target: AOnionMessenger + Send + Sync,
                PM::Target: APeerManager + Send + Sync,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
@@ -810,7 +831,10 @@ impl BackgroundProcessor {
                                        handle_network_graph_update(network_graph, &event)
                                }
                                if let Some(ref scorer) = scorer {
-                                       if update_scorer(scorer, &event) {
+                                       use std::time::SystemTime;
+                                       let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+                                               .expect("Time should be sometime after 1970");
+                                       if update_scorer(scorer, &event, duration_since_epoch) {
                                                log_trace!(logger, "Persisting scorer after update");
                                                if let Err(e) = persister.persist_scorer(&scorer) {
                                                        log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
@@ -821,15 +845,19 @@ impl BackgroundProcessor {
                        };
                        define_run_body!(
                                persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
-                               channel_manager, channel_manager.process_pending_events(&event_handler),
-                               peer_manager,
-                               peer_manager.onion_message_handler().process_pending_events(&event_handler),
-                               gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
+                               channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
+                               onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) },
+                               peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                { Sleeper::from_two_futures(
-                                       channel_manager.get_event_or_persistence_needed_future(),
-                                       chain_monitor.get_update_future()
+                                       &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+                                       &chain_monitor.get_update_future()
                                ).wait_timeout(Duration::from_millis(100)); },
-                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
+                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+                               || {
+                                       use std::time::SystemTime;
+                                       Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+                                               .expect("Time should be sometime after 1970"))
+                               },
                        )
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -885,24 +913,28 @@ impl Drop for BackgroundProcessor {
 
 #[cfg(all(feature = "std", test))]
 mod tests {
+       use bitcoin::{Amount, ScriptBuf, Txid};
        use bitcoin::blockdata::constants::{genesis_block, ChainHash};
        use bitcoin::blockdata::locktime::absolute::LockTime;
        use bitcoin::blockdata::transaction::{Transaction, TxOut};
-       use bitcoin::network::constants::Network;
+       use bitcoin::hashes::Hash;
+       use bitcoin::network::Network;
        use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
-       use lightning::chain::{BestBlock, Confirm, chainmonitor};
+       use bitcoin::transaction::Version;
+       use lightning::chain::{BestBlock, Confirm, chainmonitor, Filter};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-       use lightning::sign::{InMemorySigner, KeysManager};
+       use lightning::sign::{InMemorySigner, KeysManager, ChangeDestinationSource};
        use lightning::chain::transaction::OutPoint;
        use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
        use lightning::{get_event_msg, get_event};
-       use lightning::ln::PaymentHash;
+       use lightning::ln::types::{PaymentHash, ChannelId};
        use lightning::ln::channelmanager;
        use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
        use lightning::ln::features::{ChannelFeatures, NodeFeatures};
        use lightning::ln::functional_test_utils::*;
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
+       use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
        use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
        use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
        use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
@@ -913,6 +945,7 @@ mod tests {
                CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
                NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
                SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
+       use lightning::util::sweep::{OutputSweeper, OutputSpendStatus};
        use lightning_persister::fs_store::FilesystemStore;
        use std::collections::VecDeque;
        use std::{fs, env};
@@ -951,6 +984,7 @@ mod tests {
                        Arc<DefaultRouter<
                                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                                Arc<test_utils::TestLogger>,
+                               Arc<KeysManager>,
                                Arc<LockingWrapper<TestScorer>>,
                                (),
                                TestScorer>
@@ -962,11 +996,14 @@ mod tests {
        type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
        type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
 
+       type OM = OnionMessenger<Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestLogger>, Arc<ChannelManager>, Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<KeysManager>>>, IgnoringMessageHandler, IgnoringMessageHandler, IgnoringMessageHandler>;
+
        struct Node {
                node: Arc<ChannelManager>,
+               messenger: Arc<OM>,
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
-               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
+               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<OM>, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
                chain_monitor: Arc<ChainMonitor>,
                kv_store: Arc<FilesystemStore>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -974,6 +1011,9 @@ mod tests {
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
                scorer: Arc<LockingWrapper<TestScorer>>,
+               sweeper: Arc<OutputSweeper<Arc<test_utils::TestBroadcaster>, Arc<TestWallet>,
+                       Arc<test_utils::TestFeeEstimator>, Arc<dyn Filter + Sync + Send>, Arc<FilesystemStore>,
+                       Arc<test_utils::TestLogger>, Arc<KeysManager>>>,
        }
 
        impl Node {
@@ -1117,7 +1157,7 @@ mod tests {
        }
 
        impl ScoreUpdate for TestScorer {
-               fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
+               fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, short_channel_id } => {
@@ -1137,7 +1177,7 @@ mod tests {
                        }
                }
 
-               fn payment_path_successful(&mut self, actual_path: &Path) {
+               fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1156,7 +1196,7 @@ mod tests {
                        }
                }
 
-               fn probe_failed(&mut self, actual_path: &Path, _: u64) {
+               fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1174,7 +1214,7 @@ mod tests {
                                }
                        }
                }
-               fn probe_successful(&mut self, actual_path: &Path) {
+               fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1192,6 +1232,7 @@ mod tests {
                                }
                        }
                }
+               fn time_passed(&mut self, _: Duration) {}
        }
 
        #[cfg(c_bindings)]
@@ -1211,6 +1252,14 @@ mod tests {
                }
        }
 
+       struct TestWallet {}
+
+       impl ChangeDestinationSource for TestWallet {
+               fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
+                       Ok(ScriptBuf::new())
+               }
+       }
+
        fn get_full_filepath(filepath: String, filename: String) -> String {
                let mut path = PathBuf::from(filepath);
                path.push(filename);
@@ -1229,8 +1278,11 @@ mod tests {
                        let genesis_block = genesis_block(network);
                        let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
                        let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
+                       let now = Duration::from_secs(genesis_block.header.time as u64);
                        let seed = [i as u8; 32];
-                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
+                       let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
+                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default()));
+                       let msg_router = Arc::new(DefaultMessageRouter::new(network_graph.clone(), Arc::clone(&keys_manager)));
                        let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
                        let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
                        let now = Duration::from_secs(genesis_block.header.time as u64);
@@ -1239,15 +1291,19 @@ mod tests {
                        let best_block = BestBlock::from_network(network);
                        let params = ChainParameters { network, best_block };
                        let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
+                       let messenger = Arc::new(OnionMessenger::new(keys_manager.clone(), keys_manager.clone(), logger.clone(), manager.clone(), msg_router.clone(), IgnoringMessageHandler {}, IgnoringMessageHandler {}, IgnoringMessageHandler {}));
+                       let wallet = Arc::new(TestWallet {});
+                       let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator),
+                               None::<Arc<dyn Filter + Sync + Send>>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger)));
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
                        let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
                        let msg_handler = MessageHandler {
                                chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
                                route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
-                               onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
+                               onion_message_handler: messenger.clone(), custom_message_handler: IgnoringMessageHandler{}
                        };
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
-                       let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
+                       let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper, messenger };
                        nodes.push(node);
                }
 
@@ -1295,8 +1351,8 @@ mod tests {
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);
 
-                                       let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
-                                               value: channel_value_satoshis, script_pubkey: output_script.clone(),
+                                       let tx = Transaction { version: Version::ONE, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
+                                               value: Amount::from_sat(channel_value_satoshis), script_pubkey: output_script.clone(),
                                        }]};
                                        (temporary_channel_id, tx)
                                },
@@ -1307,8 +1363,8 @@ mod tests {
 
        fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
                for i in 1..=depth {
-                       let prev_blockhash = node.best_block.block_hash();
-                       let height = node.best_block.height() + 1;
+                       let prev_blockhash = node.best_block.block_hash;
+                       let height = node.best_block.height + 1;
                        let header = create_dummy_header(prev_blockhash, height);
                        let txdata = vec![(0, tx)];
                        node.best_block = BestBlock::new(header.block_hash(), height);
@@ -1316,15 +1372,40 @@ mod tests {
                                1 => {
                                        node.node.transactions_confirmed(&header, &txdata, height);
                                        node.chain_monitor.transactions_confirmed(&header, &txdata, height);
+                                       node.sweeper.transactions_confirmed(&header, &txdata, height);
                                },
                                x if x == depth => {
+                                       // We need the TestBroadcaster to know about the new height so that it doesn't think
+                                       // we're violating the time lock requirements of transactions broadcasted at that
+                                       // point.
+                                       node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height));
                                        node.node.best_block_updated(&header, height);
                                        node.chain_monitor.best_block_updated(&header, height);
+                                       node.sweeper.best_block_updated(&header, height);
                                },
                                _ => {},
                        }
                }
        }
+
+       fn advance_chain(node: &mut Node, num_blocks: u32) {
+               for i in 1..=num_blocks {
+                       let prev_blockhash = node.best_block.block_hash;
+                       let height = node.best_block.height + 1;
+                       let header = create_dummy_header(prev_blockhash, height);
+                       node.best_block = BestBlock::new(header.block_hash(), height);
+                       if i == num_blocks {
+                               // We need the TestBroadcaster to know about the new height so that it doesn't think
+                               // we're violating the time lock requirements of transactions broadcasted at that
+                               // point.
+                               node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height));
+                               node.node.best_block_updated(&header, height);
+                               node.chain_monitor.best_block_updated(&header, height);
+                               node.sweeper.best_block_updated(&header, height);
+                       }
+               }
+       }
+
        fn confirm_transaction(node: &mut Node, tx: &Transaction) {
                confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
        }
@@ -1345,7 +1426,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                macro_rules! check_persisted_data {
                        ($node: expr, $filepath: expr) => {
@@ -1380,7 +1461,8 @@ mod tests {
                }
 
                // Force-close the channel.
-               nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
+               let error_message = "Channel force-closed";
+               nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap();
 
                // Check that the force-close updates are persisted.
                check_persisted_data!(nodes[0].node, filepath.clone());
@@ -1412,7 +1494,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -1441,7 +1523,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                match bg_processor.join() {
                        Ok(_) => panic!("Expected error persisting manager"),
                        Err(e) => {
@@ -1462,14 +1544,14 @@ mod tests {
                let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
 
                let bp_future = super::process_events_async(
-                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
+                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
                        nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()), move |dur: Duration| {
                                Box::pin(async move {
                                        tokio::time::sleep(dur).await;
                                        false // Never exit
                                })
-                       }, false,
+                       }, false, || Some(Duration::ZERO),
                );
                match bp_future.await {
                        Ok(_) => panic!("Expected error persisting manager"),
@@ -1487,7 +1569,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
                        Ok(_) => panic!("Expected error persisting network graph"),
@@ -1505,7 +1587,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
                        Ok(_) => panic!("Expected error persisting scorer"),
@@ -1533,7 +1615,7 @@ mod tests {
                        _ => panic!("Unexpected event: {:?}", event),
                };
 
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Open a channel and check that the FundingGenerationReady event was handled.
                begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -1556,6 +1638,9 @@ mod tests {
                let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
                nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
                let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
+               let broadcast_funding = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+               assert_eq!(broadcast_funding.txid(), funding_tx.txid());
+               assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 
                if !std::thread::panicking() {
                        bg_processor.stop().unwrap();
@@ -1570,10 +1655,11 @@ mod tests {
                        _ => panic!("Unexpected event: {:?}", event),
                };
                let persister = Arc::new(Persister::new(data_dir));
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Force close the channel and check that the SpendableOutputs event was handled.
-               nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
+               let error_message = "Channel force-closed";
+               nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap();
                let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
                confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
 
@@ -1581,10 +1667,95 @@ mod tests {
                        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
                        .expect("Events not handled within deadline");
                match event {
-                       Event::SpendableOutputs { .. } => {},
+                       Event::SpendableOutputs { outputs, channel_id } => {
+                               nodes[0].sweeper.track_spendable_outputs(outputs, channel_id, false, Some(153)).unwrap();
+                       },
                        _ => panic!("Unexpected event: {:?}", event),
                }
 
+               // Check we don't generate an initial sweeping tx until we reach the required height.
+               assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+               let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+               if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
+                       assert!(!tracked_output.is_spent_in(&sweep_tx_0));
+                       match tracked_output.status {
+                               OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
+                                       assert_eq!(delayed_until_height, Some(153));
+                               }
+                               _ => panic!("Unexpected status"),
+                       }
+               }
+
+               advance_chain(&mut nodes[0], 3);
+
+               // Check we generate an initial sweeping tx.
+               assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+               let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+               let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+               match tracked_output.status {
+                       OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
+                               assert_eq!(sweep_tx_0.txid(), latest_spending_tx.txid());
+                       }
+                       _ => panic!("Unexpected status"),
+               }
+
+               // Check we regenerate and rebroadcast the sweeping tx each block.
+               advance_chain(&mut nodes[0], 1);
+               assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+               let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+               let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+               match tracked_output.status {
+                       OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
+                               assert_eq!(sweep_tx_1.txid(), latest_spending_tx.txid());
+                       }
+                       _ => panic!("Unexpected status"),
+               }
+               assert_ne!(sweep_tx_0, sweep_tx_1);
+
+               advance_chain(&mut nodes[0], 1);
+               assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+               let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+               let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+               match tracked_output.status {
+                       OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
+                               assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
+                       }
+                       _ => panic!("Unexpected status"),
+               }
+               assert_ne!(sweep_tx_0, sweep_tx_2);
+               assert_ne!(sweep_tx_1, sweep_tx_2);
+
+               // Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations.
+               confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
+               assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+               let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+               match tracked_output.status {
+                       OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
+                               assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
+                       }
+                       _ => panic!("Unexpected status"),
+               }
+
+               // Check we still see the transaction as confirmed if we unconfirm any untracked
+               // transaction. (We previously had a bug that would mark tracked transactions as
+               // unconfirmed if any transaction at an unknown block height would be unconfirmed.)
+               let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
+               nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);
+
+               assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
+               let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
+               match tracked_output.status {
+                       OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
+                               assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
+                       }
+                       _ => panic!("Unexpected status"),
+               }
+
+               // Check we stop tracking the spendable outputs when one of the txs reaches
+               // ANTI_REORG_DELAY confirmations.
+               confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, ANTI_REORG_DELAY);
+               assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);
+
                if !std::thread::panicking() {
                        bg_processor.stop().unwrap();
                }
@@ -1596,11 +1767,11 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
-                       let expected_log = "Persisting scorer".to_string();
+                       let expected_log = "Calling time_passed and persisting scorer".to_string();
                        if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
                                break
                        }
@@ -1669,7 +1840,7 @@ mod tests {
                let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
 
                let event_handler = |_: _| {};
-               let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
                        receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
@@ -1689,7 +1860,7 @@ mod tests {
 
                let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
                let bp_future = super::process_events_async(
-                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
+                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
                        nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()), move |dur: Duration| {
                                let mut exit_receiver = exit_receiver.clone();
@@ -1699,7 +1870,7 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false,
+                       }, false, || Some(Duration::from_secs(1696300000)),
                );
 
                let t1 = tokio::spawn(bp_future);
@@ -1827,7 +1998,7 @@ mod tests {
                let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
 
@@ -1864,7 +2035,7 @@ mod tests {
                let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
 
                let bp_future = super::process_events_async(
-                       persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
+                       persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
                        nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()), move |dur: Duration| {
                                let mut exit_receiver = exit_receiver.clone();
@@ -1874,7 +2045,7 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false,
+                       }, false, || Some(Duration::ZERO),
                );
                let t1 = tokio::spawn(bp_future);
                let t2 = tokio::spawn(async move {