From 0c34529083db452f1d5508f0c3b9ffd3a7d6928a Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Fri, 2 Apr 2021 18:40:57 -0400 Subject: [PATCH] Call peer_manager.process_events() in BackgroundProcessor --- background-processor/src/lib.rs | 66 +++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/background-processor/src/lib.rs b/background-processor/src/lib.rs index c3db4d55a..6d9db076f 100644 --- a/background-processor/src/lib.rs +++ b/background-processor/src/lib.rs @@ -12,6 +12,8 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; use lightning::util::logger::Logger; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -63,40 +65,50 @@ impl BackgroundProcessor { /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager - pub fn start(persist_manager: PM, manager: Arc, Arc, Arc, Arc, Arc>>, logger: Arc) -> Self - where Signer: 'static + Sign, - M: 'static + chain::Watch, - T: 'static + BroadcasterInterface, - K: 'static + KeysInterface, - F: 'static + FeeEstimator, - L: 'static + Logger, - PM: 'static + Send + Fn(&ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error>, + pub fn start( + persist_channel_manager: PM, + channel_manager: Arc, Arc, Arc, Arc, Arc>>, + peer_manager: Arc, Arc, Arc>>, logger: Arc, + ) -> Self + where + Signer: 'static + Sign, + M: 'static + chain::Watch, + T: 'static + BroadcasterInterface, + K: 'static + KeysInterface, + F: 'static + FeeEstimator, + L: 'static + Logger, + CM: 'static + ChannelMessageHandler, + RM: 'static + RoutingMessageHandler, + PM: 'static + + Send + + Fn( + &ChannelManager, Arc, Arc, Arc, Arc>, + ) -> Result<(), std::io::Error>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { let mut current_time = Instant::now(); loop { - let updates_available = manager.await_persistable_update_timeout(Duration::from_millis(100)); + peer_manager.process_events(); + let updates_available = + channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { - persist_manager(&*manager)?; + persist_channel_manager(&*channel_manager)?; } // Exit the loop if the background processor was requested to stop. if stop_thread.load(Ordering::Acquire) == true { log_trace!(logger, "Terminating background processor."); - return Ok(()) + return Ok(()); } if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER { log_trace!(logger, "Calling manager's timer_chan_freshness_every_min"); - manager.timer_chan_freshness_every_min(); + channel_manager.timer_chan_freshness_every_min(); current_time = Instant::now(); } } }); - Self { - stop_thread: stop_thread_clone, - thread_handle: handle, - } + Self { stop_thread: stop_thread_clone, thread_handle: handle } } /// Stop `BackgroundProcessor`'s thread. @@ -120,6 +132,7 @@ mod tests { use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; + use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::logger::Logger; @@ -132,10 +145,21 @@ mod tests { use std::time::Duration; use super::BackgroundProcessor; + #[derive(Clone, Eq, Hash, PartialEq)] + struct TestDescriptor{} + impl SocketDescriptor for TestDescriptor { + fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { + 0 + } + + fn disconnect_socket(&mut self) {} + } + type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; struct Node { node: Arc>, + peer_manager: Arc, Arc, Arc>>, persister: Arc, logger: Arc, } @@ -176,7 +200,9 @@ mod tests { latest_height: 0, }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params)); - let node = Node { node: manager, persister, logger }; + let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); + let node = Node { node: manager, peer_manager, persister, logger }; nodes.push(node); } nodes @@ -220,7 +246,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Go through the channel creation process until each node should have something persisted. let tx = open_channel!(nodes[0], nodes[1], 100000); @@ -275,7 +301,7 @@ mod tests { let nodes = create_nodes(1, "test_chan_freshness_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string(); @@ -302,7 +328,7 @@ mod tests { } let nodes = create_nodes(2, "test_persist_error".to_string()); - let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); open_channel!(nodes[0], nodes[1], 100000); let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test"); -- 2.39.5