X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=lightning-background-processor%2Fsrc%2Flib.rs;h=5fb8499b689ac7956a077a30903226781cd5bf3a;hb=1c8a06cf61921b5edd637c066ca8bd001183042d;hp=484439b3907b364dddc3dc2c13bb6c078be2ad0c;hpb=d8cca9806c27014462bfb5ccb9e6488a14445425;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 484439b3..5fb8499b 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -2,7 +2,10 @@ //! running properly, and (2) either can or should be run in the background. See docs for //! [`BackgroundProcessor`] for more details on the nitty-gritty. +// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings. #![deny(broken_intra_doc_links)] +#![deny(private_intra_doc_links)] + #![deny(missing_docs)] #![deny(unsafe_code)] @@ -14,9 +17,9 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::chain::keysinterface::{Sign, KeysInterface}; +use lightning::chain::keysinterface::KeysInterface; use lightning::ln::channelmanager::ChannelManager; -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::scoring::WriteableScore; @@ -31,6 +34,9 @@ use std::thread::JoinHandle; use std::time::{Duration, Instant}; use std::ops::Deref; +#[cfg(feature = "futures")] +use futures_util::{select_biased, future::FutureExt}; + /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its /// responsibilities are: @@ -40,8 +46,8 @@ use std::ops::Deref; /// [`ChannelManager`] persistence should be done in the background. /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`] /// at the appropriate intervals. -/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`] -/// is provided to [`BackgroundProcessor::start`]). +/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a +/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]). /// /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied /// upon as doing so may result in high latency. @@ -219,6 +225,204 @@ where A::Target: chain::Access, L::Target: Logger { } } +macro_rules! define_run_body { + ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident, + $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, + $loop_exit_check: expr, $await: expr) + => { { + let event_handler = DecoratingEventHandler { + event_handler: $event_handler, + gossip_sync: &$gossip_sync, + }; + + log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); + $channel_manager.timer_tick_occurred(); + + let mut last_freshness_call = Instant::now(); + let mut last_ping_call = Instant::now(); + let mut last_prune_call = Instant::now(); + let mut last_scorer_persist_call = Instant::now(); + let mut have_pruned = false; + + loop { + $channel_manager.process_pending_events(&event_handler); + $chain_monitor.process_pending_events(&event_handler); + + // Note that the PeerManager::process_events may block on ChannelManager's locks, + // hence it comes last here. When the ChannelManager finishes whatever it's doing, + // we want to ensure we get into `persist_manager` as quickly as we can, especially + // without running the normal event processing above and handing events to users. + // + // Specifically, on an *extremely* slow machine, we may see ChannelManager start + // processing a message effectively at any point during this loop. In order to + // minimize the time between such processing completing and persisting the updated + // ChannelManager, we want to minimize methods blocking on a ChannelManager + // generally, and as a fallback place such blocking only immediately before + // persistence. + $peer_manager.process_events(); + + // We wait up to 100ms, but track how long it takes to detect being put to sleep, + // see `await_start`'s use below. + let await_start = Instant::now(); + let updates_available = $await; + let await_time = await_start.elapsed(); + + if updates_available { + log_trace!($logger, "Persisting ChannelManager..."); + $persister.persist_manager(&*$channel_manager)?; + log_trace!($logger, "Done persisting ChannelManager."); + } + // Exit the loop if the background processor was requested to stop. + if $loop_exit_check { + log_trace!($logger, "Terminating background processor."); + break; + } + if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { + log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); + $channel_manager.timer_tick_occurred(); + last_freshness_call = Instant::now(); + } + if await_time > Duration::from_secs(1) { + // On various platforms, we may be starved of CPU cycles for several reasons. + // E.g. on iOS, if we've been in the background, we will be entirely paused. + // Similarly, if we're on a desktop platform and the device has been asleep, we + // may not get any cycles. + // We detect this by checking if our max-100ms-sleep, above, ran longer than a + // full second, at which point we assume sockets may have been killed (they + // appear to be at least on some platforms, even if it has only been a second). + // Note that we have to take care to not get here just because user event + // processing was slow at the top of the loop. For example, the sample client + // may call Bitcoin Core RPCs during event handling, which very often takes + // more than a handful of seconds to complete, and shouldn't disconnect all our + // peers. + log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); + $peer_manager.disconnect_all_peers(); + last_ping_call = Instant::now(); + } else if last_ping_call.elapsed().as_secs() > PING_TIMER { + log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); + $peer_manager.timer_tick_occurred(); + last_ping_call = Instant::now(); + } + + // Note that we want to run a graph prune once not long after startup before + // falling back to our usual hourly prunes. This avoids short-lived clients never + // pruning their network graph. We run once 60 seconds after startup before + // continuing our normal cadence. + if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { + // The network graph must not be pruned while rapid sync completion is pending + log_trace!($logger, "Assessing prunability of network graph"); + if let Some(network_graph) = $gossip_sync.prunable_network_graph() { + network_graph.remove_stale_channels_and_tracking(); + + if let Err(e) = $persister.persist_graph(network_graph) { + log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) + } + + last_prune_call = Instant::now(); + have_pruned = true; + } else { + log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph."); + } + } + + if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER { + if let Some(ref scorer) = $scorer { + log_trace!($logger, "Persisting scorer"); + if let Err(e) = $persister.persist_scorer(&scorer) { + log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } + } + last_scorer_persist_call = Instant::now(); + } + } + + // After we exit, ensure we persist the ChannelManager one final time - this avoids + // some races where users quit while channel updates were in-flight, with + // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + $persister.persist_manager(&*$channel_manager)?; + + // Persist Scorer on exit + if let Some(ref scorer) = $scorer { + $persister.persist_scorer(&scorer)?; + } + + // Persist NetworkGraph on exit + if let Some(network_graph) = $gossip_sync.network_graph() { + $persister.persist_graph(network_graph)?; + } + + Ok(()) + } } +} + +/// Processes background events in a future. +/// +/// `sleeper` should return a future which completes in the given amount of time and returns a +/// boolean indicating whether the background processing should continue. Once `sleeper` returns a +/// future which outputs false, the loop will exit and this function's future will complete. +/// +/// See [`BackgroundProcessor::start`] for information on which actions this handles. +#[cfg(feature = "futures")] +pub async fn process_events_async< + 'a, + CA: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + CW: 'static + Deref + Send + Sync, + T: 'static + Deref + Send + Sync, + K: 'static + Deref + Send + Sync, + F: 'static + Deref + Send + Sync, + G: 'static + Deref> + Send + Sync, + L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, + Descriptor: 'static + SocketDescriptor + Send + Sync, + CMH: 'static + Deref + Send + Sync, + RMH: 'static + Deref + Send + Sync, + OMH: 'static + Deref + Send + Sync, + EH: 'static + EventHandler + Send, + PS: 'static + Deref + Send, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref> + Send + Sync, + PGS: 'static + Deref> + Send + Sync, + RGS: 'static + Deref> + Send, + UMH: 'static + Deref + Send + Sync, + PM: 'static + Deref> + Send + Sync, + S: 'static + Deref + Send + Sync, + SC: WriteableScore<'a>, + SleepFuture: core::future::Future, + Sleeper: Fn(Duration) -> SleepFuture +>( + persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, + sleeper: Sleeper, +) -> Result<(), std::io::Error> +where + CA::Target: 'static + chain::Access, + CF::Target: 'static + chain::Filter, + CW::Target: 'static + chain::Watch<::Signer>, + T::Target: 'static + BroadcasterInterface, + K::Target: 'static + KeysInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + CMH::Target: 'static + ChannelMessageHandler, + OMH::Target: 'static + OnionMessageHandler, + RMH::Target: 'static + RoutingMessageHandler, + UMH::Target: 'static + CustomMessageHandler, + PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>, +{ + let mut should_continue = true; + define_run_body!(persister, event_handler, chain_monitor, channel_manager, + gossip_sync, peer_manager, logger, scorer, should_continue, { + select_biased! { + _ = channel_manager.get_persistable_update_future().fuse() => true, + cont = sleeper(Duration::from_millis(100)).fuse() => { + should_continue = cont; + false + } + } + }) +} + impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level /// documentation]. @@ -266,7 +470,6 @@ impl BackgroundProcessor { /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable pub fn start< 'a, - Signer: 'static + Sign, CA: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, @@ -278,15 +481,16 @@ impl BackgroundProcessor { P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, + OMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, - PM: 'static + Deref> + Send + Sync, + PM: 'static + Deref> + Send + Sync, S: 'static + Deref + Send + Sync, SC: WriteableScore<'a>, >( @@ -296,143 +500,24 @@ impl BackgroundProcessor { where CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch, + CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, + K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, - P::Target: 'static + Persist, + P::Target: 'static + Persist<::Signer>, CMH::Target: 'static + ChannelMessageHandler, + OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, + PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - let event_handler = DecoratingEventHandler { - event_handler, - gossip_sync: &gossip_sync, - }; - - log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); - channel_manager.timer_tick_occurred(); - - let mut last_freshness_call = Instant::now(); - let mut last_ping_call = Instant::now(); - let mut last_prune_call = Instant::now(); - let mut last_scorer_persist_call = Instant::now(); - let mut have_pruned = false; - - loop { - channel_manager.process_pending_events(&event_handler); - chain_monitor.process_pending_events(&event_handler); - - // Note that the PeerManager::process_events may block on ChannelManager's locks, - // hence it comes last here. When the ChannelManager finishes whatever it's doing, - // we want to ensure we get into `persist_manager` as quickly as we can, especially - // without running the normal event processing above and handing events to users. - // - // Specifically, on an *extremely* slow machine, we may see ChannelManager start - // processing a message effectively at any point during this loop. In order to - // minimize the time between such processing completing and persisting the updated - // ChannelManager, we want to minimize methods blocking on a ChannelManager - // generally, and as a fallback place such blocking only immediately before - // persistence. - peer_manager.process_events(); - - // We wait up to 100ms, but track how long it takes to detect being put to sleep, - // see `await_start`'s use below. - let await_start = Instant::now(); - let updates_available = - channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); - let await_time = await_start.elapsed(); - - if updates_available { - log_trace!(logger, "Persisting ChannelManager..."); - persister.persist_manager(&*channel_manager)?; - log_trace!(logger, "Done persisting ChannelManager."); - } - // Exit the loop if the background processor was requested to stop. - if stop_thread.load(Ordering::Acquire) == true { - log_trace!(logger, "Terminating background processor."); - break; - } - if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { - log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); - channel_manager.timer_tick_occurred(); - last_freshness_call = Instant::now(); - } - if await_time > Duration::from_secs(1) { - // On various platforms, we may be starved of CPU cycles for several reasons. - // E.g. on iOS, if we've been in the background, we will be entirely paused. - // Similarly, if we're on a desktop platform and the device has been asleep, we - // may not get any cycles. - // We detect this by checking if our max-100ms-sleep, above, ran longer than a - // full second, at which point we assume sockets may have been killed (they - // appear to be at least on some platforms, even if it has only been a second). - // Note that we have to take care to not get here just because user event - // processing was slow at the top of the loop. For example, the sample client - // may call Bitcoin Core RPCs during event handling, which very often takes - // more than a handful of seconds to complete, and shouldn't disconnect all our - // peers. - log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); - peer_manager.disconnect_all_peers(); - last_ping_call = Instant::now(); - } else if last_ping_call.elapsed().as_secs() > PING_TIMER { - log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); - peer_manager.timer_tick_occurred(); - last_ping_call = Instant::now(); - } - - // Note that we want to run a graph prune once not long after startup before - // falling back to our usual hourly prunes. This avoids short-lived clients never - // pruning their network graph. We run once 60 seconds after startup before - // continuing our normal cadence. - if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { - // The network graph must not be pruned while rapid sync completion is pending - log_trace!(logger, "Assessing prunability of network graph"); - if let Some(network_graph) = gossip_sync.prunable_network_graph() { - network_graph.remove_stale_channels(); - - if let Err(e) = persister.persist_graph(network_graph) { - log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) - } - - last_prune_call = Instant::now(); - have_pruned = true; - } else { - log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph."); - } - } - - if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER { - if let Some(ref scorer) = scorer { - log_trace!(logger, "Persisting scorer"); - if let Err(e) = persister.persist_scorer(&scorer) { - log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - } - } - last_scorer_persist_call = Instant::now(); - } - } - - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - persister.persist_manager(&*channel_manager)?; - - // Persist Scorer on exit - if let Some(ref scorer) = scorer { - persister.persist_scorer(&scorer)?; - } - - // Persist NetworkGraph on exit - if let Some(network_graph) = gossip_sync.network_graph() { - persister.persist_graph(network_graph)?; - } - - Ok(()) + define_run_body!(persister, event_handler, chain_monitor, channel_manager, + gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), + channel_manager.await_persistable_update_timeout(Duration::from_millis(100))) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -488,6 +573,7 @@ impl Drop for BackgroundProcessor { mod tests { use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::constants::genesis_block; + use bitcoin::blockdata::locktime::PackedLockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; use lightning::chain::{BestBlock, Confirm, chainmonitor}; @@ -495,8 +581,8 @@ mod tests { use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; - use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; - use lightning::ln::features::{ChannelFeatures, InitFeatures}; + use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; + use lightning::ln::features::ChannelFeatures; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; @@ -513,13 +599,15 @@ mod tests { use std::sync::{Arc, Mutex}; use std::sync::mpsc::SyncSender; use std::time::Duration; + use bitcoin::hashes::Hash; + use bitcoin::TxMerkleNode; use lightning::routing::scoring::{FixedPenaltyScorer}; use lightning_rapid_gossip_sync::RapidGossipSync; use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER}; const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER; - #[derive(Clone, Eq, Hash, PartialEq)] + #[derive(Clone, Hash, PartialEq, Eq)] struct TestDescriptor{} impl SocketDescriptor for TestDescriptor { fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { @@ -538,7 +626,7 @@ mod tests { node: Arc>, p2p_gossip_sync: PGS, rapid_gossip_sync: RGS, - peer_manager: Arc, Arc, Arc, IgnoringMessageHandler>>, + peer_manager: Arc, Arc, IgnoringMessageHandler, Arc, IgnoringMessageHandler>>, chain_monitor: Arc, persister: Arc, tx_broadcaster: Arc, @@ -657,8 +745,8 @@ mod tests { let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone())); - let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; - let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{})); + let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; + let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{})); let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; nodes.push(node); @@ -666,8 +754,8 @@ mod tests { for i in 0..num_nodes { for j in (i+1)..num_nodes { - nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None }); - nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None }); + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap(); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap(); } } @@ -688,8 +776,8 @@ mod tests { macro_rules! begin_open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); - $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); - $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); + $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); + $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); }} } @@ -700,7 +788,7 @@ mod tests { assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); - let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut { + let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut { value: channel_value_satoshis, script_pubkey: output_script.clone(), }]}; (temporary_channel_id, tx) @@ -722,7 +810,7 @@ mod tests { for i in 1..=depth { let prev_blockhash = node.best_block.block_hash(); let height = node.best_block.height() + 1; - let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 }; + let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 }; let txdata = vec![(0, tx)]; node.best_block = BestBlock::new(header.block_hash(), height); match i { @@ -898,9 +986,12 @@ mod tests { // Set up a background event handler for FundingGenerationReady events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: &Event| { - sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); + let event_handler = move |event: &Event| match event { + Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(), + Event::ChannelReady { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), }; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Open a channel and check that the FundingGenerationReady event was handled. @@ -924,7 +1015,12 @@ mod tests { // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: &Event| sender.send(event.clone()).unwrap(); + let event_handler = move |event: &Event| match event { + Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(), + Event::ChannelReady { .. } => {}, + Event::ChannelClosed { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), + }; let persister = Arc::new(Persister::new(data_dir)); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -932,12 +1028,12 @@ mod tests { nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap(); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); + let event = receiver .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("SpendableOutputs not handled within deadline"); + .expect("Events not handled within deadline"); match event { Event::SpendableOutputs { .. } => {}, - Event::ChannelClosed { .. } => {}, _ => panic!("Unexpected event: {:?}", event), } @@ -1029,8 +1125,8 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes); - let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2))); + let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)); + let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2))); let event_handler = Arc::clone(&invoice_payer); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); assert!(bg_processor.stop().is_ok());