Add a background processing function that is async.
[rust-lightning] / lightning-background-processor / src / lib.rs
index b2edc2301efda2b39224cf557b82b313ef46890f..d38c30e584fa1237b0b3870dda8c7825d0d67e72 100644 (file)
@@ -31,6 +31,9 @@ use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
 use std::ops::Deref;
 
+#[cfg(feature = "futures")]
+use futures::{select, future::FutureExt};
+
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
 /// responsibilities are:
@@ -79,7 +82,7 @@ const PING_TIMER: u64 = 1;
 /// Prune the network graph of stale entries hourly.
 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
 
-#[cfg(all(not(test), debug_assertions))]
+#[cfg(not(test))]
 const SCORER_PERSIST_TIMER: u64 = 30;
 #[cfg(test)]
 const SCORER_PERSIST_TIMER: u64 = 1;
@@ -137,6 +140,55 @@ where A::Target: chain::Access, L::Target: Logger {
        }
 }
 
+/// (C-not exported) as the bindings concretize everything and have constructors for us
+impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
+       GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
+where
+       A::Target: chain::Access,
+       L::Target: Logger,
+{
+       /// Initializes a new [`GossipSync::P2P`] variant.
+       pub fn p2p(gossip_sync: P) -> Self {
+               GossipSync::P2P(gossip_sync)
+       }
+}
+
+/// (C-not exported) as the bindings concretize everything and have constructors for us
+impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
+       GossipSync<
+               &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
+               R,
+               G,
+               &'a (dyn chain::Access + Send + Sync),
+               L,
+       >
+where
+       L::Target: Logger,
+{
+       /// Initializes a new [`GossipSync::Rapid`] variant.
+       pub fn rapid(gossip_sync: R) -> Self {
+               GossipSync::Rapid(gossip_sync)
+       }
+}
+
+/// (C-not exported) as the bindings concretize everything and have constructors for us
+impl<'a, L: Deref>
+       GossipSync<
+               &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
+               &RapidGossipSync<&'a NetworkGraph<L>, L>,
+               &'a NetworkGraph<L>,
+               &'a (dyn chain::Access + Send + Sync),
+               L,
+       >
+where
+       L::Target: Logger,
+{
+       /// Initializes a new [`GossipSync::None`] variant.
+       pub fn none() -> Self {
+               GossipSync::None
+       }
+}
+
 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
 struct DecoratingEventHandler<
        'a,
@@ -170,6 +222,203 @@ where A::Target: chain::Access, L::Target: Logger {
        }
 }
 
+macro_rules! define_run_body {
+       ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+        $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
+        $loop_exit_check: expr, $await: expr)
+       => { {
+               let event_handler = DecoratingEventHandler {
+                       event_handler: $event_handler,
+                       gossip_sync: &$gossip_sync,
+               };
+
+               log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
+               $channel_manager.timer_tick_occurred();
+
+               let mut last_freshness_call = Instant::now();
+               let mut last_ping_call = Instant::now();
+               let mut last_prune_call = Instant::now();
+               let mut last_scorer_persist_call = Instant::now();
+               let mut have_pruned = false;
+
+               loop {
+                       $channel_manager.process_pending_events(&event_handler);
+                       $chain_monitor.process_pending_events(&event_handler);
+
+                       // Note that the PeerManager::process_events may block on ChannelManager's locks,
+                       // hence it comes last here. When the ChannelManager finishes whatever it's doing,
+                       // we want to ensure we get into `persist_manager` as quickly as we can, especially
+                       // without running the normal event processing above and handing events to users.
+                       //
+                       // Specifically, on an *extremely* slow machine, we may see ChannelManager start
+                       // processing a message effectively at any point during this loop. In order to
+                       // minimize the time between such processing completing and persisting the updated
+                       // ChannelManager, we want to minimize methods blocking on a ChannelManager
+                       // generally, and as a fallback place such blocking only immediately before
+                       // persistence.
+                       $peer_manager.process_events();
+
+                       // We wait up to 100ms, but track how long it takes to detect being put to sleep,
+                       // see `await_start`'s use below.
+                       let await_start = Instant::now();
+                       let updates_available = $await;
+                       let await_time = await_start.elapsed();
+
+                       if updates_available {
+                               log_trace!($logger, "Persisting ChannelManager...");
+                               $persister.persist_manager(&*$channel_manager)?;
+                               log_trace!($logger, "Done persisting ChannelManager.");
+                       }
+                       // Exit the loop if the background processor was requested to stop.
+                       if $loop_exit_check {
+                               log_trace!($logger, "Terminating background processor.");
+                               break;
+                       }
+                       if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+                               log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
+                               $channel_manager.timer_tick_occurred();
+                               last_freshness_call = Instant::now();
+                       }
+                       if await_time > Duration::from_secs(1) {
+                               // On various platforms, we may be starved of CPU cycles for several reasons.
+                               // E.g. on iOS, if we've been in the background, we will be entirely paused.
+                               // Similarly, if we're on a desktop platform and the device has been asleep, we
+                               // may not get any cycles.
+                               // We detect this by checking if our max-100ms-sleep, above, ran longer than a
+                               // full second, at which point we assume sockets may have been killed (they
+                               // appear to be at least on some platforms, even if it has only been a second).
+                               // Note that we have to take care to not get here just because user event
+                               // processing was slow at the top of the loop. For example, the sample client
+                               // may call Bitcoin Core RPCs during event handling, which very often takes
+                               // more than a handful of seconds to complete, and shouldn't disconnect all our
+                               // peers.
+                               log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
+                               $peer_manager.disconnect_all_peers();
+                               last_ping_call = Instant::now();
+                       } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+                               log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
+                               $peer_manager.timer_tick_occurred();
+                               last_ping_call = Instant::now();
+                       }
+
+                       // Note that we want to run a graph prune once not long after startup before
+                       // falling back to our usual hourly prunes. This avoids short-lived clients never
+                       // pruning their network graph. We run once 60 seconds after startup before
+                       // continuing our normal cadence.
+                       if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+                               // The network graph must not be pruned while rapid sync completion is pending
+                               log_trace!($logger, "Assessing prunability of network graph");
+                               if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+                                       network_graph.remove_stale_channels();
+
+                                       if let Err(e) = $persister.persist_graph(network_graph) {
+                                               log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+                                       }
+
+                                       last_prune_call = Instant::now();
+                                       have_pruned = true;
+                               } else {
+                                       log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
+                               }
+                       }
+
+                       if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+                               if let Some(ref scorer) = $scorer {
+                                       log_trace!($logger, "Persisting scorer");
+                                       if let Err(e) = $persister.persist_scorer(&scorer) {
+                                               log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                       }
+                               }
+                               last_scorer_persist_call = Instant::now();
+                       }
+               }
+
+               // After we exit, ensure we persist the ChannelManager one final time - this avoids
+               // some races where users quit while channel updates were in-flight, with
+               // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+               $persister.persist_manager(&*$channel_manager)?;
+
+               // Persist Scorer on exit
+               if let Some(ref scorer) = $scorer {
+                       $persister.persist_scorer(&scorer)?;
+               }
+
+               // Persist NetworkGraph on exit
+               if let Some(network_graph) = $gossip_sync.network_graph() {
+                       $persister.persist_graph(network_graph)?;
+               }
+
+               Ok(())
+       } }
+}
+
+/// Processes background events in a future.
+///
+/// `sleeper` should return a future which completes in the given amount of time and returns a
+/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
+/// future which outputs false, the loop will exit and this function's future will complete.
+///
+/// See [`BackgroundProcessor::start`] for information on which actions this handles.
+#[cfg(feature = "futures")]
+pub async fn process_events_async<
+       'a,
+       Signer: 'static + Sign,
+       CA: 'static + Deref + Send + Sync,
+       CF: 'static + Deref + Send + Sync,
+       CW: 'static + Deref + Send + Sync,
+       T: 'static + Deref + Send + Sync,
+       K: 'static + Deref + Send + Sync,
+       F: 'static + Deref + Send + Sync,
+       G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+       L: 'static + Deref + Send + Sync,
+       P: 'static + Deref + Send + Sync,
+       Descriptor: 'static + SocketDescriptor + Send + Sync,
+       CMH: 'static + Deref + Send + Sync,
+       RMH: 'static + Deref + Send + Sync,
+       EH: 'static + EventHandler + Send,
+       PS: 'static + Deref + Send,
+       M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+       CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+       PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
+       RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+       UMH: 'static + Deref + Send + Sync,
+       PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+       S: 'static + Deref<Target = SC> + Send + Sync,
+       SC: WriteableScore<'a>,
+       SleepFuture: core::future::Future<Output = bool>,
+       Sleeper: Fn(Duration) -> SleepFuture
+>(
+       persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+       gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+       sleeper: Sleeper,
+) -> Result<(), std::io::Error>
+where
+       CA::Target: 'static + chain::Access,
+       CF::Target: 'static + chain::Filter,
+       CW::Target: 'static + chain::Watch<Signer>,
+       T::Target: 'static + BroadcasterInterface,
+       K::Target: 'static + KeysInterface<Signer = Signer>,
+       F::Target: 'static + FeeEstimator,
+       L::Target: 'static + Logger,
+       P::Target: 'static + Persist<Signer>,
+       CMH::Target: 'static + ChannelMessageHandler,
+       RMH::Target: 'static + RoutingMessageHandler,
+       UMH::Target: 'static + CustomMessageHandler,
+       PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+{
+       let mut should_continue = true;
+       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+               gossip_sync, peer_manager, logger, scorer, should_continue, {
+                       select! {
+                               _ = channel_manager.get_persistable_update_future().fuse() => true,
+                               cont = sleeper(Duration::from_millis(100)).fuse() => {
+                                       should_continue = cont;
+                                       false
+                               }
+                       }
+               })
+}
+
 impl BackgroundProcessor {
        /// Start a background thread that takes care of responsibilities enumerated in the [top-level
        /// documentation].
@@ -202,7 +451,7 @@ impl BackgroundProcessor {
        ///
        /// # Rapid Gossip Sync
        ///
-       /// If rapid gossip sync is meant to run at startup, pass a [`RapidGossipSync`] to `gossip_sync`
+       /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
        /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
        /// until the [`RapidGossipSync`] instance completes its first sync.
        ///
@@ -261,129 +510,9 @@ impl BackgroundProcessor {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-                       let event_handler = DecoratingEventHandler {
-                               event_handler,
-                               gossip_sync: &gossip_sync,
-                       };
-
-                       log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
-                       channel_manager.timer_tick_occurred();
-
-                       let mut last_freshness_call = Instant::now();
-                       let mut last_ping_call = Instant::now();
-                       let mut last_prune_call = Instant::now();
-                       let mut last_scorer_persist_call = Instant::now();
-                       let mut have_pruned = false;
-
-                       loop {
-                               channel_manager.process_pending_events(&event_handler);
-                               chain_monitor.process_pending_events(&event_handler);
-
-                               // Note that the PeerManager::process_events may block on ChannelManager's locks,
-                               // hence it comes last here. When the ChannelManager finishes whatever it's doing,
-                               // we want to ensure we get into `persist_manager` as quickly as we can, especially
-                               // without running the normal event processing above and handing events to users.
-                               //
-                               // Specifically, on an *extremely* slow machine, we may see ChannelManager start
-                               // processing a message effectively at any point during this loop. In order to
-                               // minimize the time between such processing completing and persisting the updated
-                               // ChannelManager, we want to minimize methods blocking on a ChannelManager
-                               // generally, and as a fallback place such blocking only immediately before
-                               // persistence.
-                               peer_manager.process_events();
-
-                               // We wait up to 100ms, but track how long it takes to detect being put to sleep,
-                               // see `await_start`'s use below.
-                               let await_start = Instant::now();
-                               let updates_available =
-                                       channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
-                               let await_time = await_start.elapsed();
-
-                               if updates_available {
-                                       log_trace!(logger, "Persisting ChannelManager...");
-                                       persister.persist_manager(&*channel_manager)?;
-                                       log_trace!(logger, "Done persisting ChannelManager.");
-                               }
-                               // Exit the loop if the background processor was requested to stop.
-                               if stop_thread.load(Ordering::Acquire) == true {
-                                       log_trace!(logger, "Terminating background processor.");
-                                       break;
-                               }
-                               if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
-                                       log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
-                                       channel_manager.timer_tick_occurred();
-                                       last_freshness_call = Instant::now();
-                               }
-                               if await_time > Duration::from_secs(1) {
-                                       // On various platforms, we may be starved of CPU cycles for several reasons.
-                                       // E.g. on iOS, if we've been in the background, we will be entirely paused.
-                                       // Similarly, if we're on a desktop platform and the device has been asleep, we
-                                       // may not get any cycles.
-                                       // We detect this by checking if our max-100ms-sleep, above, ran longer than a
-                                       // full second, at which point we assume sockets may have been killed (they
-                                       // appear to be at least on some platforms, even if it has only been a second).
-                                       // Note that we have to take care to not get here just because user event
-                                       // processing was slow at the top of the loop. For example, the sample client
-                                       // may call Bitcoin Core RPCs during event handling, which very often takes
-                                       // more than a handful of seconds to complete, and shouldn't disconnect all our
-                                       // peers.
-                                       log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
-                                       peer_manager.disconnect_all_peers();
-                                       last_ping_call = Instant::now();
-                               } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
-                                       log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
-                                       peer_manager.timer_tick_occurred();
-                                       last_ping_call = Instant::now();
-                               }
-
-                               // Note that we want to run a graph prune once not long after startup before
-                               // falling back to our usual hourly prunes. This avoids short-lived clients never
-                               // pruning their network graph. We run once 60 seconds after startup before
-                               // continuing our normal cadence.
-                               if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
-                                       // The network graph must not be pruned while rapid sync completion is pending
-                                       log_trace!(logger, "Assessing prunability of network graph");
-                                       if let Some(network_graph) = gossip_sync.prunable_network_graph() {
-                                               network_graph.remove_stale_channels();
-
-                                               if let Err(e) = persister.persist_graph(network_graph) {
-                                                       log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
-                                               }
-
-                                               last_prune_call = Instant::now();
-                                               have_pruned = true;
-                                       } else {
-                                               log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
-                                       }
-                               }
-
-                               if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
-                                       if let Some(ref scorer) = scorer {
-                                               log_trace!(logger, "Persisting scorer");
-                                               if let Err(e) = persister.persist_scorer(&scorer) {
-                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
-                                               }
-                                       }
-                                       last_scorer_persist_call = Instant::now();
-                               }
-                       }
-
-                       // After we exit, ensure we persist the ChannelManager one final time - this avoids
-                       // some races where users quit while channel updates were in-flight, with
-                       // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-                       persister.persist_manager(&*channel_manager)?;
-
-                       // Persist Scorer on exit
-                       if let Some(ref scorer) = scorer {
-                               persister.persist_scorer(&scorer)?;
-                       }
-
-                       // Persist NetworkGraph on exit
-                       if let Some(network_graph) = gossip_sync.network_graph() {
-                               persister.persist_graph(network_graph)?;
-                       }
-
-                       Ok(())
+                       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+                               gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
+                               channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
@@ -744,7 +873,7 @@ mod tests {
                }
 
                // Force-close the channel.
-               nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
+               nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
 
                // Check that the force-close updates are persisted.
                check_persisted_data!(nodes[0].node, filepath.clone());
@@ -880,7 +1009,7 @@ mod tests {
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Force close the channel and check that the SpendableOutputs event was handled.
-               nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
+               nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
                let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
                confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
                let event = receiver