Add `counterparty_node_id` to `FundingGenerationReady`
[rust-lightning] / lightning-background-processor / src / lib.rs
index 0b5f8e4875d84f5b05e340dba0d1901371efc998..a0ac265ea6b31778b780f3ff36481dae4e1874bf 100644 (file)
@@ -18,8 +18,10 @@ use lightning::ln::channelmanager::ChannelManager;
 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
 use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
+use lightning::routing::scoring::WriteableScore;
 use lightning::util::events::{Event, EventHandler, EventsProvider};
 use lightning::util::logger::Logger;
+use lightning::util::persist::Persister;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
@@ -80,22 +82,6 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
 #[cfg(test)]
 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
 
-/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
-pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-where
-       M::Target: 'static + chain::Watch<Signer>,
-       T::Target: 'static + BroadcasterInterface,
-       K::Target: 'static + KeysInterface<Signer = Signer>,
-       F::Target: 'static + FeeEstimator,
-       L::Target: 'static + Logger,
-{
-       /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
-       /// (which will cause the [`BackgroundProcessor`] which called this method to exit).
-       fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
-
-       /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
-       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
-}
 
 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
 struct DecoratingEventHandler<
@@ -138,12 +124,12 @@ impl BackgroundProcessor {
        ///
        /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
        /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
-       /// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's
+       /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
        /// provided implementation.
        ///
        /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
-       /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
-       /// for Rust-Lightning's provided implementation.
+       /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
+       /// for LDK's provided implementation.
        ///
        /// Typically, users should either implement [`Persister::persist_manager`] to never return an
        /// error or call [`join`] and handle any error that may arise. For the latter case,
@@ -161,11 +147,12 @@ impl BackgroundProcessor {
        /// [`stop`]: Self::stop
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
-       /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
-       /// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
+       /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
+       /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
        /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
        /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
        pub fn start<
+               'a,
                Signer: 'static + Sign,
                CA: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
@@ -180,15 +167,17 @@ impl BackgroundProcessor {
                CMH: 'static + Deref + Send + Sync,
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
-               PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
+               PS: 'static + Deref + Send,
                M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
                NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
                UMH: 'static + Deref + Send + Sync,
                PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+               S: 'static + Deref<Target = SC> + Send + Sync,
+               SC: WriteableScore<'a>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
-               net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
+               net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>
        ) -> Self
        where
                CA::Target: 'static + chain::Access,
@@ -202,6 +191,7 @@ impl BackgroundProcessor {
                CMH::Target: 'static + ChannelMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
+               PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
@@ -217,10 +207,22 @@ impl BackgroundProcessor {
                        let mut have_pruned = false;
 
                        loop {
-                               peer_manager.process_events(); // Note that this may block on ChannelManager's locking
                                channel_manager.process_pending_events(&event_handler);
                                chain_monitor.process_pending_events(&event_handler);
 
+                               // Note that the PeerManager::process_events may block on ChannelManager's locks,
+                               // hence it comes last here. When the ChannelManager finishes whatever it's doing,
+                               // we want to ensure we get into `persist_manager` as quickly as we can, especially
+                               // without running the normal event processing above and handing events to users.
+                               //
+                               // Specifically, on an *extremely* slow machine, we may see ChannelManager start
+                               // processing a message effectively at any point during this loop. In order to
+                               // minimize the time between such processing completing and persisting the updated
+                               // ChannelManager, we want to minimize methods blocking on a ChannelManager
+                               // generally, and as a fallback place such blocking only immediately before
+                               // persistence.
+                               peer_manager.process_events();
+
                                // We wait up to 100ms, but track how long it takes to detect being put to sleep,
                                // see `await_start`'s use below.
                                let await_start = Instant::now();
@@ -272,13 +274,20 @@ impl BackgroundProcessor {
                                if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
                                        if let Some(ref handler) = net_graph_msg_handler {
                                                log_trace!(logger, "Pruning network graph of stale entries");
-                                               handler.network_graph().remove_stale_channels(); 
+                                               handler.network_graph().remove_stale_channels();
                                                if let Err(e) = persister.persist_graph(handler.network_graph()) {
                                                        log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                                                }
-                                               last_prune_call = Instant::now();
-                                               have_pruned = true;
                                        }
+                                       if let Some(ref scorer) = scorer {
+                                               log_trace!(logger, "Persisting scorer");
+                                               if let Err(e) = persister.persist_scorer(&scorer) {
+                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                               }
+                                       }
+
+                                       last_prune_call = Instant::now();
+                                       have_pruned = true;
                                }
                        }
 
@@ -287,10 +296,16 @@ impl BackgroundProcessor {
                        // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
                        persister.persist_manager(&*channel_manager)?;
 
+                       // Persist Scorer on exit
+                       if let Some(ref scorer) = scorer {
+                               persister.persist_scorer(&scorer)?;
+                       }
+
                        // Persist NetworkGraph on exit
                        if let Some(ref handler) = net_graph_msg_handler {
                                persister.persist_graph(handler.network_graph())?;
                        }
+
                        Ok(())
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -349,10 +364,9 @@ mod tests {
        use bitcoin::blockdata::constants::genesis_block;
        use bitcoin::blockdata::transaction::{Transaction, TxOut};
        use bitcoin::network::constants::Network;
-       use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-       use lightning::chain::{BestBlock, Confirm, chainmonitor, self};
+       use lightning::chain::{BestBlock, Confirm, chainmonitor};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-       use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager, Sign};
+       use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
        use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
@@ -362,17 +376,17 @@ mod tests {
        use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
        use lightning::util::config::UserConfig;
        use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
-       use lightning::util::logger::Logger;
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
+       use lightning::util::persist::KVStorePersister;
        use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
        use lightning_invoice::utils::DefaultRouter;
        use lightning_persister::FilesystemPersister;
        use std::fs;
-       use std::ops::Deref;
        use std::path::PathBuf;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;
+       use lightning::routing::scoring::{FixedPenaltyScorer};
        use super::{BackgroundProcessor, FRESHNESS_TIMER};
 
        const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
@@ -399,6 +413,7 @@ mod tests {
                network_graph: Arc<NetworkGraph>,
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
+               scorer: Arc<Mutex<FixedPenaltyScorer>>,
        }
 
        impl Drop for Node {
@@ -412,22 +427,52 @@ mod tests {
        }
 
        struct Persister {
-               data_dir: String,
+               graph_error: Option<(std::io::ErrorKind, &'static str)>,
+               manager_error: Option<(std::io::ErrorKind, &'static str)>,
+               scorer_error: Option<(std::io::ErrorKind, &'static str)>,
+               filesystem_persister: FilesystemPersister,
        }
 
-       impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
-               M::Target: 'static + chain::Watch<Signer>,
-               T::Target: 'static + BroadcasterInterface,
-               K::Target: 'static + KeysInterface<Signer = Signer>,
-               F::Target: 'static + FeeEstimator,
-               L::Target: 'static + Logger,
-       {
-               fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
-                       FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
+       impl Persister {
+               fn new(data_dir: String) -> Self {
+                       let filesystem_persister = FilesystemPersister::new(data_dir.clone());
+                       Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister }
                }
 
-               fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
-                       FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
+               fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
+                       Self { graph_error: Some((error, message)), ..self }
+               }
+
+               fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
+                       Self { manager_error: Some((error, message)), ..self }
+               }
+
+               fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
+                       Self { scorer_error: Some((error, message)), ..self }
+               }
+       }
+
+       impl KVStorePersister for Persister {
+               fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
+                       if key == "manager" {
+                               if let Some((error, message)) = self.manager_error {
+                                       return Err(std::io::Error::new(error, message))
+                               }
+                       }
+
+                       if key == "network_graph" {
+                               if let Some((error, message)) = self.graph_error {
+                                       return Err(std::io::Error::new(error, message))
+                               }
+                       }
+
+                       if key == "scorer" {
+                               if let Some((error, message)) = self.scorer_error {
+                                       return Err(std::io::Error::new(error, message))
+                               }
+                       }
+
+                       self.filesystem_persister.persist(key, object)
                }
        }
 
@@ -458,14 +503,15 @@ mod tests {
                        let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
-                       let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block };
+                       let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
+                       let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
                        nodes.push(node);
                }
 
                for i in 0..num_nodes {
                        for j in (i+1)..num_nodes {
-                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known() });
-                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known() });
+                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
                        }
                }
 
@@ -494,7 +540,7 @@ mod tests {
        macro_rules! handle_funding_generation_ready {
                ($event: expr, $channel_value: expr) => {{
                        match $event {
-                               &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id } => {
+                               &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);
 
@@ -554,19 +600,20 @@ mod tests {
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister { data_dir };
+               let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                macro_rules! check_persisted_data {
-                       ($node: expr, $filepath: expr, $expected_bytes: expr) => {
+                       ($node: expr, $filepath: expr) => {
+                               let mut expected_bytes = Vec::new();
                                loop {
-                                       $expected_bytes.clear();
-                                       match $node.write(&mut $expected_bytes) {
+                                       expected_bytes.clear();
+                                       match $node.write(&mut expected_bytes) {
                                                Ok(()) => {
                                                        match std::fs::read($filepath) {
                                                                Ok(bytes) => {
-                                                                       if bytes == $expected_bytes {
+                                                                       if bytes == expected_bytes {
                                                                                break
                                                                        } else {
                                                                                continue
@@ -583,8 +630,7 @@ mod tests {
 
                // Check that the initial channel manager data is persisted as expected.
                let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
-               let mut expected_bytes = Vec::new();
-               check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
+               check_persisted_data!(nodes[0].node, filepath.clone());
 
                loop {
                        if !nodes[0].node.get_persistence_condvar_value() { break }
@@ -594,20 +640,22 @@ mod tests {
                nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()).unwrap();
 
                // Check that the force-close updates are persisted.
-               let mut expected_bytes = Vec::new();
-               check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
+               check_persisted_data!(nodes[0].node, filepath.clone());
                loop {
                        if !nodes[0].node.get_persistence_condvar_value() { break }
                }
 
                // Check network graph is persisted
                let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
-               let mut expected_bytes = Vec::new();
                if let Some(ref handler) = nodes[0].net_graph_msg_handler {
                        let network_graph = handler.network_graph();
-                       check_persisted_data!(network_graph, filepath.clone(), expected_bytes);
+                       check_persisted_data!(network_graph, filepath.clone());
                }
 
+               // Check scorer is persisted
+               let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
+               check_persisted_data!(nodes[0].scorer, filepath.clone());
+
                assert!(bg_processor.stop().is_ok());
        }
 
@@ -617,9 +665,9 @@ mod tests {
                // `FRESHNESS_TIMER`.
                let nodes = create_nodes(1, "test_timer_tick_called".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister { data_dir };
+               let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -639,30 +687,10 @@ mod tests {
                let nodes = create_nodes(2, "test_persist_error".to_string());
                open_channel!(nodes[0], nodes[1], 100000);
 
-               struct ChannelManagerErrorPersister {
-                       data_dir: String,
-               }
-
-               impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for ChannelManagerErrorPersister where
-                       M::Target: 'static + chain::Watch<Signer>,
-                       T::Target: 'static + BroadcasterInterface,
-                       K::Target: 'static + KeysInterface<Signer = Signer>,
-                       F::Target: 'static + FeeEstimator,
-                       L::Target: 'static + Logger,
-               {
-                       fn persist_manager(&self, _channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
-                               Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
-                       }
-
-                       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
-                               FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
-                       }
-               }
-
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = ChannelManagerErrorPersister{ data_dir };
+               let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                match bg_processor.join() {
                        Ok(_) => panic!("Expected error persisting manager"),
                        Err(e) => {
@@ -676,33 +704,31 @@ mod tests {
        fn test_network_graph_persist_error() {
                // Test that if we encounter an error during network graph persistence, an error gets returned.
                let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
-               struct NetworkGraphErrorPersister {
-                       data_dir: String,
-               }
-
-               impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for NetworkGraphErrorPersister where
-                       M::Target: 'static + chain::Watch<Signer>,
-                       T::Target: 'static + BroadcasterInterface,
-                       K::Target: 'static + KeysInterface<Signer = Signer>,
-                       F::Target: 'static + FeeEstimator,
-                       L::Target: 'static + Logger,
-               {
-                       fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
-                               FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
-                       }
+               let data_dir = nodes[0].persister.get_data_dir();
+               let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
+               let event_handler = |_: &_| {};
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
-                       fn persist_graph(&self, _network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
-                               Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
-                       }
+               match bg_processor.stop() {
+                       Ok(_) => panic!("Expected error persisting network graph"),
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::Other);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "test");
+                       },
                }
+       }
 
+       #[test]
+       fn test_scorer_persist_error() {
+               // Test that if we encounter an error during scorer persistence, an error gets returned.
+               let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = NetworkGraphErrorPersister { data_dir };
+               let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
-                       Ok(_) => panic!("Expected error persisting network graph"),
+                       Ok(_) => panic!("Expected error persisting scorer"),
                        Err(e) => {
                                assert_eq!(e.kind(), std::io::ErrorKind::Other);
                                assert_eq!(e.get_ref().unwrap().to_string(), "test");
@@ -715,14 +741,14 @@ mod tests {
                let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
                let channel_value = 100000;
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister { data_dir: data_dir.clone() };
+               let persister = Arc::new(Persister::new(data_dir.clone()));
 
                // Set up a background event handler for FundingGenerationReady events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: &Event| {
                        sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
                };
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Open a channel and check that the FundingGenerationReady event was handled.
                begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -746,7 +772,8 @@ mod tests {
                // Set up a background event handler for SpendableOutputs events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
-               let bg_processor = BackgroundProcessor::start(Persister{ data_dir: data_dir.clone() }, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let persister = Arc::new(Persister::new(data_dir));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Force close the channel and check that the SpendableOutputs event was handled.
                nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
@@ -772,12 +799,11 @@ mod tests {
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister { data_dir };
-               let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
+               let persister = Arc::new(Persister::new(data_dir));
                let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
-               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
+               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
                let event_handler = Arc::clone(&invoice_payer);
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                assert!(bg_processor.stop().is_ok());
        }
 }