From feda5d343e23a2817f305af060e2ac130eab558d Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 2 Feb 2023 14:55:58 -0600 Subject: [PATCH] Add Tokio example to `process_events_async` docs --- lightning-background-processor/src/lib.rs | 84 ++++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index af9fbcd15..7e4f47913 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -466,7 +466,8 @@ use core::task; /// /// `sleeper` should return a future which completes in the given amount of time and returns a /// boolean indicating whether the background processing should exit. Once `sleeper` returns a -/// future which outputs true, the loop will exit and this function's future will complete. +/// future which outputs `true`, the loop will exit and this function's future will complete. +/// The `sleeper` future is free to return early after it has triggered the exit condition. /// /// See [`BackgroundProcessor::start`] for information on which actions this handles. /// @@ -479,6 +480,87 @@ use core::task; /// mobile device, where we may need to check for interruption of the application regularly. If you /// are unsure, you should set the flag, as the performance impact of it is minimal unless there /// are hundreds or thousands of simultaneous process calls running. +/// +/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you +/// could setup `process_events_async` like this: +/// ``` +/// # struct MyPersister {} +/// # impl lightning::util::persist::KVStorePersister for MyPersister { +/// # fn persist(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) } +/// # } +/// # struct MyEventHandler {} +/// # impl MyEventHandler { +/// # async fn handle_event(&self, _: lightning::events::Event) {} +/// # } +/// # #[derive(Eq, PartialEq, Clone, Hash)] +/// # struct MySocketDescriptor {} +/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor { +/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } +/// # fn disconnect_socket(&mut self) {} +/// # } +/// # use std::sync::{Arc, Mutex}; +/// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use lightning_background_processor::{process_events_async, GossipSync}; +/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; +/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; +/// # type MyNodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync; +/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; +/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; +/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; +/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; +/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; +/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; +/// # type MyScorer = Mutex, Arc>>; +/// +/// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { +/// let background_persister = Arc::clone(&my_persister); +/// let background_event_handler = Arc::clone(&my_event_handler); +/// let background_chain_mon = Arc::clone(&my_chain_monitor); +/// let background_chan_man = Arc::clone(&my_channel_manager); +/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync)); +/// let background_peer_man = Arc::clone(&my_peer_manager); +/// let background_logger = Arc::clone(&my_logger); +/// let background_scorer = Arc::clone(&my_scorer); +/// +/// // Setup the sleeper. +/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); +/// +/// let sleeper = move |d| { +/// let mut receiver = stop_receiver.clone(); +/// Box::pin(async move { +/// tokio::select!{ +/// _ = tokio::time::sleep(d) => false, +/// _ = receiver.changed() => true, +/// } +/// }) +/// }; +/// +/// let mobile_interruptable_platform = false; +/// +/// let handle = tokio::spawn(async move { +/// process_events_async( +/// background_persister, +/// |e| background_event_handler.handle_event(e), +/// background_chain_mon, +/// background_chan_man, +/// background_gossip_sync, +/// background_peer_man, +/// background_logger, +/// Some(background_scorer), +/// sleeper, +/// mobile_interruptable_platform, +/// ) +/// .await +/// .expect("Failed to process events"); +/// }); +/// +/// // Stop the background processing. +/// stop_sender.send(()).unwrap(); +/// handle.await.unwrap(); +/// # } +///``` #[cfg(feature = "futures")] pub async fn process_events_async< 'a, -- 2.39.5