From f63fd83fd68229054aa8c10b341207262b7b31a9 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Tue, 11 May 2021 08:34:57 -0700 Subject: [PATCH] Process ChainMonitor events in the background --- lightning-background-processor/src/lib.rs | 99 ++++++++++++++++++----- lightning/src/chain/channelmonitor.rs | 10 +-- 2 files changed, 84 insertions(+), 25 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 3cd01a5e4..69aa2ea00 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -10,6 +10,8 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use lightning::chain::chainmonitor::ChainMonitor; +use lightning::chain::channelmonitor; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; @@ -102,26 +104,31 @@ impl BackgroundProcessor { /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager pub fn start< Signer: 'static + Sign, - M: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, K: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send + Sync, - CMP: 'static + Send + ChannelManagerPersister, - CM: 'static + Deref> + Send + Sync, + CMP: 'static + Send + ChannelManagerPersister, + M: 'static + Deref> + Send + Sync, + CM: 'static + Deref> + Send + Sync, PM: 'static + Deref> + Send + Sync, > - (persister: CMP, event_handler: EH, channel_manager: CM, peer_manager: PM, logger: L) -> Self + (persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self where - M::Target: 'static + chain::Watch, + CF::Target: 'static + chain::Filter, + CW::Target: 'static + chain::Watch, T::Target: 'static + BroadcasterInterface, K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, + P::Target: 'static + channelmonitor::Persist, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, { @@ -132,6 +139,7 @@ impl BackgroundProcessor { loop { peer_manager.process_events(); channel_manager.process_pending_events(&event_handler); + chain_monitor.process_pending_events(&event_handler); let updates_available = channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { @@ -162,10 +170,13 @@ impl BackgroundProcessor { #[cfg(test)] mod tests { + use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; + use lightning::chain::Confirm; use lightning::chain::chainmonitor; + use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; @@ -184,6 +195,8 @@ mod tests { use std::time::Duration; use super::{BackgroundProcessor, FRESHNESS_TIMER}; + const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER; + #[derive(Clone, Eq, Hash, PartialEq)] struct TestDescriptor{} impl SocketDescriptor for TestDescriptor { @@ -199,8 +212,11 @@ mod tests { struct Node { node: Arc>, peer_manager: Arc, Arc, Arc>>, + chain_monitor: Arc, persister: Arc, + tx_broadcaster: Arc, logger: Arc, + best_block: BestBlock, } impl Drop for Node { @@ -232,14 +248,12 @@ mod tests { let now = Duration::from_secs(genesis_block(network).header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); - let params = ChainParameters { - network, - best_block: BestBlock::from_genesis(network), - }; - let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params)); + let best_block = BestBlock::from_genesis(network); + let params = ChainParameters { network, best_block }; + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); - let node = Node { node: manager, peer_manager, persister, logger }; + let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } nodes @@ -289,6 +303,27 @@ mod tests { }} } + fn confirm_transaction(node: &mut Node, tx: &Transaction) { + for i in 1..=ANTI_REORG_DELAY { + let prev_blockhash = node.best_block.block_hash(); + let height = node.best_block.height() + 1; + let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 }; + let txdata = vec![(0, tx)]; + node.best_block = BestBlock::new(header.block_hash(), height); + match i { + 1 => { + node.node.transactions_confirmed(&header, &txdata, height); + node.chain_monitor.transactions_confirmed(&header, &txdata, height); + }, + ANTI_REORG_DELAY => { + node.node.best_block_updated(&header, height); + node.chain_monitor.best_block_updated(&header, height); + }, + _ => {}, + } + } + } + #[test] fn test_background_processor() { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with @@ -305,7 +340,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); macro_rules! check_persisted_data { ($node: expr, $filepath: expr, $expected_bytes: expr) => { @@ -358,7 +393,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string(); @@ -378,13 +413,13 @@ mod tests { let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test")); let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test"); } #[test] fn test_background_event_handling() { - let nodes = create_nodes(2, "test_background_event_handling".to_string()); + let mut nodes = create_nodes(2, "test_background_event_handling".to_string()); let channel_value = 100000; let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node); @@ -394,15 +429,39 @@ mod tests { let event_handler = move |event| { sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); }; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Open a channel and check that the FundingGenerationReady event was handled. begin_open_channel!(nodes[0], nodes[1], channel_value); - let timeout = Duration::from_secs(5 * FRESHNESS_TIMER); - let (temporary_channel_id, tx) = receiver - .recv_timeout(timeout) + let (temporary_channel_id, funding_tx) = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) .expect("FundingGenerationReady not handled within deadline"); - end_open_channel!(nodes[0], nodes[1], temporary_channel_id, tx); + end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx); + + // Confirm the funding transaction. + confirm_transaction(&mut nodes[0], &funding_tx); + confirm_transaction(&mut nodes[1], &funding_tx); + nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())); + nodes[1].node.handle_funding_locked(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingLocked, nodes[1].node.get_our_node_id())); + + assert!(bg_processor.stop().is_ok()); + + // Set up a background event handler for SpendableOutputs events. + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + let event_handler = move |event| sender.send(event).unwrap(); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + + // Force close the channel and check that the SpendableOutputs event was handled. + nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); + let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + confirm_transaction(&mut nodes[0], &commitment_tx); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("SpendableOutputs not handled within deadline"); + match event { + Event::SpendableOutputs { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), + } assert!(bg_processor.stop().is_ok()); } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 8526ce517..819811156 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -222,11 +222,11 @@ pub(crate) const CLTV_CLAIM_BUFFER: u32 = 18; pub(crate) const LATENCY_GRACE_PERIOD_BLOCKS: u32 = 3; /// Number of blocks we wait on seeing a HTLC output being solved before we fail corresponding inbound /// HTLCs. This prevents us from failing backwards and then getting a reorg resulting in us losing money. -/// We use also this delay to be sure we can remove our in-flight claim txn from bump candidates buffer. -/// It may cause spurrious generation of bumped claim txn but that's allright given the outpoint is already -/// solved by a previous claim tx. What we want to avoid is reorg evicting our claim tx and us not -/// keeping bumping another claim tx to solve the outpoint. -pub(crate) const ANTI_REORG_DELAY: u32 = 6; +// We also use this delay to be sure we can remove our in-flight claim txn from bump candidates buffer. +// It may cause spurious generation of bumped claim txn but that's alright given the outpoint is already +// solved by a previous claim tx. What we want to avoid is reorg evicting our claim tx and us not +// keep bumping another claim tx to solve the outpoint. +pub const ANTI_REORG_DELAY: u32 = 6; /// Number of blocks before confirmation at which we fail back an un-relayed HTLC or at which we /// refuse to accept a new HTLC. /// -- 2.39.5