Add NetworkGraph persistence
authorJurvis Tan <5944973+jurvis@users.noreply.github.com>
Tue, 22 Mar 2022 03:13:14 +0000 (20:13 -0700)
committerJurvis Tan <5944973+jurvis@users.noreply.github.com>
Wed, 30 Mar 2022 02:38:40 +0000 (19:38 -0700)
Instead of creating a separate trait for persisting NetworkGraph, rename the existing ChannelManagerPersister to Persister and extend it with persist_graph so that one trait handles both the ChannelManager and the NetworkGraph. persist_graph is then called after each removal of stale channels and on exit.

lightning-background-processor/src/lib.rs
lightning-persister/src/lib.rs

index 084ea62efb6faf51f0ff67b0b5fb01fc61f965ef..0b5f8e4875d84f5b05e340dba0d1901371efc998 100644 (file)
@@ -75,10 +75,13 @@ const PING_TIMER: u64 = 1;
 /// Prune the network graph of stale entries hourly.
 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
 
-/// Trait which handles persisting a [`ChannelManager`] to disk.
-///
-/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
-pub trait ChannelManagerPersister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+#[cfg(not(test))]
+const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
+#[cfg(test)]
+const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
+
+/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
+pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 where
        M::Target: 'static + chain::Watch<Signer>,
        T::Target: 'static + BroadcasterInterface,
@@ -87,24 +90,11 @@ where
        L::Target: 'static + Logger,
 {
        /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
-       /// (which will cause the [`BackgroundProcessor`] which called this method to exit.
-       ///
-       /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+       /// (which will cause the [`BackgroundProcessor`] which called this method to exit).
        fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
-}
 
-impl<Fun, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-ChannelManagerPersister<Signer, M, T, K, F, L> for Fun where
-       M::Target: 'static + chain::Watch<Signer>,
-       T::Target: 'static + BroadcasterInterface,
-       K::Target: 'static + KeysInterface<Signer = Signer>,
-       F::Target: 'static + FeeEstimator,
-       L::Target: 'static + Logger,
-       Fun: Fn(&ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>,
-{
-       fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
-               self(channel_manager)
-       }
+       /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
+       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
 }
 
 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
@@ -141,17 +131,21 @@ impl BackgroundProcessor {
        /// documentation].
        ///
        /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
-       /// `persist_manager` returns an error. In case of an error, the error is retrieved by calling
+       /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
        /// either [`join`] or [`stop`].
        ///
        /// # Data Persistence
        ///
-       /// `persist_manager` is responsible for writing out the [`ChannelManager`] to disk, and/or
+       /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
        /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
        /// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's
        /// provided implementation.
        ///
-       /// Typically, users should either implement [`ChannelManagerPersister`] to never return an
+       /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
+       /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
+       /// for Rust-Lightning's provided implementation.
+       ///
+       /// Typically, users should either implement [`Persister::persist_manager`] to never return an
        /// error or call [`join`] and handle any error that may arise. For the latter case,
        /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
        ///
@@ -168,7 +162,9 @@ impl BackgroundProcessor {
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
        /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
+       /// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
        /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
+       /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
        pub fn start<
                Signer: 'static + Sign,
                CA: 'static + Deref + Send + Sync,
@@ -184,14 +180,14 @@ impl BackgroundProcessor {
                CMH: 'static + Deref + Send + Sync,
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
-               CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
+               PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
                M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
                NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
                UMH: 'static + Deref + Send + Sync,
                PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
        >(
-               persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM,
+               persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
                net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
        ) -> Self
        where
@@ -273,19 +269,29 @@ impl BackgroundProcessor {
                                // falling back to our usual hourly prunes. This avoids short-lived clients never
                                // pruning their network graph. We run once 60 seconds after startup before
                                // continuing our normal cadence.
-                               if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } {
+                               if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
                                        if let Some(ref handler) = net_graph_msg_handler {
                                                log_trace!(logger, "Pruning network graph of stale entries");
-                                               handler.network_graph().remove_stale_channels();
+                                               handler.network_graph().remove_stale_channels(); 
+                                               if let Err(e) = persister.persist_graph(handler.network_graph()) {
+                                                       log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+                                               }
                                                last_prune_call = Instant::now();
                                                have_pruned = true;
                                        }
                                }
                        }
+
                        // After we exit, ensure we persist the ChannelManager one final time - this avoids
                        // some races where users quit while channel updates were in-flight, with
                        // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-                       persister.persist_manager(&*channel_manager)
+                       persister.persist_manager(&*channel_manager)?;
+
+                       // Persist NetworkGraph on exit
+                       if let Some(ref handler) = net_graph_msg_handler {
+                               persister.persist_graph(handler.network_graph())?;
+                       }
+                       Ok(())
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
@@ -343,9 +349,10 @@ mod tests {
        use bitcoin::blockdata::constants::genesis_block;
        use bitcoin::blockdata::transaction::{Transaction, TxOut};
        use bitcoin::network::constants::Network;
-       use lightning::chain::{BestBlock, Confirm, chainmonitor};
+       use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
+       use lightning::chain::{BestBlock, Confirm, chainmonitor, self};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-       use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
+       use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager, Sign};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
        use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
@@ -355,12 +362,14 @@ mod tests {
        use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
        use lightning::util::config::UserConfig;
        use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
+       use lightning::util::logger::Logger;
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
        use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
        use lightning_invoice::utils::DefaultRouter;
        use lightning_persister::FilesystemPersister;
        use std::fs;
+       use std::ops::Deref;
        use std::path::PathBuf;
        use std::sync::{Arc, Mutex};
        use std::time::Duration;
@@ -402,6 +411,26 @@ mod tests {
                }
        }
 
+       struct Persister {
+               data_dir: String,
+       }
+
+       impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
+               M::Target: 'static + chain::Watch<Signer>,
+               T::Target: 'static + BroadcasterInterface,
+               K::Target: 'static + KeysInterface<Signer = Signer>,
+               F::Target: 'static + FeeEstimator,
+               L::Target: 'static + Logger,
+       {
+               fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
+                       FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
+               }
+
+               fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+                       FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
+               }
+       }
+
        fn get_full_filepath(filepath: String, filename: String) -> String {
                let mut path = PathBuf::from(filepath);
                path.push(filename);
@@ -525,7 +554,7 @@ mod tests {
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+               let persister = Persister { data_dir };
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
 
@@ -556,6 +585,7 @@ mod tests {
                let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
                let mut expected_bytes = Vec::new();
                check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
+
                loop {
                        if !nodes[0].node.get_persistence_condvar_value() { break }
                }
@@ -570,6 +600,14 @@ mod tests {
                        if !nodes[0].node.get_persistence_condvar_value() { break }
                }
 
+               // Check network graph is persisted
+               let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
+               let mut expected_bytes = Vec::new();
+               if let Some(ref handler) = nodes[0].net_graph_msg_handler {
+                       let network_graph = handler.network_graph();
+                       check_persisted_data!(network_graph, filepath.clone(), expected_bytes);
+               }
+
                assert!(bg_processor.stop().is_ok());
        }
 
@@ -579,7 +617,7 @@ mod tests {
                // `FRESHNESS_TIMER`.
                let nodes = create_nodes(1, "test_timer_tick_called".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+               let persister = Persister { data_dir };
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
                loop {
@@ -596,12 +634,33 @@ mod tests {
        }
 
        #[test]
-       fn test_persist_error() {
+       fn test_channel_manager_persist_error() {
                // Test that if we encounter an error during manager persistence, the thread panics.
                let nodes = create_nodes(2, "test_persist_error".to_string());
                open_channel!(nodes[0], nodes[1], 100000);
 
-               let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test"));
+               struct ChannelManagerErrorPersister {
+                       data_dir: String,
+               }
+
+               impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for ChannelManagerErrorPersister where
+                       M::Target: 'static + chain::Watch<Signer>,
+                       T::Target: 'static + BroadcasterInterface,
+                       K::Target: 'static + KeysInterface<Signer = Signer>,
+                       F::Target: 'static + FeeEstimator,
+                       L::Target: 'static + Logger,
+               {
+                       fn persist_manager(&self, _channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
+                               Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
+                       }
+
+                       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+                               FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
+                       }
+               }
+
+               let data_dir = nodes[0].persister.get_data_dir();
+               let persister = ChannelManagerErrorPersister{ data_dir };
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
                match bg_processor.join() {
@@ -613,19 +672,57 @@ mod tests {
                }
        }
 
+       #[test]
+       fn test_network_graph_persist_error() {
+               // Test that if we encounter an error during network graph persistence, an error gets returned.
+               let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
+               struct NetworkGraphErrorPersister {
+                       data_dir: String,
+               }
+
+               impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for NetworkGraphErrorPersister where
+                       M::Target: 'static + chain::Watch<Signer>,
+                       T::Target: 'static + BroadcasterInterface,
+                       K::Target: 'static + KeysInterface<Signer = Signer>,
+                       F::Target: 'static + FeeEstimator,
+                       L::Target: 'static + Logger,
+               {
+                       fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
+                               FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
+                       }
+
+                       fn persist_graph(&self, _network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+                               Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
+                       }
+               }
+
+               let data_dir = nodes[0].persister.get_data_dir();
+               let persister = NetworkGraphErrorPersister { data_dir };
+               let event_handler = |_: &_| {};
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+
+               match bg_processor.stop() {
+                       Ok(_) => panic!("Expected error persisting network graph"),
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::Other);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "test");
+                       },
+               }
+       }
+
        #[test]
        fn test_background_event_handling() {
                let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
                let channel_value = 100000;
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node);
+               let persister = Persister { data_dir: data_dir.clone() };
 
                // Set up a background event handler for FundingGenerationReady events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: &Event| {
                        sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
                };
-               let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
 
                // Open a channel and check that the FundingGenerationReady event was handled.
                begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -649,7 +746,7 @@ mod tests {
                // Set up a background event handler for SpendableOutputs events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let bg_processor = BackgroundProcessor::start(Persister{ data_dir: data_dir.clone() }, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
 
                // Force close the channel and check that the SpendableOutputs event was handled.
                nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
@@ -675,7 +772,7 @@ mod tests {
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+               let persister = Persister { data_dir };
                let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
                let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
index ef914700a16302c0e5dacad1414d3e0e8c1eb03c..da64cb375da11d71f517bcd31c47c5686796b088 100644 (file)
@@ -16,6 +16,7 @@ extern crate libc;
 
 use bitcoin::hash_types::{BlockHash, Txid};
 use bitcoin::hashes::hex::{FromHex, ToHex};
+use lightning::routing::network_graph::NetworkGraph;
 use crate::util::DiskWriteable;
 use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
@@ -66,6 +67,12 @@ where
        }
 }
 
+impl DiskWriteable for NetworkGraph {
+       fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
+               self.write(writer)
+       }
+}
+
 impl FilesystemPersister {
        /// Initialize a new FilesystemPersister and set the path to the individual channels'
        /// files.
@@ -103,6 +110,13 @@ impl FilesystemPersister {
                util::write_to_file(path, "manager".to_string(), manager)
        }
 
+       /// Write the provided `NetworkGraph` to the directory given by `data_dir`, within a file
+       /// called "network_graph".
+       pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+               let path = PathBuf::from(data_dir);
+               util::write_to_file(path, "network_graph".to_string(), network_graph)
+       }
+
        /// Read `ChannelMonitor`s from disk.
        pub fn read_channelmonitors<Signer: Sign, K: Deref> (
                &self, keys_manager: K