X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=8f6ed657b3d8e116b5913ebb9b75e4dbdc218e41;hb=refs%2Fheads%2F2022-01-background-persist-exit;hp=c7d55ae61b7ecdc9ebb586654efd76a0dd650397;hpb=e635db0da31841e8a8ac5c5450c8f595541a6ced;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c7d55ae6..8f6ed657 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -10,14 +10,12 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use lightning::chain::chainmonitor::ChainMonitor; -use lightning::chain::channelmonitor; +use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; -use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; -use lightning::ln::peer_handler::CustomMessageHandler; -use lightning::routing::network_graph::NetGraphMsgHandler; +use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; +use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::logger::Logger; use std::sync::Arc; @@ -36,6 +34,8 @@ use std::ops::Deref; /// [`ChannelManager`] persistence should be done in the background. /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`] /// at the appropriate intervals. +/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`NetGraphMsgHandler`] is provided to +/// [`BackgroundProcessor::start`]). /// /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied /// upon as doing so may result in high latency. @@ -60,13 +60,18 @@ const FRESHNESS_TIMER: u64 = 60; #[cfg(test)] const FRESHNESS_TIMER: u64 = 1; -#[cfg(not(debug_assertions))] +#[cfg(all(not(test), not(debug_assertions)))] const PING_TIMER: u64 = 5; /// Signature operations take a lot longer without compiler optimisations. /// Increasing the ping timer allows for this but slower devices will be disconnected if the /// timeout is reached. -#[cfg(debug_assertions)] +#[cfg(all(not(test), debug_assertions))] const PING_TIMER: u64 = 30; +#[cfg(test)] +const PING_TIMER: u64 = 1; + +/// Prune the network graph of stale entries hourly. +const NETWORK_PRUNE_TIMER: u64 = 60 * 60; /// Trait which handles persisting a [`ChannelManager`] to disk. /// @@ -103,7 +108,8 @@ ChannelManagerPersister for Fun where /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s. struct DecoratingEventHandler< E: EventHandler, - N: Deref>, + N: Deref>, + G: Deref, A: Deref, L: Deref, > @@ -114,10 +120,11 @@ where A::Target: chain::Access, L::Target: Logger { impl< E: EventHandler, - N: Deref>, + N: Deref>, + G: Deref, A: Deref, L: Deref, -> EventHandler for DecoratingEventHandler +> EventHandler for DecoratingEventHandler where A::Target: chain::Access, L::Target: Logger { fn handle_event(&self, event: &Event) { if let Some(event_handler) = &self.net_graph_msg_handler { @@ -153,7 +160,7 @@ impl BackgroundProcessor { /// functionality implemented by other handlers. /// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures. /// - /// [top-level documentation]: Self + /// [top-level documentation]: BackgroundProcessor /// [`join`]: Self::join /// [`stop`]: Self::stop /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager @@ -168,16 +175,17 @@ impl BackgroundProcessor { T: 'static + Deref + Send + Sync, K: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, + G: 'static + Deref + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, - EH: 'static + EventHandler + Send + Sync, + EH: 'static + EventHandler + Send, CMP: 'static + Send + ChannelManagerPersister, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, - NG: 'static + Deref> + Send + Sync, + NG: 'static + Deref> + Send + Sync, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, >( @@ -192,7 +200,7 @@ impl BackgroundProcessor { K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, - P::Target: 'static + channelmonitor::Persist, + P::Target: 'static + Persist, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, @@ -200,13 +208,16 @@ impl BackgroundProcessor { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler }; + let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) }; log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); channel_manager.timer_tick_occurred(); let mut last_freshness_call = Instant::now(); let mut last_ping_call = Instant::now(); + let mut last_prune_call = Instant::now(); + let mut have_pruned = false; + loop { peer_manager.process_events(); channel_manager.process_pending_events(&event_handler); @@ -214,12 +225,14 @@ impl BackgroundProcessor { let updates_available = channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { + log_trace!(logger, "Persisting ChannelManager..."); persister.persist_manager(&*channel_manager)?; + log_trace!(logger, "Done persisting ChannelManager."); } // Exit the loop if the background processor was requested to stop. if stop_thread.load(Ordering::Acquire) == true { log_trace!(logger, "Terminating background processor."); - return Ok(()); + break; } if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); @@ -235,15 +248,31 @@ impl BackgroundProcessor { // timer, we should have disconnected all sockets by now (and they're probably // dead anyway), so disconnect them by calling `timer_tick_occurred()` twice. log_trace!(logger, "Awoke after more than double our ping timer, disconnecting peers."); - peer_manager.timer_tick_occurred(); - peer_manager.timer_tick_occurred(); + peer_manager.disconnect_all_peers(); last_ping_call = Instant::now(); } else if last_ping_call.elapsed().as_secs() > PING_TIMER { log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); peer_manager.timer_tick_occurred(); last_ping_call = Instant::now(); } + + // Note that we want to run a graph prune once not long after startup before + // falling back to our usual hourly prunes. This avoids short-lived clients never + // pruning their network graph. We run once 60 seconds after startup before + // continuing our normal cadence. + if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } { + if let Some(ref handler) = net_graph_msg_handler { + log_trace!(logger, "Pruning network graph of stale entries"); + handler.network_graph().remove_stale_channels(); + last_prune_call = Instant::now(); + have_pruned = true; + } + } } + // After we exit, ensure we persist the ChannelManager one final time - this avoids + // some races where users quit while channel updates were in-flight, with + // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + persister.persist_manager(&*channel_manager) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -315,6 +344,8 @@ mod tests { use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; use lightning::util::test_utils; + use lightning_invoice::payment::{InvoicePayer, RetryAttempts}; + use lightning_invoice::utils::DefaultRouter; use lightning_persister::FilesystemPersister; use std::fs; use std::path::PathBuf; @@ -338,11 +369,12 @@ mod tests { struct Node { node: Arc>, - net_graph_msg_handler: Option, Arc>>>, + net_graph_msg_handler: Option, Arc, Arc>>>, peer_manager: Arc, Arc, Arc, IgnoringMessageHandler>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, + network_graph: Arc, logger: Arc, best_block: BestBlock, } @@ -380,11 +412,11 @@ mod tests { let best_block = BestBlock::from_genesis(network); let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); - let network_graph = NetworkGraph::new(genesis_block.header.block_hash()); - let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph, Some(chain_source.clone()), logger.clone()))); + let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash())); + let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()))); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), IgnoringMessageHandler{})); - let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; + let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block }; nodes.push(node); } @@ -486,9 +518,10 @@ mod tests { macro_rules! check_persisted_data { ($node: expr, $filepath: expr, $expected_bytes: expr) => { - match $node.write(&mut $expected_bytes) { - Ok(()) => { - loop { + loop { + $expected_bytes.clear(); + match $node.write(&mut $expected_bytes) { + Ok(()) => { match std::fs::read($filepath) { Ok(bytes) => { if bytes == $expected_bytes { @@ -499,9 +532,9 @@ mod tests { }, Err(_) => continue } - } - }, - Err(e) => panic!("Unexpected error: {}", e) + }, + Err(e) => panic!("Unexpected error: {}", e) + } } } } @@ -620,4 +653,19 @@ mod tests { assert!(bg_processor.stop().is_ok()); } + + #[test] + fn test_invoice_payer() { + let nodes = create_nodes(2, "test_invoice_payer".to_string()); + + // Initiate the background processors to watch each node. + let data_dir = nodes[0].persister.get_data_dir(); + let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger)); + let scorer = Arc::new(Mutex::new(test_utils::TestScorer::default())); + let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2))); + let event_handler = Arc::clone(&invoice_payer); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + assert!(bg_processor.stop().is_ok()); + } }