Persist `ChannelManager` before `BackgroundProcessor` exits
[rust-lightning] / lightning-background-processor / src / lib.rs
index 50743774b7aa487753f416de66f1c952a7c86a3f..8f6ed657b3d8e116b5913ebb9b75e4dbdc218e41 100644 (file)
@@ -34,6 +34,8 @@ use std::ops::Deref;
 ///   [`ChannelManager`] persistence should be done in the background.
 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
 ///   at the appropriate intervals.
+/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`NetGraphMsgHandler`] is provided to
+///   [`BackgroundProcessor::start`]).
 ///
 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
 /// upon as doing so may result in high latency.
@@ -68,6 +70,9 @@ const PING_TIMER: u64 = 30;
 #[cfg(test)]
 const PING_TIMER: u64 = 1;
 
+/// Prune the network graph of stale entries hourly.
+const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
+
 /// Trait which handles persisting a [`ChannelManager`] to disk.
 ///
 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
@@ -155,7 +160,7 @@ impl BackgroundProcessor {
        /// functionality implemented by other handlers.
        /// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures.
        ///
-       /// [top-level documentation]: Self
+       /// [top-level documentation]: BackgroundProcessor
        /// [`join`]: Self::join
        /// [`stop`]: Self::stop
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
@@ -203,13 +208,16 @@ impl BackgroundProcessor {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-                       let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler };
+                       let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) };
 
                        log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
                        channel_manager.timer_tick_occurred();
 
                        let mut last_freshness_call = Instant::now();
                        let mut last_ping_call = Instant::now();
+                       let mut last_prune_call = Instant::now();
+                       let mut have_pruned = false;
+
                        loop {
                                peer_manager.process_events();
                                channel_manager.process_pending_events(&event_handler);
@@ -217,12 +225,14 @@ impl BackgroundProcessor {
                                let updates_available =
                                        channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
                                if updates_available {
+                                       log_trace!(logger, "Persisting ChannelManager...");
                                        persister.persist_manager(&*channel_manager)?;
+                                       log_trace!(logger, "Done persisting ChannelManager.");
                                }
                                // Exit the loop if the background processor was requested to stop.
                                if stop_thread.load(Ordering::Acquire) == true {
                                        log_trace!(logger, "Terminating background processor.");
-                                       return Ok(());
+                                       break;
                                }
                                if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
                                        log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
@@ -245,7 +255,24 @@ impl BackgroundProcessor {
                                        peer_manager.timer_tick_occurred();
                                        last_ping_call = Instant::now();
                                }
+
+                               // Note that we want to run a graph prune once not long after startup before
+                               // falling back to our usual hourly prunes. This avoids short-lived clients never
+                               // pruning their network graph. We run once 60 seconds after startup before
+                               // continuing our normal cadence.
+                               if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } {
+                                       if let Some(ref handler) = net_graph_msg_handler {
+                                               log_trace!(logger, "Pruning network graph of stale entries");
+                                               handler.network_graph().remove_stale_channels();
+                                               last_prune_call = Instant::now();
+                                               have_pruned = true;
+                                       }
+                               }
                        }
+                       // After we exit, ensure we persist the ChannelManager one final time - this avoids
+                       // some races where users quit while channel updates were in-flight, with
+                       // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+                       persister.persist_manager(&*channel_manager)
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
@@ -491,9 +518,10 @@ mod tests {
 
                macro_rules! check_persisted_data {
                        ($node: expr, $filepath: expr, $expected_bytes: expr) => {
-                               match $node.write(&mut $expected_bytes) {
-                                       Ok(()) => {
-                                               loop {
+                               loop {
+                                       $expected_bytes.clear();
+                                       match $node.write(&mut $expected_bytes) {
+                                               Ok(()) => {
                                                        match std::fs::read($filepath) {
                                                                Ok(bytes) => {
                                                                        if bytes == $expected_bytes {
@@ -504,9 +532,9 @@ mod tests {
                                                                },
                                                                Err(_) => continue
                                                        }
-                                               }
-                                       },
-                                       Err(e) => panic!("Unexpected error: {}", e)
+                                               },
+                                               Err(e) => panic!("Unexpected error: {}", e)
+                                       }
                                }
                        }
                }