use std::time::{Duration, Instant};
use std::ops::Deref;
+#[cfg(feature = "futures")]
+use futures::{select, future::FutureExt};
+
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
}
}
+macro_rules! define_run_body {
+ ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+ $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
+ $loop_exit_check: expr, $await: expr)
+ => { {
+ let event_handler = DecoratingEventHandler {
+ event_handler: $event_handler,
+ gossip_sync: &$gossip_sync,
+ };
+
+ log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
+ $channel_manager.timer_tick_occurred();
+
+ let mut last_freshness_call = Instant::now();
+ let mut last_ping_call = Instant::now();
+ let mut last_prune_call = Instant::now();
+ let mut last_scorer_persist_call = Instant::now();
+ let mut have_pruned = false;
+
+ loop {
+ $channel_manager.process_pending_events(&event_handler);
+ $chain_monitor.process_pending_events(&event_handler);
+
+ // Note that the PeerManager::process_events may block on ChannelManager's locks,
+ // hence it comes last here. When the ChannelManager finishes whatever it's doing,
+ // we want to ensure we get into `persist_manager` as quickly as we can, especially
+ // without running the normal event processing above and handing events to users.
+ //
+ // Specifically, on an *extremely* slow machine, we may see ChannelManager start
+ // processing a message effectively at any point during this loop. In order to
+ // minimize the time between such processing completing and persisting the updated
+ // ChannelManager, we want to minimize methods blocking on a ChannelManager
+ // generally, and as a fallback place such blocking only immediately before
+ // persistence.
+ $peer_manager.process_events();
+
+ // We wait up to 100ms, but track how long it takes to detect being put to sleep,
+ // see `await_start`'s use below.
+ let await_start = Instant::now();
+ let updates_available = $await;
+ let await_time = await_start.elapsed();
+
+ if updates_available {
+ log_trace!($logger, "Persisting ChannelManager...");
+ $persister.persist_manager(&*$channel_manager)?;
+ log_trace!($logger, "Done persisting ChannelManager.");
+ }
+ // Exit the loop if the background processor was requested to stop.
+ if $loop_exit_check {
+ log_trace!($logger, "Terminating background processor.");
+ break;
+ }
+ if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+ log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
+ $channel_manager.timer_tick_occurred();
+ last_freshness_call = Instant::now();
+ }
+ if await_time > Duration::from_secs(1) {
+ // On various platforms, we may be starved of CPU cycles for several reasons.
+ // E.g. on iOS, if we've been in the background, we will be entirely paused.
+ // Similarly, if we're on a desktop platform and the device has been asleep, we
+ // may not get any cycles.
+ // We detect this by checking if our max-100ms-sleep, above, ran longer than a
+ // full second, at which point we assume sockets may have been killed (they
+ // appear to be at least on some platforms, even if it has only been a second).
+ // Note that we have to take care to not get here just because user event
+ // processing was slow at the top of the loop. For example, the sample client
+ // may call Bitcoin Core RPCs during event handling, which very often takes
+ // more than a handful of seconds to complete, and shouldn't disconnect all our
+ // peers.
+ log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
+ $peer_manager.disconnect_all_peers();
+ last_ping_call = Instant::now();
+ } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+ log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
+ $peer_manager.timer_tick_occurred();
+ last_ping_call = Instant::now();
+ }
+
+ // Note that we want to run a graph prune once not long after startup before
+ // falling back to our usual hourly prunes. This avoids short-lived clients never
+ // pruning their network graph. We run once 60 seconds after startup before
+ // continuing our normal cadence.
+ if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+ // The network graph must not be pruned while rapid sync completion is pending
+ log_trace!($logger, "Assessing prunability of network graph");
+ if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+ network_graph.remove_stale_channels();
+
+ if let Err(e) = $persister.persist_graph(network_graph) {
+ log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+ }
+
+ last_prune_call = Instant::now();
+ have_pruned = true;
+ } else {
+ log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
+ }
+ }
+
+ if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+ if let Some(ref scorer) = $scorer {
+ log_trace!($logger, "Persisting scorer");
+ if let Err(e) = $persister.persist_scorer(&scorer) {
+ log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ }
+ }
+ last_scorer_persist_call = Instant::now();
+ }
+ }
+
+ // After we exit, ensure we persist the ChannelManager one final time - this avoids
+ // some races where users quit while channel updates were in-flight, with
+ // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+ $persister.persist_manager(&*$channel_manager)?;
+
+ // Persist Scorer on exit
+ if let Some(ref scorer) = $scorer {
+ $persister.persist_scorer(&scorer)?;
+ }
+
+ // Persist NetworkGraph on exit
+ if let Some(network_graph) = $gossip_sync.network_graph() {
+ $persister.persist_graph(network_graph)?;
+ }
+
+ Ok(())
+ } }
+}
+
+/// Processes background events in a future.
+///
+/// `sleeper` should return a future which completes in the given amount of time and returns a
+/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
+/// future which outputs false, the loop will exit and this function's future will complete.
+///
+/// See [`BackgroundProcessor::start`] for information on which actions this handles.
+#[cfg(feature = "futures")]
+pub async fn process_events_async<
+ 'a,
+ Signer: 'static + Sign,
+ CA: 'static + Deref + Send + Sync,
+ CF: 'static + Deref + Send + Sync,
+ CW: 'static + Deref + Send + Sync,
+ T: 'static + Deref + Send + Sync,
+ K: 'static + Deref + Send + Sync,
+ F: 'static + Deref + Send + Sync,
+ G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+ L: 'static + Deref + Send + Sync,
+ P: 'static + Deref + Send + Sync,
+ Descriptor: 'static + SocketDescriptor + Send + Sync,
+ CMH: 'static + Deref + Send + Sync,
+ RMH: 'static + Deref + Send + Sync,
+ EH: 'static + EventHandler + Send,
+ PS: 'static + Deref + Send,
+ M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+ PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
+ RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+ UMH: 'static + Deref + Send + Sync,
+ PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+ S: 'static + Deref<Target = SC> + Send + Sync,
+ SC: WriteableScore<'a>,
+ SleepFuture: core::future::Future<Output = bool>,
+ Sleeper: Fn(Duration) -> SleepFuture
+>(
+ persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+ gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+ sleeper: Sleeper,
+) -> Result<(), std::io::Error>
+where
+ CA::Target: 'static + chain::Access,
+ CF::Target: 'static + chain::Filter,
+ CW::Target: 'static + chain::Watch<Signer>,
+ T::Target: 'static + BroadcasterInterface,
+ K::Target: 'static + KeysInterface<Signer = Signer>,
+ F::Target: 'static + FeeEstimator,
+ L::Target: 'static + Logger,
+ P::Target: 'static + Persist<Signer>,
+ CMH::Target: 'static + ChannelMessageHandler,
+ RMH::Target: 'static + RoutingMessageHandler,
+ UMH::Target: 'static + CustomMessageHandler,
+ PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+{
+ let mut should_continue = true;
+ define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+ gossip_sync, peer_manager, logger, scorer, should_continue, {
+ select! {
+ _ = channel_manager.get_persistable_update_future().fuse() => true,
+ cont = sleeper(Duration::from_millis(100)).fuse() => {
+ should_continue = cont;
+ false
+ }
+ }
+ })
+}
+
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
- let event_handler = DecoratingEventHandler {
- event_handler,
- gossip_sync: &gossip_sync,
- };
-
- log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
- channel_manager.timer_tick_occurred();
-
- let mut last_freshness_call = Instant::now();
- let mut last_ping_call = Instant::now();
- let mut last_prune_call = Instant::now();
- let mut last_scorer_persist_call = Instant::now();
- let mut have_pruned = false;
-
- loop {
- channel_manager.process_pending_events(&event_handler);
- chain_monitor.process_pending_events(&event_handler);
-
- // Note that the PeerManager::process_events may block on ChannelManager's locks,
- // hence it comes last here. When the ChannelManager finishes whatever it's doing,
- // we want to ensure we get into `persist_manager` as quickly as we can, especially
- // without running the normal event processing above and handing events to users.
- //
- // Specifically, on an *extremely* slow machine, we may see ChannelManager start
- // processing a message effectively at any point during this loop. In order to
- // minimize the time between such processing completing and persisting the updated
- // ChannelManager, we want to minimize methods blocking on a ChannelManager
- // generally, and as a fallback place such blocking only immediately before
- // persistence.
- peer_manager.process_events();
-
- // We wait up to 100ms, but track how long it takes to detect being put to sleep,
- // see `await_start`'s use below.
- let await_start = Instant::now();
- let updates_available =
- channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
- let await_time = await_start.elapsed();
-
- if updates_available {
- log_trace!(logger, "Persisting ChannelManager...");
- persister.persist_manager(&*channel_manager)?;
- log_trace!(logger, "Done persisting ChannelManager.");
- }
- // Exit the loop if the background processor was requested to stop.
- if stop_thread.load(Ordering::Acquire) == true {
- log_trace!(logger, "Terminating background processor.");
- break;
- }
- if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
- log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
- channel_manager.timer_tick_occurred();
- last_freshness_call = Instant::now();
- }
- if await_time > Duration::from_secs(1) {
- // On various platforms, we may be starved of CPU cycles for several reasons.
- // E.g. on iOS, if we've been in the background, we will be entirely paused.
- // Similarly, if we're on a desktop platform and the device has been asleep, we
- // may not get any cycles.
- // We detect this by checking if our max-100ms-sleep, above, ran longer than a
- // full second, at which point we assume sockets may have been killed (they
- // appear to be at least on some platforms, even if it has only been a second).
- // Note that we have to take care to not get here just because user event
- // processing was slow at the top of the loop. For example, the sample client
- // may call Bitcoin Core RPCs during event handling, which very often takes
- // more than a handful of seconds to complete, and shouldn't disconnect all our
- // peers.
- log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
- peer_manager.disconnect_all_peers();
- last_ping_call = Instant::now();
- } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
- log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
- peer_manager.timer_tick_occurred();
- last_ping_call = Instant::now();
- }
-
- // Note that we want to run a graph prune once not long after startup before
- // falling back to our usual hourly prunes. This avoids short-lived clients never
- // pruning their network graph. We run once 60 seconds after startup before
- // continuing our normal cadence.
- if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
- // The network graph must not be pruned while rapid sync completion is pending
- log_trace!(logger, "Assessing prunability of network graph");
- if let Some(network_graph) = gossip_sync.prunable_network_graph() {
- network_graph.remove_stale_channels();
-
- if let Err(e) = persister.persist_graph(network_graph) {
- log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
- }
-
- last_prune_call = Instant::now();
- have_pruned = true;
- } else {
- log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
- }
- }
-
- if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
- if let Some(ref scorer) = scorer {
- log_trace!(logger, "Persisting scorer");
- if let Err(e) = persister.persist_scorer(&scorer) {
- log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
- }
- }
- last_scorer_persist_call = Instant::now();
- }
- }
-
- // After we exit, ensure we persist the ChannelManager one final time - this avoids
- // some races where users quit while channel updates were in-flight, with
- // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
- persister.persist_manager(&*channel_manager)?;
-
- // Persist Scorer on exit
- if let Some(ref scorer) = scorer {
- persister.persist_scorer(&scorer)?;
- }
-
- // Persist NetworkGraph on exit
- if let Some(network_graph) = gossip_sync.network_graph() {
- persister.persist_graph(network_graph)?;
- }
-
- Ok(())
+ define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+ gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
+ channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}