Merge pull request #1657 from TheBlueMatt/2022-08-async-man-update
authorvalentinewallace <valentinewallace@users.noreply.github.com>
Tue, 6 Sep 2022 20:06:06 +0000 (16:06 -0400)
committerGitHub <noreply@github.com>
Tue, 6 Sep 2022 20:06:06 +0000 (16:06 -0400)
Add a background processor which is async

1  2 
lightning-background-processor/Cargo.toml
lightning-background-processor/src/lib.rs
lightning/src/ln/channelmanager.rs

index ef07a3c9df5fe9dfa0dd9752da03c674d8c1271a,e51e2ac1b8e0aa4352664fa8e81db362437cc7ac..c5dcae1d972ce3b656acffc7845b4946c4faf4a4
@@@ -14,9 -14,10 +14,10 @@@ all-features = tru
  rustdoc-args = ["--cfg", "docsrs"]
  
  [dependencies]
 -bitcoin = "0.28.1"
 +bitcoin = "0.29.0"
  lightning = { version = "0.0.110", path = "../lightning", features = ["std"] }
  lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" }
+ futures = { version = "0.3", optional = true }
  
  [dev-dependencies]
  lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] }
index e95c9c3709edfe80cfce45d57bdd003a928725f7,d38c30e584fa1237b0b3870dda8c7825d0d67e72..e1fa8144ac4fce2066ea7bf0c47ee5fcdfb5b8aa
@@@ -2,10 -2,7 +2,10 @@@
  //! running properly, and (2) either can or should be run in the background. See docs for
  //! [`BackgroundProcessor`] for more details on the nitty-gritty.
  
 +// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
  #![deny(broken_intra_doc_links)]
 +#![deny(private_intra_doc_links)]
 +
  #![deny(missing_docs)]
  #![deny(unsafe_code)]
  
@@@ -19,7 -16,7 +19,7 @@@ use lightning::chain::chaininterface::{
  use lightning::chain::chainmonitor::{ChainMonitor, Persist};
  use lightning::chain::keysinterface::{Sign, KeysInterface};
  use lightning::ln::channelmanager::ChannelManager;
 -use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 +use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
  use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
  use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
  use lightning::routing::scoring::WriteableScore;
@@@ -34,6 -31,9 +34,9 @@@ use std::thread::JoinHandle
  use std::time::{Duration, Instant};
  use std::ops::Deref;
  
+ #[cfg(feature = "futures")]
+ use futures::{select, future::FutureExt};
  /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
  /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
  /// responsibilities are:
@@@ -222,6 -222,203 +225,203 @@@ where A::Target: chain::Access, L::Targ
        }
  }
  
+ macro_rules! define_run_body {
+       ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+        $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
+        $loop_exit_check: expr, $await: expr)
+       => { {
+               let event_handler = DecoratingEventHandler {
+                       event_handler: $event_handler,
+                       gossip_sync: &$gossip_sync,
+               };
+               log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
+               $channel_manager.timer_tick_occurred();
+               let mut last_freshness_call = Instant::now();
+               let mut last_ping_call = Instant::now();
+               let mut last_prune_call = Instant::now();
+               let mut last_scorer_persist_call = Instant::now();
+               let mut have_pruned = false;
+               loop {
+                       $channel_manager.process_pending_events(&event_handler);
+                       $chain_monitor.process_pending_events(&event_handler);
+                       // Note that the PeerManager::process_events may block on ChannelManager's locks,
+                       // hence it comes last here. When the ChannelManager finishes whatever it's doing,
+                       // we want to ensure we get into `persist_manager` as quickly as we can, especially
+                       // without running the normal event processing above and handing events to users.
+                       //
+                       // Specifically, on an *extremely* slow machine, we may see ChannelManager start
+                       // processing a message effectively at any point during this loop. In order to
+                       // minimize the time between such processing completing and persisting the updated
+                       // ChannelManager, we want to minimize methods blocking on a ChannelManager
+                       // generally, and as a fallback place such blocking only immediately before
+                       // persistence.
+                       $peer_manager.process_events();
+                       // We wait up to 100ms, but track how long it takes to detect being put to sleep,
+                       // see `await_start`'s use below.
+                       let await_start = Instant::now();
+                       let updates_available = $await;
+                       let await_time = await_start.elapsed();
+                       if updates_available {
+                               log_trace!($logger, "Persisting ChannelManager...");
+                               $persister.persist_manager(&*$channel_manager)?;
+                               log_trace!($logger, "Done persisting ChannelManager.");
+                       }
+                       // Exit the loop if the background processor was requested to stop.
+                       if $loop_exit_check {
+                               log_trace!($logger, "Terminating background processor.");
+                               break;
+                       }
+                       if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+                               log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
+                               $channel_manager.timer_tick_occurred();
+                               last_freshness_call = Instant::now();
+                       }
+                       if await_time > Duration::from_secs(1) {
+                               // On various platforms, we may be starved of CPU cycles for several reasons.
+                               // E.g. on iOS, if we've been in the background, we will be entirely paused.
+                               // Similarly, if we're on a desktop platform and the device has been asleep, we
+                               // may not get any cycles.
+                               // We detect this by checking if our max-100ms-sleep, above, ran longer than a
+                               // full second, at which point we assume sockets may have been killed (they
+                               // appear to be at least on some platforms, even if it has only been a second).
+                               // Note that we have to take care to not get here just because user event
+                               // processing was slow at the top of the loop. For example, the sample client
+                               // may call Bitcoin Core RPCs during event handling, which very often takes
+                               // more than a handful of seconds to complete, and shouldn't disconnect all our
+                               // peers.
+                               log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
+                               $peer_manager.disconnect_all_peers();
+                               last_ping_call = Instant::now();
+                       } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+                               log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
+                               $peer_manager.timer_tick_occurred();
+                               last_ping_call = Instant::now();
+                       }
+                       // Note that we want to run a graph prune once not long after startup before
+                       // falling back to our usual hourly prunes. This avoids short-lived clients never
+                       // pruning their network graph. We run once 60 seconds after startup before
+                       // continuing our normal cadence.
+                       if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+                               // The network graph must not be pruned while rapid sync completion is pending
+                               log_trace!($logger, "Assessing prunability of network graph");
+                               if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+                                       network_graph.remove_stale_channels();
+                                       if let Err(e) = $persister.persist_graph(network_graph) {
+                                               log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+                                       }
+                                       last_prune_call = Instant::now();
+                                       have_pruned = true;
+                               } else {
+                                       log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
+                               }
+                       }
+                       if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+                               if let Some(ref scorer) = $scorer {
+                                       log_trace!($logger, "Persisting scorer");
+                                       if let Err(e) = $persister.persist_scorer(&scorer) {
+                                               log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                       }
+                               }
+                               last_scorer_persist_call = Instant::now();
+                       }
+               }
+               // After we exit, ensure we persist the ChannelManager one final time - this avoids
+               // some races where users quit while channel updates were in-flight, with
+               // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+               $persister.persist_manager(&*$channel_manager)?;
+               // Persist Scorer on exit
+               if let Some(ref scorer) = $scorer {
+                       $persister.persist_scorer(&scorer)?;
+               }
+               // Persist NetworkGraph on exit
+               if let Some(network_graph) = $gossip_sync.network_graph() {
+                       $persister.persist_graph(network_graph)?;
+               }
+               Ok(())
+       } }
+ }
+ /// Processes background events in a future.
+ ///
+ /// `sleeper` should return a future which completes in the given amount of time and returns a
+ /// boolean indicating whether the background processing should continue. Once `sleeper` returns a
+ /// future which outputs false, the loop will exit and this function's future will complete.
+ ///
+ /// See [`BackgroundProcessor::start`] for information on which actions this handles.
+ #[cfg(feature = "futures")]
+ pub async fn process_events_async<
+       'a,
+       Signer: 'static + Sign,
+       CA: 'static + Deref + Send + Sync,
+       CF: 'static + Deref + Send + Sync,
+       CW: 'static + Deref + Send + Sync,
+       T: 'static + Deref + Send + Sync,
+       K: 'static + Deref + Send + Sync,
+       F: 'static + Deref + Send + Sync,
+       G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+       L: 'static + Deref + Send + Sync,
+       P: 'static + Deref + Send + Sync,
+       Descriptor: 'static + SocketDescriptor + Send + Sync,
+       CMH: 'static + Deref + Send + Sync,
+       RMH: 'static + Deref + Send + Sync,
+       EH: 'static + EventHandler + Send,
+       PS: 'static + Deref + Send,
+       M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+       CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+       PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
+       RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+       UMH: 'static + Deref + Send + Sync,
+       PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+       S: 'static + Deref<Target = SC> + Send + Sync,
+       SC: WriteableScore<'a>,
+       SleepFuture: core::future::Future<Output = bool>,
+       Sleeper: Fn(Duration) -> SleepFuture
+ >(
+       persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+       gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+       sleeper: Sleeper,
+ ) -> Result<(), std::io::Error>
+ where
+       CA::Target: 'static + chain::Access,
+       CF::Target: 'static + chain::Filter,
+       CW::Target: 'static + chain::Watch<Signer>,
+       T::Target: 'static + BroadcasterInterface,
+       K::Target: 'static + KeysInterface<Signer = Signer>,
+       F::Target: 'static + FeeEstimator,
+       L::Target: 'static + Logger,
+       P::Target: 'static + Persist<Signer>,
+       CMH::Target: 'static + ChannelMessageHandler,
+       RMH::Target: 'static + RoutingMessageHandler,
+       UMH::Target: 'static + CustomMessageHandler,
+       PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+ {
+       let mut should_continue = true;
+       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+               gossip_sync, peer_manager, logger, scorer, should_continue, {
+                       select! {
+                               _ = channel_manager.get_persistable_update_future().fuse() => true,
+                               cont = sleeper(Duration::from_millis(100)).fuse() => {
+                                       should_continue = cont;
+                                       false
+                               }
+                       }
+               })
+ }
  impl BackgroundProcessor {
        /// Start a background thread that takes care of responsibilities enumerated in the [top-level
        /// documentation].
                P: 'static + Deref + Send + Sync,
                Descriptor: 'static + SocketDescriptor + Send + Sync,
                CMH: 'static + Deref + Send + Sync,
 +              OMH: 'static + Deref + Send + Sync,
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
                PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                UMH: 'static + Deref + Send + Sync,
 -              PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
 +              PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
                SC: WriteableScore<'a>,
        >(
                L::Target: 'static + Logger,
                P::Target: 'static + Persist<Signer>,
                CMH::Target: 'static + ChannelMessageHandler,
 +              OMH::Target: 'static + OnionMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
                PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-                       let event_handler = DecoratingEventHandler {
-                               event_handler,
-                               gossip_sync: &gossip_sync,
-                       };
-                       log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
-                       channel_manager.timer_tick_occurred();
-                       let mut last_freshness_call = Instant::now();
-                       let mut last_ping_call = Instant::now();
-                       let mut last_prune_call = Instant::now();
-                       let mut last_scorer_persist_call = Instant::now();
-                       let mut have_pruned = false;
-                       loop {
-                               channel_manager.process_pending_events(&event_handler);
-                               chain_monitor.process_pending_events(&event_handler);
-                               // Note that the PeerManager::process_events may block on ChannelManager's locks,
-                               // hence it comes last here. When the ChannelManager finishes whatever it's doing,
-                               // we want to ensure we get into `persist_manager` as quickly as we can, especially
-                               // without running the normal event processing above and handing events to users.
-                               //
-                               // Specifically, on an *extremely* slow machine, we may see ChannelManager start
-                               // processing a message effectively at any point during this loop. In order to
-                               // minimize the time between such processing completing and persisting the updated
-                               // ChannelManager, we want to minimize methods blocking on a ChannelManager
-                               // generally, and as a fallback place such blocking only immediately before
-                               // persistence.
-                               peer_manager.process_events();
-                               // We wait up to 100ms, but track how long it takes to detect being put to sleep,
-                               // see `await_start`'s use below.
-                               let await_start = Instant::now();
-                               let updates_available =
-                                       channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
-                               let await_time = await_start.elapsed();
-                               if updates_available {
-                                       log_trace!(logger, "Persisting ChannelManager...");
-                                       persister.persist_manager(&*channel_manager)?;
-                                       log_trace!(logger, "Done persisting ChannelManager.");
-                               }
-                               // Exit the loop if the background processor was requested to stop.
-                               if stop_thread.load(Ordering::Acquire) == true {
-                                       log_trace!(logger, "Terminating background processor.");
-                                       break;
-                               }
-                               if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
-                                       log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
-                                       channel_manager.timer_tick_occurred();
-                                       last_freshness_call = Instant::now();
-                               }
-                               if await_time > Duration::from_secs(1) {
-                                       // On various platforms, we may be starved of CPU cycles for several reasons.
-                                       // E.g. on iOS, if we've been in the background, we will be entirely paused.
-                                       // Similarly, if we're on a desktop platform and the device has been asleep, we
-                                       // may not get any cycles.
-                                       // We detect this by checking if our max-100ms-sleep, above, ran longer than a
-                                       // full second, at which point we assume sockets may have been killed (they
-                                       // appear to be at least on some platforms, even if it has only been a second).
-                                       // Note that we have to take care to not get here just because user event
-                                       // processing was slow at the top of the loop. For example, the sample client
-                                       // may call Bitcoin Core RPCs during event handling, which very often takes
-                                       // more than a handful of seconds to complete, and shouldn't disconnect all our
-                                       // peers.
-                                       log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
-                                       peer_manager.disconnect_all_peers();
-                                       last_ping_call = Instant::now();
-                               } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
-                                       log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
-                                       peer_manager.timer_tick_occurred();
-                                       last_ping_call = Instant::now();
-                               }
-                               // Note that we want to run a graph prune once not long after startup before
-                               // falling back to our usual hourly prunes. This avoids short-lived clients never
-                               // pruning their network graph. We run once 60 seconds after startup before
-                               // continuing our normal cadence.
-                               if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
-                                       // The network graph must not be pruned while rapid sync completion is pending
-                                       log_trace!(logger, "Assessing prunability of network graph");
-                                       if let Some(network_graph) = gossip_sync.prunable_network_graph() {
-                                               network_graph.remove_stale_channels();
-                                               if let Err(e) = persister.persist_graph(network_graph) {
-                                                       log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
-                                               }
-                                               last_prune_call = Instant::now();
-                                               have_pruned = true;
-                                       } else {
-                                               log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
-                                       }
-                               }
-                               if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
-                                       if let Some(ref scorer) = scorer {
-                                               log_trace!(logger, "Persisting scorer");
-                                               if let Err(e) = persister.persist_scorer(&scorer) {
-                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
-                                               }
-                                       }
-                                       last_scorer_persist_call = Instant::now();
-                               }
-                       }
-                       // After we exit, ensure we persist the ChannelManager one final time - this avoids
-                       // some races where users quit while channel updates were in-flight, with
-                       // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-                       persister.persist_manager(&*channel_manager)?;
-                       // Persist Scorer on exit
-                       if let Some(ref scorer) = scorer {
-                               persister.persist_scorer(&scorer)?;
-                       }
-                       // Persist NetworkGraph on exit
-                       if let Some(network_graph) = gossip_sync.network_graph() {
-                               persister.persist_graph(network_graph)?;
-                       }
-                       Ok(())
+                       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+                               gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
+                               channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }
@@@ -493,7 -568,6 +573,7 @@@ impl Drop for BackgroundProcessor 
  mod tests {
        use bitcoin::blockdata::block::BlockHeader;
        use bitcoin::blockdata::constants::genesis_block;
 +      use bitcoin::blockdata::locktime::PackedLockTime;
        use bitcoin::blockdata::transaction::{Transaction, TxOut};
        use bitcoin::network::constants::Network;
        use lightning::chain::{BestBlock, Confirm, chainmonitor};
        use std::sync::{Arc, Mutex};
        use std::sync::mpsc::SyncSender;
        use std::time::Duration;
 +      use bitcoin::hashes::Hash;
 +      use bitcoin::TxMerkleNode;
        use lightning::routing::scoring::{FixedPenaltyScorer};
        use lightning_rapid_gossip_sync::RapidGossipSync;
        use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
                node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
 -              peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
 +              peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
                chain_monitor: Arc<ChainMonitor>,
                persister: Arc<FilesystemPersister>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
                        let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
                        let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
 -                      let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
 +                      let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
                        let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                        let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);
  
 -                                      let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
 +                                      let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
                                                value: channel_value_satoshis, script_pubkey: output_script.clone(),
                                        }]};
                                        (temporary_channel_id, tx)
                for i in 1..=depth {
                        let prev_blockhash = node.best_block.block_hash();
                        let height = node.best_block.height() + 1;
 -                      let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
 +                      let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
                        let txdata = vec![(0, tx)];
                        node.best_block = BestBlock::new(header.block_hash(), height);
                        match i {
index 4545cff51b2c721f588c3d89e9d2afd3814daa8e,467db740f0fe82f6e5614edc453329c4bc50526b..3b83726958b3a655b681f9a821d114b5a97e7c8a
@@@ -32,7 -32,7 +32,7 @@@ use bitcoin::hash_types::{BlockHash, Tx
  use bitcoin::secp256k1::{SecretKey,PublicKey};
  use bitcoin::secp256k1::Secp256k1;
  use bitcoin::secp256k1::ecdh::SharedSecret;
 -use bitcoin::secp256k1;
 +use bitcoin::{LockTime, secp256k1, Sequence};
  
  use chain;
  use chain::{Confirm, ChannelMonitorUpdateErr, Watch, BestBlock};
@@@ -54,6 -54,8 +54,8 @@@ use chain::keysinterface::{Sign, KeysIn
  use util::config::{UserConfig, ChannelConfig};
  use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
  use util::{byte_utils, events};
+ use util::crypto::sign;
+ use util::wakers::{Future, Notifier};
  use util::scid_utils::fake_scid;
  use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
  use util::logger::{Level, Logger};
@@@ -64,15 -66,11 +66,11 @@@ use prelude::*
  use core::{cmp, mem};
  use core::cell::RefCell;
  use io::Read;
- use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
+ use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
  use core::sync::atomic::{AtomicUsize, Ordering};
  use core::time::Duration;
  use core::ops::Deref;
  
- #[cfg(any(test, feature = "std"))]
- use std::time::Instant;
- use util::crypto::sign;
  // We hold various information about HTLC relay in the HTLC objects in Channel itself:
  //
  // Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should
@@@ -792,10 -790,10 +790,10 @@@ pub struct ChannelManager<Signer: Sign
        /// Taken first everywhere where we are making changes before any other locks.
        /// When acquiring this lock in read mode, rather than acquiring it directly, call
        /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
-       /// PersistenceNotifier the lock contains sends out a notification when the lock is released.
+       /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
  
-       persistence_notifier: PersistenceNotifier,
+       persistence_notifier: Notifier,
  
        keys_manager: K,
  
@@@ -835,18 -833,18 +833,18 @@@ enum NotifyOption 
  /// notify or not based on whether relevant changes have been made, providing a closure to
  /// `optionally_notify` which returns a `NotifyOption`.
  struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
-       persistence_notifier: &'a PersistenceNotifier,
+       persistence_notifier: &'a Notifier,
        should_persist: F,
        // We hold onto this result so the lock doesn't get released immediately.
        _read_guard: RwLockReadGuard<'a, ()>,
  }
  
  impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
-       fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+       fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a Notifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
                PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist })
        }
  
-       fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
+       fn optionally_notify<F: Fn() -> NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> {
                let read_guard = lock.read().unwrap();
  
                PersistenceNotifierGuard {
@@@ -1627,7 -1625,7 +1625,7 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
                        pending_events: Mutex::new(Vec::new()),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
-                       persistence_notifier: PersistenceNotifier::new(),
+                       persistence_notifier: Notifier::new(),
  
                        keys_manager,
  
                        // constituting our Lightning node might not have perfect sync about their blockchain views. Thus, if
                        // the wallet module is in advance on the LDK view, allow one more block of headroom.
                        // TODO: updated if/when https://github.com/rust-bitcoin/rust-bitcoin/pull/994 landed and rust-bitcoin bumped.
 -                      if !funding_transaction.input.iter().all(|input| input.sequence == 0xffffffff) && funding_transaction.lock_time < 500_000_000 && funding_transaction.lock_time > height + 2 {
 +                      if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 2 {
                                return Err(APIError::APIMisuseError {
                                        err: "Funding transaction absolute timelock is non-final".to_owned()
                                });
                                if were_node_one == msg_from_node_one {
                                        return Ok(NotifyOption::SkipPersist);
                                } else {
 +                                      log_debug!(self.logger, "Received channel_update for channel {}.", log_bytes!(chan_id));
                                        try_chan_entry!(self, chan.get_mut().channel_update(&msg), channel_state, chan);
                                }
                        },
@@@ -5991,12 -5988,16 +5989,16 @@@ wher
                self.persistence_notifier.wait()
        }
  
+       /// Gets a [`Future`] that completes when a persistable update is available. Note that
+       /// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
+       /// should instead register actions to be taken later.
+       pub fn get_persistable_update_future(&self) -> Future {
+               self.persistence_notifier.get_future()
+       }
        #[cfg(any(test, feature = "_test_utils"))]
        pub fn get_persistence_condvar_value(&self) -> bool {
-               let mutcond = &self.persistence_notifier.persistence_lock;
-               let &(ref mtx, _) = mutcond;
-               let guard = mtx.lock().unwrap();
-               *guard
+               self.persistence_notifier.notify_pending()
        }
  
        /// Gets the latest best block which was connected either via the [`chain::Listen`] or
@@@ -6238,77 -6239,6 +6240,6 @@@ impl<Signer: Sign, M: Deref , T: Deref 
        }
  }
  
- /// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
- /// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
- struct PersistenceNotifier {
-       /// Users won't access the persistence_lock directly, but rather wait on its bool using
-       /// `wait_timeout` and `wait`.
-       persistence_lock: (Mutex<bool>, Condvar),
- }
- impl PersistenceNotifier {
-       fn new() -> Self {
-               Self {
-                       persistence_lock: (Mutex::new(false), Condvar::new()),
-               }
-       }
-       fn wait(&self) {
-               loop {
-                       let &(ref mtx, ref cvar) = &self.persistence_lock;
-                       let mut guard = mtx.lock().unwrap();
-                       if *guard {
-                               *guard = false;
-                               return;
-                       }
-                       guard = cvar.wait(guard).unwrap();
-                       let result = *guard;
-                       if result {
-                               *guard = false;
-                               return
-                       }
-               }
-       }
-       #[cfg(any(test, feature = "std"))]
-       fn wait_timeout(&self, max_wait: Duration) -> bool {
-               let current_time = Instant::now();
-               loop {
-                       let &(ref mtx, ref cvar) = &self.persistence_lock;
-                       let mut guard = mtx.lock().unwrap();
-                       if *guard {
-                               *guard = false;
-                               return true;
-                       }
-                       guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
-                       // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
-                       // desired wait time has actually passed, and if not then restart the loop with a reduced wait
-                       // time. Note that this logic can be highly simplified through the use of
-                       // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
-                       // 1.42.0.
-                       let elapsed = current_time.elapsed();
-                       let result = *guard;
-                       if result || elapsed >= max_wait {
-                               *guard = false;
-                               return result;
-                       }
-                       match max_wait.checked_sub(elapsed) {
-                               None => return result,
-                               Some(_) => continue
-                       }
-               }
-       }
-       // Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
-       fn notify(&self) {
-               let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
-               let mut persistence_lock = persist_mtx.lock().unwrap();
-               *persistence_lock = true;
-               mem::drop(persistence_lock);
-               cnd.notify_all();
-       }
- }
  const SERIALIZATION_VERSION: u8 = 1;
  const MIN_SERIALIZATION_VERSION: u8 = 1;
  
@@@ -7317,7 -7247,7 +7248,7 @@@ impl<'a, Signer: Sign, M: Deref, T: Der
                        pending_events: Mutex::new(pending_events_read),
                        pending_background_events: Mutex::new(pending_background_events_read),
                        total_consistency_lock: RwLock::new(()),
-                       persistence_notifier: PersistenceNotifier::new(),
+                       persistence_notifier: Notifier::new(),
  
                        keys_manager: args.keys_manager,
                        logger: args.logger,
@@@ -7356,54 -7286,6 +7287,6 @@@ mod tests 
        use util::test_utils;
        use chain::keysinterface::KeysInterface;
  
-       #[cfg(feature = "std")]
-       #[test]
-       fn test_wait_timeout() {
-               use ln::channelmanager::PersistenceNotifier;
-               use sync::Arc;
-               use core::sync::atomic::AtomicBool;
-               use std::thread;
-               let persistence_notifier = Arc::new(PersistenceNotifier::new());
-               let thread_notifier = Arc::clone(&persistence_notifier);
-               let exit_thread = Arc::new(AtomicBool::new(false));
-               let exit_thread_clone = exit_thread.clone();
-               thread::spawn(move || {
-                       loop {
-                               let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
-                               let mut persistence_lock = persist_mtx.lock().unwrap();
-                               *persistence_lock = true;
-                               cnd.notify_all();
-                               if exit_thread_clone.load(Ordering::SeqCst) {
-                                       break
-                               }
-                       }
-               });
-               // Check that we can block indefinitely until updates are available.
-               let _ = persistence_notifier.wait();
-               // Check that the PersistenceNotifier will return after the given duration if updates are
-               // available.
-               loop {
-                       if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
-                               break
-                       }
-               }
-               exit_thread.store(true, Ordering::SeqCst);
-               // Check that the PersistenceNotifier will return after the given duration even if no updates
-               // are available.
-               loop {
-                       if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
-                               break
-                       }
-               }
-       }
        #[test]
        fn test_notify_limits() {
                // Check that a few cases which don't require the persistence of a new ChannelManager,
@@@ -7975,7 -7857,7 +7858,7 @@@ pub mod bench 
  
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
 -      use bitcoin::{Block, BlockHeader, Transaction, TxOut};
 +      use bitcoin::{Block, BlockHeader, PackedLockTime, Transaction, TxMerkleNode, TxOut};
  
        use sync::{Arc, Mutex};
  
  
                let tx;
                if let Event::FundingGenerationReady { temporary_channel_id, output_script, .. } = get_event!(node_a_holder, Event::FundingGenerationReady) {
 -                      tx = Transaction { version: 2, lock_time: 0, input: Vec::new(), output: vec![TxOut {
 +                      tx = Transaction { version: 2, lock_time: PackedLockTime::ZERO, input: Vec::new(), output: vec![TxOut {
                                value: 8_000_000, script_pubkey: output_script,
                        }]};
                        node_a.funding_transaction_generated(&temporary_channel_id, &node_b.get_our_node_id(), tx.clone()).unwrap();
                assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]);
  
                let block = Block {
 -                      header: BlockHeader { version: 0x20000000, prev_blockhash: genesis_hash, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
 +                      header: BlockHeader { version: 0x20000000, prev_blockhash: genesis_hash, merkle_root: TxMerkleNode::all_zeros(), time: 42, bits: 42, nonce: 42 },
                        txdata: vec![tx],
                };
                Listen::block_connected(&node_a, &block, 1);