X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=dd13a35b46f6055807add340b88c41f41dcb7d6d;hb=7b4b75301047b7c122fb1e893658384e726ec60c;hp=3cd01a5e4c78f293b86acfe94d87e7750ec69227;hpb=501b54300cfa6023f1294878ec6a62c1f5420f2c;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 3cd01a5e..dd13a35b 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -10,6 +10,8 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use lightning::chain::chainmonitor::ChainMonitor; +use lightning::chain::channelmonitor; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; @@ -102,26 +104,31 @@ impl BackgroundProcessor { /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager pub fn start< Signer: 'static + Sign, - M: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, K: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send + Sync, - CMP: 'static + Send + ChannelManagerPersister, - CM: 'static + Deref> + Send + Sync, + CMP: 'static + Send + ChannelManagerPersister, + M: 'static + Deref> + Send + Sync, + CM: 'static + Deref> + Send + Sync, PM: 'static + Deref> + Send + Sync, > - (persister: CMP, event_handler: EH, channel_manager: CM, peer_manager: PM, logger: L) -> Self + (persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self where - M::Target: 'static + chain::Watch, + CF::Target: 'static + chain::Filter, + CW::Target: 'static + chain::Watch, T::Target: 'static + BroadcasterInterface, K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, + P::Target: 'static + channelmonitor::Persist, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, { @@ -132,6 +139,7 @@ impl BackgroundProcessor { loop { peer_manager.process_events(); channel_manager.process_pending_events(&event_handler); + chain_monitor.process_pending_events(&event_handler); let updates_available = channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { @@ -162,14 +170,17 @@ impl BackgroundProcessor { #[cfg(test)] mod tests { + use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; + use lightning::chain::Confirm; use lightning::chain::chainmonitor; + use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; - use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, SimpleArcChannelManager}; + use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, BestBlock, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; @@ -184,6 +195,8 @@ mod tests { use std::time::Duration; use super::{BackgroundProcessor, FRESHNESS_TIMER}; + const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER; + #[derive(Clone, Eq, Hash, PartialEq)] struct TestDescriptor{} impl SocketDescriptor for TestDescriptor { @@ -199,8 +212,11 @@ mod tests { struct Node { node: Arc>, peer_manager: Arc, Arc, Arc>>, + chain_monitor: Arc, persister: Arc, + tx_broadcaster: Arc, logger: Arc, + best_block: BestBlock, } impl Drop for Node { @@ -222,7 +238,7 @@ mod tests { fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec { let mut nodes = Vec::new(); for i in 0..num_nodes { - let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}); + let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}); let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); @@ -232,14 +248,12 @@ mod tests { let now = Duration::from_secs(genesis_block(network).header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); - let params = ChainParameters { - network, - best_block: BestBlock::from_genesis(network), - }; - let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params)); + let best_block = BestBlock::from_genesis(network); + let params = ChainParameters { network, best_block }; + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); - let node = Node { node: manager, peer_manager, persister, logger }; + let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } nodes @@ -289,6 +303,30 @@ mod tests { }} } + fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) { + for i in 1..=depth { + let prev_blockhash = node.best_block.block_hash(); + let height = node.best_block.height() + 1; + let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 }; + let txdata = vec![(0, tx)]; + node.best_block = BestBlock::new(header.block_hash(), height); + match i { + 1 => { + node.node.transactions_confirmed(&header, &txdata, height); + node.chain_monitor.transactions_confirmed(&header, &txdata, height); + }, + x if x == depth => { + node.node.best_block_updated(&header, height); + node.chain_monitor.best_block_updated(&header, height); + }, + _ => {}, + } + } + } + fn confirm_transaction(node: &mut Node, tx: &Transaction) { + confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); + } + #[test] fn test_background_processor() { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with @@ -305,7 +343,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); macro_rules! check_persisted_data { ($node: expr, $filepath: expr, $expected_bytes: expr) => { @@ -358,7 +396,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string(); @@ -378,13 +416,13 @@ mod tests { let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test")); let event_handler = |_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test"); } #[test] fn test_background_event_handling() { - let nodes = create_nodes(2, "test_background_event_handling".to_string()); + let mut nodes = create_nodes(2, "test_background_event_handling".to_string()); let channel_value = 100000; let data_dir = nodes[0].persister.get_data_dir(); let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node); @@ -394,15 +432,39 @@ mod tests { let event_handler = move |event| { sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); }; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Open a channel and check that the FundingGenerationReady event was handled. begin_open_channel!(nodes[0], nodes[1], channel_value); - let timeout = Duration::from_secs(5 * FRESHNESS_TIMER); - let (temporary_channel_id, tx) = receiver - .recv_timeout(timeout) + let (temporary_channel_id, funding_tx) = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) .expect("FundingGenerationReady not handled within deadline"); - end_open_channel!(nodes[0], nodes[1], temporary_channel_id, tx); + end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx); + + // Confirm the funding transaction. + confirm_transaction(&mut nodes[0], &funding_tx); + confirm_transaction(&mut nodes[1], &funding_tx); + nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())); + nodes[1].node.handle_funding_locked(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingLocked, nodes[1].node.get_our_node_id())); + + assert!(bg_processor.stop().is_ok()); + + // Set up a background event handler for SpendableOutputs events. + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + let event_handler = move |event| sender.send(event).unwrap(); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + + // Force close the channel and check that the SpendableOutputs event was handled. + nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); + let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("SpendableOutputs not handled within deadline"); + match event { + Event::SpendableOutputs { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), + } assert!(bg_processor.stop().is_ok()); }