From 501b54300cfa6023f1294878ec6a62c1f5420f2c Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Tue, 11 May 2021 08:07:54 -0700 Subject: [PATCH] Process ChannelManager events in the background --- lightning-background-processor/src/lib.rs | 101 +++++++++++++++------- 1 file changed, 69 insertions(+), 32 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 9273b358..3cd01a5e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -14,6 +14,7 @@ use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; +use lightning::util::events::{EventHandler, EventsProvider}; use lightning::util::logger::Logger; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -109,11 +110,12 @@ impl BackgroundProcessor { Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, + EH: 'static + EventHandler + Send + Sync, CMP: 'static + Send + ChannelManagerPersister, CM: 'static + Deref> + Send + Sync, PM: 'static + Deref> + Send + Sync, > - (handler: CMP, channel_manager: CM, peer_manager: PM, logger: L) -> Self + (persister: CMP, event_handler: EH, channel_manager: CM, peer_manager: PM, logger: L) -> Self where M::Target: 'static + chain::Watch, T::Target: 'static + BroadcasterInterface, @@ -129,10 +131,11 @@ impl BackgroundProcessor { let mut current_time = Instant::now(); loop { peer_manager.process_events(); + channel_manager.process_pending_events(&event_handler); let updates_available = channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { - handler.persist_manager(&*channel_manager)?; + persister.persist_manager(&*channel_manager)?; } // Exit the loop if the background processor was requested to stop. if stop_thread.load(Ordering::Acquire) == true { @@ -162,10 +165,8 @@ mod tests { use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; - use lightning::chain; - use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor; - use lightning::chain::keysinterface::{Sign, InMemorySigner, KeysInterface, KeysManager}; + use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, SimpleArcChannelManager}; @@ -174,7 +175,6 @@ mod tests { use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; - use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use lightning::util::test_utils; use lightning_persister::FilesystemPersister; @@ -182,7 +182,7 @@ mod tests { use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Duration; - use super::BackgroundProcessor; + use super::{BackgroundProcessor, FRESHNESS_TIMER}; #[derive(Clone, Eq, Hash, PartialEq)] struct TestDescriptor{} @@ -246,13 +246,27 @@ mod tests { } macro_rules! open_channel { + ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ + begin_open_channel!($node_a, $node_b, $channel_value); + let events = $node_a.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value); + end_open_channel!($node_a, $node_b, temporary_channel_id, tx); + tx + }} + } + + macro_rules! begin_open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); - let events = $node_a.node.get_and_clear_pending_events(); - assert_eq!(events.len(), 1); - let (temporary_channel_id, tx) = match events[0] { + }} + } + + macro_rules! handle_funding_generation_ready { + ($event: expr, $channel_value: expr) => {{ + match $event { Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => { assert_eq!(*channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); @@ -263,12 +277,15 @@ mod tests { (*temporary_channel_id, tx) }, _ => panic!("Unexpected event"), - }; + } + }} + } - $node_a.node.funding_transaction_generated(&temporary_channel_id, tx.clone()).unwrap(); + macro_rules! end_open_channel { + ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{ + $node_a.node.funding_transaction_generated(&$temporary_channel_id, $tx.clone()).unwrap(); $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); - tx }} } @@ -279,13 +296,16 @@ mod tests { // re-persistence and is successfully re-persisted. let nodes = create_nodes(2, "test_background_processor".to_string()); + // Go through the channel creation process so that each node has something to persist. Since + // open_channel consumes events, it must complete before starting BackgroundProcessor to + // avoid a race with processing events. + let tx = open_channel!(nodes[0], nodes[1], 100000); + // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); - let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); - - // Go through the channel creation process until each node should have something persisted. - let tx = open_channel!(nodes[0], nodes[1], 100000); + let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let event_handler = |_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); macro_rules! check_persisted_data { ($node: expr, $filepath: expr, $expected_bytes: expr) => { @@ -336,8 +356,9 @@ mod tests { // `FRESHNESS_TIMER`. let nodes = create_nodes(1, "test_timer_tick_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); - let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let event_handler = |_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string(); @@ -352,21 +373,37 @@ mod tests { #[test] fn test_persist_error() { // Test that if we encounter an error during manager persistence, the thread panics. - fn persist_manager(_data: &ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error> - where Signer: 'static + Sign, - M: 'static + chain::Watch, - T: 'static + BroadcasterInterface, - K: 'static + KeysInterface, - F: 'static + FeeEstimator, - L: 'static + Logger, - { - Err(std::io::Error::new(std::io::ErrorKind::Other, "test")) - } - let nodes = create_nodes(2, "test_persist_error".to_string()); - let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); open_channel!(nodes[0], nodes[1], 100000); + let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test")); + let event_handler = |_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test"); } + + #[test] + fn test_background_event_handling() { + let nodes = create_nodes(2, "test_background_event_handling".to_string()); + let channel_value = 100000; + let data_dir = nodes[0].persister.get_data_dir(); + let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node); + + // Set up a background event handler for FundingGenerationReady events. + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + let event_handler = move |event| { + sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); + }; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + + // Open a channel and check that the FundingGenerationReady event was handled. + begin_open_channel!(nodes[0], nodes[1], channel_value); + let timeout = Duration::from_secs(5 * FRESHNESS_TIMER); + let (temporary_channel_id, tx) = receiver + .recv_timeout(timeout) + .expect("FundingGenerationReady not handled within deadline"); + end_open_channel!(nodes[0], nodes[1], temporary_channel_id, tx); + + assert!(bg_processor.stop().is_ok()); + } } -- 2.30.2