+/// (C-not exported) as the bindings concretize everything and have constructors for us
+impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
+ GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
+where
+ A::Target: chain::Access,
+ L::Target: Logger,
+{
+ /// Initializes a new [`GossipSync::P2P`] variant.
+ pub fn p2p(gossip_sync: P) -> Self {
+ GossipSync::P2P(gossip_sync)
+ }
+}
+
+/// (C-not exported) as the bindings concretize everything and have constructors for us
+impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
+ GossipSync<
+ &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
+ R,
+ G,
+ &'a (dyn chain::Access + Send + Sync),
+ L,
+ >
+where
+ L::Target: Logger,
+{
+ /// Initializes a new [`GossipSync::Rapid`] variant.
+ pub fn rapid(gossip_sync: R) -> Self {
+ GossipSync::Rapid(gossip_sync)
+ }
+}
+
+/// (C-not exported) as the bindings concretize everything and have constructors for us
+impl<'a, L: Deref>
+ GossipSync<
+ &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
+ &RapidGossipSync<&'a NetworkGraph<L>, L>,
+ &'a NetworkGraph<L>,
+ &'a (dyn chain::Access + Send + Sync),
+ L,
+ >
+where
+ L::Target: Logger,
+{
+ /// Initializes a new [`GossipSync::None`] variant.
+ pub fn none() -> Self {
+ GossipSync::None
+ }
+}
+
+fn handle_network_graph_update<L: Deref>(
+ network_graph: &NetworkGraph<L>, event: &Event
+) where L::Target: Logger {
+ if let Event::PaymentPathFailed { ref network_update, .. } = event {
+ if let Some(network_update) = network_update {
+ network_graph.handle_network_update(&network_update);
+ }
+ }
+}
+
+macro_rules! define_run_body {
+ ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
+ $channel_manager: ident, $process_channel_manager_events: expr,
+ $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
+ $loop_exit_check: expr, $await: expr)
+ => { {
+ log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
+ $channel_manager.timer_tick_occurred();
+
+ let mut last_freshness_call = Instant::now();
+ let mut last_ping_call = Instant::now();
+ let mut last_prune_call = Instant::now();
+ let mut last_scorer_persist_call = Instant::now();
+ let mut have_pruned = false;
+
+ loop {
+ $process_channel_manager_events;
+ $process_chain_monitor_events;
+
+ // Note that the PeerManager::process_events may block on ChannelManager's locks,
+ // hence it comes last here. When the ChannelManager finishes whatever it's doing,
+ // we want to ensure we get into `persist_manager` as quickly as we can, especially
+ // without running the normal event processing above and handing events to users.
+ //
+ // Specifically, on an *extremely* slow machine, we may see ChannelManager start
+ // processing a message effectively at any point during this loop. In order to
+ // minimize the time between such processing completing and persisting the updated
+ // ChannelManager, we want to minimize methods blocking on a ChannelManager
+ // generally, and as a fallback place such blocking only immediately before
+ // persistence.
+ $peer_manager.process_events();
+
+ // We wait up to 100ms, but track how long it takes to detect being put to sleep,
+ // see `await_start`'s use below.
+ let await_start = Instant::now();
+ let updates_available = $await;
+ let await_time = await_start.elapsed();
+
+ if updates_available {
+ log_trace!($logger, "Persisting ChannelManager...");
+ $persister.persist_manager(&*$channel_manager)?;
+ log_trace!($logger, "Done persisting ChannelManager.");
+ }
+ // Exit the loop if the background processor was requested to stop.
+ if $loop_exit_check {
+ log_trace!($logger, "Terminating background processor.");
+ break;
+ }
+ if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+ log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
+ $channel_manager.timer_tick_occurred();
+ last_freshness_call = Instant::now();
+ }
+ if await_time > Duration::from_secs(1) {
+ // On various platforms, we may be starved of CPU cycles for several reasons.
+ // E.g. on iOS, if we've been in the background, we will be entirely paused.
+ // Similarly, if we're on a desktop platform and the device has been asleep, we
+ // may not get any cycles.
+ // We detect this by checking if our max-100ms-sleep, above, ran longer than a
+ // full second, at which point we assume sockets may have been killed (they
+ // appear to be at least on some platforms, even if it has only been a second).
+ // Note that we have to take care to not get here just because user event
+ // processing was slow at the top of the loop. For example, the sample client
+ // may call Bitcoin Core RPCs during event handling, which very often takes
+ // more than a handful of seconds to complete, and shouldn't disconnect all our
+ // peers.
+ log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
+ $peer_manager.disconnect_all_peers();
+ last_ping_call = Instant::now();
+ } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+ log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
+ $peer_manager.timer_tick_occurred();
+ last_ping_call = Instant::now();
+ }
+
+ // Note that we want to run a graph prune once not long after startup before
+ // falling back to our usual hourly prunes. This avoids short-lived clients never
+ // pruning their network graph. We run once 60 seconds after startup before
+ // continuing our normal cadence.
+ if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+ // The network graph must not be pruned while rapid sync completion is pending
+ log_trace!($logger, "Assessing prunability of network graph");
+ if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+ network_graph.remove_stale_channels_and_tracking();
+
+ if let Err(e) = $persister.persist_graph(network_graph) {
+ log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+ }
+
+ last_prune_call = Instant::now();
+ have_pruned = true;
+ } else {
+ log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
+ }
+ }
+
+ if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+ if let Some(ref scorer) = $scorer {
+ log_trace!($logger, "Persisting scorer");
+ if let Err(e) = $persister.persist_scorer(&scorer) {
+ log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ }
+ }
+ last_scorer_persist_call = Instant::now();
+ }
+ }
+
+ // After we exit, ensure we persist the ChannelManager one final time - this avoids
+ // some races where users quit while channel updates were in-flight, with
+ // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+ $persister.persist_manager(&*$channel_manager)?;
+
+ // Persist Scorer on exit
+ if let Some(ref scorer) = $scorer {
+ $persister.persist_scorer(&scorer)?;
+ }
+
+ // Persist NetworkGraph on exit
+ if let Some(network_graph) = $gossip_sync.network_graph() {
+ $persister.persist_graph(network_graph)?;
+ }
+
+ Ok(())
+ } }
+}
+
+/// Processes background events in a future.
+///
+/// `sleeper` should return a future which completes in the given amount of time and returns a
+/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
+/// future which outputs true, the loop will exit and this function's future will complete.
+///
+/// See [`BackgroundProcessor::start`] for information on which actions this handles.
+#[cfg(feature = "futures")]
+pub async fn process_events_async<
+ 'a,
+ Signer: 'static + Sign,
+ CA: 'static + Deref + Send + Sync,
+ CF: 'static + Deref + Send + Sync,
+ CW: 'static + Deref + Send + Sync,
+ T: 'static + Deref + Send + Sync,
+ K: 'static + Deref + Send + Sync,
+ F: 'static + Deref + Send + Sync,
+ G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+ L: 'static + Deref + Send + Sync,
+ P: 'static + Deref + Send + Sync,
+ Descriptor: 'static + SocketDescriptor + Send + Sync,
+ CMH: 'static + Deref + Send + Sync,
+ RMH: 'static + Deref + Send + Sync,
+ OMH: 'static + Deref + Send + Sync,
+ EventHandlerFuture: core::future::Future<Output = ()>,
+ EventHandler: Fn(Event) -> EventHandlerFuture,
+ PS: 'static + Deref + Send,
+ M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
+ PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
+ RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+ UMH: 'static + Deref + Send + Sync,
+ PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
+ S: 'static + Deref<Target = SC> + Send + Sync,
+ SC: WriteableScore<'a>,
+ SleepFuture: core::future::Future<Output = bool>,
+ Sleeper: Fn(Duration) -> SleepFuture
+>(
+ persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
+ gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+ sleeper: Sleeper,
+) -> Result<(), std::io::Error>
+where
+ CA::Target: 'static + chain::Access,
+ CF::Target: 'static + chain::Filter,
+ CW::Target: 'static + chain::Watch<Signer>,
+ T::Target: 'static + BroadcasterInterface,
+ K::Target: 'static + KeysInterface<Signer = Signer>,
+ F::Target: 'static + FeeEstimator,
+ L::Target: 'static + Logger,
+ P::Target: 'static + Persist<Signer>,
+ CMH::Target: 'static + ChannelMessageHandler,
+ OMH::Target: 'static + OnionMessageHandler,
+ RMH::Target: 'static + RoutingMessageHandler,
+ UMH::Target: 'static + CustomMessageHandler,
+ PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+{
+ let mut should_break = true;
+ let async_event_handler = |event| {
+ let network_graph = gossip_sync.network_graph();
+ let event_handler = &event_handler;
+ async move {
+ if let Some(network_graph) = network_graph {
+ handle_network_graph_update(network_graph, &event)
+ }
+ event_handler(event).await;
+ }
+ };
+ define_run_body!(persister,
+ chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
+ channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
+ gossip_sync, peer_manager, logger, scorer, should_break, {
+ select_biased! {
+ _ = channel_manager.get_persistable_update_future().fuse() => true,
+ exit = sleeper(Duration::from_millis(100)).fuse() => {
+ should_break = exit;
+ false
+ }
+ }
+ })
+}
+