X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=background-processor%2Fsrc%2Flib.rs;h=6d9db076fa44a6cd0fd2c749f76aa6c4448e18b7;hb=de6ddedba816cee1d2d979a6c498c2865b6b6939;hp=0c7191fbad2bd74fc8ebeafebfe044dbfa97ab96;hpb=03a518965100b6852f36e4f95ead4c1d93f5c4b0;p=rust-lightning diff --git a/background-processor/src/lib.rs b/background-processor/src/lib.rs index 0c7191fb..6d9db076 100644 --- a/background-processor/src/lib.rs +++ b/background-processor/src/lib.rs @@ -1,9 +1,19 @@ +//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning +//! running properly, and (2) either can or should be run in the background. See docs for +//! [`BackgroundProcessor`] for more details on the nitty-gritty. + +#![deny(broken_intra_doc_links)] +#![deny(missing_docs)] +#![deny(unsafe_code)] + #[macro_use] extern crate lightning; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; use lightning::util::logger::Logger; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -40,52 +50,65 @@ impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the top-level /// documentation. /// - /// If `persist_manager` returns an error, then this thread will return said error (and `start()` - /// will need to be called again to restart the `BackgroundProcessor`). Users should wait on - /// [`thread_handle`]'s `join()` method to be able to tell if and when an error is returned, or - /// implement `persist_manager` such that an error is never returned to the `BackgroundProcessor` + /// If `persist_manager` returns an error, then this thread will return said error (and + /// `start()` will need to be called again to restart the `BackgroundProcessor`). Users should + /// wait on [`thread_handle`]'s `join()` method to be able to tell if and when an error is + /// returned, or implement `persist_manager` such that an error is never returned to the + /// `BackgroundProcessor` /// - /// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading - /// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`. - /// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation. + /// `persist_manager` is responsible for writing out the [`ChannelManager`] to disk, and/or + /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a + /// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's + /// provided implementation. /// - /// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle - /// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write - /// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl - pub fn start(persist_manager: PM, manager: Arc, Arc, Arc, Arc, Arc>>, logger: Arc) -> Self - where Signer: 'static + Sign, - M: 'static + chain::Watch, - T: 'static + BroadcasterInterface, - K: 'static + KeysInterface, - F: 'static + FeeEstimator, - L: 'static + Logger, - PM: 'static + Send + Fn(&ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error>, + /// [`thread_handle`]: BackgroundProcessor::thread_handle + /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable + /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager + pub fn start( + persist_channel_manager: PM, + channel_manager: Arc, Arc, Arc, Arc, Arc>>, + peer_manager: Arc, Arc, Arc>>, logger: Arc, + ) -> Self + where + Signer: 'static + Sign, + M: 'static + chain::Watch, + T: 'static + BroadcasterInterface, + K: 'static + KeysInterface, + F: 'static + FeeEstimator, + L: 'static + Logger, + CM: 'static + ChannelMessageHandler, + RM: 'static + RoutingMessageHandler, + PM: 'static + + Send + + Fn( + &ChannelManager, Arc, Arc, Arc, Arc>, + ) -> Result<(), std::io::Error>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { let mut current_time = Instant::now(); loop { - let updates_available = manager.wait_timeout(Duration::from_millis(100)); + peer_manager.process_events(); + let updates_available = + channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { - persist_manager(&*manager)?; + persist_channel_manager(&*channel_manager)?; } // Exit the loop if the background processor was requested to stop. if stop_thread.load(Ordering::Acquire) == true { log_trace!(logger, "Terminating background processor."); - return Ok(()) + return Ok(()); } if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER { log_trace!(logger, "Calling manager's timer_chan_freshness_every_min"); - manager.timer_chan_freshness_every_min(); + channel_manager.timer_chan_freshness_every_min(); current_time = Instant::now(); } } }); - Self { - stop_thread: stop_thread_clone, - thread_handle: handle, - } + Self { stop_thread: stop_thread_clone, thread_handle: handle } } /// Stop `BackgroundProcessor`'s thread. @@ -106,9 +129,10 @@ mod tests { use lightning::chain::keysinterface::{Sign, InMemorySigner, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; - use lightning::ln::channelmanager::{ChannelManager, SimpleArcChannelManager}; + use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; + use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::logger::Logger; @@ -121,10 +145,21 @@ mod tests { use std::time::Duration; use super::BackgroundProcessor; + #[derive(Clone, Eq, Hash, PartialEq)] + struct TestDescriptor{} + impl SocketDescriptor for TestDescriptor { + fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { + 0 + } + + fn disconnect_socket(&mut self) {} + } + type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; struct Node { node: Arc>, + peer_manager: Arc, Arc, Arc>>, persister: Arc, logger: Arc, } @@ -155,11 +190,19 @@ mod tests { let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); let seed = [i as u8; 32]; let network = Network::Testnet; - let now = Duration::from_secs(genesis_block(network).header.time as u64); + let genesis_block = genesis_block(network); + let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); - let manager = Arc::new(ChannelManager::new(Network::Testnet, fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), i)); - let node = Node { node: manager, persister, logger }; + let params = ChainParameters { + network, + latest_hash: genesis_block.block_hash(), + latest_height: 0, + }; + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params)); + let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); + let node = Node { node: manager, peer_manager, persister, logger }; nodes.push(node); } nodes @@ -203,7 +246,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Go through the channel creation process until each node should have something persisted. let tx = open_channel!(nodes[0], nodes[1], 100000); @@ -258,11 +301,11 @@ mod tests { let nodes = create_nodes(1, "test_chan_freshness_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string(); - if log_entries.get(&("background_processor".to_string(), desired_log)).is_some() { + if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() { break } } @@ -285,7 +328,7 @@ mod tests { } let nodes = create_nodes(2, "test_persist_error".to_string()); - let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); open_channel!(nodes[0], nodes[1], 100000); let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test");