Implement the ability to block on multiple futures at once
authorMatt Corallo <git@bluematt.me>
Fri, 31 Mar 2023 05:13:35 +0000 (05:13 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 3 Apr 2023 16:49:54 +0000 (16:49 +0000)
In the next commits we'll be adding a second notify pipeline - from
the `ChainMonitor` back to the background processor. This will
cause the `background-processor` to need to await multiple wakers
at once, which we cannot do in the current scheme without taking on
a full async runtime.

Building a multi-future waiter isn't so bad, and notably will allow
us to remove the existing sleep pipeline entirely, reducing the
complexity of our wakers implementation by only having one notify
path to consider.

lightning/src/util/wakers.rs

index 82e2dfc335ce95c2723cbba059a6c0d052bdd6a7..834721fd4aa3abf207e70405774f29958f743bf4 100644 (file)
 
 use alloc::sync::Arc;
 use core::mem;
-use crate::sync::{Condvar, Mutex, MutexGuard};
+use crate::sync::{Condvar, Mutex};
 
 use crate::prelude::*;
 
 #[cfg(any(test, feature = "std"))]
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 use core::future::Future as StdFuture;
 use core::task::{Context, Poll};
@@ -30,75 +30,22 @@ use core::pin::Pin;
 /// Used to signal to one of many waiters that the condition they're waiting on has happened.
 pub(crate) struct Notifier {
        notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
-       condvar: Condvar,
-}
-
-macro_rules! check_woken {
-       ($guard: expr, $retval: expr) => { {
-               if $guard.0 {
-                       $guard.0 = false;
-                       if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) {
-                               // If we're about to return as woken, and the future state is marked complete, wipe
-                               // the future state and let the next future wait until we get a new notify.
-                               $guard.1.take();
-                       }
-                       return $retval;
-               }
-       } }
 }
 
 impl Notifier {
        pub(crate) fn new() -> Self {
                Self {
                        notify_pending: Mutex::new((false, None)),
-                       condvar: Condvar::new(),
-               }
-       }
-
-       fn propagate_future_state_to_notify_flag(&self) -> MutexGuard<(bool, Option<Arc<Mutex<FutureState>>>)> {
-               let mut lock = self.notify_pending.lock().unwrap();
-               if let Some(existing_state) = &lock.1 {
-                       if existing_state.lock().unwrap().callbacks_made {
-                               // If the existing `FutureState` has completed and actually made callbacks,
-                               // consider the notification flag to have been cleared and reset the future state.
-                               lock.1.take();
-                               lock.0 = false;
-                       }
                }
-               lock
        }
 
        pub(crate) fn wait(&self) {
-               loop {
-                       let mut guard = self.propagate_future_state_to_notify_flag();
-                       check_woken!(guard, ());
-                       guard = self.condvar.wait(guard).unwrap();
-                       check_woken!(guard, ());
-               }
+               Sleeper::from_single_future(self.get_future()).wait();
        }
 
        #[cfg(any(test, feature = "std"))]
        pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
-               let current_time = Instant::now();
-               loop {
-                       let mut guard = self.propagate_future_state_to_notify_flag();
-                       check_woken!(guard, true);
-                       guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0;
-                       check_woken!(guard, true);
-                       // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
-                       // desired wait time has actually passed, and if not then restart the loop with a reduced wait
-                       // time. Note that this logic can be highly simplified through the use of
-                       // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
-                       // 1.42.0.
-                       let elapsed = current_time.elapsed();
-                       if elapsed >= max_wait {
-                               return false;
-                       }
-                       match max_wait.checked_sub(elapsed) {
-                               None => return false,
-                               Some(_) => continue
-                       }
-               }
+               Sleeper::from_single_future(self.get_future()).wait_timeout(max_wait)
        }
 
        /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
@@ -111,13 +58,19 @@ impl Notifier {
                        }
                }
                lock.0 = true;
-               mem::drop(lock);
-               self.condvar.notify_all();
        }
 
        /// Gets a [`Future`] that will get woken up with any waiters
        pub(crate) fn get_future(&self) -> Future {
-               let mut lock = self.propagate_future_state_to_notify_flag();
+               let mut lock = self.notify_pending.lock().unwrap();
+               if let Some(existing_state) = &lock.1 {
+                       if existing_state.lock().unwrap().callbacks_made {
+                               // If the existing `FutureState` has completed and actually made callbacks,
+                               // consider the notification flag to have been cleared and reset the future state.
+                               lock.1.take();
+                               lock.0 = false;
+                       }
+               }
                if let Some(existing_state) = &lock.1 {
                        Future { state: Arc::clone(&existing_state) }
                } else {
@@ -182,6 +135,9 @@ impl FutureState {
 }
 
 /// A simple future which can complete once, and calls some callback(s) when it does so.
+///
+/// Clones can be made and all futures cloned from the same source will complete at the same time.
+#[derive(Clone)]
 pub struct Future {
        state: Arc<Mutex<FutureState>>,
 }
@@ -236,6 +192,77 @@ impl<'a> StdFuture for Future {
        }
 }
 
+/// A struct which can be used to select across many [`Future`]s at once without relying on a full
+/// async context.
+pub struct Sleeper {
+       notifiers: Vec<Arc<Mutex<FutureState>>>,
+}
+
+impl Sleeper {
+       /// Constructs a new sleeper from one future, allowing blocking on it.
+       pub fn from_single_future(future: Future) -> Self {
+               Self { notifiers: vec![future.state] }
+       }
+       /// Constructs a new sleeper from two futures, allowing blocking on both at once.
+       // Note that this is the common case - a ChannelManager and ChainMonitor.
+       pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
+               Self { notifiers: vec![fut_a.state, fut_b.state] }
+       }
+       /// Constructs a new sleeper on many futures, allowing blocking on all at once.
+       pub fn new(futures: Vec<Future>) -> Self {
+               Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
+       }
+       /// Prepares to go into a wait loop body, creating a condition variable which we can block on
+       /// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
+       /// condition variable being woken.
+       fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
+               let cv = Arc::new(Condvar::new());
+               let notified_fut_mtx = Arc::new(Mutex::new(None));
+               {
+                       for notifier_mtx in self.notifiers.iter() {
+                               let cv_ref = Arc::clone(&cv);
+                               let notified_fut_ref = Arc::clone(&notified_fut_mtx);
+                               let notifier_ref = Arc::clone(&notifier_mtx);
+                               let mut notifier = notifier_mtx.lock().unwrap();
+                               if notifier.complete {
+                                       *notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
+                                       break;
+                               }
+                               notifier.callbacks.push((false, Box::new(move || {
+                                       *notified_fut_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
+                                       cv_ref.notify_all();
+                               })));
+                       }
+               }
+               (cv, notified_fut_mtx)
+       }
+
+       /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
+       pub fn wait(&self) {
+               let (cv, notified_fut_mtx) = self.setup_wait();
+               let notified_fut = cv.wait_while(notified_fut_mtx.lock().unwrap(), |fut_opt| fut_opt.is_none())
+                       .unwrap().take().expect("CV wait shouldn't have returned until the notifying future was set");
+               notified_fut.lock().unwrap().callbacks_made = true;
+       }
+
+       /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed or the
+       /// given amount of time has elapsed. Returns true if a [`Future`] completed, false if the time
+       /// elapsed.
+       #[cfg(any(test, feature = "std"))]
+       pub fn wait_timeout(&self, max_wait: Duration) -> bool {
+               let (cv, notified_fut_mtx) = self.setup_wait();
+               let notified_fut =
+                       match cv.wait_timeout_while(notified_fut_mtx.lock().unwrap(), max_wait, |fut_opt| fut_opt.is_none()) {
+                               Ok((_, e)) if e.timed_out() => return false,
+                               Ok((mut notified_fut, _)) =>
+                                       notified_fut.take().expect("CV wait shouldn't have returned until the notifying future was set"),
+                               Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
+                       };
+               notified_fut.lock().unwrap().callbacks_made = true;
+               true
+       }
+}
+
 #[cfg(test)]
 mod tests {
        use super::*;
@@ -334,10 +361,7 @@ mod tests {
                let exit_thread_clone = exit_thread.clone();
                thread::spawn(move || {
                        loop {
-                               let mut lock = thread_notifier.notify_pending.lock().unwrap();
-                               lock.0 = true;
-                               thread_notifier.condvar.notify_all();
-
+                               thread_notifier.notify();
                                if exit_thread_clone.load(Ordering::SeqCst) {
                                        break
                                }
@@ -539,4 +563,66 @@ mod tests {
                assert!(woken.load(Ordering::SeqCst));
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        }
+
+       #[test]
+       fn test_multi_future_sleep() {
+               // Tests the `Sleeper` with multiple futures.
+               let notifier_a = Notifier::new();
+               let notifier_b = Notifier::new();
+
+               // Set both notifiers as woken without sleeping yet.
+               notifier_a.notify();
+               notifier_b.notify();
+               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+
+               // One future has woken us up, but the other should still have a pending notification.
+               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+
+               // However once we've slept twice, we should no longer have any pending notifications
+               assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
+                       .wait_timeout(Duration::from_millis(10)));
+
+               // Test ordering somewhat more.
+               notifier_a.notify();
+               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+       }
+
+       #[test]
+       #[cfg(feature = "std")]
+       fn sleeper_with_pending_callbacks() {
+               // This is similar to the above `test_multi_future_sleep` test, but in addition registers
+               // "normal" callbacks which will cause the futures to assume notification has occurred,
+               // rather than waiting for a woken sleeper.
+               let notifier_a = Notifier::new();
+               let notifier_b = Notifier::new();
+
+               // Set both notifiers as woken without sleeping yet.
+               notifier_a.notify();
+               notifier_b.notify();
+
+               // After sleeping one future (not guaranteed which one, however) will have its notification
+               // bit cleared.
+               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+
+               // By registering a callback on the futures for both notifiers, one will complete
+               // immediately, but one will remain tied to the notifier, and will complete once the
+               // notifier is next woken, which will be considered the completion of the notification.
+               let callback_a = Arc::new(AtomicBool::new(false));
+               let callback_b = Arc::new(AtomicBool::new(false));
+               let callback_a_ref = Arc::clone(&callback_a);
+               let callback_b_ref = Arc::clone(&callback_b);
+               notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
+               notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));
+
+               // If we now notify both notifiers again, the other callback will fire, completing the
+               // notification, and we'll be back to one pending notification.
+               notifier_a.notify();
+               notifier_b.notify();
+
+               assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
+               Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+               assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
+                       .wait_timeout(Duration::from_millis(10)));
+       }
 }