Fix persistence-required futures always completing instantly 2022-11-future-wake-fix
authorMatt Corallo <git@bluematt.me>
Thu, 10 Nov 2022 00:37:01 +0000 (00:37 +0000)
committerMatt Corallo <git@bluematt.me>
Fri, 11 Nov 2022 02:03:52 +0000 (02:03 +0000)
After the first persistence-required `Future` wakeup, we'll always
complete additional futures instantly as we don't clear the
"need wake" bit. Instead, we need to just assume that if a future
was generated (and not immediately drop'd) that its sufficient to
notify the user.

lightning/src/ln/channelmanager.rs
lightning/src/util/wakers.rs

index e3128d15f531ebfb94a6caf1bc386e57380190d3..8e279f78a3a6f4b72efc5500ac77dd266bbb5043 100644 (file)
@@ -5916,18 +5916,25 @@ where
 
        /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
        /// indicating whether persistence is necessary. Only one listener on
-       /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
-       /// up.
+       /// [`await_persistable_update`], [`await_persistable_update_timeout`], or a future returned by
+       /// [`get_persistable_update_future`] is guaranteed to be woken up.
        ///
        /// Note that this method is not available with the `no-std` feature.
+       ///
+       /// [`await_persistable_update`]: Self::await_persistable_update
+       /// [`await_persistable_update_timeout`]: Self::await_persistable_update_timeout
+       /// [`get_persistable_update_future`]: Self::get_persistable_update_future
        #[cfg(any(test, feature = "std"))]
        pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool {
                self.persistence_notifier.wait_timeout(max_wait)
        }
 
        /// Blocks until ChannelManager needs to be persisted. Only one listener on
-       /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
-       /// up.
+       /// [`await_persistable_update`], `await_persistable_update_timeout`, or a future returned by
+       /// [`get_persistable_update_future`] is guaranteed to be woken up.
+       ///
+       /// [`await_persistable_update`]: Self::await_persistable_update
+       /// [`get_persistable_update_future`]: Self::get_persistable_update_future
        pub fn await_persistable_update(&self) {
                self.persistence_notifier.wait()
        }
index 8eb6c25c6189b6aa27dcc62266dc9a40b709f689..655fc9cf7ed5db8f6bfbbc9285eb3c6bef94f037 100644 (file)
@@ -88,10 +88,19 @@ impl Notifier {
        /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
        pub(crate) fn notify(&self) {
                let mut lock = self.notify_pending.lock().unwrap();
-               lock.0 = true;
+               let mut future_probably_generated_calls = false;
                if let Some(future_state) = lock.1.take() {
-                       future_state.lock().unwrap().complete();
+                       future_probably_generated_calls |= future_state.lock().unwrap().complete();
+                       future_probably_generated_calls |= Arc::strong_count(&future_state) > 1;
+               }
+               if future_probably_generated_calls {
+                       // If a future made some callbacks or has not yet been drop'd (i.e. the state has more
+                       // than the one reference we hold), assume the user was notified and skip setting the
+                       // notification-required flag. This will not cause the `wait` functions above to return
+                       // and avoid any future `Future`s starting in a completed state.
+                       return;
                }
+               lock.0 = true;
                mem::drop(lock);
                self.condvar.notify_all();
        }
@@ -147,11 +156,14 @@ pub(crate) struct FutureState {
 }
 
 impl FutureState {
-       fn complete(&mut self) {
+       fn complete(&mut self) -> bool {
+               let mut made_calls = false;
                for callback in self.callbacks.drain(..) {
                        callback.call();
+                       made_calls = true;
                }
                self.complete = true;
+               made_calls
        }
 }
 
@@ -231,6 +243,48 @@ mod tests {
                assert!(callback.load(Ordering::SeqCst));
        }
 
+       #[test]
+       fn notifier_future_completes_wake() {
+               // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
+               // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
+               // `lightning-background-processor` to persist in a tight loop.
+               let notifier = Notifier::new();
+
+               // First check the simple case, ensuring if we get notified a new future isn't woken until
+               // a second `notify`.
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(!callback.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(callback.load(Ordering::SeqCst));
+
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(!callback.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(callback.load(Ordering::SeqCst));
+
+               // Then check the case where the future is fetched before the notification, but a callback
+               // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
+               // don't get an instant-wake when we get a new future.
+               let future = notifier.get_future();
+               notifier.notify();
+
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(callback.load(Ordering::SeqCst));
+
+               let callback = Arc::new(AtomicBool::new(false));
+               let callback_ref = Arc::clone(&callback);
+               notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+               assert!(!callback.load(Ordering::SeqCst));
+       }
+
        #[cfg(feature = "std")]
        #[test]
        fn test_wait_timeout() {