+/// Processes background events in a future.
+///
+/// `sleeper` should return a future which completes in the given amount of time and returns a
+/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
+/// future which outputs `true`, the loop will exit and this function's future will complete.
+/// The `sleeper` future is free to return early after it has triggered the exit condition.
+///
+/// See [`BackgroundProcessor::start`] for information on which actions this handles.
+///
+/// Requires the `futures` feature. Note that while this method is available without the `std`
+/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
+/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
+/// manually instead.
+///
+/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
+/// mobile device, where we may need to check for interruption of the application regularly. If you
+/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
+/// are hundreds or thousands of simultaneous process calls running.
+///
+/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
+/// could setup `process_events_async` like this:
+/// ```
+/// # struct MyPersister {}
+/// # impl lightning::util::persist::KVStorePersister for MyPersister {
+/// # fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
+/// # }
+/// # struct MyEventHandler {}
+/// # impl MyEventHandler {
+/// # async fn handle_event(&self, _: lightning::events::Event) {}
+/// # }
+/// # #[derive(Eq, PartialEq, Clone, Hash)]
+/// # struct MySocketDescriptor {}
+/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
+/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
+/// # fn disconnect_socket(&mut self) {}
+/// # }
+/// # use std::sync::{Arc, Mutex};
+/// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use lightning_background_processor::{process_events_async, GossipSync};
+/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
+/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
+/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
+/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
+/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
+/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
+/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
+/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
+/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
+/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
+/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
+/// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
+///
+/// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
+/// let background_persister = Arc::clone(&my_persister);
+/// let background_event_handler = Arc::clone(&my_event_handler);
+/// let background_chain_mon = Arc::clone(&my_chain_monitor);
+/// let background_chan_man = Arc::clone(&my_channel_manager);
+/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
+/// let background_peer_man = Arc::clone(&my_peer_manager);
+/// let background_logger = Arc::clone(&my_logger);
+/// let background_scorer = Arc::clone(&my_scorer);
+///
+/// // Setup the sleeper.
+/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
+///
+/// let sleeper = move |d| {
+/// let mut receiver = stop_receiver.clone();
+/// Box::pin(async move {
+/// tokio::select!{
+/// _ = tokio::time::sleep(d) => false,
+/// _ = receiver.changed() => true,
+/// }
+/// })
+/// };
+///
+/// let mobile_interruptable_platform = false;
+///
+/// let handle = tokio::spawn(async move {
+/// process_events_async(
+/// background_persister,
+/// |e| background_event_handler.handle_event(e),
+/// background_chain_mon,
+/// background_chan_man,
+/// background_gossip_sync,
+/// background_peer_man,
+/// background_logger,
+/// Some(background_scorer),
+/// sleeper,
+/// mobile_interruptable_platform,
+/// )
+/// .await
+/// .expect("Failed to process events");
+/// });
+///
+/// // Stop the background processing.
+/// stop_sender.send(()).unwrap();
+/// handle.await.unwrap();
+/// # }
+///```
+#[cfg(feature = "futures")]
+pub async fn process_events_async<
+ 'a,
+ UL: 'static + Deref + Send + Sync,
+ CF: 'static + Deref + Send + Sync,
+ CW: 'static + Deref + Send + Sync,
+ T: 'static + Deref + Send + Sync,
+ ES: 'static + Deref + Send + Sync,
+ NS: 'static + Deref + Send + Sync,
+ SP: 'static + Deref + Send + Sync,
+ F: 'static + Deref + Send + Sync,
+ R: 'static + Deref + Send + Sync,
+ G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+ L: 'static + Deref + Send + Sync,
+ P: 'static + Deref + Send + Sync,
+ EventHandlerFuture: core::future::Future<Output = ()>,
+ EventHandler: Fn(Event) -> EventHandlerFuture,
+ PS: 'static + Deref + Send,
+ M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
+ CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
+ PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
+ RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+ APM: APeerManager + Send + Sync,
+ PM: 'static + Deref<Target = APM> + Send + Sync,
+ S: 'static + Deref<Target = SC> + Send + Sync,
+ SC: for<'b> WriteableScore<'b>,
+ SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
+ Sleeper: Fn(Duration) -> SleepFuture
+>(
+ persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
+ gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+ sleeper: Sleeper, mobile_interruptable_platform: bool,
+) -> Result<(), lightning::io::Error>
+where
+ UL::Target: 'static + UtxoLookup,
+ CF::Target: 'static + chain::Filter,
+ CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
+ T::Target: 'static + BroadcasterInterface,
+ ES::Target: 'static + EntropySource,
+ NS::Target: 'static + NodeSigner,
+ SP::Target: 'static + SignerProvider,
+ F::Target: 'static + FeeEstimator,
+ R::Target: 'static + Router,
+ L::Target: 'static + Logger,
+ P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
+ PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+{
+ let mut should_break = false;
+ let async_event_handler = |event| {
+ let network_graph = gossip_sync.network_graph();
+ let event_handler = &event_handler;
+ let scorer = &scorer;
+ let logger = &logger;
+ let persister = &persister;
+ async move {
+ if let Some(network_graph) = network_graph {
+ handle_network_graph_update(network_graph, &event)
+ }
+ if let Some(ref scorer) = scorer {
+ if update_scorer(scorer, &event) {
+ log_trace!(logger, "Persisting scorer after update");
+ if let Err(e) = persister.persist_scorer(&scorer) {
+ log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ }
+ }
+ }
+ event_handler(event).await;
+ }
+ };
+ define_run_body!(persister,
+ chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
+ channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
+ gossip_sync, peer_manager, logger, scorer, should_break, {
+ let fut = Selector {
+ a: channel_manager.get_persistable_update_future(),
+ b: chain_monitor.get_update_future(),
+ c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
+ };
+ match fut.await {
+ SelectorOutput::A => true,
+ SelectorOutput::B => false,
+ SelectorOutput::C(exit) => {
+ should_break = exit;
+ false
+ }
+ }
+ }, |t| sleeper(Duration::from_secs(t)),
+ |fut: &mut SleepFuture, _| {
+ let mut waker = dummy_waker();
+ let mut ctx = task::Context::from_waker(&mut waker);
+ match core::pin::Pin::new(fut).poll(&mut ctx) {
+ task::Poll::Ready(exit) => { should_break = exit; true },
+ task::Poll::Pending => false,
+ }
+ }, mobile_interruptable_platform)
+}
+
+#[cfg(feature = "std")]