X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=3227b63fdee5d72c30d509c98a945d86a5a5a16e;hb=cbfff99124fbc30c73b82b37a9fd4151225b43ea;hp=18ed6da06cd5e7e5d0574432693f732be28961e3;hpb=d5b05e54c32cd53e0534849b57242c65747738b1;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 18ed6da0..3227b63f 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -64,8 +64,8 @@ use alloc::vec::Vec; /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so, /// writing it to disk/backups by invoking the callback given to it at startup. /// [`ChannelManager`] persistence should be done in the background. -/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`] -/// at the appropriate intervals. +/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`] +/// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals. /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]). /// @@ -116,12 +116,17 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60; #[cfg(test)] const FIRST_NETWORK_PRUNE_TIMER: u64 = 1; +#[cfg(not(test))] +const REBROADCAST_TIMER: u64 = 30; +#[cfg(test)] +const REBROADCAST_TIMER: u64 = 1; + #[cfg(feature = "futures")] /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } } #[cfg(feature = "futures")] const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER), - min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER)); + min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER))); /// Either [`P2PGossipSync`] or [`RapidGossipSync`]. pub enum GossipSync< @@ -265,11 +270,14 @@ macro_rules! define_run_body { => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); + log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); + $chain_monitor.rebroadcast_pending_claims(); let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); let mut last_ping_call = $get_timer(PING_TIMER); let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); + let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); let mut have_pruned = false; loop { @@ -289,6 +297,12 @@ macro_rules! define_run_body { // persistence. $peer_manager.process_events(); + // Exit the loop if the background processor was requested to stop. + if $loop_exit_check { + log_trace!($logger, "Terminating background processor."); + break; + } + // We wait up to 100ms, but track how long it takes to detect being put to sleep, // see `await_start`'s use below. let mut await_start = None; @@ -296,16 +310,17 @@ macro_rules! define_run_body { let updates_available = $await; let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; - if updates_available { - log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&*$channel_manager)?; - log_trace!($logger, "Done persisting ChannelManager."); - } // Exit the loop if the background processor was requested to stop. if $loop_exit_check { log_trace!($logger, "Terminating background processor."); break; } + + if updates_available { + log_trace!($logger, "Persisting ChannelManager..."); + $persister.persist_manager(&*$channel_manager)?; + log_trace!($logger, "Done persisting ChannelManager."); + } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); $channel_manager.timer_tick_occurred(); @@ -337,7 +352,8 @@ macro_rules! define_run_body { // falling back to our usual hourly prunes. This avoids short-lived clients never // pruning their network graph. We run once 60 seconds after startup before // continuing our normal cadence. - if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) { + let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + if $timer_elapsed(&mut last_prune_call, prune_timer) { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = $gossip_sync.prunable_network_graph() { #[cfg(feature = "std")] { @@ -355,7 +371,8 @@ macro_rules! define_run_body { have_pruned = true; } - last_prune_call = $get_timer(NETWORK_PRUNE_TIMER); + let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + last_prune_call = $get_timer(prune_timer); } if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { @@ -367,6 +384,12 @@ macro_rules! define_run_body { } last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); } + + if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) { + log_trace!($logger, "Rebroadcasting monitor's pending claims"); + $chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); + } } // After we exit, ensure we persist the ChannelManager one final time - this avoids @@ -445,7 +468,8 @@ use core::task; /// /// `sleeper` should return a future which completes in the given amount of time and returns a /// boolean indicating whether the background processing should exit. Once `sleeper` returns a -/// future which outputs true, the loop will exit and this function's future will complete. +/// future which outputs `true`, the loop will exit and this function's future will complete. +/// The `sleeper` future is free to return early after it has triggered the exit condition. /// /// See [`BackgroundProcessor::start`] for information on which actions this handles. /// @@ -458,6 +482,87 @@ use core::task; /// mobile device, where we may need to check for interruption of the application regularly. If you /// are unsure, you should set the flag, as the performance impact of it is minimal unless there /// are hundreds or thousands of simultaneous process calls running. +/// +/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you +/// could setup `process_events_async` like this: +/// ``` +/// # struct MyPersister {} +/// # impl lightning::util::persist::KVStorePersister for MyPersister { +/// # fn persist(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) } +/// # } +/// # struct MyEventHandler {} +/// # impl MyEventHandler { +/// # async fn handle_event(&self, _: lightning::events::Event) {} +/// # } +/// # #[derive(Eq, PartialEq, Clone, Hash)] +/// # struct MySocketDescriptor {} +/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor { +/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } +/// # fn disconnect_socket(&mut self) {} +/// # } +/// # use std::sync::{Arc, Mutex}; +/// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use lightning_background_processor::{process_events_async, GossipSync}; +/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; +/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; +/// # type MyNodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync; +/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; +/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; +/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; +/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; +/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; +/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; +/// # type MyScorer = Mutex, Arc>>; +/// +/// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { +/// let background_persister = Arc::clone(&my_persister); +/// let background_event_handler = Arc::clone(&my_event_handler); +/// let background_chain_mon = Arc::clone(&my_chain_monitor); +/// let background_chan_man = Arc::clone(&my_channel_manager); +/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync)); +/// let background_peer_man = Arc::clone(&my_peer_manager); +/// let background_logger = Arc::clone(&my_logger); +/// let background_scorer = Arc::clone(&my_scorer); +/// +/// // Setup the sleeper. +/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); +/// +/// let sleeper = move |d| { +/// let mut receiver = stop_receiver.clone(); +/// Box::pin(async move { +/// tokio::select!{ +/// _ = tokio::time::sleep(d) => false, +/// _ = receiver.changed() => true, +/// } +/// }) +/// }; +/// +/// let mobile_interruptable_platform = false; +/// +/// let handle = tokio::spawn(async move { +/// process_events_async( +/// background_persister, +/// |e| background_event_handler.handle_event(e), +/// background_chain_mon, +/// background_chan_man, +/// background_gossip_sync, +/// background_peer_man, +/// background_logger, +/// Some(background_scorer), +/// sleeper, +/// mobile_interruptable_platform, +/// ) +/// .await +/// .expect("Failed to process events"); +/// }); +/// +/// // Stop the background processing. +/// stop_sender.send(()).unwrap(); +/// handle.await.unwrap(); +/// # } +///``` #[cfg(feature = "futures")] pub async fn process_events_async< 'a, @@ -862,7 +967,10 @@ mod tests { if key == "network_graph" { if let Some(sender) = &self.graph_persistence_notifier { - sender.send(()).unwrap(); + match sender.send(()) { + Ok(()) => {}, + Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."), + } }; if let Some((error, message)) = self.graph_error { @@ -1009,12 +1117,12 @@ mod tests { } fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec { + let network = Network::Testnet; let mut nodes = Vec::new(); for i in 0..num_nodes { - let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}); + let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network)); let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }); let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); - let network = Network::Testnet; let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); let scorer = Arc::new(Mutex::new(TestScorer::new())); @@ -1184,8 +1292,9 @@ mod tests { #[test] fn test_timer_tick_called() { - // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every - // `FRESHNESS_TIMER`. + // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, + // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and + // `PeerManager::timer_tick_occurred` every `PING_TIMER`. let nodes = create_nodes(1, "test_timer_tick_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); @@ -1193,10 +1302,12 @@ mod tests { let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string(); - let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() { + let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); + let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); + let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); + if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() && + log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() && + log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() { break } } @@ -1475,10 +1586,9 @@ mod tests { }) }, false, ); - // TODO: Drop _local and simply spawn after #2003 - let local_set = tokio::task::LocalSet::new(); - local_set.spawn_local(bp_future); - local_set.spawn_local(async move { + + let t1 = tokio::spawn(bp_future); + let t2 = tokio::spawn(async move { do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, { let mut i = 0; loop { @@ -1490,7 +1600,9 @@ mod tests { }, tokio::time::sleep(Duration::from_millis(1)).await); exit_sender.send(()).unwrap(); }); - local_set.await; + let (r1, r2) = tokio::join!(t1, t2); + r1.unwrap().unwrap(); + r2.unwrap() } macro_rules! do_test_payment_path_scoring { @@ -1512,7 +1624,7 @@ mod tests { channel_features: ChannelFeatures::empty(), fee_msat: 0, cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, - }]}; + }], blinded_tail: None }; $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { @@ -1644,13 +1756,14 @@ mod tests { }) }, false, ); - // TODO: Drop _local and simply spawn after #2003 - let local_set = tokio::task::LocalSet::new(); - local_set.spawn_local(bp_future); - local_set.spawn_local(async move { + let t1 = tokio::spawn(bp_future); + let t2 = tokio::spawn(async move { do_test_payment_path_scoring!(nodes, receiver.recv().await); exit_sender.send(()).unwrap(); }); - local_set.await; + + let (r1, r2) = tokio::join!(t1, t2); + r1.unwrap().unwrap(); + r2.unwrap() } }