]> git.bitcoin.ninja Git - rust-lightning/commitdiff
`rustfmt`: Run on `lightning-background-processor/src/lib.rs`
authorElias Rohrer <dev@tnull.de>
Thu, 20 Jun 2024 12:11:14 +0000 (14:11 +0200)
committerElias Rohrer <dev@tnull.de>
Tue, 9 Jul 2024 08:26:00 +0000 (10:26 +0200)
lightning-background-processor/src/lib.rs

index 6798d382d5b385420693a9d843e15088bedd4e0e..940d1b029e730045d25afd775a6cfe17461f27b6 100644 (file)
@@ -4,12 +4,9 @@
 
 #![deny(rustdoc::broken_intra_doc_links)]
 #![deny(rustdoc::private_intra_doc_links)]
-
 #![deny(missing_docs)]
 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
-
 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
-
 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
 
 #[cfg(any(test, feature = "std"))]
@@ -18,25 +15,26 @@ extern crate core;
 #[cfg(not(feature = "std"))]
 extern crate alloc;
 
-#[macro_use] extern crate lightning;
+#[macro_use]
+extern crate lightning;
 extern crate lightning_rapid_gossip_sync;
 
 use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::events::{Event, PathFailure};
 #[cfg(feature = "std")]
 use lightning::events::EventHandler;
 #[cfg(feature = "std")]
 use lightning::events::EventsProvider;
+use lightning::events::{Event, PathFailure};
 
 use lightning::ln::channelmanager::AChannelManager;
 use lightning::ln::msgs::OnionMessageHandler;
-use lightning::onion_message::messenger::AOnionMessenger;
 use lightning::ln::peer_handler::APeerManager;
+use lightning::onion_message::messenger::AOnionMessenger;
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
-use lightning::routing::utxo::UtxoLookup;
 use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
+use lightning::routing::utxo::UtxoLookup;
 use lightning::util::logger::Logger;
 use lightning::util::persist::Persister;
 #[cfg(feature = "std")]
@@ -46,11 +44,11 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
 use core::ops::Deref;
 use core::time::Duration;
 
-#[cfg(feature = "std")]
-use std::sync::Arc;
 #[cfg(feature = "std")]
 use core::sync::atomic::{AtomicBool, Ordering};
 #[cfg(feature = "std")]
+use std::sync::Arc;
+#[cfg(feature = "std")]
 use std::thread::{self, JoinHandle};
 #[cfg(feature = "std")]
 use std::time::Instant;
@@ -133,10 +131,18 @@ const REBROADCAST_TIMER: u64 = 1;
 
 #[cfg(feature = "futures")]
 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
-const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
+const fn min_u64(a: u64, b: u64) -> u64 {
+       if a < b {
+               a
+       } else {
+               b
+       }
+}
 #[cfg(feature = "futures")]
-const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
-       min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
+const FASTEST_TIMER: u64 = min_u64(
+       min_u64(FRESHNESS_TIMER, PING_TIMER),
+       min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
+);
 
 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
 pub enum GossipSync<
@@ -145,8 +151,10 @@ pub enum GossipSync<
        G: Deref<Target = NetworkGraph<L>>,
        U: Deref,
        L: Deref,
->
-where U::Target: UtxoLookup, L::Target: Logger {
+> where
+       U::Target: UtxoLookup,
+       L::Target: Logger,
+{
        /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
        P2P(P),
        /// Rapid gossip sync from a trusted server.
@@ -156,13 +164,16 @@ where U::Target: UtxoLookup, L::Target: Logger {
 }
 
 impl<
-       P: Deref<Target = P2PGossipSync<G, U, L>>,
-       R: Deref<Target = RapidGossipSync<G, L>>,
-       G: Deref<Target = NetworkGraph<L>>,
-       U: Deref,
-       L: Deref,
-> GossipSync<P, R, G, U, L>
-where U::Target: UtxoLookup, L::Target: Logger {
+               P: Deref<Target = P2PGossipSync<G, U, L>>,
+               R: Deref<Target = RapidGossipSync<G, L>>,
+               G: Deref<Target = NetworkGraph<L>>,
+               U: Deref,
+               L: Deref,
+       > GossipSync<P, R, G, U, L>
+where
+       U::Target: UtxoLookup,
+       L::Target: Logger,
+{
        fn network_graph(&self) -> Option<&G> {
                match self {
                        GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
@@ -187,8 +198,12 @@ where U::Target: UtxoLookup, L::Target: Logger {
 }
 
 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
-impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
-       GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
+impl<
+               P: Deref<Target = P2PGossipSync<G, U, L>>,
+               G: Deref<Target = NetworkGraph<L>>,
+               U: Deref,
+               L: Deref,
+       > GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
 where
        U::Target: UtxoLookup,
        L::Target: Logger,
@@ -200,15 +215,19 @@ where
 }
 
 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
-impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
+impl<
+               'a,
+               R: Deref<Target = RapidGossipSync<G, L>>,
+               G: Deref<Target = NetworkGraph<L>>,
+               L: Deref,
+       >
        GossipSync<
                &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
                R,
                G,
                &'a (dyn UtxoLookup + Send + Sync),
                L,
-       >
-where
+       > where
        L::Target: Logger,
 {
        /// Initializes a new [`GossipSync::Rapid`] variant.
@@ -225,8 +244,7 @@ impl<'a, L: Deref>
                &'a NetworkGraph<L>,
                &'a (dyn UtxoLookup + Send + Sync),
                L,
-       >
-where
+       > where
        L::Target: Logger,
 {
        /// Initializes a new [`GossipSync::None`] variant.
@@ -235,11 +253,14 @@ where
        }
 }
 
-fn handle_network_graph_update<L: Deref>(
-       network_graph: &NetworkGraph<L>, event: &Event
-) where L::Target: Logger {
+fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
+where
+       L::Target: Logger,
+{
        if let Event::PaymentPathFailed {
-               failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
+               failure: PathFailure::OnPath { network_update: Some(ref upd) },
+               ..
+       } = event
        {
                network_graph.handle_network_update(upd);
        }
@@ -463,35 +484,50 @@ macro_rules! define_run_body {
 #[cfg(feature = "futures")]
 pub(crate) mod futures_util {
        use core::future::Future;
-       use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
-       use core::pin::Pin;
        use core::marker::Unpin;
+       use core::pin::Pin;
+       use core::task::{Poll, RawWaker, RawWakerVTable, Waker};
        pub(crate) struct Selector<
-               A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+               A: Future<Output = ()> + Unpin,
+               B: Future<Output = ()> + Unpin,
+               C: Future<Output = bool> + Unpin,
        > {
                pub a: A,
                pub b: B,
                pub c: C,
        }
        pub(crate) enum SelectorOutput {
-               A, B, C(bool),
+               A,
+               B,
+               C(bool),
        }
 
        impl<
-               A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
-       > Future for Selector<A, B, C> {
+                       A: Future<Output = ()> + Unpin,
+                       B: Future<Output = ()> + Unpin,
+                       C: Future<Output = bool> + Unpin,
+               > Future for Selector<A, B, C>
+       {
                type Output = SelectorOutput;
-               fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
+               fn poll(
+                       mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
+               ) -> Poll<SelectorOutput> {
                        match Pin::new(&mut self.a).poll(ctx) {
-                               Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
+                               Poll::Ready(()) => {
+                                       return Poll::Ready(SelectorOutput::A);
+                               },
                                Poll::Pending => {},
                        }
                        match Pin::new(&mut self.b).poll(ctx) {
-                               Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
+                               Poll::Ready(()) => {
+                                       return Poll::Ready(SelectorOutput::B);
+                               },
                                Poll::Pending => {},
                        }
                        match Pin::new(&mut self.c).poll(ctx) {
-                               Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
+                               Poll::Ready(res) => {
+                                       return Poll::Ready(SelectorOutput::C(res));
+                               },
                                Poll::Pending => {},
                        }
                        Poll::Pending
@@ -501,17 +537,25 @@ pub(crate) mod futures_util {
        // If we want to poll a future without an async context to figure out if it has completed or
        // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
        // but sadly there's a good bit of boilerplate here.
-       fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
-       fn dummy_waker_action(_: *const ()) { }
+       fn dummy_waker_clone(_: *const ()) -> RawWaker {
+               RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
+       }
+       fn dummy_waker_action(_: *const ()) {}
 
        const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
-               dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
-       pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
+               dummy_waker_clone,
+               dummy_waker_action,
+               dummy_waker_action,
+               dummy_waker_action,
+       );
+       pub(crate) fn dummy_waker() -> Waker {
+               unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
+       }
 }
 #[cfg(feature = "futures")]
-use futures_util::{Selector, SelectorOutput, dummy_waker};
-#[cfg(feature = "futures")]
 use core::task;
+#[cfg(feature = "futures")]
+use futures_util::{dummy_waker, Selector, SelectorOutput};
 
 /// Processes background events in a future.
 ///
@@ -657,7 +701,10 @@ pub async fn process_events_async<
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
-       M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+       M: 'static
+               + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
+               + Send
+               + Sync,
        CM: 'static + Deref + Send + Sync,
        OM: 'static + Deref + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
@@ -670,9 +717,9 @@ pub async fn process_events_async<
        FetchTime: Fn() -> Option<Duration>,
 >(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
-       onion_messenger: Option<OM>,
-       gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
-       sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
+       onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
+       logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
+       fetch_time: FetchTime,
 ) -> Result<(), lightning::io::Error>
 where
        UL::Target: 'static + UtxoLookup,
@@ -713,31 +760,51 @@ where
                })
        };
        define_run_body!(
-               persister, chain_monitor,
+               persister,
+               chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
-               channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
-               onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await },
-               peer_manager, gossip_sync, logger, scorer, should_break, {
+               channel_manager,
+               channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
+               onion_messenger,
+               if let Some(om) = &onion_messenger {
+                       om.get_om().process_pending_events_async(async_event_handler).await
+               },
+               peer_manager,
+               gossip_sync,
+               logger,
+               scorer,
+               should_break,
+               {
                        let fut = Selector {
                                a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
-                               c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
+                               c: sleeper(if mobile_interruptable_platform {
+                                       Duration::from_millis(100)
+                               } else {
+                                       Duration::from_secs(FASTEST_TIMER)
+                               }),
                        };
                        match fut.await {
-                               SelectorOutput::A|SelectorOutput::B => {},
+                               SelectorOutput::A | SelectorOutput::B => {},
                                SelectorOutput::C(exit) => {
                                        should_break = exit;
-                               }
+                               },
                        }
-               }, |t| sleeper(Duration::from_secs(t)),
+               },
+               |t| sleeper(Duration::from_secs(t)),
                |fut: &mut SleepFuture, _| {
                        let mut waker = dummy_waker();
                        let mut ctx = task::Context::from_waker(&mut waker);
                        match core::pin::Pin::new(fut).poll(&mut ctx) {
-                               task::Poll::Ready(exit) => { should_break = exit; true },
+                               task::Poll::Ready(exit) => {
+                                       should_break = exit;
+                                       true
+                               },
                                task::Poll::Pending => false,
                        }
-               }, mobile_interruptable_platform, fetch_time,
+               },
+               mobile_interruptable_platform,
+               fetch_time,
        )
 }
 
@@ -798,18 +865,21 @@ impl BackgroundProcessor {
                P: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
-               M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+               M: 'static
+                       + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
+                       + Send
+                       + Sync,
                CM: 'static + Deref + Send + Sync,
                OM: 'static + Deref + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                PM: 'static + Deref + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
-               SC: for <'b> WriteableScore<'b>,
+               SC: for<'b> WriteableScore<'b>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
-               onion_messenger: Option<OM>,
-               gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+               onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
+               logger: L, scorer: Option<S>,
        ) -> Self
        where
                UL::Target: 'static + UtxoLookup,
@@ -833,7 +903,8 @@ impl BackgroundProcessor {
                                }
                                if let Some(ref scorer) = scorer {
                                        use std::time::SystemTime;
-                                       let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+                                       let duration_since_epoch = SystemTime::now()
+                                               .duration_since(SystemTime::UNIX_EPOCH)
                                                .expect("Time should be sometime after 1970");
                                        if update_scorer(scorer, &event, duration_since_epoch) {
                                                log_trace!(logger, "Persisting scorer after update");
@@ -845,19 +916,37 @@ impl BackgroundProcessor {
                                event_handler.handle_event(event);
                        };
                        define_run_body!(
-                               persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
-                               channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
-                               onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) },
-                               peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
-                               { Sleeper::from_two_futures(
-                                       &channel_manager.get_cm().get_event_or_persistence_needed_future(),
-                                       &chain_monitor.get_update_future()
-                               ).wait_timeout(Duration::from_millis(100)); },
-                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+                               persister,
+                               chain_monitor,
+                               chain_monitor.process_pending_events(&event_handler),
+                               channel_manager,
+                               channel_manager.get_cm().process_pending_events(&event_handler),
+                               onion_messenger,
+                               if let Some(om) = &onion_messenger {
+                                       om.get_om().process_pending_events(&event_handler)
+                               },
+                               peer_manager,
+                               gossip_sync,
+                               logger,
+                               scorer,
+                               stop_thread.load(Ordering::Acquire),
+                               {
+                                       Sleeper::from_two_futures(
+                                               &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+                                               &chain_monitor.get_update_future(),
+                                       )
+                                       .wait_timeout(Duration::from_millis(100));
+                               },
+                               |_| Instant::now(),
+                               |time: &Instant, dur| time.elapsed().as_secs() > dur,
+                               false,
                                || {
                                        use std::time::SystemTime;
-                                       Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
-                                               .expect("Time should be sometime after 1970"))
+                                       Some(
+                                               SystemTime::now()
+                                                       .duration_since(SystemTime::UNIX_EPOCH)
+                                                       .expect("Time should be sometime after 1970"),
+                                       )
                                },
                        )
                });
@@ -914,53 +1003,60 @@ impl Drop for BackgroundProcessor {
 
 #[cfg(all(feature = "std", test))]
 mod tests {
-       use bitcoin::{Amount, ScriptBuf, Txid};
+       use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
        use bitcoin::blockdata::constants::{genesis_block, ChainHash};
        use bitcoin::blockdata::locktime::absolute::LockTime;
        use bitcoin::blockdata::transaction::{Transaction, TxOut};
        use bitcoin::hashes::Hash;
        use bitcoin::network::Network;
-       use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
+       use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
        use bitcoin::transaction::Version;
-       use lightning::chain::{BestBlock, Confirm, chainmonitor, Filter};
+       use bitcoin::{Amount, ScriptBuf, Txid};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-       use lightning::sign::{InMemorySigner, KeysManager, ChangeDestinationSource};
        use lightning::chain::transaction::OutPoint;
-       use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
-       use lightning::{get_event_msg, get_event};
-       use lightning::ln::types::{PaymentHash, ChannelId};
+       use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
+       use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure};
        use lightning::ln::channelmanager;
-       use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
+       use lightning::ln::channelmanager::{
+               ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
+       };
        use lightning::ln::features::{ChannelFeatures, NodeFeatures};
        use lightning::ln::functional_test_utils::*;
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
-       use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
+       use lightning::ln::peer_handler::{
+               IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
+       };
+       use lightning::ln::types::{ChannelId, PaymentHash};
        use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
        use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
-       use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
-       use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
+       use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
+       use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
+       use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
        use lightning::util::config::UserConfig;
+       use lightning::util::persist::{
+               KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+               CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
+               NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
+               SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+               SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+       };
        use lightning::util::ser::Writeable;
+       use lightning::util::sweep::{OutputSpendStatus, OutputSweeper};
        use lightning::util::test_utils;
-       use lightning::util::persist::{KVStore,
-               CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
-               NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
-               SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
-       use lightning::util::sweep::{OutputSweeper, OutputSpendStatus};
+       use lightning::{get_event, get_event_msg};
        use lightning_persister::fs_store::FilesystemStore;
+       use lightning_rapid_gossip_sync::RapidGossipSync;
        use std::collections::VecDeque;
-       use std::{fs, env};
        use std::path::PathBuf;
-       use std::sync::{Arc, Mutex};
        use std::sync::mpsc::SyncSender;
+       use std::sync::{Arc, Mutex};
        use std::time::Duration;
-       use lightning_rapid_gossip_sync::RapidGossipSync;
-       use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
+       use std::{env, fs};
 
        const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
 
        #[derive(Clone, Hash, PartialEq, Eq)]
-       struct TestDescriptor{}
+       struct TestDescriptor {}
        impl SocketDescriptor for TestDescriptor {
                fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
                        0
@@ -974,37 +1070,82 @@ mod tests {
        #[cfg(not(c_bindings))]
        type LockingWrapper<T> = Mutex<T>;
 
-       type ChannelManager =
-               channelmanager::ChannelManager<
-                       Arc<ChainMonitor>,
-                       Arc<test_utils::TestBroadcaster>,
-                       Arc<KeysManager>,
-                       Arc<KeysManager>,
-                       Arc<KeysManager>,
-                       Arc<test_utils::TestFeeEstimator>,
-                       Arc<DefaultRouter<
+       type ChannelManager = channelmanager::ChannelManager<
+               Arc<ChainMonitor>,
+               Arc<test_utils::TestBroadcaster>,
+               Arc<KeysManager>,
+               Arc<KeysManager>,
+               Arc<KeysManager>,
+               Arc<test_utils::TestFeeEstimator>,
+               Arc<
+                       DefaultRouter<
                                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                                Arc<test_utils::TestLogger>,
                                Arc<KeysManager>,
                                Arc<LockingWrapper<TestScorer>>,
                                (),
-                               TestScorer>
+                               TestScorer,
                        >,
-                       Arc<test_utils::TestLogger>>;
-
-       type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;
-
-       type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
-       type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
-
-       type OM = OnionMessenger<Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestLogger>, Arc<ChannelManager>, Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<KeysManager>>>, IgnoringMessageHandler, Arc<ChannelManager>, IgnoringMessageHandler>;
+               >,
+               Arc<test_utils::TestLogger>,
+       >;
+
+       type ChainMonitor = chainmonitor::ChainMonitor<
+               InMemorySigner,
+               Arc<test_utils::TestChainSource>,
+               Arc<test_utils::TestBroadcaster>,
+               Arc<test_utils::TestFeeEstimator>,
+               Arc<test_utils::TestLogger>,
+               Arc<FilesystemStore>,
+       >;
+
+       type PGS = Arc<
+               P2PGossipSync<
+                       Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+                       Arc<test_utils::TestChainSource>,
+                       Arc<test_utils::TestLogger>,
+               >,
+       >;
+       type RGS = Arc<
+               RapidGossipSync<
+                       Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+                       Arc<test_utils::TestLogger>,
+               >,
+       >;
+
+       type OM = OnionMessenger<
+               Arc<KeysManager>,
+               Arc<KeysManager>,
+               Arc<test_utils::TestLogger>,
+               Arc<ChannelManager>,
+               Arc<
+                       DefaultMessageRouter<
+                               Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+                               Arc<test_utils::TestLogger>,
+                               Arc<KeysManager>,
+                       >,
+               >,
+               IgnoringMessageHandler,
+               Arc<ChannelManager>,
+               IgnoringMessageHandler,
+       >;
 
        struct Node {
                node: Arc<ChannelManager>,
                messenger: Arc<OM>,
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
-               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<OM>, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
+               peer_manager: Arc<
+                       PeerManager<
+                               TestDescriptor,
+                               Arc<test_utils::TestChannelMessageHandler>,
+                               Arc<test_utils::TestRoutingMessageHandler>,
+                               Arc<OM>,
+                               Arc<test_utils::TestLogger>,
+                               IgnoringMessageHandler,
+                               Arc<KeysManager>,
+                       >,
+               >,
                chain_monitor: Arc<ChainMonitor>,
                kv_store: Arc<FilesystemStore>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -1012,21 +1153,53 @@ mod tests {
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
                scorer: Arc<LockingWrapper<TestScorer>>,
-               sweeper: Arc<OutputSweeper<Arc<test_utils::TestBroadcaster>, Arc<TestWallet>,
-                       Arc<test_utils::TestFeeEstimator>, Arc<dyn Filter + Sync + Send>, Arc<FilesystemStore>,
-                       Arc<test_utils::TestLogger>, Arc<KeysManager>>>,
+               sweeper: Arc<
+                       OutputSweeper<
+                               Arc<test_utils::TestBroadcaster>,
+                               Arc<TestWallet>,
+                               Arc<test_utils::TestFeeEstimator>,
+                               Arc<dyn Filter + Sync + Send>,
+                               Arc<FilesystemStore>,
+                               Arc<test_utils::TestLogger>,
+                               Arc<KeysManager>,
+                       >,
+               >,
        }
 
        impl Node {
-               fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
+               fn p2p_gossip_sync(
+                       &self,
+               ) -> GossipSync<
+                       PGS,
+                       RGS,
+                       Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+                       Arc<test_utils::TestChainSource>,
+                       Arc<test_utils::TestLogger>,
+               > {
                        GossipSync::P2P(self.p2p_gossip_sync.clone())
                }
 
-               fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
+               fn rapid_gossip_sync(
+                       &self,
+               ) -> GossipSync<
+                       PGS,
+                       RGS,
+                       Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+                       Arc<test_utils::TestChainSource>,
+                       Arc<test_utils::TestLogger>,
+               > {
                        GossipSync::Rapid(self.rapid_gossip_sync.clone())
                }
 
-               fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
+               fn no_gossip_sync(
+                       &self,
+               ) -> GossipSync<
+                       PGS,
+                       RGS,
+                       Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
+                       Arc<test_utils::TestChainSource>,
+                       Arc<test_utils::TestLogger>,
+               > {
                        GossipSync::None
                }
        }
@@ -1035,8 +1208,10 @@ mod tests {
                fn drop(&mut self) {
                        let data_dir = self.kv_store.get_data_dir();
                        match fs::remove_dir_all(data_dir.clone()) {
-                               Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
-                               _ => {}
+                               Err(e) => {
+                                       println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
+                               },
+                               _ => {},
                        }
                }
        }
@@ -1052,7 +1227,13 @@ mod tests {
        impl Persister {
                fn new(data_dir: PathBuf) -> Self {
                        let kv_store = FilesystemStore::new(data_dir);
-                       Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
+                       Self {
+                               graph_error: None,
+                               graph_persistence_notifier: None,
+                               manager_error: None,
+                               scorer_error: None,
+                               kv_store,
+                       }
                }
 
                fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
@@ -1073,53 +1254,63 @@ mod tests {
        }
 
        impl KVStore for Persister {
-               fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
+               fn read(
+                       &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+               ) -> lightning::io::Result<Vec<u8>> {
                        self.kv_store.read(primary_namespace, secondary_namespace, key)
                }
 
-               fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
-                       if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
-                               secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
-                               key == CHANNEL_MANAGER_PERSISTENCE_KEY
+               fn write(
+                       &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+               ) -> lightning::io::Result<()> {
+                       if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
+                               && secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
+                               && key == CHANNEL_MANAGER_PERSISTENCE_KEY
                        {
                                if let Some((error, message)) = self.manager_error {
-                                       return Err(std::io::Error::new(error, message))
+                                       return Err(std::io::Error::new(error, message));
                                }
                        }
 
-                       if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
-                               secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
-                               key == NETWORK_GRAPH_PERSISTENCE_KEY
+                       if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
+                               && secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
+                               && key == NETWORK_GRAPH_PERSISTENCE_KEY
                        {
                                if let Some(sender) = &self.graph_persistence_notifier {
                                        match sender.send(()) {
                                                Ok(()) => {},
-                                               Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
+                                               Err(std::sync::mpsc::SendError(())) => {
+                                                       println!("Persister failed to notify as receiver went away.")
+                                               },
                                        }
                                };
 
                                if let Some((error, message)) = self.graph_error {
-                                       return Err(std::io::Error::new(error, message))
+                                       return Err(std::io::Error::new(error, message));
                                }
                        }
 
-                       if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
-                               secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
-                               key == SCORER_PERSISTENCE_KEY
+                       if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
+                               && secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
+                               && key == SCORER_PERSISTENCE_KEY
                        {
                                if let Some((error, message)) = self.scorer_error {
-                                       return Err(std::io::Error::new(error, message))
+                                       return Err(std::io::Error::new(error, message));
                                }
                        }
 
                        self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
                }
 
-               fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
+               fn remove(
+                       &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+               ) -> lightning::io::Result<()> {
                        self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
                }
 
-               fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
+               fn list(
+                       &self, primary_namespace: &str, secondary_namespace: &str,
+               ) -> lightning::io::Result<Vec<String>> {
                        self.kv_store.list(primary_namespace, secondary_namespace)
                }
        }
@@ -1147,18 +1338,27 @@ mod tests {
        }
 
        impl lightning::util::ser::Writeable for TestScorer {
-               fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
+               fn write<W: lightning::util::ser::Writer>(
+                       &self, _: &mut W,
+               ) -> Result<(), lightning::io::Error> {
+                       Ok(())
+               }
        }
 
        impl ScoreLookUp for TestScorer {
                type ScoreParams = ();
                fn channel_penalty_msat(
-                       &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
-               ) -> u64 { unimplemented!(); }
+                       &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
+                       _score_params: &Self::ScoreParams,
+               ) -> u64 {
+                       unimplemented!();
+               }
        }
 
        impl ScoreUpdate for TestScorer {
-               fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
+               fn payment_path_failed(
+                       &mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
+               ) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, short_channel_id } => {
@@ -1173,7 +1373,7 @@ mod tests {
                                        },
                                        TestResult::ProbeSuccess { path } => {
                                                panic!("Unexpected probe success: {:?}", path)
-                                       }
+                                       },
                                }
                        }
                }
@@ -1192,7 +1392,7 @@ mod tests {
                                        },
                                        TestResult::ProbeSuccess { path } => {
                                                panic!("Unexpected probe success: {:?}", path)
-                                       }
+                                       },
                                }
                        }
                }
@@ -1211,7 +1411,7 @@ mod tests {
                                        },
                                        TestResult::ProbeSuccess { path } => {
                                                panic!("Unexpected probe success: {:?}", path)
-                                       }
+                                       },
                                }
                        }
                }
@@ -1229,7 +1429,7 @@ mod tests {
                                        },
                                        TestResult::ProbeSuccess { path } => {
                                                assert_eq!(actual_path, &path);
-                                       }
+                                       },
                                }
                        }
                }
@@ -1274,7 +1474,8 @@ mod tests {
                let mut nodes = Vec::new();
                for i in 0..num_nodes {
                        let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
-                       let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
+                       let fee_estimator =
+                               Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
                        let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
                        let genesis_block = genesis_block(network);
                        let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
@@ -1282,42 +1483,125 @@ mod tests {
                        let now = Duration::from_secs(genesis_block.header.time as u64);
                        let seed = [i as u8; 32];
                        let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
-                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default()));
-                       let msg_router = Arc::new(DefaultMessageRouter::new(network_graph.clone(), Arc::clone(&keys_manager)));
+                       let router = Arc::new(DefaultRouter::new(
+                               network_graph.clone(),
+                               logger.clone(),
+                               Arc::clone(&keys_manager),
+                               scorer.clone(),
+                               Default::default(),
+                       ));
+                       let msg_router = Arc::new(DefaultMessageRouter::new(
+                               network_graph.clone(),
+                               Arc::clone(&keys_manager),
+                       ));
                        let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
-                       let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
+                       let kv_store =
+                               Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
                        let now = Duration::from_secs(genesis_block.header.time as u64);
                        let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
-                       let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
+                       let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
+                               Some(chain_source.clone()),
+                               tx_broadcaster.clone(),
+                               logger.clone(),
+                               fee_estimator.clone(),
+                               kv_store.clone(),
+                       ));
                        let best_block = BestBlock::from_network(network);
                        let params = ChainParameters { network, best_block };
-                       let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
-                       let messenger = Arc::new(OnionMessenger::new(keys_manager.clone(), keys_manager.clone(), logger.clone(), manager.clone(), msg_router.clone(), IgnoringMessageHandler {}, manager.clone(), IgnoringMessageHandler {}));
+                       let manager = Arc::new(ChannelManager::new(
+                               fee_estimator.clone(),
+                               chain_monitor.clone(),
+                               tx_broadcaster.clone(),
+                               router.clone(),
+                               logger.clone(),
+                               keys_manager.clone(),
+                               keys_manager.clone(),
+                               keys_manager.clone(),
+                               UserConfig::default(),
+                               params,
+                               genesis_block.header.time,
+                       ));
+                       let messenger = Arc::new(OnionMessenger::new(
+                               keys_manager.clone(),
+                               keys_manager.clone(),
+                               logger.clone(),
+                               manager.clone(),
+                               msg_router.clone(),
+                               IgnoringMessageHandler {},
+                               manager.clone(),
+                               IgnoringMessageHandler {},
+                       ));
                        let wallet = Arc::new(TestWallet {});
-                       let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator),
-                               None::<Arc<dyn Filter + Sync + Send>>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger)));
-                       let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
-                       let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
+                       let sweeper = Arc::new(OutputSweeper::new(
+                               best_block,
+                               Arc::clone(&tx_broadcaster),
+                               Arc::clone(&fee_estimator),
+                               None::<Arc<dyn Filter + Sync + Send>>,
+                               Arc::clone(&keys_manager),
+                               wallet,
+                               Arc::clone(&kv_store),
+                               Arc::clone(&logger),
+                       ));
+                       let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
+                               network_graph.clone(),
+                               Some(chain_source.clone()),
+                               logger.clone(),
+                       ));
+                       let rapid_gossip_sync =
+                               Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
                        let msg_handler = MessageHandler {
-                               chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
+                               chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
+                                       ChainHash::using_genesis_block(Network::Testnet),
+                               )),
                                route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
-                               onion_message_handler: messenger.clone(), custom_message_handler: IgnoringMessageHandler{}
+                               onion_message_handler: messenger.clone(),
+                               custom_message_handler: IgnoringMessageHandler {},
+                       };
+                       let peer_manager = Arc::new(PeerManager::new(
+                               msg_handler,
+                               0,
+                               &seed,
+                               logger.clone(),
+                               keys_manager.clone(),
+                       ));
+                       let node = Node {
+                               node: manager,
+                               p2p_gossip_sync,
+                               rapid_gossip_sync,
+                               peer_manager,
+                               chain_monitor,
+                               kv_store,
+                               tx_broadcaster,
+                               network_graph,
+                               logger,
+                               best_block,
+                               scorer,
+                               sweeper,
+                               messenger,
                        };
-                       let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
-                       let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper, messenger };
                        nodes.push(node);
                }
 
                for i in 0..num_nodes {
-                       for j in (i+1)..num_nodes {
+                       for j in (i + 1)..num_nodes {
                                let init_i = Init {
-                                       features: nodes[j].node.init_features(), networks: None, remote_network_address: None
+                                       features: nodes[j].node.init_features(),
+                                       networks: None,
+                                       remote_network_address: None,
                                };
-                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &init_i, true).unwrap();
+                               nodes[i]
+                                       .node
+                                       .peer_connected(&nodes[j].node.get_our_node_id(), &init_i, true)
+                                       .unwrap();
                                let init_j = Init {
-                                       features: nodes[i].node.init_features(), networks: None, remote_network_address: None
+                                       features: nodes[i].node.init_features(),
+                                       networks: None,
+                                       remote_network_address: None,
                                };
-                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &init_j, false).unwrap();
+                               nodes[j]
+                                       .node
+                                       .peer_connected(&nodes[i].node.get_our_node_id(), &init_j, false)
+                                       .unwrap();
                        }
                }
 
@@ -1329,43 +1613,82 @@ mod tests {
                        begin_open_channel!($node_a, $node_b, $channel_value);
                        let events = $node_a.node.get_and_clear_pending_events();
                        assert_eq!(events.len(), 1);
-                       let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
-                       $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
-                       let msg_a = get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id());
+                       let (temporary_channel_id, tx) =
+                               handle_funding_generation_ready!(events[0], $channel_value);
+                       $node_a
+                               .node
+                               .funding_transaction_generated(
+                                       &temporary_channel_id,
+                                       &$node_b.node.get_our_node_id(),
+                                       tx.clone(),
+                               )
+                               .unwrap();
+                       let msg_a = get_event_msg!(
+                               $node_a,
+                               MessageSendEvent::SendFundingCreated,
+                               $node_b.node.get_our_node_id()
+                       );
                        $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &msg_a);
                        get_event!($node_b, Event::ChannelPending);
-                       let msg_b = get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id());
+                       let msg_b = get_event_msg!(
+                               $node_b,
+                               MessageSendEvent::SendFundingSigned,
+                               $node_a.node.get_our_node_id()
+                       );
                        $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &msg_b);
                        get_event!($node_a, Event::ChannelPending);
                        tx
-               }}
+               }};
        }
 
        macro_rules! begin_open_channel {
                ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
-                       $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap();
-                       let msg_a = get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id());
+                       $node_a
+                               .node
+                               .create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
+                               .unwrap();
+                       let msg_a = get_event_msg!(
+                               $node_a,
+                               MessageSendEvent::SendOpenChannel,
+                               $node_b.node.get_our_node_id()
+                       );
                        $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &msg_a);
-                       let msg_b = get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id());
+                       let msg_b = get_event_msg!(
+                               $node_b,
+                               MessageSendEvent::SendAcceptChannel,
+                               $node_a.node.get_our_node_id()
+                       );
                        $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &msg_b);
-               }}
+               }};
        }
 
        macro_rules! handle_funding_generation_ready {
                ($event: expr, $channel_value: expr) => {{
                        match $event {
-                               Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
+                               Event::FundingGenerationReady {
+                                       temporary_channel_id,
+                                       channel_value_satoshis,
+                                       ref output_script,
+                                       user_channel_id,
+                                       ..
+                               } => {
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);
 
-                                       let tx = Transaction { version: Version::ONE, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
-                                               value: Amount::from_sat(channel_value_satoshis), script_pubkey: output_script.clone(),
-                                       }]};
+                                       let tx = Transaction {
+                                               version: Version::ONE,
+                                               lock_time: LockTime::ZERO,
+                                               input: Vec::new(),
+                                               output: vec![TxOut {
+                                                       value: Amount::from_sat(channel_value_satoshis),
+                                                       script_pubkey: output_script.clone(),
+                                               }],
+                                       };
                                        (temporary_channel_id, tx)
                                },
                                _ => panic!("Unexpected event"),
                        }
-               }}
+               }};
        }
 
        fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
@@ -1435,7 +1758,17 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].p2p_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
                macro_rules! check_persisted_data {
                        ($node: expr, $filepath: expr) => {
@@ -1443,48 +1776,60 @@ mod tests {
                                loop {
                                        expected_bytes.clear();
                                        match $node.write(&mut expected_bytes) {
-                                               Ok(()) => {
-                                                       match std::fs::read($filepath) {
-                                                               Ok(bytes) => {
-                                                                       if bytes == expected_bytes {
-                                                                               break
-                                                                       } else {
-                                                                               continue
-                                                                       }
-                                                               },
-                                                               Err(_) => continue
-                                                       }
+                                               Ok(()) => match std::fs::read($filepath) {
+                                                       Ok(bytes) => {
+                                                               if bytes == expected_bytes {
+                                                                       break;
+                                                               } else {
+                                                                       continue;
+                                                               }
+                                                       },
+                                                       Err(_) => continue,
                                                },
-                                               Err(e) => panic!("Unexpected error: {}", e)
+                                               Err(e) => panic!("Unexpected error: {}", e),
                                        }
                                }
-                       }
+                       };
                }
 
                // Check that the initial channel manager data is persisted as expected.
-               let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
+               let filepath =
+                       get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
                check_persisted_data!(nodes[0].node, filepath.clone());
 
                loop {
-                       if !nodes[0].node.get_event_or_persist_condvar_value() { break }
+                       if !nodes[0].node.get_event_or_persist_condvar_value() {
+                               break;
+                       }
                }
 
                // Force-close the channel.
                let error_message = "Channel force-closed";
-               nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap();
+               nodes[0]
+                       .node
+                       .force_close_broadcasting_latest_txn(
+                               &ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }),
+                               &nodes[1].node.get_our_node_id(),
+                               error_message.to_string(),
+                       )
+                       .unwrap();
 
                // Check that the force-close updates are persisted.
                check_persisted_data!(nodes[0].node, filepath.clone());
                loop {
-                       if !nodes[0].node.get_event_or_persist_condvar_value() { break }
+                       if !nodes[0].node.get_event_or_persist_condvar_value() {
+                               break;
+                       }
                }
 
                // Check network graph is persisted
-               let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
+               let filepath =
+                       get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
                check_persisted_data!(nodes[0].network_graph, filepath.clone());
 
                // Check scorer is persisted
-               let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
+               let filepath =
+                       get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
                check_persisted_data!(nodes[0].scorer, filepath.clone());
 
                if !std::thread::panicking() {
@@ -1503,18 +1848,29 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
                        let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
                        let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
                        let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
-                       if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
-                               log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
-                               log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
-                               log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
-                               break
+                       if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some()
+                               && log_entries.get(&("lightning_background_processor", desired_log_2)).is_some()
+                               && log_entries.get(&("lightning_background_processor", desired_log_3)).is_some()
+                               && log_entries.get(&("lightning_background_processor", desired_log_4)).is_some()
+                       {
+                               break;
                        }
                }
 
@@ -1530,9 +1886,21 @@ mod tests {
                open_channel!(nodes[0], nodes[1], 100000);
 
                let data_dir = nodes[0].kv_store.get_data_dir();
-               let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
+               let persister = Arc::new(
+                       Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
+               );
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
                match bg_processor.join() {
                        Ok(_) => panic!("Expected error persisting manager"),
                        Err(e) => {
@@ -1550,17 +1918,28 @@ mod tests {
                open_channel!(nodes[0], nodes[1], 100000);
 
                let data_dir = nodes[0].kv_store.get_data_dir();
-               let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
+               let persister = Arc::new(
+                       Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
+               );
 
                let bp_future = super::process_events_async(
-                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
-                       nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
-                       Some(nodes[0].scorer.clone()), move |dur: Duration| {
+                       persister,
+                       |_: _| async {},
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].rapid_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+                       move |dur: Duration| {
                                Box::pin(async move {
                                        tokio::time::sleep(dur).await;
                                        false // Never exit
                                })
-                       }, false, || Some(Duration::ZERO),
+                       },
+                       false,
+                       || Some(Duration::ZERO),
                );
                match bp_future.await {
                        Ok(_) => panic!("Expected error persisting manager"),
@@ -1576,9 +1955,20 @@ mod tests {
                // Test that if we encounter an error during network graph persistence, an error gets returned.
                let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
                let data_dir = nodes[0].kv_store.get_data_dir();
-               let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
+               let persister =
+                       Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].p2p_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
                match bg_processor.stop() {
                        Ok(_) => panic!("Expected error persisting network graph"),
@@ -1594,9 +1984,20 @@ mod tests {
                // Test that if we encounter an error during scorer persistence, an error gets returned.
                let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
                let data_dir = nodes[0].kv_store.get_data_dir();
-               let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
+               let persister =
+                       Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
                match bg_processor.stop() {
                        Ok(_) => panic!("Expected error persisting scorer"),
@@ -1621,26 +2022,42 @@ mod tests {
                let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
                let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: Event| match event {
-                       Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
+                       Event::FundingGenerationReady { .. } => funding_generation_send
+                               .send(handle_funding_generation_ready!(event, channel_value))
+                               .unwrap(),
                        Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
                        Event::ChannelReady { .. } => {},
                        _ => panic!("Unexpected event: {:?}", event),
                };
 
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
                // Open a channel and check that the FundingGenerationReady event was handled.
                begin_open_channel!(nodes[0], nodes[1], channel_value);
                let (temporary_channel_id, funding_tx) = funding_generation_recv
                        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
                        .expect("FundingGenerationReady not handled within deadline");
-               nodes[0].node.funding_transaction_generated(&temporary_channel_id, &node_1_id, funding_tx.clone()).unwrap();
+               nodes[0]
+                       .node
+                       .funding_transaction_generated(&temporary_channel_id, &node_1_id, funding_tx.clone())
+                       .unwrap();
                let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
                nodes[1].node.handle_funding_created(&node_0_id, &msg_0);
                get_event!(nodes[1], Event::ChannelPending);
                let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
                nodes[0].node.handle_funding_signed(&node_1_id, &msg_1);
-               let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+               let _ = channel_pending_recv
+                       .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
                        .expect("ChannelPending not handled within deadline");
 
                // Confirm the funding transaction.
@@ -1649,10 +2066,13 @@ mod tests {
                confirm_transaction(&mut nodes[1], &funding_tx);
                let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
                nodes[0].node.handle_channel_ready(&node_1_id, &bs_funding);
-               let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
+               let _as_channel_update =
+                       get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
                nodes[1].node.handle_channel_ready(&node_0_id, &as_funding);
-               let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
-               let broadcast_funding = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
+               let _bs_channel_update =
+                       get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
+               let broadcast_funding =
+                       nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
                assert_eq!(broadcast_funding.txid(), funding_tx.txid());
                assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
 
@@ -1669,11 +2089,28 @@ mod tests {
                        _ => panic!("Unexpected event: {:?}", event),
                };
                let persister = Arc::new(Persister::new(data_dir));
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
                // Force close the channel and check that the SpendableOutputs event was handled.
                let error_message = "Channel force-closed";
-               nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &node_1_id, error_message.to_string()).unwrap();
+               nodes[0]
+                       .node
+                       .force_close_broadcasting_latest_txn(
+                               &nodes[0].node.list_channels()[0].channel_id,
+                               &node_1_id,
+                               error_message.to_string(),
+                       )
+                       .unwrap();
                let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
                confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
 
@@ -1682,7 +2119,10 @@ mod tests {
                        .expect("Events not handled within deadline");
                match event {
                        Event::SpendableOutputs { outputs, channel_id } => {
-                               nodes[0].sweeper.track_spendable_outputs(outputs, channel_id, false, Some(153)).unwrap();
+                               nodes[0]
+                                       .sweeper
+                                       .track_spendable_outputs(outputs, channel_id, false, Some(153))
+                                       .unwrap();
                        },
                        _ => panic!("Unexpected event: {:?}", event),
                }
@@ -1695,7 +2135,7 @@ mod tests {
                        match tracked_output.status {
                                OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
                                        assert_eq!(delayed_until_height, Some(153));
-                               }
+                               },
                                _ => panic!("Unexpected status"),
                        }
                }
@@ -1709,7 +2149,7 @@ mod tests {
                match tracked_output.status {
                        OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
                                assert_eq!(sweep_tx_0.txid(), latest_spending_tx.txid());
-                       }
+                       },
                        _ => panic!("Unexpected status"),
                }
 
@@ -1721,7 +2161,7 @@ mod tests {
                match tracked_output.status {
                        OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
                                assert_eq!(sweep_tx_1.txid(), latest_spending_tx.txid());
-                       }
+                       },
                        _ => panic!("Unexpected status"),
                }
                assert_ne!(sweep_tx_0, sweep_tx_1);
@@ -1733,7 +2173,7 @@ mod tests {
                match tracked_output.status {
                        OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
                                assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
-                       }
+                       },
                        _ => panic!("Unexpected status"),
                }
                assert_ne!(sweep_tx_0, sweep_tx_2);
@@ -1746,7 +2186,7 @@ mod tests {
                match tracked_output.status {
                        OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
                                assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
-                       }
+                       },
                        _ => panic!("Unexpected status"),
                }
 
@@ -1761,7 +2201,7 @@ mod tests {
                match tracked_output.status {
                        OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
                                assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
-                       }
+                       },
                        _ => panic!("Unexpected status"),
                }
 
@@ -1781,13 +2221,23 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let expected_log = "Calling time_passed and persisting scorer".to_string();
                        if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
-                               break
+                               break;
                        }
                }
 
@@ -1799,9 +2249,16 @@ mod tests {
        macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
                ($nodes: expr, $receive: expr, $sleep: expr) => {
                        let features = ChannelFeatures::empty();
-                       $nodes[0].network_graph.add_channel_from_partial_announcement(
-                               42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
-                       ).expect("Failed to update channel from partial announcement");
+                       $nodes[0]
+                               .network_graph
+                               .add_channel_from_partial_announcement(
+                                       42,
+                                       53,
+                                       features,
+                                       $nodes[0].node.get_our_node_id(),
+                                       $nodes[1].node.get_our_node_id(),
+                               )
+                               .expect("Failed to update channel from partial announcement");
                        let original_graph_description = $nodes[0].network_graph.to_string();
                        assert!(original_graph_description.contains("42: features: 0000, node_one:"));
                        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
@@ -1810,30 +2267,34 @@ mod tests {
                                $sleep;
                                let log_entries = $nodes[0].logger.lines.lock().unwrap();
                                let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
-                               if *log_entries.get(&("lightning_background_processor", loop_counter))
-                                       .unwrap_or(&0) > 1
+                               if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
+                                       > 1
                                {
                                        // Wait until the loop has gone around at least twice.
-                                       break
+                                       break;
                                }
                        }
 
                        let initialization_input = vec![
-                               76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
-                               79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
-                               0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
-                               187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
-                               157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
-                               88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
-                               204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
-                               181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
-                               110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
-                               76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
-                               226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
-                               0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
-                               0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
+                               76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
+                               247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
+                               98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
+                               250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
+                               67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
+                               115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
+                               1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
+                               65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
+                               149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
+                               175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
+                               27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
+                               0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
+                               8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
+                               11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
                        ];
-                       $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
+                       $nodes[0]
+                               .rapid_gossip_sync
+                               .update_network_graph_no_std(&initialization_input[..], Some(1642291930))
+                               .unwrap();
 
                        // this should have added two channels and pruned the previous one.
                        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
@@ -1842,23 +2303,36 @@ mod tests {
 
                        // all channels should now be pruned
                        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
-               }
+               };
        }
 
        #[test]
        fn test_not_pruning_network_graph_until_graph_sync_completion() {
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
 
-               let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
+               let (_, nodes) =
+                       create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
 
                let event_handler = |_: _| {};
-               let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let background_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].rapid_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
-               do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
+               do_test_not_pruning_network_graph_until_graph_sync_completion!(
+                       nodes,
                        receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
-                       std::thread::sleep(Duration::from_millis(1)));
+                       std::thread::sleep(Duration::from_millis(1))
+               );
 
                background_processor.stop().unwrap();
        }
@@ -1868,15 +2342,23 @@ mod tests {
        async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
 
-               let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
+               let (_, nodes) =
+                       create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
 
                let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
                let bp_future = super::process_events_async(
-                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
-                       nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
-                       Some(nodes[0].scorer.clone()), move |dur: Duration| {
+                       persister,
+                       |_: _| async {},
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].rapid_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+                       move |dur: Duration| {
                                let mut exit_receiver = exit_receiver.clone();
                                Box::pin(async move {
                                        tokio::select! {
@@ -1884,20 +2366,29 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false, || Some(Duration::from_secs(1696300000)),
+                       },
+                       false,
+                       || Some(Duration::from_secs(1696300000)),
                );
 
                let t1 = tokio::spawn(bp_future);
                let t2 = tokio::spawn(async move {
-                       do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
-                               let mut i = 0;
-                               loop {
-                                       tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
-                                       if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
-                                       assert!(i < 5);
-                                       i += 1;
-                               }
-                       }, tokio::time::sleep(Duration::from_millis(1)).await);
+                       do_test_not_pruning_network_graph_until_graph_sync_completion!(
+                               nodes,
+                               {
+                                       let mut i = 0;
+                                       loop {
+                                               tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER))
+                                                       .await;
+                                               if let Ok(()) = receiver.try_recv() {
+                                                       break Ok::<(), ()>(());
+                                               }
+                                               assert!(i < 5);
+                                               i += 1;
+                                       }
+                               },
+                               tokio::time::sleep(Duration::from_millis(1)).await
+                       );
                        exit_sender.send(()).unwrap();
                });
                let (r1, r2) = tokio::join!(t1, t2);
@@ -2012,9 +2503,22 @@ mod tests {
                let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+               );
 
-               do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
+               do_test_payment_path_scoring!(
+                       nodes,
+                       receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+               );
 
                if !std::thread::panicking() {
                        bg_processor.stop().unwrap();
@@ -2033,10 +2537,10 @@ mod tests {
                        let sender_ref = sender.clone();
                        async move {
                                match event {
-                                       Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
-                                       Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
-                                       Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
-                                       Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
+                                       Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
+                                       Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
+                                       Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
+                                       Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
                                        _ => panic!("Unexpected event: {:?}", event),
                                }
                        }
@@ -2049,9 +2553,16 @@ mod tests {
                let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
 
                let bp_future = super::process_events_async(
-                       persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
-                       nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
-                       Some(nodes[0].scorer.clone()), move |dur: Duration| {
+                       persister,
+                       event_handler,
+                       nodes[0].chain_monitor.clone(),
+                       nodes[0].node.clone(),
+                       Some(nodes[0].messenger.clone()),
+                       nodes[0].no_gossip_sync(),
+                       nodes[0].peer_manager.clone(),
+                       nodes[0].logger.clone(),
+                       Some(nodes[0].scorer.clone()),
+                       move |dur: Duration| {
                                let mut exit_receiver = exit_receiver.clone();
                                Box::pin(async move {
                                        tokio::select! {
@@ -2059,7 +2570,9 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false, || Some(Duration::ZERO),
+                       },
+                       false,
+                       || Some(Duration::ZERO),
                );
                let t1 = tokio::spawn(bp_future);
                let t2 = tokio::spawn(async move {
@@ -2068,7 +2581,10 @@ mod tests {
 
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let expected_log = "Persisting scorer after update".to_string();
-                       assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
+                       assert_eq!(
+                               *log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
+                               5
+                       );
                });
 
                let (r1, r2) = tokio::join!(t1, t2);