Avoid needless MPP on multiple channels to the same first-hop
[rust-lightning] / lightning-background-processor / src / lib.rs
index 2dbc8053b4eb1247e5d64dc482e1817ad5db08ec..084ea62efb6faf51f0ff67b0b5fb01fc61f965ef 100644 (file)
@@ -6,6 +6,8 @@
 #![deny(missing_docs)]
 #![deny(unsafe_code)]
 
+#![cfg_attr(docsrs, feature(doc_auto_cfg))]
+
 #[macro_use] extern crate lightning;
 
 use lightning::chain;
@@ -61,7 +63,7 @@ const FRESHNESS_TIMER: u64 = 60;
 const FRESHNESS_TIMER: u64 = 1;
 
 #[cfg(all(not(test), not(debug_assertions)))]
-const PING_TIMER: u64 = 5;
+const PING_TIMER: u64 = 10;
 /// Signature operations take a lot longer without compiler optimisations.
 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
 /// timeout is reached.
@@ -219,11 +221,17 @@ impl BackgroundProcessor {
                        let mut have_pruned = false;
 
                        loop {
-                               peer_manager.process_events();
+                               peer_manager.process_events(); // Note that this may block on ChannelManager's locking
                                channel_manager.process_pending_events(&event_handler);
                                chain_monitor.process_pending_events(&event_handler);
+
+                               // We wait up to 100ms, but track how long it takes to detect being put to sleep,
+                               // see `await_start`'s use below.
+                               let await_start = Instant::now();
                                let updates_available =
                                        channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
+                               let await_time = await_start.elapsed();
+
                                if updates_available {
                                        log_trace!(logger, "Persisting ChannelManager...");
                                        persister.persist_manager(&*channel_manager)?;
@@ -232,22 +240,27 @@ impl BackgroundProcessor {
                                // Exit the loop if the background processor was requested to stop.
                                if stop_thread.load(Ordering::Acquire) == true {
                                        log_trace!(logger, "Terminating background processor.");
-                                       return Ok(());
+                                       break;
                                }
                                if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
                                        log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
                                        channel_manager.timer_tick_occurred();
                                        last_freshness_call = Instant::now();
                                }
-                               if last_ping_call.elapsed().as_secs() > PING_TIMER * 2 {
+                               if await_time > Duration::from_secs(1) {
                                        // On various platforms, we may be starved of CPU cycles for several reasons.
                                        // E.g. on iOS, if we've been in the background, we will be entirely paused.
                                        // Similarly, if we're on a desktop platform and the device has been asleep, we
                                        // may not get any cycles.
-                                       // In any case, if we've been entirely paused for more than double our ping
-                                       // timer, we should have disconnected all sockets by now (and they're probably
-                                       // dead anyway), so disconnect them by calling `timer_tick_occurred()` twice.
-                                       log_trace!(logger, "Awoke after more than double our ping timer, disconnecting peers.");
+                                       // We detect this by checking if our max-100ms-sleep, above, ran longer than a
+                                       // full second, at which point we assume sockets may have been killed (they
+                                       // appear to be at least on some platforms, even if it has only been a second).
+                                       // Note that we have to take care to not get here just because user event
+                                       // processing was slow at the top of the loop. For example, the sample client
+                                       // may call Bitcoin Core RPCs during event handling, which very often takes
+                                       // more than a handful of seconds to complete, and shouldn't disconnect all our
+                                       // peers.
+                                       log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
                                        peer_manager.disconnect_all_peers();
                                        last_ping_call = Instant::now();
                                } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
@@ -269,6 +282,10 @@ impl BackgroundProcessor {
                                        }
                                }
                        }
+                       // After we exit, ensure we persist the ChannelManager one final time - this avoids
+                       // some races where users quit while channel updates were in-flight, with
+                       // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+                       persister.persist_manager(&*channel_manager)
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
@@ -328,7 +345,7 @@ mod tests {
        use bitcoin::network::constants::Network;
        use lightning::chain::{BestBlock, Confirm, chainmonitor};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
-       use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager};
+       use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
        use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
@@ -411,7 +428,7 @@ mod tests {
                        let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash()));
                        let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
-                       let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), IgnoringMessageHandler{}));
+                       let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
                        let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block };
                        nodes.push(node);
                }
@@ -652,13 +669,15 @@ mod tests {
 
        #[test]
        fn test_invoice_payer() {
+               let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
+               let random_seed_bytes = keys_manager.get_secure_random_bytes();
                let nodes = create_nodes(2, "test_invoice_payer".to_string());
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
-               let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger));
-               let scorer = Arc::new(Mutex::new(test_utils::TestScorer::default()));
+               let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
+               let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
                let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
                let event_handler = Arc::clone(&invoice_payer);
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());