Fix persistence-required futures always completing instantly
[rust-lightning] / lightning / src / util / wakers.rs
index 9636466aaa2f1d750cb14b3c630aa56e4c764e2f..655fc9cf7ed5db8f6bfbbc9285eb3c6bef94f037 100644 (file)
 
 use alloc::sync::Arc;
 use core::mem;
-use core::time::Duration;
-use sync::{Condvar, Mutex};
+use crate::sync::{Condvar, Mutex};
+
+use crate::prelude::*;
 
 #[cfg(any(test, feature = "std"))]
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 use core::future::Future as StdFuture;
 use core::task::{Context, Poll};
 use core::pin::Pin;
 
-use prelude::*;
 
 /// Used to signal to one of many waiters that the condition they're waiting on has happened.
 pub(crate) struct Notifier {
@@ -88,10 +88,19 @@ impl Notifier {
        /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
        pub(crate) fn notify(&self) {
                let mut lock = self.notify_pending.lock().unwrap();
-               lock.0 = true;
+               let mut future_probably_generated_calls = false;
                if let Some(future_state) = lock.1.take() {
-                       future_state.lock().unwrap().complete();
+                       future_probably_generated_calls |= future_state.lock().unwrap().complete();
+                       future_probably_generated_calls |= Arc::strong_count(&future_state) > 1;
+               }
+               if future_probably_generated_calls {
+                       // If a future made some callbacks or has not yet been drop'd (i.e. the state has more
+                       // than the one reference we hold), assume the user was notified and skip setting the
+                       // notification-required flag. This will not cause the `wait` functions above to return
+                       // and avoid any future `Future`s starting in a completed state.
+                       return;
                }
+               lock.0 = true;
                mem::drop(lock);
                self.condvar.notify_all();
        }
@@ -103,7 +112,7 @@ impl Notifier {
                        Future {
                                state: Arc::new(Mutex::new(FutureState {
                                        callbacks: Vec::new(),
-                                       complete: false,
+                                       complete: true,
                                }))
                        }
                } else if let Some(existing_state) = &lock.1 {
@@ -147,11 +156,14 @@ pub(crate) struct FutureState {
 }
 
 impl FutureState {
-       fn complete(&mut self) {
+       fn complete(&mut self) -> bool {
+               let mut made_calls = false;
                for callback in self.callbacks.drain(..) {
                        callback.call();
+                       made_calls = true;
                }
                self.complete = true;
+               made_calls
        }
 }
 
@@ -163,6 +175,8 @@ pub struct Future {
 impl Future {
        /// Registers a callback to be called upon completion of this future. If the future has already
        /// completed, the callback will be called immediately.
+       ///
+       /// (C-not exported) use the bindings-only `register_callback_fn` instead
        pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
                let mut state = self.state.lock().unwrap();
                if state.complete {
@@ -172,6 +186,16 @@ impl Future {
                        state.callbacks.push(callback);
                }
        }
+
+       // C bindings don't (currently) know how to map `Box<dyn Trait>`, and while it could add the
+       // following wrapper, doing it in the bindings is currently much more work than simply doing it
+       // here.
+       /// Registers a callback to be called upon completion of this future. If the future has already
+       /// completed, the callback will be called immediately.
+       #[cfg(c_bindings)]
+       pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
+               self.register_callback(Box::new(callback));
+       }
 }
 
 mod std_future {
@@ -205,10 +229,66 @@ mod tests {
        use core::future::Future as FutureTrait;
        use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
 
+       #[test]
+       fn notifier_pre_notified_future() {
+               // Previously, if we generated a future after a `Notifier` had been notified, the future
+               // would never complete. This tests this behavior, ensuring the future instead completes
+               // immediately.
+               let notifier = Notifier::new();
+               notifier.notify();
+
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(callback.load(Ordering::SeqCst));
+       }
+
+       #[test]
+       fn notifier_future_completes_wake() {
+               // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
+               // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
+               // `lightning-background-processor` to persist in a tight loop.
+               let notifier = Notifier::new();
+
+               // First check the simple case, ensuring if we get notified a new future isn't woken until
+               // a second `notify`.
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(!callback.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(callback.load(Ordering::SeqCst));
+
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(!callback.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(callback.load(Ordering::SeqCst));
+
+               // Then check the case where the future is fetched before the notification, but a callback
+               // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
+               // don't get an instant-wake when we get a new future.
+               let future = notifier.get_future();
+               notifier.notify();
+
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(callback.load(Ordering::SeqCst));
+
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(!callback.load(Ordering::SeqCst));
+       }
+
        #[cfg(feature = "std")]
        #[test]
        fn test_wait_timeout() {
-               use sync::Arc;
+               use crate::sync::Arc;
                use std::thread;
 
                let persistence_notifier = Arc::new(Notifier::new());
@@ -292,7 +372,7 @@ mod tests {
        // waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
        const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
        unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }
-       unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; Box::from_raw(p); }
+       unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; let _freed = Box::from_raw(p); }
        unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }
        unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
                let p = ptr as *const Arc<AtomicBool>;