Use the user-provided `SleepFuture` for interval checks in BP
[rust-lightning] / lightning-background-processor / src / lib.rs
index ec7d29256e62d751f5bf6633139c6c97d888c830..b2ee865471c89f3ca1d537088c8e3425c9d4cbf8 100644 (file)
@@ -17,11 +17,12 @@ extern crate lightning_rapid_gossip_sync;
 use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::chain::keysinterface::{Sign, KeysInterface};
+use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
 use lightning::ln::channelmanager::ChannelManager;
 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::router::Router;
 use lightning::routing::scoring::WriteableScore;
 use lightning::util::events::{Event, EventHandler, EventsProvider};
 use lightning::util::logger::Logger;
@@ -35,7 +36,7 @@ use std::time::{Duration, Instant};
 use std::ops::Deref;
 
 #[cfg(feature = "futures")]
-use futures_util::{select_biased, future::FutureExt};
+use futures_util::{select_biased, future::FutureExt, task};
 
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -206,15 +207,15 @@ macro_rules! define_run_body {
        ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
         $channel_manager: ident, $process_channel_manager_events: expr,
         $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
-        $loop_exit_check: expr, $await: expr)
+        $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
        => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
 
-               let mut last_freshness_call = Instant::now();
-               let mut last_ping_call = Instant::now();
-               let mut last_prune_call = Instant::now();
-               let mut last_scorer_persist_call = Instant::now();
+               let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
+               let mut last_ping_call = $get_timer(PING_TIMER);
+               let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
+               let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                let mut have_pruned = false;
 
                loop {
@@ -236,9 +237,9 @@ macro_rules! define_run_body {
 
                        // We wait up to 100ms, but track how long it takes to detect being put to sleep,
                        // see `await_start`'s use below.
-                       let await_start = Instant::now();
+                       let mut await_start = $get_timer(1);
                        let updates_available = $await;
-                       let await_time = await_start.elapsed();
+                       let await_slow = $timer_elapsed(&mut await_start, 1);
 
                        if updates_available {
                                log_trace!($logger, "Persisting ChannelManager...");
@@ -250,12 +251,12 @@ macro_rules! define_run_body {
                                log_trace!($logger, "Terminating background processor.");
                                break;
                        }
-                       if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+                       if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
                                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
                                $channel_manager.timer_tick_occurred();
-                               last_freshness_call = Instant::now();
+                               last_freshness_call = $get_timer(FRESHNESS_TIMER);
                        }
-                       if await_time > Duration::from_secs(1) {
+                       if await_slow {
                                // On various platforms, we may be starved of CPU cycles for several reasons.
                                // E.g. on iOS, if we've been in the background, we will be entirely paused.
                                // Similarly, if we're on a desktop platform and the device has been asleep, we
@@ -270,42 +271,40 @@ macro_rules! define_run_body {
                                // peers.
                                log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
                                $peer_manager.disconnect_all_peers();
-                               last_ping_call = Instant::now();
-                       } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+                               last_ping_call = $get_timer(PING_TIMER);
+                       } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
                                log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
                                $peer_manager.timer_tick_occurred();
-                               last_ping_call = Instant::now();
+                               last_ping_call = $get_timer(PING_TIMER);
                        }
 
                        // Note that we want to run a graph prune once not long after startup before
                        // falling back to our usual hourly prunes. This avoids short-lived clients never
                        // pruning their network graph. We run once 60 seconds after startup before
                        // continuing our normal cadence.
-                       if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+                       if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
                                // The network graph must not be pruned while rapid sync completion is pending
-                               log_trace!($logger, "Assessing prunability of network graph");
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+                                       log_trace!($logger, "Pruning and persisting network graph.");
                                        network_graph.remove_stale_channels_and_tracking();
 
                                        if let Err(e) = $persister.persist_graph(network_graph) {
                                                log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                                        }
 
-                                       last_prune_call = Instant::now();
+                                       last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
                                        have_pruned = true;
-                               } else {
-                                       log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
                                }
                        }
 
-                       if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+                       if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
                                if let Some(ref scorer) = $scorer {
                                        log_trace!($logger, "Persisting scorer");
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
                                }
-                               last_scorer_persist_call = Instant::now();
+                               last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                        }
                }
 
@@ -338,13 +337,15 @@ macro_rules! define_run_body {
 #[cfg(feature = "futures")]
 pub async fn process_events_async<
        'a,
-       Signer: 'static + Sign,
        CA: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        CW: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
-       K: 'static + Deref + Send + Sync,
+       ES: 'static + Deref + Send + Sync,
+       NS: 'static + Deref + Send + Sync,
+       SP: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
+       R: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
@@ -355,15 +356,15 @@ pub async fn process_events_async<
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
-       M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
-       CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
+       M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
+       CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        UMH: 'static + Deref + Send + Sync,
        PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: WriteableScore<'a>,
-       SleepFuture: core::future::Future<Output = bool>,
+       SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
        Sleeper: Fn(Duration) -> SleepFuture
 >(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
@@ -373,17 +374,20 @@ pub async fn process_events_async<
 where
        CA::Target: 'static + chain::Access,
        CF::Target: 'static + chain::Filter,
-       CW::Target: 'static + chain::Watch<Signer>,
+       CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
        T::Target: 'static + BroadcasterInterface,
-       K::Target: 'static + KeysInterface<Signer = Signer>,
+       ES::Target: 'static + EntropySource,
+       NS::Target: 'static + NodeSigner,
+       SP::Target: 'static + SignerProvider,
        F::Target: 'static + FeeEstimator,
+       R::Target: 'static + Router,
        L::Target: 'static + Logger,
-       P::Target: 'static + Persist<Signer>,
+       P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
        CMH::Target: 'static + ChannelMessageHandler,
        OMH::Target: 'static + OnionMessageHandler,
        RMH::Target: 'static + RoutingMessageHandler,
        UMH::Target: 'static + CustomMessageHandler,
-       PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+       PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
 {
        let mut should_break = true;
        let async_event_handler = |event| {
@@ -407,6 +411,11 @@ where
                                        false
                                }
                        }
+               }, |t| sleeper(Duration::from_secs(t)),
+               |fut: &mut SleepFuture, _| {
+                       let mut waker = task::noop_waker();
+                       let mut ctx = task::Context::from_waker(&mut waker);
+                       core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
                })
 }
 
@@ -457,13 +466,15 @@ impl BackgroundProcessor {
        /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
        pub fn start<
                'a,
-               Signer: 'static + Sign,
                CA: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
                CW: 'static + Deref + Send + Sync,
                T: 'static + Deref + Send + Sync,
-               K: 'static + Deref + Send + Sync,
+               ES: 'static + Deref + Send + Sync,
+               NS: 'static + Deref + Send + Sync,
+               SP: 'static + Deref + Send + Sync,
                F: 'static + Deref + Send + Sync,
+               R: 'static + Deref + Send + Sync,
                G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
                L: 'static + Deref + Send + Sync,
                P: 'static + Deref + Send + Sync,
@@ -473,8 +484,8 @@ impl BackgroundProcessor {
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
-               M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
-               CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
+               M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
+               CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                UMH: 'static + Deref + Send + Sync,
@@ -488,17 +499,20 @@ impl BackgroundProcessor {
        where
                CA::Target: 'static + chain::Access,
                CF::Target: 'static + chain::Filter,
-               CW::Target: 'static + chain::Watch<Signer>,
+               CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
                T::Target: 'static + BroadcasterInterface,
-               K::Target: 'static + KeysInterface<Signer = Signer>,
+               ES::Target: 'static + EntropySource,
+               NS::Target: 'static + NodeSigner,
+               SP::Target: 'static + SignerProvider,
                F::Target: 'static + FeeEstimator,
+               R::Target: 'static + Router,
                L::Target: 'static + Logger,
-               P::Target: 'static + Persist<Signer>,
+               P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
                CMH::Target: 'static + ChannelMessageHandler,
                OMH::Target: 'static + OnionMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
-               PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+               PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
@@ -513,7 +527,8 @@ impl BackgroundProcessor {
                        define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
                                gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
-                               channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
+                               channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
+                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
@@ -574,21 +589,22 @@ mod tests {
        use bitcoin::network::constants::Network;
        use lightning::chain::{BestBlock, Confirm, chainmonitor};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-       use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
+       use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
-       use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
+       use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
        use lightning::ln::features::ChannelFeatures;
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
        use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+       use lightning::routing::router::DefaultRouter;
+       use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
        use lightning::util::config::UserConfig;
        use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
        use lightning::util::persist::KVStorePersister;
        use lightning_invoice::payment::{InvoicePayer, Retry};
-       use lightning_invoice::utils::DefaultRouter;
        use lightning_persister::FilesystemPersister;
        use std::fs;
        use std::path::PathBuf;
@@ -597,7 +613,6 @@ mod tests {
        use std::time::Duration;
        use bitcoin::hashes::Hash;
        use bitcoin::TxMerkleNode;
-       use lightning::routing::scoring::{FixedPenaltyScorer};
        use lightning_rapid_gossip_sync::RapidGossipSync;
        use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
 
@@ -629,7 +644,7 @@ mod tests {
                network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
-               scorer: Arc<Mutex<FixedPenaltyScorer>>,
+               scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
        }
 
        impl Node {
@@ -726,32 +741,34 @@ mod tests {
                for i in 0..num_nodes {
                        let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
                        let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
-                       let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
                        let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
-                       let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
-                       let seed = [i as u8; 32];
                        let network = Network::Testnet;
                        let genesis_block = genesis_block(network);
+                       let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
+                       let params = ProbabilisticScoringParameters::default();
+                       let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
+                       let seed = [i as u8; 32];
+                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
+                       let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
+                       let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
                        let now = Duration::from_secs(genesis_block.header.time as u64);
                        let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
                        let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
                        let best_block = BestBlock::from_genesis(network);
                        let params = ChainParameters { network, best_block };
-                       let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
-                       let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
+                       let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
                        let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
-                       let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                        let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
                        nodes.push(node);
                }
 
                for i in 0..num_nodes {
                        for j in (i+1)..num_nodes {
-                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
-                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
+                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
+                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
                        }
                }
 
@@ -772,8 +789,8 @@ mod tests {
        macro_rules! begin_open_channel {
                ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
                        $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
-                       $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
-                       $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
+                       $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
+                       $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
                }}
        }
 
@@ -1074,10 +1091,11 @@ mod tests {
 
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
-                       let expected_log_a = "Assessing prunability of network graph".to_string();
-                       let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
-                       if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
-                               log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
+                       let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
+                       if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
+                               .unwrap_or(&0) > 1
+                       {
+                               // Wait until the loop has gone around at least twice.
                                break
                        }
                }