/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
-/// Trait which handles persisting a [`ChannelManager`] to disk.
-///
-/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
-pub trait ChannelManagerPersister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
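+/// Prune the network graph for the first time this many seconds after startup (shortened in
+/// tests).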
+#[cfg(not(test))]
+const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
+#[cfg(test)]
+const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
+
+/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
+pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
where
	M::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
{
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
- /// (which will cause the [`BackgroundProcessor`] which called this method to exit.
- ///
- /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+	/// (which will cause the [`BackgroundProcessor`] that called this method to exit).
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
-}
-impl<Fun, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-ChannelManagerPersister<Signer, M, T, K, F, L> for Fun where
- M::Target: 'static + chain::Watch<Signer>,
- T::Target: 'static + BroadcasterInterface,
- K::Target: 'static + KeysInterface<Signer = Signer>,
- F::Target: 'static + FeeEstimator,
- L::Target: 'static + Logger,
- Fun: Fn(&ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>,
-{
- fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
- self(channel_manager)
- }
+ /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
+ fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
}
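
// For illustration, a minimal sketch of a custom `Persister` (a hypothetical
// `InMemoryPersister`, not part of this crate) which serializes both objects into
// in-memory buffers via `Writeable::encode`. A real implementation would write to
// durable storage instead, as `FilesystemPersister` does.
use lightning::util::ser::Writeable;

struct InMemoryPersister {
	manager_bytes: std::sync::Mutex<Vec<u8>>,
	graph_bytes: std::sync::Mutex<Vec<u8>>,
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Persister<Signer, M, T, K, F, L> for InMemoryPersister where
	M::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
{
	fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
		// `Writeable::encode` serializes the manager to a Vec<u8>; stash it in memory.
		*self.manager_bytes.lock().unwrap() = channel_manager.encode();
		Ok(())
	}

	fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
		*self.graph_bytes.lock().unwrap() = network_graph.encode();
		Ok(())
	}
}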
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
///
/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
- /// `persist_manager` returns an error. In case of an error, the error is retrieved by calling
+ /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
/// either [`join`] or [`stop`].
///
/// # Data Persistence
///
- /// `persist_manager` is responsible for writing out the [`ChannelManager`] to disk, and/or
+ /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
/// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's
/// provided implementation.
///
- /// Typically, users should either implement [`ChannelManagerPersister`] to never return an
+ /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
+ /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
+ /// for Rust-Lightning's provided implementation.
+ ///
+ /// Typically, users should either implement [`Persister::persist_manager`] to never return an
/// error or call [`join`] and handle any error that may arise. For the latter case,
/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
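	///
	/// # Example
	///
	/// A minimal sketch, assuming `persister` implements [`Persister`] and the remaining
	/// components (`chain_monitor`, `channel_manager`, `net_graph_msg_handler`, `peer_manager`,
	/// `logger`) have been constructed as usual; the variable names here are illustrative only:
	///
	/// ```ignore
	/// let bg_processor = BackgroundProcessor::start(
	/// 	persister, |_: &_| {}, chain_monitor, channel_manager,
	/// 	Some(net_graph_msg_handler), peer_manager, logger,
	/// );
	/// // On shutdown, `stop` joins the background thread and surfaces any error returned
	/// // by `Persister::persist_manager` or `Persister::persist_graph`.
	/// if let Err(e) = bg_processor.stop() {
	/// 	// Handle the persistence error, then restart with `start` if desired.
	/// 	eprintln!("Background persistence failed: {}", e);
	/// }
	/// ```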
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
+ /// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
+ /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
pub fn start<
Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
- CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
+ PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
>(
- persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM,
+ persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
) -> Self
where
// falling back to our usual hourly prunes. This avoids short-lived clients never
				// pruning their network graph. We run once `FIRST_NETWORK_PRUNE_TIMER` seconds
				// after startup before continuing our normal cadence.
- if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } {
+ if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
if let Some(ref handler) = net_graph_msg_handler {
log_trace!(logger, "Pruning network graph of stale entries");
- handler.network_graph().remove_stale_channels();
+ handler.network_graph().remove_stale_channels();
+ if let Err(e) = persister.persist_graph(handler.network_graph()) {
+						log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions: {}", e)
+ }
last_prune_call = Instant::now();
have_pruned = true;
}
}
}
+
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
- persister.persist_manager(&*channel_manager)
+ persister.persist_manager(&*channel_manager)?;
+
+ // Persist NetworkGraph on exit
+ if let Some(ref handler) = net_graph_msg_handler {
+ persister.persist_graph(handler.network_graph())?;
+ }
+ Ok(())
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::network::constants::Network;
- use lightning::chain::{BestBlock, Confirm, chainmonitor};
+ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
+	use lightning::chain::{self, BestBlock, Confirm, chainmonitor};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
- use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
+ use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager, Sign};
use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg;
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
+ use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
use lightning_invoice::utils::DefaultRouter;
use lightning_persister::FilesystemPersister;
use std::fs;
+ use std::ops::Deref;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
}
}
+ struct Persister {
+ data_dir: String,
+ }
+
+	impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
+ M::Target: 'static + chain::Watch<Signer>,
+ T::Target: 'static + BroadcasterInterface,
+ K::Target: 'static + KeysInterface<Signer = Signer>,
+ F::Target: 'static + FeeEstimator,
+ L::Target: 'static + Logger,
+ {
+ fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
+ FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
+ }
+
+ fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+ FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
+ }
+ }
+
fn get_full_filepath(filepath: String, filename: String) -> String {
let mut path = PathBuf::from(filepath);
path.push(filename);
// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
- let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+ let persister = Persister { data_dir };
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
let mut expected_bytes = Vec::new();
check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
+
loop {
if !nodes[0].node.get_persistence_condvar_value() { break }
}
if !nodes[0].node.get_persistence_condvar_value() { break }
}
+ // Check network graph is persisted
+ let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
+ let mut expected_bytes = Vec::new();
+ if let Some(ref handler) = nodes[0].net_graph_msg_handler {
+ let network_graph = handler.network_graph();
+ check_persisted_data!(network_graph, filepath.clone(), expected_bytes);
+ }
+
assert!(bg_processor.stop().is_ok());
}
// `FRESHNESS_TIMER`.
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
let data_dir = nodes[0].persister.get_data_dir();
- let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+ let persister = Persister { data_dir };
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
loop {
}
#[test]
- fn test_persist_error() {
+ fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the error is
		// surfaced when joining the background processor.
		let nodes = create_nodes(2, "test_channel_manager_persist_error".to_string());
open_channel!(nodes[0], nodes[1], 100000);
- let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test"));
+ struct ChannelManagerErrorPersister {
+ data_dir: String,
+ }
+
+		impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> super::Persister<Signer, M, T, K, F, L> for ChannelManagerErrorPersister where
+ M::Target: 'static + chain::Watch<Signer>,
+ T::Target: 'static + BroadcasterInterface,
+ K::Target: 'static + KeysInterface<Signer = Signer>,
+ F::Target: 'static + FeeEstimator,
+ L::Target: 'static + Logger,
+ {
+ fn persist_manager(&self, _channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
+ Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
+ }
+
+ fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+ FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
+ }
+ }
+
+ let data_dir = nodes[0].persister.get_data_dir();
+		let persister = ChannelManagerErrorPersister { data_dir };
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
match bg_processor.join() {
}
}
+ #[test]
+ fn test_network_graph_persist_error() {
+		// Test that if we encounter an error during network graph persistence, the error is
+		// surfaced when stopping the background processor.
+		let nodes = create_nodes(2, "test_network_graph_persist_error".to_string());
+ struct NetworkGraphErrorPersister {
+ data_dir: String,
+ }
+
+		impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> super::Persister<Signer, M, T, K, F, L> for NetworkGraphErrorPersister where
+ M::Target: 'static + chain::Watch<Signer>,
+ T::Target: 'static + BroadcasterInterface,
+ K::Target: 'static + KeysInterface<Signer = Signer>,
+ F::Target: 'static + FeeEstimator,
+ L::Target: 'static + Logger,
+ {
+ fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
+ FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
+ }
+
+ fn persist_graph(&self, _network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+ Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
+ }
+ }
+
+ let data_dir = nodes[0].persister.get_data_dir();
+ let persister = NetworkGraphErrorPersister { data_dir };
+ let event_handler = |_: &_| {};
+ let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+
+ match bg_processor.stop() {
+ Ok(_) => panic!("Expected error persisting network graph"),
+ Err(e) => {
+ assert_eq!(e.kind(), std::io::ErrorKind::Other);
+ assert_eq!(e.get_ref().unwrap().to_string(), "test");
+ },
+ }
+ }
+
#[test]
fn test_background_event_handling() {
let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
let channel_value = 100000;
let data_dir = nodes[0].persister.get_data_dir();
- let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node);
+ let persister = Persister { data_dir: data_dir.clone() };
// Set up a background event handler for FundingGenerationReady events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| {
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
};
- let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+ let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
// Open a channel and check that the FundingGenerationReady event was handled.
begin_open_channel!(nodes[0], nodes[1], channel_value);
// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
- let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+		let bg_processor = BackgroundProcessor::start(Persister { data_dir: data_dir.clone() }, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
// Force close the channel and check that the SpendableOutputs event was handled.
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
- let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
+ let persister = Persister { data_dir };
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));