From cd03cb647745afc4f886d94cec70528dc3cac6f1 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 2 Mar 2023 07:50:16 +0000 Subject: [PATCH] Make waking after a future completes propagates to the next future In our `wakers`, if we first `notify` a future, which is then `poll`ed complete, and then `notify` the same waker again before a new future is fetched, that new future will be marked as non-complete initially and wait for a third `notify`. The fix is luckily rather trivial, when we `notify` a future, if it is completed immediately, simply wipe the future state so that we look at the pending-notify flag when we generate the next future. --- lightning/src/util/wakers.rs | 67 ++++++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index fdbc22f11..f86fc376c 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -105,7 +105,10 @@ impl Notifier { pub(crate) fn notify(&self) { let mut lock = self.notify_pending.lock().unwrap(); if let Some(future_state) = &lock.1 { - future_state.lock().unwrap().complete(); + if future_state.lock().unwrap().complete() { + lock.1 = None; + return; + } } lock.0 = true; mem::drop(lock); @@ -161,12 +164,13 @@ pub(crate) struct FutureState { } impl FutureState { - fn complete(&mut self) { + fn complete(&mut self) -> bool { for (counts_as_call, callback) in self.callbacks.drain(..) { callback.call(); self.callbacks_made |= counts_as_call; } self.complete = true; + self.callbacks_made } } @@ -469,4 +473,63 @@ mod tests { assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); assert!(!notifier.wait_timeout(Duration::from_millis(1))); } + + #[test] + fn test_poll_post_notify_completes() { + // Tests that if we have a future state that has completed, and we haven't yet requested a + // new future, if we get a notify prior to requesting that second future it is generated + // pre-completed. + let notifier = Notifier::new(); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + } + + #[test] + fn test_poll_post_notify_completes_initial_notified() { + // Identical to the previous test, but the first future completes via a wake rather than an + // immediate `Poll::Ready`. + let notifier = Notifier::new(); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + } } -- 2.39.5