]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Add utils to persist scorer in BackgroundProcessor
authorJurvis Tan <5944973+jurvis@users.noreply.github.com>
Thu, 28 Apr 2022 05:16:38 +0000 (22:16 -0700)
committerJurvis Tan <5944973+jurvis@users.noreply.github.com>
Tue, 3 May 2022 22:28:10 +0000 (15:28 -0700)
move last_prune_call back

lightning-background-processor/src/lib.rs
lightning/src/routing/scoring.rs
lightning/src/util/persist.rs

index e23737c0adcf16fec641c0d48aac940645c54716..e306aeb3c880247d1ea0c6b01c39abba2dfd75fd 100644 (file)
@@ -18,6 +18,7 @@ use lightning::ln::channelmanager::ChannelManager;
 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
 use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
+use lightning::routing::scoring::WriteableScore;
 use lightning::util::events::{Event, EventHandler, EventsProvider};
 use lightning::util::logger::Logger;
 use lightning::util::persist::Persister;
@@ -151,6 +152,7 @@ impl BackgroundProcessor {
        /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
        /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
        pub fn start<
+               'a,
                Signer: 'static + Sign,
                CA: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
@@ -171,9 +173,11 @@ impl BackgroundProcessor {
                NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
                UMH: 'static + Deref + Send + Sync,
                PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+               S: 'static + Deref<Target = SC> + Send + Sync,
+               SC: WriteableScore<'a>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
-               net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
+               net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>
        ) -> Self
        where
                CA::Target: 'static + chain::Access,
@@ -187,7 +191,7 @@ impl BackgroundProcessor {
                CMH::Target: 'static + ChannelMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
-               PS::Target: 'static + Persister<Signer, CW, T, K, F, L>
+               PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
@@ -277,6 +281,12 @@ impl BackgroundProcessor {
                                                last_prune_call = Instant::now();
                                                have_pruned = true;
                                        }
+                                       if let Some(ref scorer) = scorer {
+                                               log_trace!(logger, "Persisting scorer");
+                                               if let Err(e) = persister.persist_scorer(&scorer) {
+                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                               }
+                                       }
                                }
                        }
 
@@ -285,10 +295,16 @@ impl BackgroundProcessor {
                        // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
                        persister.persist_manager(&*channel_manager)?;
 
+                       // Persist Scorer on exit
+                       if let Some(ref scorer) = scorer {
+                               persister.persist_scorer(&scorer)?;
+                       }
+
                        // Persist NetworkGraph on exit
                        if let Some(ref handler) = net_graph_msg_handler {
                                persister.persist_graph(handler.network_graph())?;
                        }
+
                        Ok(())
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -369,6 +385,7 @@ mod tests {
        use std::path::PathBuf;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;
+       use lightning::routing::scoring::{FixedPenaltyScorer};
        use super::{BackgroundProcessor, FRESHNESS_TIMER};
 
        const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
@@ -395,6 +412,7 @@ mod tests {
                network_graph: Arc<NetworkGraph>,
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
+               scorer: Arc<Mutex<FixedPenaltyScorer>>,
        }
 
        impl Drop for Node {
@@ -410,13 +428,14 @@ mod tests {
        struct Persister {
                graph_error: Option<(std::io::ErrorKind, &'static str)>,
                manager_error: Option<(std::io::ErrorKind, &'static str)>,
+               scorer_error: Option<(std::io::ErrorKind, &'static str)>,
                filesystem_persister: FilesystemPersister,
        }
 
        impl Persister {
                fn new(data_dir: String) -> Self {
                        let filesystem_persister = FilesystemPersister::new(data_dir.clone());
-                       Self { graph_error: None, manager_error: None, filesystem_persister }
+                       Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister }
                }
 
                fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
@@ -426,6 +445,10 @@ mod tests {
                fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
                        Self { manager_error: Some((error, message)), ..self }
                }
+
+               fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
+                       Self { scorer_error: Some((error, message)), ..self }
+               }
        }
 
        impl KVStorePersister for Persister {
@@ -442,6 +465,12 @@ mod tests {
                                }
                        }
 
+                       if key == "scorer" {
+                               if let Some((error, message)) = self.scorer_error {
+                                       return Err(std::io::Error::new(error, message))
+                               }
+                       }
+
                        self.filesystem_persister.persist(key, object)
                }
        }
@@ -473,7 +502,8 @@ mod tests {
                        let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
-                       let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block };
+                       let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
+                       let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
                        nodes.push(node);
                }
 
@@ -571,7 +601,7 @@ mod tests {
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                macro_rules! check_persisted_data {
                        ($node: expr, $filepath: expr) => {
@@ -621,6 +651,10 @@ mod tests {
                        check_persisted_data!(network_graph, filepath.clone());
                }
 
+               // Check scorer is persisted
+               let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
+               check_persisted_data!(nodes[0].scorer, filepath.clone());
+
                assert!(bg_processor.stop().is_ok());
        }
 
@@ -632,7 +666,7 @@ mod tests {
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -655,7 +689,7 @@ mod tests {
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                match bg_processor.join() {
                        Ok(_) => panic!("Expected error persisting manager"),
                        Err(e) => {
@@ -672,7 +706,7 @@ mod tests {
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: &_| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
                        Ok(_) => panic!("Expected error persisting network graph"),
@@ -683,6 +717,24 @@ mod tests {
                }
        }
 
+       #[test]
+       fn test_scorer_persist_error() {
+               // Test that if we encounter an error during scorer persistence, an error gets returned.
+               let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
+               let data_dir = nodes[0].persister.get_data_dir();
+               let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
+               let event_handler = |_: &_| {};
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+
+               match bg_processor.stop() {
+                       Ok(_) => panic!("Expected error persisting scorer"),
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::Other);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "test");
+                       },
+               }
+       }
+
        #[test]
        fn test_background_event_handling() {
                let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
@@ -695,7 +747,7 @@ mod tests {
                let event_handler = move |event: &Event| {
                        sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
                };
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Open a channel and check that the FundingGenerationReady event was handled.
                begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -720,7 +772,7 @@ mod tests {
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
                let persister = Arc::new(Persister::new(data_dir));
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Force close the channel and check that the SpendableOutputs event was handled.
                nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
@@ -747,11 +799,10 @@ mod tests {
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
-               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
+               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
                let event_handler = Arc::clone(&invoice_payer);
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                assert!(bg_processor.stop().is_ok());
        }
 }
index 0e04c63903392b8ccfc36d72c3fa87109b86c17b..ce1149e0f6460f618f6e5d2befcf6669a33a5edc 100644 (file)
@@ -137,6 +137,14 @@ pub trait LockableScore<'a> {
        fn lock(&'a self) -> Self::Locked;
 }
 
+/// Refers to a scorer that is accessible under lock and also writeable to disk
+///
+/// We need this trait to be able to pass in a scorer to `lightning-background-processor` that will enable us to
+/// use the Persister to persist it.
+pub trait WriteableScore<'a>: LockableScore<'a> + Writeable {}
+
+impl<'a, T> WriteableScore<'a> for T where T: LockableScore<'a> + Writeable {}
+
 /// (C-not exported)
 impl<'a, T: 'a + Score> LockableScore<'a> for Mutex<T> {
        type Locked = MutexGuard<'a, T>;
index 9476331c15618fb3cf099b7b875dee8e2ec18cd0..5c124c21afdb25984d9d43362726f858fb292c8b 100644 (file)
@@ -11,6 +11,7 @@
 use core::ops::Deref;
 use bitcoin::hashes::hex::ToHex;
 use io::{self};
+use routing::scoring::WriteableScore;
 
 use crate::{chain::{keysinterface::{Sign, KeysInterface}, self, transaction::{OutPoint}, chaininterface::{BroadcasterInterface, FeeEstimator}, chainmonitor::{Persist, MonitorUpdateId}, channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}}, ln::channelmanager::ChannelManager, routing::network_graph::NetworkGraph};
 use super::{logger::Logger, ser::Writeable};
@@ -24,37 +25,47 @@ pub trait KVStorePersister {
        fn persist<W: Writeable>(&self, key: &str, object: &W) -> io::Result<()>;
 }
 
-/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
-pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
+pub trait Persister<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S>
        where M::Target: 'static + chain::Watch<Signer>,
                T::Target: 'static + BroadcasterInterface,
                K::Target: 'static + KeysInterface<Signer = Signer>,
                F::Target: 'static + FeeEstimator,
                L::Target: 'static + Logger,
+               S: WriteableScore<'a>,
 {
        /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
        fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error>;
 
        /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
        fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>;
+
+       /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
+       fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
 }
 
-impl<A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Persister<Signer, M, T, K, F, L> for A
+impl<'a, A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S> Persister<'a, Signer, M, T, K, F, L, S> for A
        where M::Target: 'static + chain::Watch<Signer>,
                T::Target: 'static + BroadcasterInterface,
                K::Target: 'static + KeysInterface<Signer = Signer>,
                F::Target: 'static + FeeEstimator,
                L::Target: 'static + Logger,
+               S: WriteableScore<'a>,
 {
-       /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
+       /// Persist the given ['ChannelManager'] to disk with the name "manager", returning an error if persistence failed.
        fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error> {
                self.persist("manager", channel_manager)
        }
 
-       /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
+       /// Persist the given [`NetworkGraph`] to disk with the name "network_graph", returning an error if persistence failed.
        fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> {
                self.persist("network_graph", network_graph)
        }
+
+       /// Persist the given [`WriteableScore`] to disk with name "scorer", returning an error if persistence failed.
+       fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
+               self.persist("scorer", &scorer)
+       }
 }
 
 impl<ChannelSigner: Sign, K: KVStorePersister> Persist<ChannelSigner> for K {