Make waking after a future completes propagates to the next future 2023-03-debug-futures
authorMatt Corallo <git@bluematt.me>
Thu, 2 Mar 2023 07:50:16 +0000 (07:50 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 2 Mar 2023 07:50:16 +0000 (07:50 +0000)
In our `wakers`, if we first `notify` a future, which is then
`poll`ed complete, and then `notify` the same waker again before a
new future is fetched, that new future will be marked as
non-complete initially and wait for a third `notify`.

The fix is luckily rather trivial, when we `notify` a future, if it
is completed immediately, simply wipe the future state so that we
look at the pending-notify flag when we generate the next future.

lightning/src/util/wakers.rs

index fdbc22f116600b7162bdbdcafa4a6f314af944e1..f86fc376cee0202323b9924057157aaf8379f86a 100644 (file)
@@ -105,7 +105,10 @@ impl Notifier {
        pub(crate) fn notify(&self) {
                let mut lock = self.notify_pending.lock().unwrap();
                if let Some(future_state) = &lock.1 {
-                       future_state.lock().unwrap().complete();
+                       if future_state.lock().unwrap().complete() {
+                               lock.1 = None;
+                               return;
+                       }
                }
                lock.0 = true;
                mem::drop(lock);
@@ -161,12 +164,13 @@ pub(crate) struct FutureState {
 }
 
 impl FutureState {
-       fn complete(&mut self) {
+       fn complete(&mut self) -> bool {
                for (counts_as_call, callback) in self.callbacks.drain(..) {
                        callback.call();
                        self.callbacks_made |= counts_as_call;
                }
                self.complete = true;
+               self.callbacks_made
        }
 }
 
@@ -469,4 +473,63 @@ mod tests {
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
                assert!(!notifier.wait_timeout(Duration::from_millis(1)));
        }
+
+       #[test]
+       fn test_poll_post_notify_completes() {
+               // Tests that if we have a future state that has completed, and we haven't yet requested a
+               // new future, if we get a notify prior to requesting that second future it is generated
+               // pre-completed.
+               let notifier = Notifier::new();
+
+               notifier.notify();
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+               assert!(!woken.load(Ordering::SeqCst));
+
+               notifier.notify();
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+               assert!(!woken.load(Ordering::SeqCst));
+
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+               assert!(!woken.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(woken.load(Ordering::SeqCst));
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+       }
+
+       #[test]
+       fn test_poll_post_notify_completes_initial_notified() {
+               // Identical to the previous test, but the first future completes via a wake rather than an
+               // immediate `Poll::Ready`.
+               let notifier = Notifier::new();
+
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+
+               notifier.notify();
+               assert!(woken.load(Ordering::SeqCst));
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+
+               notifier.notify();
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+               assert!(!woken.load(Ordering::SeqCst));
+
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+               assert!(!woken.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(woken.load(Ordering::SeqCst));
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+       }
 }