Merge pull request #2146 from valentinewallace/2023-03-blinded-pathfinding-groundwork
[rust-lightning] / lightning-background-processor / src / lib.rs
index 627f0db15aed1f2da21a9a0fe58ce91c94d65372..b6b55e1c68a04e2998c5bcfc25090a36b3139438 100644 (file)
@@ -64,8 +64,8 @@ use alloc::vec::Vec;
 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
 ///   writing it to disk/backups by invoking the callback given to it at startup.
 ///   [`ChannelManager`] persistence should be done in the background.
-/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
-///   at the appropriate intervals.
+/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
+///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
 ///
@@ -116,12 +116,17 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
 #[cfg(test)]
 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
 
+#[cfg(not(test))]
+const REBROADCAST_TIMER: u64 = 30;
+#[cfg(test)]
+const REBROADCAST_TIMER: u64 = 1;
+
 #[cfg(feature = "futures")]
 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
 const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
 #[cfg(feature = "futures")]
 const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
-       min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER));
+       min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
 
 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
 pub enum GossipSync<
@@ -265,11 +270,14 @@ macro_rules! define_run_body {
        => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
+               log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
+               $chain_monitor.rebroadcast_pending_claims();
 
                let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
                let mut last_ping_call = $get_timer(PING_TIMER);
                let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
                let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
+               let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
                let mut have_pruned = false;
 
                loop {
@@ -337,7 +345,8 @@ macro_rules! define_run_body {
                        // falling back to our usual hourly prunes. This avoids short-lived clients never
                        // pruning their network graph. We run once 60 seconds after startup before
                        // continuing our normal cadence.
-                       if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
+                       let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+                       if $timer_elapsed(&mut last_prune_call, prune_timer) {
                                // The network graph must not be pruned while rapid sync completion is pending
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
                                        #[cfg(feature = "std")] {
@@ -355,7 +364,8 @@ macro_rules! define_run_body {
 
                                        have_pruned = true;
                                }
-                               last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
+                               let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+                               last_prune_call = $get_timer(prune_timer);
                        }
 
                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
@@ -367,6 +377,12 @@ macro_rules! define_run_body {
                                }
                                last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                        }
+
+                       if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
+                               log_trace!($logger, "Rebroadcasting monitor's pending claims");
+                               $chain_monitor.rebroadcast_pending_claims();
+                               last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
+                       }
                }
 
                // After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -862,7 +878,10 @@ mod tests {
 
                        if key == "network_graph" {
                                if let Some(sender) = &self.graph_persistence_notifier {
-                                       sender.send(()).unwrap();
+                                       match sender.send(()) {
+                                               Ok(()) => {},
+                                               Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
+                                       }
                                };
 
                                if let Some((error, message)) = self.graph_error {
@@ -1009,12 +1028,12 @@ mod tests {
        }
 
        fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
+               let network = Network::Testnet;
                let mut nodes = Vec::new();
                for i in 0..num_nodes {
-                       let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
+                       let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
                        let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
                        let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
-                       let network = Network::Testnet;
                        let genesis_block = genesis_block(network);
                        let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
                        let scorer = Arc::new(Mutex::new(TestScorer::new()));
@@ -1184,8 +1203,9 @@ mod tests {
 
        #[test]
        fn test_timer_tick_called() {
-               // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
-               // `FRESHNESS_TIMER`.
+               // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
+               // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
+               // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
                let nodes = create_nodes(1, "test_timer_tick_called".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
@@ -1193,10 +1213,12 @@ mod tests {
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
-                       let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
-                       let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
-                       if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
-                                       log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
+                       let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
+                       let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
+                       let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
+                       if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
+                               log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
+                               log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
                                break
                        }
                }
@@ -1475,10 +1497,9 @@ mod tests {
                                })
                        }, false,
                );
-               // TODO: Drop _local and simply spawn after #2003
-               let local_set = tokio::task::LocalSet::new();
-               local_set.spawn_local(bp_future);
-               local_set.spawn_local(async move {
+
+               let t1 = tokio::spawn(bp_future);
+               let t2 = tokio::spawn(async move {
                        do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
                                let mut i = 0;
                                loop {
@@ -1490,7 +1511,9 @@ mod tests {
                        }, tokio::time::sleep(Duration::from_millis(1)).await);
                        exit_sender.send(()).unwrap();
                });
-               local_set.await;
+               let (r1, r2) = tokio::join!(t1, t2);
+               r1.unwrap().unwrap();
+               r2.unwrap()
        }
 
        macro_rules! do_test_payment_path_scoring {
@@ -1644,13 +1667,14 @@ mod tests {
                                })
                        }, false,
                );
-               // TODO: Drop _local and simply spawn after #2003
-               let local_set = tokio::task::LocalSet::new();
-               local_set.spawn_local(bp_future);
-               local_set.spawn_local(async move {
+               let t1 = tokio::spawn(bp_future);
+               let t2 = tokio::spawn(async move {
                        do_test_payment_path_scoring!(nodes, receiver.recv().await);
                        exit_sender.send(()).unwrap();
                });
-               local_set.await;
+
+               let (r1, r2) = tokio::join!(t1, t2);
+               r1.unwrap().unwrap();
+               r2.unwrap()
        }
 }