X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=inline;f=background-processor%2Fsrc%2Flib.rs;h=248870073658ecb2b9119bcda45e2872a427fe4a;hb=8088e4ba1505c026ac91f8c1489fb7d53d6b656e;hp=0a09ae95c209e41e6040f85445b8bad4062beb91;hpb=523fcb6f3f16b8cd79cd2c5f21c4f92923ccc0e3;p=rust-lightning diff --git a/background-processor/src/lib.rs b/background-processor/src/lib.rs index 0a09ae95..24887007 100644 --- a/background-processor/src/lib.rs +++ b/background-processor/src/lib.rs @@ -1,9 +1,19 @@ +//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning +//! running properly, and (2) either can or should be run in the background. See docs for +//! [`BackgroundProcessor`] for more details on the nitty-gritty. + +#![deny(broken_intra_doc_links)] +#![deny(missing_docs)] +#![deny(unsafe_code)] + #[macro_use] extern crate lightning; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; use lightning::util::logger::Logger; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -40,52 +50,65 @@ impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the top-level /// documentation. /// - /// If `persist_manager` returns an error, then this thread will return said error (and `start()` - /// will need to be called again to restart the `BackgroundProcessor`). Users should wait on - /// [`thread_handle`]'s `join()` method to be able to tell if and when an error is returned, or - /// implement `persist_manager` such that an error is never returned to the `BackgroundProcessor` + /// If `persist_manager` returns an error, then this thread will return said error (and + /// `start()` will need to be called again to restart the `BackgroundProcessor`). Users should + /// wait on [`thread_handle`]'s `join()` method to be able to tell if and when an error is + /// returned, or implement `persist_manager` such that an error is never returned to the + /// `BackgroundProcessor` /// - /// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading - /// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`. - /// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation. + /// `persist_manager` is responsible for writing out the [`ChannelManager`] to disk, and/or + /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a + /// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's + /// provided implementation. /// - /// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle - /// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write - /// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl - pub fn start(persist_manager: PM, manager: Arc, Arc, Arc, Arc, Arc>>, logger: Arc) -> Self - where Signer: 'static + Sign, - M: 'static + chain::Watch, - T: 'static + BroadcasterInterface, - K: 'static + KeysInterface, - F: 'static + FeeEstimator, - L: 'static + Logger, - PM: 'static + Send + Fn(&ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error>, + /// [`thread_handle`]: BackgroundProcessor::thread_handle + /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable + /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager + pub fn start( + persist_channel_manager: PM, + channel_manager: Arc, Arc, Arc, Arc, Arc>>, + peer_manager: Arc, Arc, Arc>>, logger: Arc, + ) -> Self + where + Signer: 'static + Sign, + M: 'static + chain::Watch, + T: 'static + BroadcasterInterface, + K: 'static + KeysInterface, + F: 'static + FeeEstimator, + L: 'static + Logger, + CM: 'static + ChannelMessageHandler, + RM: 'static + RoutingMessageHandler, + PM: 'static + + Send + + Fn( + &ChannelManager, Arc, Arc, Arc, Arc>, + ) -> Result<(), std::io::Error>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { let mut current_time = Instant::now(); loop { - let updates_available = manager.wait_timeout(Duration::from_millis(100)); + peer_manager.process_events(); + let updates_available = + channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { - persist_manager(&*manager)?; + persist_channel_manager(&*channel_manager)?; } // Exit the loop if the background processor was requested to stop. if stop_thread.load(Ordering::Acquire) == true { log_trace!(logger, "Terminating background processor."); - return Ok(()) + return Ok(()); } if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER { log_trace!(logger, "Calling manager's timer_chan_freshness_every_min"); - manager.timer_chan_freshness_every_min(); + channel_manager.timer_chan_freshness_every_min(); current_time = Instant::now(); } } }); - Self { - stop_thread: stop_thread_clone, - thread_handle: handle, - } + Self { stop_thread: stop_thread_clone, thread_handle: handle } } /// Stop `BackgroundProcessor`'s thread. @@ -106,9 +129,10 @@ mod tests { use lightning::chain::keysinterface::{Sign, InMemorySigner, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; - use lightning::ln::channelmanager::{ChannelManager, SimpleArcChannelManager}; + use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; + use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::logger::Logger; @@ -121,10 +145,21 @@ mod tests { use std::time::Duration; use super::BackgroundProcessor; + #[derive(Clone, Eq, Hash, PartialEq)] + struct TestDescriptor{} + impl SocketDescriptor for TestDescriptor { + fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { + 0 + } + + fn disconnect_socket(&mut self) {} + } + type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; struct Node { - node: SimpleArcChannelManager, + node: Arc>, + peer_manager: Arc, Arc, Arc>>, persister: Arc, logger: Arc, } @@ -155,11 +190,19 @@ mod tests { let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); let seed = [i as u8; 32]; let network = Network::Testnet; - let now = Duration::from_secs(genesis_block(network).header.time as u64); + let genesis_block = genesis_block(network); + let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); - let manager = Arc::new(ChannelManager::new(Network::Testnet, fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), i)); - let node = Node { node: manager, persister, logger }; + let params = ChainParameters { + network, + latest_hash: genesis_block.block_hash(), + latest_height: 0, + }; + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params)); + let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); + let node = Node { node: manager, peer_manager, persister, logger }; nodes.push(node); } nodes @@ -172,7 +215,7 @@ mod tests { $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); let events = $node_a.node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); - let (temporary_channel_id, tx, funding_output) = match events[0] { + let (temporary_channel_id, tx) = match events[0] { Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => { assert_eq!(*channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); @@ -180,13 +223,12 @@ mod tests { let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut { value: *channel_value_satoshis, script_pubkey: output_script.clone(), }]}; - let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 }; - (*temporary_channel_id, tx, funding_outpoint) + (*temporary_channel_id, tx) }, _ => panic!("Unexpected event"), }; - $node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output); + $node_a.node.funding_transaction_generated(&temporary_channel_id, tx.clone()).unwrap(); $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); tx @@ -203,7 +245,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Go through the channel creation process until each node should have something persisted. let tx = open_channel!(nodes[0], nodes[1], 100000); @@ -258,11 +300,11 @@ mod tests { let nodes = create_nodes(1, "test_chan_freshness_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string(); - if log_entries.get(&("background_processor".to_string(), desired_log)).is_some() { + if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() { break } } @@ -285,7 +327,7 @@ mod tests { } let nodes = create_nodes(2, "test_persist_error".to_string()); - let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); open_channel!(nodes[0], nodes[1], 100000); let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test");