X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=0fef55a95c2c5369408613cebfa574e9b8b95c05;hb=d0c3fb745ddc0105838019f8a9a1ef04449d91ba;hp=55ac14c048c412e8a9d0a7f17318d62b7569b3f6;hpb=6b0a97be21706f44c5f3e9d08dd84c8b62389686;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 55ac14c0..0fef55a9 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -10,13 +10,14 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use lightning::chain::chainmonitor::ChainMonitor; -use lightning::chain::channelmonitor; +use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; -use lightning::util::events::{EventHandler, EventsProvider}; +use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::routing::network_graph::NetGraphMsgHandler; +use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::logger::Logger; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -25,20 +26,29 @@ use std::thread::JoinHandle; use std::time::{Duration, Instant}; use std::ops::Deref; -/// BackgroundProcessor takes care of tasks that (1) need to happen periodically to keep +/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its /// responsibilities are: -/// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so, +/// * Processing [`Event`]s with a user-provided [`EventHandler`]. +/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so, /// writing it to disk/backups by invoking the callback given to it at startup. -/// ChannelManager persistence should be done in the background. -/// * Calling `ChannelManager::timer_tick_occurred()` and -/// `PeerManager::timer_tick_occurred()` every minute (can be done in the -/// background). +/// [`ChannelManager`] persistence should be done in the background. +/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`] +/// at the appropriate intervals. /// -/// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date, -/// then there is a risk of channels force-closing on startup when the manager realizes it's -/// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used -/// for unilateral chain closure fees are at risk. +/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied +/// upon as doing so may result in high latency. +/// +/// # Note +/// +/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then +/// there is a risk of channels force-closing on startup when the manager realizes it's outdated. +/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for +/// unilateral chain closure fees are at risk. +/// +/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor +/// [`Event`]: lightning::util::events::Event +#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."] pub struct BackgroundProcessor { stop_thread: Arc, thread_handle: Option>>, @@ -49,6 +59,16 @@ const FRESHNESS_TIMER: u64 = 60; #[cfg(test)] const FRESHNESS_TIMER: u64 = 1; +#[cfg(all(not(test), not(debug_assertions)))] +const PING_TIMER: u64 = 5; +/// Signature operations take a lot longer without compiler optimisations. +/// Increasing the ping timer allows for this but slower devices will be disconnected if the +/// timeout is reached. +#[cfg(all(not(test), debug_assertions))] +const PING_TIMER: u64 = 30; +#[cfg(test)] +const PING_TIMER: u64 = 1; + /// Trait which handles persisting a [`ChannelManager`] to disk. /// /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager @@ -81,6 +101,33 @@ ChannelManagerPersister for Fun where } } +/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s. +struct DecoratingEventHandler< + E: EventHandler, + N: Deref>, + A: Deref, + L: Deref, +> +where A::Target: chain::Access, L::Target: Logger { + event_handler: E, + net_graph_msg_handler: Option, +} + +impl< + E: EventHandler, + N: Deref>, + A: Deref, + L: Deref, +> EventHandler for DecoratingEventHandler +where A::Target: chain::Access, L::Target: Logger { + fn handle_event(&self, event: &Event) { + if let Some(event_handler) = &self.net_graph_msg_handler { + event_handler.handle_event(event); + } + self.event_handler.handle_event(event); + } +} + impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level /// documentation]. @@ -89,23 +136,34 @@ impl BackgroundProcessor { /// `persist_manager` returns an error. In case of an error, the error is retrieved by calling /// either [`join`] or [`stop`]. /// - /// Typically, users should either implement [`ChannelManagerPersister`] to never return an - /// error or call [`join`] and handle any error that may arise. For the latter case, the - /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error. + /// # Data Persistence /// /// `persist_manager` is responsible for writing out the [`ChannelManager`] to disk, and/or /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a /// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's /// provided implementation. /// - /// [top-level documentation]: Self + /// Typically, users should either implement [`ChannelManagerPersister`] to never return an + /// error or call [`join`] and handle any error that may arise. For the latter case, + /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error. + /// + /// # Event Handling + /// + /// `event_handler` is responsible for handling events that users should be notified of (e.g., + /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common + /// functionality implemented by other handlers. + /// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures. + /// + /// [top-level documentation]: BackgroundProcessor /// [`join`]: Self::join /// [`stop`]: Self::stop /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager + /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph pub fn start< Signer: 'static + Sign, + CA: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, @@ -120,24 +178,36 @@ impl BackgroundProcessor { CMP: 'static + Send + ChannelManagerPersister, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PM: 'static + Deref> + Send + Sync, - > - (persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self + NG: 'static + Deref> + Send + Sync, + UMH: 'static + Deref + Send + Sync, + PM: 'static + Deref> + Send + Sync, + >( + persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, + net_graph_msg_handler: Option, peer_manager: PM, logger: L + ) -> Self where + CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, CW::Target: 'static + chain::Watch, T::Target: 'static + BroadcasterInterface, K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, - P::Target: 'static + channelmonitor::Persist, + P::Target: 'static + Persist, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, + UMH::Target: 'static + CustomMessageHandler, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - let mut current_time = Instant::now(); + let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler }; + + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); + channel_manager.timer_tick_occurred(); + + let mut last_freshness_call = Instant::now(); + let mut last_ping_call = Instant::now(); loop { peer_manager.process_events(); channel_manager.process_pending_events(&event_handler); @@ -152,11 +222,27 @@ impl BackgroundProcessor { log_trace!(logger, "Terminating background processor."); return Ok(()); } - if current_time.elapsed().as_secs() > FRESHNESS_TIMER { - log_trace!(logger, "Calling ChannelManager's and PeerManager's timer_tick_occurred"); + if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); channel_manager.timer_tick_occurred(); + last_freshness_call = Instant::now(); + } + if last_ping_call.elapsed().as_secs() > PING_TIMER * 2 { + // On various platforms, we may be starved of CPU cycles for several reasons. + // E.g. on iOS, if we've been in the background, we will be entirely paused. + // Similarly, if we're on a desktop platform and the device has been asleep, we + // may not get any cycles. + // In any case, if we've been entirely paused for more than double our ping + // timer, we should have disconnected all sockets by now (and they're probably + // dead anyway), so disconnect them by calling `timer_tick_occurred()` twice. + log_trace!(logger, "Awoke after more than double our ping timer, disconnecting peers."); peer_manager.timer_tick_occurred(); - current_time = Instant::now(); + peer_manager.timer_tick_occurred(); + last_ping_call = Instant::now(); + } else if last_ping_call.elapsed().as_secs() > PING_TIMER { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); + peer_manager.timer_tick_occurred(); + last_ping_call = Instant::now(); } } }); @@ -223,8 +309,9 @@ mod tests { use lightning::get_event_msg; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; - use lightning::ln::msgs::ChannelMessageHandler; - use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; + use lightning::ln::msgs::{ChannelMessageHandler, Init}; + use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; + use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; @@ -252,7 +339,8 @@ mod tests { struct Node { node: Arc>, - peer_manager: Arc, Arc, Arc>>, + net_graph_msg_handler: Option, Arc>>>, + peer_manager: Arc, Arc, Arc, IgnoringMessageHandler>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -286,17 +374,28 @@ mod tests { let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); let seed = [i as u8; 32]; let network = Network::Testnet; - let now = Duration::from_secs(genesis_block(network).header.time as u64); + let genesis_block = genesis_block(network); + let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); let best_block = BestBlock::from_genesis(network); let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); + let network_graph = NetworkGraph::new(genesis_block.header.block_hash()); + let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph, Some(chain_source.clone()), logger.clone()))); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; - let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); - let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), IgnoringMessageHandler{})); + let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } + + for i in 0..num_nodes { + for j in (i+1)..num_nodes { + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known() }); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known() }); + } + } + nodes } @@ -305,7 +404,7 @@ mod tests { begin_open_channel!($node_a, $node_b, $channel_value); let events = $node_a.node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); - let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value); + let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value); end_open_channel!($node_a, $node_b, temporary_channel_id, tx); tx }} @@ -322,14 +421,14 @@ mod tests { macro_rules! handle_funding_generation_ready { ($event: expr, $channel_value: expr) => {{ match $event { - Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => { - assert_eq!(*channel_value_satoshis, $channel_value); + &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id } => { + assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut { - value: *channel_value_satoshis, script_pubkey: output_script.clone(), + value: channel_value_satoshis, script_pubkey: output_script.clone(), }]}; - (*temporary_channel_id, tx) + (temporary_channel_id, tx) }, _ => panic!("Unexpected event"), } @@ -383,8 +482,8 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let event_handler = |_: &_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); macro_rules! check_persisted_data { ($node: expr, $filepath: expr, $expected_bytes: expr) => { @@ -436,12 +535,14 @@ mod tests { let nodes = create_nodes(1, "test_timer_tick_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let event_handler = |_: &_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() { + let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string(); + let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string(); + if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() && + log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() { break } } @@ -456,8 +557,8 @@ mod tests { open_channel!(nodes[0], nodes[1], 100000); let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test")); - let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let event_handler = |_: &_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); match bg_processor.join() { Ok(_) => panic!("Expected error persisting manager"), Err(e) => { @@ -476,10 +577,10 @@ mod tests { // Set up a background event handler for FundingGenerationReady events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event| { + let event_handler = move |event: &Event| { sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); }; - let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Open a channel and check that the FundingGenerationReady event was handled. begin_open_channel!(nodes[0], nodes[1], channel_value); @@ -502,8 +603,8 @@ mod tests { // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event| sender.send(event).unwrap(); - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let event_handler = move |event: &Event| sender.send(event.clone()).unwrap(); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Force close the channel and check that the SpendableOutputs event was handled. nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); @@ -514,6 +615,7 @@ mod tests { .expect("SpendableOutputs not handled within deadline"); match event { Event::SpendableOutputs { .. } => {}, + Event::ChannelClosed { .. } => {}, _ => panic!("Unexpected event: {:?}", event), }