X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=b2ee865471c89f3ca1d537088c8e3425c9d4cbf8;hb=e59b3847a363afba5d30e1f1271341e3163f7591;hp=76a7cfc9c4c526f76a5c2c49ef3df35f8405ed9f;hpb=6ca49948c169a297144d5d1474a2cef3827237e1;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 76a7cfc9..b2ee8654 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -17,11 +17,12 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::chain::keysinterface::{Sign, KeysInterface}; +use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::routing::router::Router; use lightning::routing::scoring::WriteableScore; use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::logger::Logger; @@ -35,7 +36,7 @@ use std::time::{Duration, Instant}; use std::ops::Deref; #[cfg(feature = "futures")] -use futures_util::{select_biased, future::FutureExt}; +use futures_util::{select_biased, future::FutureExt, task}; /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its @@ -192,61 +193,34 @@ where } } -/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s. -struct DecoratingEventHandler< - 'a, - E: EventHandler, - PGS: Deref>, - RGS: Deref>, - G: Deref>, - A: Deref, - L: Deref, -> -where A::Target: chain::Access, L::Target: Logger { - event_handler: E, - gossip_sync: &'a GossipSync, -} - -impl< - 'a, - E: EventHandler, - PGS: Deref>, - RGS: Deref>, - G: Deref>, - A: Deref, - L: Deref, -> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L> -where A::Target: chain::Access, L::Target: Logger { - fn handle_event(&self, event: &Event) { - if let Some(network_graph) = self.gossip_sync.network_graph() { - network_graph.handle_event(event); +fn handle_network_graph_update( + network_graph: &NetworkGraph, event: &Event +) where L::Target: Logger { + if let Event::PaymentPathFailed { ref network_update, .. } = event { + if let Some(network_update) = network_update { + network_graph.handle_network_update(&network_update); } - self.event_handler.handle_event(event); } } macro_rules! define_run_body { - ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident, + ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr) + $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr) => { { - let event_handler = DecoratingEventHandler { - event_handler: $event_handler, - gossip_sync: &$gossip_sync, - }; - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); - let mut last_freshness_call = Instant::now(); - let mut last_ping_call = Instant::now(); - let mut last_prune_call = Instant::now(); - let mut last_scorer_persist_call = Instant::now(); + let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); + let mut last_ping_call = $get_timer(PING_TIMER); + let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); + let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); let mut have_pruned = false; loop { - $channel_manager.process_pending_events(&event_handler); - $chain_monitor.process_pending_events(&event_handler); + $process_channel_manager_events; + $process_chain_monitor_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -263,9 +237,9 @@ macro_rules! define_run_body { // We wait up to 100ms, but track how long it takes to detect being put to sleep, // see `await_start`'s use below. - let await_start = Instant::now(); + let mut await_start = $get_timer(1); let updates_available = $await; - let await_time = await_start.elapsed(); + let await_slow = $timer_elapsed(&mut await_start, 1); if updates_available { log_trace!($logger, "Persisting ChannelManager..."); @@ -277,12 +251,12 @@ macro_rules! define_run_body { log_trace!($logger, "Terminating background processor."); break; } - if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { + if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); $channel_manager.timer_tick_occurred(); - last_freshness_call = Instant::now(); + last_freshness_call = $get_timer(FRESHNESS_TIMER); } - if await_time > Duration::from_secs(1) { + if await_slow { // On various platforms, we may be starved of CPU cycles for several reasons. // E.g. on iOS, if we've been in the background, we will be entirely paused. // Similarly, if we're on a desktop platform and the device has been asleep, we @@ -297,42 +271,40 @@ macro_rules! define_run_body { // peers. log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); $peer_manager.disconnect_all_peers(); - last_ping_call = Instant::now(); - } else if last_ping_call.elapsed().as_secs() > PING_TIMER { + last_ping_call = $get_timer(PING_TIMER); + } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) { log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); $peer_manager.timer_tick_occurred(); - last_ping_call = Instant::now(); + last_ping_call = $get_timer(PING_TIMER); } // Note that we want to run a graph prune once not long after startup before // falling back to our usual hourly prunes. This avoids short-lived clients never // pruning their network graph. We run once 60 seconds after startup before // continuing our normal cadence. - if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { + if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) { // The network graph must not be pruned while rapid sync completion is pending - log_trace!($logger, "Assessing prunability of network graph"); if let Some(network_graph) = $gossip_sync.prunable_network_graph() { + log_trace!($logger, "Pruning and persisting network graph."); network_graph.remove_stale_channels_and_tracking(); if let Err(e) = $persister.persist_graph(network_graph) { log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) } - last_prune_call = Instant::now(); + last_prune_call = $get_timer(NETWORK_PRUNE_TIMER); have_pruned = true; - } else { - log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph."); } } - if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER { + if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { if let Some(ref scorer) = $scorer { log_trace!($logger, "Persisting scorer"); if let Err(e) = $persister.persist_scorer(&scorer) { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } - last_scorer_persist_call = Instant::now(); + last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); } } @@ -358,20 +330,22 @@ macro_rules! define_run_body { /// Processes background events in a future. /// /// `sleeper` should return a future which completes in the given amount of time and returns a -/// boolean indicating whether the background processing should continue. Once `sleeper` returns a -/// future which outputs false, the loop will exit and this function's future will complete. +/// boolean indicating whether the background processing should exit. Once `sleeper` returns a +/// future which outputs true, the loop will exit and this function's future will complete. /// /// See [`BackgroundProcessor::start`] for information on which actions this handles. #[cfg(feature = "futures")] pub async fn process_events_async< 'a, - Signer: 'static + Sign, CA: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - K: 'static + Deref + Send + Sync, + ES: 'static + Deref + Send + Sync, + NS: 'static + Deref + Send + Sync, + SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, + R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, @@ -379,48 +353,69 @@ pub async fn process_events_async< CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, OMH: 'static + Deref + Send + Sync, - EH: 'static + EventHandler + Send, + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, S: 'static + Deref + Send + Sync, SC: WriteableScore<'a>, - SleepFuture: core::future::Future, + SleepFuture: core::future::Future + core::marker::Unpin, Sleeper: Fn(Duration) -> SleepFuture >( - persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, ) -> Result<(), std::io::Error> where CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch, + CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, + ES::Target: 'static + EntropySource, + NS::Target: 'static + NodeSigner, + SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, + R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist, + P::Target: 'static + Persist<::Signer>, CMH::Target: 'static + ChannelMessageHandler, OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, + PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, { - let mut should_continue = true; - define_run_body!(persister, event_handler, chain_monitor, channel_manager, - gossip_sync, peer_manager, logger, scorer, should_continue, { + let mut should_break = true; + let async_event_handler = |event| { + let network_graph = gossip_sync.network_graph(); + let event_handler = &event_handler; + async move { + if let Some(network_graph) = network_graph { + handle_network_graph_update(network_graph, &event) + } + event_handler(event).await; + } + }; + define_run_body!(persister, + chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, + gossip_sync, peer_manager, logger, scorer, should_break, { select_biased! { _ = channel_manager.get_persistable_update_future().fuse() => true, - cont = sleeper(Duration::from_millis(100)).fuse() => { - should_continue = cont; + exit = sleeper(Duration::from_millis(100)).fuse() => { + should_break = exit; false } } + }, |t| sleeper(Duration::from_secs(t)), + |fut: &mut SleepFuture, _| { + let mut waker = task::noop_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + core::pin::Pin::new(fut).poll(&mut ctx).is_ready() }) } @@ -471,13 +466,15 @@ impl BackgroundProcessor { /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable pub fn start< 'a, - Signer: 'static + Sign, CA: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - K: 'static + Deref + Send + Sync, + ES: 'static + Deref + Send + Sync, + NS: 'static + Deref + Send + Sync, + SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, + R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, @@ -487,8 +484,8 @@ impl BackgroundProcessor { RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, @@ -502,24 +499,36 @@ impl BackgroundProcessor { where CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch, + CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, + ES::Target: 'static + EntropySource, + NS::Target: 'static + NodeSigner, + SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, + R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist, + P::Target: 'static + Persist<::Signer>, CMH::Target: 'static + ChannelMessageHandler, OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, + PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - define_run_body!(persister, event_handler, chain_monitor, channel_manager, + let event_handler = |event| { + let network_graph = gossip_sync.network_graph(); + if let Some(network_graph) = network_graph { + handle_network_graph_update(network_graph, &event) + } + event_handler.handle_event(event); + }; + define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - channel_manager.await_persistable_update_timeout(Duration::from_millis(100))) + channel_manager.await_persistable_update_timeout(Duration::from_millis(100)), + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -580,21 +589,22 @@ mod tests { use bitcoin::network::constants::Network; use lightning::chain::{BestBlock, Confirm, chainmonitor}; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; - use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager}; + use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; - use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; + use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::ChannelFeatures; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; + use lightning::routing::router::DefaultRouter; + use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; use lightning::util::test_utils; use lightning::util::persist::KVStorePersister; use lightning_invoice::payment::{InvoicePayer, Retry}; - use lightning_invoice::utils::DefaultRouter; use lightning_persister::FilesystemPersister; use std::fs; use std::path::PathBuf; @@ -603,7 +613,6 @@ mod tests { use std::time::Duration; use bitcoin::hashes::Hash; use bitcoin::TxMerkleNode; - use lightning::routing::scoring::{FixedPenaltyScorer}; use lightning_rapid_gossip_sync::RapidGossipSync; use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER}; @@ -635,7 +644,7 @@ mod tests { network_graph: Arc>>, logger: Arc, best_block: BestBlock, - scorer: Arc>, + scorer: Arc>>, Arc>>>, } impl Node { @@ -732,32 +741,34 @@ mod tests { for i in 0..num_nodes { let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}); let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }); - let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); - let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); - let seed = [i as u8; 32]; let network = Network::Testnet; let genesis_block = genesis_block(network); + let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); + let params = ProbabilisticScoringParameters::default(); + let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone()))); + let seed = [i as u8; 32]; + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone())); + let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); + let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); let best_block = BestBlock::from_genesis(network); let params = ChainParameters { network, best_block }; - let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); - let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params)); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone())); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{})); - let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; nodes.push(node); } for i in 0..num_nodes { for j in (i+1)..num_nodes { - nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap(); - nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap(); + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap(); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap(); } } @@ -769,7 +780,7 @@ mod tests { begin_open_channel!($node_a, $node_b, $channel_value); let events = $node_a.node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); - let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value); + let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value); end_open_channel!($node_a, $node_b, temporary_channel_id, tx); tx }} @@ -778,15 +789,15 @@ mod tests { macro_rules! begin_open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); - $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); - $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); + $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); + $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); }} } macro_rules! handle_funding_generation_ready { ($event: expr, $channel_value: expr) => {{ match $event { - &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => { + Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => { assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); @@ -847,7 +858,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); macro_rules! check_persisted_data { @@ -909,7 +920,7 @@ mod tests { let nodes = create_nodes(1, "test_timer_tick_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); @@ -932,7 +943,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.join() { Ok(_) => panic!("Expected error persisting manager"), @@ -949,7 +960,7 @@ mod tests { let nodes = create_nodes(2, "test_persist_network_graph_error".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { @@ -967,7 +978,7 @@ mod tests { let nodes = create_nodes(2, "test_persist_scorer_error".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { @@ -988,9 +999,12 @@ mod tests { // Set up a background event handler for FundingGenerationReady events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: &Event| { - sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); + let event_handler = move |event: Event| match event { + Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(), + Event::ChannelReady { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), }; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Open a channel and check that the FundingGenerationReady event was handled. @@ -1014,7 +1028,12 @@ mod tests { // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let event_handler = move |event: &Event| sender.send(event.clone()).unwrap(); + let event_handler = move |event: Event| match event { + Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(), + Event::ChannelReady { .. } => {}, + Event::ChannelClosed { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), + }; let persister = Arc::new(Persister::new(data_dir)); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1022,12 +1041,12 @@ mod tests { nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap(); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); + let event = receiver .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("SpendableOutputs not handled within deadline"); + .expect("Events not handled within deadline"); match event { Event::SpendableOutputs { .. } => {}, - Event::ChannelClosed { .. } => {}, _ => panic!("Unexpected event: {:?}", event), } @@ -1039,7 +1058,7 @@ mod tests { let nodes = create_nodes(2, "test_scorer_persistence".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { @@ -1067,15 +1086,16 @@ mod tests { assert!(original_graph_description.contains("42: features: 0000, node_one:")); assert_eq!(network_graph.read_only().channels().len(), 1); - let event_handler = |_: &_| {}; + let event_handler = |_: _| {}; let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let expected_log_a = "Assessing prunability of network graph".to_string(); - let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() { + let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string(); + if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter)) + .unwrap_or(&0) > 1 + { + // Wait until the loop has gone around at least twice. break } } @@ -1120,7 +1140,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)); - let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2))); + let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2))); let event_handler = Arc::clone(&invoice_payer); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); assert!(bg_processor.stop().is_ok());