From efcb5e02dc5bbdb92e917234336ce37a204e1d57 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 9 Mar 2023 02:48:17 +0000 Subject: [PATCH] Move the pub `wait` methods from `ChannelManager` to `Future` Rather than having three ways to await a `ChannelManager` being persistable, this moves to just exposing the awaitable `Future` and having sleep functions on that. --- lightning-background-processor/src/lib.rs | 2 +- lightning/src/ln/channelmanager.rs | 59 +++++++---------------- lightning/src/util/wakers.rs | 36 ++++++++------ 3 files changed, 39 insertions(+), 58 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 884a7c226..cbff019a4 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -643,7 +643,7 @@ impl BackgroundProcessor { define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - channel_manager.await_persistable_update_timeout(Duration::from_millis(100)), + channel_manager.get_persistable_update_future().wait_timeout(Duration::from_millis(100)), |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d591788f6..7aca81bc0 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -6147,34 +6147,11 @@ where } } - /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool - /// indicating whether persistence is necessary. Only one listener on - /// [`await_persistable_update`], [`await_persistable_update_timeout`], or a future returned by - /// [`get_persistable_update_future`] is guaranteed to be woken up. + /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted. /// - /// Note that this method is not available with the `no-std` feature. + /// Note that callbacks registered on the [`Future`] MUST NOT call back into this + /// [`ChannelManager`] and should instead register actions to be taken later. /// - /// [`await_persistable_update`]: Self::await_persistable_update - /// [`await_persistable_update_timeout`]: Self::await_persistable_update_timeout - /// [`get_persistable_update_future`]: Self::get_persistable_update_future - #[cfg(any(test, feature = "std"))] - pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool { - self.persistence_notifier.wait_timeout(max_wait) - } - - /// Blocks until ChannelManager needs to be persisted. Only one listener on - /// [`await_persistable_update`], `await_persistable_update_timeout`, or a future returned by - /// [`get_persistable_update_future`] is guaranteed to be woken up. - /// - /// [`await_persistable_update`]: Self::await_persistable_update - /// [`get_persistable_update_future`]: Self::get_persistable_update_future - pub fn await_persistable_update(&self) { - self.persistence_notifier.wait() - } - - /// Gets a [`Future`] that completes when a persistable update is available. Note that - /// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and - /// should instead register actions to be taken later. pub fn get_persistable_update_future(&self) -> Future { self.persistence_notifier.get_future() } @@ -7954,9 +7931,9 @@ mod tests { // All nodes start with a persistable update pending as `create_network` connects each node // with all other nodes to make most tests simpler. - assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1))); - assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1))); - assert!(nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); + assert!(nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); + assert!(nodes[2].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1); @@ -7970,19 +7947,19 @@ mod tests { &nodes[0].node.get_our_node_id()).pop().unwrap(); // The first two nodes (which opened a channel) should now require fresh persistence - assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1))); - assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); + assert!(nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); // ... but the last node should not. - assert!(!nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(!nodes[2].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); // After persisting the first two nodes they should no longer need fresh persistence. - assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1))); - assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(!nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); + assert!(!nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); // Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update // about the channel. nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0); nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1); - assert!(!nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(!nodes[2].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); // The nodes which are a party to the channel should also ignore messages from unrelated // parties. @@ -7990,8 +7967,8 @@ mod tests { nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0); nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1); - assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1))); - assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(!nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); + assert!(!nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); // At this point the channel info given by peers should still be the same. assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); @@ -8008,8 +7985,8 @@ mod tests { // persisted and that its channel info remains the same. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update); - assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1))); - assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(!nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); + assert!(!nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info); @@ -8017,8 +7994,8 @@ mod tests { // the channel info has updated. nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update); nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update); - assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1))); - assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1))); + assert!(nodes[0].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); + assert!(nodes[1].node.get_persistable_update_future().wait_timeout(Duration::from_millis(1))); assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info); assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info); } diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 834721fd4..0385adc0c 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -39,15 +39,6 @@ impl Notifier { } } - pub(crate) fn wait(&self) { - Sleeper::from_single_future(self.get_future()).wait(); - } - - #[cfg(any(test, feature = "std"))] - pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool { - Sleeper::from_single_future(self.get_future()).wait_timeout(max_wait) - } - /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters. pub(crate) fn notify(&self) { let mut lock = self.notify_pending.lock().unwrap(); @@ -167,6 +158,19 @@ impl Future { pub fn register_callback_fn(&self, callback: F) { self.register_callback(Box::new(callback)); } + + /// Waits until this [`Future`] completes. + pub fn wait(self) { + Sleeper::from_single_future(self).wait(); + } + + /// Waits until this [`Future`] completes or the given amount of time has elapsed. + /// + /// Returns true if the [`Future`] completed, false if the time elapsed. + #[cfg(any(test, feature = "std"))] + pub fn wait_timeout(self, max_wait: Duration) -> bool { + Sleeper::from_single_future(self).wait_timeout(max_wait) + } } use core::task::Waker; @@ -369,12 +373,12 @@ mod tests { }); // Check that we can block indefinitely until updates are available. - let _ = persistence_notifier.wait(); + let _ = persistence_notifier.get_future().wait(); // Check that the Notifier will return after the given duration if updates are // available. loop { - if persistence_notifier.wait_timeout(Duration::from_millis(100)) { + if persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) { break } } @@ -384,7 +388,7 @@ mod tests { // Check that the Notifier will return after the given duration even if no updates // are available. loop { - if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { + if !persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) { break } } @@ -482,8 +486,8 @@ mod tests { // If we get a future and don't touch it we're definitely still notify-required. notifier.get_future(); - assert!(notifier.wait_timeout(Duration::from_millis(1))); - assert!(!notifier.wait_timeout(Duration::from_millis(1))); + assert!(notifier.get_future().wait_timeout(Duration::from_millis(1))); + assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1))); // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required. let mut future = notifier.get_future(); @@ -492,7 +496,7 @@ mod tests { notifier.notify(); assert!(woken.load(Ordering::SeqCst)); - assert!(notifier.wait_timeout(Duration::from_millis(1))); + assert!(notifier.get_future().wait_timeout(Duration::from_millis(1))); // However, once we do poll `Ready` it should wipe the notify-required flag. let mut future = notifier.get_future(); @@ -502,7 +506,7 @@ mod tests { notifier.notify(); assert!(woken.load(Ordering::SeqCst)); assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); - assert!(!notifier.wait_timeout(Duration::from_millis(1))); + assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1))); } #[test] -- 2.39.5