Avoid needless MPP on multiple channels to the same first-hop
diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 593f84a90898534742f5dbd0c0458ea3f53cf059..084ea62efb6faf51f0ff67b0b5fb01fc61f965ef 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -6,6 +6,8 @@
 #![deny(missing_docs)]
 #![deny(unsafe_code)]
 
+#![cfg_attr(docsrs, feature(doc_auto_cfg))]
+
 #[macro_use] extern crate lightning;
 
 use lightning::chain;
@@ -34,6 +36,8 @@ use std::ops::Deref;
 ///   [`ChannelManager`] persistence should be done in the background.
 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
 ///   at the appropriate intervals.
+/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`NetGraphMsgHandler`] is provided to
+///   [`BackgroundProcessor::start`]).
 ///
 /// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
 /// relied upon as doing so may result in high latency.
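
For orientation, here is a minimal sketch of driving the processor, mirroring the call shape used in the `test_invoice_payer` test at the bottom of this diff. All generic parameters and the construction of the persister, event handler, and managers are elided; `stop()` joining the background thread is assumed from the surrounding `BackgroundProcessor` API:

    // Sketch only: construction of persister/event_handler/managers omitted.
    // Passing Some(net_graph_msg_handler) is what opts in to the hourly
    // NetworkGraph::remove_stale_channels() pruning added by this patch.
    let bg_processor = BackgroundProcessor::start(
        persister, event_handler, chain_monitor.clone(), channel_manager.clone(),
        Some(net_graph_msg_handler.clone()), peer_manager.clone(), logger.clone(),
    );
    // ... node runs ...
    // Stopping breaks the loop; with this patch the background thread then
    // persists the ChannelManager one final time before exiting.
    bg_processor.stop().unwrap();
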
@@ -59,7 +63,7 @@ const FRESHNESS_TIMER: u64 = 60;
 const FRESHNESS_TIMER: u64 = 1;
 
 #[cfg(all(not(test), not(debug_assertions)))]
-const PING_TIMER: u64 = 5;
+const PING_TIMER: u64 = 10;
 /// Signature operations take a lot longer without compiler optimisations.
 /// Increasing the ping timer allows for this, but slower devices will be disconnected if the
 /// timeout is reached.
@@ -68,6 +72,9 @@ const PING_TIMER: u64 = 30;
 #[cfg(test)]
 const PING_TIMER: u64 = 1;
 
+/// Prune the network graph of stale entries hourly.
+const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
+
 /// Trait which handles persisting a [`ChannelManager`] to disk.
 ///
 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
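
As the `test_invoice_payer` setup below illustrates, this trait can be satisfied by a plain closure over the `ChannelManager`. A hedged sketch of such a persister using `Writeable::encode`; the helper name, path, and lack of fsync/atomic-rename handling are illustrative assumptions, not the library's recommended persistence:

    use lightning::util::ser::Writeable;

    // Sketch: serialize any Writeable (e.g. the ChannelManager) to disk.
    fn persist_to_disk<W: Writeable>(object: &W, path: &str) -> Result<(), std::io::Error> {
        let bytes = object.encode();   // in-memory serialization via Writeable
        std::fs::write(path, &bytes)   // NOTE: no fsync or atomic rename here
    }
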
@@ -203,41 +210,57 @@ impl BackgroundProcessor {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-                       let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler };
+                       let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) };
 
                        log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
                        channel_manager.timer_tick_occurred();
 
                        let mut last_freshness_call = Instant::now();
                        let mut last_ping_call = Instant::now();
+                       let mut last_prune_call = Instant::now();
+                       let mut have_pruned = false;
+
                        loop {
-                               peer_manager.process_events();
+                               peer_manager.process_events(); // Note that this may block on ChannelManager's locking
                                channel_manager.process_pending_events(&event_handler);
                                chain_monitor.process_pending_events(&event_handler);
+
+                               // We wait up to 100ms, but track how long the wait actually takes in
+                               // order to detect being put to sleep; see `await_start`'s use below.
+                               let await_start = Instant::now();
                                let updates_available =
                                        channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
+                               let await_time = await_start.elapsed();
+
                                if updates_available {
+                                       log_trace!(logger, "Persisting ChannelManager...");
                                        persister.persist_manager(&*channel_manager)?;
+                                       log_trace!(logger, "Done persisting ChannelManager.");
                                }
                                // Exit the loop if the background processor was requested to stop.
                                if stop_thread.load(Ordering::Acquire) == true {
                                        log_trace!(logger, "Terminating background processor.");
-                                       return Ok(());
+                                       break;
                                }
                                if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
                                        log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
                                        channel_manager.timer_tick_occurred();
                                        last_freshness_call = Instant::now();
                                }
-                               if last_ping_call.elapsed().as_secs() > PING_TIMER * 2 {
+                               if await_time > Duration::from_secs(1) {
                                        // On various platforms, we may be starved of CPU cycles for several reasons.
                                        // E.g. on iOS, if we've been in the background, we will be entirely paused.
                                        // Similarly, if we're on a desktop platform and the device has been asleep, we
                                        // may not get any cycles.
-                                       // In any case, if we've been entirely paused for more than double our ping
-                                       // timer, we should have disconnected all sockets by now (and they're probably
-                                       // dead anyway), so disconnect them by calling `timer_tick_occurred()` twice.
-                                       log_trace!(logger, "Awoke after more than double our ping timer, disconnecting peers.");
+                                       // We detect this by checking if our max-100ms-sleep, above, ran longer than a
+                                       // full second, at which point we assume sockets may have been killed (they
+                                       // appear to be, at least on some platforms, even if the pause only lasted a second).
+                                       // Note that we have to take care to not get here just because user event
+                                       // processing was slow at the top of the loop. For example, the sample client
+                                       // may call Bitcoin Core RPCs during event handling, which very often takes
+                                       // more than a handful of seconds to complete, and shouldn't disconnect all our
+                                       // peers.
+                                       log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
                                        peer_manager.disconnect_all_peers();
                                        last_ping_call = Instant::now();
                                } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
@@ -245,7 +268,24 @@ impl BackgroundProcessor {
                                        peer_manager.timer_tick_occurred();
                                        last_ping_call = Instant::now();
                                }
+
+                               // Note that we want to run a graph prune once not long after startup before
+                               // falling back to our usual hourly prunes. This avoids short-lived clients never
+                               // pruning their network graph. We run the first prune 60 seconds after startup,
+                               // then continue at the normal hourly cadence.
+                               if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } {
+                                       if let Some(ref handler) = net_graph_msg_handler {
+                                               log_trace!(logger, "Pruning network graph of stale entries");
+                                               handler.network_graph().remove_stale_channels();
+                                               last_prune_call = Instant::now();
+                                               have_pruned = true;
+                                       }
+                               }
                        }
+                       // After we exit, ensure we persist the ChannelManager one final time - this avoids
+                       // some races where users quit while channel updates were in-flight, with
+                       // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+                       persister.persist_manager(&*channel_manager)
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
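
The suspend-detection trick in the hunk above can be shown standalone. A sketch using only the standard library, where the 100ms `thread::sleep` stands in for `await_persistable_update_timeout`:

    use std::time::{Duration, Instant};

    fn main() {
        // Ask for a 100ms wait, then compare against wall-clock time elapsed.
        let await_start = Instant::now();
        std::thread::sleep(Duration::from_millis(100));
        if await_start.elapsed() > Duration::from_secs(1) {
            // A 100ms wait that took over a second implies the process was
            // suspended (iOS backgrounding, desktop sleep). The real loop
            // treats sockets as dead here and disconnects all peers.
            println!("likely suspended; disconnecting peers");
        }
    }
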
@@ -305,7 +345,7 @@ mod tests {
        use bitcoin::network::constants::Network;
        use lightning::chain::{BestBlock, Confirm, chainmonitor};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-       use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager};
+       use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
        use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
@@ -388,7 +428,7 @@ mod tests {
                        let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash()));
                        let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
-                       let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), IgnoringMessageHandler{}));
+                       let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
                        let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block };
                        nodes.push(node);
                }
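
The `Recipient` changes in this hunk track a `KeysInterface` API that became fallible and parameterized by recipient. A hedged sketch of the new call shape; the helper name is ours:

    use bitcoin::secp256k1::SecretKey;
    use lightning::chain::keysinterface::{KeysInterface, KeysManager, Recipient};

    // Sketch: get_node_secret now takes a Recipient and returns a Result,
    // hence the .unwrap() added to the PeerManager construction above.
    fn node_secret(keys_manager: &KeysManager) -> SecretKey {
        keys_manager.get_node_secret(Recipient::Node)
            .expect("the local node's secret should always be derivable")
    }
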
@@ -491,9 +531,10 @@ mod tests {
 
                macro_rules! check_persisted_data {
                        ($node: expr, $filepath: expr, $expected_bytes: expr) => {
-                               match $node.write(&mut $expected_bytes) {
-                                       Ok(()) => {
-                                               loop {
+                               loop {
+                                       $expected_bytes.clear();
+                                       match $node.write(&mut $expected_bytes) {
+                                               Ok(()) => {
                                                        match std::fs::read($filepath) {
                                                                Ok(bytes) => {
                                                                        if bytes == $expected_bytes {
@@ -504,9 +545,9 @@ mod tests {
                                                                },
                                                                Err(_) => continue
                                                        }
-                                               }
-                                       },
-                                       Err(e) => panic!("Unexpected error: {}", e)
+                                               },
+                                               Err(e) => panic!("Unexpected error: {}", e)
+                                       }
                                }
                        }
                }
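
The macro restructuring above moves serialization inside the retry loop and clears the buffer first, since `write` appends and the manager's state may change between attempts. A hedged generic sketch of the same poll-until-match pattern; the helper name and signature are ours:

    use lightning::util::ser::Writeable;

    // Sketch: spin until the bytes on disk match a *fresh* serialization of
    // the live object, re-snapshotting every pass to avoid stale comparisons.
    fn wait_until_persisted<W: Writeable>(object: &W, path: &str) {
        loop {
            let expected = object.encode();              // fresh snapshot
            match std::fs::read(path) {
                Ok(bytes) if bytes == expected => break, // disk caught up
                _ => continue,                           // not yet; retry
            }
        }
    }
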
@@ -628,13 +669,15 @@ mod tests {
 
        #[test]
        fn test_invoice_payer() {
+               let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
+               let random_seed_bytes = keys_manager.get_secure_random_bytes();
                let nodes = create_nodes(2, "test_invoice_payer".to_string());
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
-               let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger));
-               let scorer = Arc::new(Mutex::new(test_utils::TestScorer::default()));
+               let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
+               let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
                let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
                let event_handler = Arc::clone(&invoice_payer);
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
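
For completeness, the updated wiring the last two hunks imply, mirroring the test's own calls: `DefaultRouter::new` now takes random seed bytes sourced from the keys interface, and the scorer is built with an explicit penalty. The surrounding variables are assumed to be constructed as in the test above:

    // Sketch mirroring the test: seed the router with random bytes from the
    // keys interface before handing it to the InvoicePayer.
    let random_seed_bytes = keys_manager.get_secure_random_bytes();
    let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
    let router = DefaultRouter::new(
        Arc::clone(&network_graph), Arc::clone(&logger), random_seed_bytes);
    let invoice_payer = Arc::new(InvoicePayer::new(
        Arc::clone(&channel_manager), router, scorer, Arc::clone(&logger),
        |_: &_| {},       // no-op event handler, as in the test
        RetryAttempts(2), // retry failed payment paths up to twice
    ));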